This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 20c3a57eac perf(amber): replace Thread.sleep with event-driven waits
in e2e specs (#4518)
20c3a57eac is described below
commit 20c3a57eac724f5f81dc8cfa16598f39e29459a8
Author: Yicong Huang <[email protected]>
AuthorDate: Sat Apr 25 21:37:43 2026 -0700
perf(amber): replace Thread.sleep with event-driven waits in e2e specs
(#4518)
### What changes were proposed in this PR?
`PauseSpec.shouldPause` and `ReconfigurationSpec.shouldReconfigure` used
hardcoded `Thread.sleep` calls to wait out engine state transitions: 4s after
each pause plus 0.4s after the first resume in `PauseSpec`, and 4s after the
pause in `ReconfigurationSpec`.
Each post-pause sleep is replaced with a `stateReached(client, target)` helper
that returns a `Promise` satisfied by the next matching
`ExecutionStateUpdate`; the specs await that promise with a timeout. The
post-resume sleep is dropped entirely — `Await.result(resumeWorkflow)` already
blocks until `ResumeHandler` finishes, by which point all workers are in
`RUNNING`.
Wait timeout: 10s per pause wait (2.5× the original 4s sleep), which leaves
CI headroom while still failing fast if the engine hangs.
### Any related issues, documentation, discussions?
Closes #4517.
### How was this PR tested?
Local timing (warm sbt + DB):
| | `Run completed in` |
|---|---|
| before | 41s 869ms |
| after | 31s 355ms |
`ReconfigurationSpec` can't be measured locally on JDK 17 (Arrow
`MemoryUtil` reflection); CI on JDK 11 will exercise it.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7)
---
.../apache/texera/amber/engine/e2e/PauseSpec.scala | 16 ++++++++++-----
.../amber/engine/e2e/ReconfigurationSpec.scala | 11 +++++++---
.../apache/texera/amber/engine/e2e/TestUtils.scala | 24 +++++++++++++++++++++-
3 files changed, 42 insertions(+), 9 deletions(-)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
index 921e2de2bd..b459533c57 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
@@ -31,13 +31,17 @@ import
org.apache.texera.amber.engine.architecture.controller.{
ExecutionStateUpdate
}
import
org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
-import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
+import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
+ COMPLETED,
+ PAUSED
+}
import org.apache.texera.amber.engine.common.AmberRuntime
import org.apache.texera.amber.engine.common.client.AmberClient
import org.apache.texera.amber.engine.e2e.TestUtils.{
cleanupWorkflowExecutionData,
initiateTexeraDBForTestCases,
- setUpWorkflowExecutionData
+ setUpWorkflowExecutionData,
+ stateReached
}
import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
import org.apache.texera.workflow.LogicalLink
@@ -107,13 +111,15 @@ class PauseSpec
completion.setDone()
}
})
+ val stateWaitTimeout = Duration.fromSeconds(10)
Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+ val firstPaused = stateReached(client, PAUSED)
Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
- Thread.sleep(4000)
+ Await.result(firstPaused, stateWaitTimeout)
Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
- Thread.sleep(400)
+ val secondPaused = stateReached(client, PAUSED)
Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
- Thread.sleep(4000)
+ Await.result(secondPaused, stateWaitTimeout)
Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
Await.result(completion, Duration.fromMinutes(1))
}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index 5f9f1bba18..6f344caae3 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -40,13 +40,17 @@ import
org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
UpdateExecutorRequest,
WorkflowReconfigureRequest
}
-import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
+import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
+ COMPLETED,
+ PAUSED
+}
import org.apache.texera.amber.engine.common.AmberRuntime
import org.apache.texera.amber.engine.common.client.AmberClient
import org.apache.texera.amber.engine.e2e.TestUtils.{
cleanupWorkflowExecutionData,
initiateTexeraDBForTestCases,
- setUpWorkflowExecutionData
+ setUpWorkflowExecutionData,
+ stateReached
}
import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
import
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
@@ -147,8 +151,9 @@ class ReconfigurationSpec
}
})
Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+ val pausedReached = stateReached(client, PAUSED)
Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
- Thread.sleep(4000)
+ Await.result(pausedReached, Duration.fromSeconds(10))
val physicalOps = targetOps.flatMap(op =>
workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
index e7d729c5e2..fab5d5a16c 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
@@ -19,9 +19,12 @@
package org.apache.texera.amber.engine.e2e
+import com.twitter.util.{Promise, Return}
import org.apache.texera.amber.config.StorageConfig
import org.apache.texera.amber.core.workflow.WorkflowContext
-import org.apache.texera.amber.engine.architecture.controller.Workflow
+import
org.apache.texera.amber.engine.architecture.controller.{ExecutionStateUpdate,
Workflow}
+import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
+import org.apache.texera.amber.engine.common.client.AmberClient
import org.apache.texera.amber.operator.LogicalOp
import org.apache.texera.dao.SqlServer
import org.apache.texera.dao.jooq.generated.enums.UserRoleEnum
@@ -117,6 +120,25 @@ object TestUtils {
workflowExecutionsDao.insert(testWorkflowExecutionEntry)
}
+ /**
+ * Registers an ExecutionStateUpdate callback on `client` and returns a Promise
+ * that is satisfied by the first update whose state equals `target`. Must be
+ * called BEFORE the action that triggers the state change, since AmberClient
+ * observables do not replay past events. This helper never unregisters it.
+ */
+ def stateReached(
+ client: AmberClient,
+ target: WorkflowAggregatedState
+ ): Promise[Unit] = {
+ val p = Promise[Unit]()
+ client.registerCallback[ExecutionStateUpdate](evt => {
+ if (evt.state == target) { // updates carrying any other state are ignored
+ p.updateIfEmpty(Return(())) // no-op if the promise is already satisfied
+ }
+ })
+ p
+ }
+
def cleanupWorkflowExecutionData(): Unit = {
val dslConfig = SqlServer.getInstance().context.configuration()
val userDao = new UserDao(dslConfig)