This is an automated email from the ASF dual-hosted git repository.

xiaozhenliu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 5cbac1546d41a777c9dea174dba5d0b4879e92a8
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Mon Apr 13 14:41:46 2026 -0700

    fix(computing-unit-managing-service): clear cache metadata on CU 
termination and CU switch
---
 .../resource/ComputingUnitManagingResource.scala   | 38 ++++++++++++++++++++++
 docs/operator-port-cache.md                        |  1 +
 .../computing-unit-status.service.ts               |  6 ++++
 .../workflow-cache-entries.service.ts              |  9 +++++
 4 files changed, 54 insertions(+)

diff --git 
a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
 
b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
index 9b214b9755..9d7d24f5a3 100644
--- 
a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
+++ 
b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
@@ -52,7 +52,9 @@ import org.apache.texera.service.util.{
   InsufficientComputingUnitQuota,
   KubernetesClient
 }
+import org.apache.texera.dao.jooq.generated.Tables.{OPERATOR_PORT_CACHE, 
OPERATOR_PORT_EXECUTIONS, WORKFLOW_EXECUTIONS}
 import org.jooq.{DSLContext, EnumType}
+import org.jooq.impl.SQLDataType
 import play.api.libs.json._
 
 import java.sql.Timestamp
@@ -153,6 +155,37 @@ object ComputingUnitManagingResource {
 @Path("/computing-unit")
 class ComputingUnitManagingResource {
 
+  /**
+    * Purges the global metadata rows tied to executions of one computing unit.
+    *
+    * Rows go from operator_port_cache first, then operator_port_executions; both
+    * are selected via WORKFLOW_EXECUTIONS rows whose CUID equals `cuid`. SQL only:
+    * Iceberg result documents are CU-local and already destroyed with the pod.
+    *
+    * @param ctx  jOOQ context used to run both DELETE statements
+    * @param cuid id of the computing unit whose executions are targeted
+    */
+  private def deleteCacheMetadataForComputingUnit(ctx: DSLContext, cuid: Int): Unit = {
+    // Shared filter: executions that ran on this computing unit.
+    val ranOnUnit = WORKFLOW_EXECUTIONS.CUID.eq(cuid)
+
+    // SOURCE_EXECUTION_ID is matched against EID cast to BIGINT.
+    val sourceExecutionIds = ctx
+      .select(WORKFLOW_EXECUTIONS.EID.cast(SQLDataType.BIGINT))
+      .from(WORKFLOW_EXECUTIONS)
+      .where(ranOnUnit)
+    val cacheRows = OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID.in(sourceExecutionIds)
+    ctx.deleteFrom(OPERATOR_PORT_CACHE).where(cacheRows).execute()
+
+    // WORKFLOW_EXECUTION_ID is matched against EID as-is (no cast).
+    val executionIds = ctx
+      .select(WORKFLOW_EXECUTIONS.EID)
+      .from(WORKFLOW_EXECUTIONS)
+      .where(ranOnUnit)
+    val portExecutionRows = OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.in(executionIds)
+    ctx.deleteFrom(OPERATOR_PORT_EXECUTIONS).where(portExecutionRows).execute()
+  }
+
   private def getComputingUnitByCuid(ctx: DSLContext, cuid: Int): 
WorkflowComputingUnit = {
     val wcDao = new WorkflowComputingUnitDao(ctx.configuration())
     val unit = wcDao.fetchOneByCuid(cuid)
@@ -529,8 +562,10 @@ class ComputingUnitManagingResource {
       allUnits.foreach { unit =>
         if (
           unit.getType == WorkflowComputingUnitTypeEnum.kubernetes &&
+          unit.getTerminateTime == null &&
           !KubernetesClient.podExists(unit.getCuid)
         ) {
+          deleteCacheMetadataForComputingUnit(ctx, unit.getCuid)
           unit.setTerminateTime(new Timestamp(System.currentTimeMillis()))
           computingUnitDao.update(unit)
         }
@@ -642,6 +677,9 @@ class ComputingUnitManagingResource {
       val cuDao = new WorkflowComputingUnitDao(ctx.configuration())
       val unit = getComputingUnitByCuid(ctx, cuid)
 
+      // Clean up cache metadata for executions that ran on this CU
+      deleteCacheMetadataForComputingUnit(ctx, cuid)
+
       // if the computing unit is kubernetes pod, then kill the pod
       if (unit.getType == WorkflowComputingUnitTypeEnum.kubernetes) {
         KubernetesClient.deletePod(cuid)
diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index 56eaf5a3f7..ee3f4053b4 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -403,6 +403,7 @@ ExecutionCacheService ────→ upsertCachedOutput()     
OperatorPortCache
   - Execution cleanup integration:
     - lifecycle timeout (cuid-scoped) clears cache artifacts via 
`invalidateCacheBySourceExecutions`
     - bulk execution deletion removes cache artifacts via 
`invalidateCacheBySourceExecutions`
+    - CU termination clears `operator_port_cache` and 
`operator_port_executions` metadata rows (SQL-only in 
`ComputingUnitManagingResource`); Iceberg result documents are CU-local and 
already destroyed with the pod, so no additional document cleanup is needed
 - **Fingerprinting**: `FingerprintUtil` implemented with workflow-based specs 
for deterministic subDAG hashing
 - **Submission-time lookup**: `WorkflowExecutionService` uses 
`OperatorPortCacheService.lookupCachedOutputs()` to compute fingerprints for 
all physical output ports, queries cache, stores hits in 
`WorkflowSettings.cachedOutputs`
 - **Cache persistence**: `PortCompletedHandler` emits `PortMaterialized` event 
→ `ExecutionCacheService` → `OperatorPortCacheService.upsertCachedOutput()` → 
`OperatorPortCacheDao.upsert()` (includes fingerprint, URI, tuple count)
diff --git 
a/frontend/src/app/workspace/service/computing-unit-status/computing-unit-status.service.ts
 
b/frontend/src/app/workspace/service/computing-unit-status/computing-unit-status.service.ts
index 06bd0d4071..06d7de9d28 100644
--- 
a/frontend/src/app/workspace/service/computing-unit-status/computing-unit-status.service.ts
+++ 
b/frontend/src/app/workspace/service/computing-unit-status/computing-unit-status.service.ts
@@ -27,6 +27,7 @@ import { UntilDestroy, untilDestroyed } from 
"@ngneat/until-destroy";
 import { ComputingUnitState } from 
"../../types/computing-unit-connection.interface";
 import { isDefined } from "../../../common/util/predicate";
 import { WorkflowStatusService } from 
"../workflow-status/workflow-status.service";
+import { WorkflowCacheEntriesService } from 
"../workflow-status/workflow-cache-entries.service";
 import { UserService } from "../../../common/service/user/user.service";
 
 /**
@@ -58,6 +59,7 @@ export class ComputingUnitStatusService implements OnDestroy {
     private computingUnitService: WorkflowComputingUnitManagingService,
     private workflowWebsocketService: WorkflowWebsocketService,
     private workflowStatusService: WorkflowStatusService,
+    private workflowCacheEntriesService: WorkflowCacheEntriesService,
     private userService: UserService
   ) {
     // Initialize the service by loading computing units
@@ -161,6 +163,8 @@ export class ComputingUnitStatusService implements 
OnDestroy {
         if (this.workflowWebsocketService.isConnected) {
           this.workflowWebsocketService.closeWebsocket();
           this.workflowStatusService.clearStatus();
+          // refresh cache panel — cache entries may differ between CUs
+          
this.workflowCacheEntriesService.refreshCurrentWorkflowCacheEntries();
         }
 
         this.workflowWebsocketService.openWebsocket(wid, 
this.userService.getCurrentUser()?.uid, cuid);
@@ -263,6 +267,8 @@ export class ComputingUnitStatusService implements 
OnDestroy {
         // trigger a single refresh; the refresh pipeline will
         // pull the new list and call updateComputingUnits()
         this.refreshComputingUnitList();
+        // refresh cache panel — backend has cleared cache metadata for this CU
+        this.workflowCacheEntriesService.refreshCurrentWorkflowCacheEntries();
       }),
       map(() => true),
       catchError((err: unknown) => {
diff --git 
a/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
 
b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
index 852f9fddfb..d3669b3e27 100644
--- 
a/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
+++ 
b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
@@ -151,6 +151,15 @@ export class WorkflowCacheEntriesService {
     
this.setAutoInvalidationEnabled(!this.autoInvalidationEnabledSubject.value);
   }
 
+  /**
+   * Re-fetches cache entries for the tracked workflow; no-op without a positive workflow id.
+   */
+  public refreshCurrentWorkflowCacheEntries(): void {
+    const wid = this.currentWorkflowId;
+    if (!wid || !(wid > 0)) return;
+    this.refreshCacheEntries(wid).subscribe();
+  }
+
   /**
    * Refreshes cache entries for a workflow and updates shared state.
    *

Reply via email to