こんにちは、エンジニアリング本部で主にバックエンドを担当している@biosugar0です。
スマートショッピングでは、Salesforceを利用しています。最近、システムとのデータの連携のためにGo言語でSalesforceのBulk APIを叩く検証を行ったので、使い方を紹介したいと思います。
Bulk APIを使うと、大量のデータをSalesforceにすばやく読み込んだり、組織データに対して一括クエリを実行したりすることができます。 今回は、データのアップロードを伴うinsert,delete,updateを中心に紹介します。
Bulk APIではJobを用いて非同期に処理を行います。そのため、データを処理するときには以下のようなAPIを叩く必要があります。
Jobを閉じると、Salesforceが登録されたJobを処理キューに追加し、処理が実行されます。 処理は非同期に行われるため、完了したことを確認するにはGETリクエストでステータスを取得する必要があります。
それぞれGoで検証したので、紹介します。
まずは、APIを叩くために 接続アプリケーションを作成する必要があります。 公式ドキュメントの認証を設定するを参考に準備をします。
接続アプリケーションが作成できたら、設定 >> アプリケーション >> アプリケーションマネージャとクリックして、作った接続アプリケーションを参照します。
API (OAuth 設定の有効化)
という欄にAPIでログインするための情報が記載されています。
コンシューマ鍵
とコンシューマの秘密
をこの後使います。
接続アプリケーションが作成できたら、認証用のURLに対してPOSTリクエストを送ってtokenを取得します。今回はSandbox環境で検証したので、 https://test.salesforce.com/services/oauth2/token
に対してリクエストを送ります。
今回はユーザー名とパスワードで認証を行いますが、ドキュメントに記載されているように、証明書とJWTを利用した方法などもあります。
必要なパラメータは以下です。
キー | 値 |
---|---|
grant_type | "password" |
client_id | コンシューマ鍵 |
client_secret | コンシューマの秘密 |
username | Salesforceユーザ名 |
password | Salesforceログインパスワード+セキュリティトークン |
パスワードはログインパスワードの後ろにセキュリティトークンを連結させます。
実際にGOでログイン処理を書くと、以下のようになります。
var (
SFSession *Session
)
type Session struct {
InstanceURL string `json:"instance_url"`
ID string `json:"id"`
TokenType string `json:"token_type"`
IssuedAt string `json:"issued_at"`
Signature string `json:"signature"`
AccessToken string `json:"access_token"`
}
func login() error {
// 環境変数から必要なパラメータを取得
var (
loginURL = config.GetEnvValue().Salesforce.LoginURL
password = config.GetEnvValue().Salesforce.Password
clientID = config.GetEnvValue().Salesforce.ClientID
clientSecret = config.GetEnvValue().Salesforce.ClientSecret
userName = config.GetEnvValue().Salesforce.UserName
securityToken = config.GetEnvValue().Salesforce.SecurityToken
)
values := url.Values{}
values.Add("grant_type", "password")
values.Add("client_id", clientID)
values.Add("client_secret", clientSecret)
values.Add("username", userName)
values.Add("password", password+securityToken)
res, err := http.PostForm(loginURL, values)
if err != nil {
return err
}
var session Session
decoder := json.NewDecoder(res.Body)
if err := decoder.Decode(&session); err != nil {
return err
}
log.Printf("Login successful. Instance: %s", session.InstanceURL)
log.Printf("Token : %s", session.AccessToken)
SFSession = &session
return nil
}
認証が通ると、レスポンスとして Session
構造体に定義したパラメータが返ってきます。
そのなかでこの後使うのは、以下の2つです。
データを処理するためのJobを作成し、データをアップロードする準備をします。作成時には処理の種類を指定し、以下のパラメータを指定できます。
Jobを作成すると、Jobのステータスは Open
になります。
Goでこの処理を書くと、以下のようになります。
type JobCreateReq struct {
ExternalIDFieldName string `json:"externalIdFieldName"`
Object string `json:"object"`
Operation string `json:"operation"`
}
type Job struct {
APIVersion float64 `json:"apiVersion"`
ColumnDelimiter string `json:"columnDelimiter"`
ConcurrencyMode string `json:"concurrencyMode"`
ContentType string `json:"contentType"`
ContentURL string `json:"contentUrl"`
CreatedByID string `json:"createdById"`
CreatedDate string `json:"createdDate"`
ID string `json:"id"`
LineEnding string `json:"lineEnding"`
Object string `json:"object"`
Operation string `json:"operation"`
State string `json:"state"`
SystemModstamp string `json:"systemModstamp"`
}
func jobInsertCreate() (Job, error) {
job := JobCreateReq{
Object: "Account",
Operation: "insert",
}
j, _ := json.Marshal(job)
jr := strings.NewReader(string(j))
jobURL := fmt.Sprintf("%s/services/data/v49.0/jobs/ingest", SFSession.InstanceURL)
spew.Dump(jobURL)
req, err := http.NewRequest("POST", jobURL, jr)
if err != nil {
log.Fatal(err)
return Job{}, err
}
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", SFSession.AccessToken))
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
return Job{}, err
}
spew.Dump(resp.StatusCode)
out := Job{}
err = json.NewDecoder(resp.Body).Decode(&out)
if err != nil {
log.Fatal(err)
return Job{}, err
}
return out, nil
}
先程の認証で取得したAccessTokenはAuthorization
ヘッダーに、
InstanceURLはリクエストURLで使います。
リクエストボディはObjectとOperationで、 Objectには処理の対象にするSalesforceのオブジェクト名を書きます。 Operationは処理の種類で、上記の例ではinsertを指定しています。
Jobの作成が成功するとJobのContentURLを含むレスポンスが返ってくるので、以降の処理ではJobのContentURLを使います。
Jobを作成したら、次はデータをアップロードします。 データはCSVのみに対応していて、そのフォーマットはJobの種類ごとに異なります。
1度に送れるデータ量はおおよそ100MBなので、それ以上のデータをアップロードする場合は複数のリクエストに分けてデータをアップロードします。
Insert Jobでは、それぞれのオブジェクトの必須パラメータを必ず含めたCSVを用意します。
Name,OrganizationID__c
test,TEST001
Go で書くと、以下のようになります。
func jobInsert(job Job) error {
var (
csvRecords [][]string
buf = new(bytes.Buffer)
writer = newCSVWriter(buf, false)
)
headerRecord := []string{
"Name",
"OrganizationID__c",
}
csvRecords = append(csvRecords, headerRecord)
testdata := []Account{
{
Name: "test",
OrganizationID: "TEST001",
},
}
for _, v := range testdata {
s := []string{
v.Name,
v.OrganizationID,
}
csvRecords = append(csvRecords, s)
}
for _, record := range csvRecords {
if err := writer.Write(record); err != nil {
panic(err)
}
writer.Flush()
}
b := buf.String()
spew.Dump(b)
csv := strings.NewReader(b)
url := fmt.Sprintf("%s/%s", SFSession.InstanceURL, job.ContentURL)
log.Printf("upload: %s", url)
req, err := http.NewRequest("PUT", url, csv)
if err != nil {
log.Fatal(err)
return err
}
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", SFSession.AccessToken))
req.Header.Set("Content-Type", "text/csv")
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
return err
}
log.Printf("upload %v", resp.StatusCode)
return nil
}
上記の例では、csvフォーマットで作成した文字列をio.Reader
interfaceを満たすstrings.Reader
に変換してBodyに入れています。
また、Authorizationヘッダーには認証で取得したAccessTokenを使用し、 リクエストURLにはJob作成時にレスポンスされたContentURLを用います。
Update Jobでは、以下のように更新対象のレコードのIDをId
というキーで記述する必要があります。
Id,OrganizationID__c
0028F0001TEST,TEST002
Goで書く場合、リクエストボディに入れるデータ以外はInsertの処理と同様にコードを書きます(省略)。
Delete Jobの場合は、削除の対象となるレコードのIDをId
というキーで記述します。
Id
0028F0001TEST
データのアップロードが終わったら、それをJobに知らせるリクエストを送ります。
リクエストボディには以下のようなJSONを含みます。
{
"state": "UploadComplete"
}
Jobを閉じるときは、Job作成時のレスポンスで返ってきたJobのIDをリクエストURLに含めて使用します。
Jobが閉じられると、SalesforceはJobを処理キューに追加して処理が非同期で実行されます。
Goで書くと以下のようになります。
func jobClose(job Job) error {
payload := "{\"state\": \"UploadComplete\"}"
url := fmt.Sprintf("%s/services/data/v49.0/jobs/ingest/%s", SFSession.InstanceURL, job.ID)
spew.Dump(url)
body := strings.NewReader(payload)
req, err := http.NewRequest("PATCH", url, body)
if err != nil {
log.Fatal(err)
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", SFSession.AccessToken))
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
return err
}
log.Printf("close %v", resp.StatusCode)
rb, _ := ioutil.ReadAll(resp.Body)
rbody := string(rb)
fmt.Println(rbody)
return nil
}
Jobの処理は非同期に行われるため、完了したかどうかステータスを取得して確認する必要があります。 そのリクエストには、Job作成時にレスポンスされたIDを含めます。
レスポンスされたJSONのstateパラメータを確認すると、Jobがどうなったかがわかります。
Goで書くと以下のようになります。
func jobCheck(jobID string) (Job, error) {
url := fmt.Sprintf("%s/services/data/v49.0/jobs/ingest/%s", SFSession.InstanceURL, jobID)
spew.Dump(url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Fatal(err)
return Job{}, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", SFSession.AccessToken))
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
return Job{}, err
}
log.Printf("check %v", resp.StatusCode)
out := Job{}
err = json.NewDecoder(resp.Body).Decode(&out)
if err != nil {
log.Fatal(err)
return Job{}, err
}
return out, nil
}
Bulk APIをGoで叩く方法を紹介しました。より詳しいAPIについてのドキュメントは公式のものが参考になるかと思います。 大量のデータを扱うためにJobという非同期な仕組みを使うので、エラーハンドリングなどは通常の単純なAPIより工夫する必要がありそうです。