fix: resolve all three functional issues in trial execution

- WebSocket broadcasts: Next.js tRPC router now routes broadcasts via
  POST /internal/broadcast on the Bun ws-server process, which holds
  the actual client connections. Broadcasts were previously silently
  dropped due to the split singleton across processes.

- ws-server stubs: request_trial_status and request_trial_events now
  use the real async DB methods instead of the stub getTrialStatusSync/
  getTrialEventsSync that always returned null/[].

- Duplicate branch case: removed the unreachable second case "branch"
  block in executeAction switch; server-side branching is a pass-through
  since routing is client-orchestrated.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-08 13:24:32 -04:00
parent 943c7bd963
commit 14182bf078
4 changed files with 43 additions and 16 deletions
+8 -8
View File
@@ -593,7 +593,7 @@ export const trialsRouter = createTRPCRouter({
}); });
// Broadcast trial status update // Broadcast trial status update
await wsManager.broadcast(input.id, { await wsManager.broadcastExternal(input.id, {
type: "trial_status", type: "trial_status",
data: { data: {
trial: trial[0], trial: trial[0],
@@ -655,7 +655,7 @@ export const trialsRouter = createTRPCRouter({
}); });
// Broadcast trial status update // Broadcast trial status update
await wsManager.broadcast(input.id, { await wsManager.broadcastExternal(input.id, {
type: "trial_status", type: "trial_status",
data: { data: {
trial, trial,
@@ -718,7 +718,7 @@ export const trialsRouter = createTRPCRouter({
}); });
// Broadcast trial status update // Broadcast trial status update
await wsManager.broadcast(input.id, { await wsManager.broadcastExternal(input.id, {
type: "trial_status", type: "trial_status",
data: { data: {
trial: trial[0], trial: trial[0],
@@ -878,7 +878,7 @@ export const trialsRouter = createTRPCRouter({
.returning(); .returning();
// Broadcast new event to all subscribers // Broadcast new event to all subscribers
await wsManager.broadcast(input.trialId, { await wsManager.broadcastExternal(input.trialId, {
type: "trial_event", type: "trial_event",
data: { data: {
event, event,
@@ -922,7 +922,7 @@ export const trialsRouter = createTRPCRouter({
.returning(); .returning();
// Broadcast intervention to all subscribers // Broadcast intervention to all subscribers
await wsManager.broadcast(input.trialId, { await wsManager.broadcastExternal(input.trialId, {
type: "intervention_logged", type: "intervention_logged",
data: { data: {
intervention, intervention,
@@ -986,7 +986,7 @@ export const trialsRouter = createTRPCRouter({
} }
// Broadcast annotation to all subscribers // Broadcast annotation to all subscribers
await wsManager.broadcast(input.trialId, { await wsManager.broadcastExternal(input.trialId, {
type: "annotation_added", type: "annotation_added",
data: { data: {
annotation, annotation,
@@ -1380,7 +1380,7 @@ export const trialsRouter = createTRPCRouter({
.returning(); .returning();
// Broadcast robot action to all subscribers // Broadcast robot action to all subscribers
await wsManager.broadcast(input.trialId, { await wsManager.broadcastExternal(input.trialId, {
type: "trial_action_executed", type: "trial_action_executed",
data: { data: {
action_type: `${input.pluginName}.${input.actionId}`, action_type: `${input.pluginName}.${input.actionId}`,
@@ -1439,7 +1439,7 @@ export const trialsRouter = createTRPCRouter({
.returning(); .returning();
// Broadcast robot action to all subscribers // Broadcast robot action to all subscribers
await wsManager.broadcast(input.trialId, { await wsManager.broadcastExternal(input.trialId, {
type: "trial_action_executed", type: "trial_action_executed",
data: { data: {
action_type: `${input.pluginName}.${input.actionId}`, action_type: `${input.pluginName}.${input.actionId}`,
-4
View File
@@ -441,10 +441,6 @@ export class TrialExecutionEngine {
case "hristudio-core.loop": case "hristudio-core.loop":
return await this.executeLoopAction(trialId, action); return await this.executeLoopAction(trialId, action);
case "branch":
case "hristudio-core.branch":
return await this.executeBranchAction(trialId, action);
default: default:
// Check if it's a robot action (contains plugin prefix) // Check if it's a robot action (contains plugin prefix)
if ( if (
+18
View File
@@ -146,6 +146,24 @@ class WebSocketManager {
); );
} }
// Called from Next.js tRPC router — POSTs to the Bun ws-server process
// which holds the actual client connections.
async broadcastExternal(
trialId: string,
message: OutgoingMessage,
): Promise<void> {
const wsPort = process.env.WS_PORT ?? "3001";
try {
await fetch(`http://localhost:${wsPort}/internal/broadcast`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ trialId, message }),
});
} catch (error) {
console.error(`[WS] Failed to broadcast externally for trial ${trialId}:`, error);
}
}
async broadcastToAll(message: OutgoingMessage): Promise<void> { async broadcastToAll(message: OutgoingMessage): Promise<void> {
const messageStr = JSON.stringify(message); const messageStr = JSON.stringify(message);
const disconnectedClients: string[] = []; const disconnectedClients: string[] = [];
+17 -4
View File
@@ -46,9 +46,22 @@ console.log(`Starting WebSocket server on port ${port}...`);
serve<WSData>({ serve<WSData>({
port, port,
fetch(req, server) { async fetch(req, server) {
const url = new URL(req.url); const url = new URL(req.url);
// Internal broadcast endpoint — called by Next.js tRPC router
if (url.pathname === "/internal/broadcast") {
if (req.method !== "POST") {
return new Response("Method not allowed", { status: 405 });
}
const { trialId, message } = (await req.json()) as {
trialId: string;
message: { type: string; data: Record<string, unknown> };
};
await wsManager.broadcast(trialId, message);
return new Response("OK", { status: 200 });
}
if (url.pathname === "/api/websocket") { if (url.pathname === "/api/websocket") {
if (req.headers.get("upgrade") !== "websocket") { if (req.headers.get("upgrade") !== "websocket") {
return new Response("WebSocket upgrade required", { status: 426 }); return new Response("WebSocket upgrade required", { status: 426 });
@@ -114,7 +127,7 @@ serve<WSData>({
}), }),
); );
}, },
message(ws: ServerWebSocket<WSData>, message) { async message(ws: ServerWebSocket<WSData>, message) {
const { clientId, trialId } = ws.data; const { clientId, trialId } = ws.data;
try { try {
@@ -131,7 +144,7 @@ serve<WSData>({
break; break;
case "request_trial_status": { case "request_trial_status": {
const status = wsManager.getTrialStatusSync(trialId); const status = await wsManager.getTrialStatus(trialId);
ws.send( ws.send(
JSON.stringify({ JSON.stringify({
type: "trial_status", type: "trial_status",
@@ -146,7 +159,7 @@ serve<WSData>({
} }
case "request_trial_events": { case "request_trial_events": {
const events = wsManager.getTrialEventsSync( const events = await wsManager.getTrialEvents(
trialId, trialId,
msg.data?.limit ?? 100, msg.data?.limit ?? 100,
); );