kunwp1 commented on code in PR #4205:
URL: https://github.com/apache/texera/pull/4205#discussion_r2842767827
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala:
##########
@@ -47,19 +50,54 @@ trait QueryWorkerStatisticsHandler {
private var globalQueryStatsOngoing = false
+ // Minimum of the two timer intervals converted to nanoseconds.
+ // A full-graph worker query is skipped and served from cache when the last completed
+ // query falls within this window, avoiding redundant worker RPCs.
+ private val minQueryIntervalNs: Long =
+ Math.min(
+ ApplicationConfig.getStatusUpdateIntervalInMs,
+ ApplicationConfig.getRuntimeStatisticsPersistenceIntervalInMs
+ ) * 1_000_000L
+
+ // Nanosecond timestamp of the last completed full-graph worker stats query.
+ @volatile private var lastWorkerQueryTimestampNs: Long = 0L
+
+ // Reads the current cached stats and forwards them to the appropriate client sink(s).
+ private def forwardStats(updateTarget: StatisticsUpdateTarget): Unit = {
+ val stats = cp.workflowExecution.getAllRegionExecutionsStats
+ updateTarget match {
+ case StatisticsUpdateTarget.UI_ONLY =>
+ sendToClient(ExecutionStatsUpdate(stats))
+ case StatisticsUpdateTarget.PERSISTENCE_ONLY =>
+ sendToClient(RuntimeStatisticsPersist(stats))
+ case StatisticsUpdateTarget.BOTH_UI_AND_PERSISTENCE |
+ StatisticsUpdateTarget.Unrecognized(_) =>
+ sendToClient(ExecutionStatsUpdate(stats))
+ sendToClient(RuntimeStatisticsPersist(stats))
+ }
+ }
+
override def controllerInitiateQueryStatistics(
msg: QueryStatisticsRequest,
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
// Avoid issuing concurrent full-graph statistics queries.
// If a global query is already in progress, skip this request.
if (globalQueryStatsOngoing && msg.filterByWorkers.isEmpty) {
+ // A query is already in-flight: serve the last completed query's cached data,
+ // or drop silently if no prior query has finished yet.
+ if (lastWorkerQueryTimestampNs > 0) forwardStats(msg.updateTarget)
Review Comment:
Yes. The purpose of this logic is to avoid losing an update every time the
two timers fire at the same moment. For example, if `status-update-interval` and
`runtime-statistics-persistence-interval` are set to the same value, one of the
two messages would be discarded without this logic. I couldn't think of a cleaner
way to solve this problem.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]