본문 바로가기
Unreal Engine/언리얼-ROS-Physical 통합 프로젝트

[UnrealRobotics: SO-101] (12) Unreal에서 ROS2 로봇 명령 송신 파이프 확장

by 테크앤아트 2026. 5. 12.
728x90
반응형

 

 

 


 

 

현재 URosBridgeSubsystem은 수신(Subscribe)만 가능하고, 송신(Publish)이 없다. 이번 작업은 이걸 추가해서 Unreal에서 ROS로 명령을 보낼 수 있게 만드는 단계이다. 간단하게 역으로 명령을 준다고 보면 된다. 지금까지는 로봇이 움직이면 언리얼의 로봇이 따라 움직였다면, 이번 작업은 언리얼에서 플래닝한대로 실제 로봇이 움직여야 하는 것이다.

 

 

1단계: URosBridgeSubsystem 수정

RosBridgeSubsystem.h, RosBridgeSubsystem.cpp, RobotVisualizer.h, RobotVisualizer.cpp 총 4개 파일 수정

 

RosBridgeSubsystem.h

더보기
#pragma once

#include "CoreMinimal.h"
#include "Subsystems/GameInstanceSubsystem.h"
#include "RosBridgeSubsystem.generated.h"

// Forward declarations — avoid pulling heavy headers into this header.
class IWebSocket;

/**
 * Delegate fired when a subscribed topic receives a message.
 *
 * @param Topic      The topic name (e.g. "/chatter")
 * @param MessageJson  The "msg" field of the rosbridge publish message,
 *                     already extracted as a JSON string for the caller
 *                     to parse into whatever struct they need.
 */
DECLARE_DYNAMIC_MULTICAST_DELEGATE_TwoParams(
    FOnRosTopicMessage,
    const FString&, Topic,
    const FString&, MessageJson
);

/** Delegate fired when the WebSocket connection to rosbridge is established. */
DECLARE_DYNAMIC_MULTICAST_DELEGATE(FOnRosBridgeConnected);

/** Delegate fired when the WebSocket connection to rosbridge is lost. */
DECLARE_DYNAMIC_MULTICAST_DELEGATE(FOnRosBridgeDisconnected);

/**
 * Game instance subsystem that owns the WebSocket connection to
 * rosbridge_server and dispatches incoming topic messages to subscribers.
 *
 * Phase 6 additions:
 *   - Advertise/Publish API for UE → ROS messaging
 *   - FOnRosBridgeConnected / FOnRosBridgeDisconnected delegates
 *   - Automatic reconnection with exponential backoff
 *   - Re-subscribe + re-advertise on reconnect
 *
 * Typical usage from any actor or component:
 *
 *   URosBridgeSubsystem* Ros =
 *       GetGameInstance()->GetSubsystem<URosBridgeSubsystem>();
 *   Ros->OnConnected.AddDynamic(this, &AMyActor::OnRosConnected);
 *   Ros->Connect(TEXT("ws://127.0.0.1:9090/?x=1"));
 *
 *   // In OnRosConnected():
 *   Ros->Subscribe(TEXT("/joint_states"), TEXT("sensor_msgs/JointState"));
 *   Ros->Advertise(TEXT("/cmd_vel"), TEXT("geometry_msgs/Twist"));
 */
UCLASS()
class SO101_TWIN_API URosBridgeSubsystem : public UGameInstanceSubsystem
{
    GENERATED_BODY()

public:
    // --- Subsystem lifecycle ---
    virtual void Initialize(FSubsystemCollectionBase& Collection) override;
    virtual void Deinitialize() override;

    // --- Connection control ---

    /**
    * Open a WebSocket connection to the given rosbridge URL.
    *
    * Default URL uses 127.0.0.1 explicitly (not "localhost") and includes
    * a dummy "?x=1" query string to work around a libwebsockets path-parsing
    * quirk. See SKILL.md sections 7.1 and 7.2 for details.
    *
    * After a successful connection, OnConnected is broadcast and
    * auto-reconnection is enabled for this URL.
    */
    UFUNCTION(BlueprintCallable, Category = "ROS|Bridge")
    void Connect(const FString& Url = TEXT("ws://127.0.0.1:9090/?x=1"));

    /** Close the WebSocket connection if open. Disables auto-reconnection. */
    UFUNCTION(BlueprintCallable, Category = "ROS|Bridge")
    void Disconnect();

    /** True if the WebSocket is currently open. */
    UFUNCTION(BlueprintPure, Category = "ROS|Bridge")
    bool IsConnected() const;

    // --- Topic operations (subscribe: ROS → UE) ---

    /**
     * Subscribe to a ROS topic through rosbridge.
     * If not yet connected, the subscription is queued and sent
     * automatically when the connection is established.
     */
    UFUNCTION(BlueprintCallable, Category = "ROS|Bridge")
    void Subscribe(const FString& Topic, const FString& Type);

    // --- Topic operations (publish: UE → ROS) ---

    /**
     * Advertise a ROS topic through rosbridge.
     * Must be called before Publish(). If not yet connected, the
     * advertisement is queued and sent when the connection is established.
     *
     * rosbridge silently drops publishes on un-advertised topics.
     */
    UFUNCTION(BlueprintCallable, Category = "ROS|Bridge")
    void Advertise(const FString& Topic, const FString& Type);

    /**
     * Publish a message to a previously advertised ROS topic.
     *
     * @param Topic     The topic name (must have been Advertise'd first)
     * @param MsgJson   The "msg" payload as a JSON string. The caller is
     *                  responsible for building the correct message structure.
     *                  Example for geometry_msgs/Twist:
     *                  {"linear":{"x":0.5,"y":0,"z":0},"angular":{"x":0,"y":0,"z":0.3}}
     */
    UFUNCTION(BlueprintCallable, Category = "ROS|Bridge")
    void Publish(const FString& Topic, const FString& MsgJson);

    // --- Events ---

    /** Broadcast whenever a publish message arrives on a subscribed topic. */
    UPROPERTY(BlueprintAssignable, Category = "ROS|Bridge")
    FOnRosTopicMessage OnTopicMessage;

    /** Broadcast when the WebSocket connection to rosbridge is established (including reconnects). */
    UPROPERTY(BlueprintAssignable, Category = "ROS|Bridge")
    FOnRosBridgeConnected OnConnected;

    /** Broadcast when the WebSocket connection to rosbridge is lost. */
    UPROPERTY(BlueprintAssignable, Category = "ROS|Bridge")
    FOnRosBridgeDisconnected OnDisconnected;

private:
    // --- WebSocket event handlers (all run on non-game thread!) ---
    void HandleConnected();
    void HandleConnectionError(const FString& Error);
    void HandleClosed(int32 StatusCode, const FString& Reason, bool bWasClean);
    void HandleMessage(const FString& Message);

    // --- Game-thread message processing ---
    void ProcessIncomingMessage(const FString& Message);

    // --- Internal helpers ---

    /** Send raw JSON string over the WebSocket. Returns false if not connected. */
    bool SendRaw(const FString& JsonPayload);

    /** Send all tracked subscriptions to rosbridge. Called on (re)connect. */
    void RestoreSubscriptions();

    /** Send all tracked advertisements to rosbridge. Called on (re)connect. */
    void RestoreAdvertisements();

    // --- Reconnection ---

    /** Schedule a reconnection attempt with exponential backoff. */
    void ScheduleReconnect();

    /** Called by the reconnect timer. */
    void AttemptReconnect();

    /** The URL passed to Connect(), saved for reconnection. */
    FString ConnectionUrl;

    /** Whether auto-reconnection is active. Set true on Connect(), false on explicit Disconnect(). */
    bool bAutoReconnect = false;

    /** Current backoff delay in seconds (doubles each failure, caps at MaxReconnectDelay). */
    float CurrentReconnectDelay = 1.0f;

    /** Base reconnect delay in seconds. */
    static constexpr float BaseReconnectDelay = 1.0f;

    /** Maximum reconnect delay in seconds. */
    static constexpr float MaxReconnectDelay = 30.0f;

    /** Timer handle for the reconnect timer. */
    FTimerHandle ReconnectTimerHandle;

    /** Number of consecutive reconnect attempts (for logging). */
    int32 ReconnectAttemptCount = 0;

    // --- State ---

    /** The underlying WebSocket. Kept as a TSharedPtr because IWebSocket
     *  is a plain C++ interface, not a UObject, so UPROPERTY does not apply. */
    TSharedPtr<IWebSocket> Socket;

    /** Topics we have subscribed to, kept so we can re-subscribe on reconnect. */
    UPROPERTY()
    TMap<FString, FString> SubscribedTopics;

    /** Topics we have advertised, kept so we can re-advertise on reconnect. */
    UPROPERTY()
    TMap<FString, FString> AdvertisedTopics;
};

변경 사항

  • 새 델리게이트 선언 2개 추가 - 연결/끊김 이벤트를 액터들이 바인딩할 수 있음
    • DECLARE_DYNAMIC_MULTICAST_DELEGATE(FOnRosBridgeConnected);
    • DECLARE_DYNAMIC_MULTICAST_DELEGATE(FOnRosBridgeDisconnected);
  • 새 public 메서드 2개 추가 - rosbridge v2 프로토콜 준수. Publish()는 MsgJson을 JSON 객체로 파싱한 뒤 {"op":"publish","topic":"...","msg":{...}} 형태로 전송. 미리 Advertise하지 않은 토픽에 publish하면 경고 후 드롭.
    • Advertise(Topic, Type) — rosbridge에 토픽 광고
    • Publish(Topic, MsgJson) — 광고한 토픽에 메시지 송신
  • Subscribe 동작 변경
    • 기존: 연결 안 되어 있으면 경고 후 무시
    • 변경: 연결 전이어도 맵에 저장 → 연결 시 자동 전송 (주석 업데이트)

 

 

RosBridgeSubsystem.cpp

더보기
#include "RosBridgeSubsystem.h"
#include "RosBridgeLog.h"

#include "WebSocketsModule.h"
#include "IWebSocket.h"
#include "Async/Async.h"
#include "Dom/JsonObject.h"
#include "Dom/JsonValue.h"
#include "Serialization/JsonReader.h"
#include "Serialization/JsonSerializer.h"
#include "Serialization/JsonWriter.h"
#include "Engine/GameInstance.h"
#include "TimerManager.h"
#include "Engine/World.h"

// =============================================================================
// Subsystem lifecycle
// =============================================================================

void URosBridgeSubsystem::Initialize(FSubsystemCollectionBase& Collection)
{
    Super::Initialize(Collection);

    // The WebSockets module may not be loaded yet at this point (depends on
    // module startup order), so force-load it before we try to create a socket.
    if (!FModuleManager::Get().IsModuleLoaded("WebSockets"))
    {
        FModuleManager::Get().LoadModule("WebSockets");
    }

    UE_LOG(LogRosBridge, Log, TEXT("RosBridgeSubsystem initialized"));
}

void URosBridgeSubsystem::Deinitialize()
{
    // Disable auto-reconnect first so Disconnect doesn't trigger reconnection.
    bAutoReconnect = false;

    // Cancel any pending reconnect timer.
    if (UGameInstance* GI = GetGameInstance())
    {
        if (UWorld* World = GI->GetWorld())
        {
            World->GetTimerManager().ClearTimer(ReconnectTimerHandle);
        }
    }

    Disconnect();
    UE_LOG(LogRosBridge, Log, TEXT("RosBridgeSubsystem deinitialized"));

    Super::Deinitialize();
}

// =============================================================================
// Connection control
// =============================================================================

void URosBridgeSubsystem::Connect(const FString& Url)
{
    if (Socket.IsValid() && Socket->IsConnected())
    {
        UE_LOG(LogRosBridge, Warning,
            TEXT("Connect called but socket is already connected. Ignoring."));
        return;
    }

    // If we have an old socket that's not connected (e.g. from a failed attempt),
    // clean it up before creating a new one.
    if (Socket.IsValid())
    {
        Socket.Reset();
    }

    ConnectionUrl = Url;
    bAutoReconnect = true;
    CurrentReconnectDelay = BaseReconnectDelay;
    ReconnectAttemptCount = 0;

    UE_LOG(LogRosBridge, Log, TEXT("Connecting to %s ..."), *Url);

    Socket = FWebSocketsModule::Get().CreateWebSocket(Url);

    // Bind handlers. All of these fire on a non-game thread.
    Socket->OnConnected().AddUObject(this, &URosBridgeSubsystem::HandleConnected);
    Socket->OnConnectionError().AddUObject(this, &URosBridgeSubsystem::HandleConnectionError);
    Socket->OnClosed().AddUObject(this, &URosBridgeSubsystem::HandleClosed);
    Socket->OnMessage().AddUObject(this, &URosBridgeSubsystem::HandleMessage);

    Socket->Connect();
}

void URosBridgeSubsystem::Disconnect()
{
    // Explicit disconnect: disable auto-reconnect.
    bAutoReconnect = false;

    // Cancel pending reconnect timer.
    if (UGameInstance* GI = GetGameInstance())
    {
        if (UWorld* World = GI->GetWorld())
        {
            World->GetTimerManager().ClearTimer(ReconnectTimerHandle);
        }
    }

    if (Socket.IsValid())
    {
        if (Socket->IsConnected())
        {
            Socket->Close();
        }
        Socket.Reset();
    }
}

bool URosBridgeSubsystem::IsConnected() const
{
    return Socket.IsValid() && Socket->IsConnected();
}

// =============================================================================
// Topic operations — Subscribe (ROS → UE)
// =============================================================================

void URosBridgeSubsystem::Subscribe(const FString& Topic, const FString& Type)
{
    // Always track the subscription so it gets restored on reconnect.
    SubscribedTopics.Add(Topic, Type);

    if (!IsConnected())
    {
        UE_LOG(LogRosBridge, Log,
            TEXT("Subscribe('%s') queued — will be sent on connect."), *Topic);
        return;
    }

    // Build: {"op":"subscribe","topic":"<Topic>","type":"<Type>"}
    const TSharedRef<FJsonObject> Json = MakeShared<FJsonObject>();
    Json->SetStringField(TEXT("op"), TEXT("subscribe"));
    Json->SetStringField(TEXT("topic"), Topic);
    Json->SetStringField(TEXT("type"), Type);

    FString Payload;
    const TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&Payload);
    FJsonSerializer::Serialize(Json, Writer);

    SendRaw(Payload);
    UE_LOG(LogRosBridge, Log, TEXT("Subscribed to %s (%s)"), *Topic, *Type);
}

// =============================================================================
// Topic operations — Advertise / Publish (UE → ROS)
// =============================================================================

void URosBridgeSubsystem::Advertise(const FString& Topic, const FString& Type)
{
    // Always track the advertisement so it gets restored on reconnect.
    AdvertisedTopics.Add(Topic, Type);

    if (!IsConnected())
    {
        UE_LOG(LogRosBridge, Log,
            TEXT("Advertise('%s') queued — will be sent on connect."), *Topic);
        return;
    }

    // Build: {"op":"advertise","topic":"<Topic>","type":"<Type>"}
    const TSharedRef<FJsonObject> Json = MakeShared<FJsonObject>();
    Json->SetStringField(TEXT("op"), TEXT("advertise"));
    Json->SetStringField(TEXT("topic"), Topic);
    Json->SetStringField(TEXT("type"), Type);

    FString Payload;
    const TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&Payload);
    FJsonSerializer::Serialize(Json, Writer);

    SendRaw(Payload);
    UE_LOG(LogRosBridge, Log, TEXT("Advertised %s (%s)"), *Topic, *Type);
}

void URosBridgeSubsystem::Publish(const FString& Topic, const FString& MsgJson)
{
    if (!IsConnected())
    {
        UE_LOG(LogRosBridge, Warning,
            TEXT("Publish('%s') called but not connected. Dropping message."), *Topic);
        return;
    }

    if (!AdvertisedTopics.Contains(Topic))
    {
        UE_LOG(LogRosBridge, Warning,
            TEXT("Publish('%s') called but topic not advertised. Call Advertise() first. Dropping."), *Topic);
        return;
    }

    // Build: {"op":"publish","topic":"<Topic>","msg":<MsgJson>}
    //
    // We need to embed MsgJson as a JSON *object*, not as a string.
    // So we parse MsgJson first, then insert it as a sub-object.
    TSharedPtr<FJsonObject> MsgObject;
    const TSharedRef<TJsonReader<>> MsgReader = TJsonReaderFactory<>::Create(MsgJson);
    if (!FJsonSerializer::Deserialize(MsgReader, MsgObject) || !MsgObject.IsValid())
    {
        UE_LOG(LogRosBridge, Error,
            TEXT("Publish('%s'): failed to parse MsgJson as JSON object. Dropping."), *Topic);
        return;
    }

    const TSharedRef<FJsonObject> Json = MakeShared<FJsonObject>();
    Json->SetStringField(TEXT("op"), TEXT("publish"));
    Json->SetStringField(TEXT("topic"), Topic);
    Json->SetObjectField(TEXT("msg"), MsgObject);

    FString Payload;
    const TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&Payload);
    FJsonSerializer::Serialize(Json, Writer);

    SendRaw(Payload);
}

// =============================================================================
// Internal helpers
// =============================================================================

bool URosBridgeSubsystem::SendRaw(const FString& JsonPayload)
{
    if (!IsConnected())
    {
        return false;
    }

    Socket->Send(JsonPayload);
    return true;
}

void URosBridgeSubsystem::RestoreSubscriptions()
{
    for (const auto& Pair : SubscribedTopics)
    {
        const TSharedRef<FJsonObject> Json = MakeShared<FJsonObject>();
        Json->SetStringField(TEXT("op"), TEXT("subscribe"));
        Json->SetStringField(TEXT("topic"), Pair.Key);
        Json->SetStringField(TEXT("type"), Pair.Value);

        FString Payload;
        const TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&Payload);
        FJsonSerializer::Serialize(Json, Writer);

        SendRaw(Payload);
        UE_LOG(LogRosBridge, Log, TEXT("Re-subscribed to %s (%s)"), *Pair.Key, *Pair.Value);
    }
}

void URosBridgeSubsystem::RestoreAdvertisements()
{
    for (const auto& Pair : AdvertisedTopics)
    {
        const TSharedRef<FJsonObject> Json = MakeShared<FJsonObject>();
        Json->SetStringField(TEXT("op"), TEXT("advertise"));
        Json->SetStringField(TEXT("topic"), Pair.Key);
        Json->SetStringField(TEXT("type"), Pair.Value);

        FString Payload;
        const TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&Payload);
        FJsonSerializer::Serialize(Json, Writer);

        SendRaw(Payload);
        UE_LOG(LogRosBridge, Log, TEXT("Re-advertised %s (%s)"), *Pair.Key, *Pair.Value);
    }
}

// =============================================================================
// Reconnection
// =============================================================================

void URosBridgeSubsystem::ScheduleReconnect()
{
    if (!bAutoReconnect)
    {
        return;
    }

    UGameInstance* GI = GetGameInstance();
    if (!GI) return;

    UWorld* World = GI->GetWorld();
    if (!World) return;

    ReconnectAttemptCount++;
    UE_LOG(LogRosBridge, Warning,
        TEXT("Scheduling reconnect attempt #%d in %.1fs ..."),
        ReconnectAttemptCount, CurrentReconnectDelay);

    World->GetTimerManager().SetTimer(
        ReconnectTimerHandle,
        this,
        &URosBridgeSubsystem::AttemptReconnect,
        CurrentReconnectDelay,
        false  // not looping
    );

    // Exponential backoff: double the delay, cap at MaxReconnectDelay.
    CurrentReconnectDelay = FMath::Min(CurrentReconnectDelay * 2.0f, MaxReconnectDelay);
}

void URosBridgeSubsystem::AttemptReconnect()
{
    if (!bAutoReconnect || ConnectionUrl.IsEmpty())
    {
        return;
    }

    UE_LOG(LogRosBridge, Log,
        TEXT("Reconnect attempt #%d to %s ..."), ReconnectAttemptCount, *ConnectionUrl);

    // Clean up old socket if it still exists.
    if (Socket.IsValid())
    {
        Socket.Reset();
    }

    Socket = FWebSocketsModule::Get().CreateWebSocket(ConnectionUrl);

    Socket->OnConnected().AddUObject(this, &URosBridgeSubsystem::HandleConnected);
    Socket->OnConnectionError().AddUObject(this, &URosBridgeSubsystem::HandleConnectionError);
    Socket->OnClosed().AddUObject(this, &URosBridgeSubsystem::HandleClosed);
    Socket->OnMessage().AddUObject(this, &URosBridgeSubsystem::HandleMessage);

    Socket->Connect();
}

// =============================================================================
// WebSocket event handlers (NON-GAME THREAD)
// =============================================================================

void URosBridgeSubsystem::HandleConnected()
{
    // Marshal to game thread — we will touch UObject state (delegates, topic maps).
    TWeakObjectPtr<URosBridgeSubsystem> WeakThis(this);
    AsyncTask(ENamedThreads::GameThread, [WeakThis]()
        {
            if (URosBridgeSubsystem* StrongThis = WeakThis.Get())
            {
                // Reset backoff on successful connection.
                StrongThis->CurrentReconnectDelay = BaseReconnectDelay;
                StrongThis->ReconnectAttemptCount = 0;

                UE_LOG(LogRosBridge, Log, TEXT("WebSocket connected to %s"), *StrongThis->ConnectionUrl);

                // Restore all tracked subscriptions and advertisements.
                StrongThis->RestoreSubscriptions();
                StrongThis->RestoreAdvertisements();

                // Broadcast to listeners (RobotVisualizer, etc.)
                StrongThis->OnConnected.Broadcast();
            }
        });
}

void URosBridgeSubsystem::HandleConnectionError(const FString& Error)
{
    TWeakObjectPtr<URosBridgeSubsystem> WeakThis(this);
    AsyncTask(ENamedThreads::GameThread, [WeakThis, Error]()
        {
            if (URosBridgeSubsystem* StrongThis = WeakThis.Get())
            {
                UE_LOG(LogRosBridge, Error, TEXT("WebSocket connection error: %s"), *Error);
                StrongThis->OnDisconnected.Broadcast();
                StrongThis->ScheduleReconnect();
            }
        });
}

void URosBridgeSubsystem::HandleClosed(int32 StatusCode, const FString& Reason, bool bWasClean)
{
    TWeakObjectPtr<URosBridgeSubsystem> WeakThis(this);
    AsyncTask(ENamedThreads::GameThread, [WeakThis, StatusCode, Reason, bWasClean]()
        {
            if (URosBridgeSubsystem* StrongThis = WeakThis.Get())
            {
                UE_LOG(LogRosBridge, Warning,
                    TEXT("WebSocket closed (code=%d, clean=%s): %s"),
                    StatusCode, bWasClean ? TEXT("true") : TEXT("false"), *Reason);
                StrongThis->OnDisconnected.Broadcast();
                StrongThis->ScheduleReconnect();
            }
        });
}

void URosBridgeSubsystem::HandleMessage(const FString& Message)
{
    // We are NOT on the game thread. Marshal to the game thread before
    // touching any UObject state (including broadcasting the delegate).
    TWeakObjectPtr<URosBridgeSubsystem> WeakThis(this);
    AsyncTask(ENamedThreads::GameThread, [WeakThis, Message]()
        {
            if (URosBridgeSubsystem* StrongThis = WeakThis.Get())
            {
                StrongThis->ProcessIncomingMessage(Message);
            }
        });
}

// =============================================================================
// Game-thread message processing
// =============================================================================

void URosBridgeSubsystem::ProcessIncomingMessage(const FString& Message)
{
    // Parse the top-level JSON object.
    TSharedPtr<FJsonObject> Json;
    const TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(Message);
    if (!FJsonSerializer::Deserialize(Reader, Json) || !Json.IsValid())
    {
        UE_LOG(LogRosBridge, Error, TEXT("Failed to parse incoming JSON: %s"), *Message);
        return;
    }

    // rosbridge v2 uses an "op" field to describe the operation.
    // For a subscribed topic we care about {"op":"publish","topic":...,"msg":{...}}.
    FString Op;
    if (!Json->TryGetStringField(TEXT("op"), Op))
    {
        UE_LOG(LogRosBridge, Warning, TEXT("Incoming message has no 'op' field: %s"), *Message);
        return;
    }

    if (Op == TEXT("publish"))
    {
        FString Topic;
        if (!Json->TryGetStringField(TEXT("topic"), Topic))
        {
            UE_LOG(LogRosBridge, Warning, TEXT("publish without topic: %s"), *Message);
            return;
        }

        // Re-serialize the "msg" sub-object back into a string. This keeps the
        // subsystem message-type-agnostic: the receiver parses whatever it needs.
        const TSharedPtr<FJsonObject>* MsgObjectPtr = nullptr;
        FString MsgJson;
        if (Json->TryGetObjectField(TEXT("msg"), MsgObjectPtr) && MsgObjectPtr && MsgObjectPtr->IsValid())
        {
            const TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&MsgJson);
            FJsonSerializer::Serialize(MsgObjectPtr->ToSharedRef(), Writer);
        }

        OnTopicMessage.Broadcast(Topic, MsgJson);
    }
    else
    {
        // Other ops (status, service_response, etc.) - just log for now.
        UE_LOG(LogRosBridge, Verbose, TEXT("Unhandled op '%s': %s"), *Op, *Message);
    }
}

변경 사항

  • 자동 재연결: HandleConnectionError / HandleClosed에서 ScheduleReconnect() → 지수 백오프(1s→2s→4s→...→30s cap) → 재연결 성공 시 RestoreSubscriptions() + RestoreAdvertisements() 자동 복원. 명시적 Disconnect() 호출 시에만 재연결 비활성화.
  • Subscribe 큐잉: 연결 전에 Subscribe()/Advertise()를 호출해도 맵에 저장해뒀다가 연결 시 자동 전송.
  • 스레드 안전성 강화: HandleConnected, HandleConnectionError, HandleClosed도 모두 AsyncTask(GameThread, ...) 마샬링 추가.

 

 

RobotVisualizer.h

더보기
#pragma once

#include "CoreMinimal.h"
#include "GameFramework/Actor.h"
#include "RobotVisualizer.generated.h"

class UStaticMesh;
class UStaticMeshComponent;
class USceneComponent;
class URosBridgeSubsystem;

/**
 * Visualizes the SO-ARM-101 follower arm in Unreal Engine.
 *
 * The component hierarchy mirrors the URDF link/joint structure:
 *   BaseLink -> ShoulderPanJoint -> ShoulderLink -> ShoulderLiftJoint -> ...
 *
 * Each "Joint" SceneComponent is where the ROS joint angle gets applied
 * as a local Z-axis rotation. All child links/meshes rotate with it.
 *
 * Mesh offsets and joint origins are hardcoded from the URDF
 * (so101_follower.urdf), converted to UE coordinates (cm, left-handed).
 *
 * Usage:
 *   1. Place this actor in the level
 *   2. It auto-connects to rosbridge and subscribes to /joint_states
 *   3. Moving the real robot updates the 3D model in real time
 *
 * Phase 6: Now uses OnConnected delegate instead of timer polling.
 */
UCLASS()
class SO101_TWIN_API ARobotVisualizer : public AActor
{
	GENERATED_BODY()

public:
	ARobotVisualizer();

protected:
	virtual void BeginPlay() override;
	virtual void EndPlay(const EEndPlayReason::Type EndPlayReason) override;

	// --- Configuration ---

	UPROPERTY(EditAnywhere, Category = "ROS|Bridge")
	FString RosBridgeUrl = TEXT("ws://127.0.0.1:9090/?x=1");

	UPROPERTY(EditAnywhere, Category = "ROS|Topics")
	FString JointStateTopic = TEXT("/joint_states");

	UPROPERTY(EditAnywhere, Category = "ROS|Topics")
	FString JointStateType = TEXT("sensor_msgs/JointState");

private:
	// --- Component hierarchy ---

	// Root
	UPROPERTY(VisibleAnywhere, Category = "Robot")
	TObjectPtr<USceneComponent> RobotRoot;

	// Link SceneComponents (parent for mesh offsets)
	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> BaseLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> ShoulderLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> UpperArmLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> LowerArmLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> WristLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> GripperLink;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Links")
	TObjectPtr<USceneComponent> MovingJawLink;

	// Joint SceneComponents (rotation applied here)
	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> ShoulderPanJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> ShoulderLiftJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> ElbowFlexJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> WristFlexJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> WristRollJoint;

	UPROPERTY(VisibleAnywhere, Category = "Robot|Joints")
	TObjectPtr<USceneComponent> GripperJoint;

	// --- Joint name → component mapping ---
	UPROPERTY()
	TMap<FName, TObjectPtr<USceneComponent>> JointComponentMap;

	// --- Mesh components (for material assignment etc.) ---
	UPROPERTY()
	TArray<TObjectPtr<UStaticMeshComponent>> AllMeshComponents;

	// --- ROS connection ---

	/** Called when rosbridge connection is established. Subscribes to topics. */
	UFUNCTION()
	void OnRosBridgeConnected();

	UFUNCTION()
	void OnRosMessage(const FString& Topic, const FString& MessageJson);

	void ParseAndApplyJointStates(const FString& MessageJson);

	// --- Helpers ---

	/** Create a SceneComponent with a relative transform (already in UE coordinates). */
	USceneComponent* CreateJointComponent(const FName& Name, USceneComponent* Parent,
		const FVector& Location, const FRotator& Rotation);

	/** Create a SceneComponent as a link container. */
	USceneComponent* CreateLinkComponent(const FName& Name, USceneComponent* Parent);

	/** Attach a StaticMeshComponent to a link with the given mesh offset (UE coordinates). */
	UStaticMeshComponent* AttachMesh(USceneComponent* Parent, UStaticMesh* Mesh,
		const FName& Name, const FVector& Location, const FRotator& Rotation,
		bool bIsMotor);
};

변경 사항

  • SubscribeDelaySeconds UPROPERTY와 FTimerHandle SubscribeTimerHandle 제거 (타이머 폴링 방식 완전 폐기)

 

 

RobotVisualizer.cpp

더보기
#include "RobotVisualizer.h"
#include "RosCoordConv.h"
#include "RosBridgeSubsystem.h"
#include "RosBridgeLog.h"

#include "Components/SceneComponent.h"
#include "Components/StaticMeshComponent.h"
#include "Engine/StaticMesh.h"
#include "Engine/Engine.h"
#include "Engine/World.h"
#include "Kismet/GameplayStatics.h"
#include "Dom/JsonObject.h"
#include "Dom/JsonValue.h"
#include "Serialization/JsonReader.h"
#include "Serialization/JsonSerializer.h"
#include "UObject/ConstructorHelpers.h"

// =============================================================================
// Mesh asset path helper
// =============================================================================

static UStaticMesh* LoadMeshAsset(const TCHAR* AssetName)
{
	// All meshes live under /Game/Robot/Meshes/
	FString Path = FString::Printf(TEXT("/Game/Robot/Meshes/%s.%s"), AssetName, AssetName);
	UStaticMesh* Mesh = Cast<UStaticMesh>(StaticLoadObject(UStaticMesh::StaticClass(), nullptr, *Path));
	if (!Mesh)
	{
		UE_LOG(LogRosBridge, Warning, TEXT("Failed to load mesh: %s"), *Path);
	}
	return Mesh;
}

// =============================================================================
// Constructor — build the entire component hierarchy
// =============================================================================

ARobotVisualizer::ARobotVisualizer()
{
	PrimaryActorTick.bCanEverTick = false;

	// --- Root ---
	RobotRoot = CreateDefaultSubobject<USceneComponent>(TEXT("RobotRoot"));
	RootComponent = RobotRoot;

	// =========================================================================
	// URDF data converted to UE coordinates:
	//   Position: meters * 100 = cm, Y flipped
	//   Rotation: RPY radians -> FRotator degrees, pitch & yaw negated
	//
	// All values below are pre-computed from so101_follower.urdf.
	// =========================================================================

	// --- base_link (attached directly to root, no joint) ---
	BaseLink = CreateDefaultSubobject<USceneComponent>(TEXT("BaseLink"));
	BaseLink->SetupAttachment(RobotRoot);

	// --- shoulder_pan joint ---
	// URDF origin: xyz(0.0388353, 0, 0.0624) rpy(3.14159, 0, -3.14159)
	ShoulderPanJoint = CreateDefaultSubobject<USceneComponent>(TEXT("ShoulderPanJoint"));
	ShoulderPanJoint->SetupAttachment(BaseLink);
	ShoulderPanJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(0.0388353, 0.0, 0.0624));
	ShoulderPanJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(3.14159, 0.0, -3.14159));

	ShoulderLink = CreateDefaultSubobject<USceneComponent>(TEXT("ShoulderLink"));
	ShoulderLink->SetupAttachment(ShoulderPanJoint);

	// --- shoulder_lift joint ---
	// URDF origin: xyz(-0.0303992, -0.0182778, -0.0542) rpy(-1.5708, -1.5708, 0)
	ShoulderLiftJoint = CreateDefaultSubobject<USceneComponent>(TEXT("ShoulderLiftJoint"));
	ShoulderLiftJoint->SetupAttachment(ShoulderLink);
	ShoulderLiftJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(-0.0303992, -0.0182778, -0.0542));
	ShoulderLiftJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(-1.5708, -1.5708, 0.0));

	UpperArmLink = CreateDefaultSubobject<USceneComponent>(TEXT("UpperArmLink"));
	UpperArmLink->SetupAttachment(ShoulderLiftJoint);

	// --- elbow_flex joint ---
	// URDF origin: xyz(-0.11257, -0.028, 0) rpy(0, 0, 1.5708)
	ElbowFlexJoint = CreateDefaultSubobject<USceneComponent>(TEXT("ElbowFlexJoint"));
	ElbowFlexJoint->SetupAttachment(UpperArmLink);
	ElbowFlexJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(-0.11257, -0.028, 0.0));
	ElbowFlexJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(0.0, 0.0, 1.5708));

	LowerArmLink = CreateDefaultSubobject<USceneComponent>(TEXT("LowerArmLink"));
	LowerArmLink->SetupAttachment(ElbowFlexJoint);

	// --- wrist_flex joint ---
	// URDF origin: xyz(-0.1349, 0.0052, 0) rpy(0, 0, -1.5708)
	WristFlexJoint = CreateDefaultSubobject<USceneComponent>(TEXT("WristFlexJoint"));
	WristFlexJoint->SetupAttachment(LowerArmLink);
	WristFlexJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(-0.1349, 0.0052, 0.0));
	WristFlexJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(0.0, 0.0, -1.5708));

	WristLink = CreateDefaultSubobject<USceneComponent>(TEXT("WristLink"));
	WristLink->SetupAttachment(WristFlexJoint);

	// --- wrist_roll joint ---
	// URDF origin: xyz(0, -0.0611, 0.0181) rpy(1.5708, 0.0486795, 3.14159)
	WristRollJoint = CreateDefaultSubobject<USceneComponent>(TEXT("WristRollJoint"));
	WristRollJoint->SetupAttachment(WristLink);
	WristRollJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(0.0, -0.0611, 0.0181));
	WristRollJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(1.5708, 0.0486795, 3.14159));

	GripperLink = CreateDefaultSubobject<USceneComponent>(TEXT("GripperLink"));
	GripperLink->SetupAttachment(WristRollJoint);

	// --- gripper joint ---
	// URDF origin: xyz(0.0202, 0.0188, -0.0234) rpy(1.5708, 0, 0)
	GripperJoint = CreateDefaultSubobject<USceneComponent>(TEXT("GripperJoint"));
	GripperJoint->SetupAttachment(GripperLink);
	GripperJoint->SetRelativeLocation(RosCoordConv::RosToUePosition(0.0202, 0.0188, -0.0234));
	GripperJoint->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(1.5708, 0.0, 0.0));

	MovingJawLink = CreateDefaultSubobject<USceneComponent>(TEXT("MovingJawLink"));
	MovingJawLink->SetupAttachment(GripperJoint);

	// --- Joint name mapping (matches ROS /joint_states names) ---
	JointComponentMap.Add(FName("shoulder_pan"),  ShoulderPanJoint);
	JointComponentMap.Add(FName("shoulder_lift"), ShoulderLiftJoint);
	JointComponentMap.Add(FName("elbow_flex"),    ElbowFlexJoint);
	JointComponentMap.Add(FName("wrist_flex"),    WristFlexJoint);
	JointComponentMap.Add(FName("wrist_roll"),    WristRollJoint);
	JointComponentMap.Add(FName("gripper"),       GripperJoint);
}

// =============================================================================
// BeginPlay — load meshes and attach, connect to ROS
// =============================================================================

void ARobotVisualizer::BeginPlay()
{
	Super::BeginPlay();

	// --- Load meshes and attach to links ---
	// Meshes are loaded at runtime (not in constructor) because
	// StaticLoadObject is safer to call here and allows hot-reload.

	// Helper lambda to reduce repetition
	auto Attach = [this](USceneComponent* Parent, const TCHAR* MeshName,
		double RosX, double RosY, double RosZ,
		double RosRoll, double RosPitch, double RosYaw,
		bool bIsMotor = false)
	{
		UStaticMesh* Mesh = LoadMeshAsset(MeshName);
		if (!Mesh) return;

		UStaticMeshComponent* SMC = NewObject<UStaticMeshComponent>(this);
		SMC->SetStaticMesh(Mesh);
		SMC->SetRelativeLocation(RosCoordConv::RosToUePosition(RosX, RosY, RosZ));
		SMC->SetRelativeRotation(RosCoordConv::RosRpyToUeRotator(RosRoll, RosPitch, RosYaw));
		SMC->SetCollisionEnabled(ECollisionEnabled::NoCollision);
		SMC->AttachToComponent(Parent, FAttachmentTransformRules::KeepRelativeTransform);
		SMC->RegisterComponent();
		AllMeshComponents.Add(SMC);
	};

	// === base_link meshes ===
	Attach(BaseLink, TEXT("base_motor_holder_so101_v1"),
		-0.00636471, -0.0000994414, -0.0024,
		1.5708, 0.0, 1.5708);
	Attach(BaseLink, TEXT("base_so101_v2"),
		-0.00636471, 0.0, -0.0024,
		1.5708, 0.0, 1.5708);
	Attach(BaseLink, TEXT("sts3215_03a_v1"),
		0.0263353, 0.0, 0.0437,
		0.0, 0.0, 0.0, true);
	Attach(BaseLink, TEXT("waveshare_mounting_plate_so101_v2"),
		-0.0309827, -0.000199441, 0.0474,
		1.5708, 0.0, 1.5708);

	// === shoulder_link meshes ===
	Attach(ShoulderLink, TEXT("sts3215_03a_v1"),
		-0.0303992, 0.000422241, -0.0417,
		1.5708, 1.5708, 0.0, true);
	Attach(ShoulderLink, TEXT("motor_holder_so101_base_v1"),
		-0.0675992, -0.000177759, 0.0158499,
		1.5708, -1.5708, 0.0);
	Attach(ShoulderLink, TEXT("rotation_pitch_so101_v1"),
		0.0122008, 0.0000222413, 0.0464,
		-1.5708, 0.0, 0.0);

	// === upper_arm_link meshes ===
	Attach(UpperArmLink, TEXT("sts3215_03a_v1"),
		-0.11257, -0.0155, 0.0187,
		-3.14159, 0.0, -1.5708, true);
	Attach(UpperArmLink, TEXT("upper_arm_so101_v1"),
		-0.065085, 0.012, 0.0182,
		3.14159, 0.0, 0.0);

	// === lower_arm_link meshes ===
	Attach(LowerArmLink, TEXT("under_arm_so101_v1"),
		-0.0648499, -0.032, 0.0182,
		3.14159, 0.0, 0.0);
	Attach(LowerArmLink, TEXT("motor_holder_so101_wrist_v1"),
		-0.0648499, -0.032, 0.018,
		-3.14159, 0.0, 0.0);
	Attach(LowerArmLink, TEXT("sts3215_03a_v1"),
		-0.1224, 0.0052, 0.0187,
		-3.14159, 0.0, -3.14159, true);

	// === wrist_link meshes ===
	Attach(WristLink, TEXT("sts3215_03a_no_horn_v1"),
		0.0, -0.0424, 0.0306,
		1.5708, 1.5708, 0.0, true);
	Attach(WristLink, TEXT("wrist_roll_pitch_so101_v2"),
		0.0, -0.028, 0.0181,
		-1.5708, -1.5708, 0.0);

	// === gripper_link meshes ===
	Attach(GripperLink, TEXT("sts3215_03a_v1"),
		0.0077, 0.0001, -0.0234,
		-1.5708, 0.0, 0.0, true);
	Attach(GripperLink, TEXT("wrist_roll_follower_so101_v1"),
		0.0, -0.000218214, 0.000949706,
		-3.14159, 0.0, 0.0);

	// === moving_jaw_link meshes ===
	Attach(MovingJawLink, TEXT("moving_jaw_so101_v1"),
		0.0, 0.0, 0.0189,
		0.0, 0.0, 0.0);

	UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: %d mesh components created"), AllMeshComponents.Num());

	// --- Connect to ROS via Subsystem ---
	UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
	if (!GI) return;

	URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
	if (!Ros) return;

	// Bind delegates.
	Ros->OnTopicMessage.AddDynamic(this, &ARobotVisualizer::OnRosMessage);
	Ros->OnConnected.AddDynamic(this, &ARobotVisualizer::OnRosBridgeConnected);

	// Subscribe is now queued even before connection — the subsystem will
	// send it automatically when connected (including on reconnect).
	Ros->Subscribe(JointStateTopic, JointStateType);

	// Initiate connection if not already connected.
	if (!Ros->IsConnected())
	{
		Ros->Connect(RosBridgeUrl);
	}
	else
	{
		// Already connected (e.g. another actor already called Connect).
		// Subscribe was queued above and the subsystem saw it's already connected,
		// so it sent the subscribe immediately. Nothing else to do.
		UE_LOG(LogRosBridge, Log, TEXT("RobotVisualizer: already connected, subscription sent."));
	}
}

// =============================================================================
// EndPlay
// =============================================================================

void ARobotVisualizer::EndPlay(const EEndPlayReason::Type EndPlayReason)
{
	if (UGameInstance* GI = UGameplayStatics::GetGameInstance(this))
	{
		if (URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>())
		{
			Ros->OnTopicMessage.RemoveDynamic(this, &ARobotVisualizer::OnRosMessage);
			Ros->OnConnected.RemoveDynamic(this, &ARobotVisualizer::OnRosBridgeConnected);
		}
	}

	Super::EndPlay(EndPlayReason);
}

// =============================================================================
// ROS connection callback
// =============================================================================

void ARobotVisualizer::OnRosBridgeConnected()
{
	// The subsystem already restored all tracked subscriptions on connect,
	// so we don't need to re-subscribe here. This callback is available
	// for any per-actor logic that should run on (re)connect.
	UE_LOG(LogRosBridge, Log,
		TEXT("RobotVisualizer: rosbridge connected — subscriptions restored by subsystem."));
}

// =============================================================================
// Message handling
// =============================================================================

void ARobotVisualizer::OnRosMessage(const FString& Topic, const FString& MessageJson)
{
	if (Topic == JointStateTopic)
	{
		ParseAndApplyJointStates(MessageJson);
	}
}

void ARobotVisualizer::ParseAndApplyJointStates(const FString& MessageJson)
{
	// Parse sensor_msgs/JointState JSON:
	// { "name": ["shoulder_pan", ...], "position": [0.1, ...], ... }

	TSharedPtr<FJsonObject> Json;
	TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(MessageJson);
	if (!FJsonSerializer::Deserialize(Reader, Json) || !Json.IsValid())
	{
		return;
	}

	const TArray<TSharedPtr<FJsonValue>>* NameArray = nullptr;
	const TArray<TSharedPtr<FJsonValue>>* PosArray = nullptr;

	if (!Json->TryGetArrayField(TEXT("name"), NameArray) ||
		!Json->TryGetArrayField(TEXT("position"), PosArray))
	{
		return;
	}

	const int32 Count = FMath::Min(NameArray->Num(), PosArray->Num());
	for (int32 i = 0; i < Count; ++i)
	{
		const FName JointName(*(*NameArray)[i]->AsString());
		const double AngleRad = (*PosArray)[i]->AsNumber();

		TObjectPtr<USceneComponent>* JointComp = JointComponentMap.Find(JointName);
		if (JointComp && *JointComp)
		{
			// All URDF joints in this robot have axis="0 0 1" (local Z).
			// Apply the joint angle as a Yaw rotation on top of the base
			// joint orientation that was set in the constructor.
			const float AngleDeg = RosCoordConv::RosJointAngleToUeDegrees(AngleRad);

			// Re-derive base rotation from URDF (matches constructor values)
			// TODO: Cache base rotations for cleanliness.
			FRotator BaseRot;
			if (JointName == FName("shoulder_pan"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(3.14159, 0.0, -3.14159);
			else if (JointName == FName("shoulder_lift"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(-1.5708, -1.5708, 0.0);
			else if (JointName == FName("elbow_flex"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(0.0, 0.0, 1.5708);
			else if (JointName == FName("wrist_flex"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(0.0, 0.0, -1.5708);
			else if (JointName == FName("wrist_roll"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(1.5708, 0.0486795, 3.14159);
			else if (JointName == FName("gripper"))
				BaseRot = RosCoordConv::RosRpyToUeRotator(1.5708, 0.0, 0.0);

			// Compose: base rotation, then joint angle about local Z
			const FQuat BaseQuat = BaseRot.Quaternion();
			const FQuat JointQuat = FQuat(FVector::UpVector, FMath::DegreesToRadians(AngleDeg));
			const FQuat FinalQuat = BaseQuat * JointQuat;

			(*JointComp)->SetRelativeRotation(FinalQuat.Rotator());
		}
	}
}

변경 사항

  • DoSubscribe() 제거 → OnRosBridgeConnected() 콜백으로 대체
  • BeginPlay에서 연결 부분만 변경
    • 기존: OnTopicMessage.AddDynamic → Connect → SetTimer(DoSubscribe, 1초 후)
    • 변경: OnTopicMessage.AddDynamic + OnConnected.AddDynamic → Subscribe() 즉시 호출 (큐잉됨) → Connect
  • EndPlay에서 OnConnected.RemoveDynamic 해제 추가
    • 기존: OnTopicMessage.RemoveDynamic + ClearTimer
    • 변경: OnTopicMessage.RemoveDynamic + OnConnected.RemoveDynamic (타이머 관련 코드 제거)
  • TimerManager.h include 제거 (더 이상 불필요)

 

 

 

 

4개 파일을 모두 수정했으면 프로젝트파일.sln을 열어서 빌드한다. (SO101_Twin.sln)

빌드 성공 확인

 

 


 

 

2단계: delegate/queuing/reconnect 검증

WSL에서 rosbridge 띄우기

# WSL2 터미널 (ROS2 환경변수 설정 후)
ros2 launch rosbridge_server rosbridge_websocket_launch.xml

 

 

언리얼에서 레벨에 Robot Visualizer 액터를 하나 띄우고 실행하여

LogRosBridge: WebSocket connected to ws://127.0.0.1:9090/?x=1
LogRosBridge: Re-subscribed to /joint_states (sensor_msgs/JointState)
LogRosBridge: RobotVisualizer: rosbridge connected — subscriptions restored by subsystem.

Output Log에 위와 같은 출력이 뜨는지 확인

잘 나오면 델리게이트 + Subscribe 큐잉 정상 동작 확인된 것이다.

 

 

WSL 터미널을 Ctrl+C로 중단하고 Output Log에 

WebSocket closed (code=..., clean=...): ...
Scheduling reconnect attempt #1 in 1.0s ...
Reconnect attempt #1 to ws://127.0.0.1:9090/?x=1 ...
(rosbridge 꺼져있으니 실패) Scheduling reconnect attempt #2 in 2.0s ...

 

위와 같은 출력이 뜨는지 확인

 

 

다시 WSL 터미널에서 rosbridge를 띄우면 연결되는지 확인

# WSL2 터미널 (ROS2 환경변수 설정 후)
ros2 launch rosbridge_server rosbridge_websocket_launch.xml

 

 

Output Log에

WebSocket connected to ws://127.0.0.1:9090/?x=1
Re-subscribed to /joint_states ...

출력되는지 확인

 

 

여기까지 검증 완료된 내용

  • 연결 시 델리게이트 정상 브로드캐스트
  • Subscribe 큐잉 + 연결 시 자동 전송
  • 지수 백오프 (1s→2s→4s→8s→16s→30s cap) 정상
  • 재연결 후 Re-subscribe 자동 복원

 

 


 

 

3단계: 송신 검증

이제 Unreal에서 더미 토픽을 Publish해서 WSL2에서 수신되는지 확인해야 한다. 테스트 액터를 만들어서 RobotVisualizer가 연결된 상태에서 /unreal_test 토픽에 메시지를 보내고, WSL2에서 ros2 topic echo로 확인하는 방식.

 

 

예전에 만들어뒀던 RosTestActor를 이용하여 테스트한다. 해당 액터에 Publish 기능을 추가한다.

 

 

RosTestActor.h

더보기
#pragma once

#include "CoreMinimal.h"
#include "GameFramework/Actor.h"
#include "RosTestActor.generated.h"

/**
 * Test actor that verifies both directions of the rosbridge subsystem.
 *
 * - Subscribe: listens on a configurable topic (default /chatter) and prints to Output Log + screen
 * - Publish: advertises /unreal_test (std_msgs/String) and publishes a counter message every second
 *
 * Drop one instance into any level, then Play In Editor with rosbridge_server running.
 */
UCLASS()
class SO101_TWIN_API ARosTestActor : public AActor
{
    GENERATED_BODY()

public:
    ARosTestActor();

protected:
    virtual void BeginPlay() override;
    virtual void EndPlay(const EEndPlayReason::Type EndPlayReason) override;

    // --- Subscribe test config ---

    UPROPERTY(EditAnywhere, Category = "ROS|Test")
    FString RosBridgeUrl = TEXT("ws://127.0.0.1:9090/?x=1");

    UPROPERTY(EditAnywhere, Category = "ROS|Test")
    FString TopicName = TEXT("/chatter");

    UPROPERTY(EditAnywhere, Category = "ROS|Test")
    FString TopicType = TEXT("std_msgs/String");

    // --- Publish test config ---

    UPROPERTY(EditAnywhere, Category = "ROS|Test|Publish")
    FString PublishTopicName = TEXT("/unreal_test");

    UPROPERTY(EditAnywhere, Category = "ROS|Test|Publish")
    FString PublishTopicType = TEXT("std_msgs/String");

    /** Interval in seconds between published messages. */
    UPROPERTY(EditAnywhere, Category = "ROS|Test|Publish")
    float PublishIntervalSeconds = 1.0f;

    /** Enable/disable the publish test. */
    UPROPERTY(EditAnywhere, Category = "ROS|Test|Publish")
    bool bEnablePublishTest = true;

private:
    UFUNCTION()
    void OnRosMessage(const FString& Topic, const FString& MessageJson);

    UFUNCTION()
    void OnRosBridgeConnected();

    /** Publish one test message. Called by repeating timer. */
    void PublishTestMessage();

    FTimerHandle PublishTimerHandle;
    int32 PublishCounter = 0;
};

변경 사항

 

  • Publish 테스트 기능 추가: 연결되면 /unreal_test (std_msgs/String) 토픽을 Advertise하고, 1초마다 "Hello from Unreal #0", #1, #2... 메시지를 Publish. bEnablePublishTest 프로퍼티로 에디터에서 on/off 가능.
  • Publish 관련 멤버 추가: PublishTestMessage() 메서드, PublishTimerHandle, PublishCounter
  • OnConnected 델리게이트 콜백 추가: OnRosBridgeConnected() UFUNCTION
  • 타이머 폴링 → OnConnected 델리게이트 전환 — 기존 DoSubscribe() + SubscribeDelaySeconds 타이머 방식을 제거하고, Subscribe/Advertise를 BeginPlay에서 큐잉 + OnConnected에서 Publish 타이머 시작하는 방식으로 변경.

 

 

 

RosTestActor.cpp

더보기
#include "RosTestActor.h"
#include "RosBridgeSubsystem.h"
#include "RosBridgeLog.h"

#include "Engine/Engine.h"
#include "Engine/World.h"
#include "TimerManager.h"
#include "Kismet/GameplayStatics.h"

ARosTestActor::ARosTestActor()
{
    PrimaryActorTick.bCanEverTick = false;
}

void ARosTestActor::BeginPlay()
{
    Super::BeginPlay();

    UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
    if (!GI) return;

    URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
    if (!Ros) return;

    // Bind delegates.
    Ros->OnTopicMessage.AddDynamic(this, &ARosTestActor::OnRosMessage);
    Ros->OnConnected.AddDynamic(this, &ARosTestActor::OnRosBridgeConnected);

    // Queue subscribe — sent automatically when connected.
    Ros->Subscribe(TopicName, TopicType);

    // Queue advertise for publish test.
    if (bEnablePublishTest)
    {
        Ros->Advertise(PublishTopicName, PublishTopicType);
    }

    // Initiate connection if not already connected.
    if (!Ros->IsConnected())
    {
        Ros->Connect(RosBridgeUrl);
    }
}

void ARosTestActor::EndPlay(const EEndPlayReason::Type EndPlayReason)
{
    // Stop publish timer.
    if (GetWorld())
    {
        GetWorld()->GetTimerManager().ClearTimer(PublishTimerHandle);
    }

    // Unbind delegates.
    if (UGameInstance* GI = UGameplayStatics::GetGameInstance(this))
    {
        if (URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>())
        {
            Ros->OnTopicMessage.RemoveDynamic(this, &ARosTestActor::OnRosMessage);
            Ros->OnConnected.RemoveDynamic(this, &ARosTestActor::OnRosBridgeConnected);
        }
    }

    Super::EndPlay(EndPlayReason);
}

void ARosTestActor::OnRosBridgeConnected()
{
    UE_LOG(LogRosBridge, Log, TEXT("RosTestActor: connected — starting publish timer"));

    // Start repeating publish timer.
    if (bEnablePublishTest && GetWorld())
    {
        PublishCounter = 0;
        GetWorld()->GetTimerManager().SetTimer(
            PublishTimerHandle,
            this,
            &ARosTestActor::PublishTestMessage,
            PublishIntervalSeconds,
            true  // looping
        );
    }
}

void ARosTestActor::PublishTestMessage()
{
    UGameInstance* GI = UGameplayStatics::GetGameInstance(this);
    if (!GI) return;

    URosBridgeSubsystem* Ros = GI->GetSubsystem<URosBridgeSubsystem>();
    if (!Ros) return;

    // Build std_msgs/String JSON: {"data": "Hello from Unreal #42"}
    const FString Msg = FString::Printf(
        TEXT("{\"data\": \"Hello from Unreal #%d\"}"), PublishCounter);

    Ros->Publish(PublishTopicName, Msg);

    UE_LOG(LogRosBridge, Log, TEXT("Published to %s: %s"), *PublishTopicName, *Msg);
    PublishCounter++;
}

void ARosTestActor::OnRosMessage(const FString& Topic, const FString& MessageJson)
{
    UE_LOG(LogRosBridge, Log, TEXT("Received on %s: %s"), *Topic, *MessageJson);

    if (GEngine)
    {
        const FString ScreenMsg = FString::Printf(TEXT("%s: %s"), *Topic, *MessageJson);
        GEngine->AddOnScreenDebugMessage(-1, 2.0f, FColor::Green, ScreenMsg);
    }
}

변경 사항

 

  • BeginPlay 연결 방식 변경: 타이머로 DoSubscribe() 예약하는 방식 → OnConnected.AddDynamic 바인딩 + Subscribe()/Advertise() 즉시 큐잉
  • OnRosBridgeConnected() 신규: 연결 성공 시 1초 간격 반복 타이머로 PublishTestMessage() 시작
  • PublishTestMessage() 신규: {"data": "Hello from Unreal #N"} JSON 빌드 → Ros->Publish() 호출
  • DoSubscribe() 삭제: 타이머 폴링 함수 전체 제거
  • EndPlay 변경: OnConnected.RemoveDynamic 추가 + PublishTimerHandle 클리어 추가, 기존 SubscribeTimerHandle 클리어 제거

 

 

 

두 파일을 업데이트 했다면 프로젝트파일.sln에서 빌드한다.

빌드 성공 확인

 

 

언리얼에서 RosTestActor를 레벨에 배치

 

 

WSL에서 rosbridge 가동

export PATH=$(echo $PATH | tr ':' '\n' | grep -v miniforge | tr '\n' ':' | sed 's/:$//')
source /opt/ros/humble/setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export ROS_LOCALHOST_ONLY=1
ros2 launch rosbridge_server rosbridge_websocket_launch.xml

 

 

언리얼 Play후 Output Log에서 Publish 출력 확인

 

 

WSL 터미널 하나 더 열고

source /opt/ros/humble/setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export ROS_LOCALHOST_ONLY=1
ros2 topic echo /unreal_test std_msgs/msg/String

데이터가 성공적으로 들어오면 성공

 

*혹시 잘 안되면 리스트 확인

export PATH=$(echo $PATH | tr ':' '\n' | grep -v miniforge | tr '\n' ':' | sed 's/:$//')
source /opt/ros/humble/setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export ROS_LOCALHOST_ONLY=1
ros2 topic list

 

*리스트 안나오면 CycloneDDS 또는 daemon 문제

ros2 daemon start
#완료 후
ros2 daemon stop
#완료 후
ros2 daemon start

 

 


 

 

완료 작업 정리

 

  • Connected/Disconnected 델리게이트: URosBridgeSubsystem에 OnConnected/OnDisconnected 이벤트 추가. RobotVisualizer와 RosTestActor 모두 타이머 폴링 방식에서 델리게이트 기반으로 전환.
  • Subscribe/Advertise 큐잉: 연결 전에 호출해도 맵에 저장했다가 연결 시 자동 전송. 재연결 시에도 자동 복원.
  • 자동 재연결: 지수 백오프(1s→2s→4s→8s→16s→30s cap). HandleConnected/HandleConnectionError/HandleClosed 모두 게임 스레드 마샬링 추가.
  • Advertise/Publish API: rosbridge v2 프로토콜 준수. MsgJson을 JSON 객체로 파싱 후 {"op":"publish","topic":"...","msg":{...}} 형태로 전송.
  • End-to-end 송신 검증: RosTestActor에서 /unreal_test (std_msgs/String) 1초 간격 Publish → WSL2에서 ros2 topic echo로 수신 성공.
  • 발견된 이슈: rosbridge 터미널도 CycloneDDS 환경변수 필수 (Appendix A #14). FastDDS로 띄우면 CycloneDDS 터미널과 DDS 도메인 불일치.

 

 

 


 

 

 

728x90
반응형