kunwp1 commented on code in PR #4205:
URL: https://github.com/apache/texera/pull/4205#discussion_r2842767827


##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala:
##########
@@ -47,19 +50,54 @@ trait QueryWorkerStatisticsHandler {
 
   private var globalQueryStatsOngoing = false
 
+  // Minimum of the two timer intervals converted to nanoseconds.
+  // A full-graph worker query is skipped and served from cache when the last completed
+  // query falls within this window, avoiding redundant worker RPCs.
+  private val minQueryIntervalNs: Long =
+    Math.min(
+      ApplicationConfig.getStatusUpdateIntervalInMs,
+      ApplicationConfig.getRuntimeStatisticsPersistenceIntervalInMs
+    ) * 1_000_000L
+
+  // Nanosecond timestamp of the last completed full-graph worker stats query.
+  @volatile private var lastWorkerQueryTimestampNs: Long = 0L
+
+  // Reads the current cached stats and forwards them to the appropriate client sink(s).
+  private def forwardStats(updateTarget: StatisticsUpdateTarget): Unit = {
+    val stats = cp.workflowExecution.getAllRegionExecutionsStats
+    updateTarget match {
+      case StatisticsUpdateTarget.UI_ONLY =>
+        sendToClient(ExecutionStatsUpdate(stats))
+      case StatisticsUpdateTarget.PERSISTENCE_ONLY =>
+        sendToClient(RuntimeStatisticsPersist(stats))
+      case StatisticsUpdateTarget.BOTH_UI_AND_PERSISTENCE |
+          StatisticsUpdateTarget.Unrecognized(_) =>
+        sendToClient(ExecutionStatsUpdate(stats))
+        sendToClient(RuntimeStatisticsPersist(stats))
+    }
+  }
+
   override def controllerInitiateQueryStatistics(
       msg: QueryStatisticsRequest,
       ctx: AsyncRPCContext
   ): Future[EmptyReturn] = {
     // Avoid issuing concurrent full-graph statistics queries.
     // If a global query is already in progress, skip this request.
     if (globalQueryStatsOngoing && msg.filterByWorkers.isEmpty) {
+      // A query is already in-flight: serve the last completed query's cached data,
+      // or drop silently if no prior query has finished yet.
+      if (lastWorkerQueryTimestampNs > 0) forwardStats(msg.updateTarget)

Review Comment:
   Yes. The purpose of this logic is to avoid losing an update whenever the 
two timers fire at the same time. For example, if `status-update-interval` and 
`runtime-statistics-persistence-interval` are set to the same value, one of the 
two messages would be discarded without this logic. I couldn't think of a 
cleaner way to solve this problem.
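
The scenario can be illustrated with a minimal, simplified sketch. Everything in it (`CoincidingTimersSketch`, `Handler`, `request`, `complete`, `run`) is a hypothetical stand-in rather than code from this PR. It only models the in-flight flag and the "serve from cache" branch, and it assumes the UI request always arrives first:

```scala
// Hypothetical model of the handler; none of these names come from the PR.
object CoincidingTimersSketch {
  sealed trait Target
  case object UiOnly extends Target
  case object PersistenceOnly extends Target

  final class Handler(forwardCachedWhenBusy: Boolean) {
    private var queryOngoing = false
    private var hasCompletedQuery = false
    // Records which sink received an update, in delivery order.
    val delivered = scala.collection.mutable.ListBuffer.empty[Target]

    // Returns true if the request started a new full-graph query.
    def request(target: Target): Boolean =
      if (queryOngoing) {
        // A query is in flight: serve cached stats if any exist, else drop.
        if (forwardCachedWhenBusy && hasCompletedQuery) delivered.append(target)
        false
      } else {
        queryOngoing = true
        true
      }

    // Called when the in-flight query finishes and its stats go to `target`.
    def complete(target: Target): Unit = {
      queryOngoing = false
      hasCompletedQuery = true
      delivered.append(target)
    }
  }

  // Simulates two ticks in which both timers fire at the same moment.
  def run(forwardCachedWhenBusy: Boolean): List[Target] = {
    val h = new Handler(forwardCachedWhenBusy)
    for (_ <- 1 to 2) {
      h.request(UiOnly)          // starts the full-graph query
      h.request(PersistenceOnly) // arrives while that query is in flight
      h.complete(UiOnly)
    }
    h.delivered.toList
  }
}
```

In this model, `run(false)` never delivers a persistence update, while `run(true)` delivers one on the second tick, once a completed query has populated the cache.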



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
