This is an automated email from the ASF dual-hosted git repository.

xiaozhenliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 95496cce27 feat: Separate Runtime Statistics Collection from UI Updates (#4205)
95496cce27 is described below

commit 95496cce27d6b3221ed30c8dd516425f9e06a333
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Mon Feb 23 23:00:37 2026 -0800

    feat: Separate Runtime Statistics Collection from UI Updates (#4205)
    
    ### What changes were proposed in this PR?
    
    This PR introduces a new configuration parameter
    `runtime-statistics-persistence-interval` to independently control the
    frequency of runtime statistics persistence, separate from the UI update
    frequency (`status-update-interval`). Previously, a single parameter,
    `status-update-interval`, controlled both UI updates and runtime
    statistics persistence, so frequent UI updates (e.g., every 500ms)
    also caused excessive statistics writes to storage. This change allows
    independent control:
    - `status-update-interval`: Controls how often the frontend UI refreshes
    (default: 500ms)
    - `runtime-statistics-persistence-interval`: Controls how often
    statistics are persisted to storage (default: 2000ms)
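    The following is a minimal sketch of how the two settings resolve. It uses a dependency-free stand-in for `ApplicationConfig`; the real code reads the values through `ApplicationConfig`. `StatsIntervals`, `IntervalSettings`, `load`, and `ticksPerMinute` are hypothetical names. The defaults and the environment-variable names come from the `application.conf` change in this commit. The last helper puts a number on the motivation: at the defaults, the persistence timer fires about 30 times per minute, instead of roughly 120 times under the shared 500ms timer.

    ```scala
    // Hypothetical, dependency-free stand-in for the two ApplicationConfig values.
    // application.conf sets a default and then `key = ${?ENV_VAR}`, so the
    // environment variable overrides the default only when it is set.
    final case class StatsIntervals(statusUpdateMs: Long, persistenceMs: Long)

    object IntervalSettings {
      val DefaultStatusUpdateMs: Long = 500L
      val DefaultPersistenceMs: Long = 2000L

      private def resolve(default: Long, envValue: Option[String]): Long =
        envValue.map(_.trim.toLong).getOrElse(default)

      // `env` is any key lookup, e.g. sys.env.get in a real process.
      def load(env: String => Option[String]): StatsIntervals =
        StatsIntervals(
          resolve(DefaultStatusUpdateMs, env("CONSTANTS_STATUS_UPDATE_INTERVAL")),
          resolve(DefaultPersistenceMs, env("CONSTANTS_RUNTIME_STATISTICS_PERSISTENCE_INTERVAL"))
        )

      // Approximate ticks per minute of a fixed-delay timer with this interval.
      def ticksPerMinute(intervalMs: Long): Long = 60000L / intervalMs
    }
    ```

    In a running process this would be called as `IntervalSettings.load(sys.env.get)`. Setting only `CONSTANTS_RUNTIME_STATISTICS_PERSISTENCE_INTERVAL` changes the persistence cadence and leaves the UI cadence untouched.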
    
    #### Do two timers mean more frequent worker queries?
    
    No. The controller tracks the timestamp of the last completed full-graph
    worker query and uses `min(status-update-interval,
    runtime-statistics-persistence-interval)` as a freshness threshold. When
    a timer fires, if the elapsed time since the last query is within this
    threshold, the controller forwards stats from cache without querying
    workers, so the faster timer drives the real worker queries and the
    slower timer reuses their results. If a query is already in flight
    when the second timer fires, the controller serves stats from the
    previous completed query's cache. Cache reuse applies to timer-triggered
    queries only; event-triggered queries (e.g., from worker completion
    events) always proceed to real worker RPCs.
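    The freshness rule can be modeled as a small pure function. The sketch below is hypothetical. `QueryDedup`, `decide`, and `simulate` are illustrative names, not Texera APIs. The sketch uses `Option[Long]` where the handler uses a `0L` sentinel to mean "no completed query yet". The simulation replays both timers under simplifying assumptions: each query completes instantly, and the UI timer goes first when two ticks coincide.

    ```scala
    // Hypothetical model of the freshness check in QueryWorkerStatisticsHandler.
    // Times are plain Long milliseconds; the real handler uses System.nanoTime().
    object QueryDedup {
      sealed trait Decision
      case object QueryWorkers extends Decision // issue real worker RPCs
      case object ServeCache extends Decision   // forward the last completed result
      case object Drop extends Decision         // query in flight, nothing cached yet

      def decide(
          fullGraph: Boolean,            // filterByWorkers is empty (timer-triggered)
          inFlight: Boolean,             // a full-graph query is still running
          lastCompletedAt: Option[Long], // None until a full-graph query finishes
          now: Long,
          minIntervalMs: Long            // min(status-update, persistence interval)
      ): Decision =
        if (!fullGraph) QueryWorkers // worker-filtered (event-triggered) queries bypass the cache
        else if (inFlight) { if (lastCompletedAt.isDefined) ServeCache else Drop }
        else
          lastCompletedAt match {
            case Some(last) if now - last < minIntervalMs => ServeCache
            case _                                        => QueryWorkers
          }

      final case class Counts(workerQueries: Int, uiFromCache: Int, persistFromCache: Int)

      // Replays both fixed-delay timers (both start at t = 0) and counts outcomes.
      // Simplifications: each real query completes before the next tick, and ticks
      // that fall on the same instant are ordered UI timer first (sortBy is stable).
      def simulate(uiMs: Long, persistMs: Long, durationMs: Long): Counts = {
        val minInterval = math.min(uiMs, persistMs)
        val ticks: Seq[(Long, String)] =
          ((0L to durationMs by uiMs).map(t => (t, "ui")) ++
            (0L to durationMs by persistMs).map(t => (t, "persist"))).sortBy(_._1)
        var last: Option[Long] = None
        var queries = 0
        var uiHits = 0
        var persistHits = 0
        ticks.foreach { case (t, timer) =>
          // Positional args: fullGraph = true, inFlight = false.
          decide(true, false, last, t, minInterval) match {
            case QueryWorkers =>
              queries += 1
              last = Some(t)
            case ServeCache =>
              if (timer == "ui") uiHits += 1 else persistHits += 1
            case Drop => () // cannot happen here because inFlight is always false
          }
        }
        Counts(queries, uiHits, persistHits)
      }
    }
    ```

    With the default intervals over 10 seconds, `simulate(500L, 2000L, 10000L)` gives `Counts(21, 0, 6)`. The 21 worker queries match what the UI timer alone would issue, and all 6 persistence ticks are served from cache.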
    
    #### Changes
    - Added `runtime-statistics-persistence-interval` parameter (default:
    2000ms) in `application.conf`
    - Protobuf: Added the `StatisticsUpdateTarget` enum (`UI_ONLY`,
    `PERSISTENCE_ONLY`, `BOTH_UI_AND_PERSISTENCE`) and an `updateTarget`
    field of that type to `QueryStatisticsRequest`
    - Added `RuntimeStatisticsPersist` event for statistics-only updates;
    `ExecutionStatsUpdate` now handles UI-only updates
    - Added separate timer for runtime statistics persistence that runs
    independently from the UI update timer
    - Query Handling
      - Timer-triggered queries specify target: UI-only or persistence-only
      - Event-triggered queries (port/worker completion, pause, resume) send
        both UI and persistence updates to preserve original behavior
      - `QueryWorkerStatisticsHandler` routes to the appropriate event based
        on `StatisticsUpdateTarget`
    - Worker query deduplication in `QueryWorkerStatisticsHandler`: when the
    second timer fires, the controller checks whether worker stats were
    already fetched recently (within `min(status-update-interval,
    runtime-statistics-persistence-interval)`). If so, it forwards the
    cached stats to the appropriate sink (UI or persistence) without issuing
    any worker RPCs. If a query is already in flight, cached stats from the
    previous completed query are forwarded.
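    Below is a hedged sketch of the routing in `forwardStats`. It uses stand-in types rather than the ScalaPB-generated `StatisticsUpdateTarget` and the real `ClientEvent` classes, and it simplifies metrics to `Map[String, Long]` instead of `Map[String, OperatorMetrics]`. The sketch also illustrates why `BOTH_UI_AND_PERSISTENCE` is wire value 0. In proto3, an unset enum field decodes as 0, so a request that leaves `updateTarget` unset keeps the old behavior of updating both sinks. Unknown values also fall back to both sinks, matching the `Unrecognized(_)` case in the diff.

    ```scala
    // Hypothetical stand-ins for the generated enum and the two ClientEvent classes.
    object StatsRouting {
      sealed trait UpdateTarget
      case object BothUiAndPersistence extends UpdateTarget // wire value 0, proto3 default
      case object UiOnly extends UpdateTarget               // wire value 1
      case object PersistenceOnly extends UpdateTarget      // wire value 2
      final case class Unrecognized(value: Int) extends UpdateTarget

      // Decode a wire value; anything unknown is kept as Unrecognized.
      def fromWire(value: Int): UpdateTarget = value match {
        case 0     => BothUiAndPersistence
        case 1     => UiOnly
        case 2     => PersistenceOnly
        case other => Unrecognized(other)
      }

      sealed trait ClientEvent
      final case class ExecutionStatsUpdate(metrics: Map[String, Long]) extends ClientEvent
      final case class RuntimeStatisticsPersist(metrics: Map[String, Long]) extends ClientEvent

      // Same stats snapshot, routed to the UI sink, the persistence sink, or both.
      def route(target: UpdateTarget, metrics: Map[String, Long]): List[ClientEvent] =
        target match {
          case UiOnly          => List(ExecutionStatsUpdate(metrics))
          case PersistenceOnly => List(RuntimeStatisticsPersist(metrics))
          case BothUiAndPersistence | Unrecognized(_) =>
            List(ExecutionStatsUpdate(metrics), RuntimeStatisticsPersist(metrics))
        }
    }
    ```

    Emitting two separate events keeps each subscriber in `ExecutionStatsService` single-purpose. The UI callback only updates the state store, and the persistence callback only writes storage.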
    
    
    ### Any related issues, documentation, discussions?
    Closes #4204
    
    ### How was this PR tested?
    Tested with the workflow and dataset below: varied the
    `runtime-statistics-persistence-interval` parameter and checked whether
    the size of the stored runtime statistics shrinks as the value increases.
    [Iris Dataset Analysis.json](https://github.com/user-attachments/files/25220000/Iris.Dataset.Analysis.json)
    [Iris.csv](https://github.com/user-attachments/files/25220003/Iris.csv)
    
    
    
    ### Was this PR authored or co-authored using generative AI tooling?
    Generated-by: Claude-4.6
    
    ---------
    
    Co-authored-by: Xiaozhen Liu <[email protected]>
    Co-authored-by: Chen Li <[email protected]>
---
 .../engine/architecture/rpc/controlcommands.proto  |  7 +++
 .../architecture/controller/ClientEvent.scala      |  3 ++
 .../architecture/controller/Controller.scala       |  3 ++
 .../controller/ControllerTimerService.scala        | 54 ++++++++++++++++++----
 .../controller/promisehandlers/PauseHandler.scala  | 14 +++---
 .../promisehandlers/PortCompletedHandler.scala     | 11 ++++-
 .../QueryWorkerStatisticsHandler.scala             | 52 ++++++++++++++++++---
 .../controller/promisehandlers/ResumeHandler.scala | 15 +++---
 .../promisehandlers/StartWorkflowHandler.scala     |  1 +
 .../WorkerExecutionCompletedHandler.scala          |  6 ++-
 .../WorkerStateUpdatedHandler.scala                | 11 ++---
 .../scheduling/RegionExecutionCoordinator.scala    | 15 +++---
 .../texera/web/service/ExecutionStatsService.scala | 17 ++++++-
 common/config/src/main/resources/application.conf  |  3 ++
 .../texera/amber/config/ApplicationConfig.scala    |  2 +
 15 files changed, 164 insertions(+), 50 deletions(-)

diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
index ed17be236d..d714f64a15 100644
--- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
+++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
@@ -269,6 +269,13 @@ message PrepareCheckpointRequest{
   bool estimationOnly = 2;
 }
 
+enum StatisticsUpdateTarget {
+  BOTH_UI_AND_PERSISTENCE = 0;
+  UI_ONLY = 1;
+  PERSISTENCE_ONLY = 2;
+}
+
 message QueryStatisticsRequest{
   repeated core.ActorVirtualIdentity filterByWorkers = 1;
+  StatisticsUpdateTarget updateTarget = 2;
 }
\ No newline at end of file
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala
index ea83eedd2a..1092af15e7 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala
@@ -31,6 +31,9 @@ case class ExecutionStateUpdate(state: WorkflowAggregatedState) extends ClientEv
 
 case class ExecutionStatsUpdate(operatorMetrics: Map[String, OperatorMetrics]) extends ClientEvent
 
+case class RuntimeStatisticsPersist(operatorMetrics: Map[String, OperatorMetrics])
+    extends ClientEvent
+
 case class ReportCurrentProcessingTuple(
     operatorID: String,
     tuple: Array[(Tuple, ActorVirtualIdentity)]
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
index 85ae04d4e9..b0e4f3fdc3 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
@@ -51,6 +51,8 @@ object ControllerConfig {
   def default: ControllerConfig =
     ControllerConfig(
       statusUpdateIntervalMs = Option(ApplicationConfig.getStatusUpdateIntervalInMs),
+      runtimeStatisticsPersistenceIntervalMs =
+        Option(ApplicationConfig.getRuntimeStatisticsPersistenceIntervalInMs),
       stateRestoreConfOpt = None,
       faultToleranceConfOpt = None
     )
@@ -58,6 +60,7 @@ object ControllerConfig {
 
 final case class ControllerConfig(
     statusUpdateIntervalMs: Option[Long],
+    runtimeStatisticsPersistenceIntervalMs: Option[Long],
     stateRestoreConfOpt: Option[StateRestoreConfig],
     faultToleranceConfOpt: Option[FaultToleranceConfig]
 )
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala
index 20ebff9fc0..a778a27c46 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala
@@ -23,7 +23,8 @@ import org.apache.pekko.actor.Cancellable
 import org.apache.texera.amber.engine.architecture.common.AkkaActorService
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
-  QueryStatisticsRequest
+  QueryStatisticsRequest,
+  StatisticsUpdateTarget
 }
 import org.apache.texera.amber.engine.architecture.rpc.controllerservice.ControllerServiceGrpc.METHOD_CONTROLLER_INITIATE_QUERY_STATISTICS
 import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient.ControlInvocation
@@ -36,28 +37,61 @@ class ControllerTimerService(
     akkaActorService: AkkaActorService
 ) {
   var statusUpdateAskHandle: Option[Cancellable] = None
+  var runtimeStatisticsAskHandle: Option[Cancellable] = None
 
-  def enableStatusUpdate(): Unit = {
-    if (controllerConfig.statusUpdateIntervalMs.nonEmpty && statusUpdateAskHandle.isEmpty) {
-      statusUpdateAskHandle = Option(
+  private def enableTimer(
+      intervalMs: Option[Long],
+      updateTarget: StatisticsUpdateTarget,
+      handleOpt: Option[Cancellable]
+  ): Option[Cancellable] = {
+    if (intervalMs.nonEmpty && handleOpt.isEmpty) {
+      Option(
         akkaActorService.sendToSelfWithFixedDelay(
           0.milliseconds,
-          FiniteDuration.apply(controllerConfig.statusUpdateIntervalMs.get, MILLISECONDS),
+          FiniteDuration.apply(intervalMs.get, MILLISECONDS),
           ControlInvocation(
             METHOD_CONTROLLER_INITIATE_QUERY_STATISTICS,
-            QueryStatisticsRequest(Seq.empty),
+            QueryStatisticsRequest(Seq.empty, updateTarget),
             AsyncRPCContext(SELF, SELF),
             0
           )
         )
       )
+    } else {
+      handleOpt
     }
   }
 
-  def disableStatusUpdate(): Unit = {
-    if (statusUpdateAskHandle.nonEmpty) {
-      statusUpdateAskHandle.get.cancel()
-      statusUpdateAskHandle = Option.empty
+  private def disableTimer(handleOpt: Option[Cancellable]): Option[Cancellable] = {
+    if (handleOpt.nonEmpty) {
+      handleOpt.get.cancel()
+      Option.empty
+    } else {
+      handleOpt
     }
   }
+
+  def enableStatusUpdate(): Unit = {
+    statusUpdateAskHandle = enableTimer(
+      controllerConfig.statusUpdateIntervalMs,
+      StatisticsUpdateTarget.UI_ONLY,
+      statusUpdateAskHandle
+    )
+  }
+
+  def enableRuntimeStatisticsCollection(): Unit = {
+    runtimeStatisticsAskHandle = enableTimer(
+      controllerConfig.runtimeStatisticsPersistenceIntervalMs,
+      StatisticsUpdateTarget.PERSISTENCE_ONLY,
+      runtimeStatisticsAskHandle
+    )
+  }
+
+  def disableStatusUpdate(): Unit = {
+    statusUpdateAskHandle = disableTimer(statusUpdateAskHandle)
+  }
+
+  def disableRuntimeStatisticsCollection(): Unit = {
+    runtimeStatisticsAskHandle = disableTimer(runtimeStatisticsAskHandle)
+  }
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala
index 5511fea6ab..35a85f56ae 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala
@@ -25,7 +25,8 @@ import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
 import org.apache.texera.amber.engine.architecture.controller.{
   ControllerAsyncRPCHandlerInitializer,
   ExecutionStateUpdate,
-  ExecutionStatsUpdate
+  ExecutionStatsUpdate,
+  RuntimeStatisticsPersist
 }
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
@@ -47,6 +48,7 @@ trait PauseHandler {
 
   override def pauseWorkflow(request: EmptyRequest, ctx: AsyncRPCContext): Future[EmptyReturn] = {
     cp.controllerTimerService.disableStatusUpdate() // to be enabled in resume
+    cp.controllerTimerService.disableRuntimeStatisticsCollection() // to be enabled in resume
     Future
       .collect(
         cp.workflowExecution.getRunningRegionExecutions
@@ -81,12 +83,10 @@ trait PauseHandler {
           .toSeq
       )
       .map { _ =>
-        // update frontend workflow status
-        sendToClient(
-          ExecutionStatsUpdate(
-            cp.workflowExecution.getAllRegionExecutionsStats
-          )
-        )
+        // update frontend workflow status and persist statistics
+        val stats = cp.workflowExecution.getAllRegionExecutionsStats
+        sendToClient(ExecutionStatsUpdate(stats))
+        sendToClient(RuntimeStatisticsPersist(stats))
         sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
         logger.info(s"workflow paused")
       }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
index 0add56eba3..810c098c41 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
@@ -29,7 +29,8 @@ import org.apache.texera.amber.engine.architecture.controller.{
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
   PortCompletedRequest,
-  QueryStatisticsRequest
+  QueryStatisticsRequest,
+  StatisticsUpdateTarget
 }
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
@@ -50,7 +51,13 @@ trait PortCompletedHandler {
       ctx: AsyncRPCContext
   ): Future[EmptyReturn] = {
     controllerInterface
-      .controllerInitiateQueryStatistics(QueryStatisticsRequest(scala.Seq(ctx.sender)), CONTROLLER)
+      .controllerInitiateQueryStatistics(
+        QueryStatisticsRequest(
+          scala.Seq(ctx.sender),
+          StatisticsUpdateTarget.BOTH_UI_AND_PERSISTENCE
+        ),
+        CONTROLLER
+      )
       .map { _ =>
         val globalPortId = GlobalPortIdentity(
           VirtualIdentityUtils.getPhysicalOpId(ctx.sender),
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
index a7705170e3..6551579f71 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
@@ -20,16 +20,19 @@
 package org.apache.texera.amber.engine.architecture.controller.promisehandlers
 
 import com.twitter.util.Future
+import org.apache.texera.amber.config.ApplicationConfig
 import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity
 import org.apache.texera.amber.engine.architecture.controller.{
   ControllerAsyncRPCHandlerInitializer,
-  ExecutionStatsUpdate
+  ExecutionStatsUpdate,
+  RuntimeStatisticsPersist
 }
 import org.apache.texera.amber.engine.architecture.deploysemantics.layer.WorkerExecution
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
   EmptyRequest,
-  QueryStatisticsRequest
+  QueryStatisticsRequest,
+  StatisticsUpdateTarget
 }
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{
@@ -47,6 +50,33 @@ trait QueryWorkerStatisticsHandler {
 
   private var globalQueryStatsOngoing = false
 
+  // Minimum of the two timer intervals converted to nanoseconds.
+  // A full-graph worker query is skipped and served from cache when the last completed
+  // query falls within this window, avoiding redundant worker RPCs.
+  private val minQueryIntervalNs: Long =
+    Math.min(
+      ApplicationConfig.getStatusUpdateIntervalInMs,
+      ApplicationConfig.getRuntimeStatisticsPersistenceIntervalInMs
+    ) * 1_000_000L
+
+  // Nanosecond timestamp of the last completed full-graph worker stats query.
+  @volatile private var lastWorkerQueryTimestampNs: Long = 0L
+
+  // Reads the current cached stats and forwards them to the appropriate client sink(s).
+  private def forwardStats(updateTarget: StatisticsUpdateTarget): Unit = {
+    val stats = cp.workflowExecution.getAllRegionExecutionsStats
+    updateTarget match {
+      case StatisticsUpdateTarget.UI_ONLY =>
+        sendToClient(ExecutionStatsUpdate(stats))
+      case StatisticsUpdateTarget.PERSISTENCE_ONLY =>
+        sendToClient(RuntimeStatisticsPersist(stats))
+      case StatisticsUpdateTarget.BOTH_UI_AND_PERSISTENCE |
+          StatisticsUpdateTarget.Unrecognized(_) =>
+        sendToClient(ExecutionStatsUpdate(stats))
+        sendToClient(RuntimeStatisticsPersist(stats))
+    }
+  }
+
   override def controllerInitiateQueryStatistics(
       msg: QueryStatisticsRequest,
       ctx: AsyncRPCContext
@@ -54,12 +84,20 @@ trait QueryWorkerStatisticsHandler {
     // Avoid issuing concurrent full-graph statistics queries.
     // If a global query is already in progress, skip this request.
     if (globalQueryStatsOngoing && msg.filterByWorkers.isEmpty) {
+      // A query is already in-flight: serve the last completed query's cached data,
+      // or drop silently if no prior query has finished yet.
+      if (lastWorkerQueryTimestampNs > 0) forwardStats(msg.updateTarget)
       return EmptyReturn()
     }
 
     var opFilter: Set[PhysicalOpIdentity] = Set.empty
     // Only enforce the single-query restriction for full-graph queries.
     if (msg.filterByWorkers.isEmpty) {
+      if (System.nanoTime() - lastWorkerQueryTimestampNs < minQueryIntervalNs) {
+        // Cache is still fresh: the faster timer already queried workers recently.
+        forwardStats(msg.updateTarget)
+        return EmptyReturn()
+      }
       globalQueryStatsOngoing = true
     } else {
       // Map the filtered worker IDs (if any) to their corresponding physical operator IDs
@@ -133,17 +171,17 @@ trait QueryWorkerStatisticsHandler {
           Future.collect(futures).flatMap(_ => processLayers(rest))
       }
 
-    // Start processing all layers and update the frontend after completion
+    // Start processing all layers and forward stats to the appropriate sink(s) on completion.
     processLayers(layers).map { _ =>
       collectedResults.foreach {
         case (wExec, resp, timestamp) =>
           wExec.update(timestamp, resp.metrics.workerState, resp.metrics.workerStatistics)
       }
-      sendToClient(
-        ExecutionStatsUpdate(cp.workflowExecution.getAllRegionExecutionsStats)
-      )
-      // Release the global query lock if it was set
+      forwardStats(msg.updateTarget)
+      // Record the completion timestamp before releasing the lock so that any timer
+      // firing in between sees a valid cache entry rather than triggering a redundant query.
       if (globalQueryStatsOngoing) {
+        lastWorkerQueryTimestampNs = System.nanoTime()
         globalQueryStatsOngoing = false
       }
       EmptyReturn()
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala
index 020ad40a1c..c94ba91c20 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala
@@ -22,7 +22,8 @@ package org.apache.texera.amber.engine.architecture.controller.promisehandlers
 import com.twitter.util.Future
 import org.apache.texera.amber.engine.architecture.controller.{
   ControllerAsyncRPCHandlerInitializer,
-  ExecutionStatsUpdate
+  ExecutionStatsUpdate,
+  RuntimeStatisticsPersist
 }
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
@@ -57,14 +58,14 @@ trait ResumeHandler {
           .toSeq
       )
       .map { _ =>
-        // update frontend status
-        sendToClient(
-          ExecutionStatsUpdate(
-            cp.workflowExecution.getAllRegionExecutionsStats
-          )
-        )
+        // update frontend status and persist statistics
+        val stats = cp.workflowExecution.getAllRegionExecutionsStats
+        sendToClient(ExecutionStatsUpdate(stats))
+        sendToClient(RuntimeStatisticsPersist(stats))
         cp.controllerTimerService
           .enableStatusUpdate() //re-enabled it since it is disabled in pause
+        cp.controllerTimerService
+          .enableRuntimeStatisticsCollection() //re-enabled it since it is disabled in pause
         EmptyReturn()
       }
   }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/StartWorkflowHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/StartWorkflowHandler.scala
index 2ce28afb0f..7d938dbedd 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/StartWorkflowHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/StartWorkflowHandler.scala
@@ -45,6 +45,7 @@ trait StartWorkflowHandler {
         .coordinateRegionExecutors(cp.actorService)
         .map(_ => {
           cp.controllerTimerService.enableStatusUpdate()
+          cp.controllerTimerService.enableRuntimeStatisticsCollection()
           StartWorkflowResponse(RUNNING)
         })
     } else {
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
index 594673caa5..d54a22f26b 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
@@ -27,7 +27,8 @@ import org.apache.texera.amber.engine.architecture.controller.{
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
   EmptyRequest,
-  QueryStatisticsRequest
+  QueryStatisticsRequest,
+  StatisticsUpdateTarget
 }
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import org.apache.texera.amber.engine.common.virtualidentity.util.SELF
@@ -52,7 +53,7 @@ trait WorkerExecutionCompletedHandler {
     // and the user sees the last update before completion
     val statsRequest =
       controllerInterface.controllerInitiateQueryStatistics(
-        QueryStatisticsRequest(Seq(ctx.sender)),
+        QueryStatisticsRequest(Seq(ctx.sender), StatisticsUpdateTarget.BOTH_UI_AND_PERSISTENCE),
         mkContext(SELF)
       )
 
@@ -64,6 +65,7 @@ trait WorkerExecutionCompletedHandler {
           // after query result come back: send completed event, cleanup ,and kill workflow
           sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
           cp.controllerTimerService.disableStatusUpdate()
+          cp.controllerTimerService.disableRuntimeStatisticsCollection()
         }
       })
     EmptyReturn()
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala
index c1fa6b1ef7..5ee98a4918 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala
@@ -22,7 +22,8 @@ package org.apache.texera.amber.engine.architecture.controller.promisehandlers
 import com.twitter.util.Future
 import org.apache.texera.amber.engine.architecture.controller.{
   ControllerAsyncRPCHandlerInitializer,
-  ExecutionStatsUpdate
+  ExecutionStatsUpdate,
+  RuntimeStatisticsPersist
 }
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
@@ -50,11 +51,9 @@ trait WorkerStateUpdatedHandler {
       .foreach(operatorExecution =>
         operatorExecution.getWorkerExecution(ctx.sender).update(System.nanoTime(), msg.state)
       )
-    sendToClient(
-      ExecutionStatsUpdate(
-        cp.workflowExecution.getAllRegionExecutionsStats
-      )
-    )
+    val stats = cp.workflowExecution.getAllRegionExecutionsStats
+    sendToClient(ExecutionStatsUpdate(stats))
+    sendToClient(RuntimeStatisticsPersist(stats))
     EmptyReturn()
   }
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 7e5b228801..e490cde3d9 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -38,6 +38,7 @@ import org.apache.texera.amber.engine.architecture.controller.execution.{
 import org.apache.texera.amber.engine.architecture.controller.{
   ControllerConfig,
   ExecutionStatsUpdate,
+  RuntimeStatisticsPersist,
   WorkerAssignmentUpdate
 }
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands._
@@ -275,9 +276,9 @@ class RegionExecutionCoordinator(
     val resourceConfig = region.resourceConfig.get
     val regionExecution = workflowExecution.getRegionExecution(region.id)
 
-    asyncRPCClient.sendToClient(
-      ExecutionStatsUpdate(workflowExecution.getAllRegionExecutionsStats)
-    )
+    val stats = workflowExecution.getAllRegionExecutionsStats
+    asyncRPCClient.sendToClient(ExecutionStatsUpdate(stats))
+    asyncRPCClient.sendToClient(RuntimeStatisticsPersist(stats))
     asyncRPCClient.sendToClient(
       WorkerAssignmentUpdate(
         operatorsToRun
@@ -489,11 +490,9 @@ class RegionExecutionCoordinator(
       region: Region,
       isDependeePhase: Boolean
   ): Future[Seq[Unit]] = {
-    asyncRPCClient.sendToClient(
-      ExecutionStatsUpdate(
-        workflowExecution.getAllRegionExecutionsStats
-      )
-    )
+    val stats = workflowExecution.getAllRegionExecutionsStats
+    asyncRPCClient.sendToClient(ExecutionStatsUpdate(stats))
+    asyncRPCClient.sendToClient(RuntimeStatisticsPersist(stats))
     val allStarterOperators = region.getStarterOperators
     val starterOpsForThisPhase =
       if (isDependeePhase) allStarterOperators.filter(_.dependeeInputs.nonEmpty)
diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala
index 841c7112c7..3703a2bf41 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala
@@ -28,7 +28,14 @@ import org.apache.texera.amber.core.tuple.Tuple
 import org.apache.texera.amber.core.workflow.WorkflowContext
 import org.apache.texera.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE
 import org.apache.texera.amber.core.workflowruntimestate.WorkflowFatalError
-import org.apache.texera.amber.engine.architecture.controller._
+import org.apache.texera.amber.engine.architecture.controller.{
+  ExecutionStateUpdate,
+  ExecutionStatsUpdate,
+  FatalError,
+  RuntimeStatisticsPersist,
+  WorkerAssignmentUpdate,
+  WorkflowRecoveryStatus
+}
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
   COMPLETED,
@@ -175,12 +182,20 @@ class ExecutionStatsService(
   }
 
   private[this] def registerCallbackOnWorkflowStatsUpdate(): Unit = {
+    // Register callback for UI updates (UI state store update only, no persistence)
     addSubscription(
       client
         .registerCallback[ExecutionStatsUpdate]((evt: ExecutionStatsUpdate) => {
           stateStore.statsStore.updateState { statsStore =>
             statsStore.withOperatorInfo(evt.operatorMetrics)
           }
+        })
+    )
+
+    // Register callback for statistics persistence (persistence only, no UI update)
+    addSubscription(
+      client
+        .registerCallback[RuntimeStatisticsPersist]((evt: RuntimeStatisticsPersist) => {
           metricsPersistThread.execute(() => {
             storeRuntimeStatistics(computeStatsDiff(evt.operatorMetrics))
             lastPersistedMetrics = evt.operatorMetrics
diff --git a/common/config/src/main/resources/application.conf b/common/config/src/main/resources/application.conf
index 62daee8679..c7a7af2418 100644
--- a/common/config/src/main/resources/application.conf
+++ b/common/config/src/main/resources/application.conf
@@ -31,6 +31,9 @@ constants {
 
     status-update-interval = 500
     status-update-interval = ${?CONSTANTS_STATUS_UPDATE_INTERVAL}
+
+    runtime-statistics-persistence-interval = 2000
+    runtime-statistics-persistence-interval = ${?CONSTANTS_RUNTIME_STATISTICS_PERSISTENCE_INTERVAL}
 }
 
 flow-control {
diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/ApplicationConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/ApplicationConfig.scala
index 415a23d23c..927b102ebc 100644
--- a/common/config/src/main/scala/org/apache/texera/amber/config/ApplicationConfig.scala
+++ b/common/config/src/main/scala/org/apache/texera/amber/config/ApplicationConfig.scala
@@ -46,6 +46,8 @@ object ApplicationConfig {
   val MAX_RESOLUTION_COLUMNS: Int = getConfSource.getInt("constants.max-resolution-columns")
   val numWorkerPerOperatorByDefault: Int = getConfSource.getInt("constants.num-worker-per-operator")
   val getStatusUpdateIntervalInMs: Long = getConfSource.getLong("constants.status-update-interval")
+  val getRuntimeStatisticsPersistenceIntervalInMs: Long =
+    getConfSource.getLong("constants.runtime-statistics-persistence-interval")
 
   // Flow control
   val maxCreditAllowedInBytesPerChannel: Long =
