This is an automated email from the ASF dual-hosted git repository.
mengw15 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new b3db209d41 fix(frontend): unsubscribe connection-status subscription
in WorkflowWebsocketService.closeWebsocket (#4377)
b3db209d41 is described below
commit b3db209d41d238c43bd93baf06ce1e857b426c06
Author: Asish Kumar <[email protected]>
AuthorDate: Fri Apr 17 09:43:00 2026 +0530
fix(frontend): unsubscribe connection-status subscription in
WorkflowWebsocketService.closeWebsocket (#4377)
### What changes were proposed in this PR?
`WorkflowWebsocketService.openWebsocket` subscribed to the internal
`webSocketResponseSubject` observable to track cluster status and mark the
connection as live, but did not store the returned `Subscription`. As a
result, `closeWebsocket` could not unsubscribe it, so a stale handler
accumulated on every call to `openWebsocket` (e.g. when the user switches
workflows or computing units).
**Root cause (before):**
```typescript
// openWebsocket() — subscription discarded, never cleaned up
this.websocketEvent().subscribe(evt => { ... });
// closeWebsocket() — only wsWithReconnectSubscription was torn down
this.wsWithReconnectSubscription?.unsubscribe();
```
**Fix (after):**
```typescript
// openWebsocket()
this.statusUpdateSubscription = this.websocketEvent().subscribe(evt => {
... });
// closeWebsocket()
this.wsWithReconnectSubscription?.unsubscribe();
this.statusUpdateSubscription?.unsubscribe(); // ← new
```
The new `statusUpdateSubscription` field is declared as
`Subscription | undefined`, so the `?.unsubscribe()` call is a safe no-op
when `closeWebsocket` is called before `openWebsocket` was ever invoked.
### Any related issues, documentation, discussions?
Closes #4376
### How was this PR tested?
One new unit test was added to `workflow-websocket.service.spec.ts`:
"should close the previous status subscription when openWebsocket is
called again". It uses a lightweight WebSocket test double, calls
`openWebsocket()` twice and `closeWebsocket()` once, and verifies that the
previous `statusUpdateSubscription` is torn down on each reopen and on
close.
The existing service-creation test continues to pass.
### Was this PR authored or co-authored using generative AI tooling?
No.
---------
Signed-off-by: Asish Kumar <[email protected]>
Co-authored-by: Meng Wang <[email protected]>
---
.../workflow-websocket.service.spec.ts | 60 ++++++++++++++++++++++
.../workflow-websocket.service.ts | 4 +-
2 files changed, 63 insertions(+), 1 deletion(-)
diff --git
a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts
b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts
index ab9b9d5f3c..6fc8694457 100644
---
a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts
+++
b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts
@@ -21,6 +21,44 @@ import { TestBed } from "@angular/core/testing";
import { WorkflowWebsocketService } from "./workflow-websocket.service";
import { commonTestProviders } from "../../../common/testing/test-utils";
+/** Browser-like WebSocket test double used to verify websocket reopen and
subscription cleanup behavior. */
+class FakeWebSocket extends EventTarget {
+ public static readonly CONNECTING = 0;
+ public static readonly OPEN = 1;
+ public static readonly CLOSING = 2;
+ public static readonly CLOSED = 3;
+
+ public readyState = FakeWebSocket.CONNECTING;
+
+ constructor(public readonly url: string) {
+ super();
+ Promise.resolve().then(() => {
+ this.readyState = FakeWebSocket.OPEN;
+ const onopen = this.onopen;
+ onopen?.(new Event("open"));
+ this.dispatchEvent(new Event("open"));
+ });
+ }
+
+ public onopen: ((ev: Event) => unknown) | null = null;
+ public onclose: ((ev: CloseEvent) => unknown) | null = null;
+ public onerror: ((ev: Event) => unknown) | null = null;
+ public onmessage: ((ev: MessageEvent) => unknown) | null = null;
+
+ public send() {}
+
+ public close() {
+ if (this.readyState === FakeWebSocket.CLOSED) {
+ return;
+ }
+ this.readyState = FakeWebSocket.CLOSED;
+ const closeEvent = new CloseEvent("close", { wasClean: true, code: 1000,
reason: "" });
+ const onclose = this.onclose;
+ onclose?.(closeEvent);
+ this.dispatchEvent(closeEvent);
+ }
+}
+
describe("WorkflowWebsocketService", () => {
let service: WorkflowWebsocketService;
@@ -34,4 +72,26 @@ describe("WorkflowWebsocketService", () => {
it("should be created", () => {
expect(service).toBeTruthy();
});
+
+ it("should close the previous status subscription when openWebsocket is
called again", () => {
+ const originalWebSocket = window.WebSocket;
+ window.WebSocket = FakeWebSocket as unknown as typeof WebSocket;
+
+ try {
+ service.openWebsocket(1, 1, 1);
+ const firstStatusSubscription = (service as
any).statusUpdateSubscription;
+ expect(firstStatusSubscription.closed).toBeFalse();
+
+ service.openWebsocket(1, 1, 1);
+ expect(firstStatusSubscription.closed).toBeTrue();
+
+ const secondStatusSubscription = (service as
any).statusUpdateSubscription;
+ expect(secondStatusSubscription.closed).toBeFalse();
+
+ service.closeWebsocket();
+ expect(secondStatusSubscription.closed).toBeTrue();
+ } finally {
+ window.WebSocket = originalWebSocket;
+ }
+ });
});
diff --git
a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
index 90029227dd..017e43ee6a 100644
---
a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
+++
b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
@@ -47,6 +47,7 @@ export class WorkflowWebsocketService {
private websocket?: WebSocketSubject<TexeraWebsocketEvent |
TexeraWebsocketRequest>;
private wsWithReconnectSubscription?: Subscription;
+ private statusUpdateSubscription?: Subscription;
private readonly webSocketResponseSubject: Subject<TexeraWebsocketEvent> =
new Subject();
private readonly connectionStatusSubject = new
BehaviorSubject<boolean>(false);
@@ -90,6 +91,7 @@ export class WorkflowWebsocketService {
public closeWebsocket() {
this.wsWithReconnectSubscription?.unsubscribe();
+ this.statusUpdateSubscription?.unsubscribe();
this.websocket?.complete();
this.updateConnectionStatus(false);
}
@@ -132,7 +134,7 @@ export class WorkflowWebsocketService {
);
// refresh connection status
- this.websocketEvent().subscribe(evt => {
+ this.statusUpdateSubscription = this.websocketEvent().subscribe(evt => {
if (evt.type === "ClusterStatusUpdateEvent") {
this.numWorkers = evt.numWorkers;
}