bobbai00 opened a new issue, #5855:
URL: https://github.com/apache/texera/issues/5855
## Summary
`SyncExecutionResource.executeWorkflowSync` waits before reading an
execution's results from storage (today: two unconditional `Thread.sleep(500)`
calls). PR #5714 proposes replacing those sleeps with a bounded readiness-poll
that compares the in-memory output-stats count against the row count committed
to result storage.
**The core issue is not `sleep(500)` versus a poll versus any other waiting
mechanism. It is that the wait itself is redundant.** The engine already
guarantees that an operator's result storage is durably committed *before* that
operator is observable as `COMPLETED`, and both code paths that read results in
`SyncExecutionResource` are gated on `COMPLETED`. So by the time we read, the
data is guaranteed to be present — no sleep, no poll, and no stats-vs-storage
count comparison is needed.
Swapping the sleep for a poll trades one heuristic ("wait a fixed time and
hope") for another ("compare two numbers that are themselves racy and hope").
Neither addresses the fact that the synchronization we're approximating is
something the engine already provides as a hard happens-before guarantee.
## Why the wait is redundant
The result writer is committed synchronously on the worker's data-processing
thread *before* the worker reports the port/operator complete:
1. `OutputManager.finalizeOutput()` queues `FinalizePort(port, input=false)`
for each output port **before** `FinalizeExecutor`
(`amber/.../messaginglayer/OutputManager.scala:278-284`).
2. Processing `FinalizePort(port, false)` calls
`OutputManager.closeOutputStorageWriterIfNeeded(port)`, which sends a terminate
signal and then **blocks on `writerThread.join()`**
(`OutputManager.scala:257-264`). The writer thread's `finally` runs
`IcebergTableWriter.close()` → `flushBuffer()` →
`table.newAppend().appendFile(...).commit()`
(`common/workflow-core/.../iceberg/IcebergTableWriter.scala:106-135,140-144`).
After `join()` returns, every row is committed. `getFailure.foreach(throw _)`
re-throws a failed commit, so a commit failure becomes a `FatalError` (→
`FAILED`/`KILLED`), never `COMPLETED`.
3. **Only then** does the worker send `portCompleted`
(`DataProcessor.scala:174-181`).
4. `FinalizeExecutor` afterward transitions the worker to `COMPLETED` and
sends `workerExecutionCompleted` (`DataProcessor.scala:159-173`).
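The ordering in steps 1 to 4 can be modeled in a few lines. This is a minimal, hypothetical sketch, not Texera code. `CommittedStore`, `WriterThread`, and `Worker` are illustrative stand-ins for the Iceberg table, the output-storage writer thread, and the worker's finalize path. It shows why a reader that observes `Completed` needs no sleep. `Thread.join()` makes everything the writer thread did, including its final commit, visible to the joining thread. The state flip happens only after that.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for result storage: rows become visible only on commit.
final class CommittedStore {
  @volatile private var rows: Vector[String] = Vector.empty
  def commit(batch: Seq[String]): Unit = synchronized { rows = rows ++ batch }
  def count: Int = rows.size
}

// Hypothetical writer thread: buffers rows and commits the buffer in `finally`,
// loosely mirroring close() -> flushBuffer() -> commit() described above.
final class WriterThread(store: CommittedStore) extends Thread {
  private val queue = new LinkedBlockingQueue[Option[String]]()
  @volatile private var failure: Option[Throwable] = None

  def put(row: String): Unit = queue.put(Some(row))
  def terminate(): Unit = queue.put(None) // None is the "no more rows" signal
  def getFailure: Option[Throwable] = failure

  override def run(): Unit = {
    val buffer = ArrayBuffer.empty[String]
    try {
      var done = false
      while (!done) {
        queue.take() match {
          case Some(row) => buffer += row
          case None      => done = true
        }
      }
    } finally {
      try store.commit(buffer.toSeq)
      catch { case t: Throwable => failure = Some(t) } // surfaced via getFailure
    }
  }
}

sealed trait WorkerState
case object Running extends WorkerState
case object Completed extends WorkerState

// Hypothetical worker: finalize joins the writer before reporting completion.
final class Worker(store: CommittedStore) {
  private val writer = new WriterThread(store)
  writer.start()
  @volatile var state: WorkerState = Running

  def emit(row: String): Unit = writer.put(row)

  def finalizeOutput(): Unit = {
    writer.terminate()
    writer.join()                           // barrier: the commit has finished and is visible
    writer.getFailure.foreach(t => throw t) // a failed commit never reaches Completed
    state = Completed                       // completion becomes observable only after the commit
  }
}

val store = new CommittedStore
val worker = {
  val w = new Worker(store)
  (1 to 1000).foreach(i => w.emit(s"row-$i"))
  w.finalizeOutput()
  w
}
// A reader that sees Completed can read immediately: no sleep, no poll.
println(s"state=${worker.state}, committed=${store.count}") // prints: state=Completed, committed=1000
```

The point of the model is the order of the last three lines of `finalizeOutput`. Moving `state = Completed` above `writer.join()` would reintroduce exactly the window that the sleep was papering over.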
So: **commit happens-before the port/worker is reported `COMPLETED`.** Since
the read side (`IcebergDocument.getCount`/`get()`) reloads fresh catalog
metadata on each call, a read taken after observing `COMPLETED` always sees the
full result — including across the decoupled compute-unit process boundary.
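Per-call reloading is what lets a reader observe a commit that happened before `COMPLETED`. The hypothetical sketch below contrasts a reader that reloads metadata on each call with one that pins the snapshot it saw when it was opened. `Catalog`, `Snapshot`, and both reader classes are illustrative, not Iceberg's API. The catalog is modeled in memory, so this illustrates only the visibility argument, not the cross-process transport.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical table snapshot: row counts of the committed data files.
final case class Snapshot(fileRowCounts: Vector[Long]) {
  def rowCount: Long = fileRowCounts.sum
}

// Hypothetical catalog: a commit atomically installs a new snapshot.
final class Catalog {
  private val current = new AtomicReference[Snapshot](Snapshot(Vector.empty))
  def load(): Snapshot = current.get()
  def commitAppend(rows: Long): Unit = {
    current.updateAndGet(s => Snapshot(s.fileRowCounts :+ rows))
    ()
  }
}

// Reloads metadata on every call, as the issue describes for getCount/get().
final class ReloadingReader(catalog: Catalog) {
  def getCount: Long = catalog.load().rowCount
}

// Counter-example: pins whatever snapshot existed when the reader was opened.
final class PinnedReader(catalog: Catalog) {
  private val pinned = catalog.load()
  def getCount: Long = pinned.rowCount
}

val catalog = new Catalog
val reloading = new ReloadingReader(catalog)
val pinned = new PinnedReader(catalog) // both readers opened before the commit
catalog.commitAppend(1000L)            // the writer's commit, which precedes COMPLETED
println(s"reloading=${reloading.getCount}, pinned=${pinned.getCount}") // prints: reloading=1000, pinned=0
```

The `COMPLETED` gate guarantees that the commit has happened, and per-call reloading guarantees that the read observes it. A reader that pinned an earlier snapshot would still short-read despite the gate.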
Both termination paths in `SyncExecutionResource` that read results are
already gated on `COMPLETED`:
- `TerminalStateReached(COMPLETED)`
- `TargetResultsReady` — fires on `allTargetsCompleted(stats)`, i.e. every
target operator's state is `COMPLETED`.
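Both gates reduce to a predicate over operator states. In the hypothetical sketch below, only the names `TerminalStateReached`, `TargetResultsReady`, and `allTargetsCompleted` come from the issue. The event shapes, the `Stats` alias, and `safeToReadNow` are illustrative assumptions. The sketch shows when an immediate read is safe under the commit-before-`COMPLETED` invariant.

```scala
sealed trait OpState
case object Running extends OpState
case object Completed extends OpState
case object Failed extends OpState

// Hypothetical: operator id -> state as reported in execution stats.
type Stats = Map[String, OpState]

// Hypothetical shapes for the two termination paths that read results.
sealed trait Termination
final case class TerminalStateReached(state: OpState) extends Termination
final case class TargetResultsReady(stats: Stats) extends Termination

// Every target operator must itself be COMPLETED.
def allTargetsCompleted(targets: Set[String], stats: Stats): Boolean =
  targets.forall(id => stats.get(id).contains(Completed))

// True only when the read follows an observed COMPLETED, which is exactly
// when the commit-before-COMPLETED invariant makes an immediate read safe.
def safeToReadNow(event: Termination, targets: Set[String]): Boolean =
  event match {
    case TerminalStateReached(Completed) => true
    case TerminalStateReached(_)         => false
    case TargetResultsReady(stats)       => allTargetsCompleted(targets, stats)
  }

val targets = Set("filter", "sink")
println(safeToReadNow(TargetResultsReady(Map("filter" -> Completed, "sink" -> Running)), targets))   // prints: false
println(safeToReadNow(TargetResultsReady(Map("filter" -> Completed, "sink" -> Completed)), targets)) // prints: true
```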
The old "RegionExecutionCoordinator caches upstream results asynchronously
after operators complete" comment that motivated the sleep no longer
corresponds to any code; the synchronous `writerThread.join()` barrier
(introduced with state materialization across regions) closed that gap.
## Proposed resolution
- **Remove the wait** from `SyncExecutionResource` (rather than replace it
  with a poll), and document the invariant the read path relies on.
- **Lock the invariant with engine-side tests**, since removing the safety
  net turns a regression from "a slow response" into "a silent short read":
  - The commit + failure-propagation half is already covered by
    `OutputPortStorageWriterThreadSpec`.
  - Add an e2e spec that reads result storage the instant the workflow
    reports `COMPLETED` (no wait) and asserts that the committed row count
    (`getCount`) equals the number of rows actually readable, across a
    multi-region DAG that includes an intermediate operator.
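The core assertion of the proposed e2e spec can be sketched independently of the engine. Everything here is hypothetical: `ResultStorage`, `InMemoryStorage`, and `shortReads` are illustrative names. A real spec would obtain each operator's storage from the workflow run immediately after observing `COMPLETED`. The check is the one described above. For every operator with stored results, including intermediate ones, the committed count must equal the number of rows actually readable.

```scala
// Hypothetical read-side view of one operator's result storage.
trait ResultStorage {
  def getCount: Long         // committed row count, from table metadata
  def readAll(): Seq[String] // rows actually readable right now
}

// Hypothetical in-memory stand-in; `committed` lets the demo fake a mismatch.
final class InMemoryStorage(rows: Seq[String], committed: Long) extends ResultStorage {
  def getCount: Long = committed
  def readAll(): Seq[String] = rows
}

// One message per operator whose committed count and readable rows disagree;
// an empty result means no short read was observed.
def shortReads(byOperator: Map[String, ResultStorage]): Seq[String] =
  byOperator.toSeq.flatMap { case (op, storage) =>
    val readable = storage.readAll().size.toLong
    if (readable == storage.getCount) None
    else Some(s"$op: getCount=${storage.getCount}, readable=$readable")
  }.sorted

// Toy stand-in for a multi-region DAG: source -> filter (intermediate) -> sink.
val sourceRows = (1 to 10).map(i => s"row-$i")
val filterRows = (1 to 10).filter(_ % 2 == 0).map(i => s"row-$i")

val consistent: Map[String, ResultStorage] = Map(
  "source" -> new InMemoryStorage(sourceRows, sourceRows.size.toLong),
  "filter" -> new InMemoryStorage(filterRows, filterRows.size.toLong),
  "sink"   -> new InMemoryStorage(filterRows, filterRows.size.toLong)
)
// Simulated mismatch: sink metadata reports 5 rows, but only 3 are readable.
val truncated = consistent.updated("sink", new InMemoryStorage(filterRows.take(3), filterRows.size.toLong))

println(shortReads(consistent).isEmpty)       // prints: true
println(shortReads(truncated).mkString("\n")) // prints: sink: getCount=5, readable=3
```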
## References
- PR #5714: the sleep→poll approach whose framing this issue argues against.
- #5713: the issue that #5714 would close.
- Key engine seam: `OutputManager.closeOutputStorageWriterIfNeeded`
(`writerThread.join()`) → `DataProcessor.outputOneTuple` (`portCompleted` /
`COMPLETED`).
---
*Filed with assistance from Claude (Opus 4.8) in compliance with the ASF
generative-AI tooling policy.*
--
This is an automated message from the Apache Git Service.