物体検出結果の音声出力
はじめに
このレッスンでは、Lesson7で実装したYOLO物体検出の結果をサブスクライブし、検出された物体の名前を音声で出力するプログラムを作成します。これにより、ロボットが「何が見えているか」を音声で伝えることができるようになります。
プログラムの概要
このプログラムは以下の2つの主要な機能を持ちます:
- Lesson7のYOLO物体検出結果をサブスクライブ
- 検出された物体の名前を音声で出力
これらの機能を1つのノードに実装します:
kachaka_speak_detection
: 物体検出結果をサブスクライブし、音声で出力する
必要なパッケージ
このプログラムを実行するには、以下のパッケージが必要です:
image_yolo_detection
: Lesson7で実装した物体検出パッケージkachaka_interfaces
: Kachakaロボットのインターフェースを提供するパッケージ
プログラムの実装
音声出力ノード(kachaka_speak_detection.py)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from kachaka_interfaces.action import ExecKachakaCommand
from kachaka_interfaces.msg import KachakaCommand
from rclpy.action import ActionClient
import json
import time
class KachakaSpeakDetection(Node):
"""
Node for speaking detected objects
"""
def __init__(self):
super().__init__('kachaka_speak_detection')
# Set QoS profile for object detection data
qos = rclpy.qos.QoSProfile(
reliability=rclpy.qos.ReliabilityPolicy.BEST_EFFORT,
durability=rclpy.qos.DurabilityPolicy.VOLATILE,
history=rclpy.qos.HistoryPolicy.KEEP_LAST,
depth=10
)
# Subscribe to object detection information from Lesson7
self.subscription = self.create_subscription(
String,
'/image_yolo_detection/objects', # Topic from Lesson7
self.object_callback,
qos
)
# Action client for speech output
self._action_client = ActionClient(
self, ExecKachakaCommand, "/kachaka/kachaka_command/execute"
)
self._action_client.wait_for_server()
# Dictionary for converting English object names to Japanese
self.japanese_names = {
'person': '人',
'car': '車',
'chair': '椅子',
'bottle': 'ボトル',
'cup': 'カップ',
'laptop': 'ノートパソコン',
'mouse': 'マウス',
'keyboard': 'キーボード',
'cell phone': 'スマートフォン',
'book': '本',
# Add more as needed
}
# Store the last detected objects
self.last_objects = []
# Create a timer to periodically speak the detected objects
self.timer = self.create_timer(5.0, self.timer_callback)
self.get_logger().info("Kachaka speak detection node has started.")
def object_callback(self, msg):
"""
Process detected object information and store it
"""
try:
# Convert JSON string to Python object
object_info = json.loads(msg.data)
self.last_objects = object_info.get("objects", [])
except Exception as e:
self.get_logger().error(f"Error processing object information: {str(e)}")
def timer_callback(self):
"""
Periodically speak the detected objects
"""
if not self.last_objects:
return
# Convert all detected objects to Japanese and create a summary message
japanese_objects = []
for obj in self.last_objects:
japanese_name = self.japanese_names.get(obj, obj)
japanese_objects.append(japanese_name)
# Create a summary message
if len(japanese_objects) == 1:
text = f"{japanese_objects[0]}を検出しました"
else:
text = f"{', '.join(japanese_objects[:-1])}と{japanese_objects[-1]}を検出しました"
# Execute speech output
self.send_goal(text)
self.get_logger().info(f"Speech output: {text}")
def send_goal(self, text):
"""
Send a speech command to Kachaka
"""
command = KachakaCommand()
command.command_type = KachakaCommand.SPEAK_COMMAND
command.speak_command_text = text
goal_msg = ExecKachakaCommand.Goal()
goal_msg.kachaka_command = command
future = self._action_client.send_goal_async(goal_msg)
future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
"""
Callback for when the goal is accepted or rejected
"""
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info("Goal rejected")
return
self.get_logger().info("Goal accepted")
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
"""
Callback for when the goal is completed
"""
result = future.result().result
self.get_logger().info(f"Result: {result}")
def main(args=None):
rclpy.init(args=args)
node = KachakaSpeakDetection()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info("Shutting down...")
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
プログラムの説明
音声出力ノード(kachaka_speak_detection.py)
このノードは以下の機能を持ちます:
- Lesson7のYOLO物体検出結果をサブスクライブ
- 英語の物体名を日本語に変換
- 音声で物体名を出力
主な処理の流れ:
- Lesson7の物体検出ノードからの情報を受け取る
- JSON形式の文字列をPythonオブジェクトに変換
- 英語の物体名を日本語に変換
- 音声で物体名を出力
音声出力の実装
音声出力は、Kachakaロボットのアクションサーバーを使用して実装しています。send_goal
メソッドでは、以下の手順で音声出力を行います:
KachakaCommand
メッセージの作成- コマンドタイプを
SPEAK_COMMAND
に設定 - 発話するテキストを設定
- ゴールメッセージの作成
- 非同期でゴールを送信
- コールバック関数を設定して、ゴールの受け入れと結果を処理
コールバック処理
アクションクライアントのコールバック処理は以下のように実装されています:
goal_response_callback
: ゴールが受け入れられたか拒否されたかを処理- ゴールが受け入れられた場合、結果を取得するためのコールバックを設定
- ゴールが拒否された場合、ログに記録
get_result_callback
: ゴールの実行結果を処理- 結果をログに記録
定期的な音声出力
検出された物体を定期的に音声出力するために、以下の機能を実装しています:
last_objects
変数: 最新の検出結果を保存- タイマー: 5秒ごとに
timer_callback
メソッドを呼び出し timer_callback
メソッド: 保存された検出結果を音声出力
使用方法
準備
- パッケージの作成
# ワークスペースのsrcディレクトリに移動
cd ~/ros2_ws/src
# パッケージの作成
ros2 pkg create --build-type ament_python kachaka_speak_detection --dependencies rclpy std_msgs kachaka_interfaces --node-name kachaka_speak_detection
- パッケージのビルド
# ワークスペースのルートディレクトリに移動
cd ~/ros2_ws
# パッケージのビルド
colcon build --packages-select kachaka_speak_detection
# 環境のセットアップ
source install/setup.bash
- Lesson7のYOLO物体検出ノードが実行されていることを確認
ros2 run image_yolo_detection image_yolo_detection
- 音声出力ノードを実行
ros2 run kachaka_speak_detection kachaka_speak_detection
演習
日本語の物体名を追加する
japanese_names
辞書に新しい物体名を追加してみましょう
検出確率の表示
- 検出された物体の確率も音声で出力するように修正してみましょう
特定の物体のみを検出する
- 特定の物体(例:人や車)のみを検出して音声出力するように修正してみましょう
物体の位置情報を追加する
- 検出された物体の位置情報(画面のどの部分に検出されたか)も音声で出力するように修正してみましょう
注意点
- Lesson7のYOLO物体検出ノードが実行されていないと、音声出力ノードは動作しません
- 音声出力中は他の処理がブロックされる可能性があります
- 物体名の日本語変換は必要に応じて拡張してください