This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 20c3a57eac perf(amber): replace Thread.sleep with event-driven waits 
in e2e specs (#4518)
20c3a57eac is described below

commit 20c3a57eac724f5f81dc8cfa16598f39e29459a8
Author: Yicong Huang <[email protected]>
AuthorDate: Sat Apr 25 21:37:43 2026 -0700

    perf(amber): replace Thread.sleep with event-driven waits in e2e specs 
(#4518)
    
    ### What changes were proposed in this PR?
    
    `PauseSpec.shouldPause` and `ReconfigurationSpec.shouldReconfigure` used
    hardcoded `Thread.sleep` calls to wait out engine state transitions: 4s
    after each pause (to reach `PAUSED`) plus 0.4s after the first resume in
    `PauseSpec` (4s + 0.4s + 4s), and 4s after the pause in
    `ReconfigurationSpec`.
    
    The post-pause sleeps are replaced with a `stateReached(client, target)`
    helper that returns a `Promise` resolved by the next matching
    `ExecutionStateUpdate`; each test awaits that promise with a timeout. The
    post-resume sleep is dropped entirely — `Await.result(resumeWorkflow)`
    already blocks until `ResumeHandler` finishes, by which point all workers
    are in `RUNNING`.
    
    Wait timeout: 10s (~2.5× the original sleep, leaves CI headroom while
    keeping hang feedback fast).
    
    ### Any related issues, documentation, discussions?
    
    Closes #4517.
    
    ### How was this PR tested?
    
    Local timing (warm sbt + DB):
    
    | | `Run completed in` |
    |---|---|
    | before | 41s 869ms |
    | after  | 31s 355ms |
    
    `ReconfigurationSpec` can't be measured locally on JDK 17 (Arrow
    `MemoryUtil` reflection); CI on JDK 11 will exercise it.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.7)
---
 .../apache/texera/amber/engine/e2e/PauseSpec.scala | 16 ++++++++++-----
 .../amber/engine/e2e/ReconfigurationSpec.scala     | 11 +++++++---
 .../apache/texera/amber/engine/e2e/TestUtils.scala | 24 +++++++++++++++++++++-
 3 files changed, 42 insertions(+), 9 deletions(-)

diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
index 921e2de2bd..b459533c57 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
@@ -31,13 +31,17 @@ import 
org.apache.texera.amber.engine.architecture.controller.{
   ExecutionStateUpdate
 }
 import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
-import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
+  COMPLETED,
+  PAUSED
+}
 import org.apache.texera.amber.engine.common.AmberRuntime
 import org.apache.texera.amber.engine.common.client.AmberClient
 import org.apache.texera.amber.engine.e2e.TestUtils.{
   cleanupWorkflowExecutionData,
   initiateTexeraDBForTestCases,
-  setUpWorkflowExecutionData
+  setUpWorkflowExecutionData,
+  stateReached
 }
 import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
 import org.apache.texera.workflow.LogicalLink
@@ -107,13 +111,15 @@ class PauseSpec
           completion.setDone()
         }
       })
+    val stateWaitTimeout = Duration.fromSeconds(10)
     Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+    val firstPaused = stateReached(client, PAUSED)
     Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
-    Thread.sleep(4000)
+    Await.result(firstPaused, stateWaitTimeout)
     Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
-    Thread.sleep(400)
+    val secondPaused = stateReached(client, PAUSED)
     Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
-    Thread.sleep(4000)
+    Await.result(secondPaused, stateWaitTimeout)
     Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
     Await.result(completion, Duration.fromMinutes(1))
   }
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index 5f9f1bba18..6f344caae3 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -40,13 +40,17 @@ import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   UpdateExecutorRequest,
   WorkflowReconfigureRequest
 }
-import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
+  COMPLETED,
+  PAUSED
+}
 import org.apache.texera.amber.engine.common.AmberRuntime
 import org.apache.texera.amber.engine.common.client.AmberClient
 import org.apache.texera.amber.engine.e2e.TestUtils.{
   cleanupWorkflowExecutionData,
   initiateTexeraDBForTestCases,
-  setUpWorkflowExecutionData
+  setUpWorkflowExecutionData,
+  stateReached
 }
 import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
 import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
@@ -147,8 +151,9 @@ class ReconfigurationSpec
         }
       })
     Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+    val pausedReached = stateReached(client, PAUSED)
     Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
-    Thread.sleep(4000)
+    Await.result(pausedReached, Duration.fromSeconds(10))
     val physicalOps = targetOps.flatMap(op =>
       workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
     )
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
index e7d729c5e2..fab5d5a16c 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
@@ -19,9 +19,12 @@
 
 package org.apache.texera.amber.engine.e2e
 
+import com.twitter.util.{Promise, Return}
 import org.apache.texera.amber.config.StorageConfig
 import org.apache.texera.amber.core.workflow.WorkflowContext
-import org.apache.texera.amber.engine.architecture.controller.Workflow
+import 
org.apache.texera.amber.engine.architecture.controller.{ExecutionStateUpdate, 
Workflow}
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
+import org.apache.texera.amber.engine.common.client.AmberClient
 import org.apache.texera.amber.operator.LogicalOp
 import org.apache.texera.dao.SqlServer
 import org.apache.texera.dao.jooq.generated.enums.UserRoleEnum
@@ -117,6 +120,25 @@ object TestUtils {
     workflowExecutionsDao.insert(testWorkflowExecutionEntry)
   }
 
+  /**
+    * Returns a Promise that completes the first time, after this call, that the
+    * client delivers an ExecutionStateUpdate whose state equals `target`. Must be
+    * called BEFORE the action that triggers the state change, since AmberClient
+    * observables do not replay past events. Later matching events are ignored.
+    */
+  def stateReached(
+      client: AmberClient,
+      target: WorkflowAggregatedState
+  ): Promise[Unit] = {
+    val p = Promise[Unit]()
+    client.registerCallback[ExecutionStateUpdate](evt => {
+      if (evt.state == target) {
+        p.updateIfEmpty(Return(())) // no-op once p has already been satisfied
+      }
+    })
+    p
+  }
+
   def cleanupWorkflowExecutionData(): Unit = {
     val dslConfig = SqlServer.getInstance().context.configuration()
     val userDao = new UserDao(dslConfig)

Reply via email to