aglinxinyuan commented on code in PR #5714:
URL: https://github.com/apache/texera/pull/5714#discussion_r3447899578


##########
amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala:
##########
@@ -375,6 +375,104 @@ class SyncExecutionResource extends LazyLogging {
     }
   }
 
+  /**
+    * Blocks until every target operator's default external result port holds 
at least as many rows
+    * as its stats report, or until `timeoutMillis` elapses. Operators with no 
result storage are
+    * treated as ready.
+    */
+  private def awaitResultsPersisted(
+      executionId: ExecutionIdentity,
+      executionService: org.apache.texera.web.service.WorkflowExecutionService,
+      targetOperatorIds: List[String],
+      timeoutMillis: Long = 2000L,
+      pollIntervalMillis: Long = 25L
+  ): Unit = {
+    def expectedOutputCount(opId: String): Long =
+      expectedDefaultPortOutputCount(
+        executionService.executionStateStore.statsStore.getState,
+        opId
+      )
+
+    def committedCount(opId: String): Option[Long] =
+      committedDefaultPortCount(
+        op =>
+          WorkflowExecutionsResource
+            .getResultUriByLogicalPortId(executionId, OperatorIdentity(op), 
PortIdentity()),
+        uri =>
+          DocumentFactory
+            .openDocument(uri)
+            ._1
+            .asInstanceOf[VirtualDocument[Tuple]]
+            .getCount
+      )(opId)
+
+    awaitUntil(
+      targetOperatorIds,
+      expectedOutputCount,
+      committedCount,
+      timeoutMillis,
+      pollIntervalMillis,
+      () => System.currentTimeMillis(),
+      Thread.sleep
+    )
+  }
+
+  // Default external output port (PortIdentity()) row count from stats; 0 if 
absent.
+  private[resource] def expectedDefaultPortOutputCount(
+      stats: ExecutionStatsStore,
+      opId: String
+  ): Long =
+    stats.operatorInfo
+      .get(opId)
+      .flatMap { metrics =>
+        metrics.operatorStatistics.outputMetrics
+          .find(_.portId == PortIdentity())
+          .map(_.tupleMetrics.count)
+      }
+      .getOrElse(0L)
+
+  // Committed rows for the default result port; None when no storage, 0 when 
countOf throws.
+  private[resource] def committedDefaultPortCount(
+      resultUriOf: String => Option[URI],
+      countOf: URI => Long
+  )(opId: String): Option[Long] =
+    resultUriOf(opId).map { uri =>
+      try {
+        countOf(uri)
+      } catch {
+        case _: Exception => 0L

Review Comment:
   This swallows every exception as `0L` without logging anything. That's fine for
the intended "document not created yet → keep polling" case, but a genuine storage
error would then surface only as a silent 2s stall followed by empty results. A
`logger.debug` in the catch would keep it diagnosable without changing behavior.



##########
amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala:
##########
@@ -375,6 +375,104 @@ class SyncExecutionResource extends LazyLogging {
     }
   }
 
+  /**
+    * Blocks until every target operator's default external result port holds 
at least as many rows
+    * as its stats report, or until `timeoutMillis` elapses. Operators with no 
result storage are
+    * treated as ready.
+    */
+  private def awaitResultsPersisted(
+      executionId: ExecutionIdentity,
+      executionService: org.apache.texera.web.service.WorkflowExecutionService,
+      targetOperatorIds: List[String],
+      timeoutMillis: Long = 2000L,
+      pollIntervalMillis: Long = 25L
+  ): Unit = {
+    def expectedOutputCount(opId: String): Long =
+      expectedDefaultPortOutputCount(
+        executionService.executionStateStore.statsStore.getState,
+        opId
+      )
+
+    def committedCount(opId: String): Option[Long] =
+      committedDefaultPortCount(
+        op =>
+          WorkflowExecutionsResource
+            .getResultUriByLogicalPortId(executionId, OperatorIdentity(op), 
PortIdentity()),
+        uri =>
+          DocumentFactory
+            .openDocument(uri)
+            ._1
+            .asInstanceOf[VirtualDocument[Tuple]]
+            .getCount
+      )(opId)
+
+    awaitUntil(
+      targetOperatorIds,
+      expectedOutputCount,
+      committedCount,
+      timeoutMillis,
+      pollIntervalMillis,
+      () => System.currentTimeMillis(),
+      Thread.sleep
+    )
+  }
+
+  // Default external output port (PortIdentity()) row count from stats; 0 if 
absent.
+  private[resource] def expectedDefaultPortOutputCount(
+      stats: ExecutionStatsStore,
+      opId: String
+  ): Long =
+    stats.operatorInfo
+      .get(opId)
+      .flatMap { metrics =>
+        metrics.operatorStatistics.outputMetrics
+          .find(_.portId == PortIdentity())
+          .map(_.tupleMetrics.count)
+      }
+      .getOrElse(0L)
+
+  // Committed rows for the default result port; None when no storage, 0 when 
countOf throws.
+  private[resource] def committedDefaultPortCount(
+      resultUriOf: String => Option[URI],
+      countOf: URI => Long
+  )(opId: String): Option[Long] =
+    resultUriOf(opId).map { uri =>
+      try {
+        countOf(uri)
+      } catch {
+        case _: Exception => 0L
+      }
+    }
+
+  /**
+    * Blocks until every target operator is ready or `timeoutMillis` elapses, 
sleeping
+    * `pollIntervalMillis` between checks. An operator is ready when its 
expected count is
+    * non-positive, it has no committed count, or its committed count reaches 
the expected count.
+    * The clock and sleep are injected so tests can drive timing.
+    */
+  private[resource] def awaitUntil(
+      targetOperatorIds: List[String],
+      expectedCountOf: String => Long,
+      committedCountOf: String => Option[Long],
+      timeoutMillis: Long,
+      pollIntervalMillis: Long,
+      now: () => Long,
+      sleep: Long => Unit
+  ): Unit = {
+    if (targetOperatorIds.isEmpty) return
+
+    def ready: Boolean =
+      targetOperatorIds.forall { opId =>
+        val expected = expectedCountOf(opId)
+        expected <= 0 || committedCountOf(opId).forall(_ >= expected)
+      }
+
+    val deadline = now() + timeoutMillis
+    while (!ready && now() < deadline) {

Review Comment:
   On Copilot's second comment — I'd mostly let it go. The oversleep is at most one
~25ms poll interval against a 2s cap, and the happy path never sleeps at all.

The only real cost is one extra evaluation of `ready` (one `committedCountOf`
document open per target operator) on the final pass after the deadline has been
crossed. That pass happens because `!ready` is checked before `now() < deadline`.
Still, it's one extra pass among ~80 over a full timeout. Not worth restructuring
the loop.



##########
amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala:
##########
@@ -375,6 +375,104 @@ class SyncExecutionResource extends LazyLogging {
     }
   }
 
+  /**
+    * Blocks until every target operator's default external result port holds 
at least as many rows
+    * as its stats report, or until `timeoutMillis` elapses. Operators with no 
result storage are
+    * treated as ready.
+    */
+  private def awaitResultsPersisted(
+      executionId: ExecutionIdentity,
+      executionService: org.apache.texera.web.service.WorkflowExecutionService,
+      targetOperatorIds: List[String],
+      timeoutMillis: Long = 2000L,
+      pollIntervalMillis: Long = 25L
+  ): Unit = {
+    def expectedOutputCount(opId: String): Long =
+      expectedDefaultPortOutputCount(
+        executionService.executionStateStore.statsStore.getState,
+        opId
+      )
+
+    def committedCount(opId: String): Option[Long] =
+      committedDefaultPortCount(
+        op =>
+          WorkflowExecutionsResource
+            .getResultUriByLogicalPortId(executionId, OperatorIdentity(op), 
PortIdentity()),
+        uri =>
+          DocumentFactory
+            .openDocument(uri)
+            ._1
+            .asInstanceOf[VirtualDocument[Tuple]]
+            .getCount
+      )(opId)
+
+    awaitUntil(
+      targetOperatorIds,
+      expectedOutputCount,
+      committedCount,
+      timeoutMillis,
+      pollIntervalMillis,
+      () => System.currentTimeMillis(),

Review Comment:
   Copilot's right here — this should use a monotonic clock. `currentTimeMillis` is
wall-clock time and can jump when the system clock is adjusted (e.g. by NTP).

The practical risk over a 2s window is tiny, but the fix is cheap, and `awaitUntil`
is already unit-agnostic:
- Pass `() => System.nanoTime()` as `now`.
- Pass the timeout converted to nanos (e.g.
  `TimeUnit.MILLISECONDS.toNanos(timeoutMillis)`), since `awaitUntil` computes the
  deadline as `now() + timeoutMillis`.
- Leave `pollIntervalMillis` in millis — it only feeds `sleep`.

No test changes are needed. Optional, but we might as well take it.



##########
amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala:
##########
@@ -375,6 +375,104 @@ class SyncExecutionResource extends LazyLogging {
     }
   }
 
+  /**
+    * Blocks until every target operator's default external result port holds 
at least as many rows
+    * as its stats report, or until `timeoutMillis` elapses. Operators with no 
result storage are
+    * treated as ready.
+    */
+  private def awaitResultsPersisted(
+      executionId: ExecutionIdentity,
+      executionService: org.apache.texera.web.service.WorkflowExecutionService,
+      targetOperatorIds: List[String],
+      timeoutMillis: Long = 2000L,
+      pollIntervalMillis: Long = 25L
+  ): Unit = {
+    def expectedOutputCount(opId: String): Long =
+      expectedDefaultPortOutputCount(
+        executionService.executionStateStore.statsStore.getState,
+        opId
+      )
+
+    def committedCount(opId: String): Option[Long] =
+      committedDefaultPortCount(
+        op =>
+          WorkflowExecutionsResource
+            .getResultUriByLogicalPortId(executionId, OperatorIdentity(op), 
PortIdentity()),
+        uri =>
+          DocumentFactory
+            .openDocument(uri)
+            ._1
+            .asInstanceOf[VirtualDocument[Tuple]]
+            .getCount
+      )(opId)
+
+    awaitUntil(
+      targetOperatorIds,
+      expectedOutputCount,
+      committedCount,
+      timeoutMillis,
+      pollIntervalMillis,
+      () => System.currentTimeMillis(),
+      Thread.sleep
+    )
+  }
+
+  // Default external output port (PortIdentity()) row count from stats; 0 if 
absent.
+  private[resource] def expectedDefaultPortOutputCount(
+      stats: ExecutionStatsStore,
+      opId: String
+  ): Long =
+    stats.operatorInfo
+      .get(opId)
+      .flatMap { metrics =>
+        metrics.operatorStatistics.outputMetrics
+          .find(_.portId == PortIdentity())

Review Comment:
   This is the crux of the change. `expected` is the default output port's tuple
count, and `committed` reads the document at `getResultUriByLogicalPortId(...,
PortIdentity())`, i.e. the same port, so the comparison is apples-to-apples.

Can you confirm the two are always equal for a sink port? Specifically, is there
any fan-out or filtering result writer that would leave `committed < expected`
permanently? If they can diverge, the poll condition is never satisfied, and every
such run waits out the full 2s cap instead of returning early.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to