This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch release/v1.1.0-incubating
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/release/v1.1.0-incubating by 
this push:
     new 5b3b76c24d ci: split amber tests into amber + amber-integration jobs
5b3b76c24d is described below

commit 5b3b76c24dd7dee3d7d43be04d02495ff8a84c41
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 14:46:16 2026 -0700

    ci: split amber tests into amber + amber-integration jobs
    
    Backport of #4871 (head f49285d8cc) onto release/v1.1.0-incubating.
    
    Adds a dedicated amber-integration CI job for Scala specs that
    exercise both Scala and Python end-to-end. The lighter amber job
    no longer installs Python.
    
    - Layout: integration specs live under amber/src/test/integration/
      (parallel to scala/ and java/, room for a future python/).
    - amber/build.sbt: Test/unmanagedSourceDirectories +=
      src/test/integration so scalafmtCheckAll / scalafixAll --check
      cover the integration sources naturally; AMBER_TEST_FILTER
      env-var-driven testOptions add -l / -n
      org.apache.texera.amber.tags.IntegrationTest.
    - Annotation: IntegrationTest.java (Java @interface with
      @TagAnnotation). Scala class extends StaticAnnotation would not
      be visible to ScalaTest's reflection; routing is path-based but
      the annotation makes the marker functional too.
    - TestUtils.scala: extract shouldReconfigure helper so both
      ReconfigurationSpec and ReconfigurationIntegrationSpec can call
      it without duplicating the workflow-driver logic.
    - Test split: ReconfigurationSpec keeps two pure-Scala tests
      (Java operator + source-operator guardrail);
      ReconfigurationIntegrationSpec carries the three Python-UDF
      tests, annotated @IntegrationTest.
    - build.yml:
      - amber job: drops setup-python + uv install + the docker-java
        step. Test step sets AMBER_TEST_FILTER=skip-integration.
      - new amber-integration job: mirrors amber's JDK + sbt + Postgres
        setup, adds Python install, runs scalafmtCheckAll +
        scalafixAll --check + WorkflowExecutionService/test under
        AMBER_TEST_FILTER=integration-only. No jacoco — these specs
        cover code paths already in amber's unit-test coverage.
    - labeler.yml:
      - engine enumerates non-Python amber subdirs (excludes
        src/main/python/** and src/test/integration/**).
      - new amber-integration label matches src/test/integration/**.
      - python label additionally matches **/requirements.txt and
        **/*-requirements.txt so dep-only PRs still hit the python +
        amber-integration stacks.
    - required-checks.yml LABEL_STACKS: adds amber-integration as its
      own stack. python -> [amber-integration, python] (no full Scala
      unit suite). engine -> [amber, amber-integration]. common /
      ddl-change -> [amber, amber-integration, platform].
      amber-integration -> [amber-integration].
    
    (backported from commit f49285d8cc0ce8cf73987d04b6e013e1d66c5ba2)
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .github/labeler.yml                                |  39 +++-
 .github/workflows/build.yml                        | 127 ++++++++++---
 .github/workflows/required-checks.yml              |  62 ++++---
 amber/build.sbt                                    |  22 +++
 .../e2e/ReconfigurationIntegrationSpec.scala}      | 158 ++--------------
 .../apache/texera/amber/tags/IntegrationTest.java  |  52 ++++++
 .../amber/engine/e2e/ReconfigurationSpec.scala     | 199 +--------------------
 .../apache/texera/amber/engine/e2e/TestUtils.scala | 112 +++++++++++-
 8 files changed, 394 insertions(+), 377 deletions(-)

diff --git a/.github/labeler.yml b/.github/labeler.yml
index 64551a1e0f..5a1b2db0d8 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -45,15 +45,52 @@ agent-service:
           - 'agent-service/**'
 
 engine:
+  # Non-Python, non-integration parts of amber/. Pure Python changes
+  # under amber/src/main/python/** intentionally fall through to the
+  # `python` label (which the labeler also matches via **/*.py), so
+  # they only trigger the python + amber-integration stacks rather
+  # than the full Scala-only `amber` stack. Integration specs live
+  # under amber/src/test/integration/** (added to sbt's Test sources
+  # via amber/build.sbt) and are caught by the `amber-integration`
+  # label below. Top-level Scala formatter / linter configs are
+  # included so a PR that only updates them still triggers the Scala
+  # stacks instead of producing an empty stack union.
   - changed-files:
       - any-glob-to-any-file:
-          - 'amber/**'
+          - 'amber/build.sbt'
+          - 'amber/.scalafmt.conf'
+          - 'amber/.scalafix.conf'
+          - 'amber/project/**'
+          - 'amber/src/main/scala/**'
+          - 'amber/src/main/java/**'
+          - 'amber/src/main/protobuf/**'
+          - 'amber/src/main/resources/**'
+          - 'amber/src/test/scala/**'
+          - 'amber/src/test/java/**'
+
+amber-integration:
+  # Scala specs that exercise both Scala and Python end-to-end. They
+  # live under amber/src/test/integration/** (sbt picks them up via
+  # `Test / unmanagedSourceDirectories += .../test/integration` in
+  # amber/build.sbt) and are tagged @org.apache.texera.amber.tags
+  # .IntegrationTest. This label maps to the amber-integration stack
+  # only, so a PR that touches just an integration spec does not pay
+  # for the full Scala-only `amber` job.
+  - changed-files:
+      - any-glob-to-any-file:
+          - 'amber/src/test/integration/**'
 
 python:
+  # Includes pip requirement manifests so dependency-only PRs still
+  # exercise the Python + amber-integration stacks. Without this a
+  # bumped requirements.txt would only get `dependencies` (no stack
+  # mapping) and silently skip CI for the very deps it's changing.
   - changed-files:
       - any-glob-to-any-file:
           - 'amber/src/main/python/**'
           - '**/*.py'
+          - '**/requirements.txt'
+          - '**/*-requirements.txt'
 
 docs:
   - changed-files:
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index ec18dd0b6c..8bc07eedb1 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -44,6 +44,10 @@ on:
         required: false
         type: boolean
         default: true
+      run_amber_integration:
+        required: false
+        type: boolean
+        default: true
       run_platform:
         required: false
         type: boolean
@@ -174,12 +178,6 @@ jobs:
         with:
           distribution: "temurin"
           java-version: 11
-      - name: Setup Python for Scala tests
-        uses: actions/setup-python@v6
-        with:
-          python-version: "3.11"
-      - name: Show Python
-        run: python --version || python3 --version
       - name: Create Databases
         # Must run before any sbt compile step: the build's JOOQ source
         # generators connect to texera_db while compiling.
@@ -220,16 +218,6 @@ jobs:
             /tmp/dists/amber-*/lib || check_exit=$?
           ./bin/licensing/audit_jar_licenses.py /tmp/dists/amber-*/lib || true
           exit "$check_exit"
-      - name: Install dependencies
-        # Only the backend test step needs the python deps; install just
-        # before tests so a lint or dist failure does not pay for them.
-        # Use uv for speed — the scala job's binary license check is
-        # jar-only, so any pip-vs-uv resolver differences in the Python
-        # tree don't affect amber/LICENSE-binary-python here.
-        run: |
-          python -m pip install uv
-          if [ -f amber/requirements.txt ]; then uv pip install --system -r 
amber/requirements.txt; fi
-          if [ -f amber/operator-requirements.txt ]; then uv pip install 
--system -r amber/operator-requirements.txt; fi
       - name: Create texera_db_for_test_cases
         run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases 
-f sql/texera_ddl.sql
         env:
@@ -241,10 +229,16 @@ jobs:
       - name: Run amber and common module tests with coverage
         # 'jacoco' runs tests under sbt-jacoco's JVM agent and emits per-
         # module jacoco.xml that the codecov upload step picks up.
-        # `WorkflowExecutionService/jacoco` only runs that project's tests
-        # (sbt's `test` task does not transit dependsOn), so common
-        # modules' tests are listed explicitly here. Modules with no
-        # tests (Auth, Config) are skipped.
+        # `WorkflowExecutionService/jacoco` only runs that project's
+        # Test config (sbt's `test` task does not transit dependsOn),
+        # so common modules' tests are listed explicitly here. Modules
+        # with no tests (Auth, Config) are skipped.
+        #
+        # AMBER_TEST_FILTER=skip-integration tells amber/build.sbt to
+        # exclude @org.apache.texera.amber.tags.IntegrationTest specs;
+        # those run in the amber-integration job below.
+        env:
+          AMBER_TEST_FILTER: skip-integration
         run: |
           sbt "DAO/jacoco" \
               "PyBuilder/jacoco" \
@@ -260,6 +254,99 @@ jobs:
           flags: amber
           fail_ci_if_error: false
 
+  amber-integration:
+    # Runs Scala tests tagged @org.apache.texera.amber.tags.IntegrationTest —
+    # currently the e2e specs that spawn Python UDF workers. Provisions
+    # Python deps that the lighter `amber` job no longer installs. Cross-
+    # cutting lints (scalafmt / scalafix) and the amber dist + binary
+    # license check stay in `amber`; this job is tests-only.
+    if: ${{ inputs.run_amber_integration }}
+    strategy:
+      matrix:
+        os: [ubuntu-22.04]
+        java-version: [11]
+    runs-on: ${{ matrix.os }}
+    env:
+      JAVA_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M 
-Dfile.encoding=UTF-8
+      JVM_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M 
-Dfile.encoding=UTF-8
+    services:
+      postgres:
+        image: postgres
+        env:
+          POSTGRES_PASSWORD: postgres
+        ports:
+          - 5432:5432
+        options: >-
+          --health-cmd="pg_isready -U postgres"
+          --health-interval=10s
+          --health-timeout=5s
+          --health-retries=5
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v5
+        with:
+          ref: ${{ inputs.checkout_ref || github.sha }}
+          fetch-depth: 0
+      - name: Prepare backport workspace
+        if: ${{ inputs.backport_target_branch != '' }}
+        working-directory: ${{ github.workspace }}
+        run: bash ./.github/scripts/prepare-backport-checkout.sh "${{ 
inputs.backport_target_branch }}" "${{ inputs.backport_commit_range }}"
+      - name: Setup JDK
+        uses: actions/setup-java@v5
+        with:
+          distribution: "temurin"
+          java-version: 11
+      - name: Setup Python for Scala-Python integration tests
+        uses: actions/setup-python@v6
+        with:
+          python-version: "3.11"
+      - name: Show Python
+        run: python --version || python3 --version
+      - name: Install Python dependencies
+        # The integration tests spawn Python UDF workers; install
+        # everything they need on the host. uv for speed; no licensing
+        # concerns because no dist is built here.
+        run: |
+          python -m pip install uv
+          if [ -f amber/requirements.txt ]; then uv pip install --system -r 
amber/requirements.txt; fi
+          if [ -f amber/operator-requirements.txt ]; then uv pip install 
--system -r amber/operator-requirements.txt; fi
+      - name: Create Databases
+        run: |
+          psql -h localhost -U postgres -f sql/texera_ddl.sql
+          psql -h localhost -U postgres -f sql/iceberg_postgres_catalog.sql
+          psql -h localhost -U postgres -f sql/texera_lakefs.sql
+        env:
+          PGPASSWORD: postgres
+      - name: Setup sbt launcher
+        uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
+      - uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 # 
v8.1.0
+        with:
+          extraSbtFiles: '["*.sbt", "project/**.{scala,sbt}", 
"project/build.properties" ]'
+      - name: Create texera_db_for_test_cases
+        run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases 
-f sql/texera_ddl.sql
+        env:
+          PGPASSWORD: postgres
+      - name: Lint and run amber integration tests
+        # AMBER_TEST_FILTER=integration-only tells amber/build.sbt to
+        # keep only @org.apache.texera.amber.tags.IntegrationTest
+        # specs. The Java @TagAnnotation makes the marker visible to
+        # ScalaTest's reflection, so `-n TAG` correctly narrows the
+        # run.
+        #
+        # scalafmtCheckAll + scalafixAll --check are run here as well
+        # because an integration-only PR fires only the
+        # `amber-integration` label; the amber job's own cross-cutting
+        # lint would not run, and the change would otherwise land
+        # unlinted. Costs ~30s when amber also runs, which is fine.
+        # No jacoco — these specs exercise code paths already covered
+        # by amber's unit-test coverage.
+        env:
+          AMBER_TEST_FILTER: integration-only
+        run: |
+          sbt scalafmtCheckAll \
+              "scalafixAll --check" \
+              "WorkflowExecutionService/test"
+
   platform:
     # Per-service build, test, and license check for the non-amber Scala
     # services. Each matrix entry runs its own dist + test in isolation
diff --git a/.github/workflows/required-checks.yml 
b/.github/workflows/required-checks.yml
index 9046682bd7..af4fcc9ea8 100644
--- a/.github/workflows/required-checks.yml
+++ b/.github/workflows/required-checks.yml
@@ -60,6 +60,7 @@ jobs:
     outputs:
       run_frontend: ${{ steps.decide.outputs.run_frontend }}
       run_amber: ${{ steps.decide.outputs.run_amber }}
+      run_amber_integration: ${{ steps.decide.outputs.run_amber_integration }}
       run_platform: ${{ steps.decide.outputs.run_platform }}
       run_python: ${{ steps.decide.outputs.run_python }}
       run_agent_service: ${{ steps.decide.outputs.run_agent_service }}
@@ -115,35 +116,48 @@ jobs:
             // labeler matches lives under a component dir and is already
             // covered by that component's label.
             //
-            //   label          | frontend | amber | platform | python | 
agent-service
-            //   
---------------|----------|-------|----------|--------|--------------
-            //   frontend       |    x     |       |          |        |
-            //   python         |          |   x   |          |   x    |
-            //   engine         |          |   x   |          |   x    |
-            //   platform       |          |       |    x     |        |
-            //   agent-service  |          |       |          |        |     x
-            //   common         |          |   x   |    x     |        |  
(also catches
-            //                                                            root 
scala
-            //                                                            
build/lint
-            //                                                            
config)
-            //   ddl-change     |          |   x   |    x     |        |
-            //   ci             |    x     |   x   |    x     |   x    |     x
-            //   docs / dev /   |          |       |          |        |
-            //   deps / release/|          |       |          |        |
-            //   * / branch     |          |       |          |        |
+            //   label             | frontend | amber | amber-integ | platform 
| python | agent-service
+            //   
------------------|----------|-------|-------------|----------|--------|--------------
+            //   frontend          |    x     |       |             |          
|        |
+            //   python            |          |       |      x      |          
|   x    |
+            //   engine            |          |   x   |      x      |          
|        |
+            //   amber-integration |          |       |      x      |          
|        |
+            //   platform          |          |       |             |    x     
|        |
+            //   agent-service     |          |       |             |          
|        |     x
+            //   common            |          |   x   |      x      |    x     
|        |  (root
+            //                                                                 
           scala
+            //                                                                 
           build/lint
+            //                                                                 
           config)
+            //   ddl-change        |          |   x   |      x      |    x     
|        |
+            //   ci                |    x     |   x   |      x      |    x     
|   x    |     x
+            //   docs / dev /      |          |       |             |          
|        |
+            //   deps / release/   |          |       |             |          
|        |
+            //   * / branch        |          |       |             |          
|        |
+            //
+            // amber-integration runs the Scala tests tagged
+            // @org.apache.texera.amber.tags.IntegrationTest (e2e specs that
+            // spawn Python UDF workers). The labeler attaches `python` to
+            // any *.py change (including amber/src/main/python/**), so
+            // `engine` does not need to fire the python stack itself —
+            // pure-Python amber changes pick up `python` directly. The
+            // `amber-integration` label catches *IntegrationSpec.scala
+            // edits so a test-only change does not trigger the full
+            // Scala-only amber stack.
             const LABEL_STACKS = {
               frontend: ["frontend"],
-              python: ["amber", "python"],          // pyamber drives amber 
integration tests too
-              engine: ["amber", "python"],          // amber/** spans both
-              platform: ["platform"],               // platform services
+              python: ["amber-integration", "python"],
+              engine: ["amber", "amber-integration"],
+              "amber-integration": ["amber-integration"],
+              platform: ["platform"],
               "agent-service": ["agent-service"],
-              common: ["amber", "platform"],        // common/** + root scala 
build/lint
-              "ddl-change": ["amber", "platform"],
-              ci: ["frontend", "amber", "platform", "python", "agent-service"],
+              common: ["amber", "amber-integration", "platform"],
+              "ddl-change": ["amber", "amber-integration", "platform"],
+              ci: ["frontend", "amber", "amber-integration", "platform", 
"python", "agent-service"],
             };
 
             let runFrontend = true;
             let runAmber = true;
+            let runAmberIntegration = true;
             let runPlatform = true;
             let runPython = true;
             let runAgentService = true;
@@ -157,6 +171,7 @@ jobs:
               }
               runFrontend = stacks.has("frontend");
               runAmber = stacks.has("amber");
+              runAmberIntegration = stacks.has("amber-integration");
               runPlatform = stacks.has("platform");
               runPython = stacks.has("python");
               runAgentService = stacks.has("agent-service");
@@ -167,6 +182,7 @@ jobs:
 
             core.setOutput("run_frontend", runFrontend ? "true" : "false");
             core.setOutput("run_amber", runAmber ? "true" : "false");
+            core.setOutput("run_amber_integration", runAmberIntegration ? 
"true" : "false");
             core.setOutput("run_platform", runPlatform ? "true" : "false");
             core.setOutput("run_python", runPython ? "true" : "false");
             core.setOutput("run_agent_service", runAgentService ? "true" : 
"false");
@@ -221,6 +237,7 @@ jobs:
     with:
       run_frontend: ${{ needs.precheck.outputs.run_frontend == 'true' }}
       run_amber: ${{ needs.precheck.outputs.run_amber == 'true' }}
+      run_amber_integration: ${{ needs.precheck.outputs.run_amber_integration 
== 'true' }}
       run_platform: ${{ needs.precheck.outputs.run_platform == 'true' }}
       run_python: ${{ needs.precheck.outputs.run_python == 'true' }}
       run_agent_service: ${{ needs.precheck.outputs.run_agent_service == 
'true' }}
@@ -241,6 +258,7 @@ jobs:
       job_name_suffix: ""
       run_frontend: ${{ needs.precheck.outputs.run_frontend == 'true' }}
       run_amber: ${{ needs.precheck.outputs.run_amber == 'true' }}
+      run_amber_integration: ${{ needs.precheck.outputs.run_amber_integration 
== 'true' }}
       run_platform: ${{ needs.precheck.outputs.run_platform == 'true' }}
       run_python: ${{ needs.precheck.outputs.run_python == 'true' }}
       run_agent_service: ${{ needs.precheck.outputs.run_agent_service == 
'true' }}
diff --git a/amber/build.sbt b/amber/build.sbt
index 4517d02d7e..09e4628ab8 100644
--- a/amber/build.sbt
+++ b/amber/build.sbt
@@ -54,6 +54,28 @@ concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)
 // add python as an additional source
 Compile / unmanagedSourceDirectories += baseDirectory.value / "src" / "main" / 
"python"
 
+// `amber/src/test/integration` holds Scala specs that exercise both
+// Scala and Python end-to-end (tagged 
@org.apache.texera.amber.tags.IntegrationTest).
+// Sits next to `src/test/scala` and `src/test/java`; a future 
`src/test/python`
+// can drop in the same way. Adding it to Test/unmanagedSourceDirectories means
+// scalafmtCheckAll / scalafixAll --check naturally cover these sources, and
+// the AMBER_TEST_FILTER env var below routes which tagged subset runs.
+Test / unmanagedSourceDirectories += baseDirectory.value / "src" / "test" / 
"integration"
+
+// Test-filter switch driven by the AMBER_TEST_FILTER env var so the
+// amber and amber-integration CI jobs select disjoint subsets without
+// each invocation having to embed a `set Tests.Argument(...)` prefix.
+//   skip-integration : exclude @IntegrationTest-tagged specs (amber job)
+//   integration-only : include only @IntegrationTest-tagged specs 
(amber-integration job)
+//   (unset)          : run everything (default for local sbt)
+Test / testOptions ++= (sys.env.get("AMBER_TEST_FILTER") match {
+  case Some("skip-integration") =>
+    Seq(Tests.Argument(TestFrameworks.ScalaTest, "-l", 
"org.apache.texera.amber.tags.IntegrationTest"))
+  case Some("integration-only") =>
+    Seq(Tests.Argument(TestFrameworks.ScalaTest, "-n", 
"org.apache.texera.amber.tags.IntegrationTest"))
+  case _ => Nil
+})
+
 // Excluding some proto files:
 PB.generate / excludeFilter := "scalapb.proto"
 
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
 
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
similarity index 57%
copy from 
amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
copy to 
amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
index a125c1be00..768caf079f 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++ 
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
@@ -19,49 +19,40 @@
 
 package org.apache.texera.amber.engine.e2e
 
-import com.twitter.util.{Await, Duration, Promise}
 import com.typesafe.scalalogging.Logger
 import org.apache.pekko.actor.{ActorSystem, Props}
 import org.apache.pekko.testkit.{ImplicitSender, TestKit}
 import org.apache.pekko.util.Timeout
 import org.apache.texera.amber.clustering.SingleNodeListener
 import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.Tuple
 import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
 import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
-import org.apache.texera.amber.engine.architecture.controller.{
-  ControllerConfig,
-  ExecutionStateUpdate
-}
-import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
-  EmptyRequest,
-  UpdateExecutorRequest,
-  WorkflowReconfigureRequest
-}
-import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
-  COMPLETED,
-  PAUSED
-}
 import org.apache.texera.amber.engine.common.AmberRuntime
-import org.apache.texera.amber.engine.common.client.AmberClient
 import org.apache.texera.amber.engine.e2e.TestUtils.{
   cleanupWorkflowExecutionData,
   initiateTexeraDBForTestCases,
-  setUpWorkflowExecutionData,
-  stateReached
+  setUpWorkflowExecutionData
 }
 import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
-import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
+import org.apache.texera.amber.tags.IntegrationTest
 import org.apache.texera.workflow.LogicalLink
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
 import org.scalatest.flatspec.AnyFlatSpecLike
 
 import scala.concurrent.duration._
 
-class ReconfigurationSpec
-    extends TestKit(ActorSystem("ReconfigurationSpec", 
AmberRuntime.akkaConfig))
+/**
+  * E2E reconfiguration tests that spawn Python UDF workers. Routed to the
+  * `amber-integration` CI job via the class-level `@IntegrationTest` tag,
+  * which provisions Python deps; the lighter `amber` job excludes this tag.
+  *
+  * Pure-Scala reconfiguration tests live in [[ReconfigurationSpec]] and run
+  * in the regular `amber` job.
+  */
+@IntegrationTest
+class ReconfigurationIntegrationSpec
+    extends TestKit(ActorSystem("ReconfigurationIntegrationSpec", 
AmberRuntime.akkaConfig))
     with ImplicitSender
     with AnyFlatSpecLike
     with BeforeAndAfterAll
@@ -78,7 +69,7 @@ class ReconfigurationSpec
 
   implicit val timeout: Timeout = Timeout(5.seconds)
 
-  val logger = Logger("ReconfigurationSpecLogger")
+  val logger = Logger("ReconfigurationIntegrationSpecLogger")
   val ctx = new WorkflowContext()
 
   override protected def beforeEach(): Unit = {
@@ -101,83 +92,16 @@ class ReconfigurationSpec
     TestKit.shutdownActorSystem(system)
   }
 
+  // Thin wrapper around the shared TestUtils helper so call sites below stay
+  // ctx/system-implicit. The actual workflow-driver logic lives in TestUtils
+  // and is reused by ReconfigurationSpec.
   def shouldReconfigure(
       operators: List[LogicalOp],
       links: List[LogicalLink],
       targetOps: Seq[LogicalOp],
       newOpExecInitInfo: OpExecInitInfo
-  ): Map[OperatorIdentity, List[Tuple]] = {
-    val workflow =
-      TestUtils.buildWorkflow(operators, links, ctx)
-    val client =
-      new AmberClient(
-        system,
-        workflow.context,
-        workflow.physicalPlan,
-        ControllerConfig.default,
-        error => {}
-      )
-    val completion = Promise[Unit]()
-    var result: Map[OperatorIdentity, List[Tuple]] = null
-    client
-      .registerCallback[ExecutionStateUpdate](evt => {
-        if (evt.state == COMPLETED) {
-          result = workflow.logicalPlan.getTerminalOperatorIds
-            .filter(terminalOpId => {
-              val uri = getResultUriByLogicalPortId(
-                workflow.context.executionId,
-                terminalOpId,
-                PortIdentity()
-              )
-              uri.nonEmpty
-            })
-            .map(terminalOpId => {
-              val uri = getResultUriByLogicalPortId(
-                workflow.context.executionId,
-                terminalOpId,
-                PortIdentity()
-              ).get
-              terminalOpId -> DocumentFactory
-                .openDocument(uri)
-                ._1
-                .asInstanceOf[VirtualDocument[Tuple]]
-                .get()
-                .toList
-            })
-            .toMap
-          completion.setDone()
-        }
-      })
-    Await.result(
-      client.controllerInterface.startWorkflow(EmptyRequest(), ()),
-      Duration.fromSeconds(5)
-    )
-    val pausedReached = stateReached(client, PAUSED)
-    Await.result(
-      client.controllerInterface.pauseWorkflow(EmptyRequest(), ()),
-      Duration.fromSeconds(5)
-    )
-    Await.result(pausedReached, Duration.fromSeconds(10))
-    val physicalOps = targetOps.flatMap(op =>
-      workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
-    )
-    Await.result(
-      client.controllerInterface.reconfigureWorkflow(
-        WorkflowReconfigureRequest(
-          reconfiguration = physicalOps.map(op => UpdateExecutorRequest(op.id, 
newOpExecInitInfo)),
-          reconfigurationId = "test-reconfigure-1"
-        ),
-        ()
-      ),
-      Duration.fromSeconds(5)
-    )
-    Await.result(
-      client.controllerInterface.resumeWorkflow(EmptyRequest(), ()),
-      Duration.fromSeconds(5)
-    )
-    Await.result(completion, Duration.fromMinutes(1))
-    result
-  }
+  ): Map[OperatorIdentity, List[Tuple]] =
+    TestUtils.shouldReconfigure(system, ctx, operators, links, targetOps, 
newOpExecInitInfo)
 
   "Engine" should "be able to modify a python UDF worker in workflow" in {
     val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
@@ -210,50 +134,6 @@ class ReconfigurationSpec
     })
   }
 
-  "Engine" should "be able to modify a java operator in workflow" in {
-    val sourceOpDesc = TestOperators.mediumCsvScanOpDesc()
-    val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region", 
"ShouldMatchNone")
-    val keywordMatchManyOpDesc = TestOperators.keywordSearchOpDesc("Region", 
"Asia")
-    val result = shouldReconfigure(
-      List(sourceOpDesc, keywordMatchNoneOpDesc),
-      List(
-        LogicalLink(
-          sourceOpDesc.operatorIdentifier,
-          PortIdentity(),
-          keywordMatchNoneOpDesc.operatorIdentifier,
-          PortIdentity()
-        )
-      ),
-      Seq(keywordMatchNoneOpDesc),
-      keywordMatchManyOpDesc.getPhysicalOp(ctx.workflowId, 
ctx.executionId).opExecInitInfo
-    )
-    assert(result(keywordMatchNoneOpDesc.operatorIdentifier).nonEmpty)
-  }
-
-  "Engine" should "not be able to modify a source operator in workflow" in {
-    val sourceOpDesc = TestOperators.mediumCsvScanOpDesc()
-    val sourceOpDesc2 = TestOperators.mediumCsvScanOpDesc()
-    val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region", 
"ShouldMatchNone")
-    val ex = intercept[Throwable] {
-      shouldReconfigure(
-        List(sourceOpDesc, keywordMatchNoneOpDesc),
-        List(
-          LogicalLink(
-            sourceOpDesc.operatorIdentifier,
-            PortIdentity(),
-            keywordMatchNoneOpDesc.operatorIdentifier,
-            PortIdentity()
-          )
-        ),
-        Seq(sourceOpDesc),
-        sourceOpDesc2.getPhysicalOp(ctx.workflowId, 
ctx.executionId).opExecInitInfo
-      )
-    }
-    assert(
-      ex.getMessage == "java.lang.IllegalStateException: Reconfiguration 
cannot be applied to source operators"
-    )
-  }
-
   "Engine" should "propagate reconfiguration through a source operator in 
workflow" in {
     val sourceOpDesc = TestOperators.pythonSourceOpDesc(10000)
     val udfOpDesc = TestOperators.pythonOpDesc()
diff --git 
a/amber/src/test/integration/org/apache/texera/amber/tags/IntegrationTest.java 
b/amber/src/test/integration/org/apache/texera/amber/tags/IntegrationTest.java
new file mode 100644
index 0000000000..024b5594c9
--- /dev/null
+++ 
b/amber/src/test/integration/org/apache/texera/amber/tags/IntegrationTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.tags;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.scalatest.TagAnnotation;
+
+/**
+ * Class-level marker tag for ScalaTest specs that exercise both Scala
+ * and Python end-to-end. Routing to the {@code amber-integration} CI
+ * job is by ScalaTest tag filtering, controlled by the
+ * {@code AMBER_TEST_FILTER} env var in {@code amber/build.sbt}: the
+ * lighter {@code amber} job runs with {@code skip-integration} (which
+ * passes {@code -l org.apache.texera.amber.tags.IntegrationTest} to
+ * ScalaTest), and the {@code amber-integration} job runs with
+ * {@code integration-only} (which passes {@code -n} for the same tag).
+ * The {@code amber/src/test/integration} directory is added to sbt's
+ * {@code Test/unmanagedSourceDirectories} so these specs compile in
+ * the regular Test config; there is no separate sbt configuration.
+ *
+ * <p>Written in Java rather than Scala because ScalaTest detects tag
+ * annotations via {@code java.lang.annotation} reflection. A Scala
+ * {@code class extends StaticAnnotation} does not produce a JVM
+ * annotation interface that {@code @TagAnnotation} can attach to, so
+ * the tag would be invisible to ScalaTest at runtime.
+ */
+@TagAnnotation
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD, ElementType.TYPE})
+public @interface IntegrationTest {
+}
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index a125c1be00..8cabf8684a 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -19,41 +19,22 @@
 
 package org.apache.texera.amber.engine.e2e
 
-import com.twitter.util.{Await, Duration, Promise}
 import com.typesafe.scalalogging.Logger
 import org.apache.pekko.actor.{ActorSystem, Props}
 import org.apache.pekko.testkit.{ImplicitSender, TestKit}
 import org.apache.pekko.util.Timeout
 import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.storage.model.VirtualDocument
+import org.apache.texera.amber.core.executor.OpExecInitInfo
 import org.apache.texera.amber.core.tuple.Tuple
 import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
 import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
-import org.apache.texera.amber.engine.architecture.controller.{
-  ControllerConfig,
-  ExecutionStateUpdate
-}
-import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
-  EmptyRequest,
-  UpdateExecutorRequest,
-  WorkflowReconfigureRequest
-}
-import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
-  COMPLETED,
-  PAUSED
-}
 import org.apache.texera.amber.engine.common.AmberRuntime
-import org.apache.texera.amber.engine.common.client.AmberClient
 import org.apache.texera.amber.engine.e2e.TestUtils.{
   cleanupWorkflowExecutionData,
   initiateTexeraDBForTestCases,
-  setUpWorkflowExecutionData,
-  stateReached
+  setUpWorkflowExecutionData
 }
 import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
-import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
 import org.apache.texera.workflow.LogicalLink
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
 import org.scalatest.flatspec.AnyFlatSpecLike
@@ -101,114 +82,16 @@ class ReconfigurationSpec
     TestKit.shutdownActorSystem(system)
   }
 
+  // Thin wrapper around the shared TestUtils helper so call sites below stay
+  // ctx/system-implicit. The actual workflow-driver logic lives in TestUtils
+  // and is reused by ReconfigurationIntegrationSpec.
   def shouldReconfigure(
       operators: List[LogicalOp],
       links: List[LogicalLink],
       targetOps: Seq[LogicalOp],
       newOpExecInitInfo: OpExecInitInfo
-  ): Map[OperatorIdentity, List[Tuple]] = {
-    val workflow =
-      TestUtils.buildWorkflow(operators, links, ctx)
-    val client =
-      new AmberClient(
-        system,
-        workflow.context,
-        workflow.physicalPlan,
-        ControllerConfig.default,
-        error => {}
-      )
-    val completion = Promise[Unit]()
-    var result: Map[OperatorIdentity, List[Tuple]] = null
-    client
-      .registerCallback[ExecutionStateUpdate](evt => {
-        if (evt.state == COMPLETED) {
-          result = workflow.logicalPlan.getTerminalOperatorIds
-            .filter(terminalOpId => {
-              val uri = getResultUriByLogicalPortId(
-                workflow.context.executionId,
-                terminalOpId,
-                PortIdentity()
-              )
-              uri.nonEmpty
-            })
-            .map(terminalOpId => {
-              val uri = getResultUriByLogicalPortId(
-                workflow.context.executionId,
-                terminalOpId,
-                PortIdentity()
-              ).get
-              terminalOpId -> DocumentFactory
-                .openDocument(uri)
-                ._1
-                .asInstanceOf[VirtualDocument[Tuple]]
-                .get()
-                .toList
-            })
-            .toMap
-          completion.setDone()
-        }
-      })
-    Await.result(
-      client.controllerInterface.startWorkflow(EmptyRequest(), ()),
-      Duration.fromSeconds(5)
-    )
-    val pausedReached = stateReached(client, PAUSED)
-    Await.result(
-      client.controllerInterface.pauseWorkflow(EmptyRequest(), ()),
-      Duration.fromSeconds(5)
-    )
-    Await.result(pausedReached, Duration.fromSeconds(10))
-    val physicalOps = targetOps.flatMap(op =>
-      workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
-    )
-    Await.result(
-      client.controllerInterface.reconfigureWorkflow(
-        WorkflowReconfigureRequest(
-          reconfiguration = physicalOps.map(op => UpdateExecutorRequest(op.id, 
newOpExecInitInfo)),
-          reconfigurationId = "test-reconfigure-1"
-        ),
-        ()
-      ),
-      Duration.fromSeconds(5)
-    )
-    Await.result(
-      client.controllerInterface.resumeWorkflow(EmptyRequest(), ()),
-      Duration.fromSeconds(5)
-    )
-    Await.result(completion, Duration.fromMinutes(1))
-    result
-  }
-
-  "Engine" should "be able to modify a python UDF worker in workflow" in {
-    val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
-    val udfOpDesc = TestOperators.pythonOpDesc()
-    val code = """
-                 |from pytexera import *
-                 |
-                 |class ProcessTupleOperator(UDFOperatorV2):
-                 |    @overrides
-                 |    def process_tuple(self, tuple_: Tuple, port: int) -> 
Iterator[Optional[TupleLike]]:
-                 |        tuple_['Region'] = tuple_['Region'] + '_reconfigured'
-                 |        yield tuple_
-                 |""".stripMargin
-
-    val result = shouldReconfigure(
-      List(sourceOpDesc, udfOpDesc),
-      List(
-        LogicalLink(
-          sourceOpDesc.operatorIdentifier,
-          PortIdentity(),
-          udfOpDesc.operatorIdentifier,
-          PortIdentity()
-        )
-      ),
-      Seq(udfOpDesc),
-      OpExecWithCode(code, "python")
-    )
-    assert(result(udfOpDesc.operatorIdentifier).exists { t =>
-      t.getField("Region").asInstanceOf[String].contains("_reconfigured")
-    })
-  }
+  ): Map[OperatorIdentity, List[Tuple]] =
+    TestUtils.shouldReconfigure(system, ctx, operators, links, targetOps, 
newOpExecInitInfo)
 
   "Engine" should "be able to modify a java operator in workflow" in {
     val sourceOpDesc = TestOperators.mediumCsvScanOpDesc()
@@ -254,72 +137,4 @@ class ReconfigurationSpec
     )
   }
 
-  "Engine" should "propagate reconfiguration through a source operator in 
workflow" in {
-    val sourceOpDesc = TestOperators.pythonSourceOpDesc(10000)
-    val udfOpDesc = TestOperators.pythonOpDesc()
-    val code = """
-                 |from pytexera import *
-                 |
-                 |class ProcessTupleOperator(UDFOperatorV2):
-                 |    @overrides
-                 |    def process_tuple(self, tuple_: Tuple, port: int) -> 
Iterator[Optional[TupleLike]]:
-                 |        tuple_['field_1'] = tuple_['field_1'] + 
'_reconfigured'
-                 |        yield tuple_
-                 |""".stripMargin
-    val result = shouldReconfigure(
-      List(sourceOpDesc, udfOpDesc),
-      List(
-        LogicalLink(
-          sourceOpDesc.operatorIdentifier,
-          PortIdentity(),
-          udfOpDesc.operatorIdentifier,
-          PortIdentity()
-        )
-      ),
-      Seq(udfOpDesc),
-      OpExecWithCode(code, "python")
-    )
-    assert(result(udfOpDesc.operatorIdentifier).exists { t =>
-      t.getField("field_1").asInstanceOf[String].contains("_reconfigured")
-    })
-  }
-
-  "Engine" should "be able to modify two python UDFs in workflow" in {
-    val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
-    val udfOpDesc1 = TestOperators.pythonOpDesc()
-    val udfOpDesc2 = TestOperators.pythonOpDesc()
-    val code = """
-                 |from pytexera import *
-                 |
-                 |class ProcessTupleOperator(UDFOperatorV2):
-                 |    @overrides
-                 |    def process_tuple(self, tuple_: Tuple, port: int) -> 
Iterator[Optional[TupleLike]]:
-                 |        tuple_['Region'] = tuple_['Region'] + '_reconfigured'
-                 |        yield tuple_
-                 |""".stripMargin
-
-    val result = shouldReconfigure(
-      List(sourceOpDesc, udfOpDesc1, udfOpDesc2),
-      List(
-        LogicalLink(
-          sourceOpDesc.operatorIdentifier,
-          PortIdentity(),
-          udfOpDesc1.operatorIdentifier,
-          PortIdentity()
-        ),
-        LogicalLink(
-          udfOpDesc1.operatorIdentifier,
-          PortIdentity(),
-          udfOpDesc2.operatorIdentifier,
-          PortIdentity()
-        )
-      ),
-      Seq(udfOpDesc1, udfOpDesc2),
-      OpExecWithCode(code, "python")
-    )
-    assert(result(udfOpDesc2.operatorIdentifier).exists { t =>
-      
t.getField("Region").asInstanceOf[String].contains("_reconfigured_reconfigured")
-    })
-  }
-
 }
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
index fab5d5a16c..bcc43b396b 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
@@ -19,11 +19,30 @@
 
 package org.apache.texera.amber.engine.e2e
 
-import com.twitter.util.{Promise, Return}
+import com.twitter.util.{Await, Duration, Promise, Return}
+import org.apache.pekko.actor.ActorSystem
 import org.apache.texera.amber.config.StorageConfig
-import org.apache.texera.amber.core.workflow.WorkflowContext
-import 
org.apache.texera.amber.engine.architecture.controller.{ExecutionStateUpdate, 
Workflow}
+import org.apache.texera.amber.core.executor.OpExecInitInfo
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.model.VirtualDocument
+import org.apache.texera.amber.core.tuple.Tuple
+import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.engine.architecture.controller.{
+  ControllerConfig,
+  ExecutionStateUpdate,
+  Workflow
+}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+  EmptyRequest,
+  UpdateExecutorRequest,
+  WorkflowReconfigureRequest
+}
 import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
+  COMPLETED,
+  PAUSED
+}
 import org.apache.texera.amber.engine.common.client.AmberClient
 import org.apache.texera.amber.operator.LogicalOp
 import org.apache.texera.dao.SqlServer
@@ -41,6 +60,7 @@ import org.apache.texera.dao.jooq.generated.tables.pojos.{
   Workflow => WorkflowPojo
 }
 import org.apache.texera.web.model.websocket.request.LogicalPlanPojo
+import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
 import org.apache.texera.workflow.{LogicalLink, WorkflowCompiler}
 
 object TestUtils {
@@ -139,6 +159,92 @@ object TestUtils {
     p
   }
 
+  /**
+    * Pause a freshly-started workflow, swap the executor for the given target
+    * operators via WorkflowReconfigureRequest, resume, and collect the
+    * terminal-port outputs once the run completes. Shared by 
ReconfigurationSpec
+    * (pure-Scala) and ReconfigurationIntegrationSpec (Python-tagged), so an
+    * earlier in-spec copy doesn't drift between the two as new e2e specs
+    * land. The caller passes its own `system` (TestKit) and `ctx`
+    * (WorkflowContext) since both are tied to the spec lifecycle.
+    */
+  def shouldReconfigure(
+      system: ActorSystem,
+      ctx: WorkflowContext,
+      operators: List[LogicalOp],
+      links: List[LogicalLink],
+      targetOps: Seq[LogicalOp],
+      newOpExecInitInfo: OpExecInitInfo
+  ): Map[OperatorIdentity, List[Tuple]] = {
+    val workflow = buildWorkflow(operators, links, ctx)
+    val client = new AmberClient(
+      system,
+      workflow.context,
+      workflow.physicalPlan,
+      ControllerConfig.default,
+      error => {}
+    )
+    val completion = Promise[Unit]()
+    var result: Map[OperatorIdentity, List[Tuple]] = null
+    client.registerCallback[ExecutionStateUpdate](evt => {
+      if (evt.state == COMPLETED) {
+        result = workflow.logicalPlan.getTerminalOperatorIds
+          .filter(terminalOpId => {
+            val uri = getResultUriByLogicalPortId(
+              workflow.context.executionId,
+              terminalOpId,
+              PortIdentity()
+            )
+            uri.nonEmpty
+          })
+          .map(terminalOpId => {
+            val uri = getResultUriByLogicalPortId(
+              workflow.context.executionId,
+              terminalOpId,
+              PortIdentity()
+            ).get
+            terminalOpId -> DocumentFactory
+              .openDocument(uri)
+              ._1
+              .asInstanceOf[VirtualDocument[Tuple]]
+              .get()
+              .toList
+          })
+          .toMap
+        completion.setDone()
+      }
+    })
+    Await.result(
+      client.controllerInterface.startWorkflow(EmptyRequest(), ()),
+      Duration.fromSeconds(5)
+    )
+    val pausedReached = stateReached(client, PAUSED)
+    Await.result(
+      client.controllerInterface.pauseWorkflow(EmptyRequest(), ()),
+      Duration.fromSeconds(5)
+    )
+    Await.result(pausedReached, Duration.fromSeconds(10))
+    val physicalOps = targetOps.flatMap(op =>
+      workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
+    )
+    Await.result(
+      client.controllerInterface.reconfigureWorkflow(
+        WorkflowReconfigureRequest(
+          reconfiguration = physicalOps.map(op => UpdateExecutorRequest(op.id, 
newOpExecInitInfo)),
+          reconfigurationId = "test-reconfigure-1"
+        ),
+        ()
+      ),
+      Duration.fromSeconds(5)
+    )
+    Await.result(
+      client.controllerInterface.resumeWorkflow(EmptyRequest(), ()),
+      Duration.fromSeconds(5)
+    )
+    Await.result(completion, Duration.fromMinutes(1))
+    result
+  }
+
   def cleanupWorkflowExecutionData(): Unit = {
     val dslConfig = SqlServer.getInstance().context.configuration()
     val userDao = new UserDao(dslConfig)


Reply via email to