mirror of
https://github.com/soconnor0919/hristudio.git
synced 2026-03-24 03:37:51 -04:00
feat: Redesign Landing, Auth, and Dashboard Pages
Also fixed schema type exports and seed script errors.
This commit is contained in:
4
src/server/api/root.ts
Normal file → Executable file
4
src/server/api/root.ts
Normal file → Executable file
@@ -4,12 +4,14 @@ import { authRouter } from "~/server/api/routers/auth";
|
||||
import { collaborationRouter } from "~/server/api/routers/collaboration";
|
||||
import { dashboardRouter } from "~/server/api/routers/dashboard";
|
||||
import { experimentsRouter } from "~/server/api/routers/experiments";
|
||||
import { filesRouter } from "~/server/api/routers/files";
|
||||
import { mediaRouter } from "~/server/api/routers/media";
|
||||
import { participantsRouter } from "~/server/api/routers/participants";
|
||||
import { robotsRouter } from "~/server/api/routers/robots";
|
||||
import { studiesRouter } from "~/server/api/routers/studies";
|
||||
import { trialsRouter } from "~/server/api/routers/trials";
|
||||
import { usersRouter } from "~/server/api/routers/users";
|
||||
import { storageRouter } from "~/server/api/routers/storage";
|
||||
import { createCallerFactory, createTRPCRouter } from "~/server/api/trpc";
|
||||
|
||||
/**
|
||||
@@ -25,11 +27,13 @@ export const appRouter = createTRPCRouter({
|
||||
participants: participantsRouter,
|
||||
trials: trialsRouter,
|
||||
robots: robotsRouter,
|
||||
files: filesRouter,
|
||||
media: mediaRouter,
|
||||
analytics: analyticsRouter,
|
||||
collaboration: collaborationRouter,
|
||||
admin: adminRouter,
|
||||
dashboard: dashboardRouter,
|
||||
storage: storageRouter,
|
||||
});
|
||||
|
||||
// export type definition of API
|
||||
|
||||
0
src/server/api/routers/admin.ts
Normal file → Executable file
0
src/server/api/routers/admin.ts
Normal file → Executable file
0
src/server/api/routers/analytics.ts
Normal file → Executable file
0
src/server/api/routers/analytics.ts
Normal file → Executable file
0
src/server/api/routers/auth.ts
Normal file → Executable file
0
src/server/api/routers/auth.ts
Normal file → Executable file
0
src/server/api/routers/collaboration.ts
Normal file → Executable file
0
src/server/api/routers/collaboration.ts
Normal file → Executable file
0
src/server/api/routers/dashboard.ts
Normal file → Executable file
0
src/server/api/routers/dashboard.ts
Normal file → Executable file
84
src/server/api/routers/experiments.ts
Normal file → Executable file
84
src/server/api/routers/experiments.ts
Normal file → Executable file
@@ -152,10 +152,10 @@ export const experimentsRouter = createTRPCRouter({
|
||||
.select({
|
||||
experimentId: trials.experimentId,
|
||||
latest: sql`max(GREATEST(
|
||||
COALESCE(${trials.completedAt}, 'epoch'::timestamptz),
|
||||
COALESCE(${trials.startedAt}, 'epoch'::timestamptz),
|
||||
COALESCE(${trials.createdAt}, 'epoch'::timestamptz)
|
||||
))`.as("latest"),
|
||||
COALESCE(${trials.completedAt}, 'epoch':: timestamptz),
|
||||
COALESCE(${trials.startedAt}, 'epoch':: timestamptz),
|
||||
COALESCE(${trials.createdAt}, 'epoch':: timestamptz)
|
||||
))`.as("latest"),
|
||||
})
|
||||
.from(trials)
|
||||
.where(inArray(trials.experimentId, experimentIds))
|
||||
@@ -360,24 +360,24 @@ export const experimentsRouter = createTRPCRouter({
|
||||
|
||||
const executionGraphSummary = stepsArray
|
||||
? {
|
||||
steps: stepsArray.length,
|
||||
actions: stepsArray.reduce((total, step) => {
|
||||
const acts = step.actions;
|
||||
return (
|
||||
total +
|
||||
(Array.isArray(acts)
|
||||
? acts.reduce(
|
||||
(aTotal, a) =>
|
||||
aTotal +
|
||||
(Array.isArray(a?.actions) ? a.actions.length : 0),
|
||||
0,
|
||||
)
|
||||
: 0)
|
||||
);
|
||||
}, 0),
|
||||
generatedAt: eg?.generatedAt ?? null,
|
||||
version: eg?.version ?? null,
|
||||
}
|
||||
steps: stepsArray.length,
|
||||
actions: stepsArray.reduce((total, step) => {
|
||||
const acts = step.actions;
|
||||
return (
|
||||
total +
|
||||
(Array.isArray(acts)
|
||||
? acts.reduce(
|
||||
(aTotal, a) =>
|
||||
aTotal +
|
||||
(Array.isArray(a?.actions) ? a.actions.length : 0),
|
||||
0,
|
||||
)
|
||||
: 0)
|
||||
);
|
||||
}, 0),
|
||||
generatedAt: eg?.generatedAt ?? null,
|
||||
version: eg?.version ?? null,
|
||||
}
|
||||
: null;
|
||||
|
||||
return {
|
||||
@@ -511,8 +511,7 @@ export const experimentsRouter = createTRPCRouter({
|
||||
return {
|
||||
valid: false,
|
||||
issues: [
|
||||
`Compilation failed: ${
|
||||
err instanceof Error ? err.message : "Unknown error"
|
||||
`Compilation failed: ${err instanceof Error ? err.message : "Unknown error"
|
||||
}`,
|
||||
],
|
||||
pluginDependencies: [],
|
||||
@@ -541,13 +540,13 @@ export const experimentsRouter = createTRPCRouter({
|
||||
integrityHash: compiledGraph?.hash ?? null,
|
||||
compiled: compiledGraph
|
||||
? {
|
||||
steps: compiledGraph.steps.length,
|
||||
actions: compiledGraph.steps.reduce(
|
||||
(acc, s) => acc + s.actions.length,
|
||||
0,
|
||||
),
|
||||
transportSummary: summarizeTransports(compiledGraph.steps),
|
||||
}
|
||||
steps: compiledGraph.steps.length,
|
||||
actions: compiledGraph.steps.reduce(
|
||||
(acc, s) => acc + s.actions.length,
|
||||
0,
|
||||
),
|
||||
transportSummary: summarizeTransports(compiledGraph.steps),
|
||||
}
|
||||
: null,
|
||||
};
|
||||
}),
|
||||
@@ -570,6 +569,7 @@ export const experimentsRouter = createTRPCRouter({
|
||||
.mutation(async ({ ctx, input }) => {
|
||||
const { id, createSteps, compileExecution, ...updateData } = input;
|
||||
const userId = ctx.session.user.id;
|
||||
console.log("[DEBUG] experiments.update called", { id, visualDesign: updateData.visualDesign, createSteps });
|
||||
|
||||
// Get experiment to check study access
|
||||
const experiment = await ctx.db.query.experiments.findFirst({
|
||||
@@ -607,7 +607,7 @@ export const experimentsRouter = createTRPCRouter({
|
||||
if (issues.length) {
|
||||
throw new TRPCError({
|
||||
code: "BAD_REQUEST",
|
||||
message: `Visual design validation failed:\n- ${issues.join("\n- ")}`,
|
||||
message: `Visual design validation failed: \n - ${issues.join("\n- ")}`,
|
||||
});
|
||||
}
|
||||
normalizedSteps = guardedSteps;
|
||||
@@ -637,11 +637,10 @@ export const experimentsRouter = createTRPCRouter({
|
||||
} catch (compileErr) {
|
||||
throw new TRPCError({
|
||||
code: "BAD_REQUEST",
|
||||
message: `Execution graph compilation failed: ${
|
||||
compileErr instanceof Error
|
||||
? compileErr.message
|
||||
: "Unknown error"
|
||||
}`,
|
||||
message: `Execution graph compilation failed: ${compileErr instanceof Error
|
||||
? compileErr.message
|
||||
: "Unknown error"
|
||||
}`,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -735,11 +734,13 @@ export const experimentsRouter = createTRPCRouter({
|
||||
|
||||
const updatedExperiment = updatedExperimentResults[0];
|
||||
if (!updatedExperiment) {
|
||||
console.error("[DEBUG] Failed to update experiment - no result returned");
|
||||
throw new TRPCError({
|
||||
code: "INTERNAL_SERVER_ERROR",
|
||||
message: "Failed to update experiment",
|
||||
});
|
||||
}
|
||||
console.log("[DEBUG] Experiment updated successfully", { updatedAt: updatedExperiment.updatedAt });
|
||||
|
||||
// Log activity
|
||||
await ctx.db.insert(activityLogs).values({
|
||||
@@ -1541,6 +1542,15 @@ export const experimentsRouter = createTRPCRouter({
|
||||
parameters: step.conditions as Record<string, unknown>,
|
||||
parentId: undefined, // Not supported in current schema
|
||||
children: [], // TODO: implement hierarchical steps if needed
|
||||
actions: step.actions.map((action) => ({
|
||||
id: action.id,
|
||||
name: action.name,
|
||||
description: action.description,
|
||||
type: action.type,
|
||||
order: action.orderIndex,
|
||||
parameters: action.parameters as Record<string, unknown>,
|
||||
pluginId: action.pluginId,
|
||||
})),
|
||||
}));
|
||||
}),
|
||||
|
||||
|
||||
146
src/server/api/routers/files.ts
Normal file
146
src/server/api/routers/files.ts
Normal file
@@ -0,0 +1,146 @@
|
||||
import { z } from "zod";
|
||||
import { createTRPCRouter, protectedProcedure } from "~/server/api/trpc";
|
||||
import { participantDocuments } from "~/server/db/schema";
|
||||
import { TRPCError } from "@trpc/server";
|
||||
import { env } from "~/env";
|
||||
import * as Minio from "minio";
|
||||
import { uuid } from "drizzle-orm/pg-core";
|
||||
import { eq, desc } from "drizzle-orm";
|
||||
|
||||
// Initialize MinIO client
|
||||
// Note: In production, ensure these ENV vars are set.
|
||||
// For development with docker-compose, we use localhost:9000
|
||||
const minioClient = new Minio.Client({
|
||||
endPoint: (env.MINIO_ENDPOINT ?? "localhost").split(":")[0] ?? "localhost",
|
||||
port: parseInt((env.MINIO_ENDPOINT ?? "9000").split(":")[1] ?? "9000"),
|
||||
useSSL: false, // Default to false for local dev; adjust for prod
|
||||
accessKey: env.MINIO_ACCESS_KEY ?? "minioadmin",
|
||||
secretKey: env.MINIO_SECRET_KEY ?? "minioadmin",
|
||||
});
|
||||
|
||||
const BUCKET_NAME = env.MINIO_BUCKET_NAME ?? "hristudio-assets";
|
||||
|
||||
// Ensure bucket exists on startup (best effort)
|
||||
const ensureBucket = async () => {
|
||||
try {
|
||||
const exists = await minioClient.bucketExists(BUCKET_NAME);
|
||||
if (!exists) {
|
||||
await minioClient.makeBucket(BUCKET_NAME, env.MINIO_REGION ?? "us-east-1");
|
||||
// Set public policy if needed? For now, keep private and use presigned URLs.
|
||||
}
|
||||
} catch (e) {
|
||||
console.error("Error ensuring MinIO bucket exists:", e);
|
||||
}
|
||||
}
|
||||
void ensureBucket(); // Fire and forget on load
|
||||
|
||||
export const filesRouter = createTRPCRouter({
|
||||
// Get a presigned URL for uploading a file
|
||||
getPresignedUrl: protectedProcedure
|
||||
.input(z.object({
|
||||
filename: z.string(),
|
||||
contentType: z.string(),
|
||||
participantId: z.string(),
|
||||
}))
|
||||
.mutation(async ({ input }) => {
|
||||
const fileExtension = input.filename.split(".").pop();
|
||||
const uniqueFilename = `${input.participantId}/${crypto.randomUUID()}.${fileExtension}`;
|
||||
|
||||
try {
|
||||
const presignedUrl = await minioClient.presignedPutObject(
|
||||
BUCKET_NAME,
|
||||
uniqueFilename,
|
||||
60 * 5 // 5 minutes expiry
|
||||
);
|
||||
|
||||
return {
|
||||
url: presignedUrl,
|
||||
storagePath: uniqueFilename, // Pass this back to client to save in DB after upload
|
||||
};
|
||||
} catch (error) {
|
||||
console.error("Error generating presigned URL:", error);
|
||||
throw new TRPCError({
|
||||
code: "INTERNAL_SERVER_ERROR",
|
||||
message: "Failed to generate upload URL",
|
||||
});
|
||||
}
|
||||
}),
|
||||
|
||||
// Get a presigned URL for downloading/viewing a file
|
||||
getDownloadUrl: protectedProcedure
|
||||
.input(z.object({
|
||||
storagePath: z.string(),
|
||||
}))
|
||||
.query(async ({ input }) => {
|
||||
try {
|
||||
const url = await minioClient.presignedGetObject(
|
||||
BUCKET_NAME,
|
||||
input.storagePath,
|
||||
60 * 60 // 1 hour
|
||||
);
|
||||
return { url };
|
||||
} catch (error) {
|
||||
throw new TRPCError({
|
||||
code: "NOT_FOUND",
|
||||
message: "File not found or storage error",
|
||||
});
|
||||
}
|
||||
}),
|
||||
|
||||
// Record a successful upload in the database
|
||||
registerUpload: protectedProcedure
|
||||
.input(z.object({
|
||||
participantId: z.string(),
|
||||
name: z.string(),
|
||||
type: z.string().optional(),
|
||||
storagePath: z.string(),
|
||||
fileSize: z.number().optional(),
|
||||
}))
|
||||
.mutation(async ({ ctx, input }) => {
|
||||
await ctx.db.insert(participantDocuments).values({
|
||||
participantId: input.participantId,
|
||||
name: input.name,
|
||||
type: input.type,
|
||||
storagePath: input.storagePath,
|
||||
fileSize: input.fileSize,
|
||||
uploadedBy: ctx.session.user.id,
|
||||
});
|
||||
}),
|
||||
|
||||
// List documents for a participant
|
||||
listParticipantDocuments: protectedProcedure
|
||||
.input(z.object({ participantId: z.string() }))
|
||||
.query(async ({ ctx, input }) => {
|
||||
return await ctx.db.query.participantDocuments.findMany({
|
||||
where: eq(participantDocuments.participantId, input.participantId),
|
||||
orderBy: [desc(participantDocuments.createdAt)],
|
||||
with: {
|
||||
// Optional: join with uploader info if needed
|
||||
}
|
||||
});
|
||||
}),
|
||||
|
||||
// Delete a document
|
||||
deleteDocument: protectedProcedure
|
||||
.input(z.object({ id: z.string() }))
|
||||
.mutation(async ({ ctx, input }) => {
|
||||
const doc = await ctx.db.query.participantDocuments.findFirst({
|
||||
where: eq(participantDocuments.id, input.id),
|
||||
});
|
||||
|
||||
if (!doc) {
|
||||
throw new TRPCError({ code: "NOT_FOUND", message: "Document not found" });
|
||||
}
|
||||
|
||||
// Delete from database
|
||||
await ctx.db.delete(participantDocuments).where(eq(participantDocuments.id, input.id));
|
||||
|
||||
// Delete from MinIO (fire and forget or await)
|
||||
try {
|
||||
await minioClient.removeObject(BUCKET_NAME, doc.storagePath);
|
||||
} catch (e) {
|
||||
console.error("Failed to delete object from S3:", e);
|
||||
// We still consider the operation successful for the user as the DB record is gone.
|
||||
}
|
||||
}),
|
||||
});
|
||||
0
src/server/api/routers/media.ts
Normal file → Executable file
0
src/server/api/routers/media.ts
Normal file → Executable file
0
src/server/api/routers/participants.ts
Normal file → Executable file
0
src/server/api/routers/participants.ts
Normal file → Executable file
0
src/server/api/routers/robots.ts
Normal file → Executable file
0
src/server/api/routers/robots.ts
Normal file → Executable file
71
src/server/api/routers/storage.ts
Normal file
71
src/server/api/routers/storage.ts
Normal file
@@ -0,0 +1,71 @@
|
||||
|
||||
import { z } from "zod";
|
||||
import { createTRPCRouter, protectedProcedure } from "~/server/api/trpc";
|
||||
import { s3Client } from "~/server/storage";
|
||||
import { PutObjectCommand } from "@aws-sdk/client-s3";
|
||||
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
|
||||
import { env } from "~/env";
|
||||
import { TRPCError } from "@trpc/server";
|
||||
import { db } from "~/server/db";
|
||||
import { mediaCaptures } from "~/server/db/schema";
|
||||
|
||||
export const storageRouter = createTRPCRouter({
|
||||
getUploadPresignedUrl: protectedProcedure
|
||||
.input(
|
||||
z.object({
|
||||
filename: z.string(),
|
||||
contentType: z.string(),
|
||||
})
|
||||
)
|
||||
.mutation(async ({ input }) => {
|
||||
const bucket = env.MINIO_BUCKET_NAME ?? "hristudio-data";
|
||||
const key = input.filename;
|
||||
|
||||
try {
|
||||
const command = new PutObjectCommand({
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
ContentType: input.contentType,
|
||||
});
|
||||
|
||||
const url = await getSignedUrl(s3Client, command, { expiresIn: 3600 });
|
||||
|
||||
return {
|
||||
url,
|
||||
key,
|
||||
bucket,
|
||||
};
|
||||
} catch (error) {
|
||||
console.error("Error generating presigned URL:", error);
|
||||
throw new TRPCError({
|
||||
code: "INTERNAL_SERVER_ERROR",
|
||||
message: "Failed to generate upload URL",
|
||||
});
|
||||
}
|
||||
}),
|
||||
saveRecording: protectedProcedure
|
||||
.input(
|
||||
z.object({
|
||||
trialId: z.string(),
|
||||
storagePath: z.string(),
|
||||
fileSize: z.number().optional(),
|
||||
format: z.string().optional(),
|
||||
mediaType: z.enum(["video", "audio", "image"]).default("video"),
|
||||
})
|
||||
)
|
||||
.mutation(async ({ ctx, input }) => {
|
||||
const { db } = ctx;
|
||||
|
||||
await db.insert(mediaCaptures).values({
|
||||
trialId: input.trialId,
|
||||
mediaType: input.mediaType,
|
||||
storagePath: input.storagePath,
|
||||
fileSize: input.fileSize,
|
||||
format: input.format,
|
||||
startTimestamp: new Date(), // Approximate
|
||||
// metadata: { uploadedBy: ctx.session.user.id }
|
||||
});
|
||||
|
||||
return { success: true };
|
||||
}),
|
||||
});
|
||||
0
src/server/api/routers/studies.ts
Normal file → Executable file
0
src/server/api/routers/studies.ts
Normal file → Executable file
352
src/server/api/routers/trials.ts
Normal file → Executable file
352
src/server/api/routers/trials.ts
Normal file → Executable file
@@ -24,8 +24,16 @@ import {
|
||||
wizardInterventions,
|
||||
mediaCaptures,
|
||||
users,
|
||||
annotations,
|
||||
} from "~/server/db/schema";
|
||||
import { TrialExecutionEngine } from "~/server/services/trial-execution";
|
||||
import {
|
||||
TrialExecutionEngine,
|
||||
type ActionDefinition,
|
||||
} from "~/server/services/trial-execution";
|
||||
import { s3Client } from "~/server/storage";
|
||||
import { GetObjectCommand } from "@aws-sdk/client-s3";
|
||||
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
|
||||
import { env } from "~/env";
|
||||
|
||||
// Helper function to check if user has access to trial
|
||||
async function checkTrialAccess(
|
||||
@@ -260,7 +268,41 @@ export const trialsRouter = createTRPCRouter({
|
||||
});
|
||||
}
|
||||
|
||||
return trial[0];
|
||||
// Fetch additional stats
|
||||
const eventCount = await db
|
||||
.select({ count: count() })
|
||||
.from(trialEvents)
|
||||
.where(eq(trialEvents.trialId, input.id));
|
||||
|
||||
const media = await db
|
||||
.select()
|
||||
.from(mediaCaptures)
|
||||
.where(eq(mediaCaptures.trialId, input.id))
|
||||
.orderBy(desc(mediaCaptures.createdAt)); // Get latest first
|
||||
|
||||
return {
|
||||
...trial[0],
|
||||
eventCount: eventCount[0]?.count ?? 0,
|
||||
mediaCount: media.length,
|
||||
media: await Promise.all(media.map(async (m) => {
|
||||
let url = "";
|
||||
try {
|
||||
// Generate Presigned GET URL
|
||||
const command = new GetObjectCommand({
|
||||
Bucket: env.MINIO_BUCKET_NAME ?? "hristudio-data",
|
||||
Key: m.storagePath,
|
||||
});
|
||||
url = await getSignedUrl(s3Client, command, { expiresIn: 3600 });
|
||||
} catch (e) {
|
||||
console.error("Failed to sign URL for media", m.id, e);
|
||||
}
|
||||
return {
|
||||
...m,
|
||||
url, // Add the signed URL to the response
|
||||
contentType: m.format === 'webm' ? 'video/webm' : 'application/octet-stream', // Infer or store content type
|
||||
};
|
||||
})),
|
||||
};
|
||||
}),
|
||||
|
||||
create: protectedProcedure
|
||||
@@ -381,6 +423,58 @@ export const trialsRouter = createTRPCRouter({
|
||||
return trial;
|
||||
}),
|
||||
|
||||
duplicate: protectedProcedure
|
||||
.input(
|
||||
z.object({
|
||||
id: z.string(),
|
||||
}),
|
||||
)
|
||||
.mutation(async ({ ctx, input }) => {
|
||||
const { db } = ctx;
|
||||
const userId = ctx.session.user.id;
|
||||
|
||||
await checkTrialAccess(db, userId, input.id, [
|
||||
"owner",
|
||||
"researcher",
|
||||
"wizard",
|
||||
]);
|
||||
|
||||
// Get source trial
|
||||
const sourceTrial = await db
|
||||
.select()
|
||||
.from(trials)
|
||||
.where(eq(trials.id, input.id))
|
||||
.limit(1);
|
||||
|
||||
if (!sourceTrial[0]) {
|
||||
throw new TRPCError({
|
||||
code: "NOT_FOUND",
|
||||
message: "Source trial not found",
|
||||
});
|
||||
}
|
||||
|
||||
// Create new trial based on source
|
||||
const [newTrial] = await db
|
||||
.insert(trials)
|
||||
.values({
|
||||
experimentId: sourceTrial[0].experimentId,
|
||||
participantId: sourceTrial[0].participantId,
|
||||
// Scheduled for now + 1 hour by default, or null? Let's use null or source time?
|
||||
// New duplicate usually implies "planning to run soon".
|
||||
// I'll leave scheduledAt null or same as source if future?
|
||||
// Let's set it to tomorrow by default to avoid confusion
|
||||
scheduledAt: new Date(Date.now() + 24 * 60 * 60 * 1000),
|
||||
wizardId: sourceTrial[0].wizardId,
|
||||
sessionNumber: (sourceTrial[0].sessionNumber || 0) + 1, // Increment session
|
||||
status: "scheduled",
|
||||
notes: `Duplicate of trial ${sourceTrial[0].id}. ${sourceTrial[0].notes || ""}`,
|
||||
metadata: sourceTrial[0].metadata,
|
||||
})
|
||||
.returning();
|
||||
|
||||
return newTrial;
|
||||
}),
|
||||
|
||||
start: protectedProcedure
|
||||
.input(
|
||||
z.object({
|
||||
@@ -411,10 +505,15 @@ export const trialsRouter = createTRPCRouter({
|
||||
});
|
||||
}
|
||||
|
||||
// Idempotency: If already in progress, return success
|
||||
if (currentTrial[0].status === "in_progress") {
|
||||
return currentTrial[0];
|
||||
}
|
||||
|
||||
if (currentTrial[0].status !== "scheduled") {
|
||||
throw new TRPCError({
|
||||
code: "BAD_REQUEST",
|
||||
message: "Trial can only be started from scheduled status",
|
||||
message: `Trial is in ${currentTrial[0].status} status and cannot be started`,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -596,6 +695,61 @@ export const trialsRouter = createTRPCRouter({
|
||||
return intervention;
|
||||
}),
|
||||
|
||||
addAnnotation: protectedProcedure
|
||||
.input(
|
||||
z.object({
|
||||
trialId: z.string(),
|
||||
category: z.string().optional(),
|
||||
label: z.string().optional(),
|
||||
description: z.string().optional(),
|
||||
timestampStart: z.date().optional(),
|
||||
tags: z.array(z.string()).optional(),
|
||||
metadata: z.any().optional(),
|
||||
}),
|
||||
)
|
||||
.mutation(async ({ ctx, input }) => {
|
||||
const { db } = ctx;
|
||||
const userId = ctx.session.user.id;
|
||||
|
||||
await checkTrialAccess(db, userId, input.trialId, [
|
||||
"owner",
|
||||
"researcher",
|
||||
"wizard",
|
||||
]);
|
||||
|
||||
const [annotation] = await db
|
||||
.insert(annotations)
|
||||
.values({
|
||||
trialId: input.trialId,
|
||||
annotatorId: userId,
|
||||
category: input.category,
|
||||
label: input.label,
|
||||
description: input.description,
|
||||
timestampStart: input.timestampStart ?? new Date(),
|
||||
tags: input.tags,
|
||||
metadata: input.metadata,
|
||||
})
|
||||
.returning();
|
||||
|
||||
// Also create a trial event so it appears in the timeline
|
||||
if (annotation) {
|
||||
await db.insert(trialEvents).values({
|
||||
trialId: input.trialId,
|
||||
eventType: `annotation_${input.category || 'note'}`,
|
||||
timestamp: input.timestampStart ?? new Date(),
|
||||
data: {
|
||||
annotationId: annotation.id,
|
||||
description: input.description,
|
||||
category: input.category,
|
||||
label: input.label,
|
||||
tags: input.tags,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
return annotation;
|
||||
}),
|
||||
|
||||
getEvents: protectedProcedure
|
||||
.input(
|
||||
z.object({
|
||||
@@ -722,51 +876,51 @@ export const trialsRouter = createTRPCRouter({
|
||||
const filteredTrials =
|
||||
trialIds.length > 0
|
||||
? await ctx.db.query.trials.findMany({
|
||||
where: inArray(trials.id, trialIds),
|
||||
with: {
|
||||
experiment: {
|
||||
with: {
|
||||
study: {
|
||||
columns: {
|
||||
id: true,
|
||||
name: true,
|
||||
},
|
||||
where: inArray(trials.id, trialIds),
|
||||
with: {
|
||||
experiment: {
|
||||
with: {
|
||||
study: {
|
||||
columns: {
|
||||
id: true,
|
||||
name: true,
|
||||
},
|
||||
},
|
||||
columns: {
|
||||
id: true,
|
||||
name: true,
|
||||
studyId: true,
|
||||
},
|
||||
},
|
||||
participant: {
|
||||
columns: {
|
||||
id: true,
|
||||
participantCode: true,
|
||||
email: true,
|
||||
name: true,
|
||||
},
|
||||
},
|
||||
wizard: {
|
||||
columns: {
|
||||
id: true,
|
||||
name: true,
|
||||
email: true,
|
||||
},
|
||||
},
|
||||
events: {
|
||||
columns: {
|
||||
id: true,
|
||||
},
|
||||
},
|
||||
mediaCaptures: {
|
||||
columns: {
|
||||
id: true,
|
||||
},
|
||||
columns: {
|
||||
id: true,
|
||||
name: true,
|
||||
studyId: true,
|
||||
},
|
||||
},
|
||||
orderBy: [desc(trials.scheduledAt)],
|
||||
})
|
||||
participant: {
|
||||
columns: {
|
||||
id: true,
|
||||
participantCode: true,
|
||||
email: true,
|
||||
name: true,
|
||||
},
|
||||
},
|
||||
wizard: {
|
||||
columns: {
|
||||
id: true,
|
||||
name: true,
|
||||
email: true,
|
||||
},
|
||||
},
|
||||
events: {
|
||||
columns: {
|
||||
id: true,
|
||||
},
|
||||
},
|
||||
mediaCaptures: {
|
||||
columns: {
|
||||
id: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
orderBy: [desc(trials.scheduledAt)],
|
||||
})
|
||||
: [];
|
||||
|
||||
// Get total count
|
||||
@@ -892,6 +1046,118 @@ export const trialsRouter = createTRPCRouter({
|
||||
createdBy: ctx.session.user.id,
|
||||
});
|
||||
|
||||
return { success: true };
|
||||
}),
|
||||
|
||||
executeRobotAction: protectedProcedure
|
||||
.input(
|
||||
z.object({
|
||||
trialId: z.string(),
|
||||
pluginName: z.string(),
|
||||
actionId: z.string(),
|
||||
parameters: z.record(z.string(), z.unknown()).optional().default({}),
|
||||
}),
|
||||
)
|
||||
.mutation(async ({ ctx, input }) => {
|
||||
const { db } = ctx;
|
||||
const userId = ctx.session.user.id;
|
||||
|
||||
await checkTrialAccess(db, userId, input.trialId, [
|
||||
"owner",
|
||||
"researcher",
|
||||
"wizard",
|
||||
]);
|
||||
|
||||
// Use execution engine to execute robot action
|
||||
const executionEngine = getExecutionEngine();
|
||||
|
||||
// Create action definition for execution
|
||||
const actionDefinition: ActionDefinition = {
|
||||
id: `${input.pluginName}.${input.actionId}`,
|
||||
stepId: "manual", // Manual execution
|
||||
name: input.actionId,
|
||||
type: `${input.pluginName}.${input.actionId}`,
|
||||
orderIndex: 0,
|
||||
parameters: input.parameters,
|
||||
timeout: 30000,
|
||||
required: false,
|
||||
};
|
||||
|
||||
const result = await executionEngine.executeAction(
|
||||
input.trialId,
|
||||
actionDefinition,
|
||||
);
|
||||
|
||||
if (!result.success) {
|
||||
throw new TRPCError({
|
||||
code: "INTERNAL_SERVER_ERROR",
|
||||
message: result.error ?? "Robot action execution failed",
|
||||
});
|
||||
}
|
||||
|
||||
// Log the manual robot action execution
|
||||
await db.insert(trialEvents).values({
|
||||
trialId: input.trialId,
|
||||
eventType: "manual_robot_action",
|
||||
actionId: actionDefinition.id,
|
||||
data: {
|
||||
userId,
|
||||
pluginName: input.pluginName,
|
||||
actionId: input.actionId,
|
||||
parameters: input.parameters,
|
||||
result: result.data,
|
||||
duration: result.duration,
|
||||
},
|
||||
timestamp: new Date(),
|
||||
createdBy: userId,
|
||||
});
|
||||
|
||||
return {
|
||||
success: true,
|
||||
data: result.data,
|
||||
duration: result.duration,
|
||||
};
|
||||
}),
|
||||
|
||||
logRobotAction: protectedProcedure
|
||||
.input(
|
||||
z.object({
|
||||
trialId: z.string(),
|
||||
pluginName: z.string(),
|
||||
actionId: z.string(),
|
||||
parameters: z.record(z.string(), z.unknown()).optional().default({}),
|
||||
duration: z.number().optional(),
|
||||
result: z.any().optional(),
|
||||
error: z.string().optional(),
|
||||
}),
|
||||
)
|
||||
.mutation(async ({ ctx, input }) => {
|
||||
const { db } = ctx;
|
||||
const userId = ctx.session.user.id;
|
||||
|
||||
await checkTrialAccess(db, userId, input.trialId, [
|
||||
"owner",
|
||||
"researcher",
|
||||
"wizard",
|
||||
]);
|
||||
|
||||
await db.insert(trialEvents).values({
|
||||
trialId: input.trialId,
|
||||
eventType: "manual_robot_action",
|
||||
data: {
|
||||
userId,
|
||||
pluginName: input.pluginName,
|
||||
actionId: input.actionId,
|
||||
parameters: input.parameters,
|
||||
result: input.result,
|
||||
duration: input.duration,
|
||||
error: input.error,
|
||||
executionMode: "websocket_client",
|
||||
},
|
||||
timestamp: new Date(),
|
||||
createdBy: userId,
|
||||
});
|
||||
|
||||
return { success: true };
|
||||
}),
|
||||
});
|
||||
|
||||
0
src/server/api/routers/users.ts
Normal file → Executable file
0
src/server/api/routers/users.ts
Normal file → Executable file
0
src/server/api/trpc.ts
Normal file → Executable file
0
src/server/api/trpc.ts
Normal file → Executable file
0
src/server/auth/config.ts
Normal file → Executable file
0
src/server/auth/config.ts
Normal file → Executable file
0
src/server/auth/index.ts
Normal file → Executable file
0
src/server/auth/index.ts
Normal file → Executable file
0
src/server/auth/utils.ts
Normal file → Executable file
0
src/server/auth/utils.ts
Normal file → Executable file
0
src/server/db/index.ts
Normal file → Executable file
0
src/server/db/index.ts
Normal file → Executable file
28
src/server/db/schema.ts
Normal file → Executable file
28
src/server/db/schema.ts
Normal file → Executable file
@@ -438,6 +438,29 @@ export const participants = createTable(
|
||||
}),
|
||||
);
|
||||
|
||||
export const participantDocuments = createTable(
|
||||
"participant_document",
|
||||
{
|
||||
id: uuid("id").notNull().primaryKey().defaultRandom(),
|
||||
participantId: uuid("participant_id")
|
||||
.notNull()
|
||||
.references(() => participants.id, { onDelete: "cascade" }),
|
||||
name: varchar("name", { length: 255 }).notNull(),
|
||||
type: varchar("type", { length: 100 }), // MIME type or custom category
|
||||
storagePath: text("storage_path").notNull(),
|
||||
fileSize: integer("file_size"),
|
||||
uploadedBy: uuid("uploaded_by").references(() => users.id),
|
||||
createdAt: timestamp("created_at", { withTimezone: true })
|
||||
.default(sql`CURRENT_TIMESTAMP`)
|
||||
.notNull(),
|
||||
},
|
||||
(table) => ({
|
||||
participantDocIdx: index("participant_document_participant_idx").on(
|
||||
table.participantId,
|
||||
),
|
||||
}),
|
||||
);
|
||||
|
||||
export const trials = createTable("trial", {
|
||||
id: uuid("id").notNull().primaryKey().defaultRandom(),
|
||||
experimentId: uuid("experiment_id")
|
||||
@@ -1207,6 +1230,11 @@ export const systemSettingsRelations = relations(systemSettings, ({ one }) => ({
|
||||
}),
|
||||
}));
|
||||
|
||||
|
||||
export const auditLogsRelations = relations(auditLogs, ({ one }) => ({
|
||||
user: one(users, { fields: [auditLogs.userId], references: [users.id] }),
|
||||
}));
|
||||
|
||||
export type InsertPlugin = typeof plugins.$inferInsert;
|
||||
export type InsertPluginRepository = typeof pluginRepositories.$inferInsert;
|
||||
export type InsertRobot = typeof robots.$inferInsert;
|
||||
|
||||
472
src/server/services/robot-communication.ts
Executable file
472
src/server/services/robot-communication.ts
Executable file
@@ -0,0 +1,472 @@
|
||||
/* eslint-disable @typescript-eslint/no-explicit-any */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-call */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-return */
|
||||
|
||||
import WebSocket from "ws";
|
||||
import { EventEmitter } from "events";
|
||||
|
||||
export interface RobotCommunicationConfig {
|
||||
rosBridgeUrl: string;
|
||||
connectionTimeout: number;
|
||||
reconnectInterval: number;
|
||||
maxReconnectAttempts: number;
|
||||
}
|
||||
|
||||
export interface RobotAction {
|
||||
pluginName: string;
|
||||
actionId: string;
|
||||
parameters: Record<string, unknown>;
|
||||
implementation: {
|
||||
topic: string;
|
||||
messageType: string;
|
||||
messageTemplate: Record<string, unknown>;
|
||||
};
|
||||
}
|
||||
|
||||
export interface RobotActionResult {
|
||||
success: boolean;
|
||||
duration: number;
|
||||
data?: Record<string, unknown>;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Server-side robot communication service for ROS integration
|
||||
*
|
||||
* This service manages WebSocket connections to rosbridge_server and provides
|
||||
* a high-level interface for executing robot actions during trial execution.
|
||||
*/
|
||||
export class RobotCommunicationService extends EventEmitter {
|
||||
private ws: WebSocket | null = null;
|
||||
private config: RobotCommunicationConfig;
|
||||
private messageId = 0;
|
||||
private pendingActions = new Map<
|
||||
string,
|
||||
{
|
||||
resolve: (result: RobotActionResult) => void;
|
||||
reject: (error: Error) => void;
|
||||
timeout: NodeJS.Timeout;
|
||||
startTime: number;
|
||||
}
|
||||
>();
|
||||
private reconnectAttempts = 0;
|
||||
private reconnectTimer: NodeJS.Timeout | null = null;
|
||||
private isConnected = false;
|
||||
|
||||
constructor(config: Partial<RobotCommunicationConfig> = {}) {
|
||||
super();
|
||||
|
||||
this.config = {
|
||||
rosBridgeUrl: process.env.ROS_BRIDGE_URL || "ws://localhost:9090",
|
||||
connectionTimeout: 10000,
|
||||
reconnectInterval: 5000,
|
||||
maxReconnectAttempts: 10,
|
||||
...config,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize connection to ROS bridge
|
||||
*/
|
||||
async connect(): Promise<void> {
|
||||
if (this.isConnected) {
|
||||
return;
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
console.log(
|
||||
`[RobotComm] Connecting to ROS bridge: ${this.config.rosBridgeUrl}`,
|
||||
);
|
||||
|
||||
try {
|
||||
this.ws = new WebSocket(this.config.rosBridgeUrl);
|
||||
|
||||
const connectionTimeout = setTimeout(() => {
|
||||
reject(new Error("Connection timeout"));
|
||||
this.cleanup();
|
||||
}, this.config.connectionTimeout);
|
||||
|
||||
this.ws.on("open", () => {
|
||||
clearTimeout(connectionTimeout);
|
||||
this.isConnected = true;
|
||||
this.reconnectAttempts = 0;
|
||||
|
||||
console.log("[RobotComm] Connected to ROS bridge");
|
||||
this.emit("connected");
|
||||
resolve();
|
||||
});
|
||||
|
||||
this.ws.on("message", (data: WebSocket.Data) => {
|
||||
try {
|
||||
const message = JSON.parse(data.toString());
|
||||
this.handleMessage(message);
|
||||
} catch (error) {
|
||||
console.error("[RobotComm] Failed to parse message:", error);
|
||||
}
|
||||
});
|
||||
|
||||
this.ws.on("close", (code: number, reason: string) => {
|
||||
this.isConnected = false;
|
||||
console.log(`[RobotComm] Connection closed: ${code} - ${reason}`);
|
||||
|
||||
this.emit("disconnected");
|
||||
|
||||
// Reject all pending actions
|
||||
this.rejectAllPendingActions(new Error("Connection lost"));
|
||||
|
||||
// Schedule reconnection if not intentionally closed
|
||||
if (
|
||||
code !== 1000 &&
|
||||
this.reconnectAttempts < this.config.maxReconnectAttempts
|
||||
) {
|
||||
this.scheduleReconnect();
|
||||
}
|
||||
});
|
||||
|
||||
this.ws.on("error", (error: Error) => {
|
||||
console.error("[RobotComm] WebSocket error:", error);
|
||||
clearTimeout(connectionTimeout);
|
||||
this.emit("error", error);
|
||||
reject(error);
|
||||
});
|
||||
} catch (error) {
|
||||
reject(error);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect from ROS bridge
|
||||
*/
|
||||
disconnect(): void {
|
||||
if (this.reconnectTimer) {
|
||||
clearTimeout(this.reconnectTimer);
|
||||
this.reconnectTimer = null;
|
||||
}
|
||||
|
||||
this.rejectAllPendingActions(new Error("Service disconnected"));
|
||||
|
||||
if (this.ws) {
|
||||
this.ws.close(1000, "Normal closure");
|
||||
this.ws = null;
|
||||
}
|
||||
|
||||
this.isConnected = false;
|
||||
this.emit("disconnected");
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a robot action
|
||||
*/
|
||||
async executeAction(action: RobotAction): Promise<RobotActionResult> {
|
||||
if (!this.isConnected) {
|
||||
throw new Error("Not connected to ROS bridge");
|
||||
}
|
||||
|
||||
const startTime = Date.now();
|
||||
const actionId = `action_${this.messageId++}`;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
// Set up timeout
|
||||
const timeout = setTimeout(() => {
|
||||
this.pendingActions.delete(actionId);
|
||||
reject(new Error(`Action timeout: ${action.actionId}`));
|
||||
}, 30000); // 30 second timeout
|
||||
|
||||
// Store pending action
|
||||
this.pendingActions.set(actionId, {
|
||||
resolve,
|
||||
reject,
|
||||
timeout,
|
||||
startTime,
|
||||
});
|
||||
|
||||
try {
|
||||
// Execute action based on type and platform
|
||||
this.executeRobotActionInternal(action, actionId);
|
||||
} catch (error) {
|
||||
clearTimeout(timeout);
|
||||
this.pendingActions.delete(actionId);
|
||||
reject(error);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if service is connected
|
||||
*/
|
||||
getConnectionStatus(): boolean {
|
||||
return this.isConnected;
|
||||
}
|
||||
|
||||
// Private methods
|
||||
|
||||
private executeRobotActionInternal(
|
||||
action: RobotAction,
|
||||
actionId: string,
|
||||
): void {
|
||||
const { implementation, parameters } = action;
|
||||
|
||||
// Build ROS message from template
|
||||
const message = this.buildRosMessage(
|
||||
implementation.messageTemplate,
|
||||
parameters,
|
||||
);
|
||||
|
||||
// Publish to ROS topic
|
||||
this.publishToTopic(
|
||||
implementation.topic,
|
||||
implementation.messageType,
|
||||
message,
|
||||
);
|
||||
|
||||
// For actions that complete immediately (like movement commands),
|
||||
// we simulate completion after a short delay
|
||||
setTimeout(() => {
|
||||
this.completeAction(actionId, {
|
||||
success: true,
|
||||
duration:
|
||||
Date.now() -
|
||||
(this.pendingActions.get(actionId)?.startTime || Date.now()),
|
||||
data: {
|
||||
topic: implementation.topic,
|
||||
messageType: implementation.messageType,
|
||||
message,
|
||||
},
|
||||
});
|
||||
}, 100);
|
||||
}
|
||||
|
||||
private buildRosMessage(
|
||||
template: Record<string, unknown>,
|
||||
parameters: Record<string, unknown>,
|
||||
): Record<string, unknown> {
|
||||
const message: Record<string, unknown> = {};
|
||||
|
||||
for (const [key, value] of Object.entries(template)) {
|
||||
if (typeof value === "string" && value.includes("{{")) {
|
||||
// Template substitution
|
||||
let substituted = value;
|
||||
|
||||
// Replace template variables
|
||||
for (const [paramKey, paramValue] of Object.entries(parameters)) {
|
||||
const placeholder = `{{${paramKey}}}`;
|
||||
if (substituted.includes(placeholder)) {
|
||||
substituted = substituted.replace(
|
||||
new RegExp(
|
||||
placeholder.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"),
|
||||
"g",
|
||||
),
|
||||
String(paramValue ?? ""),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Handle conditional templates
|
||||
if (
|
||||
substituted.includes("{{") &&
|
||||
substituted.includes("?") &&
|
||||
substituted.includes(":")
|
||||
) {
|
||||
// Simple conditional: {{condition ? valueTrue : valueFalse}}
|
||||
const match = substituted.match(
|
||||
/\{\{(.+?)\s*\?\s*(.+?)\s*:\s*(.+?)\}\}/,
|
||||
);
|
||||
if (match && match.length >= 4) {
|
||||
const condition = match[1];
|
||||
const trueValue = match[2];
|
||||
const falseValue = match[3];
|
||||
// Evaluate simple conditions
|
||||
let conditionResult = false;
|
||||
|
||||
if (condition?.includes("===")) {
|
||||
const parts = condition
|
||||
.split("===")
|
||||
.map((s) => s.trim().replace(/['"]/g, ""));
|
||||
if (parts.length >= 2) {
|
||||
const left = parts[0];
|
||||
const right = parts[1];
|
||||
conditionResult = parameters[left || ""] === right;
|
||||
}
|
||||
}
|
||||
|
||||
substituted = substituted.replace(
|
||||
match[0],
|
||||
conditionResult ? (trueValue ?? "") : (falseValue ?? ""),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Try to parse as number if it looks like one
|
||||
if (!isNaN(Number(substituted))) {
|
||||
message[key] = Number(substituted);
|
||||
} else {
|
||||
message[key] = substituted;
|
||||
}
|
||||
} else if (Array.isArray(value)) {
|
||||
// Handle array templates
|
||||
message[key] = value.map((item) =>
|
||||
typeof item === "string" && item.includes("{{")
|
||||
? this.substituteTemplateString(item, parameters)
|
||||
: item,
|
||||
);
|
||||
} else if (typeof value === "object" && value !== null) {
|
||||
// Recursively handle nested objects
|
||||
message[key] = this.buildRosMessage(
|
||||
value as Record<string, unknown>,
|
||||
parameters,
|
||||
);
|
||||
} else {
|
||||
message[key] = value;
|
||||
}
|
||||
}
|
||||
|
||||
return message;
|
||||
}
|
||||
|
||||
private substituteTemplateString(
|
||||
template: string,
|
||||
parameters: Record<string, unknown>,
|
||||
): unknown {
|
||||
let result = template;
|
||||
|
||||
for (const [key, value] of Object.entries(parameters)) {
|
||||
const placeholder = `{{${key}}}`;
|
||||
if (result.includes(placeholder)) {
|
||||
result = result.replace(
|
||||
new RegExp(placeholder.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"), "g"),
|
||||
String(value ?? ""),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Try to parse as number if it looks like one
|
||||
if (!isNaN(Number(result))) {
|
||||
return Number(result);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private publishToTopic(
|
||||
topic: string,
|
||||
messageType: string,
|
||||
message: Record<string, unknown>,
|
||||
): void {
|
||||
if (!this.ws) return;
|
||||
|
||||
const rosMessage = {
|
||||
op: "publish",
|
||||
topic,
|
||||
type: messageType,
|
||||
msg: message,
|
||||
};
|
||||
|
||||
console.log(`[RobotComm] Publishing to ${topic}:`, message);
|
||||
this.ws.send(JSON.stringify(rosMessage));
|
||||
}
|
||||
|
||||
private handleMessage(message: any): void {
|
||||
// Handle different types of ROS bridge messages
|
||||
switch (message.op) {
|
||||
case "publish":
|
||||
this.emit("topic_message", message.topic, message.msg);
|
||||
break;
|
||||
|
||||
case "service_response":
|
||||
this.handleServiceResponse(message);
|
||||
break;
|
||||
|
||||
case "status":
|
||||
console.log("[RobotComm] Status:", message);
|
||||
break;
|
||||
|
||||
default:
|
||||
console.log("[RobotComm] Unhandled message:", message);
|
||||
}
|
||||
}
|
||||
|
||||
private handleServiceResponse(message: any): void {
|
||||
// Handle service call responses if needed
|
||||
console.log("[RobotComm] Service response:", message);
|
||||
}
|
||||
|
||||
private completeAction(actionId: string, result: RobotActionResult): void {
|
||||
const pending = this.pendingActions.get(actionId);
|
||||
if (pending) {
|
||||
clearTimeout(pending.timeout);
|
||||
this.pendingActions.delete(actionId);
|
||||
pending.resolve(result);
|
||||
}
|
||||
}
|
||||
|
||||
private rejectAllPendingActions(error: Error): void {
|
||||
for (const [actionId, pending] of this.pendingActions.entries()) {
|
||||
clearTimeout(pending.timeout);
|
||||
pending.reject(error);
|
||||
}
|
||||
this.pendingActions.clear();
|
||||
}
|
||||
|
||||
private scheduleReconnect(): void {
|
||||
if (this.reconnectTimer) return;
|
||||
|
||||
this.reconnectAttempts++;
|
||||
console.log(
|
||||
`[RobotComm] Scheduling reconnect attempt ${this.reconnectAttempts}/${this.config.maxReconnectAttempts} in ${this.config.reconnectInterval}ms`,
|
||||
);
|
||||
|
||||
this.reconnectTimer = setTimeout(async () => {
|
||||
this.reconnectTimer = null;
|
||||
|
||||
try {
|
||||
await this.connect();
|
||||
} catch (error) {
|
||||
console.error("[RobotComm] Reconnect failed:", error);
|
||||
|
||||
if (this.reconnectAttempts < this.config.maxReconnectAttempts) {
|
||||
this.scheduleReconnect();
|
||||
} else {
|
||||
console.error("[RobotComm] Max reconnect attempts reached");
|
||||
this.emit("max_reconnects_reached");
|
||||
}
|
||||
}
|
||||
}, this.config.reconnectInterval);
|
||||
}
|
||||
|
||||
private cleanup(): void {
|
||||
if (this.ws) {
|
||||
this.ws.removeAllListeners();
|
||||
this.ws = null;
|
||||
}
|
||||
this.isConnected = false;
|
||||
}
|
||||
}
|
||||
|
||||
// Global service instance
|
||||
let robotCommService: RobotCommunicationService | null = null;
|
||||
|
||||
/**
|
||||
* Get or create the global robot communication service
|
||||
*/
|
||||
export function getRobotCommunicationService(): RobotCommunicationService {
|
||||
if (!robotCommService) {
|
||||
robotCommService = new RobotCommunicationService();
|
||||
}
|
||||
return robotCommService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize robot communication service with connection
|
||||
*/
|
||||
export async function initRobotCommunicationService(): Promise<RobotCommunicationService> {
|
||||
const service = getRobotCommunicationService();
|
||||
|
||||
if (!service.getConnectionStatus()) {
|
||||
await service.connect();
|
||||
}
|
||||
|
||||
return service;
|
||||
}
|
||||
336
src/server/services/trial-execution.ts
Normal file → Executable file
336
src/server/services/trial-execution.ts
Normal file → Executable file
@@ -9,8 +9,19 @@
|
||||
/* eslint-disable @typescript-eslint/no-base-to-string */
|
||||
|
||||
import { type db } from "~/server/db";
|
||||
import { trials, steps, actions, trialEvents } from "~/server/db/schema";
|
||||
import {
|
||||
trials,
|
||||
steps,
|
||||
actions,
|
||||
trialEvents,
|
||||
plugins,
|
||||
} from "~/server/db/schema";
|
||||
import { eq, asc } from "drizzle-orm";
|
||||
import {
|
||||
getRobotCommunicationService,
|
||||
type RobotAction,
|
||||
type RobotActionResult,
|
||||
} from "./robot-communication";
|
||||
|
||||
export type TrialStatus =
|
||||
| "scheduled"
|
||||
@@ -72,6 +83,8 @@ export class TrialExecutionEngine {
|
||||
private db: typeof db;
|
||||
private activeTrials = new Map<string, ExecutionContext>();
|
||||
private stepDefinitions = new Map<string, StepDefinition[]>();
|
||||
private pluginCache = new Map<string, any>();
|
||||
private robotComm = getRobotCommunicationService();
|
||||
|
||||
constructor(database: typeof db) {
|
||||
this.db = database;
|
||||
@@ -377,7 +390,7 @@ export class TrialExecutionEngine {
|
||||
/**
|
||||
* Execute a single action
|
||||
*/
|
||||
private async executeAction(
|
||||
async executeAction(
|
||||
trialId: string,
|
||||
action: ActionDefinition,
|
||||
): Promise<ActionExecutionResult> {
|
||||
@@ -488,41 +501,74 @@ export class TrialExecutionEngine {
|
||||
trialId: string,
|
||||
action: ActionDefinition,
|
||||
): Promise<ActionExecutionResult> {
|
||||
const startTime = Date.now();
|
||||
|
||||
try {
|
||||
// Parse plugin.action format
|
||||
const [pluginId, actionType] = action.type.split(".");
|
||||
const [pluginName, actionId] = action.type.split(".");
|
||||
|
||||
// TODO: Integrate with actual robot plugin system
|
||||
// For now, simulate robot action execution
|
||||
console.log(`[TrialExecution] Parsed action: pluginName=${pluginName}, actionId=${actionId}`);
|
||||
|
||||
const simulationDelay = Math.random() * 2000 + 500; // 500ms - 2.5s
|
||||
if (!pluginName || !actionId) {
|
||||
throw new Error(
|
||||
`Invalid robot action format: ${action.type}. Expected format: plugin.action`,
|
||||
);
|
||||
}
|
||||
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(() => {
|
||||
// Simulate success/failure
|
||||
const success = Math.random() > 0.1; // 90% success rate
|
||||
// Get plugin configuration from database
|
||||
const plugin = await this.getPluginDefinition(pluginName);
|
||||
if (!plugin) {
|
||||
throw new Error(`Plugin '${pluginName}' not found`);
|
||||
}
|
||||
|
||||
resolve({
|
||||
success,
|
||||
completed: true,
|
||||
duration: simulationDelay,
|
||||
data: {
|
||||
pluginId,
|
||||
actionType,
|
||||
parameters: action.parameters,
|
||||
robotResponse: success
|
||||
? "Action completed successfully"
|
||||
: "Robot action failed",
|
||||
},
|
||||
error: success ? undefined : "Simulated robot failure",
|
||||
});
|
||||
}, simulationDelay);
|
||||
});
|
||||
console.log(`[TrialExecution] Plugin loaded: ${plugin.name} (ID: ${plugin.id})`);
|
||||
console.log(`[TrialExecution] Available actions: ${plugin.actions?.map((a: any) => a.id).join(", ")}`);
|
||||
|
||||
// Find action definition in plugin
|
||||
const actionDefinition = plugin.actions?.find(
|
||||
(a: any) => a.id === actionId,
|
||||
);
|
||||
if (!actionDefinition) {
|
||||
throw new Error(
|
||||
`Action '${actionId}' not found in plugin '${pluginName}'`,
|
||||
);
|
||||
}
|
||||
|
||||
// Validate parameters
|
||||
const validatedParams = this.validateActionParameters(
|
||||
actionDefinition,
|
||||
action.parameters,
|
||||
);
|
||||
|
||||
// Execute action through robot communication service
|
||||
const result = await this.executeRobotActionWithComm(
|
||||
plugin,
|
||||
actionDefinition,
|
||||
validatedParams,
|
||||
trialId,
|
||||
);
|
||||
|
||||
const duration = Date.now() - startTime;
|
||||
|
||||
return {
|
||||
success: true,
|
||||
completed: true,
|
||||
duration,
|
||||
data: {
|
||||
pluginName,
|
||||
actionId,
|
||||
parameters: validatedParams,
|
||||
robotResponse: result,
|
||||
platform: plugin.platform,
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
const duration = Date.now() - startTime;
|
||||
|
||||
return {
|
||||
success: false,
|
||||
completed: false,
|
||||
duration: 0,
|
||||
duration,
|
||||
error:
|
||||
error instanceof Error
|
||||
? error.message
|
||||
@@ -531,6 +577,242 @@ export class TrialExecutionEngine {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get plugin definition from database with caching
|
||||
*/
|
||||
private async getPluginDefinition(pluginName: string): Promise<any> {
|
||||
// Check cache first
|
||||
if (this.pluginCache.has(pluginName)) {
|
||||
return this.pluginCache.get(pluginName);
|
||||
}
|
||||
|
||||
try {
|
||||
// Check if pluginName is a UUID
|
||||
const isUuid =
|
||||
/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i.test(
|
||||
pluginName,
|
||||
);
|
||||
|
||||
const query = isUuid
|
||||
? eq(plugins.id, pluginName)
|
||||
: eq(plugins.name, pluginName);
|
||||
|
||||
const [plugin] = await this.db
|
||||
.select()
|
||||
.from(plugins)
|
||||
.where(query)
|
||||
.limit(1);
|
||||
|
||||
if (plugin) {
|
||||
// Cache the plugin definition
|
||||
// Use the actual name for cache key if we looked up by ID
|
||||
const cacheKey = isUuid ? plugin.name : pluginName;
|
||||
|
||||
const pluginData = {
|
||||
...plugin,
|
||||
actions: plugin.actionDefinitions,
|
||||
platform: (plugin.metadata as any)?.platform,
|
||||
ros2Config: (plugin.metadata as any)?.ros2Config,
|
||||
};
|
||||
|
||||
this.pluginCache.set(cacheKey, pluginData);
|
||||
// Also cache by ID if accessible
|
||||
if (plugin.id) {
|
||||
this.pluginCache.set(plugin.id, pluginData);
|
||||
}
|
||||
|
||||
return pluginData;
|
||||
}
|
||||
|
||||
return null;
|
||||
} catch (error) {
|
||||
console.error(`Failed to load plugin ${pluginName}:`, error);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate action parameters against plugin schema
|
||||
*/
|
||||
private validateActionParameters(
|
||||
actionDefinition: any,
|
||||
parameters: Record<string, unknown>,
|
||||
): Record<string, unknown> {
|
||||
const validated: Record<string, unknown> = {};
|
||||
|
||||
if (!actionDefinition.parameters) {
|
||||
return parameters;
|
||||
}
|
||||
|
||||
for (const paramDef of actionDefinition.parameters) {
|
||||
const paramName = paramDef.name;
|
||||
const paramValue = parameters[paramName];
|
||||
|
||||
// Required parameter check
|
||||
if (
|
||||
paramDef.required &&
|
||||
(paramValue === undefined || paramValue === null)
|
||||
) {
|
||||
throw new Error(`Required parameter '${paramName}' is missing`);
|
||||
}
|
||||
|
||||
// Use default value if parameter not provided
|
||||
if (paramValue === undefined && paramDef.default !== undefined) {
|
||||
validated[paramName] = paramDef.default;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (paramValue !== undefined) {
|
||||
// Type validation
|
||||
switch (paramDef.type) {
|
||||
case "number":
|
||||
const numValue = Number(paramValue);
|
||||
if (isNaN(numValue)) {
|
||||
throw new Error(`Parameter '${paramName}' must be a number`);
|
||||
}
|
||||
if (paramDef.min !== undefined && numValue < paramDef.min) {
|
||||
throw new Error(
|
||||
`Parameter '${paramName}' must be >= ${paramDef.min}`,
|
||||
);
|
||||
}
|
||||
if (paramDef.max !== undefined && numValue > paramDef.max) {
|
||||
throw new Error(
|
||||
`Parameter '${paramName}' must be <= ${paramDef.max}`,
|
||||
);
|
||||
}
|
||||
validated[paramName] = numValue;
|
||||
break;
|
||||
|
||||
case "boolean":
|
||||
validated[paramName] = Boolean(paramValue);
|
||||
break;
|
||||
|
||||
case "select":
|
||||
if (paramDef.options) {
|
||||
const validOptions = paramDef.options.map(
|
||||
(opt: any) => opt.value,
|
||||
);
|
||||
if (!validOptions.includes(paramValue)) {
|
||||
throw new Error(
|
||||
`Parameter '${paramName}' must be one of: ${validOptions.join(", ")}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
validated[paramName] = paramValue;
|
||||
break;
|
||||
|
||||
default:
|
||||
validated[paramName] = paramValue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return validated;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute robot action through robot communication service
|
||||
*/
|
||||
private async executeRobotActionWithComm(
|
||||
plugin: any,
|
||||
actionDefinition: any,
|
||||
parameters: Record<string, unknown>,
|
||||
trialId: string,
|
||||
): Promise<string> {
|
||||
// Ensure robot communication service is available
|
||||
if (!this.robotComm.getConnectionStatus()) {
|
||||
try {
|
||||
await this.robotComm.connect();
|
||||
} catch (error) {
|
||||
throw new Error(
|
||||
`Failed to connect to robot: ${error instanceof Error ? error.message : "Unknown error"}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Prepare robot action
|
||||
const robotAction: RobotAction = {
|
||||
pluginName: plugin.name,
|
||||
actionId: actionDefinition.id,
|
||||
parameters,
|
||||
implementation: actionDefinition.implementation,
|
||||
};
|
||||
|
||||
// Execute action through robot communication service
|
||||
const result: RobotActionResult =
|
||||
await this.robotComm.executeAction(robotAction);
|
||||
|
||||
if (!result.success) {
|
||||
throw new Error(result.error || "Robot action failed");
|
||||
}
|
||||
|
||||
// Log the successful action execution
|
||||
await this.logTrialEvent(trialId, "robot_action_executed", {
|
||||
actionId: actionDefinition.id,
|
||||
parameters,
|
||||
platform: plugin.platform,
|
||||
topic: actionDefinition.implementation?.topic,
|
||||
messageType: actionDefinition.implementation?.messageType,
|
||||
duration: result.duration,
|
||||
robotResponse: result.data,
|
||||
});
|
||||
|
||||
// Return human-readable result
|
||||
return this.formatRobotActionResult(
|
||||
plugin,
|
||||
actionDefinition,
|
||||
parameters,
|
||||
result,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Format robot action result for human readability
|
||||
*/
|
||||
private formatRobotActionResult(
|
||||
plugin: any,
|
||||
actionDefinition: any,
|
||||
parameters: Record<string, unknown>,
|
||||
result: RobotActionResult,
|
||||
): string {
|
||||
const actionType = actionDefinition.id;
|
||||
const platform = plugin.platform || "Robot";
|
||||
|
||||
switch (actionType) {
|
||||
case "say_text":
|
||||
return `${platform} said: "${parameters.text}"`;
|
||||
|
||||
case "walk_forward":
|
||||
return `${platform} walked forward at speed ${parameters.speed} for ${parameters.duration || "indefinite"} seconds`;
|
||||
|
||||
case "walk_backward":
|
||||
return `${platform} walked backward at speed ${parameters.speed} for ${parameters.duration || "indefinite"} seconds`;
|
||||
|
||||
case "turn_left":
|
||||
case "turn_right":
|
||||
const direction = actionType.split("_")[1];
|
||||
return `${platform} turned ${direction} at speed ${parameters.speed}`;
|
||||
|
||||
case "move_head":
|
||||
return `${platform} moved head to yaw=${parameters.yaw}, pitch=${parameters.pitch}`;
|
||||
|
||||
case "move_arm":
|
||||
return `${platform} moved ${parameters.arm} arm to specified position`;
|
||||
|
||||
case "stop_movement":
|
||||
return `${platform} stopped all movement`;
|
||||
|
||||
case "set_volume":
|
||||
return `${platform} set volume to ${parameters.volume}`;
|
||||
|
||||
case "set_language":
|
||||
return `${platform} set language to ${parameters.language}`;
|
||||
|
||||
default:
|
||||
return `${platform} executed action: ${actionType} (${result.duration}ms)`;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Advance to the next step
|
||||
*/
|
||||
|
||||
20
src/server/storage.ts
Normal file
20
src/server/storage.ts
Normal file
@@ -0,0 +1,20 @@
|
||||
import { S3Client } from "@aws-sdk/client-s3";
|
||||
import { env } from "~/env";
|
||||
|
||||
const globalForS3 = globalThis as unknown as {
|
||||
s3Client: S3Client | undefined;
|
||||
};
|
||||
|
||||
export const s3Client =
|
||||
globalForS3.s3Client ??
|
||||
new S3Client({
|
||||
region: env.MINIO_REGION ?? "us-east-1",
|
||||
endpoint: env.MINIO_ENDPOINT,
|
||||
credentials: {
|
||||
accessKeyId: env.MINIO_ACCESS_KEY ?? "minioadmin",
|
||||
secretAccessKey: env.MINIO_SECRET_KEY ?? "minioadmin",
|
||||
},
|
||||
forcePathStyle: true, // Needed for MinIO
|
||||
});
|
||||
|
||||
if (env.NODE_ENV !== "production") globalForS3.s3Client = s3Client;
|
||||
Reference in New Issue
Block a user