This is an automated email from the ASF dual-hosted git repository. xiaozhenliu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 5cbac1546d41a777c9dea174dba5d0b4879e92a8 Author: Xiaozhen Liu <[email protected]> AuthorDate: Mon Apr 13 14:41:46 2026 -0700 fix(computing-unit-managing-service): clear cache metadata on CU termination and CU switch --- .../resource/ComputingUnitManagingResource.scala | 38 ++++++++++++++++++++++ docs/operator-port-cache.md | 1 + .../computing-unit-status.service.ts | 6 ++++ .../workflow-cache-entries.service.ts | 9 +++++ 4 files changed, 54 insertions(+) diff --git a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala index 9b214b9755..9d7d24f5a3 100644 --- a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala +++ b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala @@ -52,7 +52,9 @@ import org.apache.texera.service.util.{ InsufficientComputingUnitQuota, KubernetesClient } +import org.apache.texera.dao.jooq.generated.Tables.{OPERATOR_PORT_CACHE, OPERATOR_PORT_EXECUTIONS, WORKFLOW_EXECUTIONS} import org.jooq.{DSLContext, EnumType} +import org.jooq.impl.SQLDataType import play.api.libs.json._ import java.sql.Timestamp @@ -153,6 +155,37 @@ object ComputingUnitManagingResource { @Path("/computing-unit") class ComputingUnitManagingResource { + /** + * Delete global metadata rows for executions that ran on the given computing unit. + * Cleans up operator_port_cache and operator_port_executions rows. + * Iceberg result documents are CU-local and already destroyed with the pod. + */ + private def deleteCacheMetadataForComputingUnit(ctx: DSLContext, cuid: Int): Unit = { + ctx + .deleteFrom(OPERATOR_PORT_CACHE) + .where( + OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID.in( + ctx + .select(WORKFLOW_EXECUTIONS.EID.cast(SQLDataType.BIGINT)) + .from(WORKFLOW_EXECUTIONS) + .where(WORKFLOW_EXECUTIONS.CUID.eq(cuid)) + ) + ) + .execute() + + ctx + .deleteFrom(OPERATOR_PORT_EXECUTIONS) + .where( + OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.in( + ctx + .select(WORKFLOW_EXECUTIONS.EID) + .from(WORKFLOW_EXECUTIONS) + .where(WORKFLOW_EXECUTIONS.CUID.eq(cuid)) + ) + ) + .execute() + } + private def getComputingUnitByCuid(ctx: DSLContext, cuid: Int): WorkflowComputingUnit = { val wcDao = new WorkflowComputingUnitDao(ctx.configuration()) val unit = wcDao.fetchOneByCuid(cuid) @@ -529,8 +562,10 @@ class ComputingUnitManagingResource { allUnits.foreach { unit => if ( unit.getType == WorkflowComputingUnitTypeEnum.kubernetes && + unit.getTerminateTime == null && !KubernetesClient.podExists(unit.getCuid) ) { + deleteCacheMetadataForComputingUnit(ctx, unit.getCuid) unit.setTerminateTime(new Timestamp(System.currentTimeMillis())) computingUnitDao.update(unit) } @@ -642,6 +677,9 @@ class ComputingUnitManagingResource { val cuDao = new WorkflowComputingUnitDao(ctx.configuration()) val unit = getComputingUnitByCuid(ctx, cuid) + // Clean up cache metadata for executions that ran on this CU + deleteCacheMetadataForComputingUnit(ctx, cuid) + // if the computing unit is kubernetes pod, then kill the pod if (unit.getType == WorkflowComputingUnitTypeEnum.kubernetes) { KubernetesClient.deletePod(cuid) diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index 56eaf5a3f7..ee3f4053b4 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -403,6 +403,7 @@ ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCache - Execution cleanup integration: - lifecycle timeout (cuid-scoped) clears cache artifacts via `invalidateCacheBySourceExecutions` - bulk execution deletion removes cache artifacts via `invalidateCacheBySourceExecutions` + - CU termination clears `operator_port_cache` and `operator_port_executions` metadata rows (SQL-only in `ComputingUnitManagingResource`); Iceberg result documents are CU-local and already destroyed with the pod, so no additional document cleanup is needed - **Fingerprinting**: `FingerprintUtil` implemented with workflow-based specs for deterministic subDAG hashing - **Submission-time lookup**: `WorkflowExecutionService` uses `OperatorPortCacheService.lookupCachedOutputs()` to compute fingerprints for all physical output ports, queries cache, stores hits in `WorkflowSettings.cachedOutputs` - **Cache persistence**: `PortCompletedHandler` emits `PortMaterialized` event → `ExecutionCacheService` → `OperatorPortCacheService.upsertCachedOutput()` → `OperatorPortCacheDao.upsert()` (includes fingerprint, URI, tuple count) diff --git a/frontend/src/app/workspace/service/computing-unit-status/computing-unit-status.service.ts b/frontend/src/app/workspace/service/computing-unit-status/computing-unit-status.service.ts index 06bd0d4071..06d7de9d28 100644 --- a/frontend/src/app/workspace/service/computing-unit-status/computing-unit-status.service.ts +++ b/frontend/src/app/workspace/service/computing-unit-status/computing-unit-status.service.ts @@ -27,6 +27,7 @@ import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; import { ComputingUnitState } from "../../types/computing-unit-connection.interface"; import { isDefined } from "../../../common/util/predicate"; import { WorkflowStatusService } from "../workflow-status/workflow-status.service"; +import { WorkflowCacheEntriesService } from "../workflow-status/workflow-cache-entries.service"; import { UserService } from "../../../common/service/user/user.service"; /** @@ -58,6 +59,7 @@ export class ComputingUnitStatusService implements OnDestroy { private computingUnitService: WorkflowComputingUnitManagingService, private workflowWebsocketService: WorkflowWebsocketService, private workflowStatusService: WorkflowStatusService, + private workflowCacheEntriesService: WorkflowCacheEntriesService, private userService: UserService ) { // Initialize the service by loading computing units @@ -161,6 +163,8 @@ export class ComputingUnitStatusService implements OnDestroy { if (this.workflowWebsocketService.isConnected) { this.workflowWebsocketService.closeWebsocket(); this.workflowStatusService.clearStatus(); + // refresh cache panel — cache entries may differ between CUs + this.workflowCacheEntriesService.refreshCurrentWorkflowCacheEntries(); } this.workflowWebsocketService.openWebsocket(wid, this.userService.getCurrentUser()?.uid, cuid); @@ -263,6 +267,8 @@ export class ComputingUnitStatusService implements OnDestroy { // trigger a single refresh; the refresh pipeline will // pull the new list and call updateComputingUnits() this.refreshComputingUnitList(); + // refresh cache panel — backend has cleared cache metadata for this CU + this.workflowCacheEntriesService.refreshCurrentWorkflowCacheEntries(); }), map(() => true), catchError((err: unknown) => { diff --git a/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts index 852f9fddfb..d3669b3e27 100644 --- a/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts +++ b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts @@ -151,6 +151,15 @@ export class WorkflowCacheEntriesService { this.setAutoInvalidationEnabled(!this.autoInvalidationEnabledSubject.value); } + /** + * Refreshes cache entries for the current workflow (if any) and updates shared state. + */ + public refreshCurrentWorkflowCacheEntries(): void { + if (this.currentWorkflowId && this.currentWorkflowId > 0) { + this.refreshCacheEntries(this.currentWorkflowId).subscribe(); + } + } + /** * Refreshes cache entries for a workflow and updates shared state. *
