This is an automated email from the ASF dual-hosted git repository.
xuang7 pushed a commit to branch release/v1.2
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/release/v1.2 by this push:
new 911eb2f854 fix(frontend): clean up websocket state when returning to
the dashboard (#5565) [release/v1.2 backport] (#5609)
911eb2f854 is described below
commit 911eb2f8546f55e070b9bcd59f682ef087d54683
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Wed Jun 10 13:50:33 2026 -0700
fix(frontend): clean up websocket state when returning to the dashboard
(#5565) [release/v1.2 backport] (#5609)
### What changes were proposed in this PR?
Backport of #5565 ("fix(frontend): clean up websocket state when
returning to the dashboard") to `release/v1.2`.
Applies as a clean `git cherry-pick -x` of the merged squash commit
`ad10b7f2b` — no conflicts and no prerequisite chain (unlike the #5280
backport in #5602). Frontend-only: 14 files changed, 391 insertions, 2
deletions, identical to the original.
For the full change description, see #5565. In brief: websocket-derived
frontend state (the connection, plus execution status, console output,
and results built from its events) lives in singletons outside the
workspace and was never torn down when returning to the dashboard or
switching computing units, so stale state carried over. This change tears
that state down in `WorkspaceComponent.ngOnDestroy()` and clears it again
on a computing-unit switch.
### Any related issues, documentation, discussions?
Backports #5565 (which closes #3120 on `main`). Related: #3093.
### How was this PR tested?
This is a backport with no changes beyond the single cherry-picked
commit, so it relies on the tests that #5565 added and that are carried
over here (new `*.spec.ts` cases for `computing-unit-status`,
`workspace.component`, `execute-workflow`, `workflow-console`,
`workflow-result`, `result-panel`, and `workflow-websocket`).
Backport fidelity was verified locally: after the cherry-pick, every
file touched by #5565 is byte-identical to its state on `main` at the
merged commit (`ad10b7f2b`), and `release/v1.2` had no independent
changes to any of those files.
Full compile and unit-test runs are left to CI.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.8)
Co-authored-by: Xinyuan Lin <[email protected]>
---
.../computing-unit-status.service.spec.ts | 146 +++++++++++++++++++++
.../computing-unit-status.service.ts | 34 +++++
.../result-panel/result-panel.component.spec.ts | 21 +++
.../result-panel/result-panel.component.ts | 17 +++
.../component/workspace.component.spec.ts | 41 +++++-
.../app/workspace/component/workspace.component.ts | 28 +++-
.../execute-workflow.service.spec.ts | 15 +++
.../execute-workflow/execute-workflow.service.ts | 10 ++
.../workflow-console.service.spec.ts | 13 ++
.../workflow-console/workflow-console.service.ts | 9 ++
.../workflow-result.service.spec.ts | 27 ++++
.../workflow-result/workflow-result.service.ts | 22 ++++
.../workflow-websocket.service.spec.ts | 8 ++
.../workflow-websocket.service.ts | 2 +
14 files changed, 391 insertions(+), 2 deletions(-)
diff --git
a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts
b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts
new file mode 100644
index 0000000000..d9c3319cf3
--- /dev/null
+++
b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { TestBed } from "@angular/core/testing";
+import { HttpClientTestingModule } from "@angular/common/http/testing";
+import { of } from "rxjs";
+import { ComputingUnitStatusService } from "./computing-unit-status.service";
+import { WorkflowComputingUnitManagingService } from
"../workflow-computing-unit/workflow-computing-unit-managing.service";
+import { WorkflowWebsocketService } from
"../../../../workspace/service/workflow-websocket/workflow-websocket.service";
+import { WorkflowStatusService } from
"../../../../workspace/service/workflow-status/workflow-status.service";
+import { UserService } from "../../user/user.service";
+import { StubUserService } from "../../user/stub-user.service";
+import { AuthService } from "../../user/auth.service";
+import { StubAuthService } from "../../user/stub-auth.service";
+import { DashboardWorkflowComputingUnit } from
"../../../type/workflow-computing-unit";
+import { commonTestProviders } from "../../../testing/test-utils";
+
+describe("ComputingUnitStatusService", () => {
+ let service: ComputingUnitStatusService;
+ let websocketService: WorkflowWebsocketService;
+
+ const mockUnit = (cuid: number) => ({ computingUnit: { cuid } }) as unknown
as DashboardWorkflowComputingUnit;
+
+ beforeEach(() => {
+ const managingStub = {
+ listComputingUnits: () => of([]),
+ getComputingUnit: (cuid: number) => of(mockUnit(cuid)),
+ terminateComputingUnit: () => of(undefined),
+ };
+
+ TestBed.configureTestingModule({
+ imports: [HttpClientTestingModule],
+ providers: [
+ ComputingUnitStatusService,
+ WorkflowWebsocketService,
+ WorkflowStatusService,
+ { provide: WorkflowComputingUnitManagingService, useValue:
managingStub },
+ { provide: UserService, useClass: StubUserService },
+ { provide: AuthService, useClass: StubAuthService },
+ ...commonTestProviders,
+ ],
+ });
+
+ service = TestBed.inject(ComputingUnitStatusService);
+ websocketService = TestBed.inject(WorkflowWebsocketService);
+ });
+
+ afterEach(() => {
+ // tear down the interval poll started by selectComputingUnit() so it
can't outlive the test
+ service.ngOnDestroy();
+ });
+
+ it("should be created", () => {
+ expect(service).toBeTruthy();
+ });
+
+ it("reconnects when re-selecting the same workflow after disconnect
(regression #3120)", () => {
+ const openSpy = vi.spyOn(websocketService,
"openWebsocket").mockImplementation(() => {});
+ const closeSpy = vi.spyOn(websocketService, "closeWebsocket");
+ (service as any).allComputingUnitsSubject.next([mockUnit(7)]);
+
+ // Enter workflow 5 on computing unit 7 → opens the websocket once.
+ service.selectComputingUnit(5, 7);
+ expect(openSpy).toHaveBeenCalledTimes(1);
+
+ // User returns to the dashboard.
+ service.disconnect();
+ expect(closeSpy).toHaveBeenCalled();
+
+ // Re-enter the SAME workflow (the `wid -> null -> wid` pattern): without
the
+ // cleanup, the retained currentConnectedWid/Cuid would suppress the
reconnect.
+ service.selectComputingUnit(5, 7);
+ expect(openSpy).toHaveBeenCalledTimes(2);
+ });
+
+ it("disconnect() clears the selected computing unit", () => {
+ vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {});
+ (service as any).allComputingUnitsSubject.next([mockUnit(7)]);
+ service.selectComputingUnit(5, 7);
+
+ let latest: DashboardWorkflowComputingUnit | null = mockUnit(7);
+ service.getSelectedComputingUnit().subscribe(unit => (latest = unit));
+ expect(latest).not.toBeNull();
+
+ service.disconnect();
+ expect(latest).toBeNull();
+ });
+
+ it("emits a connection-reset signal when switching to a different computing
unit (issue #3120)", () => {
+ let connected = false;
+ vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {
+ connected = true;
+ });
+ vi.spyOn(websocketService, "closeWebsocket").mockImplementation(() => {
+ connected = false;
+ });
+ vi.spyOn(websocketService, "isConnected", "get").mockImplementation(() =>
connected);
+ (service as any).allComputingUnitsSubject.next([mockUnit(7), mockUnit(8)]);
+
+ let resetCount = 0;
+ service.getConnectionResetStream().subscribe(() => resetCount++);
+
+ // First connection on unit 7: nothing to tear down yet → no signal.
+ service.selectComputingUnit(5, 7);
+ expect(resetCount).toBe(0);
+
+ // Switch to a different unit while connected → tear-down signal fires
once.
+ service.selectComputingUnit(5, 8);
+ expect(resetCount).toBe(1);
+ });
+
+ it("emits a connection-reset signal when switching units even if the socket
already dropped (issue #3120)", () => {
+ vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {});
+ vi.spyOn(websocketService, "closeWebsocket").mockImplementation(() => {});
+ // socket reports disconnected throughout, e.g. the previous unit was
terminated
+ vi.spyOn(websocketService, "isConnected", "get").mockReturnValue(false);
+ (service as any).allComputingUnitsSubject.next([mockUnit(7), mockUnit(8)]);
+
+ let resetCount = 0;
+ service.getConnectionResetStream().subscribe(() => resetCount++);
+
+ // First connection on unit 7: nothing to tear down yet → no signal.
+ service.selectComputingUnit(5, 7);
+ expect(resetCount).toBe(0);
+
+ // Switch units while disconnected: unit 7's stale state must still be
cleared.
+ service.selectComputingUnit(5, 8);
+ expect(resetCount).toBe(1);
+ });
+});
diff --git
a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts
b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts
index 831263183b..e5b6c44388 100644
---
a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts
+++
b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts
@@ -47,6 +47,10 @@ export class ComputingUnitStatusService implements OnDestroy
{
private readonly refreshComputingUnitListSignal = new Subject<void>();
+ // Emits when the active connection is torn down to switch computing units,
so
+ // session consumers can clear their websocket-derived state.
+ private readonly connectionResetSubject = new Subject<void>();
+
// Refresh interval in milliseconds
private readonly REFRESH_INTERVAL_MS = 2000;
private refreshSubscription: Subscription | null = null;
@@ -158,9 +162,16 @@ export class ComputingUnitStatusService implements
OnDestroy {
// open websocket if needed
const shouldReconnect = this.currentConnectedCuid !== cuid ||
this.currentConnectedWid !== wid;
if (isDefined(wid) && shouldReconnect) {
+ // Tear down stale state on switch even if the socket already dropped
+ // (e.g. the prior unit was terminated), not just while still
connected.
+ const hadPreviousConnection = isDefined(this.currentConnectedWid) ||
isDefined(this.currentConnectedCuid);
if (this.workflowWebsocketService.isConnected) {
this.workflowWebsocketService.closeWebsocket();
+ }
+ if (hadPreviousConnection) {
this.workflowStatusService.clearStatus();
+ // switching units: signal consumers to clear their stale state
+ this.connectionResetSubject.next();
}
this.workflowWebsocketService.openWebsocket(wid,
this.userService.getCurrentUser()?.uid, cuid);
@@ -225,6 +236,28 @@ export class ComputingUnitStatusService implements
OnDestroy {
);
}
+ /**
+ * Emits when the connection is reset to switch computing units. Consumers
+ * subscribe to clear their websocket-derived session state.
+ */
+ public getConnectionResetStream(): Observable<void> {
+ return this.connectionResetSubject.asObservable();
+ }
+
+ /**
+ * Tear down all websocket connection state when leaving the workspace, so
+ * re-entering a workflow starts from a clean connection instead of reusing
+ * the previous one.
+ */
+ public disconnect(): void {
+ this.workflowWebsocketService.closeWebsocket();
+ this.workflowStatusService.clearStatus();
+ this.stopPollingSelectedUnit();
+ this.currentConnectedCuid = undefined;
+ this.currentConnectedWid = undefined;
+ this.selectedUnitSubject.next(null);
+ }
+
// Clean up on service destroy
ngOnDestroy(): void {
this.refreshSubscription?.unsubscribe();
@@ -232,6 +265,7 @@ export class ComputingUnitStatusService implements
OnDestroy {
this.selectedUnitSubject.complete();
this.allComputingUnitsSubject.complete();
+ this.connectionResetSubject.complete();
}
/**
diff --git
a/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts
b/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts
index 1017d759b5..edfbcbde61 100644
---
a/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts
+++
b/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts
@@ -21,6 +21,7 @@ import { ComponentFixture, TestBed } from
"@angular/core/testing";
import { ResultPanelComponent } from "./result-panel.component";
import { ExecuteWorkflowService } from
"../../service/execute-workflow/execute-workflow.service";
+import { WorkflowResultService } from
"../../service/workflow-result/workflow-result.service";
import { WorkflowActionService } from
"../../service/workflow-graph/model/workflow-action.service";
import { OperatorMetadataService } from
"../../service/operator-metadata/operator-metadata.service";
import { StubOperatorMetadataService } from
"../../service/operator-metadata/stub-operator-metadata.service";
@@ -38,6 +39,7 @@ describe("ResultPanelComponent", () => {
let fixture: ComponentFixture<ResultPanelComponent>;
let executeWorkflowService: ExecuteWorkflowService;
let workflowActionService: WorkflowActionService;
+ let workflowResultService: WorkflowResultService;
beforeEach(async () => {
await TestBed.configureTestingModule({
@@ -60,6 +62,7 @@ describe("ResultPanelComponent", () => {
component = fixture.componentInstance;
executeWorkflowService = TestBed.inject(ExecuteWorkflowService);
workflowActionService = TestBed.inject(WorkflowActionService);
+ workflowResultService = TestBed.inject(WorkflowResultService);
fixture.detectChanges();
});
@@ -82,4 +85,22 @@ describe("ResultPanelComponent", () => {
const resultPanelHtmlElement: HTMLElement = resultPanelDiv.nativeElement;
expect(resultPanelHtmlElement).toBeTruthy();
});
+
+ it("wipes the panel and operator selection when results are cleared, e.g. on
a computing-unit switch (#3120)", () => {
+ // Simulate a result frame on screen for a currently-highlighted operator.
+ // ResultPanelComponent stands in as a throwaway frame component; it's
cleared before it renders.
+ component.currentOperatorId = "op1";
+ component.operatorTitle = "Operator 1";
+ component.frameComponentConfigs.set("Result", { component:
ResultPanelComponent, componentInputs: {} });
+ expect(component.frameComponentConfigs.size).toBe(1);
+
+ // A unit switch drops the cached results and emits on the cleared stream.
The operator
+ // stays highlighted, so the normal rerender path won't tear the frame
down — only this
+ // handler does, which is the part that actually fixes the
lingering-stale-frame bug.
+ workflowResultService.clearResults();
+
+ expect(component.frameComponentConfigs.size).toBe(0);
+ expect(component.currentOperatorId).toBeUndefined();
+ expect(component.operatorTitle).toBe("");
+ });
});
diff --git
a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts
b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts
index 3260afb290..b5b0c045da 100644
---
a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts
+++
b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts
@@ -136,6 +136,7 @@ export class ResultPanelComponent implements OnInit,
OnDestroy {
this.updateReturnPosition(DEFAULT_HEIGHT, this.height);
this.registerAutoRerenderResultPanel();
this.registerAutoOpenResultPanel();
+ this.registerResultClearedHandler();
this.handleResultPanelForVersionPreview();
this.panelService.closePanelStream.pipe(untilDestroyed(this)).subscribe(()
=> this.closePanel());
this.panelService.resetPanelStream.pipe(untilDestroyed(this)).subscribe(()
=> {
@@ -218,6 +219,22 @@ export class ResultPanelComponent implements OnInit,
OnDestroy {
});
}
+ /**
+ * Wipe the panel when results are dropped (e.g. switching computing units):
a
+ * still-highlighted operator isn't re-rendered, so its stale frames would
linger.
+ */
+ registerResultClearedHandler() {
+ this.workflowResultService
+ .getResultClearedStream()
+ .pipe(untilDestroyed(this))
+ .subscribe(() => {
+ this.clearResultPanel();
+ this.currentOperatorId = undefined;
+ this.operatorTitle = "";
+ this.changeDetectorRef.detectChanges();
+ });
+ }
+
registerAutoRerenderResultPanel() {
merge(
this.executeWorkflowService
diff --git a/frontend/src/app/workspace/component/workspace.component.spec.ts
b/frontend/src/app/workspace/component/workspace.component.spec.ts
index 7659f346f3..f85294e42a 100644
--- a/frontend/src/app/workspace/component/workspace.component.spec.ts
+++ b/frontend/src/app/workspace/component/workspace.component.spec.ts
@@ -40,8 +40,11 @@ import { WorkflowCompilingService } from
"../service/compile-workflow/workflow-c
import { OperatorMetadataService } from
"../service/operator-metadata/operator-metadata.service";
import { UndoRedoService } from "../service/undo-redo/undo-redo.service";
import { WorkflowConsoleService } from
"../service/workflow-console/workflow-console.service";
+import { ExecuteWorkflowService } from
"../service/execute-workflow/execute-workflow.service";
+import { WorkflowResultService } from
"../service/workflow-result/workflow-result.service";
import { WorkflowActionService } from
"../service/workflow-graph/model/workflow-action.service";
import { OperatorReuseCacheStatusService } from
"../service/workflow-status/operator-reuse-cache-status.service";
+import { ComputingUnitStatusService } from
"../../common/service/computing-unit/computing-unit-status/computing-unit-status.service";
import { EntityType, HubService } from "../../hub/service/hub.service";
import { commonTestProviders } from "../../common/testing/test-utils";
import { WorkspaceComponent } from "./workspace.component";
@@ -62,6 +65,11 @@ describe("WorkspaceComponent", () => {
let messageService: any;
let routerMock: any;
let locationMock: any;
+ let computingUnitStatusService: any;
+ let executeWorkflowService: any;
+ let workflowConsoleService: any;
+ let workflowResultService: any;
+ let connectionResetSubject: Subject<void>;
let metadataChangedSubject: Subject<void>;
let stubGraph: { triggerCenterEvent: ReturnType<typeof vi.fn>;
hasElementWithID: ReturnType<typeof vi.fn> };
@@ -136,6 +144,14 @@ describe("WorkspaceComponent", () => {
routerMock = { navigate: vi.fn() };
locationMock = { go: vi.fn() };
+ connectionResetSubject = new Subject<void>();
+ computingUnitStatusService = {
+ disconnect: vi.fn(),
+ getConnectionResetStream: () => connectionResetSubject.asObservable(),
+ };
+ executeWorkflowService = { resetExecutionAndWorkers: vi.fn() };
+ workflowConsoleService = { clearConsoleMessages: vi.fn() };
+ workflowResultService = { clearResults: vi.fn() };
// Drop the standalone component's child imports and allow unknown
elements via
// CUSTOM_ELEMENTS_SCHEMA. The template still renders, so `<ng-template
#codeEditor>`
@@ -167,8 +183,11 @@ describe("WorkspaceComponent", () => {
// The three services listed in the constructor only to force their
// initialization aren't exercised by any test here; provide stubs.
{ provide: WorkflowCompilingService, useValue: {} },
- { provide: WorkflowConsoleService, useValue: {} },
+ { provide: WorkflowConsoleService, useValue: workflowConsoleService },
{ provide: OperatorReuseCacheStatusService, useValue: {} },
+ { provide: ComputingUnitStatusService, useValue:
computingUnitStatusService },
+ { provide: ExecuteWorkflowService, useValue: executeWorkflowService },
+ { provide: WorkflowResultService, useValue: workflowResultService },
...commonTestProviders,
],
schemas: [NO_ERRORS_SCHEMA],
@@ -415,6 +434,26 @@ describe("WorkspaceComponent", () => {
// Cleanup of the workflow state still happens regardless.
expect(workflowActionService.clearWorkflow).toHaveBeenCalled();
});
+
+ it("tears down every piece of websocket-derived state when leaving the
workspace (issue #3120)", async () => {
+ await createFixture();
+ fixture.detectChanges();
+ component.ngOnDestroy();
+ expect(computingUnitStatusService.disconnect).toHaveBeenCalled();
+
expect(executeWorkflowService.resetExecutionAndWorkers).toHaveBeenCalled();
+ expect(workflowConsoleService.clearConsoleMessages).toHaveBeenCalled();
+ expect(workflowResultService.clearResults).toHaveBeenCalled();
+ });
+
+ it("clears the workflow session state when the computing unit is switched
in-canvas (issue #3120)", async () => {
+ await createFixture();
+ fixture.detectChanges();
+ // Switching to a different unit emits on the connection-reset stream.
+ connectionResetSubject.next();
+
expect(executeWorkflowService.resetExecutionAndWorkers).toHaveBeenCalled();
+ expect(workflowConsoleService.clearConsoleMessages).toHaveBeenCalled();
+ expect(workflowResultService.clearResults).toHaveBeenCalled();
+ });
});
describe("copilotEnabled", () => {
diff --git a/frontend/src/app/workspace/component/workspace.component.ts
b/frontend/src/app/workspace/component/workspace.component.ts
index da220feaab..e7bfcb7bf4 100644
--- a/frontend/src/app/workspace/component/workspace.component.ts
+++ b/frontend/src/app/workspace/component/workspace.component.ts
@@ -51,6 +51,9 @@ import { THROTTLE_TIME_MS } from
"../../hub/component/workflow/detail/hub-workfl
import { WorkflowCompilingService } from
"../service/compile-workflow/workflow-compiling.service";
import { USER_WORKSPACE } from "../../app-routing.constant";
import { GuiConfigService } from "../../common/service/gui-config.service";
+import { ComputingUnitStatusService } from
"../../common/service/computing-unit/computing-unit-status/computing-unit-status.service";
+import { ExecuteWorkflowService } from
"../service/execute-workflow/execute-workflow.service";
+import { WorkflowResultService } from
"../service/workflow-result/workflow-result.service";
import { checkIfWorkflowBroken } from "../../common/util/workflow-check";
import { NzSpinComponent } from "ng-zorro-antd/spin";
import { ResultPanelComponent } from "./result-panel/result-panel.component";
@@ -126,7 +129,10 @@ export class WorkspaceComponent implements AfterViewInit,
OnInit, OnDestroy {
private hubService: HubService,
private codeEditorService: CodeEditorService,
private config: GuiConfigService,
- private changeDetectorRef: ChangeDetectorRef
+ private changeDetectorRef: ChangeDetectorRef,
+ private computingUnitStatusService: ComputingUnitStatusService,
+ private executeWorkflowService: ExecuteWorkflowService,
+ private workflowResultService: WorkflowResultService
) {}
ngOnInit() {
@@ -144,6 +150,12 @@ export class WorkspaceComponent implements AfterViewInit,
OnInit, OnDestroy {
*/
this.pid = parseInt(this.route.snapshot.queryParams.pid) || undefined;
this.workflowActionService.setHighlightingEnabled(true);
+ // Clear session state when the user switches computing units in-canvas, so
+ // the previous unit's status/console/results don't linger.
+ this.computingUnitStatusService
+ .getConnectionResetStream()
+ .pipe(untilDestroyed(this))
+ .subscribe(() => this.resetWorkflowSessionState());
}
ngAfterViewInit(): void {
@@ -184,6 +196,20 @@ export class WorkspaceComponent implements AfterViewInit,
OnInit, OnDestroy {
this.codeEditorViewRef.clear();
this.workflowActionService.clearWorkflow();
+ // Tear down the connection and all websocket-derived session state so a
+ // re-entered workflow starts clean instead of reusing the previous one.
+ this.computingUnitStatusService.disconnect();
+ this.resetWorkflowSessionState();
+ }
+
+ /**
+ * Clear websocket-derived session state (execution status, console,
results).
+ * Shared by workspace teardown and in-canvas unit switches.
+ */
+ private resetWorkflowSessionState(): void {
+ this.executeWorkflowService.resetExecutionAndWorkers();
+ this.workflowConsoleService.clearConsoleMessages();
+ this.workflowResultService.clearResults();
}
registerAutoPersistWorkflow(): void {
diff --git
a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts
b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts
index b8d2779c0c..e1ff418abb 100644
---
a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts
+++
b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts
@@ -86,6 +86,21 @@ describe("ExecuteWorkflowService", () => {
expect(injectedService).toBeTruthy();
}));
+ it("resetExecutionAndWorkers() clears the execution state and worker
assignments", () => {
+ (service as any).currentState = { state: ExecutionState.Running };
+ (service as any).assignedWorkerIds.set("op1", ["w1", "w2"]);
+
+ const emittedStates: ExecutionState[] = [];
+ service.getExecutionStateStream().subscribe(event =>
emittedStates.push(event.current.state));
+
+ service.resetExecutionAndWorkers();
+
+
expect(service.getExecutionState().state).toBe(ExecutionState.Uninitialized);
+ expect(service.getWorkerIds("op1")).toEqual([]);
+ // must broadcast on the stream so subscribers (menu, result panel) drop
stale status
+ expect(emittedStates).toContain(ExecutionState.Uninitialized);
+ });
+
it("should generate a logical plan request based on the workflow graph that
is passed to the function", () => {
const newLogicalPlan: LogicalPlan =
ExecuteWorkflowService.getLogicalPlanRequest(mockWorkflowPlan_scan_result);
expect(newLogicalPlan).toEqual(mockLogicalPlan_scan_result);
diff --git
a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
index eb86194e7c..c2ab3eac0d 100644
---
a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
+++
b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
@@ -354,6 +354,16 @@ export class ExecuteWorkflowService {
};
}
+ /**
+ * Reset execution status and worker assignments. Unlike
resetExecutionState(),
+ * this also clears worker assignments and broadcasts the reset on
+ * executionStateStream so subscribers drop the previous unit's status.
+ */
+ public resetExecutionAndWorkers(): void {
+ this.updateExecutionState({ state: ExecutionState.Uninitialized });
+ this.assignedWorkerIds.clear();
+ }
+
private updateExecutionState(stateInfo: ExecutionStateInfo): void {
if (isEqual(this.currentState, stateInfo)) {
return;
diff --git
a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts
b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts
index 9be036cdcc..a211eea52b 100644
---
a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts
+++
b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts
@@ -34,4 +34,17 @@ describe("WorkflowConsoleService", () => {
it("should be created", () => {
expect(service).toBeTruthy();
});
+
+ it("clearConsoleMessages() removes all messages and notifies subscribers",
() => {
+ (service as any).consoleMessages.set("op1", []);
+ expect(service.hasConsoleMessages("op1")).toBe(true);
+
+ let notified = false;
+ service.getConsoleMessageUpdateStream().subscribe(() => (notified = true));
+
+ service.clearConsoleMessages();
+
+ expect(service.hasConsoleMessages("op1")).toBe(false);
+ expect(notified).toBe(true);
+ });
});
diff --git
a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts
b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts
index b7d60868ae..5f88a22ba9 100644
---
a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts
+++
b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts
@@ -73,4 +73,13 @@ export class WorkflowConsoleService {
getConsoleMessageUpdateStream(): Observable<void> {
return this.consoleMessagesUpdateStream.asObservable();
}
+
+ /**
+ * Clear all console messages so a re-entered workflow doesn't show the
+ * previous session's output.
+ */
+ public clearConsoleMessages(): void {
+ this.consoleMessages.clear();
+ this.consoleMessagesUpdateStream.next();
+ }
}
diff --git
a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts
b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts
index 8406a55c75..031de9bfa0 100644
---
a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts
+++
b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts
@@ -37,6 +37,33 @@ describe("WorkflowResultService", () => {
it("should be created", () => {
expect(service).toBeTruthy();
});
+
+ it("clearResults() drops cached operator results", () => {
+ (service as any).operatorResultServices.set("op1", {});
+ (service as any).paginatedResultServices.set("op2", {});
+ expect(service.hasAnyResult("op1")).toBe(true);
+ expect(service.hasAnyResult("op2")).toBe(true);
+
+ service.clearResults();
+
+ expect(service.hasAnyResult("op1")).toBe(false);
+ expect(service.hasAnyResult("op2")).toBe(false);
+ });
+
+ it("clearResults() resets table stats to empty for subscribers", () => {
+ const pairs: [unknown, unknown][] = [];
+ service.getResultTableStats().subscribe(p => pairs.push(p));
+ (service as any).resultTableStats.next({ op1: {} });
+ service.clearResults();
+ expect(pairs[pairs.length - 1][1]).toEqual({});
+ });
+
+ it("clearResults() emits on the cleared stream so the UI tears down stale
frames", () => {
+ let clearedCount = 0;
+ service.getResultClearedStream().subscribe(() => clearedCount++);
+ service.clearResults();
+ expect(clearedCount).toBe(1);
+ });
});
describe("OperatorPaginationResultService", () => {
diff --git
a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts
b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts
index 9fd18e0f16..ffd9b43b91 100644
---
a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts
+++
b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts
@@ -49,6 +49,8 @@ export class WorkflowResultService {
private resultUpdateStream = new Subject<Record<string, WebResultUpdate |
undefined>>();
private resultTableStats = new ReplaySubject<Record<string, Record<string,
Record<string, number>>>>(1);
private resultInitiateStream = new Subject<string>();
+ // emits when clearResults() drops cached results, so the UI can drop stale
frames
+ private resultClearedStream = new Subject<void>();
constructor(private wsService: WorkflowWebsocketService) {
this.wsService.subscribeToEvent("WebResultUpdateEvent").subscribe(event =>
{
@@ -87,6 +89,14 @@ export class WorkflowResultService {
return this.resultInitiateStream.asObservable();
}
+ /**
+ * Emits when clearResults() drops cached results, so consumers can tear down
+ * stale frames (clearing the caches alone won't re-render a displayed
operator).
+ */
+ public getResultClearedStream(): Observable<void> {
+ return this.resultClearedStream.asObservable();
+ }
+
public getPaginatedResultService(operatorID: string):
OperatorPaginationResultService | undefined {
return this.paginatedResultServices.get(operatorID);
}
@@ -95,6 +105,18 @@ export class WorkflowResultService {
return this.operatorResultServices.get(operatorID);
}
+ /**
+ * Drop cached results and reset table stats so a re-entered workflow
doesn't show
+ * stale results (resultTableStats is a ReplaySubject, so push an empty
snapshot).
+ * Emits resultClearedStream so subscribers tear down already-displayed
frames.
+ */
+ public clearResults(): void {
+ this.operatorResultServices.clear();
+ this.paginatedResultServices.clear();
+ this.resultTableStats.next({});
+ this.resultClearedStream.next();
+ }
+
private handleCleanResultCache(event: WorkflowAvailableResultEvent): void {
const removedOrInvalidatedOperators = new Set<string>();
// remove operators that no longer have results
diff --git
a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts
b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts
index f2862d18d7..4fb0fe1d10 100644
---
a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts
+++
b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts
@@ -94,4 +94,12 @@ describe("WorkflowWebsocketService", () => {
window.WebSocket = originalWebSocket;
}
});
+
+ it("should reset the cached worker count when the websocket is closed", ()
=> {
+ // numWorkers is populated from ClusterStatusUpdateEvent on the live
connection;
+ // once the socket is closed the count is stale and must reset.
+ service.numWorkers = 5;
+ service.closeWebsocket();
+ expect(service.numWorkers).toBe(-1);
+ });
});
diff --git
a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
index 017e43ee6a..4e52d3fa09 100644
---
a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
+++
b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
@@ -93,6 +93,8 @@ export class WorkflowWebsocketService {
this.wsWithReconnectSubscription?.unsubscribe();
this.statusUpdateSubscription?.unsubscribe();
this.websocket?.complete();
+ // the worker count comes from the live connection; reset it once the
socket is gone
+ this.numWorkers = -1;
this.updateConnectionStatus(false);
}