その他

[ROS2 Humble]ROS2で使われる通信パターンまとめ

その他
この記事は約17分で読めます。

はじめに

こんにちは。

ロボット開発を始めて3ヶ月、現在は様々な壁にぶつかりつつも、次の一手を模索しながら開発を続けています。

前回はROS2の概要と、機能を分割する「ノード化」について紹介しました。

バラバラに分かれたノードたちがどうやって連携しているのか、通信の仕組みについて整理するため記事に残していきたいと思います。

今回やること

今回は、ROS2におけるノード間通信の3つの柱である 「Pub/Sub通信」「Service」「Action」 と、その通信の土台となる 「DDS」 についてまとめていきます。

ROS2の特徴である「分散処理」を理解するには、この通信周りを理解することが必要だと考えています。ここを理解しておくことで、複数のノードを組み合わせたロボット開発が一気に楽しくなると思います。

また、これらの通信を行う際には、どのようなデータ型を送るかを定義する必要があります。

そこで重要になるのが、独自のメッセージ定義を管理する robot_msgs パッケージです。本記事では、このパッケージ構成も踏まえながら各通信を解説します。

robot_ws/                     
├── src/                      
│   ├── robot_description/    
│   │   ├── urdf/            
│   │   ├── meshes/           
│   │   └── CMakeLists.txt
│   │
│   ├── robot_control/        
│   │   ├── include/          
│   │   ├── src/              
│   │   ├── launch/           
│   │   ├── config/           
│   │   ├── package.xml       
│   │   └── CMakeLists.txt    
│   │
│   └── robot_msgs/           # 独自のメッセージ定義(⭐️ここ大事)
│       ├── msg/              # .msgファイル(Pub/Sub通信)
│       └── srv/              # .srvファイル (Service)
│       └── action/           # .actionファイル (Action)
│
├── build/                    
├── install/                  
└── log/                      

ROS2の通信規格

DDS(Data Distribution Service)

まずはじめに、ROS2の通信を支えているのが DDS(Data Distribution Service) です。

これから説明する「Pub/Sub」「Service」「Action」といった通信パターンは、すべてこのDDSという堅牢な土台の上で動いています。

DDSとはリアルタイム性や分散システム間でデータを交換するために標準化された通信プロトコルのようで、データ中心の通信サービスとして設計されており、ロボティクスや車の自動運転など幅広い領域で用いられています。

通常のネットワーク通信では、「どのIPアドレスの、どのポートに送るか」という指定が必要ですが、DDSには 「ダイナミック・ディスカバリ(動的発見)」 という驚きの機能が備わっています。

・サーバーが不要: ROS1のように全体を管理する「ROS Master」を用意する必要がありません。
自動でつながる: 同じネットワーク内にいるノード同士が、お互いを勝手に見つけ出して通信を開始します。
柔軟なシステム構成: ノードが後から追加されたり、一部がネットワークから離脱したりしても、システム全体が止まることなく動的に再構成されます。

https://fast-dds.docs.eprosima.com/en/latest/より参照

fast DDS

DDSが通信規格であるのに対し、そのルールに基づいて実際にプログラムとして動くように作られたライブラリの一つが Fast DDS です。

eProsima社が開発したこのライブラリは、その名の通り速さと効率性に特化しており、効率的なリソース管理や高いリアルタイム性を特徴としています。

さらにFast DDSは導入が簡単で、ROS2をインストールした時点で、必要な依存関係として一緒にインストールされます。

中身の仕組みはDDSの規格に厳密に沿っているため、ユーザーが「Fast DDS専用の書き方」を意識する必要はなく、ROS2の標準的な書き方をするだけで、その裏側でFast DDSが効率的なデータ交換を代行しているようです。

Pub/Sub通信

Pub/Sub(パブリッシュ/サブスクライブ)通信は、ROS2において最も頻繁に利用される、非同期・一方通行の通信方式です。

この通信では特定のトピックという名前のついたチャンネルを介して、データをやり取りします。

Publisher(配信者): トピックにデータを配信する役割。
・Subscriber(購読者): そのトピックを購読し、データが流れてきたら受け取る役割。

pub/sub通信の最大の強みは、「1対多」の通信が容易であることです。

例えば、1つのセンサーが出力するデータを、「自己位置推定ノード」「障害物検知ノード」「描画ノード」といった複数のノードが同時に受け取ることができます。

Publisher

Publisherは、特定のトピックに対してデータを送信する役割を担います。 例えば、カメラで取得した画像データや、AIが物体を検知した結果を他のノードへ知らせる際に使用します。

ROS2でデータをやり取りするには、あらかじめ「どのようなデータ構造か」を決めておく必要があります。今回のプロジェクト構成では、robot_msgs/msg/ ディレクトリ内に .msg ファイルを作成します。

bear_detection.msg
# Bear detection result
bool detected
float32 confidence
float32 bbox_x
float32 bbox_y
float32 bbox_width
float32 bbox_height

下記のソースコードを例として配信には create_publisher 関数を使用します。これにより、トピックを定期的に配信したり、特定のイベントが発生した瞬間にデータを送ったりする「イベント駆動型」の設計が可能になります。

# camera_pan_node.py
# パブリッシャー(設計書準拠)
self.angle_pub = self.create_publisher(Float32, "/camera/pan_angle", 10)
self.image_pub = self.create_publisher(Image, "/camera/image_raw", 10)
self.camera_info_pub = self.create_publisher(CameraInfo, "/camera/camera_info", 10)
# bear_detection 互換: video_frames (CompressedImage)

video_qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    durability=DurabilityPolicy.VOLATILE,
    history=HistoryPolicy.KEEP_LAST,
    depth=10,
)
self.video_frames_pub = self.create_publisher(
    CompressedImage, "video_frames", video_qos
)

Subscriber

Subscriberは、トピックに流れてきたメッセージを受信する役割を担います。

Publisherがデータを「配信」するのに対し、Subscriberはそのデータを「購読」して、特定の処理を動かします。

Subscriberの大きな特徴は、「データが届いた瞬間に動く(イベント駆動)」という点です。

例えば、カメラノードから画像データが届いた瞬間に「クマがいるかどうかを判定する関数」を呼び出す、といった設計が可能です。

これにより、各ノードが独立して、条件に応じた動作を行うことができます。

コードでは create_subscription を使用し購読します。

受信時にはデータを受け取った際に実行するコールバック関数を指定し、処理を動かします。

# bear_detection_node.py
self.result_pub = self.create_publisher(BearDetection, "/detection/bear_detected", 10)
self.detection_image_pub = self.create_publisher(
    CompressedImage, "/detection/detection_image", 10
)
# camera_pan_node と同一 QoS で接続を確実に
video_qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    durability=DurabilityPolicy.VOLATILE,
    history=HistoryPolicy.KEEP_LAST,
    depth=10,
)
self.sub = self.create_subscription(
    CompressedImage,
    "video_frames",
    self.listener_callback,
    video_qos,
)

Service

Serviceは、「同期型・双方向」の通信方式です。

投げっぱなしのPub/Sub通信とは対照的に、以下のようなクライアントがリクエストを送り、サーバーがそれに対してレスポンスを返すという、丁寧な通信パターンです。

Serviceは、以下のような一回限りの、確実に行いたい処理に向いています。

設定の変更: センサーのON/OFF切り替え。
計算の依頼: 複雑な座標変換の計算結果をもらう。
パラメータの更新: 動作モードの切り替え。

Serviceで使用するデータは、robot_msgs/srv/ 内の .srv 拡張子を持つファイルで定義します。

最大の特徴は、「—」という仕切り線でリクエストとレスポンスが分かれている点です。

# LED control (on/off, color, duration)
# リクエスト(依頼内容)
bool    on            # ON/OFF
uint8   color         # 色指定
uint32  duration_sec  # 点灯時間
---
# レスポンス(返答内容)
bool    success       # 処理が成功したか

以下のコードは、ロボットのLEDを制御するサーバー側の実装例です。

5行目でcreate_service を使って、特定の名前でサービスを公開します。

class test(Node):
 def __init__(self):
   super().__init__("test_controller")

  self.led_service = self.create_service(LEDControl, '/led/control', self._led_control_callback)
  self.bear_detection_sub = self.create_subscription(BearDetection, '/detection/bear_detected', self._bear_detection_callback, 10)

・・・

 def _led_control_callback(self, request, response):
    if request.on:
        color = request.color
        self.raspbot.Ctrl_WQ2812_ALL(1, color)
        response.success = True
        self.get_logger().info(f"LED service ON, color {color}, duration {request.duration_sec}s")
        if request.duration_sec > 0:
            self.activate_alert(request.duration_sec)
    else:
        self.stop_alert()
        response.success = True
        self.get_logger().info("LED service OFF")

    return response

ただし現状では、レスポンスのsuccessについてはクライアントへ伝える処理を行なっていないため、エラー時にfalseにするような扱いはほとんどなく、処理後にtrueを返すようになっています。

Action

ActionはServiceを進化させたもので、時間がかかる処理に適した通信手法です

この通信では「ゴール」「リザルト」「フィードバック」の3つのデータの型を定義し、与えられたゴールを完了したらフィードバック付きでリザルトを返すという形になります。

  • 3つのデータ要素: ゴール、リザルト、フィードバックで構成されます。
  • キャンセル可能: 実行中に途中で止めることができます。
  • 例: 指定の角度への旋回、目的地へのナビゲーション。

実装例としてはまず、.action ファイルを使って長時間のタスクを管理します。

#AlignBodyToAngle.action
# 車体を指定角度へ旋回する Action
# カメラノードが熊検出時に送信し、車体位置合わせ完了を待つ(イベント駆動)

# Goal
float32 target_angle  # 目標角度(度、0~180)。カメラパン角度に合わせる
---
# Result
bool success          # 旋回完了
string message        # 結果メッセージ
---
# Feedback
float32 current_angle # 現在の車体角度(度)。進捗表示用

この通信パターンではロボットの車輪を制御するノードを例として、ActionServerでalign_body_to_angleを提供しています。

まずクライアント側の処理としては、下記のソースコードを例に、AlignBodyToAngleで定義したtarget_angleをゴールとして送り、フィードバックを返すような処理として実装しています、

# camera_pan_node.py
    def _send_align_goal(self, angle):
        if not self.align_client.wait_for_server(timeout_sec=3.0):
            self.get_logger().warn("align service not available")
            self.is_aligned = False
            return
        goal_msg = AlignBodyToAngle.Goal()
        goal_msg.target_angle = float(angle)
        self.get_logger().info(f"Sending align goal: {goal_msg.target_angle}")
        self._send_goal_future = self.align_client.send_goal_async(
            goal_msg,
            feedback_callback=self._align_feedback_callback
        )
        self._send_goal_future.add_done_callback(self._align_goal_response_callback)

    def _align_goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().warn("align goal rejected")
            self.is_aligned = False
            return
        self.get_logger().info("align goal accepted")
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self._align_result_callback)

また、サーバー側の処理としてはゴール目標としている角度分、ロボットを回転させる処理として実装しました。
(ただし、現在のプロジェクトではこの処理自体が不要という考えから実際は利用していません)

#motor_node.py
class MotorNode(Node):
    def __init__(self) -> None:
        super().__init__("motor_node")
        self._align_body_action_server = ActionServer(
            self,
            AlignBodyToAngle,
            "align_body_to_angle",
            self._execute_align_body_goal,
        )
・・・
    def _execute_align_body_goal(self, goal_handle) -> AlignBodyToAngle.Result:

        target_angle = float(goal_handle.request.target_angle)

        self.get_logger().info(f"AlignBodyToAngle: target={target_angle:.1f} (旋回無効化中)")

        # --- 熊検知時の旋回を無効化(コメントアウト) ---
        self._action_executing = True
        try:
            success, msg = self.bear_handler.align_to_angle(target_angle)
            result = AlignBodyToAngle.Result(success=success, message=msg)
        
            if not success:
                self.get_logger().error(f"AlignBodyToAngle failed: {msg}")
        
            goal_handle.succeed()
            return result
        finally:
            self._action_executing = False

        # 旋回せずに成功を返す
        result = AlignBodyToAngle.Result(success=True, message="align disabled")
        goal_handle.succeed()
        return result

まとめ

ROS2の通信は、以下の3つを使い分けるのがセオリーです。

  • Pub/Sub: 絶え間なく流れるデータ(センサー値など)
  • Service: すぐに終わる確実な処理(スイッチのON/OFFなど)
  • Action: 時間がかかる複雑な動作(ナビゲーションなど)

これらの違いを意識すると、ノード間の設計がスムーズになるかと思います。

おわりに

いかがでしたでしょうか。

今回はROS2で用いられる通信方法として、ベースとなっているDDSという通信規格のほか、3つの通信パターンについて紹介しました。

この通信周りをまだ理解しきれていないので、参考URLの方もぜひチェックしてもらえればと思います。

次回はロボット開発と相性の良いサブサンプションアーキテクチャについてまとめていく予定です。

参考URL

Data Distribution Service(DDS)について - Qiita
背景 DDSについて調べたことをまとめておく。 リアルタイムおよび分散型システム間でデータを交換するための標準化された通信プロトコル。 DDSは、IoT(Internet of Things)や組み込みシステム、産業用自動化など、さまざまな領域で広く使用されている。 DD...
Pub/Sub メッセージングとは - AWS
Pub/Sub メッセージングとは何か、企業が Pub/Sub メッセージングを使用する方法と理由、および AWS で Pub/Sub メッセージングを使用する方法。
実習ROS 2 Pub&Sub通信 - Qiita
環境 本記事は以下の環境を想定して記述している。 項目 値 OS Ubuntu 22.04 ROS ROS 2 Humble 概要 本記事では、ROS(ROS 1、ROS 2共通)の基本となる通信手段であるPub&Sub型通信(トピック通信)につ...
実習ROS 2 Service通信 - Qiita
環境 本記事は以下の環境を想定した記事である。 項目 値 OS Ubuntu 22.04 ROS ROS 2 Humble 概要 このページでは、ROS(ROS 1、ROS 2共通)の基本となる通信手段であるService通信について説明し、それ...
実習ROS 2 Action通信 - Qiita
環境 本記事は以下の環境を想定して記述している。 項目 値 OS Ubuntu 22.04 ROS ROS 2 Humble 概要 この記事では、ROSの通信方式の1つであるAction通信について、以下の内容を説明する。 ROS(ROS 1,...
3.6.0