diff --git a/src/server/api/routers/trials.ts b/src/server/api/routers/trials.ts index 12f2a5a..70e7e13 100755 --- a/src/server/api/routers/trials.ts +++ b/src/server/api/routers/trials.ts @@ -593,7 +593,7 @@ export const trialsRouter = createTRPCRouter({ }); // Broadcast trial status update - await wsManager.broadcast(input.id, { + await wsManager.broadcastExternal(input.id, { type: "trial_status", data: { trial: trial[0], @@ -655,7 +655,7 @@ export const trialsRouter = createTRPCRouter({ }); // Broadcast trial status update - await wsManager.broadcast(input.id, { + await wsManager.broadcastExternal(input.id, { type: "trial_status", data: { trial, @@ -718,7 +718,7 @@ export const trialsRouter = createTRPCRouter({ }); // Broadcast trial status update - await wsManager.broadcast(input.id, { + await wsManager.broadcastExternal(input.id, { type: "trial_status", data: { trial: trial[0], @@ -878,7 +878,7 @@ export const trialsRouter = createTRPCRouter({ .returning(); // Broadcast new event to all subscribers - await wsManager.broadcast(input.trialId, { + await wsManager.broadcastExternal(input.trialId, { type: "trial_event", data: { event, @@ -922,7 +922,7 @@ export const trialsRouter = createTRPCRouter({ .returning(); // Broadcast intervention to all subscribers - await wsManager.broadcast(input.trialId, { + await wsManager.broadcastExternal(input.trialId, { type: "intervention_logged", data: { intervention, @@ -986,7 +986,7 @@ export const trialsRouter = createTRPCRouter({ } // Broadcast annotation to all subscribers - await wsManager.broadcast(input.trialId, { + await wsManager.broadcastExternal(input.trialId, { type: "annotation_added", data: { annotation, @@ -1380,7 +1380,7 @@ export const trialsRouter = createTRPCRouter({ .returning(); // Broadcast robot action to all subscribers - await wsManager.broadcast(input.trialId, { + await wsManager.broadcastExternal(input.trialId, { type: "trial_action_executed", data: { action_type: `${input.pluginName}.${input.actionId}`, @@ -1439,7 +1439,7 @@ export const trialsRouter = createTRPCRouter({ .returning(); // Broadcast robot action to all subscribers - await wsManager.broadcast(input.trialId, { + await wsManager.broadcastExternal(input.trialId, { type: "trial_action_executed", data: { action_type: `${input.pluginName}.${input.actionId}`, diff --git a/src/server/services/trial-execution.ts b/src/server/services/trial-execution.ts index ae5b607..75c9a9b 100755 --- a/src/server/services/trial-execution.ts +++ b/src/server/services/trial-execution.ts @@ -441,10 +441,6 @@ export class TrialExecutionEngine { case "hristudio-core.loop": return await this.executeLoopAction(trialId, action); - case "branch": - case "hristudio-core.branch": - return await this.executeBranchAction(trialId, action); - default: // Check if it's a robot action (contains plugin prefix) if ( diff --git a/src/server/services/websocket-manager.ts b/src/server/services/websocket-manager.ts index e7cc556..2720e62 100644 --- a/src/server/services/websocket-manager.ts +++ b/src/server/services/websocket-manager.ts @@ -146,6 +146,24 @@ class WebSocketManager { ); } + // Called from Next.js tRPC router — POSTs to the Bun ws-server process + // which holds the actual client connections. + async broadcastExternal( + trialId: string, + message: OutgoingMessage, + ): Promise { + const wsPort = process.env.WS_PORT ?? "3001"; + try { + await fetch(`http://localhost:${wsPort}/internal/broadcast`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ trialId, message }), + }); + } catch (error) { + console.error(`[WS] Failed to broadcast externally for trial ${trialId}:`, error); + } + } + async broadcastToAll(message: OutgoingMessage): Promise { const messageStr = JSON.stringify(message); const disconnectedClients: string[] = []; diff --git a/ws-server.ts b/ws-server.ts index 96f6923..b292ee0 100644 --- a/ws-server.ts +++ b/ws-server.ts @@ -46,9 +46,22 @@ console.log(`Starting WebSocket server on port ${port}...`); serve({ port, - fetch(req, server) { + async fetch(req, server) { const url = new URL(req.url); + // Internal broadcast endpoint — called by Next.js tRPC router + if (url.pathname === "/internal/broadcast") { + if (req.method !== "POST") { + return new Response("Method not allowed", { status: 405 }); + } + const { trialId, message } = (await req.json()) as { + trialId: string; + message: { type: string; data: Record }; + }; + await wsManager.broadcast(trialId, message); + return new Response("OK", { status: 200 }); + } + if (url.pathname === "/api/websocket") { if (req.headers.get("upgrade") !== "websocket") { return new Response("WebSocket upgrade required", { status: 426 }); @@ -114,7 +127,7 @@ serve({ }), ); }, - message(ws: ServerWebSocket, message) { + async message(ws: ServerWebSocket, message) { const { clientId, trialId } = ws.data; try { @@ -131,7 +144,7 @@ serve({ break; case "request_trial_status": { - const status = wsManager.getTrialStatusSync(trialId); + const status = await wsManager.getTrialStatus(trialId); ws.send( JSON.stringify({ type: "trial_status", @@ -146,7 +159,7 @@ serve({ } case "request_trial_events": { - const events = wsManager.getTrialEventsSync( + const events = await wsManager.getTrialEvents( trialId, msg.data?.limit ?? 100, );