This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 8803d084cd fix(test): isolate e2e suites by unique workflow/execution id (#5888)
8803d084cd is described below

commit 8803d084cd68fc98c06c782aa7a56450e7ddc9ee
Author: Matthew B. <[email protected]>
AuthorDate: Mon Jun 22 22:30:09 2026 -0700

    fix(test): isolate e2e suites by unique workflow/execution id (#5888)
    
    ### What changes were proposed in this PR?
    - Add `TestUtils.workflowContext(id, settings)` that sets both
    `workflowId` and `executionId` to `id`, and make the DB fixtures plus
    `setUp`/`cleanupWorkflowExecutionData` take an `id` (the user email is
    derived from `id` to avoid the unique-email collision).
    - Give each materializing e2e suite a distinct id so concurrent suites
    no longer share an Iceberg result keyspace or DB rows:
    DataProcessingSpec=1, PauseSpec=2, ReconfigurationSpec=3,
    ReconfigurationIntegrationSpec=4.
    - Test-only change, no production code; BatchSizePropagationSpec and
    CheckpointSpec are untouched because they do not materialize results.
    ### Any related issues, documentation, discussions?
    Closes: #5887
    ### How was this PR tested?
    - `sbt "WorkflowExecutionService/Test/compile"` compiles clean on Java
    17.
    - Run the previously-flaky suite against the integration services
    (Postgres test DB + MinIO/S3 + Iceberg catalog, as in the `build /
    amber` CI job): `sbt "WorkflowExecutionService/testOnly
    *DataProcessingSpec"`; expect all DataProcessingSpec tests green with no
    CommitFailedException flake.
    - The timing-dependent flake could not be reproduced locally (no
    MinIO/Iceberg env), so final verification is the `build / amber` CI job
    staying green across re-runs.
    ### Was this PR authored or co-authored using generative AI tooling?
    Co-authored with Claude Opus 4.8 in compliance with ASF
---
 .../e2e/ReconfigurationIntegrationSpec.scala       | 15 ++---
 .../amber/engine/e2e/DataProcessingSpec.scala      | 13 ++--
 .../apache/texera/amber/engine/e2e/PauseSpec.scala | 10 +--
 .../amber/engine/e2e/ReconfigurationSpec.scala     |  9 +--
 .../apache/texera/amber/engine/e2e/TestUtils.scala | 72 ++++++++++++++--------
 5 files changed, 74 insertions(+), 45 deletions(-)

diff --git a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
index 6f0936da28..5d2ed7e5e4 100644
--- a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
+++ b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
@@ -28,7 +28,7 @@ import org.apache.texera.amber.clustering.SingleNodeListener
 import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
 import org.apache.texera.amber.core.tuple.Tuple
 import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflow.PortIdentity
 import org.apache.texera.amber.engine.architecture.controller.{
   ControllerConfig,
   ExecutionStateUpdate
@@ -80,14 +80,15 @@ class ReconfigurationIntegrationSpec
   implicit val timeout: Timeout = Timeout(5.seconds)
 
   val logger = Logger("ReconfigurationIntegrationSpecLogger")
-  val ctx = new WorkflowContext()
+  private val specId = 4
+  val ctx = TestUtils.workflowContext(specId)
 
   override protected def beforeEach(): Unit = {
-    setUpWorkflowExecutionData()
+    setUpWorkflowExecutionData(specId)
   }
 
   override protected def afterEach(): Unit = {
-    cleanupWorkflowExecutionData()
+    cleanupWorkflowExecutionData(specId)
   }
 
   override def beforeAll(): Unit = {
@@ -117,12 +118,12 @@ class ReconfigurationIntegrationSpec
     */
   private def warmupOnce(): Unit = {
     val warmupCap = Duration.fromSeconds(10)
-    setUpWorkflowExecutionData()
+    setUpWorkflowExecutionData(specId)
     var client: AmberClient = null
     try {
       val src = new TextInputSourceOpDesc()
       src.textInput = "warmup"
-      val warmupCtx = new WorkflowContext()
+      val warmupCtx = TestUtils.workflowContext(specId)
       val workflow = buildWorkflow(List(src), List.empty, warmupCtx)
       client = new AmberClient(
         system,
@@ -150,7 +151,7 @@ class ReconfigurationIntegrationSpec
         try client.shutdown()
         catch { case _: Throwable => () }
       }
-      cleanupWorkflowExecutionData()
+      cleanupWorkflowExecutionData(specId)
     }
   }
 
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
index 2606d9d656..6dde4eff9f 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
@@ -66,21 +66,24 @@ class DataProcessingSpec
 
   implicit val timeout: Timeout = Timeout(5.seconds)
 
-  val workflowContext: WorkflowContext = new WorkflowContext()
+  private val specId = 1
 
-  val materializedWorkflowContext: WorkflowContext = new WorkflowContext(
-    workflowSettings = WorkflowSettings(
+  val workflowContext: WorkflowContext = TestUtils.workflowContext(specId)
+
+  val materializedWorkflowContext: WorkflowContext = TestUtils.workflowContext(
+    specId,
+    WorkflowSettings(
       dataTransferBatchSize = 400,
       executionMode = ExecutionMode.MATERIALIZED
     )
   )
 
   override protected def beforeEach(): Unit = {
-    setUpWorkflowExecutionData()
+    setUpWorkflowExecutionData(specId)
   }
 
   override protected def afterEach(): Unit = {
-    cleanupWorkflowExecutionData()
+    cleanupWorkflowExecutionData(specId)
   }
 
   override def beforeAll(): Unit = {
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
index 2cc268608f..036caefa0a 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
@@ -25,7 +25,7 @@ import org.apache.pekko.util.Timeout
 import com.twitter.util.{Await, Duration, Promise}
 import com.typesafe.scalalogging.Logger
 import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflow.PortIdentity
 import org.apache.texera.amber.engine.architecture.controller.{
   ControllerConfig,
   ExecutionStateUpdate
@@ -70,12 +70,14 @@ class PauseSpec
 
   val logger = Logger("PauseSpecLogger")
 
+  private val specId = 2
+
   override protected def beforeEach(): Unit = {
-    setUpWorkflowExecutionData()
+    setUpWorkflowExecutionData(specId)
   }
 
   override protected def afterEach(): Unit = {
-    cleanupWorkflowExecutionData()
+    cleanupWorkflowExecutionData(specId)
   }
 
   override def beforeAll(): Unit = {
@@ -95,7 +97,7 @@ class PauseSpec
       links: List[LogicalLink]
   ): Unit = {
     val workflow =
-      TestUtils.buildWorkflow(operators, links, new WorkflowContext())
+      TestUtils.buildWorkflow(operators, links, TestUtils.workflowContext(specId))
     val client =
       new AmberClient(
         system,
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index 2cd3559736..6dfb23f6ac 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -27,7 +27,7 @@ import org.apache.texera.amber.clustering.SingleNodeListener
 import org.apache.texera.amber.core.executor.OpExecInitInfo
 import org.apache.texera.amber.core.tuple.Tuple
 import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflow.PortIdentity
 import org.apache.texera.amber.engine.common.AmberRuntime
 import org.apache.texera.amber.engine.e2e.TestUtils.{
   cleanupWorkflowExecutionData,
@@ -60,14 +60,15 @@ class ReconfigurationSpec
   implicit val timeout: Timeout = Timeout(5.seconds)
 
   val logger = Logger("ReconfigurationSpecLogger")
-  val ctx = new WorkflowContext()
+  private val specId = 3
+  val ctx = TestUtils.workflowContext(specId)
 
   override protected def beforeEach(): Unit = {
-    setUpWorkflowExecutionData()
+    setUpWorkflowExecutionData(specId)
   }
 
   override protected def afterEach(): Unit = {
-    cleanupWorkflowExecutionData()
+    cleanupWorkflowExecutionData(specId)
   }
 
   override def beforeAll(): Unit = {
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
index 2a87fe3490..ac71483a5d 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
@@ -26,8 +26,12 @@ import org.apache.texera.amber.core.executor.OpExecInitInfo
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.Tuple
-import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, OperatorIdentity}
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.virtualidentity.{
+  ExecutionIdentity,
+  OperatorIdentity,
+  WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext, WorkflowSettings}
 import org.apache.texera.amber.engine.architecture.controller.{
   ControllerConfig,
   ExecutionStateUpdate,
@@ -66,6 +70,22 @@ import org.apache.texera.workflow.{LogicalLink, WorkflowCompiler}
 
 object TestUtils {
 
+  /**
+    * A WorkflowContext whose workflow- and execution-id are both `id`. Each e2e
+    * suite passes a distinct id so its results land in a disjoint storage
+    * keyspace (`vfs:///wid/{id}/eid/{id}/...`) and disjoint DB rows, letting the
+    * suites run concurrently without colliding on the shared Iceberg catalog.
+    */
+  def workflowContext(
+      id: Int,
+      workflowSettings: WorkflowSettings = WorkflowSettings()
+  ): WorkflowContext =
+    new WorkflowContext(
+      workflowId = WorkflowIdentity(id.toLong),
+      executionId = ExecutionIdentity(id.toLong),
+      workflowSettings = workflowSettings
+    )
+
   def buildWorkflow(
       operators: List[LogicalOp],
       links: List[LogicalLink],
@@ -186,53 +206,55 @@ object TestUtils {
     )
   }
 
-  val testUser: User = {
+  // All fixture rows for one suite share `id` as uid/wid/vid/eid; the email is
+  // derived from it so concurrent suites don't collide on the unique email key.
+  def testUser(id: Int): User = {
     val user = new User
-    user.setUid(Integer.valueOf(1))
-    user.setName("test_user")
+    user.setUid(Integer.valueOf(id))
+    user.setName(s"test_user_$id")
     user.setRole(UserRoleEnum.ADMIN)
     user.setPassword("123")
-    user.setEmail("[email protected]")
+    user.setEmail(s"[email protected]")
     user
   }
 
-  val testWorkflowEntry: WorkflowPojo = {
+  def testWorkflowEntry(id: Int): WorkflowPojo = {
     val workflow = new WorkflowPojo
     workflow.setName("test workflow")
-    workflow.setWid(Integer.valueOf(1))
+    workflow.setWid(Integer.valueOf(id))
     workflow.setContent("test workflow content")
     workflow.setDescription("test description")
     workflow
   }
 
-  val testWorkflowVersionEntry: WorkflowVersion = {
+  def testWorkflowVersionEntry(id: Int): WorkflowVersion = {
     val workflowVersion = new WorkflowVersion
-    workflowVersion.setWid(Integer.valueOf(1))
-    workflowVersion.setVid(Integer.valueOf(1))
+    workflowVersion.setWid(Integer.valueOf(id))
+    workflowVersion.setVid(Integer.valueOf(id))
     workflowVersion.setContent("test version content")
     workflowVersion
   }
 
-  val testWorkflowExecutionEntry: WorkflowExecutions = {
+  def testWorkflowExecutionEntry(id: Int): WorkflowExecutions = {
     val workflowExecution = new WorkflowExecutions
-    workflowExecution.setEid(Integer.valueOf(1))
-    workflowExecution.setVid(Integer.valueOf(1))
-    workflowExecution.setUid(Integer.valueOf(1))
+    workflowExecution.setEid(Integer.valueOf(id))
+    workflowExecution.setVid(Integer.valueOf(id))
+    workflowExecution.setUid(Integer.valueOf(id))
     workflowExecution.setStatus(3.toByte)
     workflowExecution.setEnvironmentVersion("test engine")
     workflowExecution
   }
 
-  def setUpWorkflowExecutionData(): Unit = {
+  def setUpWorkflowExecutionData(id: Int): Unit = {
     val dslConfig = SqlServer.getInstance().context.configuration()
     val userDao = new UserDao(dslConfig)
     val workflowDao = new WorkflowDao(dslConfig)
     val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig)
     val workflowVersionDao = new WorkflowVersionDao(dslConfig)
-    userDao.insert(testUser)
-    workflowDao.insert(testWorkflowEntry)
-    workflowVersionDao.insert(testWorkflowVersionEntry)
-    workflowExecutionsDao.insert(testWorkflowExecutionEntry)
+    userDao.insert(testUser(id))
+    workflowDao.insert(testWorkflowEntry(id))
+    workflowVersionDao.insert(testWorkflowVersionEntry(id))
+    workflowExecutionsDao.insert(testWorkflowExecutionEntry(id))
   }
 
   /**
@@ -318,16 +340,16 @@ object TestUtils {
     result
   }
 
-  def cleanupWorkflowExecutionData(): Unit = {
+  def cleanupWorkflowExecutionData(id: Int): Unit = {
     val dslConfig = SqlServer.getInstance().context.configuration()
     val userDao = new UserDao(dslConfig)
     val workflowDao = new WorkflowDao(dslConfig)
     val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig)
     val workflowVersionDao = new WorkflowVersionDao(dslConfig)
-    workflowExecutionsDao.deleteById(1)
-    workflowVersionDao.deleteById(1)
-    workflowDao.deleteById(1)
-    userDao.deleteById(1)
+    workflowExecutionsDao.deleteById(id)
+    workflowVersionDao.deleteById(id)
+    workflowDao.deleteById(id)
+    userDao.deleteById(id)
   }
 
 }

Reply via email to