This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 598ed0f7f0 test(amber): drop stale Thread.sleep(1000) after COMPLETED state (#4680)
598ed0f7f0 is described below

commit 598ed0f7f037b96ebe5549759158e46ba736b372
Author: Yicong Huang <[email protected]>
AuthorDate: Sat May 2 12:19:00 2026 -0700

    test(amber): drop stale Thread.sleep(1000) after COMPLETED state (#4680)
    
    ### What changes were proposed in this PR?
    
    Remove the `Thread.sleep(1000)` workaround inside the result-reading
    block of `DataProcessingSpec` and `ReconfigurationSpec`. The sleep was
    annotated with a TODO calling it a workaround for "the issue of
    reporting `completed` status too early". The current engine path is
    synchronous end-to-end: results are committed before `COMPLETED` is
    reported, so the sleep is no longer needed.
    
    ```diff
                 .map(terminalOpId => {
    -              //TODO: remove the delay after fixing the issue of reporting "completed" status too early.
    -              Thread.sleep(1000)
                   val uri = getResultUriByLogicalPortId(...)
                   ...
                 })
    ```
    
    The synchronous path that emits `COMPLETED`:
    
    1. `DataProcessor.outputOneTuple` handles `FinalizePort` →
    `outputManager.closeOutputStorageWriterIfNeeded(portId)`
    2. `closeOutputStorageWriterIfNeeded` puts a terminate signal on the
    writer thread's queue and `join()`s it
    3. The writer thread, on terminate, calls `IcebergTableWriter.close()` →
    `flushBuffer()` → `table.newAppend().appendFile(dataFile).commit()`
    4. Only then does `DataProcessor` fire `portCompleted` to the controller
    5. After all ports complete, controller emits
    `ExecutionStateUpdate(COMPLETED)` to the client
    6. Read side does `IcebergDocument.get()` → `seekToUsableFile()` →
    `table.refresh()` before scan
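    
    The ordering in steps 2 to 4 can be illustrated with a minimal,
    self-contained sketch. It is not the Texera implementation: `FakeTable`,
    `WriterThread`, `WriterMsg`, and `runOnce` are hypothetical names, and an
    in-memory buffer stands in for the Iceberg table. It only shows why a
    `join()` on the writer thread guarantees that the commit is visible
    before completion is reported.
    
    ```scala
    import java.util.concurrent.LinkedBlockingQueue
    import scala.collection.mutable.ArrayBuffer

    object WriterJoinSketch {
      // Hypothetical message type: a record to buffer, or a terminate signal.
      sealed trait WriterMsg
      final case class Record(value: String) extends WriterMsg
      case object Terminate extends WriterMsg

      // Hypothetical stand-in for committed storage (an Iceberg table in the real engine).
      final class FakeTable {
        private val committed = ArrayBuffer.empty[String]
        def commit(batch: Seq[String]): Unit = synchronized { committed ++= batch }
        def snapshot(): Seq[String] = synchronized { committed.toList }
      }

      // Buffers records and commits them only when the terminate signal arrives.
      final class WriterThread(table: FakeTable) extends Thread {
        val queue = new LinkedBlockingQueue[WriterMsg]()
        private val buffer = ArrayBuffer.empty[String]

        override def run(): Unit = {
          var running = true
          while (running) {
            queue.take() match {
              case Record(v) => buffer += v
              case Terminate =>
                table.commit(buffer.toList) // flush and commit before the thread exits
                running = false
            }
          }
        }
      }

      // Returns what a reader sees at the point where completion would be reported.
      def runOnce(records: Seq[String]): Seq[String] = {
        val table = new FakeTable
        val writer = new WriterThread(table)
        writer.start()
        records.foreach(r => writer.queue.put(Record(r)))
        writer.queue.put(Terminate) // step 2: terminate signal on the writer's queue
        writer.join()               // step 2: block until the writer has committed (step 3)
        table.snapshot()            // step 4 onward: completion is reported only after this point
      }

      def main(args: Array[String]): Unit =
        println(runOnce(Seq("a", "b", "c")))
    }
    ```
    
    Because `join()` returns only after the writer thread has finished, a
    read issued after the completion signal sees every buffered record
    without any added delay.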
    
    ### Any related issues, documentation, discussions?
    
    Closes #4679.
    
    ### How was this PR tested?
    
    Ran the two specs together with `-T 1` (sequential) on a clean local
    checkout, 5 consecutive runs:
    
    ```
    sbt 'WorkflowExecutionService / Test / testOnly \
      org.apache.texera.amber.engine.e2e.DataProcessingSpec \
      org.apache.texera.amber.engine.e2e.ReconfigurationSpec'
    
    run 1: 21/21 passed in 39.2 s
    run 2: 21/21 passed in 39.5 s
    run 3: 21/21 passed in 43.2 s
    run 4: 21/21 passed in 37.3 s
    run 5: 21/21 passed in 36.9 s
    ```
    
    Compared to the same two specs with the sleep on the same machine:
    
    | spec | before | after | delta |
    |---|---|---|---|
    | DataProcessingSpec  | 36.5 s | 16.9 s | -19.6 s |
    | ReconfigurationSpec | 27.2 s | 18.7 s | -8.5 s  |
    
    DataProcessingSpec saves more than 16 × 1 s because the sleep was inside
    a per-terminal-port `.map(...)` block — workflows with multiple terminal
    ports paid the cost more than once per test.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.7, 1M context)
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala   | 2 --
 .../scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala  | 2 --
 2 files changed, 4 deletions(-)

diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
index 69ee9c6a5f..f93909f53f 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
@@ -129,8 +129,6 @@ class DataProcessingSpec
               uri.nonEmpty
             })
             .map(terminalOpId => {
-              //TODO: remove the delay after fixing the issue of reporting "completed" status too early.
-              Thread.sleep(1000)
               val uri = getResultUriByLogicalPortId(
                 workflowContext.executionId,
                 terminalOpId,
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index 92dfba19de..a125c1be00 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -132,8 +132,6 @@ class ReconfigurationSpec
               uri.nonEmpty
             })
             .map(terminalOpId => {
-              //TODO: remove the delay after fixing the issue of reporting "completed" status too early.
-              Thread.sleep(1000)
               val uri = getResultUriByLogicalPortId(
                 workflow.context.executionId,
                 terminalOpId,
