Google Coral USB Accelerator で SSD(Single Shot MultiBox Detector)を実行する
Ubuntu/RaspberryPiでGoogle Coral USB Acceleratorを使用できるようにしてみる。
一部を除き、基本的にともに同じ手順で大丈夫(なハズ…)
通常のtfliteのモデルファイルはCPUのみ(またはCPU+GPU)を使用するように構成されていて、
そのまま実行してもEdge-TPUは使用されない。
そこで、Edge-TPU使用モデルに変換するため、edgetpu_compilerを使用する必要がある。
(ただし、Ubuntuのみ。RasPi不可。)
CPU使用モデルからEdge-TPU使用モデルに変換するツールedgetpu_compiler
をインストールする。
aptリポジトリの登録はTPUライブラリインストール時に行っているので不要(以下ではコメントアウトしてある)。
# curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
# echo "deb https://packages.cloud.google.com/apt coral-edgetpu-stable main" | sudo tee /etc/apt/sources.list.d/coral-edgetpu.list
# sudo apt update
sudo apt-get install edgetpu-compiler
基本的に以下。
tfliteファイルを喰わせるとファイル名に_edgetpu
を付加したファイルが作成される。
これがEdge-TPU使用するモデルファイル。
詳細はEdge TPU Compiler を参照。
edgetpu_compiler [options] model...
openCVをインストールしておく。
pip install opencv-python
mkdir -p /work/coral/ssd/
cd /work/coral/ssd/
用意されているSSDのモデルをダウンロードし、Edge-TPU使用モデルを作成する。
実行が終了すると、models/coco_ssd_mobilenet_v1_1.0_quant_edgetpu.tflite
ができる。
mkdir models
cd models
curl -O https://storage.googleapis.com/download.tensorflow.org/models/tflite/coco_ssd_mobilenet_v1_1.0_quant_2018_06_29.zip
unzip coco_ssd_mobilenet_v1_1.0_quant_2018_06_29.zip
mv detect.tflite coco_ssd_mobilenet_v1_1.0_quant.tflite # リネームしなくてもいいけど、後でわからなくならないようにリネームしておく
mv labelmap.txt coco_ssd_mobilenet_v1_1.0_quant.labels # 同上
edgetpu_compiler coco_ssd_mobilenet_v1_1.0_quant.tflite # EdgeTPU使用モデルに変換
cd ../
適当な画像をimage
ディレクトリに用意しておく。
使用できるファイル形式はjpg
とmp4
mkdir images
cd images/
# 適当な画像ファイルをダウンロードする
cd ../
以下の内容をobject_detection_demo_ssd.py
として保存しておく。
#!/usr/bin/env python
import sys
import platform
import os
import time
import math
import logging as log
from argparse import ArgumentParser, SUPPRESS
import cv2
import numpy as np
import tflite_runtime.interpreter as tflite
# shared library
EDGETPU_SHARED_LIB = {
'Linux': 'libedgetpu.so.1',
'Darwin': 'libedgetpu.1.dylib',
'Windows': 'edgetpu.dll'
}[platform.system()]
# コマンドラインパーサの構築
def build_argparser():
parser = ArgumentParser(add_help=False)
args = parser.add_argument_group('Options')
args.add_argument('-h', '--help', action='help', default=SUPPRESS, help='Show this help message and exit.')
args.add_argument("-m", "--model", help="Required. Path to an .xml file with a trained model.", required=True, type=str)
args.add_argument("-i", "--input", help="Required. Path to a image/video file. (Specify 'cam' to work with camera)", required=True, type=str)
args.add_argument("-l", "--labels", help="Optional. Labels mapping file", default=None, type=str)
parser.add_argument("--save", help="Optional. Save result to specified file", default=None, type=str)
parser.add_argument("--log", help="Optional. Save log to specified file", default=None, type=str)
parser.add_argument("--no_disp", action='store_true', help="Optional. without image display")
return parser
# interpreter の生成
def make_interpreter(model_file):
# CPU/TPU使用の識別
# 「ファイル名に"_edgetpu"が含まれていたら」の識別方法もアリかもしれない
with open(model_file, "rb") as f:
# モデルデータを読み込む
tfdata = f.read()
# モデルファイル中に"edgetpu-custom-op"が含まれていたらTPU使用モデル
cpu = not b"edgetpu-custom-op" in tfdata
if cpu :
print('**** USE CPU ONLY!! ****')
else :
print('**** USE WITH TPU ****')
if cpu :
return tflite.Interpreter(model_path=model_file)
else :
return tflite.Interpreter(
model_path = model_file,
experimental_delegates = [
tflite.load_delegate(EDGETPU_SHARED_LIB)
])
# 結果の解析と表示
def parse_result(interpreter, frame, labels_map, args) :
img_height, img_width = frame.shape[:2]
output_details = interpreter.get_output_details()
_, input_height, input_width, _ = interpreter.get_input_details()[0]['shape']
tflite_results1 = interpreter.get_tensor(output_details[0]['index']) # Locations (Top, Left, Bottom, Right)
tflite_results2 = interpreter.get_tensor(output_details[1]['index']) # Classes (0=Person)
tflite_results3 = interpreter.get_tensor(output_details[2]['index']) # Scores
tflite_results4 = interpreter.get_tensor(output_details[3]['index']) # Number of detections
for i in range(int(tflite_results4[0])):
top = tflite_results1[0, i][0] * input_height
left = tflite_results1[0, i][1] * input_width
bottom = tflite_results1[0, i][2] * input_height
right = tflite_results1[0, i][3] * input_width
# ラベルが定義されていればラベルを読み出し、なければclass ID
class_id = tflite_results2[0, i].astype(int) + 1
if labels_map :
if len(labels_map) > class_id :
class_name = labels_map[class_id]
else :
class_name = str(class_id)
else :
class_name = str(class_id)
prob = tflite_results3[0, i]
if prob >= 0.5:
print(f'Class={class_name:15} Probability={prob:4f} Location=({int(left)},{int(top)})-({int(right)},{int(bottom)})')
top = int(top * img_height / input_height)
left = int(left * img_width / input_width)
bottom = int(bottom * img_height / input_height)
right = int(right * img_width / input_width)
# 表示色
if class_id == 1 :
# class_id = 1のときは緑
color = (128, 255, 128)
else :
# それ以外のときは赤
color = (128, 128, 255)
# 対象物の枠とラベルの描画
cv2.rectangle(frame, (left, top), (right, bottom), color, 2)
cv2.rectangle(frame, (left, top+20), (left+160, top), color, -1)
cv2.putText(frame, "{} ({:.3f})".format(class_name, prob),
(left, top+15), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 0))
return
def main():
log.basicConfig(format="[ %(levelname)s ] %(message)s", level=log.INFO, stream=sys.stdout)
args = build_argparser().parse_args()
model = args.model
no_disp = args.no_disp
model_label = None
if args.labels:
model_label = args.labels
else:
model_label = os.path.splitext(model)[0] + ".labels"
if not os.path.isfile(model_label) :
model_label = None
# ------------- 1. Plugin initialization for specified device and load extensions library if specified -------------
log.info("Creating Inference Engine...")
# interpreterの構築
interpreter = make_interpreter(args.model)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
# 入力データ
if args.input == 'cam':
input_stream = 0
else:
input_stream = os.path.abspath(args.input)
assert os.path.isfile(input_stream), "Specified input file doesn't exist"
cap = cv2.VideoCapture(input_stream)
# 幅と高さを取得
img_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
img_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
disp_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + 90 # 情報表示領域分を追加
# フレームレート(1フレームの時間単位はミリ秒)の取得
org_frame_rate = int(cap.get(cv2.CAP_PROP_FPS)) # オリジナルのフレームレート
org_frame_time = 1.0 / cap.get(cv2.CAP_PROP_FPS) # オリジナルのフレーム時間
writer = None
if args.save :
# フォーマット
fmt = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
writer = cv2.VideoWriter(args.save, fmt, org_frame_rate, (img_width, disp_height))
log_f = None
if args.log :
log_f = open(args.log, mode='w')
log_f.write(f'frame_number, frame_time, preprocess_time, inf_time, parse_time, render_time, wait_time\n') # 見出し行
if model_label:
# ラベルファイルの読み込み
with open(model_label, 'r') as f:
labels_map = [x.strip() for x in f]
else:
# ラベルファイルがない場合
labels_map = None
# フレーム数
number_input_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
number_input_frames = 1 if number_input_frames != -1 and number_input_frames < 0 else number_input_frames
# ----------------------------------------------- 6. Doing inference -----------------------------------------------
log.info("Starting inference...")
print("To close the application, press 'CTRL+C' here or switch to the output window and press ESC key")
# 実行時間測定用変数の初期化
frame_time = 0
preprocess_time = 0
inf_time = 0
parse_time = 0
render_time = 0
# フレーム測定用タイマ
prev_time = time.time()
frame_number = 0
# モデルの入力サイズ
_, input_height, input_width, _ = input_details[0]['shape']
while cap.isOpened():
# 画像の前処理 =============================================================================
preprocess_start = time.time() # 前処理開始時刻
# 画像の読み込み
ret, frame = cap.read()
if not ret:
break
# 表示用領域を含んだフレームを作成
pad_img = np.zeros((disp_height, img_width, 3), np.uint8)
# モデル入力用にリサイズ
inf_start = time.time() # 推論処理開始時刻
in_frame = cv2.resize(frame, (input_width, input_height)) # input size of coco ssd mobilenet?
in_frame = in_frame[:, :, [2,1,0]] # BGR -> RGB
in_frame = np.expand_dims(in_frame, axis=0) # 3D -> 4D
preprocess_end = time.time()
preprocess_time = preprocess_end - preprocess_start # 前処理にかかった時間
# 推論実行 =============================================================================
inf_start = time.time() # 推論処理開始時刻
# 推論本体
interpreter.set_tensor(input_details[0]['index'], in_frame)
interpreter.invoke()
inf_end = time.time()
inf_time = inf_end - inf_start # 推論処理にかかった時間
# 検出結果の解析 =============================================================================
parse_start = time.time() # 解析処理開始時刻
parse_result(interpreter, frame, labels_map, args)
parse_end = time.time()
parse_time = parse_end - parse_start # 解析処理にかかった時間
# 結果の表示 =============================================================================
render_start = time.time() # 表示処理開始時刻
# 測定データの表示
frame_number_message = f'frame_number : {frame_number:5d}'
if frame_time == 0 :
frame_time_message = 'Frame time : ---'
else :
frame_time_message = f'Frame time : {(frame_time * 1000):.3f} ms {(1/frame_time):.2f} fps'
inf_time_message = f'Inference time : {(inf_time * 1000):.3f} ms'
render_time_message = f'Rendering time : {(render_time * 1000):.3f} ms'
parsing_time_message = f'parse time : {(parse_time * 1000):.3f} ms'
# 結果の書き込み
cv2.putText(pad_img, frame_number_message, (10, img_height + 15), cv2.FONT_HERSHEY_COMPLEX, 0.5, (255, 128, 128), 1)
cv2.putText(pad_img, inf_time_message, (10, img_height + 30), cv2.FONT_HERSHEY_COMPLEX, 0.5, (255, 128, 128), 1)
cv2.putText(pad_img, parsing_time_message, (10, img_height + 45), cv2.FONT_HERSHEY_COMPLEX, 0.5, (255, 128, 128), 1)
cv2.putText(pad_img, render_time_message, (10, img_height + 60), cv2.FONT_HERSHEY_COMPLEX, 0.5, (255, 128, 128), 1)
cv2.putText(pad_img, frame_time_message, (10, img_height + 75), cv2.FONT_HERSHEY_COMPLEX, 0.5, (255, 128, 128), 1)
# 表示用領域に画像をコピー
pad_img[:img_height, :img_width] = frame
# 表示
if not no_disp :
cv2.imshow("Detection Results", pad_img) # 表示
else :
print(f'frame_number: {frame_number:5d}')
# 画像の保存
if writer:
# 保存が設定されているときは画像を保存
writer.write(pad_img)
render_end = time.time()
render_time = render_end - render_start # 表示処理にかかった時間
# タイミング調整 =============================================================================
wait_start = time.time()
if no_disp :
# 表示しない場合は無駄な待ち時間を確保しない
wait_key_code = 1
else :
# フレーム先頭からここまでの時間
cur_total_time = wait_start - preprocess_start
# フレーム間処理の待ち時間(フレームが1枚だけの場合は永久待ち)
if number_input_frames == 1:
wait_key_code = 0
else :
if org_frame_time < cur_total_time :
# オリジナルフレーム時間がここまでの時間より短ければ最短時間
wait_key_code = 1
else :
# オリジナルフレーム時間とここまでの時間の差(msec単位に変換して小数点以下切り上げ)
wait_key_code = math.ceil((org_frame_time - cur_total_time) * 1000)
key = cv2.waitKey(wait_key_code)
if key == 27:
# ESCキー
break
wait_end = time.time()
wait_time = wait_end - wait_start
# フレーム処理終了 =============================================================================
cur_time = time.time()
frame_time = cur_time - prev_time # 1フレームの処理時間
prev_time = cur_time
if log_f :
log_f.write(f'{frame_number:5d}, {frame_time * 1000:.3f}, {preprocess_time * 1000:.3f}, {inf_time * 1000:.3f}, {parse_time * 1000:.3f}, {render_time * 1000:.3f}, {wait_key_code}, {wait_time * 1000:.3f}\n')
frame_number = frame_number + 1
# 後片付け
if writer:
writer.release()
if log_f :
log_f.close()
cv2.destroyAllWindows()
if __name__ == '__main__':
sys.exit(main() or 0)
interpreter(認識エンジン)を構築する処理
# CPU/TPU使用の識別
# 「ファイル名に"_edgetpu"が含まれていたら」の識別方法もアリかもしれない
with open(model_file, "rb") as f:
# モデルデータを読み込む
tfdata = f.read()
# モデルファイル中に"edgetpu-custom-op"が含まれていたらTPU使用モデル
cpu = not b"edgetpu-custom-op" in tfdata
if cpu :
print('**** USE CPU ONLY!! ****')
else :
print('**** USE WITH TPU ****')
モデルファイル内に"edgetpu-custom-op"
という文字列が含まれていたらTPU使用モデルと判定してTPU使用のための共有ライブラリをダウンロードして初期化する。
そうでなければCPUのみ使用モデルなので、 共有ライブラリなしで初期化する。
もっと単純にモデルファイル名に"_edgetpu"
が含まれていたらTPU使用モデルと判定しても良いかもしれない(edgetpu_compiler
は出力ファイル名に"_edgetpu"
を付加するので)。
認識結果の解析と結果の表示(検出枠とラベルの描画)
認識結果から検出した座標、クラスID、スコアを取得し、出力画像に描画している。
出力データの構成はここを参照。
どうやらこのモデルは検出個数が10個に設定されているようだ。
メインルーチン
# タイミング調整 =============================================================================
wait_start = time.time()
if no_disp :
# 表示しない場合は無駄な待ち時間を確保しない
wait_key_code = 1
else :
# フレーム先頭からここまでの時間
cur_total_time = wait_start - preprocess_start
# フレーム間処理の待ち時間(フレームが1枚だけの場合は永久待ち)
if number_input_frames == 1:
wait_key_code = 0
else :
if org_frame_time < cur_total_time :
# オリジナルフレーム時間がここまでの時間より短ければ最短時間
wait_key_code = 1
else :
# オリジナルフレーム時間とここまでの時間の差(msec単位に変換して小数点以下切り上げ)
wait_key_code = math.ceil((org_frame_time - cur_total_time) * 1000)
key = cv2.waitKey(wait_key_code)
タイミング調整部分は、認識処理が速かった場合に結果表示画像の表示時間を実際の入力画像の表示時間に合わせるため、ちょっと小細工を入れてある。
使用できるオプションはヘルプ表示で。
python object_detection_demo_ssd.py --help
usage: object_detection_demo_ssd.py [-h] -m MODEL -i INPUT [-l LABELS]
[--save SAVE] [--log LOG] [--no_disp]
optional arguments:
--save SAVE Optional. Save result to specified file
--log LOG Optional. Save log to specified file
--no_disp Optional. without image display
Options:
-h, --help Show this help message and exit.
-m MODEL, --model MODEL
Required. Path to an .xml file with a trained model.
-i INPUT, --input INPUT
Required. Path to a image/video file. (Specify 'cam'
to work with camera)
-l LABELS, --labels LABELS
Optional. Labels mapping file
以下のコマンドを実行すると、認識結果画像が表示される。
画像が表示されたウィンドウでESCキーを押すと終了する。
python object_detection_demo_ssd.py --model models/coco_ssd_mobilenet_v1_1.0_quant_edgetpu.tflite --input images/cat.jpg
以下のコマンドを実行すると、認識結果画像が表示される。
再生が終了するか、画像が表示されたウィンドウでESCキーを押すと終了する。
python object_detection_demo_ssd.py --model models/coco_ssd_mobilenet_v1_1.0_quant_edgetpu.tflite --input images/testvideo3.mp4
python object_detection_demo_ssd.py --model models/coco_ssd_mobilenet_v1_1.0_quant.tflite --input images/cat.jpg
python object_detection_demo_ssd.py --model models/coco_ssd_mobilenet_v1_1.0_quant.tflite --input images/testvideo3.mp4