ROS2 觸覺感測陣列 + Rerun 視覺化完整指南

場景說明

  • 自定義 ROS2 訊息,包含矩陣排列的觸覺感測資料(法向力 raw data + 三軸力陣列)
  • 目標:在 Rerun 中呈現 2D 熱力圖(法向力強度)疊加 2D 箭頭(合力方向,箭頭長短代表 Z 軸大小)
  • 矩陣尺寸(幾乘幾)由執行腳本時指定,Rerun 本身不支援動態 reshape 滑桿
  • 支援兩種工作流:
    1. 即時視覺化:ROS2 節點訂閱 → 即時推送到 Rerun Viewer
    2. MCAP 離線回放:讀取既有 MCAP 檔案 → 匯出 .rrd 或直接在 Viewer 中回放

1. 自定義訊息定義

在你的 ROS2 套件中建立 msg/TactileSensor.msg

std_msgs/Header header

# Overall force (combined resultant)
float32 force_x
float32 force_y
float32 force_z

# Per-point force arrays (row-major, length = rows * cols)
float32[] force_array_x
float32[] force_array_y
float32[] force_array_z

# Raw normal force data (row-major, length = rows * cols)
uint16[] data

CMakeLists.txt 加入:

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/TactileSensor.msg"
  DEPENDENCIES std_msgs
)

2. 安裝依賴

# Rerun SDK
pip install rerun-sdk==0.31
 
# 離線 MCAP 解讀(不需 ROS2 環境)
pip install mcap-ros2-support
 
# 熱力圖上色
pip install matplotlib numpy

3. ROS2 Publisher 範例

#!/usr/bin/env python3
# tactile_publisher.py — 模擬發布觸覺感測資料
import argparse
import numpy as np
import rclpy
from rclpy.node import Node
from your_msgs.msg import TactileSensor  # 替換成你的套件名稱
 
 
class TactilePublisher(Node):
    def __init__(self, rows: int, cols: int):
        super().__init__("tactile_publisher")
        self.rows = rows
        self.cols = cols
        self.n = rows * cols
        self.pub = self.create_publisher(TactileSensor, "/tactile_sensor", 10)
        self.timer = self.create_timer(0.05, self.publish_cb)  # 20 Hz
        self.get_logger().info(f"Publishing {rows}x{cols} tactile data")
 
    def publish_cb(self):
        t = self.get_clock().now().nanoseconds * 1e-9
        n = self.n
 
        # 模擬資料(替換成真實感測器讀值)
        raw = np.random.randint(0, 4096, n, dtype=np.uint16)
        fx = np.sin(t + np.arange(n) * 0.1).astype(np.float32)
        fy = np.cos(t + np.arange(n) * 0.1).astype(np.float32)
        fz = np.abs(np.random.normal(2.0, 0.5, n)).astype(np.float32)
 
        msg = TactileSensor()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.header.frame_id = "tactile_frame"
        msg.data = raw.tolist()
        msg.force_array_x = fx.tolist()
        msg.force_array_y = fy.tolist()
        msg.force_array_z = fz.tolist()
        msg.force_x = float(np.mean(fx))
        msg.force_y = float(np.mean(fy))
        msg.force_z = float(np.mean(fz))
        self.pub.publish(msg)
 
 
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--rows", type=int, default=8)
    parser.add_argument("--cols", type=int, default=8)
    args = parser.parse_args()
 
    rclpy.init()
    rclpy.spin(TactilePublisher(args.rows, args.cols))
    rclpy.shutdown()
 
 
if __name__ == "__main__":
    main()

執行:

python3 tactile_publisher.py --rows 8 --cols 8

4. 即時視覺化:ROS2 → Rerun

#!/usr/bin/env python3
# tactile_rerun_live.py — 即時訂閱並推送到 Rerun Viewer
import argparse
import numpy as np
import matplotlib.cm as cm
import rerun as rr
import rerun.blueprint as rrb
import rclpy
from rclpy.node import Node
from your_msgs.msg import TactileSensor  # 替換成你的套件名稱
 
 
def make_heatmap_rgb(data: list, rows: int, cols: int) -> np.ndarray:
    """將 uint16 raw data 轉成 (rows, cols, 3) uint8 RGB 熱力圖。"""
    arr = np.array(data, dtype=np.float32).reshape(rows, cols)
    norm = (arr - arr.min()) / (arr.max() - arr.min() + 1e-6)
    rgba = cm.turbo(norm)  # (rows, cols, 4) float [0,1]
    return (rgba[:, :, :3] * 255).astype(np.uint8)
 
 
def make_arrows(fx, fy, fz, rows, cols):
    """
    建立 2D 箭頭的 origins 和 vectors。
    - 方向:fx/fy 合力水平方向
    - 長度:fz 大小(第三軸)
    """
    # origins: 每個 cell 的中心點(col + 0.5, row + 0.5)
    ys, xs = np.mgrid[0:rows, 0:cols]
    origins = np.stack(
        [xs.ravel().astype(float) + 0.5, ys.ravel().astype(float) + 0.5], axis=1
    )
 
    fx_arr = np.array(fx, dtype=float)
    fy_arr = np.array(fy, dtype=float)
    fz_arr = np.array(fz, dtype=float)
 
    # 正規化 xy 方向,以 fz 大小決定箭頭長度(最大 0.45 個 cell)
    xy_mag = np.sqrt(fx_arr**2 + fy_arr**2) + 1e-8
    scale = 0.45
    vectors = np.stack(
        [fx_arr / xy_mag * fz_arr * scale, fy_arr / xy_mag * fz_arr * scale], axis=1
    )
 
    return origins, vectors
 
 
class TactileRerunNode(Node):
    def __init__(self, rows: int, cols: int):
        super().__init__("tactile_rerun_live")
        self.rows = rows
        self.cols = cols
        self.sub = self.create_subscription(
            TactileSensor, "/tactile_sensor", self.callback, 10
        )
        self.get_logger().info(f"Rerun live viz ready ({rows}x{cols})")
 
    def callback(self, msg: TactileSensor):
        n = len(msg.data)
        rows, cols = self.rows, self.cols
 
        if n != rows * cols:
            self.get_logger().warn(
                f"Data length {n} != {rows}x{cols}={rows*cols}, skipping"
            )
            return
 
        # 對齊 ROS2 時間戳記到 Rerun timeline
        ts_ns = msg.header.stamp.sec * 1_000_000_000 + msg.header.stamp.nanosec
        rr.set_time_nanos("ros_time", ts_ns)
 
        # 1. 熱力圖(法向力強度)
        heatmap = make_heatmap_rgb(msg.data, rows, cols)
        rr.log("tactile/heatmap", rr.Image(heatmap))
 
        # 2. 合力方向箭頭(疊在熱力圖上)
        origins, vectors = make_arrows(
            msg.force_array_x, msg.force_array_y, msg.force_array_z, rows, cols
        )
        rr.log(
            "tactile/arrows",
            rr.Arrows2D(origins=origins, vectors=vectors, radii=0.04),
        )
 
        # 3. 整體合力純量(時間序列)
        rr.log("tactile/overall/fx", rr.Scalar(msg.force_x))
        rr.log("tactile/overall/fy", rr.Scalar(msg.force_y))
        rr.log("tactile/overall/fz", rr.Scalar(msg.force_z))
 
 
def send_blueprint(rows: int, cols: int):
    blueprint = rrb.Blueprint(
        rrb.Horizontal(
            rrb.Spatial2DView(
                name=f"Tactile Map ({rows}×{cols})",
                contents=["tactile/heatmap", "tactile/arrows"],
            ),
            rrb.TimeSeriesView(
                name="Overall Force",
                contents=["tactile/overall/**"],
            ),
            column_shares=[3, 1],
        ),
        auto_space_views=False,
    )
    rr.send_blueprint(blueprint)
 
 
def main():
    parser = argparse.ArgumentParser(description="Tactile sensor live Rerun viz")
    parser.add_argument("--rows", type=int, default=8, help="Grid rows")
    parser.add_argument("--cols", type=int, default=8, help="Grid cols")
    parser.add_argument(
        "--connect",
        metavar="HOST:PORT",
        default=None,
        help="連接到既有 Rerun Viewer(例如 127.0.0.1:9876),省略則自動啟動",
    )
    args = parser.parse_args()
 
    rr.init("tactile_live", spawn=(args.connect is None))
    if args.connect:
        rr.connect_tcp(args.connect)
 
    send_blueprint(args.rows, args.cols)
 
    rclpy.init()
    rclpy.spin(TactileRerunNode(args.rows, args.cols))
    rclpy.shutdown()
 
 
if __name__ == "__main__":
    main()

執行:

# 啟動 Rerun Viewer(自動彈出)
python3 tactile_rerun_live.py --rows 8 --cols 8
 
# 或連接到已開啟的 Viewer
python3 tactile_rerun_live.py --rows 16 --cols 12 --connect 127.0.0.1:9876

5. MCAP 離線回放

不需要 ROS2 環境。MCAP 檔案中已內嵌訊息定義,mcap-ros2-support 可直接解碼自定義訊息。

#!/usr/bin/env python3
# tactile_rerun_mcap.py — 離線 MCAP 轉換並在 Rerun 中回放
import argparse
import numpy as np
import matplotlib.cm as cm
import rerun as rr
import rerun.blueprint as rrb
from mcap_ros2.reader import read_ros2_messages
 
# 重用上面的輔助函數
from tactile_rerun_live import make_heatmap_rgb, make_arrows, send_blueprint
 
 
def replay_mcap(
    bag_path: str,
    rows: int,
    cols: int,
    topic: str = "/tactile_sensor",
    output_rrd: str = None,
):
    """
    讀取 MCAP 並推送到 Rerun。
    - output_rrd: 若指定,將結果存成 .rrd 檔案(之後用 rerun xxx.rrd 開啟)
    - 否則直接在 Viewer 中播放
    """
    rr.init("tactile_mcap", spawn=(output_rrd is None))
    if output_rrd:
        rr.save(output_rrd)
        print(f"輸出到 {output_rrd},完成後用 `rerun {output_rrd}` 開啟")
 
    send_blueprint(rows, cols)
 
    n_expected = rows * cols
    n_frames = 0
 
    with open(bag_path, "rb") as f:
        for schema, channel, message, decoded_msg in read_ros2_messages(
            f, topics=[topic]
        ):
            ts_ns = message.publish_time  # 發布時間(奈秒)
            rr.set_time_nanos("ros_time", ts_ns)
 
            data = list(decoded_msg.data)
            n = len(data)
            if n != n_expected:
                print(f"[skip] frame {n_frames}: data len {n} != {n_expected}")
                continue
 
            # 熱力圖
            heatmap = make_heatmap_rgb(data, rows, cols)
            rr.log("tactile/heatmap", rr.Image(heatmap))
 
            # 箭頭
            origins, vectors = make_arrows(
                list(decoded_msg.force_array_x),
                list(decoded_msg.force_array_y),
                list(decoded_msg.force_array_z),
                rows, cols,
            )
            rr.log("tactile/arrows", rr.Arrows2D(origins=origins, vectors=vectors, radii=0.04))
 
            # 整體合力
            rr.log("tactile/overall/fx", rr.Scalar(decoded_msg.force_x))
            rr.log("tactile/overall/fy", rr.Scalar(decoded_msg.force_y))
            rr.log("tactile/overall/fz", rr.Scalar(decoded_msg.force_z))
 
            n_frames += 1
 
    print(f"完成:共處理 {n_frames} 個 frames")
 
 
def main():
    parser = argparse.ArgumentParser(description="MCAP → Rerun tactile replay")
    parser.add_argument("bag", help="MCAP 檔案路徑(例如 recording.mcap)")
    parser.add_argument("--rows", type=int, default=8)
    parser.add_argument("--cols", type=int, default=8)
    parser.add_argument("--topic", default="/tactile_sensor")
    parser.add_argument(
        "--save",
        metavar="OUTPUT.rrd",
        default=None,
        help="將結果存成 .rrd 檔(不指定則直接開啟 Viewer)",
    )
    args = parser.parse_args()
 
    replay_mcap(args.bag, args.rows, args.cols, args.topic, args.save)
 
 
if __name__ == "__main__":
    main()

MCAP 執行方式

# 直接在 Rerun Viewer 中回放(自動開啟視窗)
python3 tactile_rerun_mcap.py recording.mcap --rows 8 --cols 8
 
# 匯出成 .rrd 檔,之後隨時開啟
python3 tactile_rerun_mcap.py recording.mcap --rows 8 --cols 8 --save output.rrd
rerun output.rrd
 
# 指定 topic 名稱
python3 tactile_rerun_mcap.py recording.mcap --rows 16 --cols 12 --topic /my_tactile

注意:若 MCAP 是 ROS2 bag 資料夾格式(bag_name/ 內有 .db3.mcap),需先找到 .mcap 檔案路徑,例如 bag_name/bag_name_0.mcap


6. 動態調整矩陣尺寸

Rerun 本身不支援在 UI 中即時 reshape 資料。實際操作方式:

情境做法
不確定尺寸先用 --rows 1 --cols N(單列)驗證資料長度,再調整
換感測器型號重新執行腳本並帶入新的 --rows R --cols C
同一次錄製中尺寸不變一次指定即可,腳本會驗證每幀長度

若需要在同一個 .rrd 中比較不同尺寸配置,可以多次執行並用不同的 entity path 前綴:

# 在同一個 session 中記錄兩種解析度
rr.log("tactile_8x8/heatmap", rr.Image(heatmap_8x8))
rr.log("tactile_16x4/heatmap", rr.Image(heatmap_16x4))

7. Rerun Blueprint 說明

Blueprint 定義了 Viewer 的視圖配置。上方程式碼中的 send_blueprint() 建立了:

┌────────────────────────────┬──────────────┐
│  Spatial2DView             │ TimeSeriesView│
│  - tactile/heatmap         │ - overall/fx │
│  - tactile/arrows (疊加)   │ - overall/fy │
│  (左側 75% 寬度)           │ - overall/fz │
└────────────────────────────┴──────────────┘
  • Spatial2DView 會自動將 rr.Image 作為背景,rr.Arrows2D 疊加在上面(共享同一個 2D 座標空間)
  • 箭頭座標系:(0,0) 在左上,X 向右,Y 向下,與圖像像素座標一致
  • 若要調整箭頭顏色(依 fz 大小上色),可加入:
fz_norm = (fz_arr / fz_arr.max() * 255).astype(np.uint8)
colors = cm.plasma(fz_norm / 255.0)[:, :3]  # (N, 3) RGB
rr.log("tactile/arrows", rr.Arrows2D(
    origins=origins,
    vectors=vectors,
    radii=0.04,
    colors=(colors * 255).astype(np.uint8),
))

8. 完整工作流程總結

[真實感測器 / 模擬] → ROS2 Publisher
                              ↓
                    /tactile_sensor topic
                    ┌─────────────────────┐
                    │  選其一             │
                    │ A) tactile_rerun_live.py  → Rerun Viewer (即時)
                    │ B) ros2 bag record -o bag.mcap /tactile_sensor
                    │    → tactile_rerun_mcap.py → Rerun Viewer (離線)
                    └─────────────────────┘
                              ↓
                    Rerun Viewer
                    ├── Spatial2DView: 熱力圖 + 力向量箭頭
                    └── TimeSeriesView: 整體合力 fx/fy/fz

9. 完整 ROS2 Workspace 結構與建置方式

9.1 目錄結構

tactile_ws/
├── src/
│   ├── tactile_msgs/              # 自定義訊息套件(CMake)
│   │   ├── CMakeLists.txt
│   │   ├── package.xml
│   │   └── msg/
│   │       └── TactileSensor.msg
│   └── tactile_viz/               # 視覺化節點套件(Python / ament_python)
│       ├── package.xml
│       ├── setup.py
│       ├── setup.cfg
│       ├── resource/
│       │   └── tactile_viz        # 空白標記檔
│       └── tactile_viz/
│           ├── __init__.py
│           ├── publisher.py       # 模擬發布節點
│           └── rerun_live.py      # 即時 Rerun 視覺化節點
├── scripts/
│   └── mcap_replay.py             # 離線 MCAP 回放(獨立腳本,不需 ROS2)
├── install/   (colcon 自動產生)
├── build/     (colcon 自動產生)
└── log/       (colcon 自動產生)

9.2 tactile_msgs 套件

src/tactile_msgs/msg/TactileSensor.msg(同第 1 節)

src/tactile_msgs/CMakeLists.txt

cmake_minimum_required(VERSION 3.8)
project(tactile_msgs)
 
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(std_msgs REQUIRED)
 
rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/TactileSensor.msg"
  DEPENDENCIES std_msgs
)
 
ament_package()

src/tactile_msgs/package.xml

<?xml version="1.0"?>
<package format="3">
  <name>tactile_msgs</name>
  <version>0.1.0</version>
  <description>Custom tactile sensor messages</description>
  <maintainer email="you@example.com">Your Name</maintainer>
  <license>Apache-2.0</license>
 
  <buildtool_depend>ament_cmake</buildtool_depend>
  <buildtool_depend>rosidl_default_generators</buildtool_depend>
 
  <depend>std_msgs</depend>
 
  <member_of_group>rosidl_interface_packages</member_of_group>
 
  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>

9.3 tactile_viz 套件

src/tactile_viz/package.xml

<?xml version="1.0"?>
<package format="3">
  <name>tactile_viz</name>
  <version>0.1.0</version>
  <description>Tactile sensor Rerun visualization nodes</description>
  <maintainer email="you@example.com">Your Name</maintainer>
  <license>Apache-2.0</license>
 
  <depend>rclpy</depend>
  <depend>tactile_msgs</depend>
  <depend>std_msgs</depend>
 
  <buildtool_depend>ament_python</buildtool_depend>
 
  <export>
    <build_type>ament_python</build_type>
  </export>
</package>

src/tactile_viz/setup.py

from setuptools import find_packages, setup
 
package_name = "tactile_viz"
 
setup(
    name=package_name,
    version="0.1.0",
    packages=find_packages(exclude=["test"]),
    data_files=[
        ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
        (f"share/{package_name}", ["package.xml"]),
    ],
    install_requires=["setuptools"],
    zip_safe=True,
    entry_points={
        "console_scripts": [
            "publisher = tactile_viz.publisher:main",
            "rerun_live = tactile_viz.rerun_live:main",
        ],
    },
)

src/tactile_viz/setup.cfg

[develop]
script_dir=$base/lib/tactile_viz
[install]
install_scripts=$base/lib/tactile_viz

src/tactile_viz/resource/tactile_viz — 空白檔案,執行 touch 建立即可

src/tactile_viz/tactile_viz/__init__.py — 空白

src/tactile_viz/tactile_viz/publisher.py — 即第 3 節程式碼,將 import 改為:

from tactile_msgs.msg import TactileSensor

src/tactile_viz/tactile_viz/rerun_live.py — 即第 4 節程式碼,同上修改 import

scripts/mcap_replay.py — 即第 5 節程式碼(獨立腳本,不依賴 ROS2)

9.4 建置步驟

# 1. 建立 workspace 並進入
mkdir -p ~/tactile_ws/src
cd ~/tactile_ws
 
# 2. 複製套件到 src/(或 git clone)
# cp -r /path/to/tactile_msgs src/
# cp -r /path/to/tactile_viz src/
 
# 3. 安裝 Python 依賴(workspace 外)
pip install rerun-sdk==0.31 mcap-ros2-support matplotlib numpy
 
# 4. Source ROS2 Humble
source /opt/ros/humble/setup.bash
 
# 5. 解析依賴
rosdep install --from-paths src --ignore-src -r -y
 
# 6. 建置(先建 tactile_msgs,再建 tactile_viz)
colcon build --packages-select tactile_msgs
source install/setup.bash
colcon build --packages-select tactile_viz
 
# 7. Source workspace(每次新終端機都需要)
source install/setup.bash

提示:可以把 source ~/tactile_ws/install/setup.bash 加到 ~/.bashrc 避免每次手動 source。

9.5 執行

每個指令開新終端機,都先 source ~/tactile_ws/install/setup.bash

# 終端機 1:發布模擬資料
ros2 run tactile_viz publisher --ros-args -p rows:=8 -p cols:=8
# 或直接 python3
python3 src/tactile_viz/tactile_viz/publisher.py --rows 8 --cols 8
 
# 終端機 2:即時 Rerun 視覺化(自動開啟 Viewer)
ros2 run tactile_viz rerun_live --rows 8 --cols 8
# 或直接 python3
python3 src/tactile_viz/tactile_viz/rerun_live.py --rows 8 --cols 8
 
# 錄製 MCAP
ros2 bag record -o ~/bags/tactile_bag /tactile_sensor
 
# 離線回放(不需 source ROS2)
python3 scripts/mcap_replay.py ~/bags/tactile_bag/tactile_bag_0.mcap --rows 8 --cols 8

注意 --rows/--cols 傳遞方式:直接執行 python3 時用 --rows 8,透過 ros2 run 時需改為 ROS2 parameter 語法 -p rows:=8,或在 publisher.py / rerun_live.py 內改用 declare_parameter。最簡單的做法是直接 python3 執行。


參考資料