현재 URosBridgeSubsystem은 수신(Subscribe)만 가능하고, 송신(Publish)이 없다. 이번 작업은 이걸 추가해서 Unreal에서 ROS로 명령을 보낼 수 있게 만드는 단계이다. 간단하게 역으로 명령을 준다고 보면 된다. 지금까지는 로봇이 움직이면 언리얼의 로봇이 따라 움직였다면, 이번 작업은 언리얼에서 플래닝한대로 실제 로봇이 움직여야 하는 것이다.
1단계: URosBridgeSubsystem 수정
RosBridgeSubsystem.h, RosBridgeSubsystem.cpp, RobotVisualizer.h, RobotVisualizer.cpp 총 4개 파일 수정
RosBridgeSubsystem.h
#pragma once
#include "CoreMinimal.h"
#include "Subsystems/GameInstanceSubsystem.h"
#include "RosBridgeSubsystem.generated.h"
// Forward declarations — avoid pulling heavy headers into this header.
class IWebSocket;
/**
* Delegate fired when a subscribed topic receives a message.
*
* @param Topic The topic name (e.g. "/chatter")
* @param MessageJson The "msg" field of the rosbridge publish message,
* already extracted as a JSON string for the caller
* to parse into whatever struct they need.
*/
DECLARE_DYNAMIC_MULTICAST_DELEGATE_TwoParams(
FOnRosTopicMessage,
const FString&, Topic,
const FString&, MessageJson
);
/** Delegate fired when the WebSocket connection to rosbridge is established. */
DECLARE_DYNAMIC_MULTICAST_DELEGATE(FOnRosBridgeConnected);
/** Delegate fired when the WebSocket connection to rosbridge is lost. */
DECLARE_DYNAMIC_MULTICAST_DELEGATE(FOnRosBridgeDisconnected);
/**
* Game instance subsystem that owns the WebSocket connection to
* rosbridge_server and dispatches incoming topic messages to subscribers.
*
* Phase 6 additions:
* - Advertise/Publish API for UE → ROS messaging
* - FOnRosBridgeConnected / FOnRosBridgeDisconnected delegates
* - Automatic reconnection with exponential backoff
* - Re-subscribe + re-advertise on reconnect
*
* Typical usage from any actor or component:
*
* URosBridgeSubsystem* Ros =
* GetGameInstance()->GetSubsystem<URosBridgeSubsystem>();
* Ros->OnConnected.AddDynamic(this, &AMyActor::OnRosConnected);
* Ros->Connect(TEXT("ws://127.0.0.1:9090/?x=1"));
*
* // In OnRosConnected():
* Ros->Subscribe(TEXT("/joint_states"), TEXT("sensor_msgs/JointState"));
* Ros->Advertise(TEXT("/cmd_vel"), TEXT("geometry_msgs/Twist"));
*/
UCLASS()
class SO101_TWIN_API URosBridgeSubsystem : public UGameInstanceSubsystem
{
GENERATED_BODY()
public:
// --- Subsystem lifecycle ---
virtual void Initialize(FSubsystemCollectionBase& Collection) override;
virtual void Deinitialize() override;
// --- Connection control ---
/**
* Open a WebSocket connection to the given rosbridge URL.
*
* Default URL uses 127.0.0.1 explicitly (not "localhost") and includes
* a dummy "?x=1" query string to work around a libwebsockets path-parsing
* quirk. See SKILL.md sections 7.1 and 7.2 for details.
*
* After a successful connection, OnConnected is broadcast and
* auto-reconnection is enabled for this URL.
*/
UFUNCTION(BlueprintCallable, Category = "ROS|Bridge")
void Connect(const FString& Url = TEXT("ws://127.0.0.1:9090/?x=1"));
/** Close the WebSocket connection if open. Disables auto-reconnection. */
UFUNCTION(BlueprintCallable, Category = "ROS|Bridge")
void Disconnect();
/** True if the WebSocket is currently open. */
UFUNCTION(BlueprintPure, Category = "ROS|Bridge")
bool IsConnected() const;
// --- Topic operations (subscribe: ROS → UE) ---
/**
* Subscribe to a ROS topic through rosbridge.
* If not yet connected, the subscription is queued and sent
* automatically when the connection is established.
*/
UFUNCTION(BlueprintCallable, Category = "ROS|Bridge")
void Subscribe(const FString& Topic, const FString& Type);
// --- Topic operations (publish: UE → ROS) ---
/**
* Advertise a ROS topic through rosbridge.
* Must be called before Publish(). If not yet connected, the
* advertisement is queued and sent when the connection is established.
*
* rosbridge silently drops publishes on un-advertised topics.
*/
UFUNCTION(BlueprintCallable, Category = "ROS|Bridge")
void Advertise(const FString& Topic, const FString& Type);
/**
* Publish a message to a previously advertised ROS topic.
*
* @param Topic The topic name (must have been Advertise'd first)
* @param MsgJson The "msg" payload as a JSON string. The caller is
* responsible for building the correct message structure.
* Example for geometry_msgs/Twist:
* {"linear":{"x":0.5,"y":0,"z":0},"angular":{"x":0,"y":0,"z":0.3}}
*/
UFUNCTION(BlueprintCallable, Category = "ROS|Bridge")
void Publish(const FString& Topic, const FString& MsgJson);
// --- Events ---
/** Broadcast whenever a publish message arrives on a subscribed topic. */
UPROPERTY(BlueprintAssignable, Category = "ROS|Bridge")
FOnRosTopicMessage OnTopicMessage;
/** Broadcast when the WebSocket connection to rosbridge is established (including reconnects). */
UPROPERTY(BlueprintAssignable, Category = "ROS|Bridge")
FOnRosBridgeConnected OnConnected;
/** Broadcast when the WebSocket connection to rosbridge is lost. */
UPROPERTY(BlueprintAssignable, Category = "ROS|Bridge")
FOnRosBridgeDisconnected OnDisconnected;
private:
// --- WebSocket event handlers (all run on non-game thread!) ---
void HandleConnected();
void HandleConnectionError(const FString& Error);
void HandleClosed(int32 StatusCode, const FString& Reason, bool bWasClean);
void HandleMessage(const FString& Message);
// --- Game-thread message processing ---
void ProcessIncomingMessage(const FString& Message);
// --- Internal helpers ---
/** Send raw JSON string over the WebSocket. Returns false if not connected. */
bool SendRaw(const FString& JsonPayload);
/** Send all tracked subscriptions to rosbridge. Called on (re)connect. */
void RestoreSubscriptions();
/** Send all tracked advertisements to rosbridge. Called on (re)connect. */
void RestoreAdvertisements();
// --- Reconnection ---
/** Schedule a reconnection attempt with exponential backoff. */
void ScheduleReconnect();
/** Called by the reconnect timer. */
void AttemptReconnect();
/** The URL passed to Connect(), saved for reconnection. */
FString ConnectionUrl;
/** Whether auto-reconnection is active. Set true on Connect(), false on explicit Disconnect(). */
bool bAutoReconnect = false;
/** Current backoff delay in seconds (doubles each failure, caps at MaxReconnectDelay). */
float CurrentReconnectDelay = 1.0f;
/** Base reconnect delay in seconds. */
static constexpr float BaseReconnectDelay = 1.0f;
/** Maximum reconnect delay in seconds. */
static constexpr float MaxReconnectDelay = 30.0f;
/** Timer handle for the reconnect timer. */
FTimerHandle ReconnectTimerHandle;
/** Number of consecutive reconnect attempts (for logging). */
int32 ReconnectAttemptCount = 0;
// --- State ---
/** The underlying WebSocket. Kept as a TSharedPtr because IWebSocket
* is a plain C++ interface, not a UObject, so UPROPERTY does not apply. */
TSharedPtr<IWebSocket> Socket;
/** Topics we have subscribed to, kept so we can re-subscribe on reconnect. */
UPROPERTY()
TMap<FString, FString> SubscribedTopics;
/** Topics we have advertised, kept so we can re-advertise on reconnect. */
UPROPERTY()
TMap<FString, FString> AdvertisedTopics;
};
변경 사항
- 새 델리게이트 선언 2개 추가 - 연결/끊김 이벤트를 액터들이 바인딩할 수 있음
- DECLARE_DYNAMIC_MULTICAST_DELEGATE(FOnRosBridgeConnected);
- DECLARE_DYNAMIC_MULTICAST_DELEGATE(FOnRosBridgeDisconnected);
- 새 public 메서드 2개 추가 - rosbridge v2 프로토콜 준수. Publish()는 MsgJson을 JSON 객체로 파싱한 뒤 {"op":"publish","topic":"...","msg":{...}} 형태로 전송. 미리 Advertise하지 않은 토픽에 publish하면 경고 후 드롭.
- Advertise(Topic, Type) — rosbridge에 토픽 광고
- Publish(Topic, MsgJson) — 광고한 토픽에 메시지 송신
- Subscribe 동작 변경
- 기존: 연결 안 되어 있으면 경고 후 무시
- 변경: 연결 전이어도 맵에 저장 → 연결 시 자동 전송 (주석 업데이트)
RosBridgeSubsystem.cpp
#include "RosBridgeSubsystem.h"
#include "RosBridgeLog.h"
#include "WebSocketsModule.h"
#include "IWebSocket.h"
#include "Async/Async.h"
#include "Dom/JsonObject.h"
#include "Dom/JsonValue.h"
#include "Serialization/JsonReader.h"
#include "Serialization/JsonSerializer.h"
#include "Serialization/JsonWriter.h"
#include "Engine/GameInstance.h"
#include "TimerManager.h"
#include "Engine/World.h"
// =============================================================================
// Subsystem lifecycle
// =============================================================================
void URosBridgeSubsystem::Initialize(FSubsystemCollectionBase& Collection)
{
Super::Initialize(Collection);
// The WebSockets module may not be loaded yet at this point (depends on
// module startup order), so force-load it before we try to create a socket.
if (!FModuleManager::Get().IsModuleLoaded("WebSockets"))
{
FModuleManager::Get().LoadModule("WebSockets");
}
UE_LOG(LogRosBridge, Log, TEXT("RosBridgeSubsystem initialized"));
}
void URosBridgeSubsystem::Deinitialize()
{
// Disable auto-reconnect first so Disconnect doesn't trigger reconnection.
bAutoReconnect = false;
// Cancel any pending reconnect timer.
if (UGameInstance* GI = GetGameInstance())
{
if (UWorld* World = GI->GetWorld())
{
World->GetTimerManager().ClearTimer(ReconnectTimerHandle);
}
}
Disconnect();
UE_LOG(LogRosBridge, Log, TEXT("RosBridgeSubsystem deinitialized"));
Super::Deinitialize();
}
// =============================================================================
// Connection control
// =============================================================================
void URosBridgeSubsystem::Connect(const FString& Url)
{
if (Socket.IsValid() && Socket->IsConnected())
{
UE_LOG(LogRosBridge, Warning,
TEXT("Connect called but socket is already connected. Ignoring."));
return;
}
// If we have an old socket that's not connected (e.g. from a failed attempt),
// clean it up before creating a new one.
if (Socket.IsValid())
{
Socket.Reset();
}
ConnectionUrl = Url;
bAutoReconnect = true;
CurrentReconnectDelay = BaseReconnectDelay;
ReconnectAttemptCount = 0;
UE_LOG(LogRosBridge, Log, TEXT("Connecting to %s ..."), *Url);
Socket = FWebSocketsModule::Get().CreateWebSocket(Url);
// Bind handlers. All of these fire on a non-game thread.
Socket->OnConnected().AddUObject(this, &URosBridgeSubsystem::HandleConnected);
Socket->OnConnectionError().AddUObject(this, &URosBridgeSubsystem::HandleConnectionError);
Socket->OnClosed().AddUObject(this, &URosBridgeSubsystem::HandleClosed);
Socket->OnMessage().AddUObject(this, &URosBridgeSubsystem::HandleMessage);
Socket->Connect();
}
void URosBridgeSubsystem::Disconnect()
{
// Explicit disconnect: disable auto-reconnect.
bAutoReconnect = false;
// Cancel pending reconnect timer.
if (UGameInstance* GI = GetGameInstance())
{
if (UWorld* World = GI->GetWorld())
{
World->GetTimerManager().ClearTimer(ReconnectTimerHandle);
}
}
if (Socket.IsValid())
{
if (Socket->IsConnected())
{
Socket->Close();
}
Socket.Reset();
}
}
bool URosBridgeSubsystem::IsConnected() const
{
return Socket.IsValid() && Socket->IsConnected();
}
// =============================================================================
// Topic operations — Subscribe (ROS → UE)
// =============================================================================
void URosBridgeSubsystem::Subscribe(const FString& Topic, const FString& Type)
{
// Always track the subscription so it gets restored on reconnect.
SubscribedTopics.Add(Topic, Type);
if (!IsConnected())
{
UE_LOG(LogRosBridge, Log,
TEXT("Subscribe('%s') queued — will be sent on connect."), *Topic);
return;
}
// Build: {"op":"subscribe","topic":"<Topic>","type":"<Type>"}
const TSharedRef<FJsonObject> Json = MakeShared<FJsonObject>();
Json->SetStringField(TEXT("op"), TEXT("subscribe"));
Json->SetStringField(TEXT("topic"), Topic);
Json->SetStringField(TEXT("type"), Type);
FString Payload;
const TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&Payload);
FJsonSerializer::Serialize(Json, Writer);
SendRaw(Payload);
UE_LOG(LogRosBridge, Log, TEXT("Subscribed to %s (%s)"), *Topic, *Type);
}
// =============================================================================
// Topic operations — Advertise / Publish (UE → ROS)
// =============================================================================
void URosBridgeSubsystem::Advertise(const FString& Topic, const FString& Type)
{
// Always track the advertisement so it gets restored on reconnect.
AdvertisedTopics.Add(Topic, Type);
if (!IsConnected())
{
UE_LOG(LogRosBridge, Log,
TEXT("Advertise('%s') queued — will be sent on connect."), *Topic);
return;
}
// Build: {"op":"advertise","topic":"<Topic>","type":"<Type>"}
const TSharedRef<FJsonObject> Json = MakeShared<FJsonObject>();
Json->SetStringField(TEXT("op"), TEXT("advertise"));
Json->SetStringField(TEXT("topic"), Topic);
Json->SetStringField(TEXT("type"), Type);
FString Payload;
const TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&Payload);
FJsonSerializer::Serialize(Json, Writer);
SendRaw(Payload);
UE_LOG(LogRosBridge, Log, TEXT("Advertised %s (%s)"), *Topic, *Type);
}
void URosBridgeSubsystem::Publish(const FString& Topic, const FString& MsgJson)
{
if (!IsConnected())
{
UE_LOG(LogRosBridge, Warning,
TEXT("Publish('%s') called but not connected. Dropping message."), *Topic);
return;
}
if (!AdvertisedTopics.Contains(Topic))
{
UE_LOG(LogRosBridge, Warning,
TEXT("Publish('%s') called but topic not advertised. Call Advertise() first. Dropping."), *Topic);
return;
}
// Build: {"op":"publish","topic":"<Topic>","msg":<MsgJson>}
//
// We need to embed MsgJson as a JSON *object*, not as a string.
// So we parse MsgJson first, then insert it as a sub-object.
TSharedPtr<FJsonObject> MsgObject;
const TSharedRef<TJsonReader<>> MsgReader = TJsonReaderFactory<>::Create(MsgJson);
if (!FJsonSerializer::Deserialize(MsgReader, MsgObject) || !MsgObject.IsValid())
{
UE_LOG(LogRosBridge, Error,
TEXT("Publish('%s'): failed to parse MsgJson as JSON object. Dropping."), *Topic);
return;
}
const TSharedRef<FJsonObject> Json = MakeShared<FJsonObject>();
Json->SetStringField(TEXT("op"), TEXT("publish"));
Json->SetStringField(TEXT("topic"), Topic);
Json->SetObjectField(TEXT("msg"), MsgObject);
FString Payload;
const TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&Payload);
FJsonSerializer::Serialize(Json, Writer);
SendRaw(Payload);
}
// =============================================================================
// Internal helpers
// =============================================================================
bool URosBridgeSubsystem::SendRaw(const FString& JsonPayload)
{
if (!IsConnected())
{
return false;
}
Socket->Send(JsonPayload);
return true;
}
void URosBridgeSubsystem::RestoreSubscriptions()
{
for (const auto& Pair : SubscribedTopics)
{
const TSharedRef<FJsonObject> Json = MakeShared<FJsonObject>();
Json->SetStringField(TEXT("op"), TEXT("subscribe"));
Json->SetStringField(TEXT("topic"), Pair.Key);
Json->SetStringField(TEXT("type"), Pair.Value);
FString Payload;
const TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&Payload);
FJsonSerializer::Serialize(Json, Writer);
SendRaw(Payload);
UE_LOG(LogRosBridge, Log, TEXT("Re-subscribed to %s (%s)"), *Pair.Key, *Pair.Value);
}
}
void URosBridgeSubsystem::RestoreAdvertisements()
{
for (const auto& Pair : AdvertisedTopics)
{
const TSharedRef<FJsonObject> Json = MakeShared<FJsonObject>();
Json->SetStringField(TEXT("op"), TEXT("advertise"));
Json->SetStringField(TEXT("topic"), Pair.Key);
Json->SetStringField(TEXT("type"), Pair.Value);
FString Payload;
const TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&Payload);
FJsonSerializer::Serialize(Json, Writer);
SendRaw(Payload);
UE_LOG(LogRosBridge, Log, TEXT("Re-advertised %s (%s)"), *Pair.Key, *Pair.Value);
}
}
// =============================================================================
// Reconnection
// =============================================================================
void URosBridgeSubsystem::ScheduleReconnect()
{
if (!bAutoReconnect)
{
return;
}
UGameInstance* GI = GetGameInstance();
if (!GI) return;
UWorld* World = GI->GetWorld();
if (!World) return;
ReconnectAttemptCount++;
UE_LOG(LogRosBridge, Warning,
TEXT("Scheduling reconnect attempt #%d in %.1fs ..."),
ReconnectAttemptCount, CurrentReconnectDelay);
World->GetTimerManager().SetTimer(
ReconnectTimerHandle,
this,
&URosBridgeSubsystem::AttemptReconnect,
CurrentReconnectDelay,
false // not looping
);
// Exponential backoff: double the delay, cap at MaxReconnectDelay.
CurrentReconnectDelay = FMath::Min(CurrentReconnectDelay * 2.0f, MaxReconnectDelay);
}
void URosBridgeSubsystem::AttemptReconnect()
{
if (!bAutoReconnect || ConnectionUrl.IsEmpty())
{
return;
}
UE_LOG(LogRosBridge, Log,
TEXT("Reconnect attempt #%d to %s ..."), ReconnectAttemptCount, *ConnectionUrl);
// Clean up old socket if it still exists.
if (Socket.IsValid())
{
Socket.Reset();
}
Socket = FWebSocketsModule::Get().CreateWebSocket(ConnectionUrl);
Socket->OnConnected().AddUObject(this, &URosBridgeSubsystem::HandleConnected);
Socket->OnConnectionError().AddUObject(this, &URosBridgeSubsystem::HandleConnectionError);
Socket->OnClosed().AddUObject(this, &URosBridgeSubsystem::HandleClosed);
Socket->OnMessage().AddUObject(this, &URosBridgeSubsystem::HandleMessage);
Socket->Connect();
}
// =============================================================================
// WebSocket event handlers (NON-GAME THREAD)
// =============================================================================
void URosBridgeSubsystem::HandleConnected()
{
// Marshal to game thread — we will touch UObject state (delegates, topic maps).
TWeakObjectPtr<URosBridgeSubsystem> WeakThis(this);
AsyncTask(ENamedThreads::GameThread, [WeakThis]()
{
if (URosBridgeSubsystem* StrongThis = WeakThis.Get())
{
// Reset backoff on successful connection.
StrongThis->CurrentReconnectDelay = BaseReconnectDelay;
StrongThis->ReconnectAttemptCount = 0;
UE_LOG(LogRosBridge, Log, TEXT("WebSocket connected to %s"), *StrongThis->ConnectionUrl);
// Restore all tracked subscriptions and advertisements.
StrongThis->RestoreSubscriptions();
StrongThis->RestoreAdvertisements();
// Broadcast to listeners (RobotVisualizer, etc.)
StrongThis->OnConnected.Broadcast();
}
});
}
void URosBridgeSubsystem::HandleConnectionError(const FString& Error)
{
TWeakObjectPtr<URosBridgeSubsystem> WeakThis(this);
AsyncTask(ENamedThreads::GameThread, [WeakThis, Error]()
{
if (URosBridgeSubsystem* StrongThis = WeakThis.Get())
{
UE_LOG(LogRosBridge, Error, TEXT("WebSocket connection error: %s"), *Error);
StrongThis->OnDisconnected.Broadcast();
StrongThis->ScheduleReconnect();
}
});
}
void URosBridgeSubsystem::HandleClosed(int32 StatusCode, const FString& Reason, bool bWasClean)
{
TWeakObjectPtr<URosBridgeSubsystem> WeakThis(this);
AsyncTask(ENamedThreads::GameThread, [WeakThis, StatusCode, Reason, bWasClean]()
{
if (URosBridgeSubsystem* StrongThis = WeakThis.Get())
{
UE_LOG(LogRosBridge, Warning,
TEXT("WebSocket closed (code=%d, clean=%s): %s"),
StatusCode, bWasClean ? TEXT("true") : TEXT("false"), *Reason);
StrongThis->OnDisconnected.Broadcast();
StrongThis->ScheduleReconnect();
}
});
}
void URosBridgeSubsystem::HandleMessage(const FString& Message)
{
// We are NOT on the game thread. Marshal to the game thread before
// touching any UObject state (including broadcasting the delegate).
TWeakObjectPtr<URosBridgeSubsystem> WeakThis(this);
AsyncTask(ENamedThreads::GameThread, [WeakThis, Message]()
{
if (URosBridgeSubsystem* StrongThis = WeakThis.Get())
{
StrongThis->ProcessIncomingMessage(Message);
}
});
}
// =============================================================================
// Game-thread message processing
// =============================================================================
void URosBridgeSubsystem::ProcessIncomingMessage(const FString& Message)
{
// Parse the top-level JSON object.
TSharedPtr<FJsonObject> Json;
const TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(Message);
if (!FJsonSerializer::Deserialize(Reader, Json) || !Json.IsValid())
{
UE_LOG(LogRosBridge, Error, TEXT("Failed to parse incoming JSON: %s"), *Message);
return;
}
// rosbridge v2 uses an "op" field to describe the operation.
// For a subscribed topic we care about {"op":"publish","topic":...,"msg":{...}}.
FString Op;
if (!Json->TryGetStringField(TEXT("op"), Op))
{
UE_LOG(LogRosBridge, Warning, TEXT("Incoming message has no 'op' field: %s"), *Message);
return;
}
if (Op == TEXT("publish"))
{
FString Topic;
if (!Json->TryGetStringField(TEXT("topic"), Topic))
{
UE_LOG(LogRosBridge, Warning, TEXT("publish without topic: %s"), *Message);
return;
}
// Re-serialize the "msg" sub-object back into a string. This keeps the
// subsystem message-type-agnostic: the receiver parses whatever it needs.
const TSharedPtr<FJsonObject>* MsgObjectPtr = nullptr;
FString MsgJson;
if (Json->TryGetObjectField(TEXT("msg"), MsgObjectPtr) && MsgObjectPtr && MsgObjectPtr->IsValid())
{
const TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&MsgJson);
FJsonSerializer::Serialize(MsgObjectPtr->ToSharedRef(), Writer);
}
OnTopicMessage.Broadcast(Topic, MsgJson);
}
else
{
// Other ops (status, service_response, etc.) - just log for now.
UE_LOG(LogRosBridge, Verbose, TEXT("Unhandled op '%s': %s"), *Op, *Message);
}
}
변경 사항
- 자동 재연결: HandleConnectionError / HandleClosed에서 ScheduleReconnect() → 지수 백오프(1s→2s→4s→...→30s cap) → 재연결 성공 시 RestoreSubscriptions() + RestoreAdvertisements() 자동 복원. 명시적 Disconnect() 호출 시에만 재연결 비활성화.
- Subscribe 큐잉: 연결 전에 Subscribe()/Advertise()를 호출해도 맵에 저장해뒀다가 연결 시 자동 전송.
- 스레드 안전성 강화: HandleConnected, HandleConnectionError, HandleClosed도 모두 AsyncTask(GameThread, ...) 마샬링 추가.
RobotVisualizer.h
#pragma once
#include "CoreMinimal.h"
#include "GameFramework/Actor.h"
#include "RobotVisualizer.generated.h"
class UStaticMesh;
class UStaticMeshComponent;
class USceneComponent;
class URosBridgeSubsystem;
/**
* Visualizes the SO-ARM-101 follower arm in Unreal Engine.
*
* The component hierarchy mirrors the URDF link/joint structure:
* BaseLink -> ShoulderPanJoint -> ShoulderLink -> ShoulderLiftJoint -> ...
*
* Each "Joint" SceneComponent is where the ROS joint angle gets applied
* as a local Z-axis rotation. All child links/meshes rotate with it.
*
* Mesh offsets and joint origins are hardcoded from the URDF
* (so101_follower.urdf), converted to UE coordinates (cm, left-handed).
*
* Usage:
* 1. Place this actor in the level
* 2. It auto-connects to rosbridge and subscribes to /joint_states
* 3. Moving the real robot updates the 3D model in real time
*
* Phase 6: Now uses OnConnected delegate instead of timer polling.
*/
UCLASS()
class SO101_TWIN_API ARobotVisualizer : public AActor
{
GENERATED_BODY()
public:
ARobotVisualizer();
protected:
virtual void BeginPlay() override;
virtual void EndPlay(const EEndPlayReason::Type EndPlayReason) override;
// --- Configuration ---
UPROPERTY(EditAnywhere, Category = "ROS|Bridge")
FString RosBridgeUrl = TEXT("ws://127.0.0.1:9090/?x=1");
UPROPERTY(EditAnywhere, Category = "ROS|Topics")
FString JointStateTopic = TEXT("/joint_states");
UPROPERTY(EditAnywhere, Category = "ROS|Topics")
FString JointStateType = TEXT("sensor_msgs/JointState");
private:
// --- Component hierarchy ---
// Root
UPROPERTY(VisibleAnywhere, Category = "Robot")
TObjectPtr<USceneComponent> RobotRoot;
// Link SceneComponents (parent for mesh offsets)
UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
TObjectPtr<USceneComponent> BaseLink;
UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
TObjectPtr<USceneComponent> ShoulderLink;
UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
TObjectPtr<USceneComponent> UpperArmLink;
UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
TObjectPtr<USceneComponent> LowerArmLink;
UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
TObjectPtr<USceneComponent> WristLink;
UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
TObjectPtr<USceneComponent> GripperLink;
UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
TObjectPtr<USceneComponent> MovingJawLink;
// Joint SceneComponents (rotation applied here)
UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
TObjectPtr<USceneComponent> ShoulderPanJoint;
UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
TObjectPtr<USceneComponent> ShoulderLiftJoint;
UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
TObjectPtr<USceneComponent> ElbowFlexJoint;
UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
TObjectPtr<USceneComponent> WristFlexJoint;
UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
TObjectPtr<USceneComponent> WristRollJoint;
UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
TObjectPtr<USceneComponent> GripperJoint;
// --- Joint name → component mapping ---
UPROPERTY()
TMap<FName, TObjectPtr<USceneComponent>> JointComponentMap;
// --- Mesh components (for material assignment etc.) ---
UPROPERTY()
TArray<TObjectPtr<UStaticMeshComponent>> AllMeshComponents;
// --- ROS connection ---
/** Called when rosbridge connection is established. Subscribes to topics. */
UFUNCTION()
void OnRosBridgeConnected();
UFUNCTION()
void OnRosMessage(const FString& Topic, const FString& MessageJson);
void ParseAndApplyJointStates(const FString& MessageJson);
// --- Helpers ---
/** Create a SceneComponent with a relative transform (already in UE coordinates). */
USceneComponent* CreateJointComponent(const FName& Name, USceneComponent* Parent,
const FVector& Location, const FRotator& Rotation);
/** Create a SceneComponent as a link container. */
USceneComponent* CreateLinkComponent(const FName& Name, USceneComponent* Parent);
/** Attach a StaticMeshComponent to a link with the given mesh offset (UE coordinates). */
UStaticMeshComponent* AttachMesh(USceneComponent* Parent, UStaticMesh* Mesh,
const FName& Name, const FVector& Location, const FRotator& Rotation,
bool bIsMotor);
};
변경 사항
- SubscribeDelaySeconds UPROPERTY와 FTimerHandle SubscribeTimerHandle 제거 (타이머 폴링 방식 완전 폐기)
RobotVisualizer.cpp
#include "RobotVisualizer.h"
#include "RosCoordConv.h"
#include "RosBridgeSubsystem.h"
#include "RosBridgeLog.h"
#include "Components/SceneComponent.h"
#include "Components/StaticMeshComponent.h"
#include "Engine/StaticMesh.h"
#include "Engine/Engine.h"
#include "Engine/World.h"
#include "Kismet/GameplayStatics.h"
#include "Dom/JsonObject.h"
#include "Dom/JsonValue.h"
#include "Serialization/JsonReader.h"
#include "Serialization/JsonSerializer.h"
#include "UObject/ConstructorHelpers.h"
// =============================================================================
// Mesh asset path helper
// =============================================================================
static UStaticMesh* LoadMeshAsset(const TCHAR* AssetName)
{
// All meshes live under /Game/Robot/Meshes/
FString Path = FString::Printf(TEXT("/Game/Robot/Meshes/%s.%s"), AssetName, AssetName);
UStaticMesh* Mesh = Cast<UStaticMesh>(StaticLoadObject(UStaticMesh::StaticClass(), nullptr, *Path));
if (!Mesh)
{
UE_LOG(LogRosBridge, Warning, TEXT("Failed to load mesh: %s"), *Path);
}
return Mesh;
}
// =============================================================================
// Constructor — build the entire component hierarchy
// =============================================================================
ARobotVisualizer::ARobotVisualizer()
{
PrimaryActorTick.bCanEverTick = false;
// --- Root ---
RobotRoot = CreateDefaultSubobject<USceneComponent>(TEXT("RobotRoot"));
RootComponent = RobotRoot;
// =========================================================================
// URDF data converted to UE coordinates:
// Position: meters * 100 = cm, Y flipped
// Rotation: RPY radians -> FRotator degrees, pitch & yaw negated
//
// All values below are pre-computed from so101_follower.urdf.
// =========================================================================
// --- base_link (attached directly to root, no joint) ---
BaseLink = CreateDefaultSubobject<USceneComponent>(TEXT("BaseLink"));
BaseLink->SetupAttachment(RobotRoot);
// --- shoulder_pan joint ---
// URDF origin: xyz(0.0388353, 0, 0.0624) rpy(3.14159, 0, -3.14159)
ShoulderPanJoint = CreateDefaultSubobject<USceneComponent>(TEXT("ShoulderPanJoint"));
ShoulderPanJoint->SetupAttachment(BaseLink);
ShoulderPanJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(0.0388353, 0.0, 0.0624));
ShoulderPanJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(3.14159, 0.0, -3.14159));
ShoulderLink = CreateDefaultSubobject<USceneComponent>(TEXT("ShoulderLink"));
ShoulderLink->SetupAttachment(ShoulderPanJoint);
// --- shoulder_lift joint ---
// URDF origin: xyz(-0.0303992, -0.0182778, -0.0542) rpy(-1.5708, -1.5708, 0)
ShoulderLiftJoint = CreateDefaultSubobject<USceneComponent>(TEXT("ShoulderLiftJoint"));
ShoulderLiftJoint->SetupAttachment(ShoulderLink);
ShoulderLiftJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(-0.0303992, -0.0182778, -0.0542));
ShoulderLiftJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(-1.5708, -1.5708, 0.0));
UpperArmLink = CreateDefaultSubobject<USceneComponent>(TEXT("UpperArmLink"));
UpperArmLink->SetupAttachment(ShoulderLiftJoint);
// --- elbow_flex joint ---
// URDF origin: xyz(-0.11257, -0.028, 0) rpy(0, 0, 1.5708)
ElbowFlexJoint = CreateDefaultSubobject<USceneComponent>(TEXT("ElbowFlexJoint"));
ElbowFlexJoint->SetupAttachment(UpperArmLink);
ElbowFlexJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(-0.11257, -0.028, 0.0));
ElbowFlexJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(0.0, 0.0, 1.5708));
LowerArmLink = CreateDefaultSubobject<USceneComponent>(TEXT("LowerArmLink"));
LowerArmLink->SetupAttachment(ElbowFlexJoint);
// --- wrist_flex joint ---
// URDF origin: xyz(-0.1349, 0.0052, 0) rpy(0, 0, -1.5708)
WristFlexJoint = CreateDefaultSubobject<USceneComponent>(TEXT("WristFlexJoint"));
WristFlexJoint->SetupAttachment(LowerArmLink);
WristFlexJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(-0.1349, 0.0052, 0.0));
WristFlexJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(0.0, 0.0, -1.5708));
WristLink = CreateDefaultSubobject<USceneComponent>(TEXT("WristLink"));
WristLink->SetupAttachment(WristFlexJoint);
// --- wrist_roll joint ---
// URDF origin: xyz(0, -0.0611, 0.0181) rpy(1.5708, 0.0486795, 3.14159)
WristRollJoint = CreateDefaultSubobject<USceneComponent>(TEXT("WristRollJoint"));
WristRollJoint->SetupAttachment(WristLink);
WristRollJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(0.0, -0.0611, 0.0181));
WristRollJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(1.5708, 0.0486795, 3.14159));
GripperLink = CreateDefaultSubobject<USceneComponent>(TEXT("GripperLink"));
GripperLink->SetupAttachment(WristRollJoint);
// --- gripper joint ---
// URDF origin: xyz(0.0202, 0.0188, -0.0234) rpy(1.5708, 0, 0)
GripperJoint = CreateDefaultSubobject<USceneComponent>(TEXT("GripperJoint"));
GripperJoint->SetupAttachment(GripperLink);
GripperJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(0.0202, 0.0188, -0.0234));
GripperJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(1.5708, 0.0, 0.0));
MovingJawLink = CreateDefaultSubobject<USceneComponent>(TEXT("MovingJawLink"));
MovingJawLink->SetupAttachment(GripperJoint);
// --- Joint name mapping (matches ROS /joint_states names) ---
JointComponentMap.Add(FName("shoulder_pan"), ShoulderPanJoint);
JointComponentMap.Add(FName("shoulder_lift"), ShoulderLiftJoint);
JointComponentMap.Add(FName("elbow_flex"), ElbowFlexJoint);
JointComponentMap.Add(FName("wrist_flex"), WristFlexJoint);
JointComponentMap.Add(FName("wrist_roll"), WristRollJoint);
JointComponentMap.Add(FName("gripper"), GripperJoint);
}
// =============================================================================
// BeginPlay — load meshes and attach, connect to ROS
// =============================================================================
void ARobotVisualizer::BeginPlay()
{
Super::BeginPlay();
// --- Load meshes and attach to links ---
// Meshes are loaded at runtime (not in constructor) because
// StaticLoadObject is safer to call here and allows hot-reload.
// Helper lambda to reduce repetition
auto Attach = [this](USceneComponent* Parent, const TCHAR* MeshName,
double RosX, double RosY, double RosZ,
double RosRoll, double RosPitch, double RosYaw,
bool bIsMotor = false)
{
UStaticMesh* Mesh = LoadMeshAsset(MeshName);
if (!Mesh) return;
UStaticMeshComponent* SMC = NewObject<UStaticMeshComponent>(this);
SMC->SetStaticMesh(Mesh);
SMC->SetRelativeLocation(RosCoordConv::RosToUePosition(RosX, RosY, RosZ));
SMC->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(RosRoll, RosPitch, RosYaw));
SMC->SetCollisionEnabled(ECollisionEnabled::NoCollision);
SMC->AttachToComponent(Parent, FAttachmentTransformRules::KeepRelativeTransform);
SMC->RegisterComponent();
AllMeshComponents.Add(SMC);
};
// === base_link meshes ===
Attach(BaseLink, TEXT("base_motor_holder_so101_v1"),
-0.00636471, -0.0000994414, -0.0024,
1.5708, 0.0, 1.5708);
Attach(BaseLink, TEXT("base_so101_v2"),
-0.00636471, 0.0, -0.0024,
1.5708, 0.0, 1.5708);
Attach(BaseLink, TEXT("sts3215_03a_v1"),
0.0263353, 0.0, 0.0437,
0.0, 0.0, 0.0, true);
Attach(BaseLink, TEXT("waveshare_mounting_plate_so101_v2"),
-0.0309827, -0.000199441, 0.0474,
1.5708, 0.0, 1.5708);
// === shoulder_link meshes ===
Attach(ShoulderLink, TEXT("sts3215_03a_v1"),
-0.0303992, 0.000422241, -0.0417,
1.5708, 1.5708, 0.0, true);
Attach(ShoulderLink, TEXT("motor_holder_so101_base_v1"),
-0.0675992, -0.000177759, 0.0158499,
1.5708, -1.5708, 0.0);
Attach(ShoulderLink, TEXT("rotation_pitch_so101_v1"),
0.0122008, 0.0000222413, 0.0464,
-1.5708, 0.0, 0.0);
// === upper_arm_link meshes ===
Attach(UpperArmLink, TEXT("sts3215_03a_v1"),
-0.11257, -0.0155, 0.0187,
-3.14159, 0.0, -1.5708, true);
Attach(UpperArmLink, TEXT("upper_arm_so101_v1"),
-0.065085, 0.012, 0.0182,
3.14159, 0.0, 0.0);
// === lower_arm_link meshes ===
Attach(LowerArmLink, TEXT("under_arm_so101_v1"),
-0.0648499, -0.032, 0.0182,
3.14159, 0.0, 0.0);
Attach(LowerArmLink, TEXT("motor_holder_so101_wrist_v1"),
-0.0648499, -0.032, 0.018,
-3.14159, 0.0, 0.0);
Attach(LowerArmLink, TEXT("sts3215_03a_v1"),
-0.1224, 0.0052, 0.0187,
-3.14159, 0.0, -3.14159, true);
// === wrist_link meshes ===
Attach(WristLink, TEXT("sts3215_03a_no_horn_v1"),
0.0, -0.0424, 0.0306,
1.5708, 1.5708, 0.0, true);
Attach(WristLink, TEXT("wrist_roll_pitch_so101_v2"),
0.0, -0.028, 0.0181,
-1.5708, -1.5708, 0.0);
// === gripper_link meshes ===
Attach(GripperLink, TEXT("sts3215_03a_v1"),
0.0077, 0.0001, -0.0234,
-1.5708, 0.0, 0.0, true);
Attach(GripperLink, TEXT("wrist_roll_follower_so101_v1"),
0.0, -0.000218214, 0.000949706,
-3.14159, 0.0, 0.0);
// === moving_jaw_link meshes ===
Attach(MovingJawLink, TEXT("moving_jaw_so101_v1"),
0.0, 0.0, 0.0189,
0.0, 0.0, 0.0);
UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: %d mesh components created"), AllMeshComponents.Num());
// --- Connect to ROS via Subsystem ---
UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
if (!GI) return;
URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
if (!Ros) return;
// Bind delegates.
Ros->OnTopicMessage.AddDynamic(this, &ARobotVisualizer::OnRosMessage);
Ros->OnConnected.AddDynamic(this, &ARobotVisualizer::OnRosBridgeConnected);
// Subscribe is now queued even before connection — the subsystem will
// send it automatically when connected (including on reconnect).
Ros->Subscribe(JointStateTopic, JointStateType);
// Initiate connection if not already connected.
if (!Ros->IsConnected())
{
Ros->Connect(RosBridgeUrl);
}
else
{
// Already connected (e.g. another actor already called Connect).
// Subscribe was queued above and the subsystem saw it's already connected,
// so it sent the subscribe immediately. Nothing else to do.
UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: already connected, subscription sent."));
}
}
// =============================================================================
// EndPlay
// =============================================================================
void ARobotVisualizer::EndPlay(const EEndPlayReason::Type EndPlayReason)
{
if (UGameInstance* GI = UGameplayStatics::GetGameInstance(this))
{
if (URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>())
{
Ros->OnTopicMessage.RemoveDynamic(this, &ARobotVisualizer::OnRosMessage);
Ros->OnConnected.RemoveDynamic(this, &ARobotVisualizer::OnRosBridgeConnected);
}
}
Super::EndPlay(EndPlayReason);
}
// =============================================================================
// ROS connection callback
// =============================================================================
void ARobotVisualizer::OnRosBridgeConnected()
{
// The subsystem already restored all tracked subscriptions on connect,
// so we don't need to re-subscribe here. This callback is available
// for any per-actor logic that should run on (re)connect.
UE_LOG(LogRosBridge, Log,
TEXT("RobotVisualizer: rosbridge connected — subscriptions restored by subsystem."));
}
// =============================================================================
// Message handling
// =============================================================================
void ARobotVisualizer::OnRosMessage(const FString& Topic, const FString& MessageJson)
{
if (Topic == JointStateTopic)
{
ParseAndApplyJointStates(MessageJson);
}
}
void ARobotVisualizer::ParseAndApplyJointStates(const FString& MessageJson)
{
// Parse sensor_msgs/JointState JSON:
// { "name": ["shoulder_pan", ...], "position": [0.1, ...], ... }
TSharedPtr<FJsonObject> Json;
TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(MessageJson);
if (!FJsonSerializer::Deserialize(Reader, Json) || !Json.IsValid())
{
return;
}
const TArray<TSharedPtr<FJsonValue>>* NameArray = nullptr;
const TArray<TSharedPtr<FJsonValue>>* PosArray = nullptr;
if (!Json->TryGetArrayField(TEXT("name"), NameArray) ||
!Json->TryGetArrayField(TEXT("position"), PosArray))
{
return;
}
const int32 Count = FMath::Min(NameArray->Num(), PosArray->Num());
for (int32 i = 0; i < Count; ++i)
{
const FName JointName(*(*NameArray)[i]->AsString());
const double AngleRad = (*PosArray)[i]->AsNumber();
TObjectPtr<USceneComponent>* JointComp = JointComponentMap.Find(JointName);
if (JointComp && *JointComp)
{
// All URDF joints in this robot have axis="0 0 1" (local Z).
// Apply the joint angle as a Yaw rotation on top of the base
// joint orientation that was set in the constructor.
const float AngleDeg = RosCoordConv::RosJointAngleToUeDegrees(AngleRad);
// Re-derive base rotation from URDF (matches constructor values)
// TODO: Cache base rotations for cleanliness.
FRotator BaseRot;
if (JointName == FName("shoulder_pan"))
BaseRot = RosCoordConv::RosRpyToUeRotator(3.14159, 0.0, -3.14159);
else if (JointName == FName("shoulder_lift"))
BaseRot = RosCoordConv::RosRpyToUeRotator(-1.5708, -1.5708, 0.0);
else if (JointName == FName("elbow_flex"))
BaseRot = RosCoordConv::RosRpyToUeRotator(0.0, 0.0, 1.5708);
else if (JointName == FName("wrist_flex"))
BaseRot = RosCoordConv::RosRpyToUeRotator(0.0, 0.0, -1.5708);
else if (JointName == FName("wrist_roll"))
BaseRot = RosCoordConv::RosRpyToUeRotator(1.5708, 0.0486795, 3.14159);
else if (JointName == FName("gripper"))
BaseRot = RosCoordConv::RosRpyToUeRotator(1.5708, 0.0, 0.0);
// Compose: base rotation, then joint angle about local Z
const FQuat BaseQuat = BaseRot.Quaternion();
const FQuat JointQuat = FQuat(FVector::UpVector, FMath::DegreesToRadians(AngleDeg));
const FQuat FinalQuat = BaseQuat * JointQuat;
(*JointComp)->SetRelativeRotation(FinalQuat.Rotator());
}
}
}
변경 사항
- DoSubscribe() 제거 → OnRosBridgeConnected() 콜백으로 대체
- BeginPlay에서 연결 부분만 변경
- 기존: OnTopicMessage.AddDynamic → Connect → SetTimer(DoSubscribe, 1초 후)
- 변경: OnTopicMessage.AddDynamic + OnConnected.AddDynamic → Subscribe() 즉시 호출 (큐잉됨) → Connect
- EndPlay에서 OnConnected.RemoveDynamic 해제 추가
- 기존: OnTopicMessage.RemoveDynamic + ClearTimer
- 변경: OnTopicMessage.RemoveDynamic + OnConnected.RemoveDynamic (타이머 관련 코드 제거)
- TimerManager.h include 제거 (더 이상 불필요)
4개 파일을 모두 수정했으면 프로젝트파일.sln을 열어서 빌드한다. (SO101_Twin.sln)

빌드 성공 확인
2단계: delegate/queuing/reconnect 검증
WSL에서 rosbridge 띄우기
# WSL2 터미널 (ROS2 환경변수 설정 후)
ros2 launch rosbridge_server rosbridge_websocket_launch.xml
언리얼에서 레벨에 Robot Visualizer 액터를 하나 띄우고 실행하여
LogRosBridge: WebSocket connected to ws://127.0.0.1:9090/?x=1
LogRosBridge: Re-subscribed to /joint_states (sensor_msgs/JointState)
LogRosBridge: RobotVisualizer: rosbridge connected — subscriptions restored by subsystem.
Output Log에 위와 같은 출력이 뜨는지 확인

잘 나오면 델리게이트 + Subscribe 큐잉 정상 동작 확인된 것이다.
WSL 터미널을 Ctrl+C로 중단하고 Output Log에
WebSocket closed (code=..., clean=...): ...
Scheduling reconnect attempt #1 in 1.0s ...
Reconnect attempt #1 to ws://127.0.0.1:9090/?x=1 ...
(rosbridge 꺼져있으니 실패) Scheduling reconnect attempt #2 in 2.0s ...
위와 같은 출력이 뜨는지 확인

다시 WSL 터미널에서 rosbridge를 띄우면 연결되는지 확인
# WSL2 터미널 (ROS2 환경변수 설정 후)
ros2 launch rosbridge_server rosbridge_websocket_launch.xml
Output Log에
WebSocket connected to ws://127.0.0.1:9090/?x=1
Re-subscribed to /joint_states ...
출력되는지 확인

여기까지 검증 완료된 내용
- 연결 시 델리게이트 정상 브로드캐스트
- Subscribe 큐잉 + 연결 시 자동 전송
- 지수 백오프 (1s→2s→4s→8s→16s→30s cap) 정상
- 재연결 후 Re-subscribe 자동 복원
3단계: 송신 검증
이제 Unreal에서 더미 토픽을 Publish해서 WSL2에서 수신되는지 확인해야 한다. 테스트 액터를 만들어서 RobotVisualizer가 연결된 상태에서 /unreal_test 토픽에 메시지를 보내고, WSL2에서 ros2 topic echo로 확인하는 방식.
예전에 만들어뒀던 RosTestActor를 이용하여 테스트한다. 해당 액터에 Publish 기능을 추가한다.
RosTestActor.h
#pragma once
#include "CoreMinimal.h"
#include "GameFramework/Actor.h"
#include "RosTestActor.generated.h"
/**
* Test actor that verifies both directions of the rosbridge subsystem.
*
* - Subscribe: listens on a configurable topic (default /chatter) and prints to Output Log + screen
* - Publish: advertises /unreal_test (std_msgs/String) and publishes a counter message every second
*
* Drop one instance into any level, then Play In Editor with rosbridge_server running.
*/
UCLASS()
class SO101_TWIN_API ARosTestActor : public AActor
{
GENERATED_BODY()
public:
ARosTestActor();
protected:
virtual void BeginPlay() override;
virtual void EndPlay(const EEndPlayReason::Type EndPlayReason) override;
// --- Subscribe test config ---
UPROPERTY(EditAnywhere, Category = "ROS|Test")
FString RosBridgeUrl = TEXT("ws://127.0.0.1:9090/?x=1");
UPROPERTY(EditAnywhere, Category = "ROS|Test")
FString TopicName = TEXT("/chatter");
UPROPERTY(EditAnywhere, Category = "ROS|Test")
FString TopicType = TEXT("std_msgs/String");
// --- Publish test config ---
UPROPERTY(EditAnywhere, Category = "ROS|Test|Publish")
FString PublishTopicName = TEXT("/unreal_test");
UPROPERTY(EditAnywhere, Category = "ROS|Test|Publish")
FString PublishTopicType = TEXT("std_msgs/String");
/** Interval in seconds between published messages. */
UPROPERTY(EditAnywhere, Category = "ROS|Test|Publish")
float PublishIntervalSeconds = 1.0f;
/** Enable/disable the publish test. */
UPROPERTY(EditAnywhere, Category = "ROS|Test|Publish")
bool bEnablePublishTest = true;
private:
UFUNCTION()
void OnRosMessage(const FString& Topic, const FString& MessageJson);
UFUNCTION()
void OnRosBridgeConnected();
/** Publish one test message. Called by repeating timer. */
void PublishTestMessage();
FTimerHandle PublishTimerHandle;
int32 PublishCounter = 0;
};
변경 사항
- Publish 테스트 기능 추가: 연결되면 /unreal_test (std_msgs/String) 토픽을 Advertise하고, 1초마다 "Hello from Unreal #0", #1, #2... 메시지를 Publish. bEnablePublishTest 프로퍼티로 에디터에서 on/off 가능.
- Publish 관련 멤버 추가: PublishTestMessage() 메서드, PublishTimerHandle, PublishCounter
- OnConnected 델리게이트 콜백 추가: OnRosBridgeConnected() UFUNCTION
- 타이머 폴링 → OnConnected 델리게이트 전환 — 기존 DoSubscribe() + SubscribeDelaySeconds 타이머 방식을 제거하고, Subscribe/Advertise를 BeginPlay에서 큐잉 + OnConnected에서 Publish 타이머 시작하는 방식으로 변경.
RosTestActor.cpp
#include "RosTestActor.h"
#include "RosBridgeSubsystem.h"
#include "RosBridgeLog.h"
#include "Engine/Engine.h"
#include "Engine/World.h"
#include "TimerManager.h"
#include "Kismet/GameplayStatics.h"
ARosTestActor::ARosTestActor()
{
PrimaryActorTick.bCanEverTick = false;
}
void ARosTestActor::BeginPlay()
{
Super::BeginPlay();
UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
if (!GI) return;
URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
if (!Ros) return;
// Bind delegates.
Ros->OnTopicMessage.AddDynamic(this, &ARosTestActor::OnRosMessage);
Ros->OnConnected.AddDynamic(this, &ARosTestActor::OnRosBridgeConnected);
// Queue subscribe — sent automatically when connected.
Ros->Subscribe(TopicName, TopicType);
// Queue advertise for publish test.
if (bEnablePublishTest)
{
Ros->Advertise(PublishTopicName, PublishTopicType);
}
// Initiate connection if not already connected.
if (!Ros->IsConnected())
{
Ros->Connect(RosBridgeUrl);
}
}
void ARosTestActor::EndPlay(const EEndPlayReason::Type EndPlayReason)
{
// Stop publish timer.
if (GetWorld())
{
GetWorld()->GetTimerManager().ClearTimer(PublishTimerHandle);
}
// Unbind delegates.
if (UGameInstance* GI = UGameplayStatics::GetGameInstance(this))
{
if (URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>())
{
Ros->OnTopicMessage.RemoveDynamic(this, &ARosTestActor::OnRosMessage);
Ros->OnConnected.RemoveDynamic(this, &ARosTestActor::OnRosBridgeConnected);
}
}
Super::EndPlay(EndPlayReason);
}
void ARosTestActor::OnRosBridgeConnected()
{
UE_LOG(LogRosBridge, Log, TEXT("RosTestActor: connected — starting publish timer"));
// Start repeating publish timer.
if (bEnablePublishTest && GetWorld())
{
PublishCounter = 0;
GetWorld()->GetTimerManager().SetTimer(
PublishTimerHandle,
this,
&ARosTestActor::PublishTestMessage,
PublishIntervalSeconds,
true // looping
);
}
}
void ARosTestActor::PublishTestMessage()
{
UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
if (!GI) return;
URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
if (!Ros) return;
// Build std_msgs/String JSON: {"data": "Hello from Unreal #42"}
const FString Msg = FString::Printf(
TEXT("{\"data\": \"Hello from Unreal #%d\"}"), PublishCounter);
Ros->Publish(PublishTopicName, Msg);
UE_LOG(LogRosBridge, Log, TEXT("Published to %s: %s"), *PublishTopicName, *Msg);
PublishCounter++;
}
void ARosTestActor::OnRosMessage(const FString& Topic, const FString& MessageJson)
{
UE_LOG(LogRosBridge, Log, TEXT("Received on %s: %s"), *Topic, *MessageJson);
if (GEngine)
{
const FString ScreenMsg = FString::Printf(TEXT("%s: %s"), *Topic, *MessageJson);
GEngine->AddOnScreenDebugMessage(-1, 2.0f, FColor::Green, ScreenMsg);
}
}
변경 사항
- BeginPlay 연결 방식 변경: 타이머로 DoSubscribe() 예약하는 방식 → OnConnected.AddDynamic 바인딩 + Subscribe()/Advertise() 즉시 큐잉
- OnRosBridgeConnected() 신규: 연결 성공 시 1초 간격 반복 타이머로 PublishTestMessage() 시작
- PublishTestMessage() 신규: {"data": "Hello from Unreal #N"} JSON 빌드 → Ros->Publish() 호출
- DoSubscribe() 삭제: 타이머 폴링 함수 전체 제거
- EndPlay 변경: OnConnected.RemoveDynamic 추가 + PublishTimerHandle 클리어 추가, 기존 SubscribeTimerHandle 클리어 제거
두 파일을 업데이트 했다면 프로젝트파일.sln에서 빌드한다.

빌드 성공 확인
언리얼에서 RosTestActor를 레벨에 배치

WSL에서 rosbridge 가동
export PATH=$(echo $PATH | tr ':' '\n' | grep -v miniforge | tr '\n' ':' | sed 's/:$//')
source /opt/ros/humble/setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export ROS_LOCALHOST_ONLY=1
ros2 launch rosbridge_server rosbridge_websocket_launch.xml
언리얼 Play후 Output Log에서 Publish 출력 확인

WSL 터미널 하나 더 열고
source /opt/ros/humble/setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export ROS_LOCALHOST_ONLY=1
ros2 topic echo /unreal_test std_msgs/msg/String
데이터가 성공적으로 들어오면 성공

*혹시 잘 안되면 리스트 확인
export PATH=$(echo $PATH | tr ':' '\n' | grep -v miniforge | tr '\n' ':' | sed 's/:$//')
source /opt/ros/humble/setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export ROS_LOCALHOST_ONLY=1
ros2 topic list

*리스트 안나오면 CycloneDDS 또는 daemon 문제
ros2 daemon start
#완료 후
ros2 daemon stop
#완료 후
ros2 daemon start
완료 작업 정리
- Connected/Disconnected 델리게이트: URosBridgeSubsystem에 OnConnected/OnDisconnected 이벤트 추가. RobotVisualizer와 RosTestActor 모두 타이머 폴링 방식에서 델리게이트 기반으로 전환.
- Subscribe/Advertise 큐잉: 연결 전에 호출해도 맵에 저장했다가 연결 시 자동 전송. 재연결 시에도 자동 복원.
- 자동 재연결: 지수 백오프(1s→2s→4s→8s→16s→30s cap). HandleConnected/HandleConnectionError/HandleClosed 모두 게임 스레드 마샬링 추가.
- Advertise/Publish API: rosbridge v2 프로토콜 준수. MsgJson을 JSON 객체로 파싱 후 {"op":"publish","topic":"...","msg":{...}} 형태로 전송.
- End-to-end 송신 검증: RosTestActor에서 /unreal_test (std_msgs/String) 1초 간격 Publish → WSL2에서 ros2 topic echo로 수신 성공.
- 발견된 이슈: rosbridge 터미널도 CycloneDDS 환경변수 필수 (Appendix A #14). FastDDS로 띄우면 CycloneDDS 터미널과 DDS 도메인 불일치.
'Unreal Engine > 언리얼-ROS-Physical 통합 프로젝트' 카테고리의 다른 글
| [UnrealRobotics: SO-101] (14) Unreal Engine → MoveIt → 실물 로봇: 양방향 디지털 트윈 End-to-End (0) | 2026.05.13 |
|---|---|
| [UnrealRobotics: SO-101] (13) MoveIt 통합 - 로봇 Motion Planning (0) | 2026.05.12 |
| [UnrealRobotics: SO-101] (11) Unreal에 3D 로봇 파트 올리기, 연동 확인 (0) | 2026.05.07 |
| [UnrealRobotics: SO-101] (10) Joint Command 역방향 경로 검증 (ROS → 로봇) (0) | 2026.04.27 |
| [UnrealRobotics: SO-101] (9) 실물 로봇 + RViz 실시간 동기화 (0) | 2026.04.22 |