This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5888-5a3ddcc111f115bfbf9936bdd08f48db5ff7cabd in repository https://gitbox.apache.org/repos/asf/texera.git
commit 8803d084cd68fc98c06c782aa7a56450e7ddc9ee
Author: Matthew B. <[email protected]>
AuthorDate: Mon Jun 22 22:30:09 2026 -0700

fix(test): isolate e2e suites by unique workflow/execution id (#5888)

### What changes were proposed in this PR?

- Add `TestUtils.workflowContext(id, settings)` that sets both `workflowId` and `executionId` to `id`, and make the DB fixtures plus `setUpWorkflowExecutionData`/`cleanupWorkflowExecutionData` take an `id` (the user email is derived from `id` to avoid the unique-email collision).
- Give each materializing e2e suite a distinct id so concurrent suites no longer share an Iceberg result keyspace or DB rows: DataProcessingSpec=1, PauseSpec=2, ReconfigurationSpec=3, ReconfigurationIntegrationSpec=4.
- Test-only change, no production code; BatchSizePropagationSpec and CheckpointSpec are untouched because they do not materialize results.

### Any related issues, documentation, discussions?

Closes: #5887

### How was this PR tested?

- `sbt "WorkflowExecutionService/Test/compile"` compiles cleanly on Java 17.
- Run the previously-flaky suite against the integration services (Postgres test DB + MinIO/S3 + Iceberg catalog, as in the `build / amber` CI job): `sbt "WorkflowExecutionService/testOnly *DataProcessingSpec"`; expect all DataProcessingSpec tests green with no CommitFailedException flake.
- The timing-dependent flake could not be reproduced locally (no MinIO/Iceberg env), so final verification is the `build / amber` CI job staying green across re-runs.

### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF --- .../e2e/ReconfigurationIntegrationSpec.scala | 15 ++--- .../amber/engine/e2e/DataProcessingSpec.scala | 13 ++-- .../apache/texera/amber/engine/e2e/PauseSpec.scala | 10 +-- .../amber/engine/e2e/ReconfigurationSpec.scala | 9 +-- .../apache/texera/amber/engine/e2e/TestUtils.scala | 72 ++++++++++++++-------- 5 files changed, 74 insertions(+), 45 deletions(-) diff --git a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala index 6f0936da28..5d2ed7e5e4 100644 --- a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala +++ b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala @@ -28,7 +28,7 @@ import org.apache.texera.amber.clustering.SingleNodeListener import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode} import org.apache.texera.amber.core.tuple.Tuple import org.apache.texera.amber.core.virtualidentity.OperatorIdentity -import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext} +import org.apache.texera.amber.core.workflow.PortIdentity import org.apache.texera.amber.engine.architecture.controller.{ ControllerConfig, ExecutionStateUpdate @@ -80,14 +80,15 @@ class ReconfigurationIntegrationSpec implicit val timeout: Timeout = Timeout(5.seconds) val logger = Logger("ReconfigurationIntegrationSpecLogger") - val ctx = new WorkflowContext() + private val specId = 4 + val ctx = TestUtils.workflowContext(specId) override protected def beforeEach(): Unit = { - setUpWorkflowExecutionData() + setUpWorkflowExecutionData(specId) } override protected def afterEach(): Unit = { - cleanupWorkflowExecutionData() + cleanupWorkflowExecutionData(specId) } override def beforeAll(): Unit = { @@ -117,12 +118,12 @@ class ReconfigurationIntegrationSpec */ 
private def warmupOnce(): Unit = { val warmupCap = Duration.fromSeconds(10) - setUpWorkflowExecutionData() + setUpWorkflowExecutionData(specId) var client: AmberClient = null try { val src = new TextInputSourceOpDesc() src.textInput = "warmup" - val warmupCtx = new WorkflowContext() + val warmupCtx = TestUtils.workflowContext(specId) val workflow = buildWorkflow(List(src), List.empty, warmupCtx) client = new AmberClient( system, @@ -150,7 +151,7 @@ class ReconfigurationIntegrationSpec try client.shutdown() catch { case _: Throwable => () } } - cleanupWorkflowExecutionData() + cleanupWorkflowExecutionData(specId) } } diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala index 2606d9d656..6dde4eff9f 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala @@ -66,21 +66,24 @@ class DataProcessingSpec implicit val timeout: Timeout = Timeout(5.seconds) - val workflowContext: WorkflowContext = new WorkflowContext() + private val specId = 1 - val materializedWorkflowContext: WorkflowContext = new WorkflowContext( - workflowSettings = WorkflowSettings( + val workflowContext: WorkflowContext = TestUtils.workflowContext(specId) + + val materializedWorkflowContext: WorkflowContext = TestUtils.workflowContext( + specId, + WorkflowSettings( dataTransferBatchSize = 400, executionMode = ExecutionMode.MATERIALIZED ) ) override protected def beforeEach(): Unit = { - setUpWorkflowExecutionData() + setUpWorkflowExecutionData(specId) } override protected def afterEach(): Unit = { - cleanupWorkflowExecutionData() + cleanupWorkflowExecutionData(specId) } override def beforeAll(): Unit = { diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala 
index 2cc268608f..036caefa0a 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala @@ -25,7 +25,7 @@ import org.apache.pekko.util.Timeout import com.twitter.util.{Await, Duration, Promise} import com.typesafe.scalalogging.Logger import org.apache.texera.amber.clustering.SingleNodeListener -import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext} +import org.apache.texera.amber.core.workflow.PortIdentity import org.apache.texera.amber.engine.architecture.controller.{ ControllerConfig, ExecutionStateUpdate @@ -70,12 +70,14 @@ class PauseSpec val logger = Logger("PauseSpecLogger") + private val specId = 2 + override protected def beforeEach(): Unit = { - setUpWorkflowExecutionData() + setUpWorkflowExecutionData(specId) } override protected def afterEach(): Unit = { - cleanupWorkflowExecutionData() + cleanupWorkflowExecutionData(specId) } override def beforeAll(): Unit = { @@ -95,7 +97,7 @@ class PauseSpec links: List[LogicalLink] ): Unit = { val workflow = - TestUtils.buildWorkflow(operators, links, new WorkflowContext()) + TestUtils.buildWorkflow(operators, links, TestUtils.workflowContext(specId)) val client = new AmberClient( system, diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala index 2cd3559736..6dfb23f6ac 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala @@ -27,7 +27,7 @@ import org.apache.texera.amber.clustering.SingleNodeListener import org.apache.texera.amber.core.executor.OpExecInitInfo import org.apache.texera.amber.core.tuple.Tuple import org.apache.texera.amber.core.virtualidentity.OperatorIdentity -import org.apache.texera.amber.core.workflow.{PortIdentity, 
WorkflowContext} +import org.apache.texera.amber.core.workflow.PortIdentity import org.apache.texera.amber.engine.common.AmberRuntime import org.apache.texera.amber.engine.e2e.TestUtils.{ cleanupWorkflowExecutionData, @@ -60,14 +60,15 @@ class ReconfigurationSpec implicit val timeout: Timeout = Timeout(5.seconds) val logger = Logger("ReconfigurationSpecLogger") - val ctx = new WorkflowContext() + private val specId = 3 + val ctx = TestUtils.workflowContext(specId) override protected def beforeEach(): Unit = { - setUpWorkflowExecutionData() + setUpWorkflowExecutionData(specId) } override protected def afterEach(): Unit = { - cleanupWorkflowExecutionData() + cleanupWorkflowExecutionData(specId) } override def beforeAll(): Unit = { diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala index 2a87fe3490..ac71483a5d 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala @@ -26,8 +26,12 @@ import org.apache.texera.amber.core.executor.OpExecInitInfo import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.storage.model.VirtualDocument import org.apache.texera.amber.core.tuple.Tuple -import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, OperatorIdentity} -import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext} +import org.apache.texera.amber.core.virtualidentity.{ + ExecutionIdentity, + OperatorIdentity, + WorkflowIdentity +} +import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext, WorkflowSettings} import org.apache.texera.amber.engine.architecture.controller.{ ControllerConfig, ExecutionStateUpdate, @@ -66,6 +70,22 @@ import org.apache.texera.workflow.{LogicalLink, WorkflowCompiler} object TestUtils { + /** + * A WorkflowContext whose workflow- and execution-id are 
both `id`. Each e2e + * suite passes a distinct id so its results land in a disjoint storage + * keyspace (`vfs:///wid/{id}/eid/{id}/...`) and disjoint DB rows, letting the + * suites run concurrently without colliding on the shared Iceberg catalog. + */ + def workflowContext( + id: Int, + workflowSettings: WorkflowSettings = WorkflowSettings() + ): WorkflowContext = + new WorkflowContext( + workflowId = WorkflowIdentity(id.toLong), + executionId = ExecutionIdentity(id.toLong), + workflowSettings = workflowSettings + ) + def buildWorkflow( operators: List[LogicalOp], links: List[LogicalLink], @@ -186,53 +206,55 @@ object TestUtils { ) } - val testUser: User = { + // All fixture rows for one suite share `id` as uid/wid/vid/eid; the email is + // derived from it so concurrent suites don't collide on the unique email key. + def testUser(id: Int): User = { val user = new User - user.setUid(Integer.valueOf(1)) - user.setName("test_user") + user.setUid(Integer.valueOf(id)) + user.setName(s"test_user_$id") user.setRole(UserRoleEnum.ADMIN) user.setPassword("123") - user.setEmail("[email protected]") + user.setEmail(s"[email protected]") user } - val testWorkflowEntry: WorkflowPojo = { + def testWorkflowEntry(id: Int): WorkflowPojo = { val workflow = new WorkflowPojo workflow.setName("test workflow") - workflow.setWid(Integer.valueOf(1)) + workflow.setWid(Integer.valueOf(id)) workflow.setContent("test workflow content") workflow.setDescription("test description") workflow } - val testWorkflowVersionEntry: WorkflowVersion = { + def testWorkflowVersionEntry(id: Int): WorkflowVersion = { val workflowVersion = new WorkflowVersion - workflowVersion.setWid(Integer.valueOf(1)) - workflowVersion.setVid(Integer.valueOf(1)) + workflowVersion.setWid(Integer.valueOf(id)) + workflowVersion.setVid(Integer.valueOf(id)) workflowVersion.setContent("test version content") workflowVersion } - val testWorkflowExecutionEntry: WorkflowExecutions = { + def testWorkflowExecutionEntry(id: Int): 
WorkflowExecutions = { val workflowExecution = new WorkflowExecutions - workflowExecution.setEid(Integer.valueOf(1)) - workflowExecution.setVid(Integer.valueOf(1)) - workflowExecution.setUid(Integer.valueOf(1)) + workflowExecution.setEid(Integer.valueOf(id)) + workflowExecution.setVid(Integer.valueOf(id)) + workflowExecution.setUid(Integer.valueOf(id)) workflowExecution.setStatus(3.toByte) workflowExecution.setEnvironmentVersion("test engine") workflowExecution } - def setUpWorkflowExecutionData(): Unit = { + def setUpWorkflowExecutionData(id: Int): Unit = { val dslConfig = SqlServer.getInstance().context.configuration() val userDao = new UserDao(dslConfig) val workflowDao = new WorkflowDao(dslConfig) val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) val workflowVersionDao = new WorkflowVersionDao(dslConfig) - userDao.insert(testUser) - workflowDao.insert(testWorkflowEntry) - workflowVersionDao.insert(testWorkflowVersionEntry) - workflowExecutionsDao.insert(testWorkflowExecutionEntry) + userDao.insert(testUser(id)) + workflowDao.insert(testWorkflowEntry(id)) + workflowVersionDao.insert(testWorkflowVersionEntry(id)) + workflowExecutionsDao.insert(testWorkflowExecutionEntry(id)) } /** @@ -318,16 +340,16 @@ object TestUtils { result } - def cleanupWorkflowExecutionData(): Unit = { + def cleanupWorkflowExecutionData(id: Int): Unit = { val dslConfig = SqlServer.getInstance().context.configuration() val userDao = new UserDao(dslConfig) val workflowDao = new WorkflowDao(dslConfig) val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) val workflowVersionDao = new WorkflowVersionDao(dslConfig) - workflowExecutionsDao.deleteById(1) - workflowVersionDao.deleteById(1) - workflowDao.deleteById(1) - userDao.deleteById(1) + workflowExecutionsDao.deleteById(id) + workflowVersionDao.deleteById(id) + workflowDao.deleteById(id) + userDao.deleteById(id) } }
