This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 62fcb3807644d7446224d1ec80726463044ac798 Author: Xiaozhen Liu <[email protected]> AuthorDate: Mon Jan 12 20:27:03 2026 -0800 feat(cache): update cached region completion logic to avoid assigning workers. --- .../controller/execution/OperatorExecution.scala | 108 ++++++++++++++------- .../scheduling/RegionExecutionCoordinator.scala | 53 +++++----- docs/operator-port-cache.md | 9 +- 3 files changed, 106 insertions(+), 64 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecution.scala index 5a7f57083a..6e1f27274b 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecution.scala @@ -40,6 +40,22 @@ case class OperatorExecution() { private val workerExecutions = new util.concurrent.ConcurrentHashMap[ActorVirtualIdentity, WorkerExecution]() + // Cached metrics for ToSkip regions; when set, operator stats/state are derived from this only. + private var cachedMetrics: Option[OperatorMetrics] = None + + /** + * Sets cached operator metrics for a ToSkip region and bypasses worker-based aggregation. + */ + def setCachedMetrics(metrics: OperatorMetrics): Unit = { + cachedMetrics = Some(metrics) + } + + /** + * Clears cached operator metrics so the operator can report live worker stats again. + */ + def clearCachedMetrics(): Unit = { + cachedMetrics = None + } /** * Initializes a `WorkerExecution` for the specified workerId and adds it to the workerExecutions map. @@ -70,17 +86,24 @@ case class OperatorExecution() { */ def getWorkerIds: Set[ActorVirtualIdentity] = workerExecutions.keys.asScala.toSet + /** + * Returns the aggregated operator state from worker executions, or the cached state when present. + */ def getState: WorkflowAggregatedState = { - val workerStates = workerExecutions.values.asScala.map(_.getState) - aggregateStates( - workerStates, - WorkerState.COMPLETED, - WorkerState.TERMINATED, - WorkerState.RUNNING, - WorkerState.UNINITIALIZED, - WorkerState.PAUSED, - WorkerState.READY - ) + cachedMetrics + .map(_.operatorState) + .getOrElse { + val workerStates = workerExecutions.values.asScala.map(_.getState) + aggregateStates( + workerStates, + WorkerState.COMPLETED, + WorkerState.TERMINATED, + WorkerState.RUNNING, + WorkerState.UNINITIALIZED, + WorkerState.PAUSED, + WorkerState.READY + ) + } } private[this] def computeOperatorPortStats( @@ -89,36 +112,55 @@ case class OperatorExecution() { ExecutionUtils.aggregatePortMetrics(workerPortStats) } + /** + * Returns operator metrics aggregated from worker executions, or cached metrics when set. + */ def getStats: OperatorMetrics = { - val workerRawStats = workerExecutions.values.asScala.map(_.getStats) - val inputMetrics = workerRawStats.flatMap(_.inputTupleMetrics) - val outputMetrics = workerRawStats.flatMap(_.outputTupleMetrics) - OperatorMetrics( - getState, - OperatorStatistics( - inputMetrics = computeOperatorPortStats(inputMetrics), - outputMetrics = computeOperatorPortStats(outputMetrics), - getWorkerIds.size, - dataProcessingTime = workerRawStats.map(_.dataProcessingTime).sum, - controlProcessingTime = workerRawStats.map(_.controlProcessingTime).sum, - idleTime = workerRawStats.map(_.idleTime).sum + cachedMetrics.getOrElse { + val workerRawStats = workerExecutions.values.asScala.map(_.getStats) + val inputMetrics = workerRawStats.flatMap(_.inputTupleMetrics) + val outputMetrics = workerRawStats.flatMap(_.outputTupleMetrics) + OperatorMetrics( + getState, + OperatorStatistics( + inputMetrics = computeOperatorPortStats(inputMetrics), + outputMetrics = computeOperatorPortStats(outputMetrics), + getWorkerIds.size, + dataProcessingTime = workerRawStats.map(_.dataProcessingTime).sum, + controlProcessingTime = workerRawStats.map(_.controlProcessingTime).sum, + idleTime = workerRawStats.map(_.idleTime).sum + ) ) - ) + } } + /** + * Returns true when all worker input ports are completed, or always true for cached operators. + */ def isInputPortCompleted(portId: PortIdentity): Boolean = { - workerExecutions - .values() - .asScala - .map(workerExecution => workerExecution.getInputPortExecution(portId)) - .forall(_.completed) + if (cachedMetrics.isDefined) { + true + } else { + workerExecutions + .values() + .asScala + .map(workerExecution => workerExecution.getInputPortExecution(portId)) + .forall(_.completed) + } } + /** + * Returns true when all worker output ports are completed, or always true for cached operators. + */ def isOutputPortCompleted(portId: PortIdentity): Boolean = { - workerExecutions - .values() - .asScala - .map(workerExecution => workerExecution.getOutputPortExecution(portId)) - .forall(_.completed) + if (cachedMetrics.isDefined) { + true + } else { + workerExecutions + .values() + .asScala + .map(workerExecution => workerExecution.getOutputPortExecution(portId)) + .forall(_.completed) + } } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index a1bd6bdd17..2b6c9e6d71 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -29,12 +29,13 @@ import org.apache.texera.amber.engine.architecture.common.{AkkaActorRefMappingSe import org.apache.texera.amber.engine.architecture.controller.execution.{OperatorExecution, RegionExecution, WorkflowExecution} import org.apache.texera.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate, ExecutionStatsUpdate, WorkerAssignmentUpdate} import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ -import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import org.apache.texera.amber.engine.architecture.scheduling.config.{InputPortConfig, OperatorConfig, OutputPortConfig, ResourceConfig, WorkerConfig} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{EmptyReturn, WorkflowAggregatedState} +import org.apache.texera.amber.engine.architecture.scheduling.config.{InputPortConfig, OperatorConfig, OutputPortConfig, ResourceConfig} import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.Partitioning -import org.apache.texera.amber.engine.architecture.worker.statistics.{PortTupleMetricsMapping, TupleMetrics, WorkerState, WorkerStatistics} +import org.apache.texera.amber.engine.architecture.worker.statistics.{PortTupleMetricsMapping, TupleMetrics, WorkerState} import org.apache.texera.amber.engine.common.AmberLogging import org.apache.texera.amber.engine.common.FutureBijection._ +import org.apache.texera.amber.engine.common.executionruntimestate.{OperatorMetrics, OperatorStatistics} import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER import org.apache.texera.web.SessionState @@ -97,38 +98,34 @@ class RegionExecutionCoordinator( initRegionExecution() } + /** + * Short-circuit a cached region by recording operator metrics and output URIs without workers, + * then emit stats and mark the region as completed. + */ private def completeCachedRegion(): Unit = { val regionExecution = workflowExecution.getRegionExecution(region.id) val resourceConfig = region.resourceConfig.getOrElse(ResourceConfig()) region.getOperators.foreach { op => val opExecution = regionExecution.initOperatorExecution(op.id) - val workerConfigs = resourceConfig.operatorConfigs - .get(op.id) - .map(_.workerConfigs) - .getOrElse(WorkerConfig.generateWorkerConfigs(op)) - workerConfigs.foreach { workerCfg => - val workerExecution = opExecution.initWorkerExecution(workerCfg.workerId) - op.inputPorts.keys.foreach(pid => workerExecution.getInputPortExecution(pid).setCompleted()) - op.outputPorts.keys.foreach(pid => workerExecution.getOutputPortExecution(pid).setCompleted()) - val outputMetrics = op.outputPorts.keys.map { pid => - val count = resourceConfig.portConfigs.collectFirst { - case (gpid, cfg: OutputPortConfig) if gpid == GlobalPortIdentity(op.id, pid) => - cfg.cachedTupleCount.getOrElse(0L) - }.getOrElse(0L) - PortTupleMetricsMapping(pid, TupleMetrics(count, 0L)) - }.toSeq - val inputMetrics = op.inputPorts.keys - .map(pid => PortTupleMetricsMapping(pid, TupleMetrics(0L, 0L))) - .toSeq - val stats = WorkerStatistics( + // Cached regions do not create workers; synthesize operator-level metrics instead. + val outputMetrics = op.outputPorts.keys.map { pid => + val count = resourceConfig.portConfigs.collectFirst { + case (gpid, cfg: OutputPortConfig) if gpid == GlobalPortIdentity(op.id, pid) => + cfg.cachedTupleCount.getOrElse(0L) + }.getOrElse(0L) + PortTupleMetricsMapping(pid, TupleMetrics(count, 0L)) + }.toSeq + val inputMetrics = op.inputPorts.keys + .map(pid => PortTupleMetricsMapping(pid, TupleMetrics(0L, 0L))) + .toSeq + val stats = OperatorMetrics( + WorkflowAggregatedState.COMPLETED, + OperatorStatistics( inputMetrics, outputMetrics, - dataProcessingTime = 0L, - controlProcessingTime = 0L, - idleTime = 0L ) - workerExecution.update(System.nanoTime(), WorkerState.COMPLETED, stats) - } + ) + opExecution.setCachedMetrics(stats) } recordCachedOutputPortResults(resourceConfig) asyncRPCClient.sendToClient( @@ -362,6 +359,8 @@ class RegionExecutionCoordinator( else None ) + // Ensure live execution does not inherit cached operator metrics. + operatorExecution.clearCachedMetrics() if (!existOpExecution) { buildOperator( diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index f77fe2653c..940d0490fa 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -35,10 +35,10 @@ A region is either fully cached (ToSkip) or fully executed (ToExecute) — no pa - Cost model (compare full region costs) ### 4. Shallow State Hierarchy for Cached Regions -ToSkip regions create lightweight state structures (Workflow → Region → Operator/Link) without Worker/Channel states. This is consistent with `numWorkers=0` and avoids synthetic worker lifecycle management. +ToSkip regions create lightweight state structures (Workflow → Region → Operator/Link) and store cached metrics at the operator level. No Worker/Channel states are created, so `numWorkers=0` and no worker assignments are emitted. ### 5. Stats Emission via Direct Client Updates -Cached regions emit synthetic `ExecutionStatsUpdate` messages directly via `asyncRPCClient.sendToClient()`. This reuses existing stats infrastructure without special-casing the frontend. +Cached regions emit synthetic `ExecutionStatsUpdate` messages directly via `asyncRPCClient.sendToClient()`, with cached operator metrics (`numWorkers=0`). This reuses existing stats infrastructure without special-casing the frontend. ### 6. No Explicit Cache Flag in Metrics Cached execution is inferred from `numWorkers=0` + instant completion, rather than adding a `from_cache` flag to protobuf messages. This minimizes protocol changes. @@ -112,8 +112,9 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached 1. **Skip operator execution**: Call `completeCachedRegion()` immediately 2. **State hierarchy** (shallow): - Create: Workflow → Region → Operator/Link states + - Record cached operator metrics (numWorkers=0) - Skip: Worker/Channel states (not needed) -3. **Mark ports completed**: Set port status to COMPLETED with cached URI +3. **Mark ports completed**: Treat cached operators as completed and record cached URIs 4. **Emit synthetic stats** via `asyncRPCClient.sendToClient(ExecutionStatsUpdate(...))`: - `numWorkers = 0` - `dataProcessingTime = 0`, `controlProcessingTime = 0`, `idleTime = 0` @@ -288,7 +289,7 @@ ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCache - **Cache persistence**: `PortCompletedHandler` emits `PortMaterialized` event → `ExecutionCacheService` → `OperatorPortCacheService.upsertCachedOutput()` → `OperatorPortCacheDao.upsert()` (includes fingerprint, URI, tuple count) - **Scheduler integration**: `CostBasedScheduleGenerator` marks regions cached when all required outputs have hits, reuses cached URIs in port configs - **Runtime execution**: `RegionExecutionCoordinator` branches on `region.cached` flag: - - ToSkip regions: `completeCachedRegion()` creates shallow state hierarchy, emits synthetic stats (numWorkers=0, processingTime=0), propagates cached URIs downstream + - ToSkip regions: `completeCachedRegion()` records cached operator metrics (numWorkers=0, processingTime=0) without creating workers, propagates cached URIs downstream - ToExecute regions: normal execution path - **Stats emission**: Cached regions emit `ExecutionStatsUpdate` via direct client updates, maintaining consistency with normal execution lifecycle
