ROS2 觸覺感測陣列 + Rerun 視覺化完整指南
場景說明
- 自定義 ROS2 訊息,包含矩陣排列的觸覺感測資料(法向力 raw data + 三軸力陣列)
- 目標:在 Rerun 中呈現 2D 熱力圖(法向力強度)疊加 2D 箭頭(合力方向,箭頭長短代表 Z 軸大小)
- 矩陣尺寸(幾乘幾)由執行腳本時指定,Rerun 本身不支援動態 reshape 滑桿
- 支援兩種工作流:
- 即時視覺化:ROS2 節點訂閱 → 即時推送到 Rerun Viewer
- MCAP 離線回放:讀取既有 MCAP 檔案 → 匯出
.rrd或直接在 Viewer 中回放
1. 自定義訊息定義
在你的 ROS2 套件中建立 msg/TactileSensor.msg:
std_msgs/Header header
# Overall force (combined resultant)
float32 force_x
float32 force_y
float32 force_z
# Per-point force arrays (row-major, length = rows * cols)
float32[] force_array_x
float32[] force_array_y
float32[] force_array_z
# Raw normal force data (row-major, length = rows * cols)
uint16[] data
CMakeLists.txt 加入:
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/TactileSensor.msg"
DEPENDENCIES std_msgs
)2. 安裝依賴
# Rerun SDK
pip install rerun-sdk==0.31
# 離線 MCAP 解讀(不需 ROS2 環境)
pip install mcap-ros2-support
# 熱力圖上色
pip install matplotlib numpy3. ROS2 Publisher 範例
#!/usr/bin/env python3
# tactile_publisher.py — 模擬發布觸覺感測資料
import argparse
import numpy as np
import rclpy
from rclpy.node import Node
from your_msgs.msg import TactileSensor # 替換成你的套件名稱
class TactilePublisher(Node):
def __init__(self, rows: int, cols: int):
super().__init__("tactile_publisher")
self.rows = rows
self.cols = cols
self.n = rows * cols
self.pub = self.create_publisher(TactileSensor, "/tactile_sensor", 10)
self.timer = self.create_timer(0.05, self.publish_cb) # 20 Hz
self.get_logger().info(f"Publishing {rows}x{cols} tactile data")
def publish_cb(self):
t = self.get_clock().now().nanoseconds * 1e-9
n = self.n
# 模擬資料(替換成真實感測器讀值)
raw = np.random.randint(0, 4096, n, dtype=np.uint16)
fx = np.sin(t + np.arange(n) * 0.1).astype(np.float32)
fy = np.cos(t + np.arange(n) * 0.1).astype(np.float32)
fz = np.abs(np.random.normal(2.0, 0.5, n)).astype(np.float32)
msg = TactileSensor()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = "tactile_frame"
msg.data = raw.tolist()
msg.force_array_x = fx.tolist()
msg.force_array_y = fy.tolist()
msg.force_array_z = fz.tolist()
msg.force_x = float(np.mean(fx))
msg.force_y = float(np.mean(fy))
msg.force_z = float(np.mean(fz))
self.pub.publish(msg)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--rows", type=int, default=8)
parser.add_argument("--cols", type=int, default=8)
args = parser.parse_args()
rclpy.init()
rclpy.spin(TactilePublisher(args.rows, args.cols))
rclpy.shutdown()
if __name__ == "__main__":
main()執行:
python3 tactile_publisher.py --rows 8 --cols 84. 即時視覺化:ROS2 → Rerun
#!/usr/bin/env python3
# tactile_rerun_live.py — 即時訂閱並推送到 Rerun Viewer
import argparse
import numpy as np
import matplotlib.cm as cm
import rerun as rr
import rerun.blueprint as rrb
import rclpy
from rclpy.node import Node
from your_msgs.msg import TactileSensor # 替換成你的套件名稱
def make_heatmap_rgb(data: list, rows: int, cols: int) -> np.ndarray:
"""將 uint16 raw data 轉成 (rows, cols, 3) uint8 RGB 熱力圖。"""
arr = np.array(data, dtype=np.float32).reshape(rows, cols)
norm = (arr - arr.min()) / (arr.max() - arr.min() + 1e-6)
rgba = cm.turbo(norm) # (rows, cols, 4) float [0,1]
return (rgba[:, :, :3] * 255).astype(np.uint8)
def make_arrows(fx, fy, fz, rows, cols):
"""
建立 2D 箭頭的 origins 和 vectors。
- 方向:fx/fy 合力水平方向
- 長度:fz 大小(第三軸)
"""
# origins: 每個 cell 的中心點(col + 0.5, row + 0.5)
ys, xs = np.mgrid[0:rows, 0:cols]
origins = np.stack(
[xs.ravel().astype(float) + 0.5, ys.ravel().astype(float) + 0.5], axis=1
)
fx_arr = np.array(fx, dtype=float)
fy_arr = np.array(fy, dtype=float)
fz_arr = np.array(fz, dtype=float)
# 正規化 xy 方向,以 fz 大小決定箭頭長度(最大 0.45 個 cell)
xy_mag = np.sqrt(fx_arr**2 + fy_arr**2) + 1e-8
scale = 0.45
vectors = np.stack(
[fx_arr / xy_mag * fz_arr * scale, fy_arr / xy_mag * fz_arr * scale], axis=1
)
return origins, vectors
class TactileRerunNode(Node):
def __init__(self, rows: int, cols: int):
super().__init__("tactile_rerun_live")
self.rows = rows
self.cols = cols
self.sub = self.create_subscription(
TactileSensor, "/tactile_sensor", self.callback, 10
)
self.get_logger().info(f"Rerun live viz ready ({rows}x{cols})")
def callback(self, msg: TactileSensor):
n = len(msg.data)
rows, cols = self.rows, self.cols
if n != rows * cols:
self.get_logger().warn(
f"Data length {n} != {rows}x{cols}={rows*cols}, skipping"
)
return
# 對齊 ROS2 時間戳記到 Rerun timeline
ts_ns = msg.header.stamp.sec * 1_000_000_000 + msg.header.stamp.nanosec
rr.set_time_nanos("ros_time", ts_ns)
# 1. 熱力圖(法向力強度)
heatmap = make_heatmap_rgb(msg.data, rows, cols)
rr.log("tactile/heatmap", rr.Image(heatmap))
# 2. 合力方向箭頭(疊在熱力圖上)
origins, vectors = make_arrows(
msg.force_array_x, msg.force_array_y, msg.force_array_z, rows, cols
)
rr.log(
"tactile/arrows",
rr.Arrows2D(origins=origins, vectors=vectors, radii=0.04),
)
# 3. 整體合力純量(時間序列)
rr.log("tactile/overall/fx", rr.Scalar(msg.force_x))
rr.log("tactile/overall/fy", rr.Scalar(msg.force_y))
rr.log("tactile/overall/fz", rr.Scalar(msg.force_z))
def send_blueprint(rows: int, cols: int):
blueprint = rrb.Blueprint(
rrb.Horizontal(
rrb.Spatial2DView(
name=f"Tactile Map ({rows}×{cols})",
contents=["tactile/heatmap", "tactile/arrows"],
),
rrb.TimeSeriesView(
name="Overall Force",
contents=["tactile/overall/**"],
),
column_shares=[3, 1],
),
auto_space_views=False,
)
rr.send_blueprint(blueprint)
def main():
parser = argparse.ArgumentParser(description="Tactile sensor live Rerun viz")
parser.add_argument("--rows", type=int, default=8, help="Grid rows")
parser.add_argument("--cols", type=int, default=8, help="Grid cols")
parser.add_argument(
"--connect",
metavar="HOST:PORT",
default=None,
help="連接到既有 Rerun Viewer(例如 127.0.0.1:9876),省略則自動啟動",
)
args = parser.parse_args()
rr.init("tactile_live", spawn=(args.connect is None))
if args.connect:
rr.connect_tcp(args.connect)
send_blueprint(args.rows, args.cols)
rclpy.init()
rclpy.spin(TactileRerunNode(args.rows, args.cols))
rclpy.shutdown()
if __name__ == "__main__":
main()執行:
# 啟動 Rerun Viewer(自動彈出)
python3 tactile_rerun_live.py --rows 8 --cols 8
# 或連接到已開啟的 Viewer
python3 tactile_rerun_live.py --rows 16 --cols 12 --connect 127.0.0.1:98765. MCAP 離線回放
不需要 ROS2 環境。MCAP 檔案中已內嵌訊息定義,mcap-ros2-support 可直接解碼自定義訊息。
#!/usr/bin/env python3
# tactile_rerun_mcap.py — 離線 MCAP 轉換並在 Rerun 中回放
import argparse
import numpy as np
import matplotlib.cm as cm
import rerun as rr
import rerun.blueprint as rrb
from mcap_ros2.reader import read_ros2_messages
# 重用上面的輔助函數
from tactile_rerun_live import make_heatmap_rgb, make_arrows, send_blueprint
def replay_mcap(
bag_path: str,
rows: int,
cols: int,
topic: str = "/tactile_sensor",
output_rrd: str = None,
):
"""
讀取 MCAP 並推送到 Rerun。
- output_rrd: 若指定,將結果存成 .rrd 檔案(之後用 rerun xxx.rrd 開啟)
- 否則直接在 Viewer 中播放
"""
rr.init("tactile_mcap", spawn=(output_rrd is None))
if output_rrd:
rr.save(output_rrd)
print(f"輸出到 {output_rrd},完成後用 `rerun {output_rrd}` 開啟")
send_blueprint(rows, cols)
n_expected = rows * cols
n_frames = 0
with open(bag_path, "rb") as f:
for schema, channel, message, decoded_msg in read_ros2_messages(
f, topics=[topic]
):
ts_ns = message.publish_time # 發布時間(奈秒)
rr.set_time_nanos("ros_time", ts_ns)
data = list(decoded_msg.data)
n = len(data)
if n != n_expected:
print(f"[skip] frame {n_frames}: data len {n} != {n_expected}")
continue
# 熱力圖
heatmap = make_heatmap_rgb(data, rows, cols)
rr.log("tactile/heatmap", rr.Image(heatmap))
# 箭頭
origins, vectors = make_arrows(
list(decoded_msg.force_array_x),
list(decoded_msg.force_array_y),
list(decoded_msg.force_array_z),
rows, cols,
)
rr.log("tactile/arrows", rr.Arrows2D(origins=origins, vectors=vectors, radii=0.04))
# 整體合力
rr.log("tactile/overall/fx", rr.Scalar(decoded_msg.force_x))
rr.log("tactile/overall/fy", rr.Scalar(decoded_msg.force_y))
rr.log("tactile/overall/fz", rr.Scalar(decoded_msg.force_z))
n_frames += 1
print(f"完成:共處理 {n_frames} 個 frames")
def main():
parser = argparse.ArgumentParser(description="MCAP → Rerun tactile replay")
parser.add_argument("bag", help="MCAP 檔案路徑(例如 recording.mcap)")
parser.add_argument("--rows", type=int, default=8)
parser.add_argument("--cols", type=int, default=8)
parser.add_argument("--topic", default="/tactile_sensor")
parser.add_argument(
"--save",
metavar="OUTPUT.rrd",
default=None,
help="將結果存成 .rrd 檔(不指定則直接開啟 Viewer)",
)
args = parser.parse_args()
replay_mcap(args.bag, args.rows, args.cols, args.topic, args.save)
if __name__ == "__main__":
main()MCAP 執行方式
# 直接在 Rerun Viewer 中回放(自動開啟視窗)
python3 tactile_rerun_mcap.py recording.mcap --rows 8 --cols 8
# 匯出成 .rrd 檔,之後隨時開啟
python3 tactile_rerun_mcap.py recording.mcap --rows 8 --cols 8 --save output.rrd
rerun output.rrd
# 指定 topic 名稱
python3 tactile_rerun_mcap.py recording.mcap --rows 16 --cols 12 --topic /my_tactile注意:若 MCAP 是 ROS2 bag 資料夾格式(
bag_name/內有.db3或.mcap),需先找到.mcap檔案路徑,例如bag_name/bag_name_0.mcap。
6. 動態調整矩陣尺寸
Rerun 本身不支援在 UI 中即時 reshape 資料。實際操作方式:
| 情境 | 做法 |
|---|---|
| 不確定尺寸 | 先用 --rows 1 --cols N(單列)驗證資料長度,再調整 |
| 換感測器型號 | 重新執行腳本並帶入新的 --rows R --cols C |
| 同一次錄製中尺寸不變 | 一次指定即可,腳本會驗證每幀長度 |
若需要在同一個 .rrd 中比較不同尺寸配置,可以多次執行並用不同的 entity path 前綴:
# 在同一個 session 中記錄兩種解析度
rr.log("tactile_8x8/heatmap", rr.Image(heatmap_8x8))
rr.log("tactile_16x4/heatmap", rr.Image(heatmap_16x4))7. Rerun Blueprint 說明
Blueprint 定義了 Viewer 的視圖配置。上方程式碼中的 send_blueprint() 建立了:
┌────────────────────────────┬──────────────┐
│ Spatial2DView │ TimeSeriesView│
│ - tactile/heatmap │ - overall/fx │
│ - tactile/arrows (疊加) │ - overall/fy │
│ (左側 75% 寬度) │ - overall/fz │
└────────────────────────────┴──────────────┘
- Spatial2DView 會自動將
rr.Image作為背景,rr.Arrows2D疊加在上面(共享同一個 2D 座標空間) - 箭頭座標系:(0,0) 在左上,X 向右,Y 向下,與圖像像素座標一致
- 若要調整箭頭顏色(依 fz 大小上色),可加入:
fz_norm = (fz_arr / fz_arr.max() * 255).astype(np.uint8)
colors = cm.plasma(fz_norm / 255.0)[:, :3] # (N, 3) RGB
rr.log("tactile/arrows", rr.Arrows2D(
origins=origins,
vectors=vectors,
radii=0.04,
colors=(colors * 255).astype(np.uint8),
))8. 完整工作流程總結
[真實感測器 / 模擬] → ROS2 Publisher
↓
/tactile_sensor topic
┌─────────────────────┐
│ 選其一 │
│ A) tactile_rerun_live.py → Rerun Viewer (即時)
│ B) ros2 bag record -o bag.mcap /tactile_sensor
│ → tactile_rerun_mcap.py → Rerun Viewer (離線)
└─────────────────────┘
↓
Rerun Viewer
├── Spatial2DView: 熱力圖 + 力向量箭頭
└── TimeSeriesView: 整體合力 fx/fy/fz
9. 完整 ROS2 Workspace 結構與建置方式
9.1 目錄結構
tactile_ws/
├── src/
│ ├── tactile_msgs/ # 自定義訊息套件(CMake)
│ │ ├── CMakeLists.txt
│ │ ├── package.xml
│ │ └── msg/
│ │ └── TactileSensor.msg
│ └── tactile_viz/ # 視覺化節點套件(Python / ament_python)
│ ├── package.xml
│ ├── setup.py
│ ├── setup.cfg
│ ├── resource/
│ │ └── tactile_viz # 空白標記檔
│ └── tactile_viz/
│ ├── __init__.py
│ ├── publisher.py # 模擬發布節點
│ └── rerun_live.py # 即時 Rerun 視覺化節點
├── scripts/
│ └── mcap_replay.py # 離線 MCAP 回放(獨立腳本,不需 ROS2)
├── install/ (colcon 自動產生)
├── build/ (colcon 自動產生)
└── log/ (colcon 自動產生)
9.2 tactile_msgs 套件
src/tactile_msgs/msg/TactileSensor.msg(同第 1 節)
src/tactile_msgs/CMakeLists.txt
cmake_minimum_required(VERSION 3.8)
project(tactile_msgs)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(std_msgs REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/TactileSensor.msg"
DEPENDENCIES std_msgs
)
ament_package()src/tactile_msgs/package.xml
<?xml version="1.0"?>
<package format="3">
<name>tactile_msgs</name>
<version>0.1.0</version>
<description>Custom tactile sensor messages</description>
<maintainer email="you@example.com">Your Name</maintainer>
<license>Apache-2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<depend>std_msgs</depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>9.3 tactile_viz 套件
src/tactile_viz/package.xml
<?xml version="1.0"?>
<package format="3">
<name>tactile_viz</name>
<version>0.1.0</version>
<description>Tactile sensor Rerun visualization nodes</description>
<maintainer email="you@example.com">Your Name</maintainer>
<license>Apache-2.0</license>
<depend>rclpy</depend>
<depend>tactile_msgs</depend>
<depend>std_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>src/tactile_viz/setup.py
from setuptools import find_packages, setup
package_name = "tactile_viz"
setup(
name=package_name,
version="0.1.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
],
install_requires=["setuptools"],
zip_safe=True,
entry_points={
"console_scripts": [
"publisher = tactile_viz.publisher:main",
"rerun_live = tactile_viz.rerun_live:main",
],
},
)src/tactile_viz/setup.cfg
[develop]
script_dir=$base/lib/tactile_viz
[install]
install_scripts=$base/lib/tactile_vizsrc/tactile_viz/resource/tactile_viz — 空白檔案,執行 touch 建立即可
src/tactile_viz/tactile_viz/__init__.py — 空白
src/tactile_viz/tactile_viz/publisher.py — 即第 3 節程式碼,將 import 改為:
from tactile_msgs.msg import TactileSensorsrc/tactile_viz/tactile_viz/rerun_live.py — 即第 4 節程式碼,同上修改 import
scripts/mcap_replay.py — 即第 5 節程式碼(獨立腳本,不依賴 ROS2)
9.4 建置步驟
# 1. 建立 workspace 並進入
mkdir -p ~/tactile_ws/src
cd ~/tactile_ws
# 2. 複製套件到 src/(或 git clone)
# cp -r /path/to/tactile_msgs src/
# cp -r /path/to/tactile_viz src/
# 3. 安裝 Python 依賴(workspace 外)
pip install rerun-sdk==0.31 mcap-ros2-support matplotlib numpy
# 4. Source ROS2 Humble
source /opt/ros/humble/setup.bash
# 5. 解析依賴
rosdep install --from-paths src --ignore-src -r -y
# 6. 建置(先建 tactile_msgs,再建 tactile_viz)
colcon build --packages-select tactile_msgs
source install/setup.bash
colcon build --packages-select tactile_viz
# 7. Source workspace(每次新終端機都需要)
source install/setup.bash提示:可以把
source ~/tactile_ws/install/setup.bash加到~/.bashrc避免每次手動 source。
9.5 執行
每個指令開新終端機,都先 source ~/tactile_ws/install/setup.bash。
# 終端機 1:發布模擬資料
ros2 run tactile_viz publisher --ros-args -p rows:=8 -p cols:=8
# 或直接 python3
python3 src/tactile_viz/tactile_viz/publisher.py --rows 8 --cols 8
# 終端機 2:即時 Rerun 視覺化(自動開啟 Viewer)
ros2 run tactile_viz rerun_live --rows 8 --cols 8
# 或直接 python3
python3 src/tactile_viz/tactile_viz/rerun_live.py --rows 8 --cols 8
# 錄製 MCAP
ros2 bag record -o ~/bags/tactile_bag /tactile_sensor
# 離線回放(不需 source ROS2)
python3 scripts/mcap_replay.py ~/bags/tactile_bag/tactile_bag_0.mcap --rows 8 --cols 8注意
--rows/--cols傳遞方式:直接執行 python3 時用--rows 8,透過ros2 run時需改為 ROS2 parameter 語法-p rows:=8,或在publisher.py/rerun_live.py內改用declare_parameter。最簡單的做法是直接python3執行。