4 Commits

Author SHA1 Message Date
soconnor 14182bf078 fix: resolve all three functional issues in trial execution
- WebSocket broadcasts: Next.js tRPC router now routes broadcasts via
  POST /internal/broadcast on the Bun ws-server process, which holds
  the actual client connections. Broadcasts were previously silently
  dropped due to the split singleton across processes.

- ws-server stubs: request_trial_status and request_trial_events now
  use the real async DB methods instead of the stub getTrialStatusSync/
  getTrialEventsSync that always returned null/[].

- Duplicate branch case: removed the unreachable second case "branch"
  block in executeAction switch; server-side branching is a pass-through
  since routing is client-orchestrated.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-08 13:24:32 -04:00
soconnor 943c7bd963 fix(wizard): skip unchosen branch steps during linear progression
When the wizard makes a branch choice, mark all other branch targets as
skipped. Linear progression now advances past skipped steps, so path
1→2→4 no longer executes step 3 when branch A was chosen.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-08 13:15:58 -04:00
soconnor 6b54724171 feat(wizard): enhance branching logic and add next step selection in PropertiesPanel 2026-04-08 12:08:33 -04:00
soconnor 86c1f35537 fix: SSH actions in experiment runner, branch ID serialization, and branch UI
- robot-communication.ts: add sshCommand to payloadMapping type
- trial-execution.ts: fix executeRobotActionWithComm to use ros2 key as
  implementation fallback and skip ROS connection for SSH actions
- route.ts: move studyId membership check inside initialize/executeSystemAction
  cases so executeSSH works without studyId; fix command param location
- experiments.ts: build tempId→dbUUID map on step insert and replace branch
  nextStepId references after all steps are saved
- WizardInterface.tsx: stop filtering branch actions from step action list

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:31:44 -04:00
12 changed files with 184 additions and 47 deletions
+33 -16
View File
@@ -21,27 +21,27 @@ export async function POST(request: NextRequest) {
const body = await request.json(); const body = await request.json();
const { action, studyId, robotId, parameters } = body; const { action, studyId, robotId, parameters } = body;
// Verify user has access to the study
const membership = await db.query.studyMembers.findFirst({
where: and(
eq(studyMembers.studyId, studyId),
eq(studyMembers.userId, session.user.id),
),
});
if (!membership || !["owner", "researcher"].includes(membership.role)) {
return NextResponse.json(
{ error: "Insufficient permissions" },
{ status: 403 },
);
}
const robotIp = const robotIp =
process.env.NAO_ROBOT_IP || process.env.NAO_IP || "134.82.159.168"; process.env.NAO_ROBOT_IP || process.env.NAO_IP || "134.82.159.168";
const password = process.env.NAO_PASSWORD || "robolab"; const password = process.env.NAO_PASSWORD || "robolab";
switch (action) { switch (action) {
case "initialize": { case "initialize": {
// Requires study membership
const membership = await db.query.studyMembers.findFirst({
where: and(
eq(studyMembers.studyId, studyId),
eq(studyMembers.userId, session.user.id),
),
});
if (!membership || !["owner", "researcher"].includes(membership.role)) {
return NextResponse.json(
{ error: "Insufficient permissions" },
{ status: 403 },
);
}
console.log(`[Robots API] Initializing robot at ${robotIp}`); console.log(`[Robots API] Initializing robot at ${robotIp}`);
const disableAlCmd = `sshpass -p "${password}" ssh -o StrictHostKeyChecking=no "nao@${robotIp}" "python2 -c \\"import sys; sys.path.append('/opt/aldebaran/lib/python2.7/site-packages'); import naoqi; al = naoqi.ALProxy('ALAutonomousLife', '127.0.0.1', 9559); al.setState('disabled')\\""`; const disableAlCmd = `sshpass -p "${password}" ssh -o StrictHostKeyChecking=no "nao@${robotIp}" "python2 -c \\"import sys; sys.path.append('/opt/aldebaran/lib/python2.7/site-packages'); import naoqi; al = naoqi.ALProxy('ALAutonomousLife', '127.0.0.1', 9559); al.setState('disabled')\\""`;
@@ -58,6 +58,21 @@ export async function POST(request: NextRequest) {
} }
case "executeSystemAction": { case "executeSystemAction": {
// Requires study membership
const membership = await db.query.studyMembers.findFirst({
where: and(
eq(studyMembers.studyId, studyId),
eq(studyMembers.userId, session.user.id),
),
});
if (!membership || !["owner", "researcher"].includes(membership.role)) {
return NextResponse.json(
{ error: "Insufficient permissions" },
{ status: 403 },
);
}
const { id, parameters: actionParams } = parameters ?? {}; const { id, parameters: actionParams } = parameters ?? {};
console.log(`[Robots API] Executing system action ${id}`); console.log(`[Robots API] Executing system action ${id}`);
@@ -145,7 +160,9 @@ export async function POST(request: NextRequest) {
} }
case "executeSSH": { case "executeSSH": {
const { command } = parameters ?? {}; // Session auth is sufficient — no studyId needed
// command may be top-level in body or nested under parameters
const { command } = parameters ?? body;
if (!command) { if (!command) {
return NextResponse.json( return NextResponse.json(
{ error: "Missing command parameter" }, { error: "Missing command parameter" },
@@ -835,6 +835,40 @@ export function PropertiesPanelBase({
</SelectContent> </SelectContent>
</Select> </Select>
</div> </div>
<div>
<Label className="text-xs">After this step, go to</Label>
<p className="text-muted-foreground mb-1 text-[10px]">
Override the next step (use to converge branch paths).
</p>
<Select
value={(selectedStep.trigger.conditions as any)?.nextStepId ?? "__linear__"}
onValueChange={(val) => {
onStepUpdate(selectedStep.id, {
trigger: {
...selectedStep.trigger,
conditions: {
...(selectedStep.trigger.conditions as any),
nextStepId: val === "__linear__" ? undefined : val,
},
},
});
}}
>
<SelectTrigger className="mt-1 h-7 w-full text-xs">
<SelectValue placeholder="Next step (default)" />
</SelectTrigger>
<SelectContent>
<SelectItem value="__linear__">Next step (default)</SelectItem>
{design.steps
.filter((s) => s.id !== selectedStep.id)
.map((s) => (
<SelectItem key={s.id} value={s.id}>
{s.name}
</SelectItem>
))}
</SelectContent>
</Select>
</div>
</div> </div>
</div> </div>
</div> </div>
@@ -430,8 +430,7 @@ export const WizardInterface = React.memo(function WizardInterface({
order: step.order ?? index, order: step.order ?? index,
actions: actions:
step.actions step.actions
?.filter((a) => a.type !== "branch") ?.map((action) => ({
.map((action) => ({
id: action.id, id: action.id,
name: action.name, name: action.name,
description: action.description, description: action.description,
@@ -793,8 +792,11 @@ export const WizardInterface = React.memo(function WizardInterface({
); );
} }
// Default: Linear progression // Default: Linear progression (skip steps marked as skipped by branching)
const nextIndex = currentStepIndex + 1; let nextIndex = currentStepIndex + 1;
while (nextIndex < steps.length && skippedSteps.has(nextIndex)) {
nextIndex++;
}
if (nextIndex < steps.length) { if (nextIndex < steps.length) {
// Mark current step as complete // Mark current step as complete
setCompletedSteps((prev) => { setCompletedSteps((prev) => {
@@ -923,8 +925,8 @@ export const WizardInterface = React.memo(function WizardInterface({
// Log action execution // Log action execution
console.log("Executing action:", actionId, parameters); console.log("Executing action:", actionId, parameters);
// Handle branching logic (wizard_wait_for_response) // Handle branching logic (wizard_wait_for_response / branch)
if (parameters?.value && parameters?.label) { if (parameters?.label || parameters?.nextStepId) {
setLastResponse(String(parameters.value)); setLastResponse(String(parameters.value));
// If nextStepId is provided, jump immediately // If nextStepId is provided, jump immediately
@@ -943,6 +945,24 @@ export const WizardInterface = React.memo(function WizardInterface({
console.log( console.log(
`[WizardInterface] Choice-based jump to step ${targetIndex} (${nextId})`, `[WizardInterface] Choice-based jump to step ${targetIndex} (${nextId})`,
); );
// Mark other branch targets as skipped so linear progression bypasses them
const branchingStep = steps[currentStepIndex];
const allOptions =
(branchingStep?.conditions?.options as any[]) ?? [];
setSkippedSteps((prev) => {
const next = new Set(prev);
for (const opt of allOptions) {
if (opt.nextStepId && opt.nextStepId !== nextId) {
const otherIdx = steps.findIndex(
(s) => s.id === opt.nextStepId,
);
if (otherIdx !== -1) next.add(otherIdx);
}
}
return next;
});
handleNextStep(targetIndex); handleNextStep(targetIndex);
return; // Exit after jump return; // Exit after jump
} else { } else {
@@ -499,6 +499,7 @@ export function WizardActionItem({
// Manual/Wizard Actions (Leaf nodes) // Manual/Wizard Actions (Leaf nodes)
!isContainer && !isContainer &&
action.type !== "wizard_wait_for_response" && action.type !== "wizard_wait_for_response" &&
!isBranch &&
!isCompleted && ( !isCompleted && (
<Button <Button
size="sm" size="sm"
@@ -524,7 +525,7 @@ export function WizardActionItem({
<div className="grid grid-cols-1 gap-2 pt-3 sm:grid-cols-2"> <div className="grid grid-cols-1 gap-2 pt-3 sm:grid-cols-2">
{(action.parameters.options as any[]).map((opt, optIdx) => { {(action.parameters.options as any[]).map((opt, optIdx) => {
const label = typeof opt === "string" ? opt : opt.label; const label = typeof opt === "string" ? opt : opt.label;
const value = typeof opt === "string" ? opt : opt.value; const value = typeof opt === "string" ? opt : (opt.value ?? opt.label);
const nextStepId = const nextStepId =
typeof opt === "object" ? opt.nextStepId : undefined; typeof opt === "object" ? opt.nextStepId : undefined;
+27 -1
View File
@@ -675,8 +675,11 @@ export const experimentsRouter = createTRPCRouter({
// Delete existing steps and actions for this experiment // Delete existing steps and actions for this experiment
await ctx.db.delete(steps).where(eq(steps.experimentId, id)); await ctx.db.delete(steps).where(eq(steps.experimentId, id));
// Map from designer temp step ID → new DB UUID (for branch nextStepId fix-up)
const stepIdMap = new Map<string, string>();
// Create new steps and actions // Create new steps and actions
for (const convertedStep of convertedSteps) { for (const [i, convertedStep] of convertedSteps.entries()) {
const [newStep] = await ctx.db const [newStep] = await ctx.db
.insert(steps) .insert(steps)
.values({ .values({
@@ -698,6 +701,10 @@ export const experimentsRouter = createTRPCRouter({
}); });
} }
// Record temp ID → real UUID so branch nextStepId refs can be fixed up
const tempId = normalizedSteps[i]?.id;
if (tempId) stepIdMap.set(tempId, newStep.id);
// Create actions for this step // Create actions for this step
for (const convertedAction of convertedStep.actions) { for (const convertedAction of convertedStep.actions) {
await ctx.db.insert(actions).values({ await ctx.db.insert(actions).values({
@@ -724,6 +731,25 @@ export const experimentsRouter = createTRPCRouter({
}); });
} }
} }
// Fix-up branch nextStepId: replace temp designer IDs with real DB UUIDs
// in both action parameters and step conditions
for (const [tempId, dbId] of stepIdMap) {
await ctx.db.execute(
sql`UPDATE ${actions}
SET parameters = replace(parameters::text, ${tempId}, ${dbId})::jsonb
WHERE step_id IN (
SELECT id FROM ${steps} WHERE experiment_id = ${id}
)
AND parameters::text LIKE ${"%" + tempId + "%"}`,
);
await ctx.db.execute(
sql`UPDATE ${steps}
SET conditions = replace(conditions::text, ${tempId}, ${dbId})::jsonb
WHERE experiment_id = ${id}
AND conditions::text LIKE ${"%" + tempId + "%"}`,
);
}
} catch (error) { } catch (error) {
throw new TRPCError({ throw new TRPCError({
code: "INTERNAL_SERVER_ERROR", code: "INTERNAL_SERVER_ERROR",
+8 -8
View File
@@ -593,7 +593,7 @@ export const trialsRouter = createTRPCRouter({
}); });
// Broadcast trial status update // Broadcast trial status update
await wsManager.broadcast(input.id, { await wsManager.broadcastExternal(input.id, {
type: "trial_status", type: "trial_status",
data: { data: {
trial: trial[0], trial: trial[0],
@@ -655,7 +655,7 @@ export const trialsRouter = createTRPCRouter({
}); });
// Broadcast trial status update // Broadcast trial status update
await wsManager.broadcast(input.id, { await wsManager.broadcastExternal(input.id, {
type: "trial_status", type: "trial_status",
data: { data: {
trial, trial,
@@ -718,7 +718,7 @@ export const trialsRouter = createTRPCRouter({
}); });
// Broadcast trial status update // Broadcast trial status update
await wsManager.broadcast(input.id, { await wsManager.broadcastExternal(input.id, {
type: "trial_status", type: "trial_status",
data: { data: {
trial: trial[0], trial: trial[0],
@@ -878,7 +878,7 @@ export const trialsRouter = createTRPCRouter({
.returning(); .returning();
// Broadcast new event to all subscribers // Broadcast new event to all subscribers
await wsManager.broadcast(input.trialId, { await wsManager.broadcastExternal(input.trialId, {
type: "trial_event", type: "trial_event",
data: { data: {
event, event,
@@ -922,7 +922,7 @@ export const trialsRouter = createTRPCRouter({
.returning(); .returning();
// Broadcast intervention to all subscribers // Broadcast intervention to all subscribers
await wsManager.broadcast(input.trialId, { await wsManager.broadcastExternal(input.trialId, {
type: "intervention_logged", type: "intervention_logged",
data: { data: {
intervention, intervention,
@@ -986,7 +986,7 @@ export const trialsRouter = createTRPCRouter({
} }
// Broadcast annotation to all subscribers // Broadcast annotation to all subscribers
await wsManager.broadcast(input.trialId, { await wsManager.broadcastExternal(input.trialId, {
type: "annotation_added", type: "annotation_added",
data: { data: {
annotation, annotation,
@@ -1380,7 +1380,7 @@ export const trialsRouter = createTRPCRouter({
.returning(); .returning();
// Broadcast robot action to all subscribers // Broadcast robot action to all subscribers
await wsManager.broadcast(input.trialId, { await wsManager.broadcastExternal(input.trialId, {
type: "trial_action_executed", type: "trial_action_executed",
data: { data: {
action_type: `${input.pluginName}.${input.actionId}`, action_type: `${input.pluginName}.${input.actionId}`,
@@ -1439,7 +1439,7 @@ export const trialsRouter = createTRPCRouter({
.returning(); .returning();
// Broadcast robot action to all subscribers // Broadcast robot action to all subscribers
await wsManager.broadcast(input.trialId, { await wsManager.broadcastExternal(input.trialId, {
type: "trial_action_executed", type: "trial_action_executed",
data: { data: {
action_type: `${input.pluginName}.${input.actionId}`, action_type: `${input.pluginName}.${input.actionId}`,
@@ -30,6 +30,7 @@ export interface RobotAction {
type?: string; type?: string;
transformFn?: string; transformFn?: string;
payload?: Record<string, unknown>; payload?: Record<string, unknown>;
sshCommand?: string;
}; };
ros2?: { ros2?: {
topic?: string; topic?: string;
@@ -40,6 +41,7 @@ export interface RobotAction {
type?: string; type?: string;
transformFn?: string; transformFn?: string;
payload?: Record<string, unknown>; payload?: Record<string, unknown>;
sshCommand?: string;
}; };
}; };
}; };
+15 -9
View File
@@ -441,10 +441,6 @@ export class TrialExecutionEngine {
case "hristudio-core.loop": case "hristudio-core.loop":
return await this.executeLoopAction(trialId, action); return await this.executeLoopAction(trialId, action);
case "branch":
case "hristudio-core.branch":
return await this.executeBranchAction(trialId, action);
default: default:
// Check if it's a robot action (contains plugin prefix) // Check if it's a robot action (contains plugin prefix)
if ( if (
@@ -799,8 +795,18 @@ export class TrialExecutionEngine {
parameters: Record<string, unknown>, parameters: Record<string, unknown>,
trialId: string, trialId: string,
): Promise<string> { ): Promise<string> {
// Ensure robot communication service is available // Plugin JSON uses a top-level "ros2" key; fall back to it if "implementation" is absent
if (!this.robotComm.getConnectionStatus()) { const impl = actionDefinition.implementation ?? actionDefinition.ros2;
// Determine if this action uses SSH (animations or explicit sshCommand)
const sshCommand =
impl?.payloadMapping?.sshCommand ||
impl?.ros2?.payloadMapping?.sshCommand;
const isSSHAction =
actionDefinition.id?.startsWith("play_animation_") || !!sshCommand;
// SSH actions bypass ROS bridge — only connect for ROS-dependent actions
if (!isSSHAction && !this.robotComm.getConnectionStatus()) {
try { try {
await this.robotComm.connect(); await this.robotComm.connect();
} catch (error) { } catch (error) {
@@ -810,12 +816,12 @@ export class TrialExecutionEngine {
} }
} }
// Prepare robot action - use action.type which contains the namespaced format (plugin.actionId) // Prepare robot action
const robotAction: RobotAction = { const robotAction: RobotAction = {
pluginName: plugin.name, pluginName: plugin.name,
actionId: action.type, // e.g., "nao6-ros2.play_animation_bow" actionId: actionDefinition.id, // e.g., "play_animation_yes"
parameters, parameters,
implementation: actionDefinition.implementation, implementation: impl,
}; };
// Execute action through robot communication service // Execute action through robot communication service
+18
View File
@@ -146,6 +146,24 @@ class WebSocketManager {
); );
} }
// Called from Next.js tRPC router — POSTs to the Bun ws-server process
// which holds the actual client connections.
async broadcastExternal(
trialId: string,
message: OutgoingMessage,
): Promise<void> {
const wsPort = process.env.WS_PORT ?? "3001";
try {
await fetch(`http://localhost:${wsPort}/internal/broadcast`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ trialId, message }),
});
} catch (error) {
console.error(`[WS] Failed to broadcast externally for trial ${trialId}:`, error);
}
}
async broadcastToAll(message: OutgoingMessage): Promise<void> { async broadcastToAll(message: OutgoingMessage): Promise<void> {
const messageStr = JSON.stringify(message); const messageStr = JSON.stringify(message);
const disconnectedClients: string[] = []; const disconnectedClients: string[] = [];
+1 -1
View File
@@ -30,7 +30,7 @@
], ],
"incremental": true, "incremental": true,
/* Path Aliases */ /* Path Aliases */
"baseUrl": ".", // "baseUrl": ".",
"paths": { "paths": {
"~/*": [ "~/*": [
"./src/*" "./src/*"
+17 -4
View File
@@ -46,9 +46,22 @@ console.log(`Starting WebSocket server on port ${port}...`);
serve<WSData>({ serve<WSData>({
port, port,
fetch(req, server) { async fetch(req, server) {
const url = new URL(req.url); const url = new URL(req.url);
// Internal broadcast endpoint — called by Next.js tRPC router
if (url.pathname === "/internal/broadcast") {
if (req.method !== "POST") {
return new Response("Method not allowed", { status: 405 });
}
const { trialId, message } = (await req.json()) as {
trialId: string;
message: { type: string; data: Record<string, unknown> };
};
await wsManager.broadcast(trialId, message);
return new Response("OK", { status: 200 });
}
if (url.pathname === "/api/websocket") { if (url.pathname === "/api/websocket") {
if (req.headers.get("upgrade") !== "websocket") { if (req.headers.get("upgrade") !== "websocket") {
return new Response("WebSocket upgrade required", { status: 426 }); return new Response("WebSocket upgrade required", { status: 426 });
@@ -114,7 +127,7 @@ serve<WSData>({
}), }),
); );
}, },
message(ws: ServerWebSocket<WSData>, message) { async message(ws: ServerWebSocket<WSData>, message) {
const { clientId, trialId } = ws.data; const { clientId, trialId } = ws.data;
try { try {
@@ -131,7 +144,7 @@ serve<WSData>({
break; break;
case "request_trial_status": { case "request_trial_status": {
const status = wsManager.getTrialStatusSync(trialId); const status = await wsManager.getTrialStatus(trialId);
ws.send( ws.send(
JSON.stringify({ JSON.stringify({
type: "trial_status", type: "trial_status",
@@ -146,7 +159,7 @@ serve<WSData>({
} }
case "request_trial_events": { case "request_trial_events": {
const events = wsManager.getTrialEventsSync( const events = await wsManager.getTrialEvents(
trialId, trialId,
msg.data?.limit ?? 100, msg.data?.limit ?? 100,
); );