This is an automated email from the ASF dual-hosted git repository.

mengw15 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new b3db209d41 fix(frontend): unsubscribe connection-status subscription 
in WorkflowWebsocketService.closeWebsocket (#4377)
b3db209d41 is described below

commit b3db209d41d238c43bd93baf06ce1e857b426c06
Author: Asish Kumar <[email protected]>
AuthorDate: Fri Apr 17 09:43:00 2026 +0530

    fix(frontend): unsubscribe connection-status subscription in 
WorkflowWebsocketService.closeWebsocket (#4377)
    
    ### What changes were proposed in this PR?
    
    `WorkflowWebsocketService.openWebsocket` subscribed to the internal
    `webSocketResponseSubject` observable to track cluster status and mark
    the connection as live, but did not store the returned `Subscription`.
    As a result, `closeWebsocket` could not unsubscribe it, causing the
    handler to accumulate on every call to `openWebsocket` (e.g. when the
    user switches workflows or computing units).
    
    **Root cause (before):**
    ```typescript
    // openWebsocket() — subscription discarded, never cleaned up
    this.websocketEvent().subscribe(evt => { ... });
    
    // closeWebsocket() — only wsWithReconnectSubscription was torn down
    this.wsWithReconnectSubscription?.unsubscribe();
    ```
    
    **Fix (after):**
    ```typescript
    // openWebsocket()
    this.statusUpdateSubscription = this.websocketEvent().subscribe(evt => { ... });
    
    // closeWebsocket()
    this.wsWithReconnectSubscription?.unsubscribe();
    this.statusUpdateSubscription?.unsubscribe();   // ← new
    ```
    
    The new `statusUpdateSubscription` field is declared as
    `Subscription | undefined`; the `?.unsubscribe()` call is safely a no-op
    when `closeWebsocket` is called before `openWebsocket` was ever invoked.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4376
    
    ### How was this PR tested?
    
    One new unit test was added to `workflow-websocket.service.spec.ts`:
    
    "should close the previous status subscription when openWebsocket is
    called again" uses a lightweight WebSocket test double, calls
    `openWebsocket()` twice and `closeWebsocket()` once, and verifies that
    the previous `statusUpdateSubscription` is torn down on each reopen and
    on close.
    
    The existing service-creation test continues to pass.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    No.
    
    ---------
    
    Signed-off-by: Asish Kumar <[email protected]>
    Co-authored-by: Meng Wang <[email protected]>
---
 .../workflow-websocket.service.spec.ts             | 60 ++++++++++++++++++++++
 .../workflow-websocket.service.ts                  |  4 +-
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git 
a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts
 
b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts
index ab9b9d5f3c..6fc8694457 100644
--- 
a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts
+++ 
b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts
@@ -21,6 +21,44 @@ import { TestBed } from "@angular/core/testing";
 import { WorkflowWebsocketService } from "./workflow-websocket.service";
 import { commonTestProviders } from "../../../common/testing/test-utils";
 
+/** Browser-like WebSocket test double used to verify websocket reopen and 
subscription cleanup behavior. */
+class FakeWebSocket extends EventTarget {
+  public static readonly CONNECTING = 0;
+  public static readonly OPEN = 1;
+  public static readonly CLOSING = 2;
+  public static readonly CLOSED = 3;
+
+  public readyState = FakeWebSocket.CONNECTING;
+
+  constructor(public readonly url: string) {
+    super();
+    Promise.resolve().then(() => {
+      this.readyState = FakeWebSocket.OPEN;
+      const onopen = this.onopen;
+      onopen?.(new Event("open"));
+      this.dispatchEvent(new Event("open"));
+    });
+  }
+
+  public onopen: ((ev: Event) => unknown) | null = null;
+  public onclose: ((ev: CloseEvent) => unknown) | null = null;
+  public onerror: ((ev: Event) => unknown) | null = null;
+  public onmessage: ((ev: MessageEvent) => unknown) | null = null;
+
+  public send() {}
+
+  public close() {
+    if (this.readyState === FakeWebSocket.CLOSED) {
+      return;
+    }
+    this.readyState = FakeWebSocket.CLOSED;
+    const closeEvent = new CloseEvent("close", { wasClean: true, code: 1000, 
reason: "" });
+    const onclose = this.onclose;
+    onclose?.(closeEvent);
+    this.dispatchEvent(closeEvent);
+  }
+}
+
 describe("WorkflowWebsocketService", () => {
   let service: WorkflowWebsocketService;
 
@@ -34,4 +72,26 @@ describe("WorkflowWebsocketService", () => {
   it("should be created", () => {
     expect(service).toBeTruthy();
   });
+
+  it("should close the previous status subscription when openWebsocket is 
called again", () => {
+    const originalWebSocket = window.WebSocket;
+    window.WebSocket = FakeWebSocket as unknown as typeof WebSocket;
+
+    try {
+      service.openWebsocket(1, 1, 1);
+      const firstStatusSubscription = (service as 
any).statusUpdateSubscription;
+      expect(firstStatusSubscription.closed).toBeFalse();
+
+      service.openWebsocket(1, 1, 1);
+      expect(firstStatusSubscription.closed).toBeTrue();
+
+      const secondStatusSubscription = (service as 
any).statusUpdateSubscription;
+      expect(secondStatusSubscription.closed).toBeFalse();
+
+      service.closeWebsocket();
+      expect(secondStatusSubscription.closed).toBeTrue();
+    } finally {
+      window.WebSocket = originalWebSocket;
+    }
+  });
 });
diff --git 
a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
 
b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
index 90029227dd..017e43ee6a 100644
--- 
a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
+++ 
b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
@@ -47,6 +47,7 @@ export class WorkflowWebsocketService {
 
   private websocket?: WebSocketSubject<TexeraWebsocketEvent | 
TexeraWebsocketRequest>;
   private wsWithReconnectSubscription?: Subscription;
+  private statusUpdateSubscription?: Subscription;
   private readonly webSocketResponseSubject: Subject<TexeraWebsocketEvent> = 
new Subject();
   private readonly connectionStatusSubject = new 
BehaviorSubject<boolean>(false);
 
@@ -90,6 +91,7 @@ export class WorkflowWebsocketService {
 
   public closeWebsocket() {
     this.wsWithReconnectSubscription?.unsubscribe();
+    this.statusUpdateSubscription?.unsubscribe();
     this.websocket?.complete();
     this.updateConnectionStatus(false);
   }
@@ -132,7 +134,7 @@ export class WorkflowWebsocketService {
     );
 
     // refresh connection status
-    this.websocketEvent().subscribe(evt => {
+    this.statusUpdateSubscription = this.websocketEvent().subscribe(evt => {
       if (evt.type === "ClusterStatusUpdateEvent") {
         this.numWorkers = evt.numWorkers;
       }

Reply via email to