This is an automated email from the ASF dual-hosted git repository.
xiaozhenliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 95496cce27 feat: Separate Runtime Statistics Collection from UI
Updates (#4205)
95496cce27 is described below
commit 95496cce27d6b3221ed30c8dd516425f9e06a333
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Mon Feb 23 23:00:37 2026 -0800
feat: Separate Runtime Statistics Collection from UI Updates (#4205)
### What changes were proposed in this PR?
This PR introduces a new configuration parameter,
`runtime-statistics-persistence-interval`, to control the frequency of
runtime statistics persistence independently of the UI update frequency
(`status-update-interval`). Previously, a single parameter,
`status-update-interval`, controlled both UI updates and runtime
statistics persistence, so frequent UI updates (e.g., every 500ms)
caused excessive statistics writes to storage. This change allows
independent control:
- `status-update-interval`: Controls how often the frontend UI refreshes
(default: 500ms)
- `runtime-statistics-persistence-interval`: Controls how often
statistics are persisted to storage (default: 2000ms)
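
As a rough illustration of what the two defaults above imply, the
following Scala sketch counts how often a periodic timer fires within a
one-minute window. `PersistenceRateSketch` and `firingsPerWindow` are
hypothetical names introduced here, not part of this PR. The count
covers timer-triggered persistence only: event-triggered updates still
persist in addition, and the real timers use a fixed delay, so actual
counts are approximate.

```scala
// Hypothetical helper for illustration only; not part of the PR.
object PersistenceRateSketch {
  // Approximate number of firings of a periodic timer within windowMs,
  // ignoring the initial firing at t = 0 and any scheduling drift.
  def firingsPerWindow(intervalMs: Long, windowMs: Long): Long = {
    require(intervalMs > 0, "interval must be positive")
    windowMs / intervalMs
  }

  def main(args: Array[String]): Unit = {
    val windowMs = 60000L
    // Before: persistence shared the 500ms UI interval.
    val before = firingsPerWindow(500L, windowMs)
    // After: persistence uses its own 2000ms default interval.
    val after = firingsPerWindow(2000L, windowMs)
    println(s"timer-triggered persistence events per minute: before=$before, after=$after")
  }
}
```

Under these assumptions, the default settings cut timer-triggered
persistence events to a quarter of the previous rate, while the UI
refresh rate is unchanged.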
#### Do two timers mean more frequent worker queries?
No. The controller tracks the timestamp of the last completed full-graph
worker query and uses `min(status-update-interval,
runtime-statistics-persistence-interval)` as a freshness threshold. When
a timer fires, if the elapsed time since the last query is within this
threshold, the controller forwards stats from cache without querying
workers — so the faster timer drives all real worker queries and the
slower timer always reuses the result. If a query is already in-flight
when the second timer fires, the controller serves stats from the
previous completed query's cache. Cache reuse applies to timer-triggered
queries only; event-triggered queries (e.g., from worker completion
events) always proceed to real worker RPCs.
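
The decision described above can be sketched as a small pure function.
This is a simplified model under stated assumptions, not the handler
code itself: `FreshnessSketch`, `State`, `Decision`, and `decide` are
hypothetical names, the clock is passed in explicitly, and an `Option`
stands in for the "no completed query yet" case. The `fullGraph` flag
models the distinction the handler makes between full-graph
(timer-triggered) requests and requests filtered to specific workers
(event-triggered).

```scala
// Hypothetical model of the freshness check; names are illustrative.
object FreshnessSketch {
  sealed trait Decision
  case object ServeFromCache extends Decision
  case object DropSilently extends Decision
  case object QueryWorkers extends Decision

  // queryInFlight: a full-graph query is currently running.
  // lastCompletedNs: completion time of the last full-graph query, if any.
  final case class State(queryInFlight: Boolean, lastCompletedNs: Option[Long])

  def decide(
      state: State,
      nowNs: Long,
      uiIntervalMs: Long,
      persistIntervalMs: Long,
      fullGraph: Boolean
  ): Decision = {
    // Freshness threshold: the smaller of the two intervals, in nanoseconds.
    val thresholdNs = math.min(uiIntervalMs, persistIntervalMs) * 1000000L
    if (!fullGraph) {
      // Worker-filtered (event-triggered) requests always query workers.
      QueryWorkers
    } else if (state.queryInFlight) {
      // Reuse the previous completed result, or drop if there is none yet.
      if (state.lastCompletedNs.isDefined) ServeFromCache else DropSilently
    } else if (state.lastCompletedNs.exists(t => nowNs - t < thresholdNs)) {
      // The other timer queried recently enough: reuse its result.
      ServeFromCache
    } else {
      QueryWorkers
    }
  }
}
```

With intervals of 500ms and 2000ms, a full-graph request arriving 400ms
after the last completed query is served from cache, while one arriving
500ms or more after it triggers a real worker query.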
#### Changes
- Added `runtime-statistics-persistence-interval` parameter (default:
2000ms) in `application.conf`
- Protobuf: Added `StatisticsUpdateTarget` enum (`UI_ONLY`,
`PERSISTENCE_ONLY`, `BOTH_UI_AND_PERSISTENCE`) to
`QueryStatisticsRequest`
- Added `RuntimeStatisticsPersist` event for statistics-only updates;
`ExecutionStatsUpdate` now handles UI-only updates
- Added separate timer for runtime statistics persistence that runs
independently from the UI update timer
- Query Handling
- Timer-triggered queries specify target: UI-only or persistence-only
- Event-triggered queries (port/worker completion, pause, resume) send
both UI and persistence updates to preserve original behavior
- `QueryWorkerStatisticsHandler` routes to the appropriate event based
on `StatisticsUpdateTarget`
- Worker query deduplication in `QueryWorkerStatisticsHandler`: when the
second timer fires, the controller checks whether worker stats were
already fetched recently (within `min(status-update-interval,
runtime-statistics-persistence-interval)`). If so, it forwards the
cached stats to the appropriate sink (UI or persistence) without issuing
any worker RPCs. If a query is already in-flight, cached stats from the
previous completed query are forwarded.
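
The routing by update target listed above can be sketched in isolation
as follows. This is a minimal stand-alone model, not the actual handler:
`RoutingSketch`, `Target`, `Event`, and `route` are hypothetical
stand-ins for `StatisticsUpdateTarget`, `ExecutionStatsUpdate`, and
`RuntimeStatisticsPersist`, and the stats payload is reduced to a plain
`Map[String, Long]`.

```scala
// Hypothetical stand-ins for the generated enum and client events.
object RoutingSketch {
  sealed trait Target
  case object UiOnly extends Target
  case object PersistenceOnly extends Target
  case object Both extends Target

  sealed trait Event
  final case class UiUpdate(stats: Map[String, Long]) extends Event
  final case class Persist(stats: Map[String, Long]) extends Event

  // Returns the events to emit, in order, for a given update target.
  def route(target: Target, stats: Map[String, Long]): Seq[Event] =
    target match {
      case UiOnly          => Seq(UiUpdate(stats))
      case PersistenceOnly => Seq(Persist(stats))
      case Both            => Seq(UiUpdate(stats), Persist(stats))
    }
}
```

Timer-triggered requests would use `UiOnly` or `PersistenceOnly` in this
model, while event-triggered requests use `Both`, which matches the
pre-change behavior of updating the UI and persisting together.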
### Any related issues, documentation, discussions?
Closes #4204
### How was this PR tested?
Tested with the following workflow and dataset: varied the
`runtime-statistics-persistence-interval` parameter to check whether
the size of the persisted runtime statistics shrinks as the parameter
value increases.
[Iris Dataset
Analysis.json](https://github.com/user-attachments/files/25220000/Iris.Dataset.Analysis.json)
[Iris.csv](https://github.com/user-attachments/files/25220003/Iris.csv)
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude-4.6
---------
Co-authored-by: Xiaozhen Liu <[email protected]>
Co-authored-by: Chen Li <[email protected]>
---
.../engine/architecture/rpc/controlcommands.proto | 7 +++
.../architecture/controller/ClientEvent.scala | 3 ++
.../architecture/controller/Controller.scala | 3 ++
.../controller/ControllerTimerService.scala | 54 ++++++++++++++++++----
.../controller/promisehandlers/PauseHandler.scala | 14 +++---
.../promisehandlers/PortCompletedHandler.scala | 11 ++++-
.../QueryWorkerStatisticsHandler.scala | 52 ++++++++++++++++++---
.../controller/promisehandlers/ResumeHandler.scala | 15 +++---
.../promisehandlers/StartWorkflowHandler.scala | 1 +
.../WorkerExecutionCompletedHandler.scala | 6 ++-
.../WorkerStateUpdatedHandler.scala | 11 ++---
.../scheduling/RegionExecutionCoordinator.scala | 15 +++---
.../texera/web/service/ExecutionStatsService.scala | 17 ++++++-
common/config/src/main/resources/application.conf | 3 ++
.../texera/amber/config/ApplicationConfig.scala | 2 +
15 files changed, 164 insertions(+), 50 deletions(-)
diff --git
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
index ed17be236d..d714f64a15 100644
---
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
+++
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
@@ -269,6 +269,13 @@ message PrepareCheckpointRequest{
bool estimationOnly = 2;
}
+enum StatisticsUpdateTarget {
+ BOTH_UI_AND_PERSISTENCE = 0;
+ UI_ONLY = 1;
+ PERSISTENCE_ONLY = 2;
+}
+
message QueryStatisticsRequest{
repeated core.ActorVirtualIdentity filterByWorkers = 1;
+ StatisticsUpdateTarget updateTarget = 2;
}
\ No newline at end of file
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala
index ea83eedd2a..1092af15e7 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala
@@ -31,6 +31,9 @@ case class ExecutionStateUpdate(state:
WorkflowAggregatedState) extends ClientEv
case class ExecutionStatsUpdate(operatorMetrics: Map[String, OperatorMetrics])
extends ClientEvent
+case class RuntimeStatisticsPersist(operatorMetrics: Map[String,
OperatorMetrics])
+ extends ClientEvent
+
case class ReportCurrentProcessingTuple(
operatorID: String,
tuple: Array[(Tuple, ActorVirtualIdentity)]
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
index 85ae04d4e9..b0e4f3fdc3 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
@@ -51,6 +51,8 @@ object ControllerConfig {
def default: ControllerConfig =
ControllerConfig(
statusUpdateIntervalMs =
Option(ApplicationConfig.getStatusUpdateIntervalInMs),
+ runtimeStatisticsPersistenceIntervalMs =
+ Option(ApplicationConfig.getRuntimeStatisticsPersistenceIntervalInMs),
stateRestoreConfOpt = None,
faultToleranceConfOpt = None
)
@@ -58,6 +60,7 @@ object ControllerConfig {
final case class ControllerConfig(
statusUpdateIntervalMs: Option[Long],
+ runtimeStatisticsPersistenceIntervalMs: Option[Long],
stateRestoreConfOpt: Option[StateRestoreConfig],
faultToleranceConfOpt: Option[FaultToleranceConfig]
)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala
index 20ebff9fc0..a778a27c46 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala
@@ -23,7 +23,8 @@ import org.apache.pekko.actor.Cancellable
import org.apache.texera.amber.engine.architecture.common.AkkaActorService
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
- QueryStatisticsRequest
+ QueryStatisticsRequest,
+ StatisticsUpdateTarget
}
import
org.apache.texera.amber.engine.architecture.rpc.controllerservice.ControllerServiceGrpc.METHOD_CONTROLLER_INITIATE_QUERY_STATISTICS
import
org.apache.texera.amber.engine.common.rpc.AsyncRPCClient.ControlInvocation
@@ -36,28 +37,61 @@ class ControllerTimerService(
akkaActorService: AkkaActorService
) {
var statusUpdateAskHandle: Option[Cancellable] = None
+ var runtimeStatisticsAskHandle: Option[Cancellable] = None
- def enableStatusUpdate(): Unit = {
- if (controllerConfig.statusUpdateIntervalMs.nonEmpty &&
statusUpdateAskHandle.isEmpty) {
- statusUpdateAskHandle = Option(
+ private def enableTimer(
+ intervalMs: Option[Long],
+ updateTarget: StatisticsUpdateTarget,
+ handleOpt: Option[Cancellable]
+ ): Option[Cancellable] = {
+ if (intervalMs.nonEmpty && handleOpt.isEmpty) {
+ Option(
akkaActorService.sendToSelfWithFixedDelay(
0.milliseconds,
- FiniteDuration.apply(controllerConfig.statusUpdateIntervalMs.get,
MILLISECONDS),
+ FiniteDuration.apply(intervalMs.get, MILLISECONDS),
ControlInvocation(
METHOD_CONTROLLER_INITIATE_QUERY_STATISTICS,
- QueryStatisticsRequest(Seq.empty),
+ QueryStatisticsRequest(Seq.empty, updateTarget),
AsyncRPCContext(SELF, SELF),
0
)
)
)
+ } else {
+ handleOpt
}
}
- def disableStatusUpdate(): Unit = {
- if (statusUpdateAskHandle.nonEmpty) {
- statusUpdateAskHandle.get.cancel()
- statusUpdateAskHandle = Option.empty
+ private def disableTimer(handleOpt: Option[Cancellable]):
Option[Cancellable] = {
+ if (handleOpt.nonEmpty) {
+ handleOpt.get.cancel()
+ Option.empty
+ } else {
+ handleOpt
}
}
+
+ def enableStatusUpdate(): Unit = {
+ statusUpdateAskHandle = enableTimer(
+ controllerConfig.statusUpdateIntervalMs,
+ StatisticsUpdateTarget.UI_ONLY,
+ statusUpdateAskHandle
+ )
+ }
+
+ def enableRuntimeStatisticsCollection(): Unit = {
+ runtimeStatisticsAskHandle = enableTimer(
+ controllerConfig.runtimeStatisticsPersistenceIntervalMs,
+ StatisticsUpdateTarget.PERSISTENCE_ONLY,
+ runtimeStatisticsAskHandle
+ )
+ }
+
+ def disableStatusUpdate(): Unit = {
+ statusUpdateAskHandle = disableTimer(statusUpdateAskHandle)
+ }
+
+ def disableRuntimeStatisticsCollection(): Unit = {
+ runtimeStatisticsAskHandle = disableTimer(runtimeStatisticsAskHandle)
+ }
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala
index 5511fea6ab..35a85f56ae 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala
@@ -25,7 +25,8 @@ import
org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
import org.apache.texera.amber.engine.architecture.controller.{
ControllerAsyncRPCHandlerInitializer,
ExecutionStateUpdate,
- ExecutionStatsUpdate
+ ExecutionStatsUpdate,
+ RuntimeStatisticsPersist
}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
@@ -47,6 +48,7 @@ trait PauseHandler {
override def pauseWorkflow(request: EmptyRequest, ctx: AsyncRPCContext):
Future[EmptyReturn] = {
cp.controllerTimerService.disableStatusUpdate() // to be enabled in resume
+ cp.controllerTimerService.disableRuntimeStatisticsCollection() // to be
enabled in resume
Future
.collect(
cp.workflowExecution.getRunningRegionExecutions
@@ -81,12 +83,10 @@ trait PauseHandler {
.toSeq
)
.map { _ =>
- // update frontend workflow status
- sendToClient(
- ExecutionStatsUpdate(
- cp.workflowExecution.getAllRegionExecutionsStats
- )
- )
+ // update frontend workflow status and persist statistics
+ val stats = cp.workflowExecution.getAllRegionExecutionsStats
+ sendToClient(ExecutionStatsUpdate(stats))
+ sendToClient(RuntimeStatisticsPersist(stats))
sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
logger.info(s"workflow paused")
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
index 0add56eba3..810c098c41 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
@@ -29,7 +29,8 @@ import
org.apache.texera.amber.engine.architecture.controller.{
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
PortCompletedRequest,
- QueryStatisticsRequest
+ QueryStatisticsRequest,
+ StatisticsUpdateTarget
}
import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
@@ -50,7 +51,13 @@ trait PortCompletedHandler {
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
controllerInterface
-
.controllerInitiateQueryStatistics(QueryStatisticsRequest(scala.Seq(ctx.sender)),
CONTROLLER)
+ .controllerInitiateQueryStatistics(
+ QueryStatisticsRequest(
+ scala.Seq(ctx.sender),
+ StatisticsUpdateTarget.BOTH_UI_AND_PERSISTENCE
+ ),
+ CONTROLLER
+ )
.map { _ =>
val globalPortId = GlobalPortIdentity(
VirtualIdentityUtils.getPhysicalOpId(ctx.sender),
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
index a7705170e3..6551579f71 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
@@ -20,16 +20,19 @@
package org.apache.texera.amber.engine.architecture.controller.promisehandlers
import com.twitter.util.Future
+import org.apache.texera.amber.config.ApplicationConfig
import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity
import org.apache.texera.amber.engine.architecture.controller.{
ControllerAsyncRPCHandlerInitializer,
- ExecutionStatsUpdate
+ ExecutionStatsUpdate,
+ RuntimeStatisticsPersist
}
import
org.apache.texera.amber.engine.architecture.deploysemantics.layer.WorkerExecution
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
EmptyRequest,
- QueryStatisticsRequest
+ QueryStatisticsRequest,
+ StatisticsUpdateTarget
}
import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{
@@ -47,6 +50,33 @@ trait QueryWorkerStatisticsHandler {
private var globalQueryStatsOngoing = false
+ // Minimum of the two timer intervals converted to nanoseconds.
+ // A full-graph worker query is skipped and served from cache when the last
completed
+ // query falls within this window, avoiding redundant worker RPCs.
+ private val minQueryIntervalNs: Long =
+ Math.min(
+ ApplicationConfig.getStatusUpdateIntervalInMs,
+ ApplicationConfig.getRuntimeStatisticsPersistenceIntervalInMs
+ ) * 1_000_000L
+
+ // Nanosecond timestamp of the last completed full-graph worker stats query.
+ @volatile private var lastWorkerQueryTimestampNs: Long = 0L
+
+ // Reads the current cached stats and forwards them to the appropriate
client sink(s).
+ private def forwardStats(updateTarget: StatisticsUpdateTarget): Unit = {
+ val stats = cp.workflowExecution.getAllRegionExecutionsStats
+ updateTarget match {
+ case StatisticsUpdateTarget.UI_ONLY =>
+ sendToClient(ExecutionStatsUpdate(stats))
+ case StatisticsUpdateTarget.PERSISTENCE_ONLY =>
+ sendToClient(RuntimeStatisticsPersist(stats))
+ case StatisticsUpdateTarget.BOTH_UI_AND_PERSISTENCE |
+ StatisticsUpdateTarget.Unrecognized(_) =>
+ sendToClient(ExecutionStatsUpdate(stats))
+ sendToClient(RuntimeStatisticsPersist(stats))
+ }
+ }
+
override def controllerInitiateQueryStatistics(
msg: QueryStatisticsRequest,
ctx: AsyncRPCContext
@@ -54,12 +84,20 @@ trait QueryWorkerStatisticsHandler {
// Avoid issuing concurrent full-graph statistics queries.
// If a global query is already in progress, skip this request.
if (globalQueryStatsOngoing && msg.filterByWorkers.isEmpty) {
+ // A query is already in-flight: serve the last completed query's cached
data,
+ // or drop silently if no prior query has finished yet.
+ if (lastWorkerQueryTimestampNs > 0) forwardStats(msg.updateTarget)
return EmptyReturn()
}
var opFilter: Set[PhysicalOpIdentity] = Set.empty
// Only enforce the single-query restriction for full-graph queries.
if (msg.filterByWorkers.isEmpty) {
+ if (System.nanoTime() - lastWorkerQueryTimestampNs < minQueryIntervalNs)
{
+ // Cache is still fresh: the faster timer already queried workers
recently.
+ forwardStats(msg.updateTarget)
+ return EmptyReturn()
+ }
globalQueryStatsOngoing = true
} else {
// Map the filtered worker IDs (if any) to their corresponding physical
operator IDs
@@ -133,17 +171,17 @@ trait QueryWorkerStatisticsHandler {
Future.collect(futures).flatMap(_ => processLayers(rest))
}
- // Start processing all layers and update the frontend after completion
+ // Start processing all layers and forward stats to the appropriate
sink(s) on completion.
processLayers(layers).map { _ =>
collectedResults.foreach {
case (wExec, resp, timestamp) =>
wExec.update(timestamp, resp.metrics.workerState,
resp.metrics.workerStatistics)
}
- sendToClient(
- ExecutionStatsUpdate(cp.workflowExecution.getAllRegionExecutionsStats)
- )
- // Release the global query lock if it was set
+ forwardStats(msg.updateTarget)
+ // Record the completion timestamp before releasing the lock so that any
timer
+ // firing in between sees a valid cache entry rather than triggering a
redundant query.
if (globalQueryStatsOngoing) {
+ lastWorkerQueryTimestampNs = System.nanoTime()
globalQueryStatsOngoing = false
}
EmptyReturn()
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala
index 020ad40a1c..c94ba91c20 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala
@@ -22,7 +22,8 @@ package
org.apache.texera.amber.engine.architecture.controller.promisehandlers
import com.twitter.util.Future
import org.apache.texera.amber.engine.architecture.controller.{
ControllerAsyncRPCHandlerInitializer,
- ExecutionStatsUpdate
+ ExecutionStatsUpdate,
+ RuntimeStatisticsPersist
}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
@@ -57,14 +58,14 @@ trait ResumeHandler {
.toSeq
)
.map { _ =>
- // update frontend status
- sendToClient(
- ExecutionStatsUpdate(
- cp.workflowExecution.getAllRegionExecutionsStats
- )
- )
+ // update frontend status and persist statistics
+ val stats = cp.workflowExecution.getAllRegionExecutionsStats
+ sendToClient(ExecutionStatsUpdate(stats))
+ sendToClient(RuntimeStatisticsPersist(stats))
cp.controllerTimerService
.enableStatusUpdate() //re-enabled it since it is disabled in pause
+ cp.controllerTimerService
+ .enableRuntimeStatisticsCollection() //re-enabled it since it is
disabled in pause
EmptyReturn()
}
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/StartWorkflowHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/StartWorkflowHandler.scala
index 2ce28afb0f..7d938dbedd 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/StartWorkflowHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/StartWorkflowHandler.scala
@@ -45,6 +45,7 @@ trait StartWorkflowHandler {
.coordinateRegionExecutors(cp.actorService)
.map(_ => {
cp.controllerTimerService.enableStatusUpdate()
+ cp.controllerTimerService.enableRuntimeStatisticsCollection()
StartWorkflowResponse(RUNNING)
})
} else {
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
index 594673caa5..d54a22f26b 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
@@ -27,7 +27,8 @@ import
org.apache.texera.amber.engine.architecture.controller.{
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
EmptyRequest,
- QueryStatisticsRequest
+ QueryStatisticsRequest,
+ StatisticsUpdateTarget
}
import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
import org.apache.texera.amber.engine.common.virtualidentity.util.SELF
@@ -52,7 +53,7 @@ trait WorkerExecutionCompletedHandler {
// and the user sees the last update before completion
val statsRequest =
controllerInterface.controllerInitiateQueryStatistics(
- QueryStatisticsRequest(Seq(ctx.sender)),
+ QueryStatisticsRequest(Seq(ctx.sender),
StatisticsUpdateTarget.BOTH_UI_AND_PERSISTENCE),
mkContext(SELF)
)
@@ -64,6 +65,7 @@ trait WorkerExecutionCompletedHandler {
// after query result come back: send completed event, cleanup ,and
kill workflow
sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
cp.controllerTimerService.disableStatusUpdate()
+ cp.controllerTimerService.disableRuntimeStatisticsCollection()
}
})
EmptyReturn()
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala
index c1fa6b1ef7..5ee98a4918 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala
@@ -22,7 +22,8 @@ package
org.apache.texera.amber.engine.architecture.controller.promisehandlers
import com.twitter.util.Future
import org.apache.texera.amber.engine.architecture.controller.{
ControllerAsyncRPCHandlerInitializer,
- ExecutionStatsUpdate
+ ExecutionStatsUpdate,
+ RuntimeStatisticsPersist
}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
@@ -50,11 +51,9 @@ trait WorkerStateUpdatedHandler {
.foreach(operatorExecution =>
operatorExecution.getWorkerExecution(ctx.sender).update(System.nanoTime(),
msg.state)
)
- sendToClient(
- ExecutionStatsUpdate(
- cp.workflowExecution.getAllRegionExecutionsStats
- )
- )
+ val stats = cp.workflowExecution.getAllRegionExecutionsStats
+ sendToClient(ExecutionStatsUpdate(stats))
+ sendToClient(RuntimeStatisticsPersist(stats))
EmptyReturn()
}
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 7e5b228801..e490cde3d9 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -38,6 +38,7 @@ import
org.apache.texera.amber.engine.architecture.controller.execution.{
import org.apache.texera.amber.engine.architecture.controller.{
ControllerConfig,
ExecutionStatsUpdate,
+ RuntimeStatisticsPersist,
WorkerAssignmentUpdate
}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands._
@@ -275,9 +276,9 @@ class RegionExecutionCoordinator(
val resourceConfig = region.resourceConfig.get
val regionExecution = workflowExecution.getRegionExecution(region.id)
- asyncRPCClient.sendToClient(
- ExecutionStatsUpdate(workflowExecution.getAllRegionExecutionsStats)
- )
+ val stats = workflowExecution.getAllRegionExecutionsStats
+ asyncRPCClient.sendToClient(ExecutionStatsUpdate(stats))
+ asyncRPCClient.sendToClient(RuntimeStatisticsPersist(stats))
asyncRPCClient.sendToClient(
WorkerAssignmentUpdate(
operatorsToRun
@@ -489,11 +490,9 @@ class RegionExecutionCoordinator(
region: Region,
isDependeePhase: Boolean
): Future[Seq[Unit]] = {
- asyncRPCClient.sendToClient(
- ExecutionStatsUpdate(
- workflowExecution.getAllRegionExecutionsStats
- )
- )
+ val stats = workflowExecution.getAllRegionExecutionsStats
+ asyncRPCClient.sendToClient(ExecutionStatsUpdate(stats))
+ asyncRPCClient.sendToClient(RuntimeStatisticsPersist(stats))
val allStarterOperators = region.getStarterOperators
val starterOpsForThisPhase =
if (isDependeePhase)
allStarterOperators.filter(_.dependeeInputs.nonEmpty)
diff --git
a/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala
b/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala
index 841c7112c7..3703a2bf41 100644
---
a/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala
+++
b/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala
@@ -28,7 +28,14 @@ import org.apache.texera.amber.core.tuple.Tuple
import org.apache.texera.amber.core.workflow.WorkflowContext
import
org.apache.texera.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE
import org.apache.texera.amber.core.workflowruntimestate.WorkflowFatalError
-import org.apache.texera.amber.engine.architecture.controller._
+import org.apache.texera.amber.engine.architecture.controller.{
+ ExecutionStateUpdate,
+ ExecutionStatsUpdate,
+ FatalError,
+ RuntimeStatisticsPersist,
+ WorkerAssignmentUpdate,
+ WorkflowRecoveryStatus
+}
import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
COMPLETED,
@@ -175,12 +182,20 @@ class ExecutionStatsService(
}
private[this] def registerCallbackOnWorkflowStatsUpdate(): Unit = {
+ // Register callback for UI updates (UI state store update only, no
persistence)
addSubscription(
client
.registerCallback[ExecutionStatsUpdate]((evt: ExecutionStatsUpdate) =>
{
stateStore.statsStore.updateState { statsStore =>
statsStore.withOperatorInfo(evt.operatorMetrics)
}
+ })
+ )
+
+ // Register callback for statistics persistence (persistence only, no UI
update)
+ addSubscription(
+ client
+ .registerCallback[RuntimeStatisticsPersist]((evt:
RuntimeStatisticsPersist) => {
metricsPersistThread.execute(() => {
storeRuntimeStatistics(computeStatsDiff(evt.operatorMetrics))
lastPersistedMetrics = evt.operatorMetrics
diff --git a/common/config/src/main/resources/application.conf
b/common/config/src/main/resources/application.conf
index 62daee8679..c7a7af2418 100644
--- a/common/config/src/main/resources/application.conf
+++ b/common/config/src/main/resources/application.conf
@@ -31,6 +31,9 @@ constants {
status-update-interval = 500
status-update-interval = ${?CONSTANTS_STATUS_UPDATE_INTERVAL}
+
+ runtime-statistics-persistence-interval = 2000
+ runtime-statistics-persistence-interval =
${?CONSTANTS_RUNTIME_STATISTICS_PERSISTENCE_INTERVAL}
}
flow-control {
diff --git
a/common/config/src/main/scala/org/apache/texera/amber/config/ApplicationConfig.scala
b/common/config/src/main/scala/org/apache/texera/amber/config/ApplicationConfig.scala
index 415a23d23c..927b102ebc 100644
---
a/common/config/src/main/scala/org/apache/texera/amber/config/ApplicationConfig.scala
+++
b/common/config/src/main/scala/org/apache/texera/amber/config/ApplicationConfig.scala
@@ -46,6 +46,8 @@ object ApplicationConfig {
val MAX_RESOLUTION_COLUMNS: Int =
getConfSource.getInt("constants.max-resolution-columns")
val numWorkerPerOperatorByDefault: Int =
getConfSource.getInt("constants.num-worker-per-operator")
val getStatusUpdateIntervalInMs: Long =
getConfSource.getLong("constants.status-update-interval")
+ val getRuntimeStatisticsPersistenceIntervalInMs: Long =
+ getConfSource.getLong("constants.runtime-statistics-persistence-interval")
// Flow control
val maxCreditAllowedInBytesPerChannel: Long =