2022-12-22

TensorFlow Object Detection APIを触ってみた

k-tkr

こんにちは、R&Dエンジニアの@k-tkrです。 スマートショッピングではスマートマットを用いた重量による在庫管理を提供していますが、他にも在庫を数える方法がないかなと考え、画像認識を使ってモノを数える方法を試してみました。

今回はとりあえず手元にたくさん転がっていた抵抗器の数を数えてみることにします。 resistance

画像認識を使って写真の中の抵抗器を認識させるのに、機械学習ライブラリTensorFlowObject Detection APIを使用しました。

TensorFlowとは

Googleが開発しオープンソースで公開している機械学習ライブラリで、「TensorFlow を使用すると、初心者もエキスパートも、デスクトップ、モバイル、ウェブ、クラウド向けの機械学習モデルを簡単に作成できます。」(TensorFlowの概要より)

TensorFlow Object Detection APIとは

TenrorFlow Model Gardenの中にある物体検知用のライブラリです。TensorFlowのフレームワークの中で物体検知を行うことができます。

やったこと

Tensorflow Object Detection with Tensorflow 2を参考に試してみました。

Object Detection APIのインストール

Object Detection APIはtensorflow/modelsの中にあるのでリポジトリをクローンします。 /models/research/object_detection/packages/tf2の中に環境構築用のスクリプトsetup.pyが入っているので/models/researchにコピーして使用します。

cd /models/research
cp /object_detection/packages/tf2/setup.py .
pip install .

環境変数のPYTHONPATHに/models/research, /models/research/slim, /modelsを追加しておきます。

Protocol Buffersからのファイル生成

Object Detection APIのいくつかのファイルはProtocol Buffersで定義されているのでそこからpythonファイルを生成する必要があります。 protobufに従ってProtocol Buffersをインストールします。インストールが完了したら、/models/researchディレクトリで

protoc object_detection/protos/*.proto --python_out=.

を実行し、/models/research/object_detection/protosの中にpythonファイルが作られていることを確認します。

GPU環境のセットアップ

手元のPCにGPUが入っていたので活用するためにCUDAとcuDNNをインストールしました。最初は気づかなかったのですが、cuDNNを使用するためにzlibwapi.dllが必要なのでそれもダウンロードしcuDNNのdllが置いてある場所にコピーします。 GPUが認識されているかを確認するには

from tensorflow.pytnon.clientimport device_lib
device_lib.list_local_devices()

と打ってみてGPUの情報が表示されるかどうかで確認できます。 ## 画像のアノテーション 単純な画像認識では画像に写っているものが何かが分かれば十分ですが、物体検知では「何が」「どこに」写っているのかを認識させる必要があります。「どこに」の情報はバウンディングボックスと呼ばれる長方形のボックスを使って指定します。物体検知のアノテーションツールはいろいろありますが今回はVoTTを使ってみました。各画像に対してボックスとタグを付けていきます。

VoTT_annotation

最後にエクスポートボタンを押すと指定された形式でアノテーションファイルが作成されます。今回はVoTT JSONを選びました。

データの取り込み

VoTTで作成したJSONファイルからアノテーションデータを読み取りlistに保存しておきます。バウンディングボックスを画像の幅と高さに対する相対値で与える必要があることに注意します。

def create_annotation_from_json_files(files):
    train_image_filenames = []
    gt_labels = []
    gt_boxes = []

    tagdict = {}

    for file in files:
        jsn = open(file, 'r')
        json_load = json.load(jsn)
        asset = json_load['asset']
        image_file_path = asset['path']
        if image_file_path[:5] == 'file:':
            image_file_path = image_file_path[5:]

        train_image_filenames.append(image_file_path)

        width = asset['size']['width']
        height = asset['size']['height']

        regions = json_load['regions']
        tags = []
        points = []
        for region in regions:
            box = region['boundingBox']
            xleft = box['left'] / width
            xright = xleft + box['width'] / width
            ytop = box['top'] / height
            ybottom = ytop + box['height'] / height

            tag = region['tags']        
            for t in tag:
                if t not in tagdict.keys():
                    i = len(tagdict) + 1
                    tagdict[t] = i

                tags.append(tagdict[t])
                points.append([ytop, xleft, ybottom, xright])

        gt_labels.append(np.array(tags))
        gt_boxes.append(np.array(points))

    category_index = {}
    for t, i in tagdict.items():
        v = {'id': i, 'name': t}
        category_index[i] = v

    n_classes = len(category_index)
    return train_image_filenames, gt_labels, gt_boxes, category_index, n_classes

画像もnumpy array形式にしておきます。

def load_image_into_numpy_array(path):
    img_data = tf.io.gfile.GFile(path, 'rb').read()
    image = Image.open(BytesIO(img_data))
    (im_width, im_height) = image.size
    return np.array(image.getdata()).reshape(
        (im_height, im_width, 3)).astype(np.uint8)

画像を水増しするために90度ずつの回転を入れてみました。

def extend_training_samples(train_images_np, gt_boxes, gt_labels):
    train_images = []
    boxes = []
    labels = []
    for t,b,l in zip(train_images_np, gt_boxes, gt_labels):
        rot_image = t.copy()
        rot_box = b.copy()
        rot_label = l.copy()
        train_images.append(t)
        boxes.append(b)
        labels.append(l)
        for i in range(3):
            # [y1, x1, y2, x2]
            bx = []
            lb = []
            for bb, ll in zip(rot_box, rot_label):
                bx.append([1.0 - bb[3], bb[0], 1.0 - bb[1], bb[2]])
                lb.append(ll)

            rot_box = np.array(bx)
            boxes.append(rot_box)

            rot_label = np.array(lb)
            labels.append(rot_label)
            rot_image = np.rot90(rot_image).copy()
            train_images.append(rot_image)
    return train_images, boxes, labels

物体検知の場合画像を動かすとバウンディングボックスも動かさなければならないので割と面倒です。

画像、バウンディングボックス、ラベルをtensorにします。

label_id_offset = 1 # labelを0スタートにする
train_image_tensors = []
gt_classes_one_hot_tensors = []
gt_box_tensors = []

for(train_image_np, gt_box_np, gt_label_np) in zip(train_images_np, gt_boxes, gt_labels):
    train_image_tensors.append(tf.expand_dims(tf.convert_to_tensor(train_image_np, dtype=tf.float32), axis=0))
    gt_box_tensors.append(tf.convert_to_tensor(gt_box_np, dtype=tf.float32))
    zero_indexed_groundtruth_classes = tf.convert_to_tensor(gt_label_np - label_id_offset)
    gt_classes_one_hot_tensors.append(tf.one_hot(zero_indexed_groundtruth_classes, n_classes)) # labelをone-hot tensorにする

ここでアノテーションが正しく読み込まれているかどうか確認してみます。

from object_detection.utils import visualization_utils as viz_utils
from matplotlib import pyplot as plt

def plot_detections(image_np,
                    boxes,
                    classes,
                    scores,
                    category_index,
                    image_name=None,
                    min_score_thresh=0.6):
    image_np_with_annotations = image_np.copy()
    viz_utils.visualize_boxes_and_labels_on_image_array(
        image_np_with_annotations,
        boxes,
        classes,
        scores,
        category_index,
        use_normalized_coordinates=True,
        min_score_thresh=min_score_thresh)
    plt.imshow(image_np_with_annotations)

dummy_scores = np.array([1.0]*10, dtype=np.float32)
plt.figure(figsize=(30, n_extended_images*2))
for idx in range(n_extended_images):
    plt.subplot(n_extended_images//4, 4, idx+1)
    plot_detections(
        train_images_np[idx],
        gt_boxes[idx],
        gt_labels[idx],
        dummy_scores, category_index)
plt.show()

すると下のような画像が表示されるので正しく読み込まれています。 annotation_check

モデル作成

Object Detection APIでは予めCoco2017のデータを使って学習されたモデルがTensorFlow 2 Detection Model Zooからダウンロードできます。今回はRetinaNet50を使いました。モデルをダウンロードし適当なディレクトリに解凍します。 このディレクトリの中にあるpipeline.configにモデルの様々な設定が書かれています。 以下のようにして設定を読み込み上書きします。

pipeline_config = 'models/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8/pipeline_modified.config'
configs = config_util.get_configs_from_pipeline_file(pipeline_config)
model_config = configs['model']
model_config.ssd.num_classes = n_classes # annotationのクラス数
model_config.ssd.freeze_batchnorm = True

次に設定ファイルからベースとなるモデルを作ります。

detection_model = model_builder.build(model_config=model_config, is_training=True)

モデルのパラメタを最初からフィッティングし直すのは辛いので、予め計算されているパラメタを一部使用して学習をしていきます。ダウンロードしたモデルのcheckpointディレクトリに学習されたパラメタが保存されています。今回使用したモデルでは物体のクラスを判定するチェックポイントヘッドとバウンディングボックスの位置を判定するチェックポイントヘッドが分かれています。今回のクラス判定では保存されているクラスとは全く別のクラスを判定するので最初からフィッティングし直しますが、バウンディングボックスはcheckpointに保存されているものを使うことにします。

checkpoint_path = 'models/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8/checkpoint/ckpt-0'
fake_box_predictor = tf.compat.v2.train.Checkpoint(
    _base_tower_layers_for_heads=detection_model._box_predictor._base_tower_layers_for_heads,
    #_prediction_heads=detection_model._box_predictor._prediction_heads,
    _box_prediction_head=detection_model._box_predictor._box_prediction_head,
    )

fake_model = tf.compat.v2.train.Checkpoint(
          _feature_extractor=detection_model._feature_extractor,
          _box_predictor=fake_box_predictor)

ckpt = tf.compat.v2.train.Checkpoint(model=fake_model)
ckpt.restore(checkpoint_path).expect_partial()

チェックポイントの読み込みの仕組みはトレーニングのチェックポイントドキュメントが参考になりました。

トレーニング

まずは学習1ステップの関数を作成しておきます。

def get_model_train_step_function(model, optimizer, vars_to_fine_tune):
  @tf.function
  def train_step_fn(image_tensors,
                    groundtruth_boxes_list,
                    groundtruth_classes_list):
    image_sz = len(image_tensors)
    shapes = tf.constant(image_sz * [[640, 640, 3]], dtype=tf.int32)
    model.provide_groundtruth(
        groundtruth_boxes_list=groundtruth_boxes_list,
        groundtruth_classes_list=groundtruth_classes_list)
    with tf.GradientTape() as tape:
        preprocessed_images = tf.concat([detection_model.preprocess(image_tensor)[0] for image_tensor in image_tensors], axis=0)
        prediction_dict = model.predict(preprocessed_images, shapes)
        losses_dict = model.loss(prediction_dict, shapes)
        box_loss = losses_dict['Loss/localization_loss']
        class_loss = losses_dict['Loss/classification_loss']
        total_loss = box_loss + class_loss
        gradients = tape.gradient(total_loss, vars_to_fine_tune)
        optimizer.apply_gradients(zip(gradients, vars_to_fine_tune))
    return box_loss, class_loss

  return train_step_fn

次に今回トレーニングする変数を選択します。

trainable_variables = detection_model.trainable_variables
to_fine_tune = []
prefixes_to_train = [
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalBoxHead',
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalClassHead']
for var in trainable_variables:
    if any([var.name.startswith(prefix) for prefix in prefixes_to_train]):
        to_fine_tune.append(var)

その後学習ループで上の関数を繰り返し呼ぶことでモデルを最適化していきます。

n_epochs = 300
n_batches = 4
n_training_data = len(train_images_np)

learning_rate = 0.01
optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=0.9)

train_step_fn = get_model_train_step_function(
    detection_model, optimizer, to_fine_tune)

for epoch in range(n_epochs):
    all_keys = list(range(len(train_images_np)))
    random.shuffle(all_keys)
    for j in range(n_batches):
        s = j * n_training_data // n_batches
        e = (j + 1) * n_training_data // n_batches
        #print("start:{}, end:{}".format(s, e))
        example_keys = all_keys[s:e]

        gt_boxes_list = [gt_box_tensors[key] for key in example_keys]
        gt_classes_list = [gt_classes_one_hot_tensors[key] for key in example_keys]
        image_tensors = [train_image_tensors[key] for key in example_keys]

        box_loss, class_loss = train_step_fn(image_tensors, gt_boxes_list, gt_classes_list)

    print('Epoch ' + str(epoch + 1) + ' of ' + str(n_epochs) + ', loss=' +  str(box_loss.numpy()) + ', class_loss=' + str(class_loss.numpy()), flush=True)

結果

トレーニングされたモデルを使って物体検出を試してみます。

def detect(input_tensor):
    preprocessed_image, shapes = detection_model.preprocess(input_tensor)
    print(preprocessed_image)
    prediction_dict = detection_model.predict(preprocessed_image, shapes)
    return detection_model.postprocess(prediction_dict, shapes)

testpic = load.load_image_into_numpy_array(testpath)
input_tensor = np.expand_dims(testpic, axis=0)
input_tensor = tf.convert_to_tensor(input_tensor, dtype=tf.float32)
detections = detect(input_tensor)

ここで、detectionmodel.postprocess関数の戻り値はdictionaryでdetectionboxes, detectionscores, detectionclassesなどが含まれています。これらをデータ読み込みの際に作成したplot_detectionsに渡してバウンディングボックスを表示させています。ここではスコアが50%以上のものを表示させてみました。 ResistanceWithBoundingBoxWithBoundingBox

半分以上の抵抗器は認識されていますが、中には認識されていなかったりバウンディングボックスが複数できてしまったりしているものがあり、まだまだこれだけでは使えないなという印象でした。

終わりに

今回はとりあえず動かしてみることが目的の実験でしたが、物体認識の精度がまだまだ低いことの他にも - アノテーションが手作業であり、長方形のボックスしか使えない - 学習に時間が掛かる - カメラがとらえきれない奥行きのあるようなものを数えられない など、実際に使うにはいくつものハードルがあります。しかし重量測定以外の方法での在庫把握ができるようになるとこれまで重量計で測れなかったものについても在庫把握ができる可能性が出てくるので画像認識を含め様々な方法を検証していきたいと考えています。

最新の記事