Xiao-zhen-Liu commented on code in PR #4205:
URL: https://github.com/apache/texera/pull/4205#discussion_r2834764481
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala:
##########
@@ -47,19 +50,54 @@ trait QueryWorkerStatisticsHandler {
private var globalQueryStatsOngoing = false
+ // Minimum of the two timer intervals converted to nanoseconds.
+ // A full-graph worker query is skipped and served from cache when the last completed
+ // query falls within this window, avoiding redundant worker RPCs.
+ private val minQueryIntervalNs: Long =
+ Math.min(
+ ApplicationConfig.getStatusUpdateIntervalInMs,
+ ApplicationConfig.getRuntimeStatisticsPersistenceIntervalInMs
+ ) * 1_000_000L
+
+ // Nanosecond timestamp of the last completed full-graph worker stats query.
+ @volatile private var lastWorkerQueryTimestampNs: Long = 0L
+
+ // Reads the current cached stats and forwards them to the appropriate client sink(s).
+ private def forwardStats(updateTarget: StatisticsUpdateTarget): Unit = {
+ val stats = cp.workflowExecution.getAllRegionExecutionsStats
+ updateTarget match {
+ case StatisticsUpdateTarget.UI_ONLY =>
+ sendToClient(ExecutionStatsUpdate(stats))
+ case StatisticsUpdateTarget.PERSISTENCE_ONLY =>
+ sendToClient(RuntimeStatisticsPersist(stats))
+ case StatisticsUpdateTarget.BOTH_UI_AND_PERSISTENCE |
+ StatisticsUpdateTarget.Unrecognized(_) =>
+ sendToClient(ExecutionStatsUpdate(stats))
+ sendToClient(RuntimeStatisticsPersist(stats))
+ }
+ }
+
override def controllerInitiateQueryStatistics(
msg: QueryStatisticsRequest,
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
// Avoid issuing concurrent full-graph statistics queries.
// If a global query is already in progress, skip this request.
if (globalQueryStatsOngoing && msg.filterByWorkers.isEmpty) {
+ // A query is already in-flight: serve the last completed query's cached data,
+ // or drop silently if no prior query has finished yet.
+ if (lastWorkerQueryTimestampNs > 0) forwardStats(msg.updateTarget)
Review Comment:
I'm trying to understand the behavior of this change. Does this mean that
only the concurrent requests arriving before the first `globalQuery` finishes
will be dropped, and that once the first `globalQuery` of a workflow has
finished, all subsequent requests arriving while a `globalQuery` is ongoing
will be served from the cache? (Previously, any concurrent request would be
dropped.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]