2020-08-12

SalesforceのBulk APIをGoから叩く

biosugar0

はじめに

こんにちは、エンジニアリング本部で主にバックエンドを担当している@biosugar0です。

スマートショッピングでは、Salesforceを利用しています。最近、システムとのデータの連携のためにGo言語でSalesforceのBulk APIを叩く検証を行ったので、使い方を紹介したいと思います。

Bulk API

Bulk APIを使うと、大量のデータをSalesforceにすばやく読み込んだり、組織データに対して一括クエリを実行したりすることができます。 今回は、データのアップロードを伴うinsert,delete,updateを中心に紹介します。

Bulk APIではJobを用いて非同期に処理を行います。そのため、データを処理するときには以下のようなAPIを叩く必要があります。

  1. 認証
  2. Jobの作成
  3. Jobデータのアップロード
  4. Jobを閉じる
  5. Jobの処理状況の確認

Jobを閉じると、Salesforceが登録されたJobを処理キューに追加し、処理が実行されます。 処理は非同期に行われるため、完了したことを確認するにはGETリクエストでステータスを取得する必要があります。

それぞれGoで検証したので、紹介します。

1. 認証

まずは、APIを叩くために 接続アプリケーションを作成する必要があります。 公式ドキュメントの認証を設定するを参考に準備をします。

接続アプリケーションが作成できたら、設定 >> アプリケーション >> アプリケーションマネージャとクリックして、作った接続アプリケーションを参照します。

API (OAuth 設定の有効化) という欄にAPIでログインするための情報が記載されています。 コンシューマ鍵コンシューマの秘密をこの後使います。

接続アプリケーションが作成できたら、認証用のURLに対してPOSTリクエストを送ってtokenを取得します。今回はSandbox環境で検証したので、 https://test.salesforce.com/services/oauth2/token に対してリクエストを送ります。

今回はユーザー名とパスワードで認証を行いますが、ドキュメントに記載されているように、証明書とJWTを利用した方法などもあります。

必要なパラメータは以下です。

キー
grant_type"password"
client_idコンシューマ鍵
client_secretコンシューマの秘密
usernameSalesforceユーザ名
passwordSalesforceログインパスワード+セキュリティトークン

パスワードはログインパスワードの後ろにセキュリティトークンを連結させます。

実際にGOでログイン処理を書くと、以下のようになります。

var (
    SFSession *Session
)

type Session struct {
    InstanceURL string `json:"instance_url"`
    ID          string `json:"id"`
    TokenType   string `json:"token_type"`
    IssuedAt    string `json:"issued_at"`
    Signature   string `json:"signature"`
    AccessToken string `json:"access_token"`
}

func login() error {
  // 環境変数から必要なパラメータを取得
    var (
        loginURL      = config.GetEnvValue().Salesforce.LoginURL
        password      = config.GetEnvValue().Salesforce.Password
        clientID      = config.GetEnvValue().Salesforce.ClientID
        clientSecret  = config.GetEnvValue().Salesforce.ClientSecret
        userName      = config.GetEnvValue().Salesforce.UserName
        securityToken = config.GetEnvValue().Salesforce.SecurityToken
    )

    values := url.Values{}
    values.Add("grant_type", "password")
    values.Add("client_id", clientID)
    values.Add("client_secret", clientSecret)
    values.Add("username", userName)
    values.Add("password", password+securityToken)
    res, err := http.PostForm(loginURL, values)
    if err != nil {
        return err
    }
    var session Session
    decoder := json.NewDecoder(res.Body)
    if err := decoder.Decode(&session); err != nil {
        return err
    }
    log.Printf("Login successful. Instance: %s", session.InstanceURL)
    log.Printf("Token : %s", session.AccessToken)
    SFSession = &session
    return nil
}

認証が通ると、レスポンスとして Session 構造体に定義したパラメータが返ってきます。 そのなかでこの後使うのは、以下の2つです。

  • InstanceURL
  • AccessToken

2. Jobの作成

データを処理するためのJobを作成し、データをアップロードする準備をします。作成時には処理の種類を指定し、以下のパラメータを指定できます。

  • insert
  • delete
  • update
  • upsert

Jobを作成すると、Jobのステータスは Open になります。

Goでこの処理を書くと、以下のようになります。

type JobCreateReq struct {
    ExternalIDFieldName string `json:"externalIdFieldName"`
    Object              string `json:"object"`
    Operation           string `json:"operation"`
}

type Job struct {
    APIVersion      float64 `json:"apiVersion"`
    ColumnDelimiter string  `json:"columnDelimiter"`
    ConcurrencyMode string  `json:"concurrencyMode"`
    ContentType     string  `json:"contentType"`
    ContentURL      string  `json:"contentUrl"`
    CreatedByID     string  `json:"createdById"`
    CreatedDate     string  `json:"createdDate"`
    ID              string  `json:"id"`
    LineEnding      string  `json:"lineEnding"`
    Object          string  `json:"object"`
    Operation       string  `json:"operation"`
    State           string  `json:"state"`
    SystemModstamp  string  `json:"systemModstamp"`
}

func jobInsertCreate() (Job, error) {
    job := JobCreateReq{
        Object:    "Account",
        Operation: "insert",
    }
    j, _ := json.Marshal(job)
    jr := strings.NewReader(string(j))

    jobURL := fmt.Sprintf("%s/services/data/v49.0/jobs/ingest", SFSession.InstanceURL)
    spew.Dump(jobURL)
    req, err := http.NewRequest("POST", jobURL, jr)
    if err != nil {
        log.Fatal(err)
        return Job{}, err
    }
    req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", SFSession.AccessToken))
    req.Header.Set("Content-Type", "application/json")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
        return Job{}, err
    }
    spew.Dump(resp.StatusCode)
    out := Job{}
    err = json.NewDecoder(resp.Body).Decode(&out)
    if err != nil {
        log.Fatal(err)
        return Job{}, err
    }
    return out, nil
}

先程の認証で取得したAccessTokenはAuthorization ヘッダーに、 InstanceURLはリクエストURLで使います。

リクエストボディはObjectとOperationで、 Objectには処理の対象にするSalesforceのオブジェクト名を書きます。 Operationは処理の種類で、上記の例ではinsertを指定しています。

Jobの作成が成功するとJobのContentURLを含むレスポンスが返ってくるので、以降の処理ではJobのContentURLを使います。

3. Jobデータのアップロード

Jobを作成したら、次はデータをアップロードします。 データはCSVのみに対応していて、そのフォーマットはJobの種類ごとに異なります。

1度に送れるデータ量はおおよそ100MBなので、それ以上のデータをアップロードする場合は複数のリクエストに分けてデータをアップロードします。

Insert

Insert Jobでは、それぞれのオブジェクトの必須パラメータを必ず含めたCSVを用意します。

Name,OrganizationID__c
test,TEST001

Go で書くと、以下のようになります。

func jobInsert(job Job) error {
    var (
        csvRecords [][]string
        buf        = new(bytes.Buffer)
        writer     = newCSVWriter(buf, false)
    )
    headerRecord := []string{
        "Name",
        "OrganizationID__c",
    }
    csvRecords = append(csvRecords, headerRecord)
    testdata := []Account{
        {
            Name:           "test",
            OrganizationID: "TEST001",
        },
    }
    for _, v := range testdata {
        s := []string{
            v.Name,
            v.OrganizationID,
        }
        csvRecords = append(csvRecords, s)
    }

    for _, record := range csvRecords {
        if err := writer.Write(record); err != nil {
            panic(err)
        }
        writer.Flush()
    }
    b := buf.String()
    spew.Dump(b)
    csv := strings.NewReader(b)
    url := fmt.Sprintf("%s/%s", SFSession.InstanceURL, job.ContentURL)
    log.Printf("upload: %s", url)
    req, err := http.NewRequest("PUT", url, csv)
    if err != nil {
        log.Fatal(err)
        return err
    }
    req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", SFSession.AccessToken))
    req.Header.Set("Content-Type", "text/csv")
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
        return err
    }
    log.Printf("upload %v", resp.StatusCode)

    return nil
}

上記の例では、csvフォーマットで作成した文字列をio.Reader interfaceを満たすstrings.Readerに変換してBodyに入れています。

また、Authorizationヘッダーには認証で取得したAccessTokenを使用し、 リクエストURLにはJob作成時にレスポンスされたContentURLを用います。

Update

Update Jobでは、以下のように更新対象のレコードのIDをId というキーで記述する必要があります。

Id,OrganizationID__c
0028F0001TEST,TEST002

Goで書く場合、リクエストボディに入れるデータ以外はInsertの処理と同様にコードを書きます(省略)。

Delete

Delete Jobの場合は、削除の対象となるレコードのIDをIdというキーで記述します。

Id
0028F0001TEST

4. Jobを閉じる

データのアップロードが終わったら、それをJobに知らせるリクエストを送ります。

リクエストボディには以下のようなJSONを含みます。

{
    "state": "UploadComplete"
}

Jobを閉じるときは、Job作成時のレスポンスで返ってきたJobのIDをリクエストURLに含めて使用します。

Jobが閉じられると、SalesforceはJobを処理キューに追加して処理が非同期で実行されます。

Goで書くと以下のようになります。

func jobClose(job Job) error {
    payload := "{\"state\": \"UploadComplete\"}"
    url := fmt.Sprintf("%s/services/data/v49.0/jobs/ingest/%s", SFSession.InstanceURL, job.ID)
    spew.Dump(url)
    body := strings.NewReader(payload)
    req, err := http.NewRequest("PATCH", url, body)
    if err != nil {
        log.Fatal(err)
        return err
    }
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", SFSession.AccessToken))
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
        return err
    }
    log.Printf("close %v", resp.StatusCode)
    rb, _ := ioutil.ReadAll(resp.Body)
    rbody := string(rb)
    fmt.Println(rbody)
    return nil
}

5. Jobの処理状況の確認

Jobの処理は非同期に行われるため、完了したかどうかステータスを取得して確認する必要があります。 そのリクエストには、Job作成時にレスポンスされたIDを含めます。

レスポンスされたJSONのstateパラメータを確認すると、Jobがどうなったかがわかります。

  • InProgress: Jobが処理中。
  • Aborted: Jobが中止された。
  • JobComplete: Jobの処理が完了。
  • Failed: Jobの処理で一部のレコードが失敗。

Goで書くと以下のようになります。

func jobCheck(jobID string) (Job, error) {
    url := fmt.Sprintf("%s/services/data/v49.0/jobs/ingest/%s", SFSession.InstanceURL, jobID)
    spew.Dump(url)
    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        log.Fatal(err)
        return Job{}, err
    }
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", SFSession.AccessToken))
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
        return Job{}, err
    }
    log.Printf("check %v", resp.StatusCode)
    out := Job{}
    err = json.NewDecoder(resp.Body).Decode(&out)
    if err != nil {
        log.Fatal(err)
        return Job{}, err
    }
    return out, nil
}

おわりに

Bulk APIをGoで叩く方法を紹介しました。より詳しいAPIについてのドキュメントは公式のものが参考になるかと思います。 大量のデータを扱うためにJobという非同期な仕組みを使うので、エラーハンドリングなどは通常の単純なAPIより工夫する必要がありそうです。

最新の記事