LiDARセンサーによる障害物回避制御

このドキュメントでは、ROS2 Humble環境において、PythonでLiDARセンサーを使用した障害物回避制御を実装するプログラムの作り方を説明します。ノードは /kachaka/manual_control/cmd_vel トピックに対して geometry_msgs/Twist 型のメッセージを発行し、/kachaka/lidar/scan トピックから sensor_msgs/LaserScan 型のメッセージを購読します。

プログラムの概要

  • LiDARセンサーを使用して、ロボットの周囲の障害物を検出します
  • 検出した障害物の情報に基づいて、ロボットの動作を制御します
  • 前方、右側、左側の3つのセクターに分けて障害物を検出します
  • 障害物を回避するための基本的な動作(直進、回転、停止)を実装します

1. LiDARセンサーとは

LiDAR(Light Detection And Ranging)は、レーザー光を使用して物体までの距離を測定するセンサーです。360度の範囲をスキャンし、各方向の障害物までの距離を取得することができます。

LiDARセンサーの特徴

  • 高精度: ミリメートル単位での距離測定が可能です
  • 広範囲: 360度の範囲をスキャンできます(*センサに依存)
  • 高速: リアルタイムでの距離データの取得が可能です

障害物回避への応用

LiDARセンサーを使用することで、以下のような障害物回避機能を実現できます:

  • 前方の障害物の検出と回避
  • 周囲の障害物の検出と回避
  • 安全な経路の計画と実行

2. 前提条件

  • ROS2 Humble がインストール済み
  • Python3 が利用可能
  • ROS2ワークスペース(例: ~/ros2_ws)がセットアップ済み

必要なパッケージ

  • rclpy: ROS2のPythonクライアントライブラリ
  • geometry_msgs: 位置や速度などの幾何学メッセージを扱うためのパッケージ
  • sensor_msgs: センサーデータを扱うためのパッケージ
  • numpy: 数値計算のためのライブラリ

3. パッケージの作成

まず、ROS2ワークスペースの src ディレクトリ内でパッケージを作成します。

cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python kachaka_lidar_control --dependencies rclpy geometry_msgs sensor_msgs numpy --node-name kachaka_lidar_control

このコマンドにより、以下の構造のパッケージが生成されます:

kachaka_lidar_control/
├── resource/
│   └── kachaka_lidar_control
├── kachaka_lidar_control/
│   ├── __init__.py
│   └── kachaka_lidar_control.py
├── test/
├── package.xml
└── setup.py

4. ノードの実装

kachaka_lidar_control パッケージ内の kachaka_lidar_control ディレクトリに、以下の内容の kachaka_lidar_control.py ファイルを作成します。

kachaka_lidar_control.py

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
import numpy as np

from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
# Import for QoS settings
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy

def normalize_angle(angle):
    """
    Normalize angle to the range [-π, π).
    """
    return (angle + np.pi) % (2.0 * np.pi) - np.pi

class KachakaLidarControl(Node):
    def __init__(self):
        super().__init__('kachaka_lidar_control')

        # Create QoS profile for LaserScan subscription
        qos_profile = QoSProfile(
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=10,
            reliability=QoSReliabilityPolicy.BEST_EFFORT
        )

        # Publisher for velocity commands
        self.cmd_vel_pub = self.create_publisher(Twist, '/kachaka/manual_control/cmd_vel', 10)

        # Subscriber for LiDAR scan data
        self.scan_sub = self.create_subscription(LaserScan, '/kachaka/lidar/scan', self.callback_scan, qos_profile)

        # Initialize minimum distance variables for each sector
        self.front_min = None
        self.right_min = None
        self.left_min = None

        # Create a timer to call process function at 10Hz
        self.timer = self.create_timer(0.1, self.process)

    def callback_scan(self, data):
        """
          -  Front sector: -15° to +15°     ( -0.26 rad to +0.26 rad )
          -  Right sector: -180° to -15°    ( -3.14 rad to -0.26 rad )
          -  Left sector:  +15° to +180°    ( +0.26 rad to +3.14 rad )
        """

        # Sector boundaries in the shifted coordinate system
        front_min_angle = np.deg2rad(-15)   # -15°
        front_max_angle = np.deg2rad(15)    # +15°
        right_min_angle = -np.pi            # -180°
        right_max_angle = np.deg2rad(-15)   # -15°
        left_min_angle  = np.deg2rad(15)    # +15°
        left_max_angle  = np.pi             # +180°

        # Initialize minimum distances with sensor's maximum range
        front_min = data.range_max
        right_min = data.range_max
        left_min  = data.range_max

        # Iterate through each distance reading and calculate shifted angle
        angle = data.angle_min
        for r in data.ranges:
            # Skip invalid readings
            if r == 0.0 or r < data.range_min:
                angle += data.angle_increment
                continue

            # Shift angle by +π/2 so that "front" is 0 in the code
            shifted_angle = angle + np.pi / 2
            shifted_angle = normalize_angle(shifted_angle)

            # Check which sector the shifted angle belongs to
            if front_min_angle <= shifted_angle <= front_max_angle:
                # Front sector
                if r < front_min:
                    front_min = r
            elif right_min_angle <= shifted_angle < right_max_angle:
                # Right sector
                if r < right_min:
                    right_min = r
            elif left_min_angle < shifted_angle <= left_max_angle:
                # Left sector
                if r < left_min:
                    left_min = r

            angle += data.angle_increment

        # Save results for use in process()
        self.front_min = front_min
        self.right_min = right_min
        self.left_min  = left_min

        # Debug log
        self.get_logger().info(
            f"Front: {front_min:.2f} m, Right: {right_min:.2f} m, Left: {left_min:.2f} m"
        )

    def process(self):
        """
        Movement logic:
          - If front sector is clear (> threshold), move forward.
          - Otherwise, if right sector is blocked, rotate left.
          - Otherwise, if left sector is blocked, rotate right.
          - Otherwise, stop.
        """
        vel = Twist()
        threshold = 1.0  # meters

        if self.front_min is not None and self.front_min > threshold:
            # Front is clear
            vel.linear.x = 0.2
            vel.angular.z = 0.0
        else:
            # Front is blocked or invalid
            if self.right_min is not None and self.right_min < threshold:
                # Obstacle on right => rotate left
                vel.linear.x = 0.0
                vel.angular.z = 0.5
            elif self.left_min is not None and self.left_min < threshold:
                # Obstacle on left => rotate right
                vel.linear.x = 0.0
                vel.angular.z = -0.5
            else:
                # No clear direction => stop
                vel.linear.x = 0.0
                vel.angular.z = 0.0

        self.cmd_vel_pub.publish(vel)

def main(args=None):
    rclpy.init(args=args)
    node = KachakaLidarControl()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

5. setup.py の編集

–node-nameを使用していない場合はパッケージルートにある setup.py のエントリーポイントを設定します。

    entry_points={
        'console_scripts': [
            'kachaka_lidar_control = kachaka_lidar_control.kachaka_lidar_control:main'
        ],

6. パッケージのビルドと実行

パッケージのビルド

cd ~/ros2_ws
colcon build --packages-select kachaka_lidar_control

環境設定の読み込み

source install/setup.bash

ノードの実行

ros2 run kachaka_lidar_control kachaka_lidar_control

7. コードの詳細解説

クラスとメソッド

  • KachakaLidarControl: メインのノードクラス
    • __init__: ノードの初期化、パブリッシャーとサブスクライバーの設定
    • callback_scan: LiDARスキャンデータを受信したときのコールバック関数
    • process: 障害物の情報に基づいてロボットの動作を決定する関数

制御パラメータ

  • threshold: 障害物と判断する距離の閾値 [m]
  • front_min, right_min, left_min: 各セクターでの最小距離 [m]
  • vel.linear.x: 並進速度 [m/s]
  • vel.angular.z: 角速度 [rad/s]

動作シーケンス

プログラムは以下の動作シーケンスを実行します:

  1. 前方に障害物がない場合(front_min > threshold): 直進
  2. 前方に障害物があり、右側に障害物がある場合: 左回転
  3. 前方に障害物があり、左側に障害物がある場合: 右回転
  4. それ以外の場合: 停止

LiDARセンサーの実装ポイント

  1. セクターの分割:
    • 前方、右側、左側の3つのセクターに分けて障害物を検出します
    • 各セクターの境界は角度で定義されています

2 無効なデータの処理:

  • 距離が0または最小範囲未満のデータは無効として扱います
  • これにより、ノイズや異常値の影響を軽減します
  1. QoSプロファイルの設定:
    • LiDARスキャンデータのサブスクライバーには、BEST_EFFORT 信頼性のQoSプロファイルを設定しています
    • これにより、データの遅延や欠落を許容し、リアルタイム性を重視します

演習

複数の障害物の回避

複数の障害物がある環境で、ロボットが安全に移動できるように制御プログラムを拡張してみましょう。

内容

好きな障害物回避アルゴリズムを実装してみよう!