Xiao-zhen-Liu commented on code in PR #4020: URL: https://github.com/apache/texera/pull/4020#discussion_r2508488823
########## frontend/src/app/workspace/service/copilot/texera-copilot.ts: ########## @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Injectable } from "@angular/core"; +import { BehaviorSubject, Observable, from, of, throwError, defer } from "rxjs"; +import { map, catchError, tap, switchMap, finalize } from "rxjs/operators"; +import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service"; +import { + toolWithTimeout, + createGetOperatorInCurrentWorkflowTool, + createGetOperatorPropertiesSchemaTool, + createGetOperatorPortsInfoTool, + createGetOperatorMetadataTool, + createListAllOperatorTypesTool, + createListLinksInCurrentWorkflowTool, + createListOperatorsInCurrentWorkflowTool, +} from "./workflow-tools"; +import { OperatorMetadataService } from "../operator-metadata/operator-metadata.service"; +import { createOpenAI } from "@ai-sdk/openai"; +import { generateText, type ModelMessage, stepCountIs } from "ai"; +import { WorkflowUtilService } from "../workflow-graph/util/workflow-util.service"; +import { AppSettings } from "../../../common/app-setting"; +import { WorkflowCompilingService } from "../compile-workflow/workflow-compiling.service"; +import { COPILOT_SYSTEM_PROMPT } from "./copilot-prompts"; +import { NotificationService } from "../../../common/service/notification/notification.service"; + +export enum CopilotState { + UNAVAILABLE = "Unavailable", + AVAILABLE = "Available", + GENERATING = "Generating", + STOPPING = "Stopping", +} + +export interface AgentUIMessage { + role: "user" | "agent"; + content: string; + isBegin: boolean; + isEnd: boolean; + toolCalls?: any[]; + toolResults?: any[]; + usage?: { + inputTokens?: number; + outputTokens?: number; + totalTokens?: number; + cachedInputTokens?: number; + }; +} + +/** + * Texera Copilot Service provides AI-powered assistance for workflow creation and manipulation. + * + * This service manages a single AI agent instance that can: + * 1. Interact with users through natural language messages + * 2. Execute workflow operations using specialized tools + * 3. Maintain conversation history and state + * + * The service communicates with an LLM backend (via LiteLLM) to generate responses and uses + * workflow tools to perform actions like listing operators, getting operator schemas, and + * manipulating workflow components. + * + * State management includes: + * - UNAVAILABLE: Agent not initialized + * - AVAILABLE: Agent ready to receive messages + * - GENERATING: Agent currently processing and generating response + * - STOPPING: Agent in the process of stopping generation + */ +@Injectable() +export class TexeraCopilot { + private model: any; + private modelType = ""; + private agentName = ""; + private messages: ModelMessage[] = []; + private agentResponses: AgentUIMessage[] = []; + private agentResponsesSubject = new BehaviorSubject<AgentUIMessage[]>([]); + public agentResponses$ = this.agentResponsesSubject.asObservable(); + private state = CopilotState.UNAVAILABLE; + private stateSubject = new BehaviorSubject<CopilotState>(CopilotState.UNAVAILABLE); + public state$ = this.stateSubject.asObservable(); + + constructor( + private workflowActionService: WorkflowActionService, + private workflowUtilService: WorkflowUtilService, + private operatorMetadataService: OperatorMetadataService, + private workflowCompilingService: WorkflowCompilingService, + private notificationService: NotificationService + ) {} + + public setAgentInfo(agentName: string): void { + this.agentName = agentName; + } + + public setModelType(modelType: string): void { + this.modelType = modelType; + } + + private setState(newState: CopilotState): void { + this.state = newState; + this.stateSubject.next(newState); + } + private emitAgentUIMessage( + role: "user" | "agent", + content: string, + isBegin: boolean, + isEnd: boolean, + toolCalls?: any[], + toolResults?: any[], + usage?: AgentUIMessage["usage"] + ): void { + this.agentResponses.push({ role, content, isBegin, isEnd, toolCalls, toolResults, usage }); + this.agentResponsesSubject.next([...this.agentResponses]); + } + public initialize(): Observable<void> { + return defer(() => { + try { + this.model = createOpenAI({ + baseURL: new URL(`${AppSettings.getApiEndpoint()}`, document.baseURI).toString(), + apiKey: "dummy", Review Comment: Why can this be dummy? ########## frontend/src/app/workspace/service/copilot/texera-copilot.ts: ########## @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Injectable } from "@angular/core"; +import { BehaviorSubject, Observable, from, of, throwError, defer } from "rxjs"; +import { map, catchError, tap, switchMap, finalize } from "rxjs/operators"; +import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service"; +import { + toolWithTimeout, + createGetOperatorInCurrentWorkflowTool, + createGetOperatorPropertiesSchemaTool, + createGetOperatorPortsInfoTool, + createGetOperatorMetadataTool, + createListAllOperatorTypesTool, + createListLinksInCurrentWorkflowTool, + createListOperatorsInCurrentWorkflowTool, +} from "./workflow-tools"; +import { OperatorMetadataService } from "../operator-metadata/operator-metadata.service"; +import { createOpenAI } from "@ai-sdk/openai"; +import { generateText, type ModelMessage, stepCountIs } from "ai"; +import { WorkflowUtilService } from "../workflow-graph/util/workflow-util.service"; +import { AppSettings } from "../../../common/app-setting"; +import { WorkflowCompilingService } from "../compile-workflow/workflow-compiling.service"; +import { COPILOT_SYSTEM_PROMPT } from "./copilot-prompts"; +import { NotificationService } from "../../../common/service/notification/notification.service"; + +export enum CopilotState { + UNAVAILABLE = "Unavailable", + AVAILABLE = "Available", + GENERATING = "Generating", + STOPPING = "Stopping", +} + +export interface AgentUIMessage { + role: "user" | "agent"; + content: string; + isBegin: boolean; + isEnd: boolean; + toolCalls?: any[]; + toolResults?: any[]; + usage?: { + inputTokens?: number; + outputTokens?: number; + totalTokens?: number; + cachedInputTokens?: number; + }; +} + +/** + * Texera Copilot Service provides AI-powered assistance for workflow creation and manipulation. + * + * This service manages a single AI agent instance that can: + * 1. Interact with users through natural language messages + * 2. Execute workflow operations using specialized tools + * 3. Maintain conversation history and state + * + * The service communicates with an LLM backend (via LiteLLM) to generate responses and uses + * workflow tools to perform actions like listing operators, getting operator schemas, and + * manipulating workflow components. + * + * State management includes: + * - UNAVAILABLE: Agent not initialized + * - AVAILABLE: Agent ready to receive messages + * - GENERATING: Agent currently processing and generating response + * - STOPPING: Agent in the process of stopping generation + */ +@Injectable() +export class TexeraCopilot { + private model: any; + private modelType = ""; + private agentName = ""; + private messages: ModelMessage[] = []; Review Comment: Can you add some comments about these variables? Both `messages` and `agentResponses` are messages and it is not immediately clear which does what. ########## frontend/src/app/workspace/service/copilot/texera-copilot.ts: ########## @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Injectable } from "@angular/core"; +import { BehaviorSubject, Observable, from, of, throwError, defer } from "rxjs"; +import { map, catchError, tap, switchMap, finalize } from "rxjs/operators"; +import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service"; +import { + toolWithTimeout, + createGetOperatorInCurrentWorkflowTool, + createGetOperatorPropertiesSchemaTool, + createGetOperatorPortsInfoTool, + createGetOperatorMetadataTool, + createListAllOperatorTypesTool, + createListLinksInCurrentWorkflowTool, + createListOperatorsInCurrentWorkflowTool, +} from "./workflow-tools"; +import { OperatorMetadataService } from "../operator-metadata/operator-metadata.service"; +import { createOpenAI } from "@ai-sdk/openai"; +import { generateText, type ModelMessage, stepCountIs } from "ai"; +import { WorkflowUtilService } from "../workflow-graph/util/workflow-util.service"; +import { AppSettings } from "../../../common/app-setting"; +import { WorkflowCompilingService } from "../compile-workflow/workflow-compiling.service"; +import { COPILOT_SYSTEM_PROMPT } from "./copilot-prompts"; +import { NotificationService } from "../../../common/service/notification/notification.service"; + +export enum CopilotState { + UNAVAILABLE = "Unavailable", + AVAILABLE = "Available", + GENERATING = "Generating", + STOPPING = "Stopping", +} + +export interface AgentUIMessage { + role: "user" | "agent"; + content: string; + isBegin: boolean; + isEnd: boolean; + toolCalls?: any[]; + toolResults?: any[]; + usage?: { + inputTokens?: number; + outputTokens?: number; + totalTokens?: number; + cachedInputTokens?: number; + }; +} + +/** + * Texera Copilot Service provides AI-powered assistance for workflow creation and manipulation. + * + * This service manages a single AI agent instance that can: + * 1. Interact with users through natural language messages + * 2. Execute workflow operations using specialized tools + * 3. Maintain conversation history and state + * + * The service communicates with an LLM backend (via LiteLLM) to generate responses and uses + * workflow tools to perform actions like listing operators, getting operator schemas, and + * manipulating workflow components. + * + * State management includes: + * - UNAVAILABLE: Agent not initialized + * - AVAILABLE: Agent ready to receive messages + * - GENERATING: Agent currently processing and generating response + * - STOPPING: Agent in the process of stopping generation + */ +@Injectable() +export class TexeraCopilot { + private model: any; + private modelType = ""; + private agentName = ""; + private messages: ModelMessage[] = []; + private agentResponses: AgentUIMessage[] = []; + private agentResponsesSubject = new BehaviorSubject<AgentUIMessage[]>([]); + public agentResponses$ = this.agentResponsesSubject.asObservable(); + private state = CopilotState.UNAVAILABLE; + private stateSubject = new BehaviorSubject<CopilotState>(CopilotState.UNAVAILABLE); + public state$ = this.stateSubject.asObservable(); + + constructor( + private workflowActionService: WorkflowActionService, + private workflowUtilService: WorkflowUtilService, + private operatorMetadataService: OperatorMetadataService, + private workflowCompilingService: WorkflowCompilingService, + private notificationService: NotificationService + ) {} + + public setAgentInfo(agentName: string): void { + this.agentName = agentName; + } + + public setModelType(modelType: string): void { + this.modelType = modelType; + } + + private setState(newState: CopilotState): void { + this.state = newState; + this.stateSubject.next(newState); + } + private emitAgentUIMessage( Review Comment: nit: several methods here are not separated by a blank line. Some other files also have this issue. ########## frontend/src/app/workspace/service/copilot/texera-copilot.ts: ########## @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Injectable } from "@angular/core"; +import { BehaviorSubject, Observable, from, of, throwError, defer } from "rxjs"; +import { map, catchError, tap, switchMap, finalize } from "rxjs/operators"; +import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service"; +import { + toolWithTimeout, + createGetOperatorInCurrentWorkflowTool, + createGetOperatorPropertiesSchemaTool, + createGetOperatorPortsInfoTool, + createGetOperatorMetadataTool, + createListAllOperatorTypesTool, + createListLinksInCurrentWorkflowTool, + createListOperatorsInCurrentWorkflowTool, +} from "./workflow-tools"; +import { OperatorMetadataService } from "../operator-metadata/operator-metadata.service"; +import { createOpenAI } from "@ai-sdk/openai"; +import { generateText, type ModelMessage, stepCountIs } from "ai"; +import { WorkflowUtilService } from "../workflow-graph/util/workflow-util.service"; +import { AppSettings } from "../../../common/app-setting"; +import { WorkflowCompilingService } from "../compile-workflow/workflow-compiling.service"; +import { COPILOT_SYSTEM_PROMPT } from "./copilot-prompts"; +import { NotificationService } from "../../../common/service/notification/notification.service"; + +export enum CopilotState { + UNAVAILABLE = "Unavailable", + AVAILABLE = "Available", + GENERATING = "Generating", + STOPPING = "Stopping", +} + +export interface AgentUIMessage { + role: "user" | "agent"; + content: string; + isBegin: boolean; + isEnd: boolean; + toolCalls?: any[]; + toolResults?: any[]; + usage?: { + inputTokens?: number; + outputTokens?: number; + totalTokens?: number; + cachedInputTokens?: number; + }; +} + +/** + * Texera Copilot Service provides AI-powered assistance for workflow creation and manipulation. + * + * This service manages a single AI agent instance that can: + * 1. Interact with users through natural language messages + * 2. Execute workflow operations using specialized tools + * 3. Maintain conversation history and state + * + * The service communicates with an LLM backend (via LiteLLM) to generate responses and uses + * workflow tools to perform actions like listing operators, getting operator schemas, and + * manipulating workflow components. + * + * State management includes: + * - UNAVAILABLE: Agent not initialized + * - AVAILABLE: Agent ready to receive messages + * - GENERATING: Agent currently processing and generating response + * - STOPPING: Agent in the process of stopping generation + */ +@Injectable() +export class TexeraCopilot { + private model: any; + private modelType = ""; + private agentName = ""; + private messages: ModelMessage[] = []; + private agentResponses: AgentUIMessage[] = []; + private agentResponsesSubject = new BehaviorSubject<AgentUIMessage[]>([]); + public agentResponses$ = this.agentResponsesSubject.asObservable(); + private state = CopilotState.UNAVAILABLE; + private stateSubject = new BehaviorSubject<CopilotState>(CopilotState.UNAVAILABLE); + public state$ = this.stateSubject.asObservable(); + + constructor( + private workflowActionService: WorkflowActionService, + private workflowUtilService: WorkflowUtilService, + private operatorMetadataService: OperatorMetadataService, + private workflowCompilingService: WorkflowCompilingService, + private notificationService: NotificationService + ) {} + + public setAgentInfo(agentName: string): void { + this.agentName = agentName; + } + + public setModelType(modelType: string): void { + this.modelType = modelType; + } + + private setState(newState: CopilotState): void { + this.state = newState; + this.stateSubject.next(newState); + } + private emitAgentUIMessage( + role: "user" | "agent", + content: string, + isBegin: boolean, + isEnd: boolean, + toolCalls?: any[], + toolResults?: any[], + usage?: AgentUIMessage["usage"] + ): void { + this.agentResponses.push({ role, content, isBegin, isEnd, toolCalls, toolResults, usage }); + this.agentResponsesSubject.next([...this.agentResponses]); + } + public initialize(): Observable<void> { + return defer(() => { + try { + this.model = createOpenAI({ + baseURL: new URL(`${AppSettings.getApiEndpoint()}`, document.baseURI).toString(), + apiKey: "dummy", + }).chat(this.modelType); + + this.setState(CopilotState.AVAILABLE); + return of(undefined); + } catch (error: unknown) { + this.setState(CopilotState.UNAVAILABLE); + return throwError(() => error); + } + }); + } + + public sendMessage(message: string): Observable<void> { + return defer(() => { + if (!this.model) { + return throwError(() => new Error("Copilot not initialized")); + } + + if (this.state !== CopilotState.AVAILABLE) { + return throwError(() => new Error(`Cannot send message: agent is ${this.state}`)); + } + + this.setState(CopilotState.GENERATING); + + this.emitAgentUIMessage("user", message, true, true); + this.messages.push({ role: "user", content: message }); + + const tools = this.createWorkflowTools(); + let isFirstStep = true; + + return from( + generateText({ + model: this.model, + messages: this.messages, + tools, + system: COPILOT_SYSTEM_PROMPT, + stopWhen: ({ steps }) => { + if (this.state === CopilotState.STOPPING) { + this.notificationService.info(`Agent ${this.agentName} has stopped generation`); + return true; + } + return stepCountIs(50)({ steps }); Review Comment: What does this mean? ########## frontend/src/app/workspace/component/agent-panel/agent-registration/agent-registration.component.ts: ########## @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Component, EventEmitter, OnDestroy, OnInit, Output } from "@angular/core"; +import { TexeraCopilotManagerService, ModelType } from "../../../service/copilot/texera-copilot-manager.service"; +import { NotificationService } from "../../../../common/service/notification/notification.service"; +import { Subject, takeUntil } from "rxjs"; + +@Component({ + selector: "texera-agent-registration", + templateUrl: "agent-registration.component.html", + styleUrls: ["agent-registration.component.scss"], +}) +export class AgentRegistrationComponent implements OnInit, OnDestroy { + @Output() agentCreated = new EventEmitter<string>(); + + public modelTypes: ModelType[] = []; + public selectedModelType: string | null = null; + public customAgentName: string = ""; + public isLoadingModels: boolean = false; + public hasLoadingError: boolean = false; + + private destroy$ = new Subject<void>(); + + constructor( + private copilotManagerService: TexeraCopilotManagerService, + private notificationService: NotificationService + ) {} + + ngOnInit(): void { + this.isLoadingModels = true; + this.hasLoadingError = false; + + this.copilotManagerService + .fetchModelTypes() + .pipe(takeUntil(this.destroy$)) + .subscribe({ + next: models => { + this.modelTypes = models; + this.isLoadingModels = false; + if (models.length === 0) { + this.hasLoadingError = true; + this.notificationService.error("No models available. Please check the LiteLLM configuration."); + } + }, + error: (error: unknown) => { + this.isLoadingModels = false; + this.hasLoadingError = true; + const errorMessage = error instanceof Error ? error.message : String(error); + this.notificationService.error(`Failed to fetch models: ${errorMessage}`); + }, + }); + } + + ngOnDestroy(): void { + this.destroy$.next(); + this.destroy$.complete(); + } + + public selectModelType(modelTypeId: string): void { + this.selectedModelType = modelTypeId; + } + + public isCreating: boolean = false; + + /** + * Create a new agent with the selected model type. + */ + public async createAgent(): Promise<void> { Review Comment: This method is marked `async` but doesn’t use await or return a Promise-based flow. Since the logic uses RxJS and subscribe(), it doesn’t need to be async. ########## frontend/src/app/workspace/component/agent-panel/agent-panel.component.ts: ########## @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Component, HostListener, OnDestroy, OnInit } from "@angular/core"; +import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; +import { NzResizeEvent } from "ng-zorro-antd/resizable"; +import { TexeraCopilotManagerService, AgentInfo } from "../../service/copilot/texera-copilot-manager.service"; +import { calculateTotalTranslate3d } from "../../../common/util/panel-dock"; + +@UntilDestroy() +@Component({ + selector: "texera-agent-panel", + templateUrl: "agent-panel.component.html", + styleUrls: ["agent-panel.component.scss"], +}) +export class AgentPanelComponent implements OnInit, OnDestroy { + protected readonly window = window; + private static readonly MIN_PANEL_WIDTH = 400; + private static readonly MIN_PANEL_HEIGHT = 450; + + // Panel dimensions and position + width: number = 0; // Start with 0 to show docked button + height = Math.max(AgentPanelComponent.MIN_PANEL_HEIGHT, window.innerHeight * 0.7); + id = -1; + dragPosition = { x: 0, y: 0 }; + returnPosition = { x: 0, y: 0 }; + isDocked = true; + + // Tab management + selectedTabIndex: number = 0; // 0 = registration tab, 1 = action plans tab, 2+ = agent tabs + agents: AgentInfo[] = []; + + constructor(private copilotManagerService: TexeraCopilotManagerService) {} + + ngOnInit(): void { + // Load saved panel dimensions and position + const savedWidth = localStorage.getItem("agent-panel-width"); Review Comment: It would be clearer if the logics for setting / loading local storage are contained in their own methods. ########## frontend/src/app/workspace/service/copilot/texera-copilot.ts: ########## @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Injectable } from "@angular/core"; +import { BehaviorSubject, Observable, from, of, throwError, defer } from "rxjs"; +import { map, catchError, tap, switchMap, finalize } from "rxjs/operators"; +import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service"; +import { + toolWithTimeout, + createGetOperatorInCurrentWorkflowTool, + createGetOperatorPropertiesSchemaTool, + createGetOperatorPortsInfoTool, + createGetOperatorMetadataTool, + createListAllOperatorTypesTool, + createListLinksInCurrentWorkflowTool, + createListOperatorsInCurrentWorkflowTool, +} from "./workflow-tools"; +import { OperatorMetadataService } from "../operator-metadata/operator-metadata.service"; +import { createOpenAI } from "@ai-sdk/openai"; +import { generateText, type ModelMessage, stepCountIs } from "ai"; +import { WorkflowUtilService } from "../workflow-graph/util/workflow-util.service"; +import { AppSettings } from "../../../common/app-setting"; +import { WorkflowCompilingService } from "../compile-workflow/workflow-compiling.service"; +import { COPILOT_SYSTEM_PROMPT } from "./copilot-prompts"; +import { NotificationService } from "../../../common/service/notification/notification.service"; + +export enum CopilotState { + UNAVAILABLE = "Unavailable", + AVAILABLE = "Available", + GENERATING = "Generating", + STOPPING = "Stopping", +} + +export interface AgentUIMessage { + role: "user" | "agent"; + content: string; + isBegin: boolean; + isEnd: boolean; + toolCalls?: any[]; + toolResults?: any[]; + usage?: { + inputTokens?: number; + outputTokens?: number; + totalTokens?: number; + cachedInputTokens?: number; + }; +} + +/** + * Texera Copilot Service provides AI-powered assistance for workflow creation and manipulation. + * + * This service manages a single AI agent instance that can: + * 1. Interact with users through natural language messages + * 2. Execute workflow operations using specialized tools + * 3. Maintain conversation history and state + * + * The service communicates with an LLM backend (via LiteLLM) to generate responses and uses + * workflow tools to perform actions like listing operators, getting operator schemas, and + * manipulating workflow components. + * + * State management includes: + * - UNAVAILABLE: Agent not initialized + * - AVAILABLE: Agent ready to receive messages + * - GENERATING: Agent currently processing and generating response + * - STOPPING: Agent in the process of stopping generation + */ +@Injectable() +export class TexeraCopilot { + private model: any; + private modelType = ""; + private agentName = ""; + private messages: ModelMessage[] = []; + private agentResponses: AgentUIMessage[] = []; + private agentResponsesSubject = new BehaviorSubject<AgentUIMessage[]>([]); + public agentResponses$ = this.agentResponsesSubject.asObservable(); + private state = CopilotState.UNAVAILABLE; + private stateSubject = new BehaviorSubject<CopilotState>(CopilotState.UNAVAILABLE); + public state$ = this.stateSubject.asObservable(); + + constructor( + private workflowActionService: WorkflowActionService, + private workflowUtilService: WorkflowUtilService, + private operatorMetadataService: OperatorMetadataService, + private workflowCompilingService: WorkflowCompilingService, + private notificationService: NotificationService + ) {} + + public setAgentInfo(agentName: string): void { + this.agentName = agentName; + } + + public setModelType(modelType: string): void { + this.modelType = modelType; + } + + private setState(newState: CopilotState): void { + this.state = newState; + this.stateSubject.next(newState); + } + private emitAgentUIMessage( + role: "user" | "agent", + content: string, + isBegin: boolean, + isEnd: boolean, + toolCalls?: any[], + toolResults?: any[], + usage?: AgentUIMessage["usage"] + ): void { + this.agentResponses.push({ role, content, isBegin, isEnd, toolCalls, toolResults, usage }); + this.agentResponsesSubject.next([...this.agentResponses]); + } + public initialize(): Observable<void> { + return defer(() => { + try { + this.model = createOpenAI({ + baseURL: new URL(`${AppSettings.getApiEndpoint()}`, document.baseURI).toString(), + apiKey: "dummy", + }).chat(this.modelType); + + this.setState(CopilotState.AVAILABLE); + return of(undefined); + } catch (error: unknown) { + this.setState(CopilotState.UNAVAILABLE); + return throwError(() => error); + } + }); + } + + public sendMessage(message: string): Observable<void> { + return defer(() => { + if (!this.model) { + return throwError(() => new Error("Copilot not initialized")); + } + + if (this.state !== CopilotState.AVAILABLE) { + return throwError(() => new Error(`Cannot send message: agent is ${this.state}`)); + } + + this.setState(CopilotState.GENERATING); + + this.emitAgentUIMessage("user", message, true, true); + this.messages.push({ role: "user", content: message }); + + const tools = this.createWorkflowTools(); + let isFirstStep = true; + + return from( Review Comment: This is the most import part of the life-cycle of agent handling. It would be good if you can have some docs here about what each step / callback does. ########## frontend/src/styles.scss: ########## @@ -20,7 +20,7 @@ @import "@ali-hm/angular-tree-component/css/angular-tree-component.css"; * { - user-select: none; + user-select: text; Review Comment: What does this change do? ########## frontend/src/app/workspace/service/copilot/texera-copilot.ts: ########## @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Injectable } from "@angular/core"; +import { BehaviorSubject, Observable, from, of, throwError, defer } from "rxjs"; +import { map, catchError, tap, switchMap, finalize } from "rxjs/operators"; +import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service"; +import { + toolWithTimeout, + createGetOperatorInCurrentWorkflowTool, + createGetOperatorPropertiesSchemaTool, + createGetOperatorPortsInfoTool, + createGetOperatorMetadataTool, + createListAllOperatorTypesTool, + createListLinksInCurrentWorkflowTool, + createListOperatorsInCurrentWorkflowTool, +} from "./workflow-tools"; +import { OperatorMetadataService } from "../operator-metadata/operator-metadata.service"; +import { createOpenAI } from "@ai-sdk/openai"; +import { generateText, type ModelMessage, stepCountIs } from "ai"; +import { WorkflowUtilService } from "../workflow-graph/util/workflow-util.service"; +import { AppSettings } from "../../../common/app-setting"; +import { WorkflowCompilingService } from "../compile-workflow/workflow-compiling.service"; +import { COPILOT_SYSTEM_PROMPT } from "./copilot-prompts"; +import { NotificationService } from "../../../common/service/notification/notification.service"; + +export enum CopilotState { + UNAVAILABLE = "Unavailable", + AVAILABLE = "Available", + GENERATING = "Generating", + STOPPING = "Stopping", +} + +export interface AgentUIMessage { + role: "user" | "agent"; + content: string; + isBegin: boolean; + isEnd: boolean; + toolCalls?: any[]; + toolResults?: any[]; + usage?: { + inputTokens?: number; + outputTokens?: number; + totalTokens?: number; + cachedInputTokens?: number; + }; +} + +/** + * Texera Copilot Service provides AI-powered assistance for workflow creation and manipulation. + * + * This service manages a single AI agent instance that can: + * 1. Interact with users through natural language messages + * 2. Execute workflow operations using specialized tools + * 3. Maintain conversation history and state + * + * The service communicates with an LLM backend (via LiteLLM) to generate responses and uses + * workflow tools to perform actions like listing operators, getting operator schemas, and + * manipulating workflow components. + * + * State management includes: + * - UNAVAILABLE: Agent not initialized + * - AVAILABLE: Agent ready to receive messages + * - GENERATING: Agent currently processing and generating response + * - STOPPING: Agent in the process of stopping generation + */ +@Injectable() +export class TexeraCopilot { + private model: any; + private modelType = ""; + private agentName = ""; + private messages: ModelMessage[] = []; + private agentResponses: AgentUIMessage[] = []; + private agentResponsesSubject = new BehaviorSubject<AgentUIMessage[]>([]); + public agentResponses$ = this.agentResponsesSubject.asObservable(); + private state = CopilotState.UNAVAILABLE; + private stateSubject = new BehaviorSubject<CopilotState>(CopilotState.UNAVAILABLE); + public state$ = this.stateSubject.asObservable(); + + constructor( + private workflowActionService: WorkflowActionService, + private workflowUtilService: WorkflowUtilService, + private operatorMetadataService: OperatorMetadataService, + private workflowCompilingService: WorkflowCompilingService, + private notificationService: NotificationService + ) {} + + public setAgentInfo(agentName: string): void { + this.agentName = agentName; + } + + public setModelType(modelType: string): void { + this.modelType = modelType; + } + + private setState(newState: CopilotState): void { + this.state = newState; + this.stateSubject.next(newState); + } + private emitAgentUIMessage( + role: "user" | "agent", + content: string, + isBegin: boolean, + isEnd: boolean, + toolCalls?: any[], + toolResults?: any[], + usage?: AgentUIMessage["usage"] + ): void { + this.agentResponses.push({ role, content, isBegin, isEnd, toolCalls, toolResults, usage }); + this.agentResponsesSubject.next([...this.agentResponses]); + } + public initialize(): Observable<void> { + return defer(() => { + try { + this.model = createOpenAI({ + baseURL: new URL(`${AppSettings.getApiEndpoint()}`, document.baseURI).toString(), + apiKey: "dummy", + }).chat(this.modelType); + + this.setState(CopilotState.AVAILABLE); + return of(undefined); + } catch (error: unknown) { + this.setState(CopilotState.UNAVAILABLE); + return throwError(() => error); + } + }); + } + + public sendMessage(message: string): Observable<void> { + return defer(() => { + if (!this.model) { + return throwError(() => new Error("Copilot not initialized")); + } + + if (this.state !== CopilotState.AVAILABLE) { + return throwError(() => new Error(`Cannot send message: agent is ${this.state}`)); + } + + this.setState(CopilotState.GENERATING); + + this.emitAgentUIMessage("user", message, true, true); + this.messages.push({ role: "user", content: message }); + + const tools = this.createWorkflowTools(); + let isFirstStep = true; + + return from( + generateText({ + model: this.model, + messages: this.messages, + tools, + system: COPILOT_SYSTEM_PROMPT, + stopWhen: ({ steps }) => { + if (this.state === CopilotState.STOPPING) { + this.notificationService.info(`Agent ${this.agentName} has stopped generation`); + return true; + } + return stepCountIs(50)({ steps }); + }, + onStepFinish: ({ text, toolCalls, toolResults, usage }) => { + if (this.state === CopilotState.STOPPING) { + return; + } + + this.emitAgentUIMessage("agent", text || "", isFirstStep, false, toolCalls, toolResults, usage as any); + + isFirstStep = false; + }, + }) + ).pipe( + tap(({ response }) => { Review Comment: Can you explain the difference between this part and `onStepFinish` as comments? It would help understanding the lifecycle of `sendMessage`. ########## frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html: ########## @@ -139,7 +139,7 @@ <!-- Delete option for links only --> <li nz-menu-item - *ngIf="hasHighlightedLinks() && + *ngIf="hasHighlightedLinks() && Review Comment: Accidental change? ########## frontend/src/app/workspace/component/agent-panel/agent-chat/agent-chat.component.ts: ########## @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Component, ViewChild, ElementRef, Input, OnInit, AfterViewChecked } from "@angular/core"; +import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; +import { CopilotState, AgentUIMessage } from "../../../service/copilot/texera-copilot"; +import { AgentInfo, TexeraCopilotManagerService } from "../../../service/copilot/texera-copilot-manager.service"; +import { NotificationService } from "../../../../common/service/notification/notification.service"; + +@UntilDestroy() +@Component({ + selector: "texera-agent-chat", + templateUrl: "agent-chat.component.html", + styleUrls: ["agent-chat.component.scss"], +}) +export class AgentChatComponent implements OnInit, AfterViewChecked { + @Input() agentInfo!: AgentInfo; + @ViewChild("messageContainer", { static: false }) messageContainer?: ElementRef; + @ViewChild("messageInput", { static: false }) messageInput?: ElementRef; + + public agentResponses: AgentUIMessage[] = []; + public currentMessage = ""; + private shouldScrollToBottom = false; + public isDetailsModalVisible = false; + public selectedResponse: AgentUIMessage | null = null; + public hoveredMessageIndex: number | null = null; + public isSystemInfoModalVisible = false; + public systemPrompt: string = ""; + public availableTools: Array<{ name: string; description: string; inputSchema: any }> = []; + public agentState: CopilotState = CopilotState.UNAVAILABLE; + + constructor( + private copilotManagerService: TexeraCopilotManagerService, + private notificationService: NotificationService + ) {} + + ngOnInit(): void { + if (!this.agentInfo) { + return; + } + + // Subscribe to agent responses + this.copilotManagerService + .getAgentResponsesObservable(this.agentInfo.id) + .pipe(untilDestroyed(this)) + .subscribe(responses => { + this.agentResponses = responses; + this.shouldScrollToBottom = true; + }); + + // Subscribe to agent state changes + this.copilotManagerService + .getAgentStateObservable(this.agentInfo.id) + .pipe(untilDestroyed(this)) + .subscribe(state => { + this.agentState = state; + }); + } + + ngAfterViewChecked(): void { + if (this.shouldScrollToBottom) { + this.scrollToBottom(); + this.shouldScrollToBottom = false; + } + } + + public setHoveredMessage(index: number | null): void { + this.hoveredMessageIndex = index; + } + + public showResponseDetails(response: AgentUIMessage): void { + this.selectedResponse = response; + this.isDetailsModalVisible = true; + } + + public closeDetailsModal(): void { + this.isDetailsModalVisible = false; + this.selectedResponse = null; + } + + public showSystemInfo(): void { + this.copilotManagerService + .getSystemInfo(this.agentInfo.id) + .pipe(untilDestroyed(this)) + .subscribe(systemInfo => { + this.systemPrompt = systemInfo.systemPrompt; + this.availableTools = systemInfo.tools; + this.isSystemInfoModalVisible = true; + }); + } + + public closeSystemInfoModal(): void { + this.isSystemInfoModalVisible = false; + } + + public formatJson(data: any): string { + return JSON.stringify(data, null, 2); + } + + public getToolResult(response: AgentUIMessage, toolCallIndex: number): any { + if (!response.toolResults || toolCallIndex >= response.toolResults.length) { + return null; + } + const toolResult = response.toolResults[toolCallIndex]; + return toolResult.output || toolResult.result || toolResult; + } + + public getTotalInputTokens(): number { + for (let i = this.agentResponses.length - 1; i >= 0; i--) { + const response = this.agentResponses[i]; + if (response.usage?.inputTokens !== undefined) { + return response.usage.inputTokens; + } + } + return 0; + } + + public getTotalOutputTokens(): number { + for (let i = this.agentResponses.length - 1; i >= 0; i--) { + const response = this.agentResponses[i]; + if (response.usage?.outputTokens !== undefined) { + return response.usage.outputTokens; + } + } + return 0; + } + + /** + * Send a message to the agent via the copilot manager service. + */ + public sendMessage(): void { + if (!this.currentMessage.trim() || !this.canSendMessage()) { + return; + } + + const userMessage = this.currentMessage.trim(); + this.currentMessage = ""; + + // Send to copilot via manager service + this.copilotManagerService + .sendMessage(this.agentInfo.id, userMessage) + .pipe(untilDestroyed(this)) + .subscribe({ + error: (error: unknown) => { + this.notificationService.error(`Error sending message: ${error}`); + }, + }); + } + + /** + * Check if messages can be sent (only when agent is available). + */ + public canSendMessage(): boolean { + return this.agentState === CopilotState.AVAILABLE; + } + + /** + * Get the state icon URL based on current agent state. + * Uses the same icons as workflow operators for consistency. + */ + public getStateIconUrl(): string { + return this.agentState === CopilotState.AVAILABLE ? "assets/svg/done.svg" : "assets/gif/loading.gif"; + } + + /** + * Get the tooltip text for the state icon. + */ + public getStateTooltip(): string { + switch (this.agentState) { + case CopilotState.AVAILABLE: + return "Agent is ready"; + case CopilotState.GENERATING: + return "Agent is generating response..."; + case CopilotState.STOPPING: + return "Agent is stopping..."; + case CopilotState.UNAVAILABLE: + return "Agent is unavailable"; + default: + return "Agent status unknown"; + } + } + + public onEnterPress(event: KeyboardEvent): void { + if (!event.shiftKey) { + event.preventDefault(); + this.sendMessage(); + } + } + + private scrollToBottom(): void { + if (this.messageContainer) { + const element = this.messageContainer.nativeElement; + element.scrollTop = element.scrollHeight; + } + } + + public stopGeneration(): void { + this.copilotManagerService.stopGeneration(this.agentInfo.id).pipe(untilDestroyed(this)).subscribe(); + } + + public clearMessages(): void { + this.copilotManagerService.clearMessages(this.agentInfo.id).pipe(untilDestroyed(this)).subscribe(); + } + + public isGenerating(): boolean { + return this.agentState === CopilotState.GENERATING; + } + + public isStopping(): boolean { Review Comment: Where is this method used? ########## frontend/src/app/workspace/service/copilot/texera-copilot.ts: ########## @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Injectable } from "@angular/core"; +import { BehaviorSubject, Observable, from, of, throwError, defer } from "rxjs"; +import { map, catchError, tap, switchMap, finalize } from "rxjs/operators"; +import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service"; +import { + toolWithTimeout, + createGetOperatorInCurrentWorkflowTool, + createGetOperatorPropertiesSchemaTool, + createGetOperatorPortsInfoTool, + createGetOperatorMetadataTool, + createListAllOperatorTypesTool, + createListLinksInCurrentWorkflowTool, + createListOperatorsInCurrentWorkflowTool, +} from "./workflow-tools"; +import { OperatorMetadataService } from "../operator-metadata/operator-metadata.service"; +import { createOpenAI } from "@ai-sdk/openai"; +import { generateText, type ModelMessage, stepCountIs } from "ai"; +import { WorkflowUtilService } from "../workflow-graph/util/workflow-util.service"; +import { AppSettings } from "../../../common/app-setting"; +import { WorkflowCompilingService } from "../compile-workflow/workflow-compiling.service"; +import { COPILOT_SYSTEM_PROMPT } from "./copilot-prompts"; +import { NotificationService } from "../../../common/service/notification/notification.service"; + +export enum CopilotState { + UNAVAILABLE = "Unavailable", + AVAILABLE = "Available", + GENERATING = "Generating", + STOPPING = "Stopping", +} + +export interface AgentUIMessage { + role: "user" | "agent"; + content: string; + isBegin: boolean; + isEnd: boolean; + toolCalls?: any[]; + toolResults?: any[]; + usage?: { + inputTokens?: number; + outputTokens?: number; + totalTokens?: number; + cachedInputTokens?: number; + }; +} + +/** + * Texera Copilot Service provides AI-powered assistance for workflow creation and manipulation. + * + * This service manages a single AI agent instance that can: + * 1. Interact with users through natural language messages + * 2. Execute workflow operations using specialized tools + * 3. Maintain conversation history and state + * + * The service communicates with an LLM backend (via LiteLLM) to generate responses and uses + * workflow tools to perform actions like listing operators, getting operator schemas, and + * manipulating workflow components. + * + * State management includes: + * - UNAVAILABLE: Agent not initialized + * - AVAILABLE: Agent ready to receive messages + * - GENERATING: Agent currently processing and generating response + * - STOPPING: Agent in the process of stopping generation + */ +@Injectable() +export class TexeraCopilot { + private model: any; + private modelType = ""; + private agentName = ""; + private messages: ModelMessage[] = []; + private agentResponses: AgentUIMessage[] = []; + private agentResponsesSubject = new BehaviorSubject<AgentUIMessage[]>([]); + public agentResponses$ = this.agentResponsesSubject.asObservable(); + private state = CopilotState.UNAVAILABLE; + private stateSubject = new BehaviorSubject<CopilotState>(CopilotState.UNAVAILABLE); + public state$ = this.stateSubject.asObservable(); + + constructor( + private workflowActionService: WorkflowActionService, + private workflowUtilService: WorkflowUtilService, + private operatorMetadataService: OperatorMetadataService, + private workflowCompilingService: WorkflowCompilingService, + private notificationService: NotificationService + ) {} + + public setAgentInfo(agentName: string): void { + this.agentName = agentName; + } + + public setModelType(modelType: string): void { + this.modelType = modelType; + } + + private setState(newState: CopilotState): void { + this.state = newState; + this.stateSubject.next(newState); + } + private emitAgentUIMessage( + role: "user" | "agent", + content: string, + isBegin: boolean, + isEnd: boolean, + toolCalls?: any[], + toolResults?: any[], + usage?: AgentUIMessage["usage"] + ): void { + this.agentResponses.push({ role, content, isBegin, isEnd, toolCalls, toolResults, usage }); + this.agentResponsesSubject.next([...this.agentResponses]); + } + public initialize(): Observable<void> { + return defer(() => { + try { + this.model = createOpenAI({ + baseURL: new URL(`${AppSettings.getApiEndpoint()}`, document.baseURI).toString(), + apiKey: "dummy", + }).chat(this.modelType); + + this.setState(CopilotState.AVAILABLE); + return of(undefined); + } catch (error: unknown) { + this.setState(CopilotState.UNAVAILABLE); + return throwError(() => error); + } + }); + } + + public sendMessage(message: string): Observable<void> { + return defer(() => { + if (!this.model) { + return throwError(() => new Error("Copilot not initialized")); + } + + if (this.state !== CopilotState.AVAILABLE) { + return throwError(() => new Error(`Cannot send message: agent is ${this.state}`)); + } + + this.setState(CopilotState.GENERATING); + + this.emitAgentUIMessage("user", message, true, true); + this.messages.push({ role: "user", content: message }); + + const tools = this.createWorkflowTools(); Review Comment: Why does this have to be created inside `sendMessage`? Can it be created once inside this class? ########## frontend/src/app/workspace/service/copilot/texera-copilot.ts: ########## @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Injectable } from "@angular/core"; +import { BehaviorSubject, Observable, from, of, throwError, defer } from "rxjs"; +import { map, catchError, tap, switchMap, finalize } from "rxjs/operators"; +import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service"; +import { + toolWithTimeout, + createGetOperatorInCurrentWorkflowTool, + createGetOperatorPropertiesSchemaTool, + createGetOperatorPortsInfoTool, + createGetOperatorMetadataTool, + createListAllOperatorTypesTool, + createListLinksInCurrentWorkflowTool, + createListOperatorsInCurrentWorkflowTool, +} from "./workflow-tools"; +import { OperatorMetadataService } from "../operator-metadata/operator-metadata.service"; +import { createOpenAI } from "@ai-sdk/openai"; +import { generateText, type ModelMessage, stepCountIs } from "ai"; +import { WorkflowUtilService } from "../workflow-graph/util/workflow-util.service"; +import { AppSettings } from "../../../common/app-setting"; +import { WorkflowCompilingService } from "../compile-workflow/workflow-compiling.service"; +import { COPILOT_SYSTEM_PROMPT } from "./copilot-prompts"; +import { NotificationService } from "../../../common/service/notification/notification.service"; + +export enum CopilotState { + UNAVAILABLE = "Unavailable", + AVAILABLE = "Available", + GENERATING = "Generating", + STOPPING = "Stopping", +} + +export interface AgentUIMessage { + role: "user" | "agent"; + content: string; + isBegin: boolean; + isEnd: boolean; + toolCalls?: any[]; + toolResults?: any[]; + usage?: { + inputTokens?: number; + outputTokens?: number; + totalTokens?: number; + cachedInputTokens?: number; + }; +} + +/** + * Texera Copilot Service provides AI-powered assistance for workflow creation and manipulation. + * + * This service manages a single AI agent instance that can: + * 1. Interact with users through natural language messages + * 2. Execute workflow operations using specialized tools + * 3. Maintain conversation history and state + * + * The service communicates with an LLM backend (via LiteLLM) to generate responses and uses + * workflow tools to perform actions like listing operators, getting operator schemas, and + * manipulating workflow components. + * + * State management includes: + * - UNAVAILABLE: Agent not initialized + * - AVAILABLE: Agent ready to receive messages + * - GENERATING: Agent currently processing and generating response + * - STOPPING: Agent in the process of stopping generation + */ +@Injectable() +export class TexeraCopilot { + private model: any; + private modelType = ""; + private agentName = ""; + private messages: ModelMessage[] = []; + private agentResponses: AgentUIMessage[] = []; + private agentResponsesSubject = new BehaviorSubject<AgentUIMessage[]>([]); + public agentResponses$ = this.agentResponsesSubject.asObservable(); + private state = CopilotState.UNAVAILABLE; + private stateSubject = new BehaviorSubject<CopilotState>(CopilotState.UNAVAILABLE); + public state$ = this.stateSubject.asObservable(); + + constructor( + private workflowActionService: WorkflowActionService, + private workflowUtilService: WorkflowUtilService, + private operatorMetadataService: OperatorMetadataService, + private workflowCompilingService: WorkflowCompilingService, + private notificationService: NotificationService + ) {} + + public setAgentInfo(agentName: string): void { + this.agentName = agentName; + } + + public setModelType(modelType: string): void { + this.modelType = modelType; + } + + private setState(newState: CopilotState): void { + this.state = newState; + this.stateSubject.next(newState); + } + private emitAgentUIMessage( + role: "user" | "agent", + content: string, + isBegin: boolean, + isEnd: boolean, + toolCalls?: any[], + toolResults?: any[], + usage?: AgentUIMessage["usage"] + ): void { + this.agentResponses.push({ role, content, isBegin, isEnd, toolCalls, toolResults, usage }); + this.agentResponsesSubject.next([...this.agentResponses]); + } + public initialize(): Observable<void> { + return defer(() => { + try { + this.model = createOpenAI({ + baseURL: new URL(`${AppSettings.getApiEndpoint()}`, document.baseURI).toString(), + apiKey: "dummy", + }).chat(this.modelType); + + this.setState(CopilotState.AVAILABLE); + return of(undefined); + } catch (error: unknown) { + this.setState(CopilotState.UNAVAILABLE); + return throwError(() => error); + } + }); + } + + public sendMessage(message: string): Observable<void> { + return defer(() => { + if (!this.model) { + return throwError(() => new Error("Copilot not initialized")); + } + + if (this.state !== CopilotState.AVAILABLE) { + return throwError(() => new Error(`Cannot send message: agent is ${this.state}`)); + } + + this.setState(CopilotState.GENERATING); + + this.emitAgentUIMessage("user", message, true, true); + this.messages.push({ role: "user", content: message }); + + const tools = this.createWorkflowTools(); + let isFirstStep = true; + + return from( + generateText({ + model: this.model, + messages: this.messages, + tools, + system: COPILOT_SYSTEM_PROMPT, + stopWhen: ({ steps }) => { + if (this.state === CopilotState.STOPPING) { + this.notificationService.info(`Agent ${this.agentName} has stopped generation`); + return true; + } + return stepCountIs(50)({ steps }); + }, + onStepFinish: ({ text, toolCalls, toolResults, usage }) => { + if (this.state === CopilotState.STOPPING) { + return; + } + + this.emitAgentUIMessage("agent", text || "", isFirstStep, false, toolCalls, toolResults, usage as any); + + isFirstStep = false; + }, + }) + ).pipe( + tap(({ response }) => { + this.messages.push(...response.messages); + this.agentResponsesSubject.next([...this.agentResponses]); + }), + map(() => undefined), + catchError((err: unknown) => { + const errorText = `Error: ${err instanceof Error ? err.message : String(err)}`; + this.messages.push({ role: "assistant", content: errorText }); + this.emitAgentUIMessage("agent", errorText, false, true); + return throwError(() => err); + }), + finalize(() => { + this.setState(CopilotState.AVAILABLE); + }) + ); + }); + } + + private createWorkflowTools(): Record<string, any> { + const listOperatorsInCurrentWorkflowTool = toolWithTimeout( + createListOperatorsInCurrentWorkflowTool(this.workflowActionService) + ); + const listLinksTool = toolWithTimeout(createListLinksInCurrentWorkflowTool(this.workflowActionService)); + const listAllOperatorTypesTool = toolWithTimeout(createListAllOperatorTypesTool(this.workflowUtilService)); + const getOperatorTool = toolWithTimeout( + createGetOperatorInCurrentWorkflowTool(this.workflowActionService, this.workflowCompilingService) + ); + const getOperatorPropertiesSchemaTool = toolWithTimeout( + createGetOperatorPropertiesSchemaTool(this.operatorMetadataService) + ); + const getOperatorPortsInfoTool = toolWithTimeout(createGetOperatorPortsInfoTool(this.operatorMetadataService)); + const getOperatorMetadataTool = toolWithTimeout(createGetOperatorMetadataTool(this.operatorMetadataService)); + + return { + listAllOperatorTypes: listAllOperatorTypesTool, + listOperatorsInCurrentWorkflow: listOperatorsInCurrentWorkflowTool, + listLinksInCurrentWorkflow: listLinksTool, + getOperatorInCurrentWorkflow: getOperatorTool, + getOperatorPropertiesSchema: getOperatorPropertiesSchemaTool, + getOperatorPortsInfo: getOperatorPortsInfoTool, + getOperatorMetadata: getOperatorMetadataTool, + }; + } + + public getAgentResponses(): AgentUIMessage[] { + return [...this.agentResponses]; + } + + public stopGeneration(): void { + if (this.state !== CopilotState.GENERATING) { + return; + } + this.setState(CopilotState.STOPPING); + } + + public clearMessages(): void { + this.messages = []; + this.agentResponses = []; + this.agentResponsesSubject.next([...this.agentResponses]); + } + + public getState(): CopilotState { + return this.state; + } + + public disconnect(): Observable<void> { + return defer(() => { + if (this.state === CopilotState.GENERATING) { + this.stopGeneration(); + } + + this.clearMessages(); + this.setState(CopilotState.UNAVAILABLE); + this.notificationService.info(`Agent ${this.agentName} is removed successfully`); + + return of(undefined); + }); + } + + public isConnected(): boolean { + return this.state !== CopilotState.UNAVAILABLE; + } + + public getSystemPrompt(): string { + return COPILOT_SYSTEM_PROMPT; + } + + public getToolsInfo(): Array<{ name: string; description: string; inputSchema: any }> { + const tools = this.createWorkflowTools(); Review Comment: Ditto, why does `tools` need to be created dynamically? ########## frontend/src/app/workspace/component/agent-panel/agent-panel.component.ts: ########## @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Component, HostListener, OnDestroy, OnInit } from "@angular/core"; +import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; +import { NzResizeEvent } from "ng-zorro-antd/resizable"; +import { TexeraCopilotManagerService, AgentInfo } from "../../service/copilot/texera-copilot-manager.service"; +import { calculateTotalTranslate3d } from "../../../common/util/panel-dock"; + +@UntilDestroy() +@Component({ + selector: "texera-agent-panel", + templateUrl: "agent-panel.component.html", + styleUrls: ["agent-panel.component.scss"], +}) +export class AgentPanelComponent implements OnInit, OnDestroy { + protected readonly window = window; + private static readonly MIN_PANEL_WIDTH = 400; + private static readonly MIN_PANEL_HEIGHT = 450; + + // Panel dimensions and position + width: number = 0; // Start with 0 to show docked button + height = Math.max(AgentPanelComponent.MIN_PANEL_HEIGHT, window.innerHeight * 0.7); + id = -1; + dragPosition = { x: 0, y: 0 }; + returnPosition = { x: 0, y: 0 }; + isDocked = true; + + // Tab management + selectedTabIndex: number = 0; // 0 = registration tab, 1 = action plans tab, 2+ = agent tabs + agents: AgentInfo[] = []; + + constructor(private copilotManagerService: TexeraCopilotManagerService) {} + + ngOnInit(): void { + // Load saved panel dimensions and position + const savedWidth = localStorage.getItem("agent-panel-width"); + const savedHeight = localStorage.getItem("agent-panel-height"); + const savedStyle = localStorage.getItem("agent-panel-style"); + const savedDocked = localStorage.getItem("agent-panel-docked"); + + // Only restore width if the panel was not docked + if (savedDocked === "false" && savedWidth) { + this.width = Number(savedWidth); + } + + if (savedHeight) this.height = Number(savedHeight); + + if (savedStyle) { + const container = document.getElementById("agent-container"); + if (container) { + container.style.cssText = savedStyle; + const translates = container.style.transform; + const [xOffset, yOffset] = calculateTotalTranslate3d(translates); + this.returnPosition = { x: -xOffset, y: -yOffset }; + this.isDocked = this.dragPosition.x === this.returnPosition.x && this.dragPosition.y === this.returnPosition.y; + } + } + + // Subscribe to agent changes + this.copilotManagerService.agentChange$.pipe(untilDestroyed(this)).subscribe(() => { + this.copilotManagerService + .getAllAgents() + .pipe(untilDestroyed(this)) + .subscribe(agents => { + this.agents = agents; + }); + }); + + // Load initial agents + this.copilotManagerService + .getAllAgents() + .pipe(untilDestroyed(this)) + .subscribe(agents => { + this.agents = agents; + }); + } + + @HostListener("window:beforeunload") + ngOnDestroy(): void { + // Save panel state + localStorage.setItem("agent-panel-width", String(this.width)); + localStorage.setItem("agent-panel-height", String(this.height)); + localStorage.setItem("agent-panel-docked", String(this.width === 0)); + + const container = document.getElementById("agent-container"); + if (container) { + localStorage.setItem("agent-panel-style", container.style.cssText); + } + } + + /** + * Open the panel from docked state + */ + public openPanel(): void { + if (this.width === 0) { + // Open panel + this.width = AgentPanelComponent.MIN_PANEL_WIDTH; + } else { + // Close panel (dock it) + this.width = 0; + this.isDocked = true; + } + } + + /** + * Handle agent creation + */ + public onAgentCreated(agentId: string): void { + // The agent is already added to the agents array by the manager service + // Find the index of the newly created agent and switch to that tab + // Tab index 0 is registration, 1 is action plans, so agent tabs start at index 2 + const agentIndex = this.agents.findIndex(agent => agent.id === agentId); + if (agentIndex !== -1) { + this.selectedTabIndex = agentIndex + 2; // +2 because tab 0 is registration, tab 1 is action plans + } + } + + /** + * Delete an agent + */ + public deleteAgent(agentId: string, event: Event): void { + event.stopPropagation(); // Prevent tab switch + + if (confirm("Are you sure you want to delete this agent?")) { + const agentIndex = this.agents.findIndex(agent => agent.id === agentId); + this.copilotManagerService.deleteAgent(agentId); + + // If we're on the deleted agent's tab, switch to registration + if (agentIndex !== -1 && this.selectedTabIndex === agentIndex + 2) { + this.selectedTabIndex = 0; + } else if (this.selectedTabIndex > agentIndex + 2) { + // Adjust selected index if we deleted a tab before the current one + this.selectedTabIndex--; + } + } + } + + /** + * Handle panel resize + */ + onResize({ width, height }: NzResizeEvent): void { + cancelAnimationFrame(this.id); + this.id = requestAnimationFrame(() => { + this.width = width!; Review Comment: What would happen if `width` or `height` are undefined? Should there be some default values for such case? ########## frontend/src/app/workspace/service/copilot/texera-copilot-manager.service.ts: ########## @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Injectable, Injector } from "@angular/core"; +import { HttpClient } from "@angular/common/http"; +import { TexeraCopilot, AgentUIMessage, CopilotState } from "./texera-copilot"; +import { Observable, Subject, catchError, map, of, shareReplay, tap, defer, throwError, switchMap } from "rxjs"; +import { AppSettings } from "../../../common/app-setting"; + +export interface AgentInfo { + id: string; + name: string; + modelType: string; + instance: TexeraCopilot; + createdAt: Date; +} + +export interface ModelType { + id: string; + name: string; + description: string; + icon: string; +} + +interface LiteLLMModel { + id: string; + object: string; + created: number; + owned_by: string; +} + +interface LiteLLMModelsResponse { + data: LiteLLMModel[]; + object: string; +} + +/** + * Texera Copilot Manager Service manages multiple AI agent instances for workflow assistance. + * + * This service provides centralized management for multiple copilot agents, allowing users to: + * 1. Create and delete multiple agent instances with different LLM models + * 2. Route messages to specific agents + * 3. Track agent states and conversation history + * 4. Query available LLM models from the backend + * + * Each agent is a separate TexeraCopilot instance with its own: + * - Model configuration (e.g., GPT-4, Claude, etc.) + * - Conversation history + * - State (available, generating, stopping, unavailable) + * + * The service acts as a registry and coordinator, ensuring proper lifecycle management + * and providing observable streams for agent changes and state updates. + */ +@Injectable({ + providedIn: "root", +}) +export class TexeraCopilotManagerService { + private agents = new Map<string, AgentInfo>(); + private agentCounter = 0; + private agentChangeSubject = new Subject<void>(); + public agentChange$ = this.agentChangeSubject.asObservable(); + + private modelTypes$: Observable<ModelType[]> | null = null; + + constructor( + private injector: Injector, + private http: HttpClient + ) {} + + public createAgent(modelType: string, customName?: string): Observable<AgentInfo> { + return defer(() => { + const agentId = `agent-${++this.agentCounter}`; + const agentName = customName || `Agent ${this.agentCounter}`; + + const agentInstance = this.createCopilotInstance(modelType); + agentInstance.setAgentInfo(agentName); + + return agentInstance.initialize().pipe( + map(() => { + const agentInfo: AgentInfo = { + id: agentId, + name: agentName, + modelType, + instance: agentInstance, + createdAt: new Date(), + }; + + this.agents.set(agentId, agentInfo); + this.agentChangeSubject.next(); + + return agentInfo; + }), + catchError((error: unknown) => { + return throwError(() => error); + }) + ); + }); + } + + public getAgent(agentId: string): Observable<AgentInfo> { + return defer(() => { + const agent = this.agents.get(agentId); + if (!agent) { + return throwError(() => new Error(`Agent with ID ${agentId} not found`)); + } + return of(agent); + }); + } + + public getAllAgents(): Observable<AgentInfo[]> { + return of(Array.from(this.agents.values())); + } + public deleteAgent(agentId: string): Observable<boolean> { + return defer(() => { + const agent = this.agents.get(agentId); + if (!agent) { + return of(false); + } + + return agent.instance.disconnect().pipe( + map(() => { + this.agents.delete(agentId); + this.agentChangeSubject.next(); + return true; + }) + ); + }); + } + + public fetchModelTypes(): Observable<ModelType[]> { + if (!this.modelTypes$) { + this.modelTypes$ = this.http.get<LiteLLMModelsResponse>(`${AppSettings.getApiEndpoint()}/models`).pipe( + map(response => + response.data.map((model: LiteLLMModel) => ({ + id: model.id, + name: this.formatModelName(model.id), + description: `Model: ${model.id}`, + icon: "robot", + })) + ), + catchError((error: unknown) => { + console.error("Failed to fetch models from API:", error); + return of([]); + }), + shareReplay(1) + ); + } + return this.modelTypes$; + } + + private formatModelName(modelId: string): string { + return modelId + .split("-") + .map(word => word.charAt(0).toUpperCase() + word.slice(1)) + .join(" "); + } + + public getAgentCount(): Observable<number> { + return of(this.agents.size); + } + public sendMessage(agentId: string, message: string): Observable<void> { + return defer(() => { + const agent = this.agents.get(agentId); + if (!agent) { + return throwError(() => new Error(`Agent with ID ${agentId} not found`)); + } + return agent.instance.sendMessage(message); + }); + } + + public getAgentResponsesObservable(agentId: string): Observable<AgentUIMessage[]> { + return defer(() => { Review Comment: There are many methods here that repeat similar logics and all throw the same `Agent with ID not found` error. Is it possible to refactor these methods to avoid duplicate code? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
