This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 598ed0f7f0 test(amber): drop stale Thread.sleep(1000) after COMPLETED state (#4680)
598ed0f7f0 is described below
commit 598ed0f7f037b96ebe5549759158e46ba736b372
Author: Yicong Huang <[email protected]>
AuthorDate: Sat May 2 12:19:00 2026 -0700
test(amber): drop stale Thread.sleep(1000) after COMPLETED state (#4680)
### What changes were proposed in this PR?
Remove the `Thread.sleep(1000)` workaround from the result-reading
block of `DataProcessingSpec` and `ReconfigurationSpec`. A TODO labeled the
sleep a workaround for "the issue of reporting `completed` status too
early". The current engine path is synchronous end-to-end, so by the time
the client sees `COMPLETED` the results are already committed and the
workaround is no longer needed.
```diff
.map(terminalOpId => {
- //TODO: remove the delay after fixing the issue of reporting "completed" status too early.
- Thread.sleep(1000)
val uri = getResultUriByLogicalPortId(...)
...
})
```
The synchronous path that emits `COMPLETED`:
1. `DataProcessor.outputOneTuple` handles `FinalizePort` →
`outputManager.closeOutputStorageWriterIfNeeded(portId)`
2. `closeOutputStorageWriterIfNeeded` puts a terminate signal on the
writer thread's queue and `join()`s it
3. The writer thread, on terminate, calls `IcebergTableWriter.close()` →
`flushBuffer()` → `table.newAppend().appendFile(dataFile).commit()`
4. Only then does `DataProcessor` fire `portCompleted` to the controller
5. After all ports complete, the controller emits
`ExecutionStateUpdate(COMPLETED)` to the client
6. The read side calls `IcebergDocument.get()` → `seekToUsableFile()` →
`table.refresh()` before scanning
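A minimal Scala sketch of why steps 2 to 4 make the sleep unnecessary, assuming a single writer thread fed by a `LinkedBlockingQueue`. Every name here (`FakeTable`, `WriterThread`, `finalizePort`, `runOnce`) is a hypothetical stand-in, not Texera's `OutputManager` or the Iceberg API. The point it illustrates: `Thread.join()` returns only after the writer thread has finished its commit, so anything reported after `join()` is ordered after that commit, and a later read sees the committed rows.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

object SyncCompletionSketch {
  sealed trait Msg
  final case class Row(value: Int) extends Msg
  case object Terminate extends Msg

  // Hypothetical stand-in for the result table: rows become visible only on commit.
  final class FakeTable {
    private val committed = ArrayBuffer.empty[Int]
    def commit(rows: Seq[Int]): Unit = synchronized { committed ++= rows }
    // Plays the role of refresh-then-scan: always reads the latest committed state.
    def snapshot(): Seq[Int] = synchronized { committed.toList }
  }

  // Hypothetical writer thread: buffers rows, flushes and commits on Terminate.
  final class WriterThread(table: FakeTable) extends Thread {
    val queue = new LinkedBlockingQueue[Msg]()
    private val buffer = ArrayBuffer.empty[Int]

    override def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {
          case Row(v) => buffer += v
          case Terminate =>
            // Analogous to close() -> flushBuffer() -> commit(): commit before exiting.
            table.commit(buffer.toList)
            running = false
        }
      }
    }
  }

  // Mirrors steps 2-4: enqueue the terminate signal, join, and only then report completion.
  def finalizePort(writer: WriterThread): String = {
    writer.queue.put(Terminate)
    writer.join() // returns only after the writer's commit has finished
    "COMPLETED"
  }

  def runOnce(n: Int): (String, Seq[Int]) = {
    val table = new FakeTable
    val writer = new WriterThread(table)
    writer.start()
    (1 to n).foreach(i => writer.queue.put(Row(i)))
    val state = finalizePort(writer)
    // Read side: no sleep needed, the commit is already visible.
    (state, table.snapshot())
  }
}
```

Calling `SyncCompletionSketch.runOnce(5)` returns `("COMPLETED", List(1, 2, 3, 4, 5))` on every run, with no delay between the completion signal and the read.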
### Any related issues, documentation, discussions?
Closes #4679.
### How was this PR tested?
Ran the two specs together with `-T 1` (sequential) on a clean local
checkout for 5 consecutive runs:
```
sbt 'WorkflowExecutionService / Test / testOnly org.apache.texera.amber.engine.e2e.DataProcessingSpec org.apache.texera.amber.engine.e2e.ReconfigurationSpec'
run 1: 21/21 passed in 39.2 s
run 2: 21/21 passed in 39.5 s
run 3: 21/21 passed in 43.2 s
run 4: 21/21 passed in 37.3 s
run 5: 21/21 passed in 36.9 s
```
Per-spec wall time on the same machine, with the sleep (before) and without it (after):
| spec | before | after | delta |
|---|---|---|---|
| DataProcessingSpec | 36.5 s | 16.9 s | -19.6 s |
| ReconfigurationSpec | 27.2 s | 18.7 s | -8.5 s |
DataProcessingSpec saves more than 16 × 1 s because the sleep sat inside
the per-terminal-port `.map(...)` block, so workflows with multiple
terminal ports paid the 1 s cost once per port, i.e. more than once per test.
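A back-of-the-envelope Scala sketch of that cost, assuming the removed sleep ran once per element of the terminal-port `.map(...)`. The per-test port counts below are invented for illustration; the PR does not state how many terminal ports each DataProcessingSpec workflow has.

```scala
object SleepCostSketch {
  // The removed call was Thread.sleep(1000), i.e. 1000 ms per invocation.
  val sleepMillis: Long = 1000L

  // Each test's .map(...) over its terminal ports ran the sleep once per port,
  // so the added wall time is the total port count times sleepMillis.
  def addedMillis(terminalPortsPerTest: Seq[Int]): Long =
    terminalPortsPerTest.map(ports => ports * sleepMillis).sum

  def main(args: Array[String]): Unit = {
    // Hypothetical counts: four tests, two of which have more than one terminal port.
    val hypotheticalPorts = Seq(1, 1, 2, 3)
    val naive = hypotheticalPorts.size * sleepMillis // one sleep per test
    val actual = addedMillis(hypotheticalPorts)      // one sleep per port
    println(s"one-per-test estimate: $naive ms, one-per-port cost: $actual ms")
  }
}
```

With these made-up counts the sleep costs 7000 ms rather than the 4000 ms a one-sleep-per-test estimate suggests, which is the same kind of gap the table shows between 16 × 1 s and the measured 19.6 s.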
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7, 1M context)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala | 2 --
.../scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala | 2 --
2 files changed, 4 deletions(-)
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
index 69ee9c6a5f..f93909f53f 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
@@ -129,8 +129,6 @@ class DataProcessingSpec
uri.nonEmpty
})
.map(terminalOpId => {
- //TODO: remove the delay after fixing the issue of reporting "completed" status too early.
- Thread.sleep(1000)
val uri = getResultUriByLogicalPortId(
workflowContext.executionId,
terminalOpId,
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index 92dfba19de..a125c1be00 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -132,8 +132,6 @@ class ReconfigurationSpec
uri.nonEmpty
})
.map(terminalOpId => {
- //TODO: remove the delay after fixing the issue of reporting "completed" status too early.
- Thread.sleep(1000)
val uri = getResultUriByLogicalPortId(
workflow.context.executionId,
terminalOpId,