본문 바로가기
Unreal Engine/언리얼-ROS-Physical 통합 프로젝트

[UnrealRobotics: SO-101] (15) Unreal Engine에서 LeRobot Record/Replay 구현

by 테크앤아트 2026. 5. 20.
728x90
반응형

 

 

 


 

 

지금까지 구현된 Unreal to Robot 데이터 흐름

Unreal (C++, PIE)
    ↓ rosbridge publish (WebSocket, JSON)
        ↓ /moveit_goal_named 또는 /moveit_goal_joints
rosbridge_server (WSL2)
    ↓ ROS2 토픽 재발행
moveit_goal_node.py (rclpy)
    ↓ MoveGroup action goal 조립
        ↓ /move_action (action)
move_group (MoveIt 2)
    ↓ Plan (OMPL) + Execute
        ↓ /arm_controller/follow_joint_trajectory (action)
joint_trajectory_action_server.py
    ↓ ZMQ REQ (degrees)
lerobot_worker.py (conda, Py3.12)
    ├─→ safety clamp (physical limits)
    ├─→ send_action() → Follower arm (실물 동작)
    └─→ get_observation() → ZMQ PUB
         ↓
ros_bridge_node.py → /joint_states
    ↓ rosbridge WebSocket
Unreal (C++) → ParseAndApplyJointStates → 3D 모델 업데이트

이미지 버전

 

 

계획대로라면 이후 작업은 Unreal 뷰포트 인터랙션 작업이다. 언리얼 뷰포트 상에서 로봇을 잡고 움직이면 실제 로봇도 똑같이 움직이게 하는 것.

 

 

먼저, 뷰포트에서 목표를 지정하는 방식을 어떻게 할 것인가를 생각해야 한다. 마우스 클릭으로 바닥/테이블 위 지점을 찍거나 기즈모를 이용해서 조작하거나.

 

 

그 전에 왜 이 기능을 넣어야 하는지를 생각해야 한다. 가상 모니터링 시스템 상에서 명령을 내려서 실물 로봇을 움직여야 하는 상황이 있을까? 있다면 어떤 상황이 있을까?

- 원격 유지보수 상황이 가장 대표적일 수 있다. 로봇이 작업 중 멈추거나 이상 자세에 빠졌을 때, 현장에 사람이 바로 갈 수 없는 경우 등?

 

 

사실상 디지털 트윈의 본질적 가치는 모니터링과 상태 파악이다. 실물 로봇이 지금 어떤 자세인지, 관절 값이 정상 범위인지, 이력이 어떤지를 시각적으로 보여주는 것. 언리얼→실물 로봇 데모는 실용적 근거가 약할 수 있다는 생각이 문득 들었다.

 

 

다시 처음부터 돌아가서 실제 공장에서 로봇이 사용된다고 생각하면, 공장 로봇 제어는 미리 프로그래밍된 작업 시퀀스를 실행하는 것이지 작업자가 로봇을 원격으로 직접 제어할 일은 거의 없다. 따라서 아직 나는 미리 프로그래밍된 작업 시퀀스 형태의 작업조차 만들어 놓지 않았기 때문에 이 기능을 추가 해야 한다. 로봇이 어떻게 작업할 지를 아직 만들지 않았다는 말이다.

 

 

"A에서 집고 B에 놓기" 반복 작업을 시키려면 LeRobot의 recording + replay 기능을 사용하면 된다. 내가 리더암을 잡고 기록하면 LeRobot이 관절 궤적을 기록하고 그대로 재생하는 방식이다. MoveIt을 경유하는 경로라면 trajectory가 waypoint 단위로 쪼개져서 덜덜 떨리지만 record + replay를 사용하면 해당 문제가 발생하지 않는다.

 

 

위 내용을 토대로 이후 작업 방향을 다시 기획해보면, 아래와 같다.

  1. 언리얼 UI에서 Record/Stop/Replay 제어
    1. LeRobot은 이미 SO101 전용 CLI 명령으로 lerobot-teleoperate, lerobot-record, lerobot-replay를 제공하고 있음.
    2. Record UI 버튼 → rosbridge publish → bridge node → ZMQ → worker가 텔레옵 모드 진입 + 관절 궤적 저장 시작.
      Stop UI 버튼 → 같은 경로로 기록 중단 + 파일 저장.
      Replay 버튼 → 저장된 궤적을 worker가 send_action()으로 직접 재생.
    3. Worker가 MoveIt을 거치지 않고 Python API 레벨에서 LeRobot API로 직접 서보에 쓰기 때문에 떨림 현상 없음.
      home/ready 버튼은 이미 MoveIt 경유로 동작하는데, 이것도 worker 직접 경로로 바꾸면 더 부드러워질 수 있음.
    4. 구현에 필요한 건 worker에 record/stop/replay 명령 핸들러 추가(ZMQ REP 확장), bridge node에 해당 명령 중계, 언리얼에서 토픽 publish 정도
  2. Gizmo 드래그 → Planning → Execute (애매함)
    1. 5DOF 제약 때문에 Cartesian pose(position + orientation 6개 값)로 보내면 IK가 자주 실패한다.
    2. orientation을 빼고 position-only로 보내면 성공률이 올라가긴 하지만 end effector가 어떤 방향을 향하는지 제어가 안된다. 아니면 각 관절의 gizmo를 개별로 드래그하는 joint space 방식이면 IK 문제가 없을 수도?
  3. 오류/안전 처리
  4. 씬 비주얼 업데이트

 

 


 

 

 

1단계: Worker에 Record/Replay 명령 추가 (lerobot_worker.py 수정)

현재 handle_command()pingsend_follower_action 두 개 명령이 있음. 아래 명령을 추가한다.

  • start_record
    worker의 상태를 recording으로 전환. 텔레옵 모드를 자동 활성화하고 매 루프마다 follower 관절값 + 타임스탬프를 리스트에 append. Leader가 없으면 에러 반환.
  • stop_record
    기록 중단. 누적된 프레임을 JSON 파일로 저장 (~/recordings/recording_YYYYMMDD_HHMMSS.json). 텔레옵 해제. 저장된 파일명 반환.
  • start_replay
    지정된 파일(또는 가장 최근 파일)을 로드하여 타이밍에 맞게 send_action() 재생. 재생 중에도 매 프레임 관절값을 PUB으로 발행(Unreal에서 실시간 반영). loop 옵션으로 반복 재생 가능.
  • stop_replay / estop
    재생 즉시 중단. estop은 어떤 상태에서든 모든 동작 중단.
  • list_recordings
    저장된 기록 파일 목록 반환.
  • 상태 관리
    self.state를 추가 - idle, recording, replaying 세 가지 상태에 따라 run loop의 동작이 달라진다. PUB 메시지에 "state": "recording" 등을 포함시켜 bridge node → rosbridge → Unreal까지 상태가 전파되도록 한다.

*replay는 별도의 스레드를 쓰지 않고 루프 매 틱마다 지금 재생해야 할 프레임이 있는지 체크하는 방식으로 run() 루프 안에서 처리한다. 이렇게 해야 estop 등 ZMQ 명령을 놓치지 않는다.

 

 

더보기
#!/usr/bin/env python3
"""
lerobot_worker.py — LeRobot ↔ ZMQ bridge worker

Runs in conda env 'lerobot' (Python 3.12).
Reads joint states from SO-ARM-101 via LeRobot API at ~30Hz,
publishes them over ZMQ PUB, and accepts commands via ZMQ REP.

Supports record/replay of joint trajectories via ZMQ commands:
  - start_record: begin teleop + record joint frames
  - stop_record:  stop recording, save to ~/recordings/
  - start_replay: replay a saved recording on the follower arm
  - stop_replay:  stop replay immediately
  - estop:        emergency stop — abort all motion immediately
  - list_recordings: list saved recording files

Usage:
    conda activate lerobot
    sudo chmod 666 /dev/ttyACM*
    python scripts/lerobot_worker.py [--follower-port /dev/ttyACM0] [--leader-port /dev/ttyACM1]
                                      [--pub-addr tcp://127.0.0.1:5555]
                                      [--rep-addr tcp://127.0.0.1:5556]
                                      [--rate 30]
                                      [--follower-only | --leader-only]
                                      [--teleop]
                                      [--recordings-dir ~/recordings]
"""

import argparse
import json
import os
import pathlib
import signal
import sys
import time
import traceback
from datetime import datetime

import zmq


# ---------------------------------------------------------------------------
# Joint name ordering (matches URDF & LeRobot calibration — Phase 3.2 verified)
# ---------------------------------------------------------------------------
JOINT_NAMES = [
    "shoulder_pan",
    "shoulder_lift",
    "elbow_flex",
    "wrist_flex",
    "wrist_roll",
    "gripper",
]

# ---------------------------------------------------------------------------
# Joint limits in LeRobot DEGREES (empirically measured via calibration).
# These are absolute physical limits — commands are never allowed beyond these.
# Measured by reading get_observation() degrees at calibration range_min/max
# raw encoder positions, using two-point linear interpolation.
# NOTE: These are in LeRobot's coordinate frame (0° = calibration midpoint),
#       NOT URDF coordinates (which use a different zero point).
# ---------------------------------------------------------------------------
JOINT_LIMITS_DEG = {
    "shoulder_pan":  (-119.91, +119.91),
    "shoulder_lift": (-104.62, +104.62),
    "elbow_flex":    ( -97.01,  +97.01),
    "wrist_flex":    (-102.68, +102.68),
    "wrist_roll":    (-180.00, +180.00),
    "gripper":       (  +0.14,  +99.45),
}


def build_args():
    p = argparse.ArgumentParser(description="LeRobot ZMQ worker for SO-ARM-101")
    p.add_argument("--follower-port", default="/dev/ttyACM0",
                    help="Serial port for follower arm")
    p.add_argument("--leader-port", default="/dev/ttyACM1",
                    help="Serial port for leader arm")
    p.add_argument("--follower-id", default="so101_twin_follower",
                    help="Calibration ID for follower")
    p.add_argument("--leader-id", default="so101_twin_leader",
                    help="Calibration ID for leader")
    p.add_argument("--pub-addr", default="tcp://127.0.0.1:5555",
                    help="ZMQ PUB bind address for joint states")
    p.add_argument("--rep-addr", default="tcp://127.0.0.1:5556",
                    help="ZMQ REP bind address for commands")
    p.add_argument("--rate", type=float, default=30.0,
                    help="Read loop frequency in Hz")
    p.add_argument("--follower-only", action="store_true",
                    help="Only connect follower (no leader)")
    p.add_argument("--leader-only", action="store_true",
                    help="Only connect leader (no follower)")
    p.add_argument("--teleop", action="store_true",
                    help="Teleoperation mode: leader arm controls follower arm")
    p.add_argument("--cmd-limit", type=float, default=5.0,
                    help="Safety clamp: max degrees from initial position per joint (default 5.0)")
    p.add_argument("--cmd-joints", default="shoulder_pan",
                    help="Comma-separated joint names that accept commands "
                         "(default: shoulder_pan). Use 'all' for all joints.")
    p.add_argument("--recordings-dir", default="~/recordings",
                    help="Directory to save/load recorded trajectories")
    return p.parse_args()


class LeRobotWorker:
    def __init__(self, args):
        self.args = args
        self.running = True
        self.follower = None
        self.leader = None
        self.teleop = args.teleop

        # Safety clamp: initial follower position recorded at startup,
        # commands are clamped to ±cmd_limit degrees from this baseline.
        # Only joints listed in allowed_cmd_joints accept commands;
        # others are silently ignored (held at current position).
        self.follower_initial_pos = None       # dict: {joint_name: degrees}
        self.cmd_limit_deg = args.cmd_limit    # default 5.0°
        self.allowed_cmd_joints = (
            set(JOINT_NAMES) if args.cmd_joints.strip().lower() == "all"
            else set(j.strip() for j in args.cmd_joints.split(",") if j.strip())
        )

        # ---------------------------------------------------------------------------
        # Record / Replay state
        # ---------------------------------------------------------------------------
        # state: "idle", "recording", "replaying"
        self.state = "idle"

        # Recording buffer: list of {"ts": float, "joints": {name: deg, ...}}
        self.recording_buffer = []
        self.recording_start_time = None
        # Whether teleop was active before recording started (to restore after stop)
        self.teleop_before_record = False

        # Replay state
        self.replay_frames = []        # loaded frames from file
        self.replay_index = 0          # current frame index
        self.replay_start_time = None  # monotonic time when replay started
        self.replay_loop = False       # whether to loop replay
        self.replay_filename = ""      # currently playing file

        # Recordings directory
        self.recordings_dir = pathlib.Path(os.path.expanduser(args.recordings_dir))
        self.recordings_dir.mkdir(parents=True, exist_ok=True)
        print(f"[worker] Recordings dir: {self.recordings_dir}")

        # ZMQ setup
        self.ctx = zmq.Context()

        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.bind(args.pub_addr)
        print(f"[worker] PUB bound on {args.pub_addr}")

        self.rep = self.ctx.socket(zmq.REP)
        self.rep.bind(args.rep_addr)
        print(f"[worker] REP bound on {args.rep_addr}")

        # Poller for non-blocking REP check inside the read loop
        self.poller = zmq.Poller()
        self.poller.register(self.rep, zmq.POLLIN)

    def connect_robots(self):
        """Connect to follower and/or leader via LeRobot API."""
        if not self.args.leader_only:
            print(f"[worker] Connecting follower on {self.args.follower_port} "
                  f"(id={self.args.follower_id}) ...")
            from lerobot.robots.so_follower import SO101Follower, SO101FollowerConfig
            cfg = SO101FollowerConfig(
                port=self.args.follower_port,
                id=self.args.follower_id,
            )
            self.follower = SO101Follower(cfg)
            self.follower.connect()
            print("[worker] Follower connected.")

        if not self.args.follower_only:
            print(f"[worker] Connecting leader on {self.args.leader_port} "
                  f"(id={self.args.leader_id}) ...")
            from lerobot.teleoperators.so_leader import SOLeader, SO101LeaderConfig
            cfg = SO101LeaderConfig(
                port=self.args.leader_port,
                id=self.args.leader_id,
            )
            self.leader = SOLeader(cfg)
            self.leader.connect()
            print("[worker] Leader connected.")

        # Validate teleop requirements
        if self.teleop:
            if self.follower is None or self.leader is None:
                print("[worker] ERROR: --teleop requires both follower and leader connected.")
                print("[worker] Cannot use --teleop with --follower-only or --leader-only.")
                sys.exit(1)
            print("[worker] *** TELEOP MODE ENABLED ***")
            print("[worker] Leader arm will control follower arm in real-time.")
            print("[worker] Move leader arm to starting position, then press Enter to begin...")
            input()
            print("[worker] Teleop active!")

        # Record initial follower position for safety clamping
        if self.follower is not None:
            init_pos = self.read_follower()

            # Load calibration JSON to display physical range info
            calib_range = self._load_calibration_range("follower")

            if init_pos is not None:
                self.follower_initial_pos = dict(init_pos)

                # Print calibration range info
                if calib_range:
                    print("[worker] === CALIBRATION RANGE (raw encoder values) ===")
                    for name in JOINT_NAMES:
                        if name in calib_range:
                            rmin, rmax, cur_raw = calib_range[name]
                            total = rmax - rmin
                            if cur_raw is not None and total > 0:
                                pct = (cur_raw - rmin) / total * 100.0
                                print(f"  {name:20s}: raw {cur_raw:4d}  "
                                      f"range [{rmin:4d} ~ {rmax:4d}]  "
                                      f"({pct:5.1f}% of range)")
                            else:
                                print(f"  {name:20s}: raw  ???  "
                                      f"range [{rmin:4d} ~ {rmax:4d}]")

                print("[worker] === SAFETY CLAMP BASELINE (follower initial position) ===")
                for name in JOINT_NAMES:
                    v = init_pos.get(name, 0.0)
                    soft_lo = v - self.cmd_limit_deg
                    soft_hi = v + self.cmd_limit_deg
                    hard_lo, hard_hi = JOINT_LIMITS_DEG.get(name, (-180.0, 180.0))
                    eff_lo = max(soft_lo, hard_lo)
                    eff_hi = min(soft_hi, hard_hi)
                    marker = " <-- CMD OK" if name in self.allowed_cmd_joints else ""
                    print(f"  {name:20s}: {v:+8.2f}°  "
                          f"effective: [{eff_lo:+8.2f}° ~ {eff_hi:+8.2f}°]  "
                          f"(physical: [{hard_lo:+8.2f}°~{hard_hi:+8.2f}°]){marker}")
                print(f"[worker] Allowed command joints: {sorted(self.allowed_cmd_joints)}")
                print(f"[worker] Clamp limit: ±{self.cmd_limit_deg}° from baseline")
            else:
                print("[worker] WARNING: Could not read initial follower position. "
                      "Commands will be REJECTED until baseline is set.")

    def _load_calibration_range(self, arm_type="follower"):
        """
        Load calibration JSON and read current raw positions.
        Returns dict: {joint_name: (range_min, range_max, current_raw)} or None.
        """
        if arm_type == "follower":
            calib_path = (
                pathlib.Path.home()
                / ".cache/huggingface/lerobot/calibration/robots/so_follower"
                / f"{self.args.follower_id}.json"
            )
            robot = self.follower
        else:
            calib_path = (
                pathlib.Path.home()
                / ".cache/huggingface/lerobot/calibration/teleoperators/so_leader"
                / f"{self.args.leader_id}.json"
            )
            robot = self.leader

        try:
            with open(calib_path) as f:
                calib = json.load(f)
            print(f"[worker] Loaded calibration from {calib_path}")

            # Read current raw positions (without normalization)
            current_raw = {}
            if robot is not None:
                try:
                    raw = robot.bus.sync_read("Present_Position", normalize=False)
                    current_raw = dict(raw)
                except Exception as e:
                    print(f"[worker] Could not read raw positions: {e}")

            result = {}
            for name in JOINT_NAMES:
                if name in calib:
                    rmin = calib[name].get("range_min", 0)
                    rmax = calib[name].get("range_max", 4095)
                    cur = current_raw.get(name, None)
                    result[name] = (rmin, rmax, cur)

            return result

        except FileNotFoundError:
            print(f"[worker] Calibration file not found: {calib_path}")
            return None
        except Exception as e:
            print(f"[worker] Error loading calibration: {e}")
            return None

    def read_follower(self):
        """Read follower joint positions in degrees. Returns dict or None."""
        if self.follower is None:
            return None
        try:
            obs = self.follower.get_observation()
            return {name: float(obs[f"{name}.pos"]) for name in JOINT_NAMES}
        except Exception as e:
            print(f"[worker] Follower read error: {e}")
            return None

    def read_leader(self):
        """Read leader joint positions in degrees. Returns dict or None."""
        if self.leader is None:
            return None
        try:
            action = self.leader.get_action()
            return {name: float(action[f"{name}.pos"]) for name in JOINT_NAMES}
        except Exception as e:
            print(f"[worker] Leader read error: {e}")
            return None

    def send_follower_action(self, joint_positions_deg):
        """
        Send joint positions (degrees) to follower via LeRobot send_action().
        joint_positions_deg: dict like {"shoulder_pan": 10.0, ...}
        """
        if self.follower is None:
            return
        try:
            action_dict = {f"{name}.pos": float(joint_positions_deg[name])
                           for name in JOINT_NAMES if name in joint_positions_deg}
            self.follower.send_action(action_dict)
        except Exception as e:
            print(f"[worker] Follower send_action error: {e}")

    def handle_command(self, msg):
        """
        Process a command from the ROS bridge node.
        Expected format:
            {"cmd": "send_follower_action", "args": {"shoulder_pan": 0.0, ...}}
            {"cmd": "ping"}
            {"cmd": "start_record"}
            {"cmd": "stop_record"}
            {"cmd": "start_replay", "args": {"filename": "...", "loop": false}}
            {"cmd": "stop_replay"}
            {"cmd": "estop"}
            {"cmd": "list_recordings"}
        Returns a response dict.
        """
        cmd = msg.get("cmd", "")

        if cmd == "ping":
            return {"status": "ok", "state": self.state, "ts": time.time()}

        elif cmd == "estop":
            return self._handle_estop()

        elif cmd == "start_record":
            return self._handle_start_record()

        elif cmd == "stop_record":
            return self._handle_stop_record()

        elif cmd == "start_replay":
            args = msg.get("args", {})
            return self._handle_start_replay(args)

        elif cmd == "stop_replay":
            return self._handle_stop_replay()

        elif cmd == "list_recordings":
            return self._handle_list_recordings()

        elif cmd == "send_follower_action":
            if self.follower is None:
                return {"status": "error", "reason": "follower not connected"}
            if self.follower_initial_pos is None:
                return {"status": "error", "reason": "safety baseline not set (no initial position)"}
            joint_args = msg.get("args", {})
            if not joint_args:
                return {"status": "error", "reason": "empty args"}
            try:
                # Two-layer safety clamp:
                #   Layer 1: baseline ± cmd_limit (session safety)
                #   Layer 2: JOINT_LIMITS_DEG (absolute physical limits, measured empirically)
                clamped = {}
                rejected = []
                for name, target_deg in joint_args.items():
                    if name not in JOINT_NAMES:
                        continue
                    if name not in self.allowed_cmd_joints:
                        rejected.append(name)
                        continue

                    target = float(target_deg)

                    # Layer 1: baseline ± cmd_limit
                    baseline = self.follower_initial_pos.get(name, 0.0)
                    soft_lo = baseline - self.cmd_limit_deg
                    soft_hi = baseline + self.cmd_limit_deg

                    # Layer 2: absolute physical limits
                    hard_lo, hard_hi = JOINT_LIMITS_DEG.get(name, (-180.0, 180.0))

                    # Effective range = intersection of both
                    eff_lo = max(soft_lo, hard_lo)
                    eff_hi = min(soft_hi, hard_hi)

                    clamped_val = max(eff_lo, min(eff_hi, target))
                    clamped[name] = clamped_val

                    if abs(clamped_val - target) > 0.01:
                        reason = ""
                        if target < hard_lo or target > hard_hi:
                            reason = " [PHYSICAL LIMIT]"
                        else:
                            reason = " [BASELINE LIMIT]"
                        print(f"[worker] CLAMP {name}: "
                              f"requested {target:.2f}° → clamped to {clamped_val:.2f}° "
                              f"(effective: {eff_lo:.2f}° ~ {eff_hi:.2f}°){reason}")

                if not clamped:
                    return {"status": "error",
                            "reason": f"no allowed joints in command "
                                      f"(rejected: {rejected}, allowed: {sorted(self.allowed_cmd_joints)})"}

                action_dict = {f"{name}.pos": val for name, val in clamped.items()}
                self.follower.send_action(action_dict)
                resp = {"status": "ok", "clamped": clamped}
                if rejected:
                    resp["rejected_joints"] = rejected
                return resp
            except Exception as e:
                return {"status": "error", "reason": str(e)}

        else:
            return {"status": "error", "reason": f"unknown cmd: {cmd}"}

    # -----------------------------------------------------------------------
    # Record / Replay / E-Stop handlers
    # -----------------------------------------------------------------------

    def _handle_estop(self):
        """Emergency stop: abort any ongoing motion, return to idle."""
        prev_state = self.state
        self.state = "idle"
        self.teleop = False
        self.recording_buffer = []
        self.replay_frames = []
        self.replay_index = 0
        print(f"[worker] *** E-STOP *** (was: {prev_state})")
        return {"status": "ok", "state": "idle", "previous_state": prev_state}

    def _handle_start_record(self):
        """Start recording: activate teleop + buffer joint frames."""
        if self.state != "idle":
            return {"status": "error",
                    "reason": f"cannot start recording in state '{self.state}'"}
        if self.follower is None or self.leader is None:
            return {"status": "error",
                    "reason": "recording requires both follower and leader connected"}

        self.teleop_before_record = self.teleop
        self.teleop = True
        self.recording_buffer = []
        self.recording_start_time = time.monotonic()
        self.state = "recording"
        print("[worker] *** RECORDING STARTED *** (teleop activated)")
        return {"status": "ok", "state": "recording"}

    def _handle_stop_record(self):
        """Stop recording: save buffer to file, deactivate teleop."""
        if self.state != "recording":
            return {"status": "error",
                    "reason": f"not recording (state: '{self.state}')"}

        num_frames = len(self.recording_buffer)
        if num_frames == 0:
            self.state = "idle"
            self.teleop = self.teleop_before_record
            print("[worker] Recording stopped — 0 frames, nothing saved.")
            return {"status": "ok", "state": "idle", "frames": 0, "filename": None}

        # Calculate duration from frame timestamps
        duration = self.recording_buffer[-1]["ts"] - self.recording_buffer[0]["ts"]

        # Build recording file
        timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"recording_{timestamp_str}.json"
        filepath = self.recordings_dir / filename

        recording_data = {
            "version": 1,
            "recorded_at": datetime.now().isoformat(),
            "rate_hz": self.args.rate,
            "num_frames": num_frames,
            "duration_sec": round(duration, 3),
            "joint_names": JOINT_NAMES,
            "frames": self.recording_buffer,
        }

        with open(filepath, "w") as f:
            json.dump(recording_data, f, indent=2)

        self.state = "idle"
        self.teleop = self.teleop_before_record
        self.recording_buffer = []
        print(f"[worker] Recording saved: {filepath}")
        print(f"[worker]   {num_frames} frames, {duration:.1f}s")
        return {
            "status": "ok",
            "state": "idle",
            "filename": filename,
            "frames": num_frames,
            "duration_sec": round(duration, 3),
        }

    def _handle_start_replay(self, args):
        """Load a recording file and start replaying on the follower."""
        if self.state != "idle":
            return {"status": "error",
                    "reason": f"cannot start replay in state '{self.state}'"}
        if self.follower is None:
            return {"status": "error",
                    "reason": "replay requires follower connected"}

        filename = args.get("filename", "")
        self.replay_loop = args.get("loop", False)

        # If no filename, use the most recent recording
        if not filename:
            recordings = self._get_recording_files()
            if not recordings:
                return {"status": "error", "reason": "no recordings found"}
            filename = recordings[-1]  # most recent (sorted by name = sorted by time)

        filepath = self.recordings_dir / filename
        if not filepath.exists():
            return {"status": "error",
                    "reason": f"file not found: {filename}"}

        try:
            with open(filepath) as f:
                data = json.load(f)
        except Exception as e:
            return {"status": "error", "reason": f"failed to load: {e}"}

        frames = data.get("frames", [])
        if len(frames) < 2:
            return {"status": "error",
                    "reason": f"recording too short ({len(frames)} frames)"}

        self.replay_frames = frames
        self.replay_index = 0
        self.replay_start_time = time.monotonic()
        self.replay_filename = filename
        self.teleop = False  # disable teleop during replay
        self.state = "replaying"

        duration = data.get("duration_sec", 0)
        print(f"[worker] *** REPLAY STARTED *** {filename}")
        print(f"[worker]   {len(frames)} frames, {duration}s, "
              f"loop={'yes' if self.replay_loop else 'no'}")
        return {
            "status": "ok",
            "state": "replaying",
            "filename": filename,
            "frames": len(frames),
            "duration_sec": duration,
            "loop": self.replay_loop,
        }

    def _handle_stop_replay(self):
        """Stop replay immediately."""
        if self.state != "replaying":
            return {"status": "error",
                    "reason": f"not replaying (state: '{self.state}')"}

        self.state = "idle"
        self.replay_frames = []
        self.replay_index = 0
        print(f"[worker] Replay stopped ({self.replay_filename})")
        return {"status": "ok", "state": "idle"}

    def _handle_list_recordings(self):
        """List all saved recording files."""
        files = self._get_recording_files()
        # Add metadata summary for each file
        summaries = []
        for fname in files:
            try:
                with open(self.recordings_dir / fname) as f:
                    data = json.load(f)
                summaries.append({
                    "filename": fname,
                    "frames": data.get("num_frames", 0),
                    "duration_sec": data.get("duration_sec", 0),
                    "recorded_at": data.get("recorded_at", ""),
                })
            except Exception:
                summaries.append({"filename": fname})
        return {"status": "ok", "recordings": summaries}

    def _get_recording_files(self):
        """Get sorted list of recording JSON files."""
        files = sorted(
            f.name for f in self.recordings_dir.glob("recording_*.json")
            if f.is_file()
        )
        return files

    # -----------------------------------------------------------------------
    # Replay tick — called each iteration of the main loop
    # -----------------------------------------------------------------------

    def _tick_replay(self):
        """
        Advance replay by one tick. Sends the appropriate frame(s) to follower
        based on elapsed time since replay start.
        Returns the follower position after sending (for PUB).
        """
        if not self.replay_frames or self.replay_index >= len(self.replay_frames):
            # Replay finished
            if self.replay_loop and self.replay_frames:
                # Loop: restart from beginning
                self.replay_index = 0
                self.replay_start_time = time.monotonic()
                print(f"[worker] Replay looping ({self.replay_filename})")
            else:
                self.state = "idle"
                print(f"[worker] Replay finished ({self.replay_filename})")
                return self.read_follower()

        elapsed = time.monotonic() - self.replay_start_time
        first_ts = self.replay_frames[0]["ts"]

        # Advance to the frame that matches current elapsed time
        while self.replay_index < len(self.replay_frames):
            frame = self.replay_frames[self.replay_index]
            frame_time = frame["ts"] - first_ts
            if frame_time <= elapsed:
                # Send this frame to follower
                joints = frame.get("joints", {})
                if joints:
                    self.send_follower_action(joints)
                self.replay_index += 1
            else:
                break  # wait for next tick

        # Re-read follower actual position
        return self.read_follower()

    def check_commands(self):
        """Non-blocking check for incoming REP commands."""
        events = dict(self.poller.poll(timeout=0))  # 0 = non-blocking
        if self.rep in events:
            try:
                raw = self.rep.recv_string(zmq.NOBLOCK)
                msg = json.loads(raw)
                response = self.handle_command(msg)
                self.rep.send_string(json.dumps(response))
            except zmq.Again:
                pass
            except Exception as e:
                # Must always send a reply on REP socket
                self.rep.send_string(json.dumps({
                    "status": "error", "reason": str(e)
                }))

    def run(self):
        """Main read loop at configured rate."""
        period = 1.0 / self.args.rate
        seq = 0
        teleop_count = 0
        print(f"[worker] Starting read loop at {self.args.rate} Hz "
              f"(period={period*1000:.1f}ms)")

        while self.running:
            t_start = time.monotonic()

            # --- Replaying mode: advance replay timeline ---
            if self.state == "replaying":
                follower_pos = self._tick_replay()
                leader_pos = self.read_leader()

            else:
                # Read joint states
                follower_pos = self.read_follower()
                leader_pos = self.read_leader()

                # --- Teleop: send leader position to follower ---
                if self.teleop and leader_pos is not None:
                    self.send_follower_action(leader_pos)
                    teleop_count += 1
                    # Re-read follower after sending action to get updated position
                    follower_pos = self.read_follower()

                # --- Recording mode: buffer the current frame ---
                if self.state == "recording" and follower_pos is not None:
                    frame = {
                        "ts": time.monotonic() - self.recording_start_time,
                        "joints": dict(follower_pos),
                    }
                    self.recording_buffer.append(frame)

            # Build and publish message (degrees — bridge node converts to radians)
            msg = {
                "seq": seq,
                "ts": time.time(),
                "state": self.state,
            }
            if follower_pos is not None:
                msg["follower"] = follower_pos
            if leader_pos is not None:
                msg["leader"] = leader_pos
            # Add recording/replay metadata
            if self.state == "recording":
                msg["recording_frames"] = len(self.recording_buffer)
            elif self.state == "replaying":
                msg["replay_progress"] = {
                    "index": self.replay_index,
                    "total": len(self.replay_frames),
                    "filename": self.replay_filename,
                    "loop": self.replay_loop,
                }

            self.pub.send_string(json.dumps(msg))
            seq += 1

            # Check for commands (non-blocking)
            self.check_commands()

            # Rate limiting
            elapsed = time.monotonic() - t_start
            sleep_time = period - elapsed
            if sleep_time > 0:
                time.sleep(sleep_time)

    def shutdown(self):
        """Clean shutdown."""
        print("\n[worker] Shutting down...")
        self.running = False
        self.state = "idle"

        # Save any in-progress recording
        if self.recording_buffer:
            try:
                self._handle_stop_record()
                print("[worker] Saved in-progress recording before shutdown.")
            except Exception:
                pass

        if self.follower is not None:
            try:
                self.follower.disconnect()
                print("[worker] Follower disconnected.")
            except Exception:
                pass

        if self.leader is not None:
            try:
                self.leader.disconnect()
                print("[worker] Leader disconnected.")
            except Exception:
                pass

        self.pub.close()
        self.rep.close()
        self.ctx.term()
        print("[worker] ZMQ cleaned up. Bye.")


def main():
    args = build_args()
    worker = LeRobotWorker(args)

    # Graceful shutdown on Ctrl+C
    def sig_handler(sig, frame):
        worker.shutdown()
        sys.exit(0)

    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)

    try:
        worker.connect_robots()
        worker.run()
    except Exception:
        traceback.print_exc()
        worker.shutdown()
        sys.exit(1)


if __name__ == "__main__":
    main()

* _tick_replay()가 매 루프 틱마다 경과 시간을 확인하고 해당 시점의 프레임을 follower에 전송. 기록할 때의 원래 속도 그대로 재생된다.

* shutdown 시 recording이 진행 중이면 자동 저장. E-stop은 상태와 무관하게 즉시 동작.

 

 


 

 

 

 

2단계: Bridge Node에 명령 중계 토픽 추가 (ros_bridge_node.py 수정)

더보기
#!/usr/bin/env python3
"""
ros_bridge_node.py — ZMQ ↔ ROS2 bridge node

Runs in .venv-ros-bridge (Python 3.10) with rclpy + pyzmq.
Subscribes to joint states from lerobot_worker via ZMQ,
converts degrees→radians, and publishes sensor_msgs/JointState.
Also subscribes to /follower_joint_commands and forwards to worker via ZMQ REQ.

Relays record/replay/estop commands:
  - /robot_command (std_msgs/String) → ZMQ REQ → worker
  - Worker state published on /robot_status (std_msgs/String) every tick

Usage:
    export PATH=$(echo $PATH | tr ':' '\\n' | grep -v miniforge | tr '\\n' ':' | sed 's/:$//')
    cd ~/UnrealRobotics
    source .venv-ros-bridge/bin/activate
    source /opt/ros/humble/setup.bash
    python src/lerobot_ros2_bridge/lerobot_ros2_bridge/ros_bridge_node.py \\
        [--sub-addr tcp://127.0.0.1:5555] \\
        [--req-addr tcp://127.0.0.1:5556]
"""

import argparse
import json
import math
import sys
import traceback

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from std_msgs.msg import String
import zmq


# ---------------------------------------------------------------------------
# Joint names — must match URDF, LeRobot calibration, and worker output
# (Phase 3.2 verified: all 6 names identical across all three sources)
# ---------------------------------------------------------------------------
JOINT_NAMES = [
    "shoulder_pan",
    "shoulder_lift",
    "elbow_flex",
    "wrist_flex",
    "wrist_roll",
    "gripper",
]


def deg2rad(deg: float) -> float:
    return deg * math.pi / 180.0


def rad2deg(rad: float) -> float:
    return rad * 180.0 / math.pi


class RosBridgeNode(Node):
    def __init__(self, args):
        super().__init__("lerobot_ros2_bridge")
        self.get_logger().info("Initializing lerobot_ros2_bridge node")

        # --- ZMQ setup ---
        self.zmq_ctx = zmq.Context()

        # SUB: receive joint states from worker
        self.zmq_sub = self.zmq_ctx.socket(zmq.SUB)
        self.zmq_sub.connect(args.sub_addr)
        self.zmq_sub.setsockopt_string(zmq.SUBSCRIBE, "")
        # Don't block waiting for messages — use RCVTIMEO
        self.zmq_sub.setsockopt(zmq.RCVTIMEO, 0)
        self.get_logger().info(f"ZMQ SUB connected to {args.sub_addr}")

        # REQ: send commands to worker
        self.zmq_req = self.zmq_ctx.socket(zmq.REQ)
        self.zmq_req.connect(args.req_addr)
        self.zmq_req.setsockopt(zmq.RCVTIMEO, 1000)  # 1s timeout for replies
        self.get_logger().info(f"ZMQ REQ connected to {args.req_addr}")

        # --- ROS2 publishers ---
        # Follower joint states (for robot_state_publisher + rosbridge → Unreal)
        self.follower_pub = self.create_publisher(JointState, "/joint_states", 10)
        # Leader joint states (separate topic)
        self.leader_pub = self.create_publisher(JointState, "/leader_joint_states", 10)

        # --- ROS2 subscriber for commands from Unreal (via rosbridge) ---
        self.cmd_sub = self.create_subscription(
            JointState,
            "/follower_joint_commands",
            self.on_joint_command,
            10,
        )

        # --- Record/Replay/E-Stop command relay ---
        # Unreal publishes JSON command strings to /robot_command,
        # bridge relays them to worker via ZMQ REQ.
        self.robot_cmd_sub = self.create_subscription(
            String,
            "/robot_command",
            self.on_robot_command,
            10,
        )
        # Worker state feedback → Unreal
        self.robot_status_pub = self.create_publisher(String, "/robot_status", 10)
        self.last_worker_state = ""

        # --- Timer: poll ZMQ at ~100Hz (faster than worker's 30Hz to avoid buffering) ---
        self.poll_timer = self.create_timer(0.01, self.poll_zmq)

        # Stats
        self.recv_count = 0
        self.last_log_time = self.get_clock().now()

    def poll_zmq(self):
        """Non-blocking poll of ZMQ SUB for worker messages."""
        # Drain all available messages (in case we're slightly behind)
        messages_this_tick = 0
        while messages_this_tick < 5:  # cap per tick to avoid starving ROS callbacks
            try:
                raw = self.zmq_sub.recv_string(zmq.NOBLOCK)
            except zmq.Again:
                break

            messages_this_tick += 1
            self.recv_count += 1

            try:
                msg = json.loads(raw)
            except json.JSONDecodeError as e:
                self.get_logger().warn(f"JSON parse error: {e}")
                continue

            now = self.get_clock().now().to_msg()

            # Publish follower joint states
            if "follower" in msg:
                js = JointState()
                js.header.stamp = now
                js.header.frame_id = ""
                js.name = list(JOINT_NAMES)
                js.position = [
                    deg2rad(msg["follower"].get(name, 0.0))
                    for name in JOINT_NAMES
                ]
                # velocity and effort left empty (not available from LeRobot API)
                self.follower_pub.publish(js)

            # Publish leader joint states
            if "leader" in msg:
                js = JointState()
                js.header.stamp = now
                js.header.frame_id = ""
                js.name = list(JOINT_NAMES)
                js.position = [
                    deg2rad(msg["leader"].get(name, 0.0))
                    for name in JOINT_NAMES
                ]
                self.leader_pub.publish(js)

            # Publish worker state to /robot_status (only on change or periodically)
            worker_state = msg.get("state", "idle")
            if worker_state != self.last_worker_state:
                status_msg = String()
                status_data = {"state": worker_state}
                # Include recording/replay metadata if present
                if "recording_frames" in msg:
                    status_data["recording_frames"] = msg["recording_frames"]
                if "replay_progress" in msg:
                    status_data["replay_progress"] = msg["replay_progress"]
                status_msg.data = json.dumps(status_data)
                self.robot_status_pub.publish(status_msg)
                self.last_worker_state = worker_state
                self.get_logger().info(f"Worker state: {worker_state}")

        # Periodic logging (every 10 seconds)
        now_time = self.get_clock().now()
        if (now_time - self.last_log_time).nanoseconds > 10_000_000_000:
            self.get_logger().info(
                f"ZMQ recv total: {self.recv_count} messages"
            )
            self.last_log_time = now_time

    def on_joint_command(self, msg: JointState):
        """
        Receive joint commands from ROS2 (e.g. from Unreal via rosbridge),
        convert radians→degrees, and forward to worker via ZMQ REQ.
        """
        if len(msg.name) == 0 or len(msg.position) == 0:
            self.get_logger().warn("Empty joint command received, ignoring")
            return

        # Build command dict (degrees)
        joint_args = {}
        for name, pos_rad in zip(msg.name, msg.position):
            if name in JOINT_NAMES:
                joint_args[name] = rad2deg(pos_rad)

        if not joint_args:
            self.get_logger().warn("No recognized joint names in command")
            return

        cmd = {
            "cmd": "send_follower_action",
            "args": joint_args,
        }

        try:
            self.zmq_req.send_string(json.dumps(cmd))
            reply_raw = self.zmq_req.recv_string()
            reply = json.loads(reply_raw)
            if reply.get("status") != "ok":
                self.get_logger().warn(
                    f"Worker command error: {reply.get('reason', 'unknown')}"
                )
        except zmq.Again:
            self.get_logger().error("Worker REQ timeout — is lerobot_worker running?")
        except Exception as e:
            self.get_logger().error(f"Worker command exception: {e}")

    def on_robot_command(self, msg: String):
        """
        Receive record/replay/estop commands from Unreal (via rosbridge),
        relay to worker via ZMQ REQ, and publish response on /robot_status.

        Expected msg.data formats:
            '{"cmd": "start_record"}'
            '{"cmd": "stop_record"}'
            '{"cmd": "start_replay", "args": {"filename": "...", "loop": false}}'
            '{"cmd": "start_replay"}'                    (plays most recent)
            '{"cmd": "stop_replay"}'
            '{"cmd": "estop"}'
            '{"cmd": "list_recordings"}'
        """
        try:
            cmd = json.loads(msg.data)
        except json.JSONDecodeError:
            # Support simple string commands: "estop", "start_record", etc.
            cmd = {"cmd": msg.data.strip()}

        cmd_name = cmd.get("cmd", "")
        self.get_logger().info(f"Robot command received: {cmd_name}")

        try:
            self.zmq_req.send_string(json.dumps(cmd))
            reply_raw = self.zmq_req.recv_string()
            reply = json.loads(reply_raw)

            # Publish response on /robot_status
            status_msg = String()
            status_msg.data = reply_raw
            self.robot_status_pub.publish(status_msg)

            status = reply.get("status", "unknown")
            if status == "ok":
                new_state = reply.get("state", "")
                if new_state:
                    self.last_worker_state = new_state
                self.get_logger().info(f"Command '{cmd_name}' OK: {reply}")
            else:
                self.get_logger().warn(
                    f"Command '{cmd_name}' error: {reply.get('reason', 'unknown')}"
                )
        except zmq.Again:
            self.get_logger().error(
                f"Worker REQ timeout for '{cmd_name}' — is lerobot_worker running?"
            )
            # Publish timeout error on /robot_status
            err_msg = String()
            err_msg.data = json.dumps({
                "status": "error", "reason": "worker timeout", "cmd": cmd_name
            })
            self.robot_status_pub.publish(err_msg)
        except Exception as e:
            self.get_logger().error(f"Robot command exception: {e}")
            err_msg = String()
            err_msg.data = json.dumps({
                "status": "error", "reason": str(e), "cmd": cmd_name
            })
            self.robot_status_pub.publish(err_msg)

    def destroy_node(self):
        """Clean shutdown of ZMQ resources."""
        self.get_logger().info("Shutting down ZMQ...")
        self.zmq_sub.close()
        self.zmq_req.close()
        self.zmq_ctx.term()
        super().destroy_node()


def main():
    parser = argparse.ArgumentParser(description="ROS2 ↔ ZMQ bridge node")
    parser.add_argument("--sub-addr", default="tcp://127.0.0.1:5555",
                        help="ZMQ SUB address (worker PUB)")
    parser.add_argument("--req-addr", default="tcp://127.0.0.1:5556",
                        help="ZMQ REQ address (worker REP)")
    # ROS2 may pass extra args — use parse_known_args
    args, unknown = parser.parse_known_args()

    rclpy.init(args=unknown if unknown else None)
    node = RosBridgeNode(args)

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == "__main__":
    main()

 

  • 새로운 토픽 2개 추가
    • /robot_command (std_msgs/String, 구독)
      Unreal에서 JSON 명령 수신. '{"cmd":"start_record"}' 같은 JSON 형태, 또는 단순 문자열 "estop"도 지원
    • /robot_status (std_msgs/String, 발행)
      worker의 명령 응답과 상태 변화를 Unreal에 전달
  • on_robot_command 핸들러
    /robot_command로 들어온 명령을 그대로 ZMQ REQ로 worker에 전달하고, 응답을 /robot_status에 발행한다. 타임아웃이나 에러 발생 시에도 에러 메시지를 /robot_status에 발행하여 Unreal에서 피드백을 받을 수 있다.
  • poll_zmq 확장
    worker PUB 메시지에 새로 추가된 "state" 필드를 읽어서 상태 변화 시 /robot_status에 발행. recording 프레임 수, replay 진행률 메타데이터도 포함.

 

 

 


 

 

 

3단계: Unreal C++ 수정 (RobotVisualizer에 버튼 추가)

RobotVisualizer.h

더보기
#pragma once

#include "CoreMinimal.h"
#include "GameFramework/Actor.h"
#include "RobotVisualizer.generated.h"

class UStaticMesh;
class UStaticMeshComponent;
class USceneComponent;
class URosBridgeSubsystem;

/**
 * Visualizes the SO-ARM-101 follower arm in Unreal Engine and provides
 * MoveIt command interface via rosbridge.
 *
 * The component hierarchy mirrors the URDF link/joint structure:
 *   BaseLink -> ShoulderPanJoint -> ShoulderLink -> ShoulderLiftJoint -> ...
 *
 * Each "Joint" SceneComponent is where the ROS joint angle gets applied
 * as a local Z-axis rotation. All child links/meshes rotate with it.
 *
 * Phase 8 additions:
 *   - SendNamedTarget(): publish to /moveit_goal_named (std_msgs/String)
 *   - SendJointGoal(): publish to /moveit_goal_joints (sensor_msgs/JointState)
 *   - SendPoseGoal(): publish to /moveit_goal_pose (geometry_msgs/PoseStamped)
 *   - Blueprint-callable + editor-testable via UPROPERTY buttons
 *
 * Phase 9 additions (Record/Replay/E-Stop):
 *   - StartRecord(): begin teleop recording on worker
 *   - StopRecord(): stop recording, save trajectory
 *   - StartReplay(): replay most recent (or named) recording
 *   - StopReplay(): stop replay
 *   - EStop(): emergency stop all motion
 *   - All commands publish JSON to /robot_command topic
 *   - Worker state feedback via /robot_status subscription
 */
UCLASS()
class SO101_TWIN_API ARobotVisualizer : public AActor
{
	GENERATED_BODY()

public:
	ARobotVisualizer();

protected:
	virtual void BeginPlay() override;
	virtual void EndPlay(const EEndPlayReason::Type EndPlayReason) override;

	// =================================================================
	// Configuration
	// =================================================================

	UPROPERTY(EditAnywhere, Category = "ROS|Bridge")
	FString RosBridgeUrl = TEXT("ws://127.0.0.1:9090/?x=1");

	UPROPERTY(EditAnywhere, Category = "ROS|Topics")
	FString JointStateTopic = TEXT("/joint_states");

	UPROPERTY(EditAnywhere, Category = "ROS|Topics")
	FString JointStateType = TEXT("sensor_msgs/JointState");

	// =================================================================
	// MoveIt Command Interface (Phase 8)
	// =================================================================

	// --- Named target ---

	/** Named target to send (e.g. "home", "ready"). Set in Details panel, then call SendNamedTarget(). */
	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt")
	FString MoveItNamedTarget = TEXT("home");

	/** Send the named target to MoveIt via /moveit_goal_named topic. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|MoveIt")
	void SendNamedTarget();

	// --- Joint goal ---

	/** Joint goal values in radians. Set in Details panel, then call SendJointGoal(). */
	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Joints")
	float GoalShoulderPan = 0.0f;

	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Joints")
	float GoalShoulderLift = 0.0f;

	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Joints")
	float GoalElbowFlex = 0.0f;

	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Joints")
	float GoalWristFlex = 0.0f;

	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Joints")
	float GoalWristRoll = 0.0f;

	/** Send joint goal to MoveIt via /moveit_goal_joints topic. Values are in radians. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|MoveIt|Joints")
	void SendJointGoal();

	// --- Pose goal (Cartesian, position-only for 5DOF) ---

	/** Target position in UE coordinates (cm). Converted to ROS (meters) on send. */
	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Pose")
	FVector GoalPositionUE = FVector(10.0f, 0.0f, 15.0f);

	/** Send position-only goal to MoveIt via /moveit_goal_pose topic.
	 *  GoalPositionUE is in Unreal cm, auto-converted to ROS meters with Y flip. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|MoveIt|Pose")
	void SendPoseGoal();

	// =================================================================
	// Record / Replay / E-Stop (Phase 9)
	// =================================================================

	/** Start recording: activates teleop on worker, buffers joint trajectory. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Record")
	void StartRecord();

	/** Stop recording: saves trajectory to file on worker. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Record")
	void StopRecord();

	/** Replay filename (empty = most recent recording). */
	UPROPERTY(EditAnywhere, Category = "ROS|Replay")
	FString ReplayFilename;

	/** Whether to loop the replay continuously. */
	UPROPERTY(EditAnywhere, Category = "ROS|Replay")
	bool bReplayLoop = false;

	/** Start replaying a recorded trajectory on the follower arm. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Replay")
	void StartReplay();

	/** Stop replay immediately. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Replay")
	void StopReplay();

	/** Emergency stop: abort ALL motion immediately (recording, replay, teleop). */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Safety")
	void EStop();

	/** Current worker state (idle/recording/replaying). Updated from /robot_status. */
	UPROPERTY(VisibleAnywhere, Category = "ROS|Status")
	FString WorkerState = TEXT("unknown");

private:
	// =================================================================
	// Component hierarchy
	// =================================================================

	UPROPERTY(VisibleAnywhere, Category = "Robot")
	TObjectPtr<USceneComponent> RobotRoot;

	// Link SceneComponents
	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> BaseLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> ShoulderLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> UpperArmLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> LowerArmLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> WristLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> GripperLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> MovingJawLink;

	// Joint SceneComponents
	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> ShoulderPanJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> ShoulderLiftJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> ElbowFlexJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> WristFlexJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> WristRollJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> GripperJoint;

	// Joint name -> component mapping
	UPROPERTY()
	TMap<FName, TObjectPtr<USceneComponent>> JointComponentMap;

	// Mesh components
	UPROPERTY()
	TArray<TObjectPtr<UStaticMeshComponent>> AllMeshComponents;

	// =================================================================
	// ROS connection
	// =================================================================

	UFUNCTION()
	void OnRosBridgeConnected();

	UFUNCTION()
	void OnRosMessage(const FString& Topic, const FString& MessageJson);

	void ParseAndApplyJointStates(const FString& MessageJson);

	// =================================================================
	// MoveIt publish helpers
	// =================================================================

	/** Advertise MoveIt command topics. Called once on connect. */
	void AdvertiseMoveItTopics();

	/** Whether MoveIt topics have been advertised in this connection session. */
	bool bMoveItTopicsAdvertised = false;

	// =================================================================
	// Record / Replay / E-Stop helpers
	// =================================================================

	/** Advertise /robot_command and subscribe /robot_status. Called once on connect. */
	void SetupRecordReplayTopics();

	/** Whether record/replay topics have been set up. */
	bool bRecordReplayTopicsSetup = false;

	/** Send a JSON command to /robot_command topic. */
	void PublishRobotCommand(const FString& JsonCmd);

	/** Handle /robot_status messages from the bridge node. */
	UFUNCTION()
	void OnRobotStatus(const FString& Topic, const FString& MessageJson);

	// =================================================================
	// Helpers (declared in original header, kept for compatibility)
	// =================================================================

	USceneComponent* CreateJointComponent(const FName& Name, USceneComponent* Parent,
		const FVector& Location, const FRotator& Rotation);

	USceneComponent* CreateLinkComponent(const FName& Name, USceneComponent* Parent);

	UStaticMeshComponent* AttachMesh(USceneComponent* Parent, UStaticMesh* Mesh,
		const FName& Name, const FVector& Location, const FRotator& Rotation,
		bool bIsMotor);
};

Details 패널에 새로운 카테고리별 버튼이 추가

  • ROS|Record: StartRecord, StopRecord 버튼
  • ROS|Replay: ReplayFilename (비우면 최신 파일), bReplayLoop 체크박스, StartReplay, StopReplay 버튼
  • ROS|Safety: EStop 버튼
  • ROS|Status: WorkerState — 현재 worker 상태가 실시간으로 표시됨 (VisibleAnywhere)

 

 

RobotVisualizer.cpp

더보기
#include "RobotVisualizer.h"
#include "RosCoordConv.h"
#include "RosBridgeSubsystem.h"
#include "RosBridgeLog.h"

#include "Components/SceneComponent.h"
#include "Components/StaticMeshComponent.h"
#include "Engine/StaticMesh.h"
#include "Engine/Engine.h"
#include "Engine/World.h"
#include "Kismet/GameplayStatics.h"
#include "Dom/JsonObject.h"
#include "Dom/JsonValue.h"
#include "Serialization/JsonReader.h"
#include "Serialization/JsonSerializer.h"
#include "Serialization/JsonWriter.h"
#include "UObject/ConstructorHelpers.h"

// =============================================================================
// Mesh asset path helper
// =============================================================================

static UStaticMesh* LoadMeshAsset(const TCHAR* AssetName)
{
	// All meshes live under /Game/Robot/Meshes/
	FString Path = FString::Printf(TEXT("/Game/Robot/Meshes/%s.%s"), AssetName, AssetName);
	UStaticMesh* Mesh = Cast<UStaticMesh>(StaticLoadObject(UStaticMesh::StaticClass(), nullptr, *Path));
	if (!Mesh)
	{
		UE_LOG(LogRosBridge, Warning, TEXT("Failed to load mesh: %s"), *Path);
	}
	return Mesh;
}

// =============================================================================
// Constructor — build the entire component hierarchy
// =============================================================================

ARobotVisualizer::ARobotVisualizer()
{
	PrimaryActorTick.bCanEverTick = false;

	// --- Root ---
	RobotRoot = CreateDefaultSubobject<USceneComponent>(TEXT("RobotRoot"));
	RootComponent = RobotRoot;

	// =========================================================================
	// URDF data converted to UE coordinates:
	//   Position: meters * 100 = cm, Y flipped
	//   Rotation: RPY radians -> FRotator degrees, pitch & yaw negated
	//
	// All values below are pre-computed from so101_follower.urdf.
	// =========================================================================

	// --- base_link (attached directly to root, no joint) ---
	BaseLink = CreateDefaultSubobject<USceneComponent>(TEXT("BaseLink"));
	BaseLink->SetupAttachment(RobotRoot);

	// --- shoulder_pan joint ---
	// URDF origin: xyz(0.0388353, 0, 0.0624) rpy(3.14159, 0, -3.14159)
	ShoulderPanJoint = CreateDefaultSubobject<USceneComponent>(TEXT("ShoulderPanJoint"));
	ShoulderPanJoint->SetupAttachment(BaseLink);
	ShoulderPanJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(0.0388353, 0.0, 0.0624));
	ShoulderPanJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(3.14159, 0.0, -3.14159));

	ShoulderLink = CreateDefaultSubobject<USceneComponent>(TEXT("ShoulderLink"));
	ShoulderLink->SetupAttachment(ShoulderPanJoint);

	// --- shoulder_lift joint ---
	// URDF origin: xyz(-0.0303992, -0.0182778, -0.0542) rpy(-1.5708, -1.5708, 0)
	ShoulderLiftJoint = CreateDefaultSubobject<USceneComponent>(TEXT("ShoulderLiftJoint"));
	ShoulderLiftJoint->SetupAttachment(ShoulderLink);
	ShoulderLiftJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(-0.0303992, -0.0182778, -0.0542));
	ShoulderLiftJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(-1.5708, -1.5708, 0.0));

	UpperArmLink = CreateDefaultSubobject<USceneComponent>(TEXT("UpperArmLink"));
	UpperArmLink->SetupAttachment(ShoulderLiftJoint);

	// --- elbow_flex joint ---
	// URDF origin: xyz(-0.11257, -0.028, 0) rpy(0, 0, 1.5708)
	ElbowFlexJoint = CreateDefaultSubobject<USceneComponent>(TEXT("ElbowFlexJoint"));
	ElbowFlexJoint->SetupAttachment(UpperArmLink);
	ElbowFlexJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(-0.11257, -0.028, 0.0));
	ElbowFlexJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(0.0, 0.0, 1.5708));

	LowerArmLink = CreateDefaultSubobject<USceneComponent>(TEXT("LowerArmLink"));
	LowerArmLink->SetupAttachment(ElbowFlexJoint);

	// --- wrist_flex joint ---
	// URDF origin: xyz(-0.1349, 0.0052, 0) rpy(0, 0, -1.5708)
	WristFlexJoint = CreateDefaultSubobject<USceneComponent>(TEXT("WristFlexJoint"));
	WristFlexJoint->SetupAttachment(LowerArmLink);
	WristFlexJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(-0.1349, 0.0052, 0.0));
	WristFlexJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(0.0, 0.0, -1.5708));

	WristLink = CreateDefaultSubobject<USceneComponent>(TEXT("WristLink"));
	WristLink->SetupAttachment(WristFlexJoint);

	// --- wrist_roll joint ---
	// URDF origin: xyz(0, -0.0611, 0.0181) rpy(1.5708, 0.0486795, 3.14159)
	WristRollJoint = CreateDefaultSubobject<USceneComponent>(TEXT("WristRollJoint"));
	WristRollJoint->SetupAttachment(WristLink);
	WristRollJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(0.0, -0.0611, 0.0181));
	WristRollJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(1.5708, 0.0486795, 3.14159));

	GripperLink = CreateDefaultSubobject<USceneComponent>(TEXT("GripperLink"));
	GripperLink->SetupAttachment(WristRollJoint);

	// --- gripper joint ---
	// URDF origin: xyz(0.0202, 0.0188, -0.0234) rpy(1.5708, 0, 0)
	GripperJoint = CreateDefaultSubobject<USceneComponent>(TEXT("GripperJoint"));
	GripperJoint->SetupAttachment(GripperLink);
	GripperJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(0.0202, 0.0188, -0.0234));
	GripperJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(1.5708, 0.0, 0.0));

	MovingJawLink = CreateDefaultSubobject<USceneComponent>(TEXT("MovingJawLink"));
	MovingJawLink->SetupAttachment(GripperJoint);

	// --- Joint name mapping (matches ROS /joint_states names) ---
	JointComponentMap.Add(FName("shoulder_pan"),  ShoulderPanJoint);
	JointComponentMap.Add(FName("shoulder_lift"), ShoulderLiftJoint);
	JointComponentMap.Add(FName("elbow_flex"),    ElbowFlexJoint);
	JointComponentMap.Add(FName("wrist_flex"),    WristFlexJoint);
	JointComponentMap.Add(FName("wrist_roll"),    WristRollJoint);
	JointComponentMap.Add(FName("gripper"),       GripperJoint);
}

// =============================================================================
// BeginPlay — load meshes and attach, connect to ROS
// =============================================================================

void ARobotVisualizer::BeginPlay()
{
	Super::BeginPlay();

	// --- Load meshes and attach to links ---
	// Meshes are loaded at runtime (not in constructor) because
	// StaticLoadObject is safer to call here and allows hot-reload.

	// Helper lambda to reduce repetition
	auto Attach = [this](USceneComponent* Parent, const TCHAR* MeshName,
		double RosX, double RosY, double RosZ,
		double RosRoll, double RosPitch, double RosYaw,
		bool bIsMotor = false)
	{
		UStaticMesh* Mesh = LoadMeshAsset(MeshName);
		if (!Mesh) return;

		UStaticMeshComponent* SMC = NewObject<UStaticMeshComponent>(this);
		SMC->SetStaticMesh(Mesh);
		SMC->SetRelativeLocation(RosCoordConv::RosToUePosition(RosX, RosY, RosZ));
		SMC->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(RosRoll, RosPitch, RosYaw));
		SMC->SetCollisionEnabled(ECollisionEnabled::NoCollision);
		SMC->AttachToComponent(Parent, FAttachmentTransformRules::KeepRelativeTransform);
		SMC->RegisterComponent();
		AllMeshComponents.Add(SMC);
	};

	// === base_link meshes ===
	Attach(BaseLink, TEXT("base_motor_holder_so101_v1"),
		-0.00636471, -0.0000994414, -0.0024,
		1.5708, 0.0, 1.5708);
	Attach(BaseLink, TEXT("base_so101_v2"),
		-0.00636471, 0.0, -0.0024,
		1.5708, 0.0, 1.5708);
	Attach(BaseLink, TEXT("sts3215_03a_v1"),
		0.0263353, 0.0, 0.0437,
		0.0, 0.0, 0.0, true);
	Attach(BaseLink, TEXT("waveshare_mounting_plate_so101_v2"),
		-0.0309827, -0.000199441, 0.0474,
		1.5708, 0.0, 1.5708);

	// === shoulder_link meshes ===
	Attach(ShoulderLink, TEXT("sts3215_03a_v1"),
		-0.0303992, 0.000422241, -0.0417,
		1.5708, 1.5708, 0.0, true);
	Attach(ShoulderLink, TEXT("motor_holder_so101_base_v1"),
		-0.0675992, -0.000177759, 0.0158499,
		1.5708, -1.5708, 0.0);
	Attach(ShoulderLink, TEXT("rotation_pitch_so101_v1"),
		0.0122008, 0.0000222413, 0.0464,
		-1.5708, 0.0, 0.0);

	// === upper_arm_link meshes ===
	Attach(UpperArmLink, TEXT("sts3215_03a_v1"),
		-0.11257, -0.0155, 0.0187,
		-3.14159, 0.0, -1.5708, true);
	Attach(UpperArmLink, TEXT("upper_arm_so101_v1"),
		-0.065085, 0.012, 0.0182,
		3.14159, 0.0, 0.0);

	// === lower_arm_link meshes ===
	Attach(LowerArmLink, TEXT("under_arm_so101_v1"),
		-0.0648499, -0.032, 0.0182,
		3.14159, 0.0, 0.0);
	Attach(LowerArmLink, TEXT("motor_holder_so101_wrist_v1"),
		-0.0648499, -0.032, 0.018,
		-3.14159, 0.0, 0.0);
	Attach(LowerArmLink, TEXT("sts3215_03a_v1"),
		-0.1224, 0.0052, 0.0187,
		-3.14159, 0.0, -3.14159, true);

	// === wrist_link meshes ===
	Attach(WristLink, TEXT("sts3215_03a_no_horn_v1"),
		0.0, -0.0424, 0.0306,
		1.5708, 1.5708, 0.0, true);
	Attach(WristLink, TEXT("wrist_roll_pitch_so101_v2"),
		0.0, -0.028, 0.0181,
		-1.5708, -1.5708, 0.0);

	// === gripper_link meshes ===
	Attach(GripperLink, TEXT("sts3215_03a_v1"),
		0.0077, 0.0001, -0.0234,
		-1.5708, 0.0, 0.0, true);
	Attach(GripperLink, TEXT("wrist_roll_follower_so101_v1"),
		0.0, -0.000218214, 0.000949706,
		-3.14159, 0.0, 0.0);

	// === moving_jaw_link meshes ===
	Attach(MovingJawLink, TEXT("moving_jaw_so101_v1"),
		0.0, 0.0, 0.0189,
		0.0, 0.0, 0.0);

	UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: %d mesh components created"), AllMeshComponents.Num());

	// --- Connect to ROS via Subsystem ---
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros) return;

	// Bind delegates.
	Ros->OnTopicMessage.AddDynamic(this, &ARobotVisualizer::OnRosMessage);
	Ros->OnConnected.AddDynamic(this, &ARobotVisualizer::OnRosBridgeConnected);

	// Subscribe is now queued even before connection — the subsystem will
	// send it automatically when connected (including on reconnect).
	Ros->Subscribe(JointStateTopic, JointStateType);

	// Queue MoveIt topic advertisements (sent on connect).
	AdvertiseMoveItTopics();

	// Queue record/replay/estop topics.
	SetupRecordReplayTopics();

	// Initiate connection if not already connected.
	if (!Ros->IsConnected())
	{
		Ros->Connect(RosBridgeUrl);
	}
	else
	{
		// Already connected (e.g. another actor already called Connect).
		UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: already connected, subscription sent."));
	}
}

// =============================================================================
// EndPlay
// =============================================================================

void ARobotVisualizer::EndPlay(const EEndPlayReason::Type EndPlayReason)
{
	if (UGameInstance* GI = UGameplayStatics::GetGameInstance(this))
	{
		if (URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>())
		{
			Ros->OnTopicMessage.RemoveDynamic(this, &ARobotVisualizer::OnRosMessage);
			Ros->OnConnected.RemoveDynamic(this, &ARobotVisualizer::OnRosBridgeConnected);
		}
	}

	bMoveItTopicsAdvertised = false;
	bRecordReplayTopicsSetup = false;

	Super::EndPlay(EndPlayReason);
}

// =============================================================================
// ROS connection callback
// =============================================================================

void ARobotVisualizer::OnRosBridgeConnected()
{
	UE_LOG(LogRosBridge, Log,
		TEXT("RobotVisualizer: rosbridge connected — subscriptions restored by subsystem."));
}

// =============================================================================
// MoveIt topic advertisements
// =============================================================================

void ARobotVisualizer::AdvertiseMoveItTopics()
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros) return;

	// These are queued and sent automatically when connected.
	Ros->Advertise(TEXT("/moveit_goal_named"), TEXT("std_msgs/String"));
	Ros->Advertise(TEXT("/moveit_goal_joints"), TEXT("sensor_msgs/JointState"));
	Ros->Advertise(TEXT("/moveit_goal_pose"), TEXT("geometry_msgs/PoseStamped"));

	bMoveItTopicsAdvertised = true;
	UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: MoveIt command topics advertised."));
}

// =============================================================================
// MoveIt commands — SendNamedTarget
// =============================================================================

void ARobotVisualizer::SendNamedTarget()
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros || !Ros->IsConnected())
	{
		UE_LOG(LogRosBridge, Warning, TEXT("SendNamedTarget: not connected to rosbridge."));
		return;
	}

	// std_msgs/String: {"data": "home"}
	FString MsgJson = FString::Printf(TEXT("{\"data\":\"%s\"}"), *MoveItNamedTarget);
	Ros->Publish(TEXT("/moveit_goal_named"), MsgJson);

	UE_LOG(LogRosBridge, Log, TEXT("SendNamedTarget: published '%s' to /moveit_goal_named"), *MoveItNamedTarget);
}

// =============================================================================
// MoveIt commands — SendJointGoal
// =============================================================================

void ARobotVisualizer::SendJointGoal()
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros || !Ros->IsConnected())
	{
		UE_LOG(LogRosBridge, Warning, TEXT("SendJointGoal: not connected to rosbridge."));
		return;
	}

	// sensor_msgs/JointState:
	// {
	//   "header": {"stamp": {"sec": 0, "nanosec": 0}, "frame_id": ""},
	//   "name": ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll"],
	//   "position": [0.0, -0.5, 0.5, 0.0, 0.0],
	//   "velocity": [],
	//   "effort": []
	// }

	FString MsgJson = FString::Printf(
		TEXT("{\"header\":{\"stamp\":{\"sec\":0,\"nanosec\":0},\"frame_id\":\"\"},")
		TEXT("\"name\":[\"shoulder_pan\",\"shoulder_lift\",\"elbow_flex\",\"wrist_flex\",\"wrist_roll\"],")
		TEXT("\"position\":[%f,%f,%f,%f,%f],")
		TEXT("\"velocity\":[],\"effort\":[]}"),
		GoalShoulderPan, GoalShoulderLift, GoalElbowFlex, GoalWristFlex, GoalWristRoll
	);

	Ros->Publish(TEXT("/moveit_goal_joints"), MsgJson);

	UE_LOG(LogRosBridge, Log,
		TEXT("SendJointGoal: [%.3f, %.3f, %.3f, %.3f, %.3f] to /moveit_goal_joints"),
		GoalShoulderPan, GoalShoulderLift, GoalElbowFlex, GoalWristFlex, GoalWristRoll);
}

// =============================================================================
// MoveIt commands — SendPoseGoal
// =============================================================================

void ARobotVisualizer::SendPoseGoal()
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros || !Ros->IsConnected())
	{
		UE_LOG(LogRosBridge, Warning, TEXT("SendPoseGoal: not connected to rosbridge."));
		return;
	}

	// Convert UE position (cm, left-handed) to ROS (meters, right-handed)
	double RosX, RosY, RosZ;
	RosCoordConv::UeToRosPosition(GoalPositionUE, RosX, RosY, RosZ);

	// geometry_msgs/PoseStamped (orientation defaults to identity — position-only goal)
	FString MsgJson = FString::Printf(
		TEXT("{\"header\":{\"stamp\":{\"sec\":0,\"nanosec\":0},\"frame_id\":\"base_link\"},")
		TEXT("\"pose\":{\"position\":{\"x\":%f,\"y\":%f,\"z\":%f},")
		TEXT("\"orientation\":{\"x\":0.0,\"y\":0.0,\"z\":0.0,\"w\":1.0}}}"),
		RosX, RosY, RosZ
	);

	Ros->Publish(TEXT("/moveit_goal_pose"), MsgJson);

	UE_LOG(LogRosBridge, Log,
		TEXT("SendPoseGoal: UE(%.1f, %.1f, %.1f)cm -> ROS(%.4f, %.4f, %.4f)m to /moveit_goal_pose"),
		GoalPositionUE.X, GoalPositionUE.Y, GoalPositionUE.Z,
		RosX, RosY, RosZ);
}

// =============================================================================
// Message handling
// =============================================================================

void ARobotVisualizer::OnRosMessage(const FString& Topic, const FString& MessageJson)
{
	if (Topic == JointStateTopic)
	{
		ParseAndApplyJointStates(MessageJson);
	}
	else if (Topic == TEXT("/robot_status"))
	{
		OnRobotStatus(Topic, MessageJson);
	}
}

void ARobotVisualizer::ParseAndApplyJointStates(const FString& MessageJson)
{
	// Parse sensor_msgs/JointState JSON:
	// { "name": ["shoulder_pan", ...], "position": [0.1, ...], ... }

	TSharedPtr<FJsonObject> Json;
	TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(MessageJson);
	if (!FJsonSerializer::Deserialize(Reader, Json) || !Json.IsValid())
	{
		return;
	}

	const TArray<TSharedPtr<FJsonValue>>* NameArray = nullptr;
	const TArray<TSharedPtr<FJsonValue>>* PosArray = nullptr;

	if (!Json->TryGetArrayField(TEXT("name"), NameArray) ||
		!Json->TryGetArrayField(TEXT("position"), PosArray))
	{
		return;
	}

	const int32 Count = FMath::Min(NameArray->Num(), PosArray->Num());
	for (int32 i = 0; i < Count; ++i)
	{
		const FName JointName(*(*NameArray)[i]->AsString());
		const double AngleRad = (*PosArray)[i]->AsNumber();

		TObjectPtr<USceneComponent>* JointComp = JointComponentMap.Find(JointName);
		if (JointComp && *JointComp)
		{
			const float AngleDeg = RosCoordConv::RosJointAngleToUeDegrees(AngleRad);

			FRotator BaseRot;
			if (JointName == FName("shoulder_pan"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(3.14159, 0.0, -3.14159);
			else if (JointName == FName("shoulder_lift"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(-1.5708, -1.5708, 0.0);
			else if (JointName == FName("elbow_flex"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(0.0, 0.0, 1.5708);
			else if (JointName == FName("wrist_flex"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(0.0, 0.0, -1.5708);
			else if (JointName == FName("wrist_roll"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(1.5708, 0.0486795, 3.14159);
			else if (JointName == FName("gripper"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(1.5708, 0.0, 0.0);

			const FQuat BaseQuat = BaseRot.Quaternion();
			const FQuat JointQuat = FQuat(FVector::UpVector, FMath::DegreesToRadians(AngleDeg));
			const FQuat FinalQuat = BaseQuat * JointQuat;

			(*JointComp)->SetRelativeRotation(FinalQuat.Rotator());
		}
	}
}

// =============================================================================
// Record / Replay / E-Stop — Topic setup
// =============================================================================

void ARobotVisualizer::SetupRecordReplayTopics()
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros) return;

	// Advertise the command topic
	Ros->Advertise(TEXT("/robot_command"), TEXT("std_msgs/String"));

	// Subscribe to status feedback
	Ros->Subscribe(TEXT("/robot_status"), TEXT("std_msgs/String"));

	bRecordReplayTopicsSetup = true;
	UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: Record/Replay topics set up."));
}

// =============================================================================
// Record / Replay / E-Stop — Command publisher helper
// =============================================================================

void ARobotVisualizer::PublishRobotCommand(const FString& JsonCmd)
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros || !Ros->IsConnected())
	{
		UE_LOG(LogRosBridge, Warning, TEXT("PublishRobotCommand: not connected to rosbridge."));
		if (GEngine)
		{
			GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Red,
				TEXT("Robot: Not connected to rosbridge!"));
		}
		return;
	}

	// std_msgs/String: {"data": "<json_cmd>"}
	// The JSON command is nested inside the "data" field, with quotes escaped.
	FString EscapedCmd = JsonCmd.Replace(TEXT("\""), TEXT("\\\""));
	FString MsgJson = FString::Printf(TEXT("{\"data\":\"%s\"}"), *EscapedCmd);
	Ros->Publish(TEXT("/robot_command"), MsgJson);

	UE_LOG(LogRosBridge, Log, TEXT("PublishRobotCommand: %s"), *JsonCmd);
}

// =============================================================================
// Record / Replay / E-Stop — Button handlers
// =============================================================================

void ARobotVisualizer::StartRecord()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"start_record\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Green,
			TEXT("Robot: Recording started (teleop active)"));
	}
}

void ARobotVisualizer::StopRecord()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"stop_record\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Yellow,
			TEXT("Robot: Recording stopped, saving..."));
	}
}

void ARobotVisualizer::StartReplay()
{
	FString ArgsJson;
	if (ReplayFilename.IsEmpty())
	{
		// No filename = most recent recording
		ArgsJson = FString::Printf(TEXT("{\"cmd\":\"start_replay\",\"args\":{\"loop\":%s}}"),
			bReplayLoop ? TEXT("true") : TEXT("false"));
	}
	else
	{
		ArgsJson = FString::Printf(
			TEXT("{\"cmd\":\"start_replay\",\"args\":{\"filename\":\"%s\",\"loop\":%s}}"),
			*ReplayFilename,
			bReplayLoop ? TEXT("true") : TEXT("false"));
	}
	PublishRobotCommand(ArgsJson);

	if (GEngine)
	{
		FString DisplayName = ReplayFilename.IsEmpty() ? TEXT("(most recent)") : ReplayFilename;
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Cyan,
			FString::Printf(TEXT("Robot: Replaying %s (loop=%s)"),
				*DisplayName, bReplayLoop ? TEXT("yes") : TEXT("no")));
	}
}

void ARobotVisualizer::StopReplay()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"stop_replay\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Yellow,
			TEXT("Robot: Replay stopped"));
	}
}

void ARobotVisualizer::EStop()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"estop\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 5.0f, FColor::Red,
			TEXT("*** E-STOP *** All motion halted"));
	}
}

// =============================================================================
// Record / Replay — Status feedback handler
// =============================================================================

void ARobotVisualizer::OnRobotStatus(const FString& Topic, const FString& MessageJson)
{
	// rosbridge wraps std_msgs/String as: {"data": "..."}
	TSharedPtr<FJsonObject> OuterJson;
	TSharedRef<TJsonReader<>> OuterReader = TJsonReaderFactory<>::Create(MessageJson);
	if (!FJsonSerializer::Deserialize(OuterReader, OuterJson) || !OuterJson.IsValid())
	{
		return;
	}

	FString DataStr;
	if (!OuterJson->TryGetStringField(TEXT("data"), DataStr))
	{
		return;
	}

	// Parse the inner JSON status
	TSharedPtr<FJsonObject> StatusJson;
	TSharedRef<TJsonReader<>> StatusReader = TJsonReaderFactory<>::Create(DataStr);
	if (!FJsonSerializer::Deserialize(StatusReader, StatusJson) || !StatusJson.IsValid())
	{
		return;
	}

	FString State;
	if (StatusJson->TryGetStringField(TEXT("state"), State))
	{
		if (State != WorkerState)
		{
			WorkerState = State;
			UE_LOG(LogRosBridge, Log, TEXT("Worker state: %s"), *WorkerState);

			if (GEngine)
			{
				FColor Color = FColor::White;
				if (State == TEXT("recording")) Color = FColor::Green;
				else if (State == TEXT("replaying")) Color = FColor::Cyan;
				else if (State == TEXT("idle")) Color = FColor::Silver;

				GEngine->AddOnScreenDebugMessage(-1, 3.0f, Color,
					FString::Printf(TEXT("Robot state: %s"), *WorkerState));
			}
		}
	}

	// Log errors from commands
	FString Status;
	if (StatusJson->TryGetStringField(TEXT("status"), Status) && Status == TEXT("error"))
	{
		FString Reason;
		StatusJson->TryGetStringField(TEXT("reason"), Reason);
		UE_LOG(LogRosBridge, Warning, TEXT("Robot command error: %s"), *Reason);

		if (GEngine)
		{
			GEngine->AddOnScreenDebugMessage(-1, 5.0f, FColor::Red,
				FString::Printf(TEXT("Robot error: %s"), *Reason));
		}
	}

	// Log recording saved info
	FString Filename;
	if (StatusJson->TryGetStringField(TEXT("filename"), Filename) && !Filename.IsEmpty())
	{
		int32 Frames = 0;
		StatusJson->TryGetNumberField(TEXT("frames"), Frames);
		double Duration = 0.0;
		StatusJson->TryGetNumberField(TEXT("duration_sec"), Duration);

		UE_LOG(LogRosBridge, Log, TEXT("Recording saved: %s (%d frames, %.1fs)"),
			*Filename, Frames, Duration);

		if (GEngine)
		{
			GEngine->AddOnScreenDebugMessage(-1, 5.0f, FColor::Green,
				FString::Printf(TEXT("Recording saved: %s (%d frames, %.1fs)"),
					*Filename, Frames, Duration));
		}
	}
}

수정 사항

 

  • SetupRecordReplayTopics()
    BeginPlay에서 /robot_command advertise + /robot_status subscribe
  • PublishRobotCommand()
    JSON 명령을 std_msgs/String 형태로 /robot_command에 publish
  • StartRecord/StopRecord/StartReplay/StopReplay/EStop
    각각 적절한 JSON 명령을 publish + 뷰포트에 컬러 디버그 메시지 표시
    • 초록: recording 상태, 저장 완료
    • 시안: replaying 상태
    • 노랑: 중단됨
    • 빨강: E-stop, 에러, 연결 안 됨
  • OnRobotStatus()
    /robot_status 메시지를 파싱하여 WorkerState 업데이트 + 에러/저장 완료 등 뷰포트 피드백
  • OnRosMessage()에 /robot_status 라우팅 추가

 

 

 


 

 

4단계: 테스트

lerobot_worker.py, ros_bridge_node.py, RobotVisualizer.h, RobotVisualizer.cpp 총 4개의 파일 모두 수정했다면 빌드해서 이상이 없는지 확인한다.

 

 

프로젝트 파일에 마우스 오른쪽 버튼을 눌러 [Generate Visual Studio project files],

프로젝트.sln 파일을 열어서 솔루션 빌드.

프로젝트 빌드 성공 확인.

 

 

이제 WSL 터미널 켜고 테스트를 해본다.

 

 

로봇 연결

Attach-Both

 

 

터미널 1 (conda - worker)

source ~/miniforge3/etc/profile.d/conda.sh
conda activate lerobot
sudo chmod 666 /dev/ttyACM*
cd ~/UnrealRobotics/src/lerobot_ros2_bridge
python scripts/lerobot_worker.py --cmd-joints all --cmd-limit 120

 

 

터미널 2 (ROS2 - bridge node)

# 파일 수정 후 재빌드 필요
cd ~/UnrealRobotics
colcon build --packages-select lerobot_ros2_bridge --symlink-install
export PATH=$(echo $PATH | tr ':' '\n' | grep -v miniforge | tr '\n' ':' | sed 's/:$//')
source /opt/ros/humble/setup.bash
source ~/UnrealRobotics/install/setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export ROS_LOCALHOST_ONLY=1
ros2 run lerobot_ros2_bridge bridge_node

 

 

터미널 3 (ROS2 - rosbridge)

export PATH=$(echo $PATH | tr ':' '\n' | grep -v miniforge | tr '\n' ':' | sed 's/:$//')
source /opt/ros/humble/setup.bash
source ~/UnrealRobotics/install/setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export ROS_LOCALHOST_ONLY=1
ros2 launch rosbridge_server rosbridge_websocket_launch.xml

 

 

언리얼 열기

레벨에 Robot Visualizer 배치

 

 

Details 패널에서 Record, Replay, Safety 항목 확인

 

 

작동 시키면 Record도 잘 되고 Replay도 잘 되는 것을 볼 수 있다. 하지만 고쳐야 할 부분이 있다.

팔로워와 리더가 처음에 정렬이 안되어있으면 Start Record를 하자마자 팔로워 암이 튀어버리는 문제. Sync 버튼을 눌러서 팔로워와 리더를 먼저 싱크해 놓으면 될 것 같다.

  • Sync On → 텔레옵 활성화, leader로 follower를 부드럽게 정렬
  • Start Record → Sync가 이미 켜져있으니 자연스럽게 기록 시작 (튀지 않음)
  • Stop Record → 기록 저장, Sync는 유지 (사용자가 직접 끌 때까지)
  • Start Replay → Sync 자동 해제 + 재생 시작
  • Stop Replay / EStop 후 → Sync On으로 다시 연결 → 손으로 rest로 복귀

 

 


 

 

4-1단계: Sync 버튼 추가, 로직 수정

새 워크플로우 확정

 

  • Sync On → leader↔follower 동기화 시작 (부드럽게 정렬)
  • Start Record → Sync가 켜진 상태에서만 가능 (안 켜져있으면 에러 반환)
  • Stop Record → 기록 저장, Sync는 유지 (손으로 계속 조작 가능)
  • Start Replay → Sync 자동 해제 + 재생
  • Stop Replay / EStop → 멈춤
  • Sync On → 다시 연결 → 손으로 rest로 복귀
  • Sync Off → 필요 시 수동 해제

 

 

 

lerobot_worker.py

더보기
#!/usr/bin/env python3
"""
lerobot_worker.py — LeRobot ↔ ZMQ bridge worker

Runs in conda env 'lerobot' (Python 3.12).
Reads joint states from SO-ARM-101 via LeRobot API at ~30Hz,
publishes them over ZMQ PUB, and accepts commands via ZMQ REP.

Supports record/replay of joint trajectories via ZMQ commands:
  - start_teleop: activate leader→follower sync (teleop)
  - stop_teleop:  deactivate teleop sync
  - start_record: begin teleop + record joint frames
  - stop_record:  stop recording, save to ~/recordings/ (teleop stays on)
  - start_replay: replay a saved recording on the follower arm (teleop off)
  - stop_replay:  stop replay immediately
  - estop:        emergency stop — abort all motion immediately
  - list_recordings: list saved recording files

Usage:
    conda activate lerobot
    sudo chmod 666 /dev/ttyACM*
    python scripts/lerobot_worker.py [--follower-port /dev/ttyACM0] [--leader-port /dev/ttyACM1]
                                      [--pub-addr tcp://127.0.0.1:5555]
                                      [--rep-addr tcp://127.0.0.1:5556]
                                      [--rate 30]
                                      [--follower-only | --leader-only]
                                      [--teleop]
                                      [--recordings-dir ~/recordings]
"""

import argparse
import json
import os
import pathlib
import signal
import sys
import time
import traceback
from datetime import datetime

import zmq


# ---------------------------------------------------------------------------
# Joint name ordering (matches URDF & LeRobot calibration — Phase 3.2 verified)
# ---------------------------------------------------------------------------
JOINT_NAMES = [
    "shoulder_pan",
    "shoulder_lift",
    "elbow_flex",
    "wrist_flex",
    "wrist_roll",
    "gripper",
]

# ---------------------------------------------------------------------------
# Joint limits in LeRobot DEGREES (empirically measured via calibration).
# These are absolute physical limits — commands are never allowed beyond these.
# Measured by reading get_observation() degrees at calibration range_min/max
# raw encoder positions, using two-point linear interpolation.
# NOTE: These are in LeRobot's coordinate frame (0° = calibration midpoint),
#       NOT URDF coordinates (which use a different zero point).
# ---------------------------------------------------------------------------
JOINT_LIMITS_DEG = {
    "shoulder_pan":  (-119.91, +119.91),
    "shoulder_lift": (-104.62, +104.62),
    "elbow_flex":    ( -97.01,  +97.01),
    "wrist_flex":    (-102.68, +102.68),
    "wrist_roll":    (-180.00, +180.00),
    "gripper":       (  +0.14,  +99.45),
}


def build_args():
    p = argparse.ArgumentParser(description="LeRobot ZMQ worker for SO-ARM-101")
    p.add_argument("--follower-port", default="/dev/ttyACM0",
                    help="Serial port for follower arm")
    p.add_argument("--leader-port", default="/dev/ttyACM1",
                    help="Serial port for leader arm")
    p.add_argument("--follower-id", default="so101_twin_follower",
                    help="Calibration ID for follower")
    p.add_argument("--leader-id", default="so101_twin_leader",
                    help="Calibration ID for leader")
    p.add_argument("--pub-addr", default="tcp://127.0.0.1:5555",
                    help="ZMQ PUB bind address for joint states")
    p.add_argument("--rep-addr", default="tcp://127.0.0.1:5556",
                    help="ZMQ REP bind address for commands")
    p.add_argument("--rate", type=float, default=30.0,
                    help="Read loop frequency in Hz")
    p.add_argument("--follower-only", action="store_true",
                    help="Only connect follower (no leader)")
    p.add_argument("--leader-only", action="store_true",
                    help="Only connect leader (no follower)")
    p.add_argument("--teleop", action="store_true",
                    help="Teleoperation mode: leader arm controls follower arm")
    p.add_argument("--cmd-limit", type=float, default=5.0,
                    help="Safety clamp: max degrees from initial position per joint (default 5.0)")
    p.add_argument("--cmd-joints", default="shoulder_pan",
                    help="Comma-separated joint names that accept commands "
                         "(default: shoulder_pan). Use 'all' for all joints.")
    p.add_argument("--recordings-dir", default="~/recordings",
                    help="Directory to save/load recorded trajectories")
    return p.parse_args()


class LeRobotWorker:
    def __init__(self, args):
        self.args = args
        self.running = True
        self.follower = None
        self.leader = None
        self.teleop = args.teleop

        # Safety clamp: initial follower position recorded at startup,
        # commands are clamped to ±cmd_limit degrees from this baseline.
        # Only joints listed in allowed_cmd_joints accept commands;
        # others are silently ignored (held at current position).
        self.follower_initial_pos = None       # dict: {joint_name: degrees}
        self.cmd_limit_deg = args.cmd_limit    # default 5.0°
        self.allowed_cmd_joints = (
            set(JOINT_NAMES) if args.cmd_joints.strip().lower() == "all"
            else set(j.strip() for j in args.cmd_joints.split(",") if j.strip())
        )

        # ---------------------------------------------------------------------------
        # Record / Replay state
        # ---------------------------------------------------------------------------
        # state: "idle", "recording", "replaying"
        self.state = "idle"

        # Recording buffer: list of {"ts": float, "joints": {name: deg, ...}}
        self.recording_buffer = []
        self.recording_start_time = None

        # Replay state
        self.replay_frames = []        # loaded frames from file
        self.replay_index = 0          # current frame index
        self.replay_start_time = None  # monotonic time when replay started
        self.replay_loop = False       # whether to loop replay
        self.replay_filename = ""      # currently playing file

        # Recordings directory
        self.recordings_dir = pathlib.Path(os.path.expanduser(args.recordings_dir))
        self.recordings_dir.mkdir(parents=True, exist_ok=True)
        print(f"[worker] Recordings dir: {self.recordings_dir}")

        # ZMQ setup
        self.ctx = zmq.Context()

        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.bind(args.pub_addr)
        print(f"[worker] PUB bound on {args.pub_addr}")

        self.rep = self.ctx.socket(zmq.REP)
        self.rep.bind(args.rep_addr)
        print(f"[worker] REP bound on {args.rep_addr}")

        # Poller for non-blocking REP check inside the read loop
        self.poller = zmq.Poller()
        self.poller.register(self.rep, zmq.POLLIN)

    def connect_robots(self):
        """Connect to follower and/or leader via LeRobot API."""
        if not self.args.leader_only:
            print(f"[worker] Connecting follower on {self.args.follower_port} "
                  f"(id={self.args.follower_id}) ...")
            from lerobot.robots.so_follower import SO101Follower, SO101FollowerConfig
            cfg = SO101FollowerConfig(
                port=self.args.follower_port,
                id=self.args.follower_id,
            )
            self.follower = SO101Follower(cfg)
            self.follower.connect()
            print("[worker] Follower connected.")

        if not self.args.follower_only:
            print(f"[worker] Connecting leader on {self.args.leader_port} "
                  f"(id={self.args.leader_id}) ...")
            from lerobot.teleoperators.so_leader import SOLeader, SO101LeaderConfig
            cfg = SO101LeaderConfig(
                port=self.args.leader_port,
                id=self.args.leader_id,
            )
            self.leader = SOLeader(cfg)
            self.leader.connect()
            print("[worker] Leader connected.")

        # Validate teleop requirements
        if self.teleop:
            if self.follower is None or self.leader is None:
                print("[worker] ERROR: --teleop requires both follower and leader connected.")
                print("[worker] Cannot use --teleop with --follower-only or --leader-only.")
                sys.exit(1)
            print("[worker] *** TELEOP MODE ENABLED ***")
            print("[worker] Leader arm will control follower arm in real-time.")
            print("[worker] Move leader arm to starting position, then press Enter to begin...")
            input()
            print("[worker] Teleop active!")

        # Record initial follower position for safety clamping
        if self.follower is not None:
            init_pos = self.read_follower()

            # Load calibration JSON to display physical range info
            calib_range = self._load_calibration_range("follower")

            if init_pos is not None:
                self.follower_initial_pos = dict(init_pos)

                # Print calibration range info
                if calib_range:
                    print("[worker] === CALIBRATION RANGE (raw encoder values) ===")
                    for name in JOINT_NAMES:
                        if name in calib_range:
                            rmin, rmax, cur_raw = calib_range[name]
                            total = rmax - rmin
                            if cur_raw is not None and total > 0:
                                pct = (cur_raw - rmin) / total * 100.0
                                print(f"  {name:20s}: raw {cur_raw:4d}  "
                                      f"range [{rmin:4d} ~ {rmax:4d}]  "
                                      f"({pct:5.1f}% of range)")
                            else:
                                print(f"  {name:20s}: raw  ???  "
                                      f"range [{rmin:4d} ~ {rmax:4d}]")

                print("[worker] === SAFETY CLAMP BASELINE (follower initial position) ===")
                for name in JOINT_NAMES:
                    v = init_pos.get(name, 0.0)
                    soft_lo = v - self.cmd_limit_deg
                    soft_hi = v + self.cmd_limit_deg
                    hard_lo, hard_hi = JOINT_LIMITS_DEG.get(name, (-180.0, 180.0))
                    eff_lo = max(soft_lo, hard_lo)
                    eff_hi = min(soft_hi, hard_hi)
                    marker = " <-- CMD OK" if name in self.allowed_cmd_joints else ""
                    print(f"  {name:20s}: {v:+8.2f}°  "
                          f"effective: [{eff_lo:+8.2f}° ~ {eff_hi:+8.2f}°]  "
                          f"(physical: [{hard_lo:+8.2f}°~{hard_hi:+8.2f}°]){marker}")
                print(f"[worker] Allowed command joints: {sorted(self.allowed_cmd_joints)}")
                print(f"[worker] Clamp limit: ±{self.cmd_limit_deg}° from baseline")
            else:
                print("[worker] WARNING: Could not read initial follower position. "
                      "Commands will be REJECTED until baseline is set.")

    def _load_calibration_range(self, arm_type="follower"):
        """
        Load calibration JSON and read current raw positions.
        Returns dict: {joint_name: (range_min, range_max, current_raw)} or None.
        """
        if arm_type == "follower":
            calib_path = (
                pathlib.Path.home()
                / ".cache/huggingface/lerobot/calibration/robots/so_follower"
                / f"{self.args.follower_id}.json"
            )
            robot = self.follower
        else:
            calib_path = (
                pathlib.Path.home()
                / ".cache/huggingface/lerobot/calibration/teleoperators/so_leader"
                / f"{self.args.leader_id}.json"
            )
            robot = self.leader

        try:
            with open(calib_path) as f:
                calib = json.load(f)
            print(f"[worker] Loaded calibration from {calib_path}")

            # Read current raw positions (without normalization)
            current_raw = {}
            if robot is not None:
                try:
                    raw = robot.bus.sync_read("Present_Position", normalize=False)
                    current_raw = dict(raw)
                except Exception as e:
                    print(f"[worker] Could not read raw positions: {e}")

            result = {}
            for name in JOINT_NAMES:
                if name in calib:
                    rmin = calib[name].get("range_min", 0)
                    rmax = calib[name].get("range_max", 4095)
                    cur = current_raw.get(name, None)
                    result[name] = (rmin, rmax, cur)

            return result

        except FileNotFoundError:
            print(f"[worker] Calibration file not found: {calib_path}")
            return None
        except Exception as e:
            print(f"[worker] Error loading calibration: {e}")
            return None

    def read_follower(self):
        """Read follower joint positions in degrees. Returns dict or None."""
        if self.follower is None:
            return None
        try:
            obs = self.follower.get_observation()
            return {name: float(obs[f"{name}.pos"]) for name in JOINT_NAMES}
        except Exception as e:
            print(f"[worker] Follower read error: {e}")
            return None

    def read_leader(self):
        """Read leader joint positions in degrees. Returns dict or None."""
        if self.leader is None:
            return None
        try:
            action = self.leader.get_action()
            return {name: float(action[f"{name}.pos"]) for name in JOINT_NAMES}
        except Exception as e:
            print(f"[worker] Leader read error: {e}")
            return None

    def send_follower_action(self, joint_positions_deg):
        """
        Send joint positions (degrees) to follower via LeRobot send_action().
        joint_positions_deg: dict like {"shoulder_pan": 10.0, ...}
        """
        if self.follower is None:
            return
        try:
            action_dict = {f"{name}.pos": float(joint_positions_deg[name])
                           for name in JOINT_NAMES if name in joint_positions_deg}
            self.follower.send_action(action_dict)
        except Exception as e:
            print(f"[worker] Follower send_action error: {e}")

    def handle_command(self, msg):
        """
        Process a command from the ROS bridge node.
        Expected format:
            {"cmd": "send_follower_action", "args": {"shoulder_pan": 0.0, ...}}
            {"cmd": "ping"}
            {"cmd": "start_teleop"}
            {"cmd": "stop_teleop"}
            {"cmd": "start_record"}
            {"cmd": "stop_record"}
            {"cmd": "start_replay", "args": {"filename": "...", "loop": false}}
            {"cmd": "stop_replay"}
            {"cmd": "estop"}
            {"cmd": "list_recordings"}
        Returns a response dict.
        """
        cmd = msg.get("cmd", "")

        if cmd == "ping":
            return {"status": "ok", "state": self.state,
                    "teleop": self.teleop, "ts": time.time()}

        elif cmd == "estop":
            return self._handle_estop()

        elif cmd == "start_teleop":
            return self._handle_start_teleop()

        elif cmd == "stop_teleop":
            return self._handle_stop_teleop()

        elif cmd == "start_record":
            return self._handle_start_record()

        elif cmd == "stop_record":
            return self._handle_stop_record()

        elif cmd == "start_replay":
            args = msg.get("args", {})
            return self._handle_start_replay(args)

        elif cmd == "stop_replay":
            return self._handle_stop_replay()

        elif cmd == "list_recordings":
            return self._handle_list_recordings()

        elif cmd == "send_follower_action":
            if self.follower is None:
                return {"status": "error", "reason": "follower not connected"}
            if self.follower_initial_pos is None:
                return {"status": "error", "reason": "safety baseline not set (no initial position)"}
            joint_args = msg.get("args", {})
            if not joint_args:
                return {"status": "error", "reason": "empty args"}
            try:
                # Two-layer safety clamp:
                #   Layer 1: baseline ± cmd_limit (session safety)
                #   Layer 2: JOINT_LIMITS_DEG (absolute physical limits, measured empirically)
                clamped = {}
                rejected = []
                for name, target_deg in joint_args.items():
                    if name not in JOINT_NAMES:
                        continue
                    if name not in self.allowed_cmd_joints:
                        rejected.append(name)
                        continue

                    target = float(target_deg)

                    # Layer 1: baseline ± cmd_limit
                    baseline = self.follower_initial_pos.get(name, 0.0)
                    soft_lo = baseline - self.cmd_limit_deg
                    soft_hi = baseline + self.cmd_limit_deg

                    # Layer 2: absolute physical limits
                    hard_lo, hard_hi = JOINT_LIMITS_DEG.get(name, (-180.0, 180.0))

                    # Effective range = intersection of both
                    eff_lo = max(soft_lo, hard_lo)
                    eff_hi = min(soft_hi, hard_hi)

                    clamped_val = max(eff_lo, min(eff_hi, target))
                    clamped[name] = clamped_val

                    if abs(clamped_val - target) > 0.01:
                        reason = ""
                        if target < hard_lo or target > hard_hi:
                            reason = " [PHYSICAL LIMIT]"
                        else:
                            reason = " [BASELINE LIMIT]"
                        print(f"[worker] CLAMP {name}: "
                              f"requested {target:.2f}° → clamped to {clamped_val:.2f}° "
                              f"(effective: {eff_lo:.2f}° ~ {eff_hi:.2f}°){reason}")

                if not clamped:
                    return {"status": "error",
                            "reason": f"no allowed joints in command "
                                      f"(rejected: {rejected}, allowed: {sorted(self.allowed_cmd_joints)})"}

                action_dict = {f"{name}.pos": val for name, val in clamped.items()}
                self.follower.send_action(action_dict)
                resp = {"status": "ok", "clamped": clamped}
                if rejected:
                    resp["rejected_joints"] = rejected
                return resp
            except Exception as e:
                return {"status": "error", "reason": str(e)}

        else:
            return {"status": "error", "reason": f"unknown cmd: {cmd}"}

    # -----------------------------------------------------------------------
    # Teleop (Sync) handlers
    # -----------------------------------------------------------------------

    def _handle_start_teleop(self):
        """Activate leader→follower sync (teleop)."""
        if self.state == "replaying":
            return {"status": "error",
                    "reason": "cannot start teleop during replay — stop replay first"}
        if self.follower is None or self.leader is None:
            return {"status": "error",
                    "reason": "teleop requires both follower and leader connected"}
        if self.teleop:
            return {"status": "ok", "state": self.state, "teleop": True,
                    "msg": "teleop already active"}

        self.teleop = True
        print("[worker] Teleop (sync) ON — leader → follower")
        return {"status": "ok", "state": self.state, "teleop": True}

    def _handle_stop_teleop(self):
        """Deactivate leader→follower sync (teleop)."""
        if self.state == "recording":
            return {"status": "error",
                    "reason": "cannot stop teleop during recording — stop recording first"}
        self.teleop = False
        print("[worker] Teleop (sync) OFF")
        return {"status": "ok", "state": self.state, "teleop": False}

    # -----------------------------------------------------------------------
    # Record / Replay / E-Stop handlers
    # -----------------------------------------------------------------------

    def _handle_estop(self):
        """Emergency stop: abort any ongoing motion, return to idle."""
        prev_state = self.state
        prev_teleop = self.teleop
        self.state = "idle"
        self.teleop = False
        self.recording_buffer = []
        self.replay_frames = []
        self.replay_index = 0
        print(f"[worker] *** E-STOP *** (was: state={prev_state}, teleop={prev_teleop})")
        return {"status": "ok", "state": "idle", "teleop": False,
                "previous_state": prev_state}

    def _handle_start_record(self):
        """Start recording: teleop must already be active (Sync On first)."""
        if self.state != "idle":
            return {"status": "error",
                    "reason": f"cannot start recording in state '{self.state}'"}
        if self.follower is None or self.leader is None:
            return {"status": "error",
                    "reason": "recording requires both follower and leader connected"}
        if not self.teleop:
            return {"status": "error",
                    "reason": "teleop (sync) must be active before recording — "
                              "press Sync On first"}

        self.recording_buffer = []
        self.recording_start_time = time.monotonic()
        self.state = "recording"
        print("[worker] *** RECORDING STARTED *** (teleop already active)")
        return {"status": "ok", "state": "recording", "teleop": True}

    def _handle_stop_record(self):
        """Stop recording: save buffer to file. Teleop stays ON."""
        if self.state != "recording":
            return {"status": "error",
                    "reason": f"not recording (state: '{self.state}')"}

        num_frames = len(self.recording_buffer)
        if num_frames == 0:
            self.state = "idle"
            # Teleop stays ON — user controls it via Sync button
            print("[worker] Recording stopped — 0 frames, nothing saved.")
            return {"status": "ok", "state": "idle", "teleop": self.teleop,
                    "frames": 0, "filename": None}

        # Calculate duration from frame timestamps
        duration = self.recording_buffer[-1]["ts"] - self.recording_buffer[0]["ts"]

        # Build recording file
        timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"recording_{timestamp_str}.json"
        filepath = self.recordings_dir / filename

        recording_data = {
            "version": 1,
            "recorded_at": datetime.now().isoformat(),
            "rate_hz": self.args.rate,
            "num_frames": num_frames,
            "duration_sec": round(duration, 3),
            "joint_names": JOINT_NAMES,
            "frames": self.recording_buffer,
        }

        with open(filepath, "w") as f:
            json.dump(recording_data, f, indent=2)

        self.state = "idle"
        # Teleop stays ON — user controls it via Sync button
        self.recording_buffer = []
        print(f"[worker] Recording saved: {filepath}")
        print(f"[worker]   {num_frames} frames, {duration:.1f}s")
        return {
            "status": "ok",
            "state": "idle",
            "teleop": self.teleop,
            "filename": filename,
            "frames": num_frames,
            "duration_sec": round(duration, 3),
        }

    def _handle_start_replay(self, args):
        """Load a recording file and start replaying on the follower."""
        if self.state != "idle":
            return {"status": "error",
                    "reason": f"cannot start replay in state '{self.state}'"}
        if self.follower is None:
            return {"status": "error",
                    "reason": "replay requires follower connected"}

        filename = args.get("filename", "")
        self.replay_loop = args.get("loop", False)

        # If no filename, use the most recent recording
        if not filename:
            recordings = self._get_recording_files()
            if not recordings:
                return {"status": "error", "reason": "no recordings found"}
            filename = recordings[-1]  # most recent (sorted by name = sorted by time)

        filepath = self.recordings_dir / filename
        if not filepath.exists():
            return {"status": "error",
                    "reason": f"file not found: {filename}"}

        try:
            with open(filepath) as f:
                data = json.load(f)
        except Exception as e:
            return {"status": "error", "reason": f"failed to load: {e}"}

        frames = data.get("frames", [])
        if len(frames) < 2:
            return {"status": "error",
                    "reason": f"recording too short ({len(frames)} frames)"}

        self.replay_frames = frames
        self.replay_index = 0
        self.replay_start_time = time.monotonic()
        self.replay_filename = filename
        self.teleop = False  # disable teleop during replay
        self.state = "replaying"

        duration = data.get("duration_sec", 0)
        print(f"[worker] *** REPLAY STARTED *** {filename}")
        print(f"[worker]   {len(frames)} frames, {duration}s, "
              f"loop={'yes' if self.replay_loop else 'no'}")
        return {
            "status": "ok",
            "state": "replaying",
            "filename": filename,
            "frames": len(frames),
            "duration_sec": duration,
            "loop": self.replay_loop,
        }

    def _handle_stop_replay(self):
        """Stop replay immediately."""
        if self.state != "replaying":
            return {"status": "error",
                    "reason": f"not replaying (state: '{self.state}')"}

        self.state = "idle"
        self.replay_frames = []
        self.replay_index = 0
        print(f"[worker] Replay stopped ({self.replay_filename})")
        return {"status": "ok", "state": "idle"}

    def _handle_list_recordings(self):
        """List all saved recording files."""
        files = self._get_recording_files()
        # Add metadata summary for each file
        summaries = []
        for fname in files:
            try:
                with open(self.recordings_dir / fname) as f:
                    data = json.load(f)
                summaries.append({
                    "filename": fname,
                    "frames": data.get("num_frames", 0),
                    "duration_sec": data.get("duration_sec", 0),
                    "recorded_at": data.get("recorded_at", ""),
                })
            except Exception:
                summaries.append({"filename": fname})
        return {"status": "ok", "recordings": summaries}

    def _get_recording_files(self):
        """Get sorted list of recording JSON files."""
        files = sorted(
            f.name for f in self.recordings_dir.glob("recording_*.json")
            if f.is_file()
        )
        return files

    # -----------------------------------------------------------------------
    # Replay tick — called each iteration of the main loop
    # -----------------------------------------------------------------------

    def _tick_replay(self):
        """
        Advance replay by one tick. Sends the appropriate frame(s) to follower
        based on elapsed time since replay start.
        Returns the follower position after sending (for PUB).
        """
        if not self.replay_frames or self.replay_index >= len(self.replay_frames):
            # Replay finished
            if self.replay_loop and self.replay_frames:
                # Loop: restart from beginning
                self.replay_index = 0
                self.replay_start_time = time.monotonic()
                print(f"[worker] Replay looping ({self.replay_filename})")
            else:
                self.state = "idle"
                print(f"[worker] Replay finished ({self.replay_filename})")
                return self.read_follower()

        elapsed = time.monotonic() - self.replay_start_time
        first_ts = self.replay_frames[0]["ts"]

        # Advance to the frame that matches current elapsed time
        while self.replay_index < len(self.replay_frames):
            frame = self.replay_frames[self.replay_index]
            frame_time = frame["ts"] - first_ts
            if frame_time <= elapsed:
                # Send this frame to follower
                joints = frame.get("joints", {})
                if joints:
                    self.send_follower_action(joints)
                self.replay_index += 1
            else:
                break  # wait for next tick

        # Re-read follower actual position
        return self.read_follower()

    def check_commands(self):
        """Non-blocking check for incoming REP commands."""
        events = dict(self.poller.poll(timeout=0))  # 0 = non-blocking
        if self.rep in events:
            try:
                raw = self.rep.recv_string(zmq.NOBLOCK)
                msg = json.loads(raw)
                response = self.handle_command(msg)
                self.rep.send_string(json.dumps(response))
            except zmq.Again:
                pass
            except Exception as e:
                # Must always send a reply on REP socket
                self.rep.send_string(json.dumps({
                    "status": "error", "reason": str(e)
                }))

    def run(self):
        """Main read loop at configured rate."""
        period = 1.0 / self.args.rate
        seq = 0
        teleop_count = 0
        print(f"[worker] Starting read loop at {self.args.rate} Hz "
              f"(period={period*1000:.1f}ms)")

        while self.running:
            t_start = time.monotonic()

            # --- Replaying mode: advance replay timeline ---
            if self.state == "replaying":
                follower_pos = self._tick_replay()
                leader_pos = self.read_leader()

            else:
                # Read joint states
                follower_pos = self.read_follower()
                leader_pos = self.read_leader()

                # --- Teleop: send leader position to follower ---
                if self.teleop and leader_pos is not None:
                    self.send_follower_action(leader_pos)
                    teleop_count += 1
                    # Re-read follower after sending action to get updated position
                    follower_pos = self.read_follower()

                # --- Recording mode: buffer the current frame ---
                if self.state == "recording" and follower_pos is not None:
                    frame = {
                        "ts": time.monotonic() - self.recording_start_time,
                        "joints": dict(follower_pos),
                    }
                    self.recording_buffer.append(frame)

            # Build and publish message (degrees — bridge node converts to radians)
            msg = {
                "seq": seq,
                "ts": time.time(),
                "state": self.state,
                "teleop": self.teleop,
            }
            if follower_pos is not None:
                msg["follower"] = follower_pos
            if leader_pos is not None:
                msg["leader"] = leader_pos
            # Add recording/replay metadata
            if self.state == "recording":
                msg["recording_frames"] = len(self.recording_buffer)
            elif self.state == "replaying":
                msg["replay_progress"] = {
                    "index": self.replay_index,
                    "total": len(self.replay_frames),
                    "filename": self.replay_filename,
                    "loop": self.replay_loop,
                }

            self.pub.send_string(json.dumps(msg))
            seq += 1

            # Check for commands (non-blocking)
            self.check_commands()

            # Rate limiting
            elapsed = time.monotonic() - t_start
            sleep_time = period - elapsed
            if sleep_time > 0:
                time.sleep(sleep_time)

    def shutdown(self):
        """Clean shutdown."""
        print("\n[worker] Shutting down...")
        self.running = False
        self.state = "idle"

        # Save any in-progress recording
        if self.recording_buffer:
            try:
                self._handle_stop_record()
                print("[worker] Saved in-progress recording before shutdown.")
            except Exception:
                pass

        if self.follower is not None:
            try:
                self.follower.disconnect()
                print("[worker] Follower disconnected.")
            except Exception:
                pass

        if self.leader is not None:
            try:
                self.leader.disconnect()
                print("[worker] Leader disconnected.")
            except Exception:
                pass

        self.pub.close()
        self.rep.close()
        self.ctx.term()
        print("[worker] ZMQ cleaned up. Bye.")


def main():
    args = build_args()
    worker = LeRobotWorker(args)

    # Graceful shutdown on Ctrl+C
    def sig_handler(sig, frame):
        worker.shutdown()
        sys.exit(0)

    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)

    try:
        worker.connect_robots()
        worker.run()
    except Exception:
        traceback.print_exc()
        worker.shutdown()
        sys.exit(1)


if __name__ == "__main__":
    main()

변경 사항

  • start_teleop / stop_teleop 명령 추가
  • start_record는 teleop이 이미 켜져있어야 동작 (아니면 에러)
  • stop_record는 teleop을 유지
  • PUB 메시지에 "teleop": true/false 포함

 

 

ros_bridge_node.py

더보기
#!/usr/bin/env python3
"""
ros_bridge_node.py — ZMQ ↔ ROS2 bridge node

Runs in .venv-ros-bridge (Python 3.10) with rclpy + pyzmq.
Subscribes to joint states from lerobot_worker via ZMQ,
converts degrees→radians, and publishes sensor_msgs/JointState.
Also subscribes to /follower_joint_commands and forwards to worker via ZMQ REQ.

Relays record/replay/estop commands:
  - /robot_command (std_msgs/String) → ZMQ REQ → worker
  - Worker state published on /robot_status (std_msgs/String) every tick

Usage:
    export PATH=$(echo $PATH | tr ':' '\\n' | grep -v miniforge | tr '\\n' ':' | sed 's/:$//')
    cd ~/UnrealRobotics
    source .venv-ros-bridge/bin/activate
    source /opt/ros/humble/setup.bash
    python src/lerobot_ros2_bridge/lerobot_ros2_bridge/ros_bridge_node.py \\
        [--sub-addr tcp://127.0.0.1:5555] \\
        [--req-addr tcp://127.0.0.1:5556]
"""

import argparse
import json
import math
import sys
import traceback

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from std_msgs.msg import String
import zmq


# ---------------------------------------------------------------------------
# Joint names — must match URDF, LeRobot calibration, and worker output
# (Phase 3.2 verified: all 6 names identical across all three sources)
# ---------------------------------------------------------------------------
JOINT_NAMES = [
    "shoulder_pan",
    "shoulder_lift",
    "elbow_flex",
    "wrist_flex",
    "wrist_roll",
    "gripper",
]


def deg2rad(deg: float) -> float:
    return deg * math.pi / 180.0


def rad2deg(rad: float) -> float:
    return rad * 180.0 / math.pi


class RosBridgeNode(Node):
    def __init__(self, args):
        super().__init__("lerobot_ros2_bridge")
        self.get_logger().info("Initializing lerobot_ros2_bridge node")

        # --- ZMQ setup ---
        self.zmq_ctx = zmq.Context()

        # SUB: receive joint states from worker
        self.zmq_sub = self.zmq_ctx.socket(zmq.SUB)
        self.zmq_sub.connect(args.sub_addr)
        self.zmq_sub.setsockopt_string(zmq.SUBSCRIBE, "")
        # Don't block waiting for messages — use RCVTIMEO
        self.zmq_sub.setsockopt(zmq.RCVTIMEO, 0)
        self.get_logger().info(f"ZMQ SUB connected to {args.sub_addr}")

        # REQ: send commands to worker
        self.zmq_req = self.zmq_ctx.socket(zmq.REQ)
        self.zmq_req.connect(args.req_addr)
        self.zmq_req.setsockopt(zmq.RCVTIMEO, 1000)  # 1s timeout for replies
        self.get_logger().info(f"ZMQ REQ connected to {args.req_addr}")

        # --- ROS2 publishers ---
        # Follower joint states (for robot_state_publisher + rosbridge → Unreal)
        self.follower_pub = self.create_publisher(JointState, "/joint_states", 10)
        # Leader joint states (separate topic)
        self.leader_pub = self.create_publisher(JointState, "/leader_joint_states", 10)

        # --- ROS2 subscriber for commands from Unreal (via rosbridge) ---
        self.cmd_sub = self.create_subscription(
            JointState,
            "/follower_joint_commands",
            self.on_joint_command,
            10,
        )

        # --- Record/Replay/E-Stop command relay ---
        # Unreal publishes JSON command strings to /robot_command,
        # bridge relays them to worker via ZMQ REQ.
        self.robot_cmd_sub = self.create_subscription(
            String,
            "/robot_command",
            self.on_robot_command,
            10,
        )
        # Worker state feedback → Unreal
        self.robot_status_pub = self.create_publisher(String, "/robot_status", 10)
        self.last_worker_state = ""
        self.last_worker_teleop = None

        # --- Timer: poll ZMQ at ~100Hz (faster than worker's 30Hz to avoid buffering) ---
        self.poll_timer = self.create_timer(0.01, self.poll_zmq)

        # Stats
        self.recv_count = 0
        self.last_log_time = self.get_clock().now()

    def poll_zmq(self):
        """Non-blocking poll of ZMQ SUB for worker messages."""
        # Drain all available messages (in case we're slightly behind)
        messages_this_tick = 0
        while messages_this_tick < 5:  # cap per tick to avoid starving ROS callbacks
            try:
                raw = self.zmq_sub.recv_string(zmq.NOBLOCK)
            except zmq.Again:
                break

            messages_this_tick += 1
            self.recv_count += 1

            try:
                msg = json.loads(raw)
            except json.JSONDecodeError as e:
                self.get_logger().warn(f"JSON parse error: {e}")
                continue

            now = self.get_clock().now().to_msg()

            # Publish follower joint states
            if "follower" in msg:
                js = JointState()
                js.header.stamp = now
                js.header.frame_id = ""
                js.name = list(JOINT_NAMES)
                js.position = [
                    deg2rad(msg["follower"].get(name, 0.0))
                    for name in JOINT_NAMES
                ]
                # velocity and effort left empty (not available from LeRobot API)
                self.follower_pub.publish(js)

            # Publish leader joint states
            if "leader" in msg:
                js = JointState()
                js.header.stamp = now
                js.header.frame_id = ""
                js.name = list(JOINT_NAMES)
                js.position = [
                    deg2rad(msg["leader"].get(name, 0.0))
                    for name in JOINT_NAMES
                ]
                self.leader_pub.publish(js)

            # Publish worker state to /robot_status (only on change)
            worker_state = msg.get("state", "idle")
            worker_teleop = msg.get("teleop", False)
            state_changed = (worker_state != self.last_worker_state or
                             worker_teleop != self.last_worker_teleop)
            if state_changed:
                status_msg = String()
                status_data = {"state": worker_state, "teleop": worker_teleop}
                # Include recording/replay metadata if present
                if "recording_frames" in msg:
                    status_data["recording_frames"] = msg["recording_frames"]
                if "replay_progress" in msg:
                    status_data["replay_progress"] = msg["replay_progress"]
                status_msg.data = json.dumps(status_data)
                self.robot_status_pub.publish(status_msg)
                self.last_worker_state = worker_state
                self.last_worker_teleop = worker_teleop
                self.get_logger().info(
                    f"Worker state: {worker_state}, teleop: {worker_teleop}")

        # Periodic logging (every 10 seconds)
        now_time = self.get_clock().now()
        if (now_time - self.last_log_time).nanoseconds > 10_000_000_000:
            self.get_logger().info(
                f"ZMQ recv total: {self.recv_count} messages"
            )
            self.last_log_time = now_time

    def on_joint_command(self, msg: JointState):
        """
        Receive joint commands from ROS2 (e.g. from Unreal via rosbridge),
        convert radians→degrees, and forward to worker via ZMQ REQ.
        """
        if len(msg.name) == 0 or len(msg.position) == 0:
            self.get_logger().warn("Empty joint command received, ignoring")
            return

        # Build command dict (degrees)
        joint_args = {}
        for name, pos_rad in zip(msg.name, msg.position):
            if name in JOINT_NAMES:
                joint_args[name] = rad2deg(pos_rad)

        if not joint_args:
            self.get_logger().warn("No recognized joint names in command")
            return

        cmd = {
            "cmd": "send_follower_action",
            "args": joint_args,
        }

        try:
            self.zmq_req.send_string(json.dumps(cmd))
            reply_raw = self.zmq_req.recv_string()
            reply = json.loads(reply_raw)
            if reply.get("status") != "ok":
                self.get_logger().warn(
                    f"Worker command error: {reply.get('reason', 'unknown')}"
                )
        except zmq.Again:
            self.get_logger().error("Worker REQ timeout — is lerobot_worker running?")
        except Exception as e:
            self.get_logger().error(f"Worker command exception: {e}")

    def on_robot_command(self, msg: String):
        """
        Receive record/replay/estop commands from Unreal (via rosbridge),
        relay to worker via ZMQ REQ, and publish response on /robot_status.

        Expected msg.data formats:
            '{"cmd": "start_record"}'
            '{"cmd": "stop_record"}'
            '{"cmd": "start_replay", "args": {"filename": "...", "loop": false}}'
            '{"cmd": "start_replay"}'                    (plays most recent)
            '{"cmd": "stop_replay"}'
            '{"cmd": "estop"}'
            '{"cmd": "list_recordings"}'
        """
        try:
            cmd = json.loads(msg.data)
        except json.JSONDecodeError:
            # Support simple string commands: "estop", "start_record", etc.
            cmd = {"cmd": msg.data.strip()}

        cmd_name = cmd.get("cmd", "")
        self.get_logger().info(f"Robot command received: {cmd_name}")

        try:
            self.zmq_req.send_string(json.dumps(cmd))
            reply_raw = self.zmq_req.recv_string()
            reply = json.loads(reply_raw)

            # Publish response on /robot_status
            status_msg = String()
            status_msg.data = reply_raw
            self.robot_status_pub.publish(status_msg)

            status = reply.get("status", "unknown")
            if status == "ok":
                new_state = reply.get("state", "")
                if new_state:
                    self.last_worker_state = new_state
                self.get_logger().info(f"Command '{cmd_name}' OK: {reply}")
            else:
                self.get_logger().warn(
                    f"Command '{cmd_name}' error: {reply.get('reason', 'unknown')}"
                )
        except zmq.Again:
            self.get_logger().error(
                f"Worker REQ timeout for '{cmd_name}' — is lerobot_worker running?"
            )
            # Publish timeout error on /robot_status
            err_msg = String()
            err_msg.data = json.dumps({
                "status": "error", "reason": "worker timeout", "cmd": cmd_name
            })
            self.robot_status_pub.publish(err_msg)
        except Exception as e:
            self.get_logger().error(f"Robot command exception: {e}")
            err_msg = String()
            err_msg.data = json.dumps({
                "status": "error", "reason": str(e), "cmd": cmd_name
            })
            self.robot_status_pub.publish(err_msg)

    def destroy_node(self):
        """Clean shutdown of ZMQ resources."""
        self.get_logger().info("Shutting down ZMQ...")
        self.zmq_sub.close()
        self.zmq_req.close()
        self.zmq_ctx.term()
        super().destroy_node()


def main():
    parser = argparse.ArgumentParser(description="ROS2 ↔ ZMQ bridge node")
    parser.add_argument("--sub-addr", default="tcp://127.0.0.1:5555",
                        help="ZMQ SUB address (worker PUB)")
    parser.add_argument("--req-addr", default="tcp://127.0.0.1:5556",
                        help="ZMQ REQ address (worker REP)")
    # ROS2 may pass extra args — use parse_known_args
    args, unknown = parser.parse_known_args()

    rclpy.init(args=unknown if unknown else None)
    node = RosBridgeNode(args)

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == "__main__":
    main()

 

변경 사항

  • teleop 상태 변화도 /robot_status에 발행

 

 

RobotVisualizer.h

더보기
#pragma once

#include "CoreMinimal.h"
#include "GameFramework/Actor.h"
#include "RobotVisualizer.generated.h"

class UStaticMesh;
class UStaticMeshComponent;
class USceneComponent;
class URosBridgeSubsystem;

/**
 * Visualizes the SO-ARM-101 follower arm in Unreal Engine and provides
 * MoveIt command interface via rosbridge.
 *
 * The component hierarchy mirrors the URDF link/joint structure:
 *   BaseLink -> ShoulderPanJoint -> ShoulderLink -> ShoulderLiftJoint -> ...
 *
 * Each "Joint" SceneComponent is where the ROS joint angle gets applied
 * as a local Z-axis rotation. All child links/meshes rotate with it.
 *
 * Phase 8 additions:
 *   - SendNamedTarget(): publish to /moveit_goal_named (std_msgs/String)
 *   - SendJointGoal(): publish to /moveit_goal_joints (sensor_msgs/JointState)
 *   - SendPoseGoal(): publish to /moveit_goal_pose (geometry_msgs/PoseStamped)
 *   - Blueprint-callable + editor-testable via UPROPERTY buttons
 *
 * Phase 9 additions (Record/Replay/E-Stop):
 *   - StartRecord(): begin teleop recording on worker
 *   - StopRecord(): stop recording, save trajectory
 *   - StartReplay(): replay most recent (or named) recording
 *   - StopReplay(): stop replay
 *   - EStop(): emergency stop all motion
 *   - All commands publish JSON to /robot_command topic
 *   - Worker state feedback via /robot_status subscription
 */
UCLASS()
class SO101_TWIN_API ARobotVisualizer : public AActor
{
	GENERATED_BODY()

public:
	ARobotVisualizer();

protected:
	virtual void BeginPlay() override;
	virtual void EndPlay(const EEndPlayReason::Type EndPlayReason) override;

	// =================================================================
	// Configuration
	// =================================================================

	UPROPERTY(EditAnywhere, Category = "ROS|Bridge")
	FString RosBridgeUrl = TEXT("ws://127.0.0.1:9090/?x=1");

	UPROPERTY(EditAnywhere, Category = "ROS|Topics")
	FString JointStateTopic = TEXT("/joint_states");

	UPROPERTY(EditAnywhere, Category = "ROS|Topics")
	FString JointStateType = TEXT("sensor_msgs/JointState");

	// =================================================================
	// MoveIt Command Interface (Phase 8)
	// =================================================================

	// --- Named target ---

	/** Named target to send (e.g. "home", "ready"). Set in Details panel, then call SendNamedTarget(). */
	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt")
	FString MoveItNamedTarget = TEXT("home");

	/** Send the named target to MoveIt via /moveit_goal_named topic. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|MoveIt")
	void SendNamedTarget();

	// --- Joint goal ---

	/** Joint goal values in radians. Set in Details panel, then call SendJointGoal(). */
	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Joints")
	float GoalShoulderPan = 0.0f;

	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Joints")
	float GoalShoulderLift = 0.0f;

	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Joints")
	float GoalElbowFlex = 0.0f;

	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Joints")
	float GoalWristFlex = 0.0f;

	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Joints")
	float GoalWristRoll = 0.0f;

	/** Send joint goal to MoveIt via /moveit_goal_joints topic. Values are in radians. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|MoveIt|Joints")
	void SendJointGoal();

	// --- Pose goal (Cartesian, position-only for 5DOF) ---

	/** Target position in UE coordinates (cm). Converted to ROS (meters) on send. */
	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Pose")
	FVector GoalPositionUE = FVector(10.0f, 0.0f, 15.0f);

	/** Send position-only goal to MoveIt via /moveit_goal_pose topic.
	 *  GoalPositionUE is in Unreal cm, auto-converted to ROS meters with Y flip. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|MoveIt|Pose")
	void SendPoseGoal();

	// =================================================================
	// Teleop Sync (Phase 9)
	// =================================================================

	/** Activate leader→follower sync (teleop). Must be ON before recording. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Sync")
	void SyncOn();

	/** Deactivate leader→follower sync. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Sync")
	void SyncOff();

	/** Whether teleop (sync) is currently active. Updated from /robot_status. */
	UPROPERTY(VisibleAnywhere, Category = "ROS|Status")
	bool bSyncActive = false;

	// =================================================================
	// Record / Replay / E-Stop (Phase 9)
	// =================================================================

	/** Start recording: activates teleop on worker, buffers joint trajectory. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Record")
	void StartRecord();

	/** Stop recording: saves trajectory to file on worker. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Record")
	void StopRecord();

	/** Replay filename (empty = most recent recording). */
	UPROPERTY(EditAnywhere, Category = "ROS|Replay")
	FString ReplayFilename;

	/** Whether to loop the replay continuously. */
	UPROPERTY(EditAnywhere, Category = "ROS|Replay")
	bool bReplayLoop = false;

	/** Start replaying a recorded trajectory on the follower arm. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Replay")
	void StartReplay();

	/** Stop replay immediately. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Replay")
	void StopReplay();

	/** Emergency stop: abort ALL motion immediately (recording, replay, teleop). */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Safety")
	void EStop();

	/** Current worker state (idle/recording/replaying). Updated from /robot_status. */
	UPROPERTY(VisibleAnywhere, Category = "ROS|Status")
	FString WorkerState = TEXT("unknown");

private:
	// =================================================================
	// Component hierarchy
	// =================================================================

	UPROPERTY(VisibleAnywhere, Category = "Robot")
	TObjectPtr<USceneComponent> RobotRoot;

	// Link SceneComponents
	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> BaseLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> ShoulderLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> UpperArmLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> LowerArmLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> WristLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> GripperLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> MovingJawLink;

	// Joint SceneComponents
	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> ShoulderPanJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> ShoulderLiftJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> ElbowFlexJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> WristFlexJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> WristRollJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> GripperJoint;

	// Joint name -> component mapping
	UPROPERTY()
	TMap<FName, TObjectPtr<USceneComponent>> JointComponentMap;

	// Mesh components
	UPROPERTY()
	TArray<TObjectPtr<UStaticMeshComponent>> AllMeshComponents;

	// =================================================================
	// ROS connection
	// =================================================================

	UFUNCTION()
	void OnRosBridgeConnected();

	UFUNCTION()
	void OnRosMessage(const FString& Topic, const FString& MessageJson);

	void ParseAndApplyJointStates(const FString& MessageJson);

	// =================================================================
	// MoveIt publish helpers
	// =================================================================

	/** Advertise MoveIt command topics. Called once on connect. */
	void AdvertiseMoveItTopics();

	/** Whether MoveIt topics have been advertised in this connection session. */
	bool bMoveItTopicsAdvertised = false;

	// =================================================================
	// Record / Replay / E-Stop helpers
	// =================================================================

	/** Advertise /robot_command and subscribe /robot_status. Called once on connect. */
	void SetupRecordReplayTopics();

	/** Whether record/replay topics have been set up. */
	bool bRecordReplayTopicsSetup = false;

	/** Send a JSON command to /robot_command topic. */
	void PublishRobotCommand(const FString& JsonCmd);

	/** Handle /robot_status messages from the bridge node. */
	UFUNCTION()
	void OnRobotStatus(const FString& Topic, const FString& MessageJson);

	// =================================================================
	// Helpers (declared in original header, kept for compatibility)
	// =================================================================

	USceneComponent* CreateJointComponent(const FName& Name, USceneComponent* Parent,
		const FVector& Location, const FRotator& Rotation);

	USceneComponent* CreateLinkComponent(const FName& Name, USceneComponent* Parent);

	UStaticMeshComponent* AttachMesh(USceneComponent* Parent, UStaticMesh* Mesh,
		const FName& Name, const FVector& Location, const FRotator& Rotation,
		bool bIsMotor);
};

 

 

RobotVisualizer.cpp

더보기
#include "RobotVisualizer.h"
#include "RosCoordConv.h"
#include "RosBridgeSubsystem.h"
#include "RosBridgeLog.h"

#include "Components/SceneComponent.h"
#include "Components/StaticMeshComponent.h"
#include "Engine/StaticMesh.h"
#include "Engine/Engine.h"
#include "Engine/World.h"
#include "Kismet/GameplayStatics.h"
#include "Dom/JsonObject.h"
#include "Dom/JsonValue.h"
#include "Serialization/JsonReader.h"
#include "Serialization/JsonSerializer.h"
#include "Serialization/JsonWriter.h"
#include "UObject/ConstructorHelpers.h"

// =============================================================================
// Mesh asset path helper
// =============================================================================

static UStaticMesh* LoadMeshAsset(const TCHAR* AssetName)
{
	// All meshes live under /Game/Robot/Meshes/
	FString Path = FString::Printf(TEXT("/Game/Robot/Meshes/%s.%s"), AssetName, AssetName);
	UStaticMesh* Mesh = Cast<UStaticMesh>(StaticLoadObject(UStaticMesh::StaticClass(), nullptr, *Path));
	if (!Mesh)
	{
		UE_LOG(LogRosBridge, Warning, TEXT("Failed to load mesh: %s"), *Path);
	}
	return Mesh;
}

// =============================================================================
// Constructor — build the entire component hierarchy
// =============================================================================

ARobotVisualizer::ARobotVisualizer()
{
	PrimaryActorTick.bCanEverTick = false;

	// --- Root ---
	RobotRoot = CreateDefaultSubobject<USceneComponent>(TEXT("RobotRoot"));
	RootComponent = RobotRoot;

	// =========================================================================
	// URDF data converted to UE coordinates:
	//   Position: meters * 100 = cm, Y flipped
	//   Rotation: RPY radians -> FRotator degrees, pitch & yaw negated
	//
	// All values below are pre-computed from so101_follower.urdf.
	// =========================================================================

	// --- base_link (attached directly to root, no joint) ---
	BaseLink = CreateDefaultSubobject<USceneComponent>(TEXT("BaseLink"));
	BaseLink->SetupAttachment(RobotRoot);

	// --- shoulder_pan joint ---
	// URDF origin: xyz(0.0388353, 0, 0.0624) rpy(3.14159, 0, -3.14159)
	ShoulderPanJoint = CreateDefaultSubobject<USceneComponent>(TEXT("ShoulderPanJoint"));
	ShoulderPanJoint->SetupAttachment(BaseLink);
	ShoulderPanJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(0.0388353, 0.0, 0.0624));
	ShoulderPanJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(3.14159, 0.0, -3.14159));

	ShoulderLink = CreateDefaultSubobject<USceneComponent>(TEXT("ShoulderLink"));
	ShoulderLink->SetupAttachment(ShoulderPanJoint);

	// --- shoulder_lift joint ---
	// URDF origin: xyz(-0.0303992, -0.0182778, -0.0542) rpy(-1.5708, -1.5708, 0)
	ShoulderLiftJoint = CreateDefaultSubobject<USceneComponent>(TEXT("ShoulderLiftJoint"));
	ShoulderLiftJoint->SetupAttachment(ShoulderLink);
	ShoulderLiftJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(-0.0303992, -0.0182778, -0.0542));
	ShoulderLiftJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(-1.5708, -1.5708, 0.0));

	UpperArmLink = CreateDefaultSubobject<USceneComponent>(TEXT("UpperArmLink"));
	UpperArmLink->SetupAttachment(ShoulderLiftJoint);

	// --- elbow_flex joint ---
	// URDF origin: xyz(-0.11257, -0.028, 0) rpy(0, 0, 1.5708)
	ElbowFlexJoint = CreateDefaultSubobject<USceneComponent>(TEXT("ElbowFlexJoint"));
	ElbowFlexJoint->SetupAttachment(UpperArmLink);
	ElbowFlexJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(-0.11257, -0.028, 0.0));
	ElbowFlexJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(0.0, 0.0, 1.5708));

	LowerArmLink = CreateDefaultSubobject<USceneComponent>(TEXT("LowerArmLink"));
	LowerArmLink->SetupAttachment(ElbowFlexJoint);

	// --- wrist_flex joint ---
	// URDF origin: xyz(-0.1349, 0.0052, 0) rpy(0, 0, -1.5708)
	WristFlexJoint = CreateDefaultSubobject<USceneComponent>(TEXT("WristFlexJoint"));
	WristFlexJoint->SetupAttachment(LowerArmLink);
	WristFlexJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(-0.1349, 0.0052, 0.0));
	WristFlexJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(0.0, 0.0, -1.5708));

	WristLink = CreateDefaultSubobject<USceneComponent>(TEXT("WristLink"));
	WristLink->SetupAttachment(WristFlexJoint);

	// --- wrist_roll joint ---
	// URDF origin: xyz(0, -0.0611, 0.0181) rpy(1.5708, 0.0486795, 3.14159)
	WristRollJoint = CreateDefaultSubobject<USceneComponent>(TEXT("WristRollJoint"));
	WristRollJoint->SetupAttachment(WristLink);
	WristRollJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(0.0, -0.0611, 0.0181));
	WristRollJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(1.5708, 0.0486795, 3.14159));

	GripperLink = CreateDefaultSubobject<USceneComponent>(TEXT("GripperLink"));
	GripperLink->SetupAttachment(WristRollJoint);

	// --- gripper joint ---
	// URDF origin: xyz(0.0202, 0.0188, -0.0234) rpy(1.5708, 0, 0)
	GripperJoint = CreateDefaultSubobject<USceneComponent>(TEXT("GripperJoint"));
	GripperJoint->SetupAttachment(GripperLink);
	GripperJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(0.0202, 0.0188, -0.0234));
	GripperJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(1.5708, 0.0, 0.0));

	MovingJawLink = CreateDefaultSubobject<USceneComponent>(TEXT("MovingJawLink"));
	MovingJawLink->SetupAttachment(GripperJoint);

	// --- Joint name mapping (matches ROS /joint_states names) ---
	JointComponentMap.Add(FName("shoulder_pan"),  ShoulderPanJoint);
	JointComponentMap.Add(FName("shoulder_lift"), ShoulderLiftJoint);
	JointComponentMap.Add(FName("elbow_flex"),    ElbowFlexJoint);
	JointComponentMap.Add(FName("wrist_flex"),    WristFlexJoint);
	JointComponentMap.Add(FName("wrist_roll"),    WristRollJoint);
	JointComponentMap.Add(FName("gripper"),       GripperJoint);
}

// =============================================================================
// BeginPlay — load meshes and attach, connect to ROS
// =============================================================================

void ARobotVisualizer::BeginPlay()
{
	Super::BeginPlay();

	// --- Load meshes and attach to links ---
	// Meshes are loaded at runtime (not in constructor) because
	// StaticLoadObject is safer to call here and allows hot-reload.

	// Helper lambda to reduce repetition
	auto Attach = [this](USceneComponent* Parent, const TCHAR* MeshName,
		double RosX, double RosY, double RosZ,
		double RosRoll, double RosPitch, double RosYaw,
		bool bIsMotor = false)
	{
		UStaticMesh* Mesh = LoadMeshAsset(MeshName);
		if (!Mesh) return;

		UStaticMeshComponent* SMC = NewObject<UStaticMeshComponent>(this);
		SMC->SetStaticMesh(Mesh);
		SMC->SetRelativeLocation(RosCoordConv::RosToUePosition(RosX, RosY, RosZ));
		SMC->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(RosRoll, RosPitch, RosYaw));
		SMC->SetCollisionEnabled(ECollisionEnabled::NoCollision);
		SMC->AttachToComponent(Parent, FAttachmentTransformRules::KeepRelativeTransform);
		SMC->RegisterComponent();
		AllMeshComponents.Add(SMC);
	};

	// === base_link meshes ===
	Attach(BaseLink, TEXT("base_motor_holder_so101_v1"),
		-0.00636471, -0.0000994414, -0.0024,
		1.5708, 0.0, 1.5708);
	Attach(BaseLink, TEXT("base_so101_v2"),
		-0.00636471, 0.0, -0.0024,
		1.5708, 0.0, 1.5708);
	Attach(BaseLink, TEXT("sts3215_03a_v1"),
		0.0263353, 0.0, 0.0437,
		0.0, 0.0, 0.0, true);
	Attach(BaseLink, TEXT("waveshare_mounting_plate_so101_v2"),
		-0.0309827, -0.000199441, 0.0474,
		1.5708, 0.0, 1.5708);

	// === shoulder_link meshes ===
	Attach(ShoulderLink, TEXT("sts3215_03a_v1"),
		-0.0303992, 0.000422241, -0.0417,
		1.5708, 1.5708, 0.0, true);
	Attach(ShoulderLink, TEXT("motor_holder_so101_base_v1"),
		-0.0675992, -0.000177759, 0.0158499,
		1.5708, -1.5708, 0.0);
	Attach(ShoulderLink, TEXT("rotation_pitch_so101_v1"),
		0.0122008, 0.0000222413, 0.0464,
		-1.5708, 0.0, 0.0);

	// === upper_arm_link meshes ===
	Attach(UpperArmLink, TEXT("sts3215_03a_v1"),
		-0.11257, -0.0155, 0.0187,
		-3.14159, 0.0, -1.5708, true);
	Attach(UpperArmLink, TEXT("upper_arm_so101_v1"),
		-0.065085, 0.012, 0.0182,
		3.14159, 0.0, 0.0);

	// === lower_arm_link meshes ===
	Attach(LowerArmLink, TEXT("under_arm_so101_v1"),
		-0.0648499, -0.032, 0.0182,
		3.14159, 0.0, 0.0);
	Attach(LowerArmLink, TEXT("motor_holder_so101_wrist_v1"),
		-0.0648499, -0.032, 0.018,
		-3.14159, 0.0, 0.0);
	Attach(LowerArmLink, TEXT("sts3215_03a_v1"),
		-0.1224, 0.0052, 0.0187,
		-3.14159, 0.0, -3.14159, true);

	// === wrist_link meshes ===
	Attach(WristLink, TEXT("sts3215_03a_no_horn_v1"),
		0.0, -0.0424, 0.0306,
		1.5708, 1.5708, 0.0, true);
	Attach(WristLink, TEXT("wrist_roll_pitch_so101_v2"),
		0.0, -0.028, 0.0181,
		-1.5708, -1.5708, 0.0);

	// === gripper_link meshes ===
	Attach(GripperLink, TEXT("sts3215_03a_v1"),
		0.0077, 0.0001, -0.0234,
		-1.5708, 0.0, 0.0, true);
	Attach(GripperLink, TEXT("wrist_roll_follower_so101_v1"),
		0.0, -0.000218214, 0.000949706,
		-3.14159, 0.0, 0.0);

	// === moving_jaw_link meshes ===
	Attach(MovingJawLink, TEXT("moving_jaw_so101_v1"),
		0.0, 0.0, 0.0189,
		0.0, 0.0, 0.0);

	UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: %d mesh components created"), AllMeshComponents.Num());

	// --- Connect to ROS via Subsystem ---
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros) return;

	// Bind delegates.
	Ros->OnTopicMessage.AddDynamic(this, &ARobotVisualizer::OnRosMessage);
	Ros->OnConnected.AddDynamic(this, &ARobotVisualizer::OnRosBridgeConnected);

	// Subscribe is now queued even before connection — the subsystem will
	// send it automatically when connected (including on reconnect).
	Ros->Subscribe(JointStateTopic, JointStateType);

	// Queue MoveIt topic advertisements (sent on connect).
	AdvertiseMoveItTopics();

	// Queue record/replay/estop topics.
	SetupRecordReplayTopics();

	// Initiate connection if not already connected.
	if (!Ros->IsConnected())
	{
		Ros->Connect(RosBridgeUrl);
	}
	else
	{
		// Already connected (e.g. another actor already called Connect).
		UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: already connected, subscription sent."));
	}
}

// =============================================================================
// EndPlay
// =============================================================================

void ARobotVisualizer::EndPlay(const EEndPlayReason::Type EndPlayReason)
{
	if (UGameInstance* GI = UGameplayStatics::GetGameInstance(this))
	{
		if (URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>())
		{
			Ros->OnTopicMessage.RemoveDynamic(this, &ARobotVisualizer::OnRosMessage);
			Ros->OnConnected.RemoveDynamic(this, &ARobotVisualizer::OnRosBridgeConnected);
		}
	}

	bMoveItTopicsAdvertised = false;
	bRecordReplayTopicsSetup = false;

	Super::EndPlay(EndPlayReason);
}

// =============================================================================
// ROS connection callback
// =============================================================================

void ARobotVisualizer::OnRosBridgeConnected()
{
	UE_LOG(LogRosBridge, Log,
		TEXT("RobotVisualizer: rosbridge connected — subscriptions restored by subsystem."));
}

// =============================================================================
// MoveIt topic advertisements
// =============================================================================

void ARobotVisualizer::AdvertiseMoveItTopics()
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros) return;

	// These are queued and sent automatically when connected.
	Ros->Advertise(TEXT("/moveit_goal_named"), TEXT("std_msgs/String"));
	Ros->Advertise(TEXT("/moveit_goal_joints"), TEXT("sensor_msgs/JointState"));
	Ros->Advertise(TEXT("/moveit_goal_pose"), TEXT("geometry_msgs/PoseStamped"));

	bMoveItTopicsAdvertised = true;
	UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: MoveIt command topics advertised."));
}

// =============================================================================
// MoveIt commands — SendNamedTarget
// =============================================================================

void ARobotVisualizer::SendNamedTarget()
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros || !Ros->IsConnected())
	{
		UE_LOG(LogRosBridge, Warning, TEXT("SendNamedTarget: not connected to rosbridge."));
		return;
	}

	// std_msgs/String: {"data": "home"}
	FString MsgJson = FString::Printf(TEXT("{\"data\":\"%s\"}"), *MoveItNamedTarget);
	Ros->Publish(TEXT("/moveit_goal_named"), MsgJson);

	UE_LOG(LogRosBridge, Log, TEXT("SendNamedTarget: published '%s' to /moveit_goal_named"), *MoveItNamedTarget);
}

// =============================================================================
// MoveIt commands — SendJointGoal
// =============================================================================

void ARobotVisualizer::SendJointGoal()
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros || !Ros->IsConnected())
	{
		UE_LOG(LogRosBridge, Warning, TEXT("SendJointGoal: not connected to rosbridge."));
		return;
	}

	// sensor_msgs/JointState:
	// {
	//   "header": {"stamp": {"sec": 0, "nanosec": 0}, "frame_id": ""},
	//   "name": ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll"],
	//   "position": [0.0, -0.5, 0.5, 0.0, 0.0],
	//   "velocity": [],
	//   "effort": []
	// }

	FString MsgJson = FString::Printf(
		TEXT("{\"header\":{\"stamp\":{\"sec\":0,\"nanosec\":0},\"frame_id\":\"\"},")
		TEXT("\"name\":[\"shoulder_pan\",\"shoulder_lift\",\"elbow_flex\",\"wrist_flex\",\"wrist_roll\"],")
		TEXT("\"position\":[%f,%f,%f,%f,%f],")
		TEXT("\"velocity\":[],\"effort\":[]}"),
		GoalShoulderPan, GoalShoulderLift, GoalElbowFlex, GoalWristFlex, GoalWristRoll
	);

	Ros->Publish(TEXT("/moveit_goal_joints"), MsgJson);

	UE_LOG(LogRosBridge, Log,
		TEXT("SendJointGoal: [%.3f, %.3f, %.3f, %.3f, %.3f] to /moveit_goal_joints"),
		GoalShoulderPan, GoalShoulderLift, GoalElbowFlex, GoalWristFlex, GoalWristRoll);
}

// =============================================================================
// MoveIt commands — SendPoseGoal
// =============================================================================

void ARobotVisualizer::SendPoseGoal()
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros || !Ros->IsConnected())
	{
		UE_LOG(LogRosBridge, Warning, TEXT("SendPoseGoal: not connected to rosbridge."));
		return;
	}

	// Convert UE position (cm, left-handed) to ROS (meters, right-handed)
	double RosX, RosY, RosZ;
	RosCoordConv::UeToRosPosition(GoalPositionUE, RosX, RosY, RosZ);

	// geometry_msgs/PoseStamped (orientation defaults to identity — position-only goal)
	FString MsgJson = FString::Printf(
		TEXT("{\"header\":{\"stamp\":{\"sec\":0,\"nanosec\":0},\"frame_id\":\"base_link\"},")
		TEXT("\"pose\":{\"position\":{\"x\":%f,\"y\":%f,\"z\":%f},")
		TEXT("\"orientation\":{\"x\":0.0,\"y\":0.0,\"z\":0.0,\"w\":1.0}}}"),
		RosX, RosY, RosZ
	);

	Ros->Publish(TEXT("/moveit_goal_pose"), MsgJson);

	UE_LOG(LogRosBridge, Log,
		TEXT("SendPoseGoal: UE(%.1f, %.1f, %.1f)cm -> ROS(%.4f, %.4f, %.4f)m to /moveit_goal_pose"),
		GoalPositionUE.X, GoalPositionUE.Y, GoalPositionUE.Z,
		RosX, RosY, RosZ);
}

// =============================================================================
// Message handling
// =============================================================================

void ARobotVisualizer::OnRosMessage(const FString& Topic, const FString& MessageJson)
{
	if (Topic == JointStateTopic)
	{
		ParseAndApplyJointStates(MessageJson);
	}
	else if (Topic == TEXT("/robot_status"))
	{
		OnRobotStatus(Topic, MessageJson);
	}
}

void ARobotVisualizer::ParseAndApplyJointStates(const FString& MessageJson)
{
	// Parse sensor_msgs/JointState JSON:
	// { "name": ["shoulder_pan", ...], "position": [0.1, ...], ... }

	TSharedPtr<FJsonObject> Json;
	TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(MessageJson);
	if (!FJsonSerializer::Deserialize(Reader, Json) || !Json.IsValid())
	{
		return;
	}

	const TArray<TSharedPtr<FJsonValue>>* NameArray = nullptr;
	const TArray<TSharedPtr<FJsonValue>>* PosArray = nullptr;

	if (!Json->TryGetArrayField(TEXT("name"), NameArray) ||
		!Json->TryGetArrayField(TEXT("position"), PosArray))
	{
		return;
	}

	const int32 Count = FMath::Min(NameArray->Num(), PosArray->Num());
	for (int32 i = 0; i < Count; ++i)
	{
		const FName JointName(*(*NameArray)[i]->AsString());
		const double AngleRad = (*PosArray)[i]->AsNumber();

		TObjectPtr<USceneComponent>* JointComp = JointComponentMap.Find(JointName);
		if (JointComp && *JointComp)
		{
			const float AngleDeg = RosCoordConv::RosJointAngleToUeDegrees(AngleRad);

			FRotator BaseRot;
			if (JointName == FName("shoulder_pan"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(3.14159, 0.0, -3.14159);
			else if (JointName == FName("shoulder_lift"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(-1.5708, -1.5708, 0.0);
			else if (JointName == FName("elbow_flex"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(0.0, 0.0, 1.5708);
			else if (JointName == FName("wrist_flex"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(0.0, 0.0, -1.5708);
			else if (JointName == FName("wrist_roll"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(1.5708, 0.0486795, 3.14159);
			else if (JointName == FName("gripper"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(1.5708, 0.0, 0.0);

			const FQuat BaseQuat = BaseRot.Quaternion();
			const FQuat JointQuat = FQuat(FVector::UpVector, FMath::DegreesToRadians(AngleDeg));
			const FQuat FinalQuat = BaseQuat * JointQuat;

			(*JointComp)->SetRelativeRotation(FinalQuat.Rotator());
		}
	}
}

// =============================================================================
// Record / Replay / E-Stop — Topic setup
// =============================================================================

void ARobotVisualizer::SetupRecordReplayTopics()
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros) return;

	// Advertise the command topic
	Ros->Advertise(TEXT("/robot_command"), TEXT("std_msgs/String"));

	// Subscribe to status feedback
	Ros->Subscribe(TEXT("/robot_status"), TEXT("std_msgs/String"));

	bRecordReplayTopicsSetup = true;
	UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: Record/Replay topics set up."));
}

// =============================================================================
// Record / Replay / E-Stop — Command publisher helper
// =============================================================================

void ARobotVisualizer::PublishRobotCommand(const FString& JsonCmd)
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros || !Ros->IsConnected())
	{
		UE_LOG(LogRosBridge, Warning, TEXT("PublishRobotCommand: not connected to rosbridge."));
		if (GEngine)
		{
			GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Red,
				TEXT("Robot: Not connected to rosbridge!"));
		}
		return;
	}

	// std_msgs/String: {"data": "<json_cmd>"}
	// The JSON command is nested inside the "data" field, with quotes escaped.
	FString EscapedCmd = JsonCmd.Replace(TEXT("\""), TEXT("\\\""));
	FString MsgJson = FString::Printf(TEXT("{\"data\":\"%s\"}"), *EscapedCmd);
	Ros->Publish(TEXT("/robot_command"), MsgJson);

	UE_LOG(LogRosBridge, Log, TEXT("PublishRobotCommand: %s"), *JsonCmd);
}

// =============================================================================
// Record / Replay / E-Stop — Button handlers
// =============================================================================

void ARobotVisualizer::StartRecord()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"start_record\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Green,
			TEXT("Robot: Recording started (teleop active)"));
	}
}

void ARobotVisualizer::StopRecord()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"stop_record\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Yellow,
			TEXT("Robot: Recording stopped, saving..."));
	}
}

void ARobotVisualizer::StartReplay()
{
	FString ArgsJson;
	if (ReplayFilename.IsEmpty())
	{
		// No filename = most recent recording
		ArgsJson = FString::Printf(TEXT("{\"cmd\":\"start_replay\",\"args\":{\"loop\":%s}}"),
			bReplayLoop ? TEXT("true") : TEXT("false"));
	}
	else
	{
		ArgsJson = FString::Printf(
			TEXT("{\"cmd\":\"start_replay\",\"args\":{\"filename\":\"%s\",\"loop\":%s}}"),
			*ReplayFilename,
			bReplayLoop ? TEXT("true") : TEXT("false"));
	}
	PublishRobotCommand(ArgsJson);

	if (GEngine)
	{
		FString DisplayName = ReplayFilename.IsEmpty() ? TEXT("(most recent)") : ReplayFilename;
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Cyan,
			FString::Printf(TEXT("Robot: Replaying %s (loop=%s)"),
				*DisplayName, bReplayLoop ? TEXT("yes") : TEXT("no")));
	}
}

void ARobotVisualizer::StopReplay()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"stop_replay\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Yellow,
			TEXT("Robot: Replay stopped"));
	}
}

void ARobotVisualizer::EStop()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"estop\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 5.0f, FColor::Red,
			TEXT("*** E-STOP *** All motion halted"));
	}
}

// =============================================================================
// Teleop Sync — SyncOn / SyncOff
// =============================================================================

void ARobotVisualizer::SyncOn()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"start_teleop\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Green,
			TEXT("Sync ON: leader -> follower active"));
	}
}

void ARobotVisualizer::SyncOff()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"stop_teleop\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Yellow,
			TEXT("Sync OFF: leader -> follower deactivated"));
	}
}

// =============================================================================
// Record / Replay — Status feedback handler
// =============================================================================

void ARobotVisualizer::OnRobotStatus(const FString& Topic, const FString& MessageJson)
{
	// rosbridge wraps std_msgs/String as: {"data": "..."}
	TSharedPtr<FJsonObject> OuterJson;
	TSharedRef<TJsonReader<>> OuterReader = TJsonReaderFactory<>::Create(MessageJson);
	if (!FJsonSerializer::Deserialize(OuterReader, OuterJson) || !OuterJson.IsValid())
	{
		return;
	}

	FString DataStr;
	if (!OuterJson->TryGetStringField(TEXT("data"), DataStr))
	{
		return;
	}

	// Parse the inner JSON status
	TSharedPtr<FJsonObject> StatusJson;
	TSharedRef<TJsonReader<>> StatusReader = TJsonReaderFactory<>::Create(DataStr);
	if (!FJsonSerializer::Deserialize(StatusReader, StatusJson) || !StatusJson.IsValid())
	{
		return;
	}

	FString State;
	if (StatusJson->TryGetStringField(TEXT("state"), State))
	{
		if (State != WorkerState)
		{
			WorkerState = State;
			UE_LOG(LogRosBridge, Log, TEXT("Worker state: %s"), *WorkerState);

			if (GEngine)
			{
				FColor Color = FColor::White;
				if (State == TEXT("recording")) Color = FColor::Green;
				else if (State == TEXT("replaying")) Color = FColor::Cyan;
				else if (State == TEXT("idle")) Color = FColor::Silver;

				GEngine->AddOnScreenDebugMessage(-1, 3.0f, Color,
					FString::Printf(TEXT("Robot state: %s"), *WorkerState));
			}
		}
	}

	// Update sync (teleop) status
	bool bTeleop = false;
	if (StatusJson->TryGetBoolField(TEXT("teleop"), bTeleop))
	{
		if (bTeleop != bSyncActive)
		{
			bSyncActive = bTeleop;
			UE_LOG(LogRosBridge, Log, TEXT("Sync (teleop): %s"),
				bSyncActive ? TEXT("ON") : TEXT("OFF"));
		}
	}

	// Log errors from commands
	FString Status;
	if (StatusJson->TryGetStringField(TEXT("status"), Status) && Status == TEXT("error"))
	{
		FString Reason;
		StatusJson->TryGetStringField(TEXT("reason"), Reason);
		UE_LOG(LogRosBridge, Warning, TEXT("Robot command error: %s"), *Reason);

		if (GEngine)
		{
			GEngine->AddOnScreenDebugMessage(-1, 5.0f, FColor::Red,
				FString::Printf(TEXT("Robot error: %s"), *Reason));
		}
	}

	// Log recording saved info
	FString Filename;
	if (StatusJson->TryGetStringField(TEXT("filename"), Filename) && !Filename.IsEmpty())
	{
		int32 Frames = 0;
		StatusJson->TryGetNumberField(TEXT("frames"), Frames);
		double Duration = 0.0;
		StatusJson->TryGetNumberField(TEXT("duration_sec"), Duration);

		UE_LOG(LogRosBridge, Log, TEXT("Recording saved: %s (%d frames, %.1fs)"),
			*Filename, Frames, Duration);

		if (GEngine)
		{
			GEngine->AddOnScreenDebugMessage(-1, 5.0f, FColor::Green,
				FString::Printf(TEXT("Recording saved: %s (%d frames, %.1fs)"),
					*Filename, Frames, Duration));
		}
	}
}

Unreal 변경 사항

  • Details 패널에 ROS|Sync 카테고리: SyncOn, SyncOff 버튼
  • ROS|Status에 bSyncActive (현재 Sync 상태 표시)

 

 


 

 

4-2단계: 수정 후 테스트

언리얼 프로젝트 빌드를 다시 한다.

[Generate Visual Studio project files], 솔루션 빌드

빌드 성공 확인

 

 

터미널 1 (conda - worker)

source ~/miniforge3/etc/profile.d/conda.sh
conda activate lerobot
sudo chmod 666 /dev/ttyACM*
cd ~/UnrealRobotics/src/lerobot_ros2_bridge
python scripts/lerobot_worker.py --cmd-joints all --cmd-limit 120

 

 

터미널 2 (ROS2 - bridge node)

# 파일 수정 후 재빌드 필요
cd ~/UnrealRobotics
colcon build --packages-select lerobot_ros2_bridge --symlink-install
export PATH=$(echo $PATH | tr ':' '\n' | grep -v miniforge | tr '\n' ':' | sed 's/:$//')
source /opt/ros/humble/setup.bash
source ~/UnrealRobotics/install/setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export ROS_LOCALHOST_ONLY=1
ros2 run lerobot_ros2_bridge bridge_node

 

 

터미널 3 (ROS2 - rosbridge)

export PATH=$(echo $PATH | tr ':' '\n' | grep -v miniforge | tr '\n' ':' | sed 's/:$//')
source /opt/ros/humble/setup.bash
source ~/UnrealRobotics/install/setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export ROS_LOCALHOST_ONLY=1
ros2 launch rosbridge_server rosbridge_websocket_launch.xml

 

 

언리얼 열어서 플레이

리더암을 잡고 record한 대로 움직이는데, 처음 포지션과 마지막 포지션이 맞지 않으면 loop를 돌 때 팍 튀는 현상을 볼 수 있었다.

이 부분은 처음 포지션과 목표 포지션 사이의 보간으로 해결한다. UPROPERTY로 노출해서 Details 패널에서 얼마나 천천히 보간 할지를 설정할 수 있게 한다.

 

 


 

 

4-3단계: 현재위치 - 첫 프레임 위치 보간(interpolation)

lerobot_worker.py

더보기
#!/usr/bin/env python3
"""
lerobot_worker.py — LeRobot ↔ ZMQ bridge worker

Runs in conda env 'lerobot' (Python 3.12).
Reads joint states from SO-ARM-101 via LeRobot API at ~30Hz,
publishes them over ZMQ PUB, and accepts commands via ZMQ REP.

Supports record/replay of joint trajectories via ZMQ commands:
  - start_teleop: activate leader→follower sync (teleop)
  - stop_teleop:  deactivate teleop sync
  - start_record: begin teleop + record joint frames
  - stop_record:  stop recording, save to ~/recordings/ (teleop stays on)
  - start_replay: replay a saved recording on the follower arm (teleop off)
  - stop_replay:  stop replay immediately
  - estop:        emergency stop — abort all motion immediately
  - list_recordings: list saved recording files

Usage:
    conda activate lerobot
    sudo chmod 666 /dev/ttyACM*
    python scripts/lerobot_worker.py [--follower-port /dev/ttyACM0] [--leader-port /dev/ttyACM1]
                                      [--pub-addr tcp://127.0.0.1:5555]
                                      [--rep-addr tcp://127.0.0.1:5556]
                                      [--rate 30]
                                      [--follower-only | --leader-only]
                                      [--teleop]
                                      [--recordings-dir ~/recordings]
"""

import argparse
import json
import math
import os
import pathlib
import signal
import sys
import time
import traceback
from datetime import datetime

import zmq


# ---------------------------------------------------------------------------
# Joint name ordering (matches URDF & LeRobot calibration — Phase 3.2 verified)
# ---------------------------------------------------------------------------
JOINT_NAMES = [
    "shoulder_pan",
    "shoulder_lift",
    "elbow_flex",
    "wrist_flex",
    "wrist_roll",
    "gripper",
]

# ---------------------------------------------------------------------------
# Joint limits in LeRobot DEGREES (empirically measured via calibration).
# These are absolute physical limits — commands are never allowed beyond these.
# Measured by reading get_observation() degrees at calibration range_min/max
# raw encoder positions, using two-point linear interpolation.
# NOTE: These are in LeRobot's coordinate frame (0° = calibration midpoint),
#       NOT URDF coordinates (which use a different zero point).
# ---------------------------------------------------------------------------
JOINT_LIMITS_DEG = {
    "shoulder_pan":  (-119.91, +119.91),
    "shoulder_lift": (-104.62, +104.62),
    "elbow_flex":    ( -97.01,  +97.01),
    "wrist_flex":    (-102.68, +102.68),
    "wrist_roll":    (-180.00, +180.00),
    "gripper":       (  +0.14,  +99.45),
}


def build_args():
    p = argparse.ArgumentParser(description="LeRobot ZMQ worker for SO-ARM-101")
    p.add_argument("--follower-port", default="/dev/ttyACM0",
                    help="Serial port for follower arm")
    p.add_argument("--leader-port", default="/dev/ttyACM1",
                    help="Serial port for leader arm")
    p.add_argument("--follower-id", default="so101_twin_follower",
                    help="Calibration ID for follower")
    p.add_argument("--leader-id", default="so101_twin_leader",
                    help="Calibration ID for leader")
    p.add_argument("--pub-addr", default="tcp://127.0.0.1:5555",
                    help="ZMQ PUB bind address for joint states")
    p.add_argument("--rep-addr", default="tcp://127.0.0.1:5556",
                    help="ZMQ REP bind address for commands")
    p.add_argument("--rate", type=float, default=30.0,
                    help="Read loop frequency in Hz")
    p.add_argument("--follower-only", action="store_true",
                    help="Only connect follower (no leader)")
    p.add_argument("--leader-only", action="store_true",
                    help="Only connect leader (no follower)")
    p.add_argument("--teleop", action="store_true",
                    help="Teleoperation mode: leader arm controls follower arm")
    p.add_argument("--cmd-limit", type=float, default=5.0,
                    help="Safety clamp: max degrees from initial position per joint (default 5.0)")
    p.add_argument("--cmd-joints", default="shoulder_pan",
                    help="Comma-separated joint names that accept commands "
                         "(default: shoulder_pan). Use 'all' for all joints.")
    p.add_argument("--recordings-dir", default="~/recordings",
                    help="Directory to save/load recorded trajectories")
    p.add_argument("--approach-speed", type=float, default=45.0,
                    help="Max degrees/sec for approach moves before replay "
                         "and loop transitions (default: 45.0)")
    return p.parse_args()


class LeRobotWorker:
    def __init__(self, args):
        self.args = args
        self.running = True
        self.follower = None
        self.leader = None
        self.teleop = args.teleop

        # Safety clamp: initial follower position recorded at startup,
        # commands are clamped to ±cmd_limit degrees from this baseline.
        # Only joints listed in allowed_cmd_joints accept commands;
        # others are silently ignored (held at current position).
        self.follower_initial_pos = None       # dict: {joint_name: degrees}
        self.cmd_limit_deg = args.cmd_limit    # default 5.0°
        self.allowed_cmd_joints = (
            set(JOINT_NAMES) if args.cmd_joints.strip().lower() == "all"
            else set(j.strip() for j in args.cmd_joints.split(",") if j.strip())
        )

        # ---------------------------------------------------------------------------
        # Record / Replay state
        # ---------------------------------------------------------------------------
        # state: "idle", "recording", "replaying"
        self.state = "idle"

        # Recording buffer: list of {"ts": float, "joints": {name: deg, ...}}
        self.recording_buffer = []
        self.recording_start_time = None

        # Replay state
        self.replay_frames = []        # loaded frames from file
        self.replay_index = 0          # current frame index
        self.replay_start_time = None  # monotonic time when replay started
        self.replay_loop = False       # whether to loop replay
        self.replay_filename = ""      # currently playing file

        # Approach (smooth transition) state
        # "approaching" phase runs before replay and between loop iterations
        self.approaching = False         # whether we're in approach phase
        self.approach_start_pos = {}     # joint positions at start of approach
        self.approach_target_pos = {}    # target joint positions (first frame)
        self.approach_start_time = None  # monotonic time when approach started
        self.approach_duration = 0.0     # calculated duration in seconds
        self.approach_speed = args.approach_speed  # degrees per second

        # Recordings directory
        self.recordings_dir = pathlib.Path(os.path.expanduser(args.recordings_dir))
        self.recordings_dir.mkdir(parents=True, exist_ok=True)
        print(f"[worker] Recordings dir: {self.recordings_dir}")

        # ZMQ setup
        self.ctx = zmq.Context()

        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.bind(args.pub_addr)
        print(f"[worker] PUB bound on {args.pub_addr}")

        self.rep = self.ctx.socket(zmq.REP)
        self.rep.bind(args.rep_addr)
        print(f"[worker] REP bound on {args.rep_addr}")

        # Poller for non-blocking REP check inside the read loop
        self.poller = zmq.Poller()
        self.poller.register(self.rep, zmq.POLLIN)

    def connect_robots(self):
        """Connect to follower and/or leader via LeRobot API."""
        if not self.args.leader_only:
            print(f"[worker] Connecting follower on {self.args.follower_port} "
                  f"(id={self.args.follower_id}) ...")
            from lerobot.robots.so_follower import SO101Follower, SO101FollowerConfig
            cfg = SO101FollowerConfig(
                port=self.args.follower_port,
                id=self.args.follower_id,
            )
            self.follower = SO101Follower(cfg)
            self.follower.connect()
            print("[worker] Follower connected.")

        if not self.args.follower_only:
            print(f"[worker] Connecting leader on {self.args.leader_port} "
                  f"(id={self.args.leader_id}) ...")
            from lerobot.teleoperators.so_leader import SOLeader, SO101LeaderConfig
            cfg = SO101LeaderConfig(
                port=self.args.leader_port,
                id=self.args.leader_id,
            )
            self.leader = SOLeader(cfg)
            self.leader.connect()
            print("[worker] Leader connected.")

        # Validate teleop requirements
        if self.teleop:
            if self.follower is None or self.leader is None:
                print("[worker] ERROR: --teleop requires both follower and leader connected.")
                print("[worker] Cannot use --teleop with --follower-only or --leader-only.")
                sys.exit(1)
            print("[worker] *** TELEOP MODE ENABLED ***")
            print("[worker] Leader arm will control follower arm in real-time.")
            print("[worker] Move leader arm to starting position, then press Enter to begin...")
            input()
            print("[worker] Teleop active!")

        # Record initial follower position for safety clamping
        if self.follower is not None:
            init_pos = self.read_follower()

            # Load calibration JSON to display physical range info
            calib_range = self._load_calibration_range("follower")

            if init_pos is not None:
                self.follower_initial_pos = dict(init_pos)

                # Print calibration range info
                if calib_range:
                    print("[worker] === CALIBRATION RANGE (raw encoder values) ===")
                    for name in JOINT_NAMES:
                        if name in calib_range:
                            rmin, rmax, cur_raw = calib_range[name]
                            total = rmax - rmin
                            if cur_raw is not None and total > 0:
                                pct = (cur_raw - rmin) / total * 100.0
                                print(f"  {name:20s}: raw {cur_raw:4d}  "
                                      f"range [{rmin:4d} ~ {rmax:4d}]  "
                                      f"({pct:5.1f}% of range)")
                            else:
                                print(f"  {name:20s}: raw  ???  "
                                      f"range [{rmin:4d} ~ {rmax:4d}]")

                print("[worker] === SAFETY CLAMP BASELINE (follower initial position) ===")
                for name in JOINT_NAMES:
                    v = init_pos.get(name, 0.0)
                    soft_lo = v - self.cmd_limit_deg
                    soft_hi = v + self.cmd_limit_deg
                    hard_lo, hard_hi = JOINT_LIMITS_DEG.get(name, (-180.0, 180.0))
                    eff_lo = max(soft_lo, hard_lo)
                    eff_hi = min(soft_hi, hard_hi)
                    marker = " <-- CMD OK" if name in self.allowed_cmd_joints else ""
                    print(f"  {name:20s}: {v:+8.2f}°  "
                          f"effective: [{eff_lo:+8.2f}° ~ {eff_hi:+8.2f}°]  "
                          f"(physical: [{hard_lo:+8.2f}°~{hard_hi:+8.2f}°]){marker}")
                print(f"[worker] Allowed command joints: {sorted(self.allowed_cmd_joints)}")
                print(f"[worker] Clamp limit: ±{self.cmd_limit_deg}° from baseline")
            else:
                print("[worker] WARNING: Could not read initial follower position. "
                      "Commands will be REJECTED until baseline is set.")

    def _load_calibration_range(self, arm_type="follower"):
        """
        Load calibration JSON and read current raw positions.
        Returns dict: {joint_name: (range_min, range_max, current_raw)} or None.
        """
        if arm_type == "follower":
            calib_path = (
                pathlib.Path.home()
                / ".cache/huggingface/lerobot/calibration/robots/so_follower"
                / f"{self.args.follower_id}.json"
            )
            robot = self.follower
        else:
            calib_path = (
                pathlib.Path.home()
                / ".cache/huggingface/lerobot/calibration/teleoperators/so_leader"
                / f"{self.args.leader_id}.json"
            )
            robot = self.leader

        try:
            with open(calib_path) as f:
                calib = json.load(f)
            print(f"[worker] Loaded calibration from {calib_path}")

            # Read current raw positions (without normalization)
            current_raw = {}
            if robot is not None:
                try:
                    raw = robot.bus.sync_read("Present_Position", normalize=False)
                    current_raw = dict(raw)
                except Exception as e:
                    print(f"[worker] Could not read raw positions: {e}")

            result = {}
            for name in JOINT_NAMES:
                if name in calib:
                    rmin = calib[name].get("range_min", 0)
                    rmax = calib[name].get("range_max", 4095)
                    cur = current_raw.get(name, None)
                    result[name] = (rmin, rmax, cur)

            return result

        except FileNotFoundError:
            print(f"[worker] Calibration file not found: {calib_path}")
            return None
        except Exception as e:
            print(f"[worker] Error loading calibration: {e}")
            return None

    def read_follower(self):
        """Read follower joint positions in degrees. Returns dict or None."""
        if self.follower is None:
            return None
        try:
            obs = self.follower.get_observation()
            return {name: float(obs[f"{name}.pos"]) for name in JOINT_NAMES}
        except Exception as e:
            print(f"[worker] Follower read error: {e}")
            return None

    def read_leader(self):
        """Read leader joint positions in degrees. Returns dict or None."""
        if self.leader is None:
            return None
        try:
            action = self.leader.get_action()
            return {name: float(action[f"{name}.pos"]) for name in JOINT_NAMES}
        except Exception as e:
            print(f"[worker] Leader read error: {e}")
            return None

    def send_follower_action(self, joint_positions_deg):
        """
        Send joint positions (degrees) to follower via LeRobot send_action().
        joint_positions_deg: dict like {"shoulder_pan": 10.0, ...}
        """
        if self.follower is None:
            return
        try:
            action_dict = {f"{name}.pos": float(joint_positions_deg[name])
                           for name in JOINT_NAMES if name in joint_positions_deg}
            self.follower.send_action(action_dict)
        except Exception as e:
            print(f"[worker] Follower send_action error: {e}")

    def handle_command(self, msg):
        """
        Process a command from the ROS bridge node.
        Expected format:
            {"cmd": "send_follower_action", "args": {"shoulder_pan": 0.0, ...}}
            {"cmd": "ping"}
            {"cmd": "start_teleop"}
            {"cmd": "stop_teleop"}
            {"cmd": "start_record"}
            {"cmd": "stop_record"}
            {"cmd": "start_replay", "args": {"filename": "...", "loop": false}}
            {"cmd": "stop_replay"}
            {"cmd": "estop"}
            {"cmd": "list_recordings"}
        Returns a response dict.
        """
        cmd = msg.get("cmd", "")

        if cmd == "ping":
            return {"status": "ok", "state": self.state,
                    "teleop": self.teleop, "ts": time.time()}

        elif cmd == "estop":
            return self._handle_estop()

        elif cmd == "start_teleop":
            return self._handle_start_teleop()

        elif cmd == "stop_teleop":
            return self._handle_stop_teleop()

        elif cmd == "start_record":
            return self._handle_start_record()

        elif cmd == "stop_record":
            return self._handle_stop_record()

        elif cmd == "start_replay":
            args = msg.get("args", {})
            return self._handle_start_replay(args)

        elif cmd == "stop_replay":
            return self._handle_stop_replay()

        elif cmd == "list_recordings":
            return self._handle_list_recordings()

        elif cmd == "send_follower_action":
            if self.follower is None:
                return {"status": "error", "reason": "follower not connected"}
            if self.follower_initial_pos is None:
                return {"status": "error", "reason": "safety baseline not set (no initial position)"}
            joint_args = msg.get("args", {})
            if not joint_args:
                return {"status": "error", "reason": "empty args"}
            try:
                # Two-layer safety clamp:
                #   Layer 1: baseline ± cmd_limit (session safety)
                #   Layer 2: JOINT_LIMITS_DEG (absolute physical limits, measured empirically)
                clamped = {}
                rejected = []
                for name, target_deg in joint_args.items():
                    if name not in JOINT_NAMES:
                        continue
                    if name not in self.allowed_cmd_joints:
                        rejected.append(name)
                        continue

                    target = float(target_deg)

                    # Layer 1: baseline ± cmd_limit
                    baseline = self.follower_initial_pos.get(name, 0.0)
                    soft_lo = baseline - self.cmd_limit_deg
                    soft_hi = baseline + self.cmd_limit_deg

                    # Layer 2: absolute physical limits
                    hard_lo, hard_hi = JOINT_LIMITS_DEG.get(name, (-180.0, 180.0))

                    # Effective range = intersection of both
                    eff_lo = max(soft_lo, hard_lo)
                    eff_hi = min(soft_hi, hard_hi)

                    clamped_val = max(eff_lo, min(eff_hi, target))
                    clamped[name] = clamped_val

                    if abs(clamped_val - target) > 0.01:
                        reason = ""
                        if target < hard_lo or target > hard_hi:
                            reason = " [PHYSICAL LIMIT]"
                        else:
                            reason = " [BASELINE LIMIT]"
                        print(f"[worker] CLAMP {name}: "
                              f"requested {target:.2f}° → clamped to {clamped_val:.2f}° "
                              f"(effective: {eff_lo:.2f}° ~ {eff_hi:.2f}°){reason}")

                if not clamped:
                    return {"status": "error",
                            "reason": f"no allowed joints in command "
                                      f"(rejected: {rejected}, allowed: {sorted(self.allowed_cmd_joints)})"}

                action_dict = {f"{name}.pos": val for name, val in clamped.items()}
                self.follower.send_action(action_dict)
                resp = {"status": "ok", "clamped": clamped}
                if rejected:
                    resp["rejected_joints"] = rejected
                return resp
            except Exception as e:
                return {"status": "error", "reason": str(e)}

        else:
            return {"status": "error", "reason": f"unknown cmd: {cmd}"}

    # -----------------------------------------------------------------------
    # Teleop (Sync) handlers
    # -----------------------------------------------------------------------

    def _handle_start_teleop(self):
        """Activate leader→follower sync (teleop) with smooth approach."""
        if self.state == "replaying":
            return {"status": "error",
                    "reason": "cannot start teleop during replay — stop replay first"}
        if self.follower is None or self.leader is None:
            return {"status": "error",
                    "reason": "teleop requires both follower and leader connected"}
        if self.teleop:
            return {"status": "ok", "state": self.state, "teleop": True,
                    "msg": "teleop already active"}
        if self.state == "syncing":
            return {"status": "ok", "state": self.state, "teleop": False,
                    "msg": "already approaching leader position"}

        # Read leader's current position as approach target
        leader_pos = self.read_leader()
        if leader_pos is None:
            # Can't read leader — just turn on teleop directly
            self.teleop = True
            print("[worker] Teleop (sync) ON — leader → follower (no approach)")
            return {"status": "ok", "state": self.state, "teleop": True}

        # Start approach to leader position, then activate teleop
        self.state = "syncing"
        self._start_approach(leader_pos)
        if not self.approaching:
            # Already close enough — activate teleop immediately
            self.state = "idle"
            self.teleop = True
            print("[worker] Teleop (sync) ON — already aligned")
            return {"status": "ok", "state": self.state, "teleop": True}

        print("[worker] Syncing: approaching leader position before teleop ON")
        return {"status": "ok", "state": "syncing", "teleop": False,
                "approach_sec": round(self.approach_duration, 2)}

    def _handle_stop_teleop(self):
        """Deactivate leader→follower sync (teleop)."""
        if self.state == "recording":
            return {"status": "error",
                    "reason": "cannot stop teleop during recording — stop recording first"}
        # Cancel syncing approach if in progress
        if self.state == "syncing":
            self.approaching = False
            self.state = "idle"
            print("[worker] Syncing cancelled")
        self.teleop = False
        print("[worker] Teleop (sync) OFF")
        return {"status": "ok", "state": self.state, "teleop": False}

    # -----------------------------------------------------------------------
    # Record / Replay / E-Stop handlers
    # -----------------------------------------------------------------------

    def _handle_estop(self):
        """Emergency stop: abort any ongoing motion, return to idle."""
        prev_state = self.state
        prev_teleop = self.teleop
        self.state = "idle"
        self.teleop = False
        self.recording_buffer = []
        self.replay_frames = []
        self.replay_index = 0
        self.approaching = False
        print(f"[worker] *** E-STOP *** (was: state={prev_state}, teleop={prev_teleop})")
        return {"status": "ok", "state": "idle", "teleop": False,
                "previous_state": prev_state}

    def _handle_start_record(self):
        """Start recording: teleop must already be active (Sync On first)."""
        if self.state != "idle":
            return {"status": "error",
                    "reason": f"cannot start recording in state '{self.state}'"}
        if self.follower is None or self.leader is None:
            return {"status": "error",
                    "reason": "recording requires both follower and leader connected"}
        if not self.teleop:
            return {"status": "error",
                    "reason": "teleop (sync) must be active before recording — "
                              "press Sync On first"}

        self.recording_buffer = []
        self.recording_start_time = time.monotonic()
        self.state = "recording"
        print("[worker] *** RECORDING STARTED *** (teleop already active)")
        return {"status": "ok", "state": "recording", "teleop": True}

    def _handle_stop_record(self):
        """Stop recording: save buffer to file. Teleop stays ON."""
        if self.state != "recording":
            return {"status": "error",
                    "reason": f"not recording (state: '{self.state}')"}

        num_frames = len(self.recording_buffer)
        if num_frames == 0:
            self.state = "idle"
            # Teleop stays ON — user controls it via Sync button
            print("[worker] Recording stopped — 0 frames, nothing saved.")
            return {"status": "ok", "state": "idle", "teleop": self.teleop,
                    "frames": 0, "filename": None}

        # Calculate duration from frame timestamps
        duration = self.recording_buffer[-1]["ts"] - self.recording_buffer[0]["ts"]

        # Build recording file
        timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"recording_{timestamp_str}.json"
        filepath = self.recordings_dir / filename

        recording_data = {
            "version": 1,
            "recorded_at": datetime.now().isoformat(),
            "rate_hz": self.args.rate,
            "num_frames": num_frames,
            "duration_sec": round(duration, 3),
            "joint_names": JOINT_NAMES,
            "frames": self.recording_buffer,
        }

        with open(filepath, "w") as f:
            json.dump(recording_data, f, indent=2)

        self.state = "idle"
        # Teleop stays ON — user controls it via Sync button
        self.recording_buffer = []
        print(f"[worker] Recording saved: {filepath}")
        print(f"[worker]   {num_frames} frames, {duration:.1f}s")
        return {
            "status": "ok",
            "state": "idle",
            "teleop": self.teleop,
            "filename": filename,
            "frames": num_frames,
            "duration_sec": round(duration, 3),
        }

    def _handle_start_replay(self, args):
        """Load a recording file and start replaying on the follower."""
        if self.state != "idle":
            return {"status": "error",
                    "reason": f"cannot start replay in state '{self.state}'"}
        if self.follower is None:
            return {"status": "error",
                    "reason": "replay requires follower connected"}

        filename = args.get("filename", "")
        self.replay_loop = args.get("loop", False)

        # Allow overriding approach speed per command
        speed = args.get("approach_speed", self.approach_speed)

        # If no filename, use the most recent recording
        if not filename:
            recordings = self._get_recording_files()
            if not recordings:
                return {"status": "error", "reason": "no recordings found"}
            filename = recordings[-1]  # most recent (sorted by name = sorted by time)

        filepath = self.recordings_dir / filename
        if not filepath.exists():
            return {"status": "error",
                    "reason": f"file not found: {filename}"}

        try:
            with open(filepath) as f:
                data = json.load(f)
        except Exception as e:
            return {"status": "error", "reason": f"failed to load: {e}"}

        frames = data.get("frames", [])
        if len(frames) < 2:
            return {"status": "error",
                    "reason": f"recording too short ({len(frames)} frames)"}

        self.replay_frames = frames
        self.replay_index = 0
        self.replay_filename = filename
        self.teleop = False  # disable teleop during replay
        self.state = "replaying"

        # Start approach phase: smoothly move from current position to first frame
        first_frame_joints = frames[0].get("joints", {})
        self._start_approach(first_frame_joints, speed)

        duration = data.get("duration_sec", 0)
        print(f"[worker] *** REPLAY STARTED *** {filename}")
        print(f"[worker]   {len(frames)} frames, {duration}s, "
              f"loop={'yes' if self.replay_loop else 'no'}, "
              f"approach={self.approach_duration:.1f}s")
        return {
            "status": "ok",
            "state": "replaying",
            "filename": filename,
            "frames": len(frames),
            "duration_sec": duration,
            "loop": self.replay_loop,
            "approach_sec": round(self.approach_duration, 2),
        }

    def _handle_stop_replay(self):
        """Stop replay immediately."""
        if self.state != "replaying":
            return {"status": "error",
                    "reason": f"not replaying (state: '{self.state}')"}

        self.state = "idle"
        self.replay_frames = []
        self.replay_index = 0
        print(f"[worker] Replay stopped ({self.replay_filename})")
        return {"status": "ok", "state": "idle"}

    def _handle_list_recordings(self):
        """List all saved recording files."""
        files = self._get_recording_files()
        # Add metadata summary for each file
        summaries = []
        for fname in files:
            try:
                with open(self.recordings_dir / fname) as f:
                    data = json.load(f)
                summaries.append({
                    "filename": fname,
                    "frames": data.get("num_frames", 0),
                    "duration_sec": data.get("duration_sec", 0),
                    "recorded_at": data.get("recorded_at", ""),
                })
            except Exception:
                summaries.append({"filename": fname})
        return {"status": "ok", "recordings": summaries}

    def _get_recording_files(self):
        """Get sorted list of recording JSON files."""
        files = sorted(
            f.name for f in self.recordings_dir.glob("recording_*.json")
            if f.is_file()
        )
        return files

    # -----------------------------------------------------------------------
    # Approach (smooth transition) logic
    # -----------------------------------------------------------------------

    def _start_approach(self, target_joints, speed=None):
        """
        Begin a smooth approach from current follower position to target_joints.
        Duration is calculated from the largest joint angle difference
        divided by speed (degrees/sec).
        """
        if speed is None:
            speed = self.approach_speed

        current = self.read_follower()
        if current is None:
            # Can't read current position — skip approach
            self.approaching = False
            self.replay_start_time = time.monotonic()
            return

        self.approach_start_pos = dict(current)
        self.approach_target_pos = dict(target_joints)

        # Calculate duration based on max joint difference
        max_diff = 0.0
        for name in JOINT_NAMES:
            if name in current and name in target_joints:
                diff = abs(target_joints[name] - current[name])
                max_diff = max(max_diff, diff)

        if max_diff < 1.0:
            # Already close enough — skip approach
            self.approaching = False
            self.replay_start_time = time.monotonic()
            return

        self.approach_duration = max_diff / speed
        # Minimum 0.3s, even for small moves
        self.approach_duration = max(self.approach_duration, 0.3)
        self.approach_start_time = time.monotonic()
        self.approaching = True
        print(f"[worker] Approach: {max_diff:.1f}° max diff, "
              f"{self.approach_duration:.1f}s at {speed}°/s")

    def _tick_approach(self):
        """
        Advance the approach interpolation by one tick.
        Returns interpolated joint positions, or None if approach is finished.
        """
        if not self.approaching:
            return None

        elapsed = time.monotonic() - self.approach_start_time
        t = elapsed / self.approach_duration  # 0.0 → 1.0

        if t >= 1.0:
            # Approach finished — send final target
            self.send_follower_action(self.approach_target_pos)
            self.approaching = False
            self.replay_start_time = time.monotonic()
            print("[worker] Approach complete")
            return self.read_follower()

        # Smooth interpolation using ease-in-out (cosine)
        # t=0 → factor=0, t=1 → factor=1, smooth acceleration/deceleration
        factor = (1.0 - math.cos(t * math.pi)) / 2.0

        interpolated = {}
        for name in JOINT_NAMES:
            start_val = self.approach_start_pos.get(name, 0.0)
            end_val = self.approach_target_pos.get(name, 0.0)
            interpolated[name] = start_val + (end_val - start_val) * factor

        self.send_follower_action(interpolated)
        return self.read_follower()

    # -----------------------------------------------------------------------
    # Replay tick — called each iteration of the main loop
    # -----------------------------------------------------------------------

    def _tick_replay(self):
        """
        Advance replay by one tick. Handles approach phase, then frame playback.
        Returns the follower position after sending (for PUB).
        """
        # --- Approach phase: smooth transition to start position ---
        if self.approaching:
            return self._tick_approach()

        # --- Normal replay ---
        if not self.replay_frames or self.replay_index >= len(self.replay_frames):
            # Replay finished
            if self.replay_loop and self.replay_frames:
                # Loop: approach back to first frame before restarting
                first_frame_joints = self.replay_frames[0].get("joints", {})
                self.replay_index = 0
                self._start_approach(first_frame_joints)
                if self.approaching:
                    print(f"[worker] Replay loop — approaching start position")
                    return self.read_follower()
                else:
                    # Already close enough, restart immediately
                    print(f"[worker] Replay looping ({self.replay_filename})")
            else:
                self.state = "idle"
                print(f"[worker] Replay finished ({self.replay_filename})")
                return self.read_follower()

        elapsed = time.monotonic() - self.replay_start_time
        first_ts = self.replay_frames[0]["ts"]

        # Advance to the frame that matches current elapsed time
        while self.replay_index < len(self.replay_frames):
            frame = self.replay_frames[self.replay_index]
            frame_time = frame["ts"] - first_ts
            if frame_time <= elapsed:
                # Send this frame to follower
                joints = frame.get("joints", {})
                if joints:
                    self.send_follower_action(joints)
                self.replay_index += 1
            else:
                break  # wait for next tick

        # Re-read follower actual position
        return self.read_follower()

    def check_commands(self):
        """Non-blocking check for incoming REP commands."""
        events = dict(self.poller.poll(timeout=0))  # 0 = non-blocking
        if self.rep in events:
            try:
                raw = self.rep.recv_string(zmq.NOBLOCK)
                msg = json.loads(raw)
                response = self.handle_command(msg)
                self.rep.send_string(json.dumps(response))
            except zmq.Again:
                pass
            except Exception as e:
                # Must always send a reply on REP socket
                self.rep.send_string(json.dumps({
                    "status": "error", "reason": str(e)
                }))

    def run(self):
        """Main read loop at configured rate."""
        period = 1.0 / self.args.rate
        seq = 0
        teleop_count = 0
        print(f"[worker] Starting read loop at {self.args.rate} Hz "
              f"(period={period*1000:.1f}ms)")

        while self.running:
            t_start = time.monotonic()

            # --- Replaying mode: advance replay timeline ---
            if self.state == "replaying":
                follower_pos = self._tick_replay()
                leader_pos = self.read_leader()

            # --- Syncing mode: approach leader position, then activate teleop ---
            elif self.state == "syncing":
                if self.approaching:
                    follower_pos = self._tick_approach()
                else:
                    # Approach finished — activate teleop
                    self.state = "idle"
                    self.teleop = True
                    print("[worker] Teleop (sync) ON — approach complete")
                    follower_pos = self.read_follower()
                leader_pos = self.read_leader()

            else:
                # Read joint states
                follower_pos = self.read_follower()
                leader_pos = self.read_leader()

                # --- Teleop: send leader position to follower ---
                if self.teleop and leader_pos is not None:
                    self.send_follower_action(leader_pos)
                    teleop_count += 1
                    # Re-read follower after sending action to get updated position
                    follower_pos = self.read_follower()

                # --- Recording mode: buffer the current frame ---
                if self.state == "recording" and follower_pos is not None:
                    frame = {
                        "ts": time.monotonic() - self.recording_start_time,
                        "joints": dict(follower_pos),
                    }
                    self.recording_buffer.append(frame)

            # Build and publish message (degrees — bridge node converts to radians)
            msg = {
                "seq": seq,
                "ts": time.time(),
                "state": self.state,
                "teleop": self.teleop,
            }
            if follower_pos is not None:
                msg["follower"] = follower_pos
            if leader_pos is not None:
                msg["leader"] = leader_pos
            # Add recording/replay metadata
            if self.state == "recording":
                msg["recording_frames"] = len(self.recording_buffer)
            elif self.state == "replaying":
                msg["replay_progress"] = {
                    "index": self.replay_index,
                    "total": len(self.replay_frames),
                    "filename": self.replay_filename,
                    "loop": self.replay_loop,
                    "approaching": self.approaching,
                }

            self.pub.send_string(json.dumps(msg))
            seq += 1

            # Check for commands (non-blocking)
            self.check_commands()

            # Rate limiting
            elapsed = time.monotonic() - t_start
            sleep_time = period - elapsed
            if sleep_time > 0:
                time.sleep(sleep_time)

    def shutdown(self):
        """Clean shutdown."""
        print("\n[worker] Shutting down...")
        self.running = False
        self.state = "idle"

        # Save any in-progress recording
        if self.recording_buffer:
            try:
                self._handle_stop_record()
                print("[worker] Saved in-progress recording before shutdown.")
            except Exception:
                pass

        if self.follower is not None:
            try:
                self.follower.disconnect()
                print("[worker] Follower disconnected.")
            except Exception:
                pass

        if self.leader is not None:
            try:
                self.leader.disconnect()
                print("[worker] Leader disconnected.")
            except Exception:
                pass

        self.pub.close()
        self.rep.close()
        self.ctx.term()
        print("[worker] ZMQ cleaned up. Bye.")


def main():
    args = build_args()
    worker = LeRobotWorker(args)

    # Graceful shutdown on Ctrl+C
    def sig_handler(sig, frame):
        worker.shutdown()
        sys.exit(0)

    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)

    try:
        worker.connect_robots()
        worker.run()
    except Exception:
        traceback.print_exc()
        worker.shutdown()
        sys.exit(1)


if __name__ == "__main__":
    main()

 

 

ros_bridge_node.py

더보기
#!/usr/bin/env python3
"""
ros_bridge_node.py — ZMQ ↔ ROS2 bridge node

Runs in .venv-ros-bridge (Python 3.10) with rclpy + pyzmq.
Subscribes to joint states from lerobot_worker via ZMQ,
converts degrees→radians, and publishes sensor_msgs/JointState.
Also subscribes to /follower_joint_commands and forwards to worker via ZMQ REQ.

Relays record/replay/estop commands:
  - /robot_command (std_msgs/String) → ZMQ REQ → worker
  - Worker state published on /robot_status (std_msgs/String) every tick

Usage:
    export PATH=$(echo $PATH | tr ':' '\\n' | grep -v miniforge | tr '\\n' ':' | sed 's/:$//')
    cd ~/UnrealRobotics
    source .venv-ros-bridge/bin/activate
    source /opt/ros/humble/setup.bash
    python src/lerobot_ros2_bridge/lerobot_ros2_bridge/ros_bridge_node.py \\
        [--sub-addr tcp://127.0.0.1:5555] \\
        [--req-addr tcp://127.0.0.1:5556]
"""

import argparse
import json
import math
import sys
import traceback

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from std_msgs.msg import String
import zmq


# ---------------------------------------------------------------------------
# Joint names — must match URDF, LeRobot calibration, and worker output
# (Phase 3.2 verified: all 6 names identical across all three sources)
# ---------------------------------------------------------------------------
JOINT_NAMES = [
    "shoulder_pan",
    "shoulder_lift",
    "elbow_flex",
    "wrist_flex",
    "wrist_roll",
    "gripper",
]


def deg2rad(deg: float) -> float:
    return deg * math.pi / 180.0


def rad2deg(rad: float) -> float:
    return rad * 180.0 / math.pi


class RosBridgeNode(Node):
    def __init__(self, args):
        super().__init__("lerobot_ros2_bridge")
        self.get_logger().info("Initializing lerobot_ros2_bridge node")

        # --- ZMQ setup ---
        self.zmq_ctx = zmq.Context()

        # SUB: receive joint states from worker
        self.zmq_sub = self.zmq_ctx.socket(zmq.SUB)
        self.zmq_sub.connect(args.sub_addr)
        self.zmq_sub.setsockopt_string(zmq.SUBSCRIBE, "")
        # Don't block waiting for messages — use RCVTIMEO
        self.zmq_sub.setsockopt(zmq.RCVTIMEO, 0)
        self.get_logger().info(f"ZMQ SUB connected to {args.sub_addr}")

        # REQ: send commands to worker
        self.zmq_req = self.zmq_ctx.socket(zmq.REQ)
        self.zmq_req.connect(args.req_addr)
        self.zmq_req.setsockopt(zmq.RCVTIMEO, 1000)  # 1s timeout for replies
        self.get_logger().info(f"ZMQ REQ connected to {args.req_addr}")

        # --- ROS2 publishers ---
        # Follower joint states (for robot_state_publisher + rosbridge → Unreal)
        self.follower_pub = self.create_publisher(JointState, "/joint_states", 10)
        # Leader joint states (separate topic)
        self.leader_pub = self.create_publisher(JointState, "/leader_joint_states", 10)

        # --- ROS2 subscriber for commands from Unreal (via rosbridge) ---
        self.cmd_sub = self.create_subscription(
            JointState,
            "/follower_joint_commands",
            self.on_joint_command,
            10,
        )

        # --- Record/Replay/E-Stop command relay ---
        # Unreal publishes JSON command strings to /robot_command,
        # bridge relays them to worker via ZMQ REQ.
        self.robot_cmd_sub = self.create_subscription(
            String,
            "/robot_command",
            self.on_robot_command,
            10,
        )
        # Worker state feedback → Unreal
        self.robot_status_pub = self.create_publisher(String, "/robot_status", 10)
        self.last_worker_state = ""
        self.last_worker_teleop = None

        # --- Timer: poll ZMQ at ~100Hz (faster than worker's 30Hz to avoid buffering) ---
        self.poll_timer = self.create_timer(0.01, self.poll_zmq)

        # Stats
        self.recv_count = 0
        self.last_log_time = self.get_clock().now()

    def poll_zmq(self):
        """Non-blocking poll of ZMQ SUB for worker messages."""
        # Drain all available messages (in case we're slightly behind)
        messages_this_tick = 0
        while messages_this_tick < 5:  # cap per tick to avoid starving ROS callbacks
            try:
                raw = self.zmq_sub.recv_string(zmq.NOBLOCK)
            except zmq.Again:
                break

            messages_this_tick += 1
            self.recv_count += 1

            try:
                msg = json.loads(raw)
            except json.JSONDecodeError as e:
                self.get_logger().warn(f"JSON parse error: {e}")
                continue

            now = self.get_clock().now().to_msg()

            # Publish follower joint states
            if "follower" in msg:
                js = JointState()
                js.header.stamp = now
                js.header.frame_id = ""
                js.name = list(JOINT_NAMES)
                js.position = [
                    deg2rad(msg["follower"].get(name, 0.0))
                    for name in JOINT_NAMES
                ]
                # velocity and effort left empty (not available from LeRobot API)
                self.follower_pub.publish(js)

            # Publish leader joint states
            if "leader" in msg:
                js = JointState()
                js.header.stamp = now
                js.header.frame_id = ""
                js.name = list(JOINT_NAMES)
                js.position = [
                    deg2rad(msg["leader"].get(name, 0.0))
                    for name in JOINT_NAMES
                ]
                self.leader_pub.publish(js)

            # Publish worker state to /robot_status (only on change)
            worker_state = msg.get("state", "idle")
            worker_teleop = msg.get("teleop", False)
            state_changed = (worker_state != self.last_worker_state or
                             worker_teleop != self.last_worker_teleop)
            if state_changed:
                status_msg = String()
                status_data = {"state": worker_state, "teleop": worker_teleop}
                # Include recording/replay metadata if present
                if "recording_frames" in msg:
                    status_data["recording_frames"] = msg["recording_frames"]
                if "replay_progress" in msg:
                    status_data["replay_progress"] = msg["replay_progress"]
                status_msg.data = json.dumps(status_data)
                self.robot_status_pub.publish(status_msg)
                self.last_worker_state = worker_state
                self.last_worker_teleop = worker_teleop
                self.get_logger().info(
                    f"Worker state: {worker_state}, teleop: {worker_teleop}")

        # Periodic logging (every 10 seconds)
        now_time = self.get_clock().now()
        if (now_time - self.last_log_time).nanoseconds > 10_000_000_000:
            self.get_logger().info(
                f"ZMQ recv total: {self.recv_count} messages"
            )
            self.last_log_time = now_time

    def on_joint_command(self, msg: JointState):
        """
        Receive joint commands from ROS2 (e.g. from Unreal via rosbridge),
        convert radians→degrees, and forward to worker via ZMQ REQ.
        """
        if len(msg.name) == 0 or len(msg.position) == 0:
            self.get_logger().warn("Empty joint command received, ignoring")
            return

        # Build command dict (degrees)
        joint_args = {}
        for name, pos_rad in zip(msg.name, msg.position):
            if name in JOINT_NAMES:
                joint_args[name] = rad2deg(pos_rad)

        if not joint_args:
            self.get_logger().warn("No recognized joint names in command")
            return

        cmd = {
            "cmd": "send_follower_action",
            "args": joint_args,
        }

        try:
            self.zmq_req.send_string(json.dumps(cmd))
            reply_raw = self.zmq_req.recv_string()
            reply = json.loads(reply_raw)
            if reply.get("status") != "ok":
                self.get_logger().warn(
                    f"Worker command error: {reply.get('reason', 'unknown')}"
                )
        except zmq.Again:
            self.get_logger().error("Worker REQ timeout — is lerobot_worker running?")
        except Exception as e:
            self.get_logger().error(f"Worker command exception: {e}")

    def on_robot_command(self, msg: String):
        """
        Receive record/replay/estop commands from Unreal (via rosbridge),
        relay to worker via ZMQ REQ, and publish response on /robot_status.

        Expected msg.data formats:
            '{"cmd": "start_record"}'
            '{"cmd": "stop_record"}'
            '{"cmd": "start_replay", "args": {"filename": "...", "loop": false}}'
            '{"cmd": "start_replay"}'                    (plays most recent)
            '{"cmd": "stop_replay"}'
            '{"cmd": "estop"}'
            '{"cmd": "list_recordings"}'
        """
        try:
            cmd = json.loads(msg.data)
        except json.JSONDecodeError:
            # Support simple string commands: "estop", "start_record", etc.
            cmd = {"cmd": msg.data.strip()}

        cmd_name = cmd.get("cmd", "")
        self.get_logger().info(f"Robot command received: {cmd_name}")

        try:
            self.zmq_req.send_string(json.dumps(cmd))
            reply_raw = self.zmq_req.recv_string()
            reply = json.loads(reply_raw)

            # Publish response on /robot_status
            status_msg = String()
            status_msg.data = reply_raw
            self.robot_status_pub.publish(status_msg)

            status = reply.get("status", "unknown")
            if status == "ok":
                new_state = reply.get("state", "")
                if new_state:
                    self.last_worker_state = new_state
                self.get_logger().info(f"Command '{cmd_name}' OK: {reply}")
            else:
                self.get_logger().warn(
                    f"Command '{cmd_name}' error: {reply.get('reason', 'unknown')}"
                )
        except zmq.Again:
            self.get_logger().error(
                f"Worker REQ timeout for '{cmd_name}' — is lerobot_worker running?"
            )
            # Publish timeout error on /robot_status
            err_msg = String()
            err_msg.data = json.dumps({
                "status": "error", "reason": "worker timeout", "cmd": cmd_name
            })
            self.robot_status_pub.publish(err_msg)
        except Exception as e:
            self.get_logger().error(f"Robot command exception: {e}")
            err_msg = String()
            err_msg.data = json.dumps({
                "status": "error", "reason": str(e), "cmd": cmd_name
            })
            self.robot_status_pub.publish(err_msg)

    def destroy_node(self):
        """Clean shutdown of ZMQ resources."""
        self.get_logger().info("Shutting down ZMQ...")
        self.zmq_sub.close()
        self.zmq_req.close()
        self.zmq_ctx.term()
        super().destroy_node()


def main():
    parser = argparse.ArgumentParser(description="ROS2 ↔ ZMQ bridge node")
    parser.add_argument("--sub-addr", default="tcp://127.0.0.1:5555",
                        help="ZMQ SUB address (worker PUB)")
    parser.add_argument("--req-addr", default="tcp://127.0.0.1:5556",
                        help="ZMQ REQ address (worker REP)")
    # ROS2 may pass extra args — use parse_known_args
    args, unknown = parser.parse_known_args()

    rclpy.init(args=unknown if unknown else None)
    node = RosBridgeNode(args)

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == "__main__":
    main()

 

 

RobotVisualizer.h

더보기
#pragma once

#include "CoreMinimal.h"
#include "GameFramework/Actor.h"
#include "RobotVisualizer.generated.h"

class UStaticMesh;
class UStaticMeshComponent;
class USceneComponent;
class URosBridgeSubsystem;

/**
 * Visualizes the SO-ARM-101 follower arm in Unreal Engine and provides
 * MoveIt command interface via rosbridge.
 *
 * The component hierarchy mirrors the URDF link/joint structure:
 *   BaseLink -> ShoulderPanJoint -> ShoulderLink -> ShoulderLiftJoint -> ...
 *
 * Each "Joint" SceneComponent is where the ROS joint angle gets applied
 * as a local Z-axis rotation. All child links/meshes rotate with it.
 *
 * Phase 8 additions:
 *   - SendNamedTarget(): publish to /moveit_goal_named (std_msgs/String)
 *   - SendJointGoal(): publish to /moveit_goal_joints (sensor_msgs/JointState)
 *   - SendPoseGoal(): publish to /moveit_goal_pose (geometry_msgs/PoseStamped)
 *   - Blueprint-callable + editor-testable via UPROPERTY buttons
 *
 * Phase 9 additions (Record/Replay/E-Stop):
 *   - StartRecord(): begin teleop recording on worker
 *   - StopRecord(): stop recording, save trajectory
 *   - StartReplay(): replay most recent (or named) recording
 *   - StopReplay(): stop replay
 *   - EStop(): emergency stop all motion
 *   - All commands publish JSON to /robot_command topic
 *   - Worker state feedback via /robot_status subscription
 */
UCLASS()
class SO101_TWIN_API ARobotVisualizer : public AActor
{
	GENERATED_BODY()

public:
	ARobotVisualizer();

protected:
	virtual void BeginPlay() override;
	virtual void EndPlay(const EEndPlayReason::Type EndPlayReason) override;

	// =================================================================
	// Configuration
	// =================================================================

	UPROPERTY(EditAnywhere, Category = "ROS|Bridge")
	FString RosBridgeUrl = TEXT("ws://127.0.0.1:9090/?x=1");

	UPROPERTY(EditAnywhere, Category = "ROS|Topics")
	FString JointStateTopic = TEXT("/joint_states");

	UPROPERTY(EditAnywhere, Category = "ROS|Topics")
	FString JointStateType = TEXT("sensor_msgs/JointState");

	// =================================================================
	// MoveIt Command Interface (Phase 8)
	// =================================================================

	// --- Named target ---

	/** Named target to send (e.g. "home", "ready"). Set in Details panel, then call SendNamedTarget(). */
	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt")
	FString MoveItNamedTarget = TEXT("home");

	/** Send the named target to MoveIt via /moveit_goal_named topic. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|MoveIt")
	void SendNamedTarget();

	// --- Joint goal ---

	/** Joint goal values in radians. Set in Details panel, then call SendJointGoal(). */
	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Joints")
	float GoalShoulderPan = 0.0f;

	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Joints")
	float GoalShoulderLift = 0.0f;

	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Joints")
	float GoalElbowFlex = 0.0f;

	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Joints")
	float GoalWristFlex = 0.0f;

	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Joints")
	float GoalWristRoll = 0.0f;

	/** Send joint goal to MoveIt via /moveit_goal_joints topic. Values are in radians. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|MoveIt|Joints")
	void SendJointGoal();

	// --- Pose goal (Cartesian, position-only for 5DOF) ---

	/** Target position in UE coordinates (cm). Converted to ROS (meters) on send. */
	UPROPERTY(EditAnywhere, Category = "ROS|MoveIt|Pose")
	FVector GoalPositionUE = FVector(10.0f, 0.0f, 15.0f);

	/** Send position-only goal to MoveIt via /moveit_goal_pose topic.
	 *  GoalPositionUE is in Unreal cm, auto-converted to ROS meters with Y flip. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|MoveIt|Pose")
	void SendPoseGoal();

	// =================================================================
	// Teleop Sync (Phase 9)
	// =================================================================

	/** Activate leader→follower sync (teleop). Must be ON before recording. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Sync")
	void SyncOn();

	/** Deactivate leader→follower sync. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Sync")
	void SyncOff();

	/** Whether teleop (sync) is currently active. Updated from /robot_status. */
	UPROPERTY(VisibleAnywhere, Category = "ROS|Status")
	bool bSyncActive = false;

	// =================================================================
	// Record / Replay / E-Stop (Phase 9)
	// =================================================================

	/** Start recording: activates teleop on worker, buffers joint trajectory. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Record")
	void StartRecord();

	/** Stop recording: saves trajectory to file on worker. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Record")
	void StopRecord();

	/** Replay filename (empty = most recent recording). */
	UPROPERTY(EditAnywhere, Category = "ROS|Replay")
	FString ReplayFilename;

	/** Whether to loop the replay continuously. */
	UPROPERTY(EditAnywhere, Category = "ROS|Replay")
	bool bReplayLoop = false;

	/** Approach speed in degrees/sec. Controls how fast the robot moves
	 *  to the start position before replay begins. Lower = smoother. */
	UPROPERTY(EditAnywhere, Category = "ROS|Replay", meta = (ClampMin = "5.0", ClampMax = "300.0"))
	float ApproachSpeed = 45.0f;

	/** Start replaying a recorded trajectory on the follower arm. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Replay")
	void StartReplay();

	/** Stop replay immediately. */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Replay")
	void StopReplay();

	/** Emergency stop: abort ALL motion immediately (recording, replay, teleop). */
	UFUNCTION(BlueprintCallable, CallInEditor, Category = "ROS|Safety")
	void EStop();

	/** Current worker state (idle/recording/replaying). Updated from /robot_status. */
	UPROPERTY(VisibleAnywhere, Category = "ROS|Status")
	FString WorkerState = TEXT("unknown");

private:
	// =================================================================
	// Component hierarchy
	// =================================================================

	UPROPERTY(VisibleAnywhere, Category = "Robot")
	TObjectPtr<USceneComponent> RobotRoot;

	// Link SceneComponents
	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> BaseLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> ShoulderLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> UpperArmLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> LowerArmLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> WristLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> GripperLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> MovingJawLink;

	// Joint SceneComponents
	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> ShoulderPanJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> ShoulderLiftJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> ElbowFlexJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> WristFlexJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> WristRollJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> GripperJoint;

	// Joint name -> component mapping
	UPROPERTY()
	TMap<FName, TObjectPtr<USceneComponent>> JointComponentMap;

	// Mesh components
	UPROPERTY()
	TArray<TObjectPtr<UStaticMeshComponent>> AllMeshComponents;

	// =================================================================
	// ROS connection
	// =================================================================

	UFUNCTION()
	void OnRosBridgeConnected();

	UFUNCTION()
	void OnRosMessage(const FString& Topic, const FString& MessageJson);

	void ParseAndApplyJointStates(const FString& MessageJson);

	// =================================================================
	// MoveIt publish helpers
	// =================================================================

	/** Advertise MoveIt command topics. Called once on connect. */
	void AdvertiseMoveItTopics();

	/** Whether MoveIt topics have been advertised in this connection session. */
	bool bMoveItTopicsAdvertised = false;

	// =================================================================
	// Record / Replay / E-Stop helpers
	// =================================================================

	/** Advertise /robot_command and subscribe /robot_status. Called once on connect. */
	void SetupRecordReplayTopics();

	/** Whether record/replay topics have been set up. */
	bool bRecordReplayTopicsSetup = false;

	/** Send a JSON command to /robot_command topic. */
	void PublishRobotCommand(const FString& JsonCmd);

	/** Handle /robot_status messages from the bridge node. */
	UFUNCTION()
	void OnRobotStatus(const FString& Topic, const FString& MessageJson);

	// =================================================================
	// Helpers (declared in original header, kept for compatibility)
	// =================================================================

	USceneComponent* CreateJointComponent(const FName& Name, USceneComponent* Parent,
		const FVector& Location, const FRotator& Rotation);

	USceneComponent* CreateLinkComponent(const FName& Name, USceneComponent* Parent);

	UStaticMeshComponent* AttachMesh(USceneComponent* Parent, UStaticMesh* Mesh,
		const FName& Name, const FVector& Location, const FRotator& Rotation,
		bool bIsMotor);
};

 

 

RobotVisualizer.cpp

더보기
#include "RobotVisualizer.h"
#include "RosCoordConv.h"
#include "RosBridgeSubsystem.h"
#include "RosBridgeLog.h"

#include "Components/SceneComponent.h"
#include "Components/StaticMeshComponent.h"
#include "Engine/StaticMesh.h"
#include "Engine/Engine.h"
#include "Engine/World.h"
#include "Kismet/GameplayStatics.h"
#include "Dom/JsonObject.h"
#include "Dom/JsonValue.h"
#include "Serialization/JsonReader.h"
#include "Serialization/JsonSerializer.h"
#include "Serialization/JsonWriter.h"
#include "UObject/ConstructorHelpers.h"

// =============================================================================
// Mesh asset path helper
// =============================================================================

static UStaticMesh* LoadMeshAsset(const TCHAR* AssetName)
{
	// All meshes live under /Game/Robot/Meshes/
	FString Path = FString::Printf(TEXT("/Game/Robot/Meshes/%s.%s"), AssetName, AssetName);
	UStaticMesh* Mesh = Cast<UStaticMesh>(StaticLoadObject(UStaticMesh::StaticClass(), nullptr, *Path));
	if (!Mesh)
	{
		UE_LOG(LogRosBridge, Warning, TEXT("Failed to load mesh: %s"), *Path);
	}
	return Mesh;
}

// =============================================================================
// Constructor — build the entire component hierarchy
// =============================================================================

ARobotVisualizer::ARobotVisualizer()
{
	PrimaryActorTick.bCanEverTick = false;

	// --- Root ---
	RobotRoot = CreateDefaultSubobject<USceneComponent>(TEXT("RobotRoot"));
	RootComponent = RobotRoot;

	// =========================================================================
	// URDF data converted to UE coordinates:
	//   Position: meters * 100 = cm, Y flipped
	//   Rotation: RPY radians -> FRotator degrees, pitch & yaw negated
	//
	// All values below are pre-computed from so101_follower.urdf.
	// =========================================================================

	// --- base_link (attached directly to root, no joint) ---
	BaseLink = CreateDefaultSubobject<USceneComponent>(TEXT("BaseLink"));
	BaseLink->SetupAttachment(RobotRoot);

	// --- shoulder_pan joint ---
	// URDF origin: xyz(0.0388353, 0, 0.0624) rpy(3.14159, 0, -3.14159)
	ShoulderPanJoint = CreateDefaultSubobject<USceneComponent>(TEXT("ShoulderPanJoint"));
	ShoulderPanJoint->SetupAttachment(BaseLink);
	ShoulderPanJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(0.0388353, 0.0, 0.0624));
	ShoulderPanJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(3.14159, 0.0, -3.14159));

	ShoulderLink = CreateDefaultSubobject<USceneComponent>(TEXT("ShoulderLink"));
	ShoulderLink->SetupAttachment(ShoulderPanJoint);

	// --- shoulder_lift joint ---
	// URDF origin: xyz(-0.0303992, -0.0182778, -0.0542) rpy(-1.5708, -1.5708, 0)
	ShoulderLiftJoint = CreateDefaultSubobject<USceneComponent>(TEXT("ShoulderLiftJoint"));
	ShoulderLiftJoint->SetupAttachment(ShoulderLink);
	ShoulderLiftJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(-0.0303992, -0.0182778, -0.0542));
	ShoulderLiftJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(-1.5708, -1.5708, 0.0));

	UpperArmLink = CreateDefaultSubobject<USceneComponent>(TEXT("UpperArmLink"));
	UpperArmLink->SetupAttachment(ShoulderLiftJoint);

	// --- elbow_flex joint ---
	// URDF origin: xyz(-0.11257, -0.028, 0) rpy(0, 0, 1.5708)
	ElbowFlexJoint = CreateDefaultSubobject<USceneComponent>(TEXT("ElbowFlexJoint"));
	ElbowFlexJoint->SetupAttachment(UpperArmLink);
	ElbowFlexJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(-0.11257, -0.028, 0.0));
	ElbowFlexJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(0.0, 0.0, 1.5708));

	LowerArmLink = CreateDefaultSubobject<USceneComponent>(TEXT("LowerArmLink"));
	LowerArmLink->SetupAttachment(ElbowFlexJoint);

	// --- wrist_flex joint ---
	// URDF origin: xyz(-0.1349, 0.0052, 0) rpy(0, 0, -1.5708)
	WristFlexJoint = CreateDefaultSubobject<USceneComponent>(TEXT("WristFlexJoint"));
	WristFlexJoint->SetupAttachment(LowerArmLink);
	WristFlexJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(-0.1349, 0.0052, 0.0));
	WristFlexJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(0.0, 0.0, -1.5708));

	WristLink = CreateDefaultSubobject<USceneComponent>(TEXT("WristLink"));
	WristLink->SetupAttachment(WristFlexJoint);

	// --- wrist_roll joint ---
	// URDF origin: xyz(0, -0.0611, 0.0181) rpy(1.5708, 0.0486795, 3.14159)
	WristRollJoint = CreateDefaultSubobject<USceneComponent>(TEXT("WristRollJoint"));
	WristRollJoint->SetupAttachment(WristLink);
	WristRollJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(0.0, -0.0611, 0.0181));
	WristRollJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(1.5708, 0.0486795, 3.14159));

	GripperLink = CreateDefaultSubobject<USceneComponent>(TEXT("GripperLink"));
	GripperLink->SetupAttachment(WristRollJoint);

	// --- gripper joint ---
	// URDF origin: xyz(0.0202, 0.0188, -0.0234) rpy(1.5708, 0, 0)
	GripperJoint = CreateDefaultSubobject<USceneComponent>(TEXT("GripperJoint"));
	GripperJoint->SetupAttachment(GripperLink);
	GripperJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(0.0202, 0.0188, -0.0234));
	GripperJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(1.5708, 0.0, 0.0));

	MovingJawLink = CreateDefaultSubobject<USceneComponent>(TEXT("MovingJawLink"));
	MovingJawLink->SetupAttachment(GripperJoint);

	// --- Joint name mapping (matches ROS /joint_states names) ---
	JointComponentMap.Add(FName("shoulder_pan"),  ShoulderPanJoint);
	JointComponentMap.Add(FName("shoulder_lift"), ShoulderLiftJoint);
	JointComponentMap.Add(FName("elbow_flex"),    ElbowFlexJoint);
	JointComponentMap.Add(FName("wrist_flex"),    WristFlexJoint);
	JointComponentMap.Add(FName("wrist_roll"),    WristRollJoint);
	JointComponentMap.Add(FName("gripper"),       GripperJoint);
}

// =============================================================================
// BeginPlay — load meshes and attach, connect to ROS
// =============================================================================

void ARobotVisualizer::BeginPlay()
{
	Super::BeginPlay();

	// --- Load meshes and attach to links ---
	// Meshes are loaded at runtime (not in constructor) because
	// StaticLoadObject is safer to call here and allows hot-reload.

	// Helper lambda to reduce repetition
	auto Attach = [this](USceneComponent* Parent, const TCHAR* MeshName,
		double RosX, double RosY, double RosZ,
		double RosRoll, double RosPitch, double RosYaw,
		bool bIsMotor = false)
	{
		UStaticMesh* Mesh = LoadMeshAsset(MeshName);
		if (!Mesh) return;

		UStaticMeshComponent* SMC = NewObject<UStaticMeshComponent>(this);
		SMC->SetStaticMesh(Mesh);
		SMC->SetRelativeLocation(RosCoordConv::RosToUePosition(RosX, RosY, RosZ));
		SMC->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(RosRoll, RosPitch, RosYaw));
		SMC->SetCollisionEnabled(ECollisionEnabled::NoCollision);
		SMC->AttachToComponent(Parent, FAttachmentTransformRules::KeepRelativeTransform);
		SMC->RegisterComponent();
		AllMeshComponents.Add(SMC);
	};

	// === base_link meshes ===
	Attach(BaseLink, TEXT("base_motor_holder_so101_v1"),
		-0.00636471, -0.0000994414, -0.0024,
		1.5708, 0.0, 1.5708);
	Attach(BaseLink, TEXT("base_so101_v2"),
		-0.00636471, 0.0, -0.0024,
		1.5708, 0.0, 1.5708);
	Attach(BaseLink, TEXT("sts3215_03a_v1"),
		0.0263353, 0.0, 0.0437,
		0.0, 0.0, 0.0, true);
	Attach(BaseLink, TEXT("waveshare_mounting_plate_so101_v2"),
		-0.0309827, -0.000199441, 0.0474,
		1.5708, 0.0, 1.5708);

	// === shoulder_link meshes ===
	Attach(ShoulderLink, TEXT("sts3215_03a_v1"),
		-0.0303992, 0.000422241, -0.0417,
		1.5708, 1.5708, 0.0, true);
	Attach(ShoulderLink, TEXT("motor_holder_so101_base_v1"),
		-0.0675992, -0.000177759, 0.0158499,
		1.5708, -1.5708, 0.0);
	Attach(ShoulderLink, TEXT("rotation_pitch_so101_v1"),
		0.0122008, 0.0000222413, 0.0464,
		-1.5708, 0.0, 0.0);

	// === upper_arm_link meshes ===
	Attach(UpperArmLink, TEXT("sts3215_03a_v1"),
		-0.11257, -0.0155, 0.0187,
		-3.14159, 0.0, -1.5708, true);
	Attach(UpperArmLink, TEXT("upper_arm_so101_v1"),
		-0.065085, 0.012, 0.0182,
		3.14159, 0.0, 0.0);

	// === lower_arm_link meshes ===
	Attach(LowerArmLink, TEXT("under_arm_so101_v1"),
		-0.0648499, -0.032, 0.0182,
		3.14159, 0.0, 0.0);
	Attach(LowerArmLink, TEXT("motor_holder_so101_wrist_v1"),
		-0.0648499, -0.032, 0.018,
		-3.14159, 0.0, 0.0);
	Attach(LowerArmLink, TEXT("sts3215_03a_v1"),
		-0.1224, 0.0052, 0.0187,
		-3.14159, 0.0, -3.14159, true);

	// === wrist_link meshes ===
	Attach(WristLink, TEXT("sts3215_03a_no_horn_v1"),
		0.0, -0.0424, 0.0306,
		1.5708, 1.5708, 0.0, true);
	Attach(WristLink, TEXT("wrist_roll_pitch_so101_v2"),
		0.0, -0.028, 0.0181,
		-1.5708, -1.5708, 0.0);

	// === gripper_link meshes ===
	Attach(GripperLink, TEXT("sts3215_03a_v1"),
		0.0077, 0.0001, -0.0234,
		-1.5708, 0.0, 0.0, true);
	Attach(GripperLink, TEXT("wrist_roll_follower_so101_v1"),
		0.0, -0.000218214, 0.000949706,
		-3.14159, 0.0, 0.0);

	// === moving_jaw_link meshes ===
	Attach(MovingJawLink, TEXT("moving_jaw_so101_v1"),
		0.0, 0.0, 0.0189,
		0.0, 0.0, 0.0);

	UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: %d mesh components created"), AllMeshComponents.Num());

	// --- Connect to ROS via Subsystem ---
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros) return;

	// Bind delegates.
	Ros->OnTopicMessage.AddDynamic(this, &ARobotVisualizer::OnRosMessage);
	Ros->OnConnected.AddDynamic(this, &ARobotVisualizer::OnRosBridgeConnected);

	// Subscribe is now queued even before connection — the subsystem will
	// send it automatically when connected (including on reconnect).
	Ros->Subscribe(JointStateTopic, JointStateType);

	// Queue MoveIt topic advertisements (sent on connect).
	AdvertiseMoveItTopics();

	// Queue record/replay/estop topics.
	SetupRecordReplayTopics();

	// Initiate connection if not already connected.
	if (!Ros->IsConnected())
	{
		Ros->Connect(RosBridgeUrl);
	}
	else
	{
		// Already connected (e.g. another actor already called Connect).
		UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: already connected, subscription sent."));
	}
}

// =============================================================================
// EndPlay
// =============================================================================

void ARobotVisualizer::EndPlay(const EEndPlayReason::Type EndPlayReason)
{
	if (UGameInstance* GI = UGameplayStatics::GetGameInstance(this))
	{
		if (URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>())
		{
			Ros->OnTopicMessage.RemoveDynamic(this, &ARobotVisualizer::OnRosMessage);
			Ros->OnConnected.RemoveDynamic(this, &ARobotVisualizer::OnRosBridgeConnected);
		}
	}

	bMoveItTopicsAdvertised = false;
	bRecordReplayTopicsSetup = false;

	Super::EndPlay(EndPlayReason);
}

// =============================================================================
// ROS connection callback
// =============================================================================

void ARobotVisualizer::OnRosBridgeConnected()
{
	UE_LOG(LogRosBridge, Log,
		TEXT("RobotVisualizer: rosbridge connected — subscriptions restored by subsystem."));
}

// =============================================================================
// MoveIt topic advertisements
// =============================================================================

void ARobotVisualizer::AdvertiseMoveItTopics()
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros) return;

	// These are queued and sent automatically when connected.
	Ros->Advertise(TEXT("/moveit_goal_named"), TEXT("std_msgs/String"));
	Ros->Advertise(TEXT("/moveit_goal_joints"), TEXT("sensor_msgs/JointState"));
	Ros->Advertise(TEXT("/moveit_goal_pose"), TEXT("geometry_msgs/PoseStamped"));

	bMoveItTopicsAdvertised = true;
	UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: MoveIt command topics advertised."));
}

// =============================================================================
// MoveIt commands — SendNamedTarget
// =============================================================================

void ARobotVisualizer::SendNamedTarget()
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros || !Ros->IsConnected())
	{
		UE_LOG(LogRosBridge, Warning, TEXT("SendNamedTarget: not connected to rosbridge."));
		return;
	}

	// std_msgs/String: {"data": "home"}
	FString MsgJson = FString::Printf(TEXT("{\"data\":\"%s\"}"), *MoveItNamedTarget);
	Ros->Publish(TEXT("/moveit_goal_named"), MsgJson);

	UE_LOG(LogRosBridge, Log, TEXT("SendNamedTarget: published '%s' to /moveit_goal_named"), *MoveItNamedTarget);
}

// =============================================================================
// MoveIt commands — SendJointGoal
// =============================================================================

void ARobotVisualizer::SendJointGoal()
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros || !Ros->IsConnected())
	{
		UE_LOG(LogRosBridge, Warning, TEXT("SendJointGoal: not connected to rosbridge."));
		return;
	}

	// sensor_msgs/JointState:
	// {
	//   "header": {"stamp": {"sec": 0, "nanosec": 0}, "frame_id": ""},
	//   "name": ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll"],
	//   "position": [0.0, -0.5, 0.5, 0.0, 0.0],
	//   "velocity": [],
	//   "effort": []
	// }

	FString MsgJson = FString::Printf(
		TEXT("{\"header\":{\"stamp\":{\"sec\":0,\"nanosec\":0},\"frame_id\":\"\"},")
		TEXT("\"name\":[\"shoulder_pan\",\"shoulder_lift\",\"elbow_flex\",\"wrist_flex\",\"wrist_roll\"],")
		TEXT("\"position\":[%f,%f,%f,%f,%f],")
		TEXT("\"velocity\":[],\"effort\":[]}"),
		GoalShoulderPan, GoalShoulderLift, GoalElbowFlex, GoalWristFlex, GoalWristRoll
	);

	Ros->Publish(TEXT("/moveit_goal_joints"), MsgJson);

	UE_LOG(LogRosBridge, Log,
		TEXT("SendJointGoal: [%.3f, %.3f, %.3f, %.3f, %.3f] to /moveit_goal_joints"),
		GoalShoulderPan, GoalShoulderLift, GoalElbowFlex, GoalWristFlex, GoalWristRoll);
}

// =============================================================================
// MoveIt commands — SendPoseGoal
// =============================================================================

void ARobotVisualizer::SendPoseGoal()
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros || !Ros->IsConnected())
	{
		UE_LOG(LogRosBridge, Warning, TEXT("SendPoseGoal: not connected to rosbridge."));
		return;
	}

	// Convert UE position (cm, left-handed) to ROS (meters, right-handed)
	double RosX, RosY, RosZ;
	RosCoordConv::UeToRosPosition(GoalPositionUE, RosX, RosY, RosZ);

	// geometry_msgs/PoseStamped (orientation defaults to identity — position-only goal)
	FString MsgJson = FString::Printf(
		TEXT("{\"header\":{\"stamp\":{\"sec\":0,\"nanosec\":0},\"frame_id\":\"base_link\"},")
		TEXT("\"pose\":{\"position\":{\"x\":%f,\"y\":%f,\"z\":%f},")
		TEXT("\"orientation\":{\"x\":0.0,\"y\":0.0,\"z\":0.0,\"w\":1.0}}}"),
		RosX, RosY, RosZ
	);

	Ros->Publish(TEXT("/moveit_goal_pose"), MsgJson);

	UE_LOG(LogRosBridge, Log,
		TEXT("SendPoseGoal: UE(%.1f, %.1f, %.1f)cm -> ROS(%.4f, %.4f, %.4f)m to /moveit_goal_pose"),
		GoalPositionUE.X, GoalPositionUE.Y, GoalPositionUE.Z,
		RosX, RosY, RosZ);
}

// =============================================================================
// Message handling
// =============================================================================

void ARobotVisualizer::OnRosMessage(const FString& Topic, const FString& MessageJson)
{
	if (Topic == JointStateTopic)
	{
		ParseAndApplyJointStates(MessageJson);
	}
	else if (Topic == TEXT("/robot_status"))
	{
		OnRobotStatus(Topic, MessageJson);
	}
}

void ARobotVisualizer::ParseAndApplyJointStates(const FString& MessageJson)
{
	// Parse sensor_msgs/JointState JSON:
	// { "name": ["shoulder_pan", ...], "position": [0.1, ...], ... }

	TSharedPtr<FJsonObject> Json;
	TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(MessageJson);
	if (!FJsonSerializer::Deserialize(Reader, Json) || !Json.IsValid())
	{
		return;
	}

	const TArray<TSharedPtr<FJsonValue>>* NameArray = nullptr;
	const TArray<TSharedPtr<FJsonValue>>* PosArray = nullptr;

	if (!Json->TryGetArrayField(TEXT("name"), NameArray) ||
		!Json->TryGetArrayField(TEXT("position"), PosArray))
	{
		return;
	}

	const int32 Count = FMath::Min(NameArray->Num(), PosArray->Num());
	for (int32 i = 0; i < Count; ++i)
	{
		const FName JointName(*(*NameArray)[i]->AsString());
		const double AngleRad = (*PosArray)[i]->AsNumber();

		TObjectPtr<USceneComponent>* JointComp = JointComponentMap.Find(JointName);
		if (JointComp && *JointComp)
		{
			const float AngleDeg = RosCoordConv::RosJointAngleToUeDegrees(AngleRad);

			FRotator BaseRot;
			if (JointName == FName("shoulder_pan"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(3.14159, 0.0, -3.14159);
			else if (JointName == FName("shoulder_lift"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(-1.5708, -1.5708, 0.0);
			else if (JointName == FName("elbow_flex"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(0.0, 0.0, 1.5708);
			else if (JointName == FName("wrist_flex"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(0.0, 0.0, -1.5708);
			else if (JointName == FName("wrist_roll"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(1.5708, 0.0486795, 3.14159);
			else if (JointName == FName("gripper"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(1.5708, 0.0, 0.0);

			const FQuat BaseQuat = BaseRot.Quaternion();
			const FQuat JointQuat = FQuat(FVector::UpVector, FMath::DegreesToRadians(AngleDeg));
			const FQuat FinalQuat = BaseQuat * JointQuat;

			(*JointComp)->SetRelativeRotation(FinalQuat.Rotator());
		}
	}
}

// =============================================================================
// Record / Replay / E-Stop — Topic setup
// =============================================================================

void ARobotVisualizer::SetupRecordReplayTopics()
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros) return;

	// Advertise the command topic
	Ros->Advertise(TEXT("/robot_command"), TEXT("std_msgs/String"));

	// Subscribe to status feedback
	Ros->Subscribe(TEXT("/robot_status"), TEXT("std_msgs/String"));

	bRecordReplayTopicsSetup = true;
	UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: Record/Replay topics set up."));
}

// =============================================================================
// Record / Replay / E-Stop — Command publisher helper
// =============================================================================

void ARobotVisualizer::PublishRobotCommand(const FString& JsonCmd)
{
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros || !Ros->IsConnected())
	{
		UE_LOG(LogRosBridge, Warning, TEXT("PublishRobotCommand: not connected to rosbridge."));
		if (GEngine)
		{
			GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Red,
				TEXT("Robot: Not connected to rosbridge!"));
		}
		return;
	}

	// std_msgs/String: {"data": "<json_cmd>"}
	// The JSON command is nested inside the "data" field, with quotes escaped.
	FString EscapedCmd = JsonCmd.Replace(TEXT("\""), TEXT("\\\""));
	FString MsgJson = FString::Printf(TEXT("{\"data\":\"%s\"}"), *EscapedCmd);
	Ros->Publish(TEXT("/robot_command"), MsgJson);

	UE_LOG(LogRosBridge, Log, TEXT("PublishRobotCommand: %s"), *JsonCmd);
}

// =============================================================================
// Record / Replay / E-Stop — Button handlers
// =============================================================================

void ARobotVisualizer::StartRecord()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"start_record\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Green,
			TEXT("Robot: Recording started (teleop active)"));
	}
}

void ARobotVisualizer::StopRecord()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"stop_record\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Yellow,
			TEXT("Robot: Recording stopped, saving..."));
	}
}

void ARobotVisualizer::StartReplay()
{
	FString ArgsJson;
	if (ReplayFilename.IsEmpty())
	{
		ArgsJson = FString::Printf(
			TEXT("{\"cmd\":\"start_replay\",\"args\":{\"loop\":%s,\"approach_speed\":%f}}"),
			bReplayLoop ? TEXT("true") : TEXT("false"),
			ApproachSpeed);
	}
	else
	{
		ArgsJson = FString::Printf(
			TEXT("{\"cmd\":\"start_replay\",\"args\":{\"filename\":\"%s\",\"loop\":%s,\"approach_speed\":%f}}"),
			*ReplayFilename,
			bReplayLoop ? TEXT("true") : TEXT("false"),
			ApproachSpeed);
	}
	PublishRobotCommand(ArgsJson);

	if (GEngine)
	{
		FString DisplayName = ReplayFilename.IsEmpty() ? TEXT("(most recent)") : ReplayFilename;
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Cyan,
			FString::Printf(TEXT("Robot: Replaying %s (loop=%s, speed=%.0f°/s)"),
				*DisplayName, bReplayLoop ? TEXT("yes") : TEXT("no"), ApproachSpeed));
	}
}

void ARobotVisualizer::StopReplay()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"stop_replay\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Yellow,
			TEXT("Robot: Replay stopped"));
	}
}

void ARobotVisualizer::EStop()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"estop\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 5.0f, FColor::Red,
			TEXT("*** E-STOP *** All motion halted"));
	}
}

// =============================================================================
// Teleop Sync — SyncOn / SyncOff
// =============================================================================

void ARobotVisualizer::SyncOn()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"start_teleop\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Green,
			TEXT("Sync ON: leader -> follower active"));
	}
}

void ARobotVisualizer::SyncOff()
{
	PublishRobotCommand(TEXT("{\"cmd\":\"stop_teleop\"}"));

	if (GEngine)
	{
		GEngine->AddOnScreenDebugMessage(-1, 3.0f, FColor::Yellow,
			TEXT("Sync OFF: leader -> follower deactivated"));
	}
}

// =============================================================================
// Record / Replay — Status feedback handler
// =============================================================================

void ARobotVisualizer::OnRobotStatus(const FString& Topic, const FString& MessageJson)
{
	// rosbridge wraps std_msgs/String as: {"data": "..."}
	TSharedPtr<FJsonObject> OuterJson;
	TSharedRef<TJsonReader<>> OuterReader = TJsonReaderFactory<>::Create(MessageJson);
	if (!FJsonSerializer::Deserialize(OuterReader, OuterJson) || !OuterJson.IsValid())
	{
		return;
	}

	FString DataStr;
	if (!OuterJson->TryGetStringField(TEXT("data"), DataStr))
	{
		return;
	}

	// Parse the inner JSON status
	TSharedPtr<FJsonObject> StatusJson;
	TSharedRef<TJsonReader<>> StatusReader = TJsonReaderFactory<>::Create(DataStr);
	if (!FJsonSerializer::Deserialize(StatusReader, StatusJson) || !StatusJson.IsValid())
	{
		return;
	}

	FString State;
	if (StatusJson->TryGetStringField(TEXT("state"), State))
	{
		if (State != WorkerState)
		{
			WorkerState = State;
			UE_LOG(LogRosBridge, Log, TEXT("Worker state: %s"), *WorkerState);

			if (GEngine)
			{
				FColor Color = FColor::White;
				if (State == TEXT("recording")) Color = FColor::Green;
				else if (State == TEXT("replaying")) Color = FColor::Cyan;
				else if (State == TEXT("idle")) Color = FColor::Silver;

				GEngine->AddOnScreenDebugMessage(-1, 3.0f, Color,
					FString::Printf(TEXT("Robot state: %s"), *WorkerState));
			}
		}
	}

	// Update sync (teleop) status
	bool bTeleop = false;
	if (StatusJson->TryGetBoolField(TEXT("teleop"), bTeleop))
	{
		if (bTeleop != bSyncActive)
		{
			bSyncActive = bTeleop;
			UE_LOG(LogRosBridge, Log, TEXT("Sync (teleop): %s"),
				bSyncActive ? TEXT("ON") : TEXT("OFF"));
		}
	}

	// Log errors from commands
	FString Status;
	if (StatusJson->TryGetStringField(TEXT("status"), Status) && Status == TEXT("error"))
	{
		FString Reason;
		StatusJson->TryGetStringField(TEXT("reason"), Reason);
		UE_LOG(LogRosBridge, Warning, TEXT("Robot command error: %s"), *Reason);

		if (GEngine)
		{
			GEngine->AddOnScreenDebugMessage(-1, 5.0f, FColor::Red,
				FString::Printf(TEXT("Robot error: %s"), *Reason));
		}
	}

	// Log recording saved info
	FString Filename;
	if (StatusJson->TryGetStringField(TEXT("filename"), Filename) && !Filename.IsEmpty())
	{
		int32 Frames = 0;
		StatusJson->TryGetNumberField(TEXT("frames"), Frames);
		double Duration = 0.0;
		StatusJson->TryGetNumberField(TEXT("duration_sec"), Duration);

		UE_LOG(LogRosBridge, Log, TEXT("Recording saved: %s (%d frames, %.1fs)"),
			*Filename, Frames, Duration);

		if (GEngine)
		{
			GEngine->AddOnScreenDebugMessage(-1, 5.0f, FColor::Green,
				FString::Printf(TEXT("Recording saved: %s (%d frames, %.1fs)"),
					*Filename, Frames, Duration));
		}
	}
}

 

 

변경 사항

  • Approach (보간 이동) 로직
    • replay 시작 시: 현재 follower 위치에서 recording 첫 프레임까지 cosine ease-in-out 보간으로 부드럽게 이동
    • loop 전환 시: 마지막 프레임에서 첫 프레임까지 동일한 approach 보간 수행
    • Sync On 시: 현재 follower 위치에서 현재 위치까지 보간 수행
    • 보간 커브: (1 - cos(t * π)) / 2 — 시작과 끝이 부드럽게 감속/가속
    • 속도: 가장 큰 각도 차이 기준으로 duration 계산. 기본 45°/s (예: 90° 차이 → 2초)
    • 1° 미만 차이: approach 스킵, 즉시 재생
  • 설정 가능한 변수
    • Worker: --approach-speed 45.0 (기본값)
    • Unreal Details 패널: ApproachSpeed 슬라이더 (5~300°/s, 기본 45)
    • 명령 단위: "approach_speed": 30.0 으로 개별 오버라이드

 

 


 

 

4-4단계: 보간 수정 후 테스트

마찬가지로 언리얼 빌드 성공 후 bridge node 터미널 colcon build 한다.

 

 

터미널 1 (conda - worker)

source ~/miniforge3/etc/profile.d/conda.sh
conda activate lerobot
sudo chmod 666 /dev/ttyACM*
cd ~/UnrealRobotics/src/lerobot_ros2_bridge
python scripts/lerobot_worker.py --cmd-joints all --cmd-limit 120

 

 

터미널 2 (ROS2 - bridge node)

# 파일 수정 후 재빌드 필요
cd ~/UnrealRobotics
colcon build --packages-select lerobot_ros2_bridge --symlink-install
export PATH=$(echo $PATH | tr ':' '\n' | grep -v miniforge | tr '\n' ':' | sed 's/:$//')
source /opt/ros/humble/setup.bash
source ~/UnrealRobotics/install/setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export ROS_LOCALHOST_ONLY=1
ros2 run lerobot_ros2_bridge bridge_node

 

 

터미널 3 (ROS2 - rosbridge)

export PATH=$(echo $PATH | tr ':' '\n' | grep -v miniforge | tr '\n' ':' | sed 's/:$//')
source /opt/ros/humble/setup.bash
source ~/UnrealRobotics/install/setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export ROS_LOCALHOST_ONLY=1
ros2 launch rosbridge_server rosbridge_websocket_launch.xml

 

 

Sync On/Off 테스트

 

 

Record&Replay 테스트

Record
Replay

 

 

 


 

 

작업된 사항

  • ZMQ REP에 새 명령 핸들러 추가: start_record, stop_record, replay
  • Record: 텔레옵 모드 진입 + 매 프레임 관절값을 리스트에 저장 + 타임스탬프 기록
  • Stop: 저장된 궤적을 JSON/numpy 파일로 로컬 저장 (경량 구현, LeRobot 전체 파이프라인은 트랙 B에서)
  • Replay: 저장된 파일을 로드하여 타이밍에 맞춰 send_action()으로 재생
  • 재생 중 E-stop 명령 수신 시 즉시 중단
  • /robot_command (std_msgs/String) 구독 → ZMQ REQ로 worker에 전달
  • 명령 형식: "start_record", "stop_record", "replay:파일명", "estop"
  • Worker 응답을 /robot_status 토픽으로 발행 (상태 피드백)
  • Details 패널에 CallInEditor 버튼: StartRecord, StopRecord, Replay, EStop
  • 각 버튼이 /robot_command 토픽에 문자열 publish
  • /robot_status 구독하여 현재 상태(idle/recording/replaying)를 로그에 표시

 

 


 

 

 

728x90
반응형