This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new fdffddab76 feat(ci): split amber tests into amber + amber-integration jobs (#4871)
fdffddab76 is described below

commit fdffddab766078ee83eb352f637c7eb8bd036947
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 14:31:15 2026 -0700

    feat(ci): split amber tests into amber + amber-integration jobs (#4871)
    
    ### What changes were proposed in this PR?
    
    Add a dedicated `amber-integration` CI job for Scala specs that exercise
    both Scala and Python end-to-end. The lighter `amber` job no longer
    installs Python. First migrated specs: 3 Python-UDF reconfiguration
    tests; the remaining e2e specs can move incrementally.
    
    - **Layout**: integration specs live under `amber/src/test/integration/`
    (parallel to `scala/` and `java/`, room for a future `python/`).
    - **sbt**: `lazy val IntegrationTest =
    config("integration").extend(Test)` in `build.sbt`. WES
    (`WorkflowExecutionService`) gets
    `inConfig(IntegrationTest)(Defaults.testSettings ++
    scalafmtConfigSettings)` and `IntegrationTest /
    unmanagedSourceDirectories := src/test/integration`. `Test/test` no
    longer sees integration sources, so jacoco runs amber unit tests alone
    with no filter.
    - **Annotation**: `IntegrationTest.java` (Java `@interface` with
    `@TagAnnotation`). Scala `class extends StaticAnnotation` would not be
    visible to ScalaTest's reflection. Kept as a marker; routing is
    path-based.
    - **Test split**: `ReconfigurationSpec` keeps 2 pure-Scala tests;
    `ReconfigurationIntegrationSpec` (under the new path) carries the 3
    Python tests.
    - **`build.yml`**:
      - `amber` job drops `setup-python` + `uv` install + the docker-java
      step.
      - New `amber-integration` job adds Python install and runs one sbt
      invocation: `IntegrationTest/scalafmtCheck` + `IntegrationTest/test`. No
      jacoco: these specs cover code paths already in amber's unit-test
      coverage.
    - **Scalafix scope**: sbt-scalafix's `perConfigSettings` is `private`,
    so there's no public hook to wire scalafix into a custom configuration.
    Integration sources currently rely on scalafmt + manual review;
    cross-cutting `scalafixAll --check` in the amber job continues to cover
    every other module. A future upstream API in sbt-scalafix can add
    scalafix on integration cleanly.
    - **`labeler.yml`**:
      - `engine` enumerates non-Python amber subdirs (excludes
      `src/main/python/**` and `src/test/integration/**`).
      - New `amber-integration` label matches `amber/src/test/integration/**`.
      - `python` label additionally matches `**/requirements.txt` and
      `**/*-requirements.txt` so dep-only PRs still hit the python +
      amber-integration stacks.
    - **`required-checks.yml`** `LABEL_STACKS`:
    
    | Label | frontend | amber | amber-integration | platform | python | agent-service |
    |---|---|---|---|---|---|---|
    | `frontend` | ✓ | | | | | |
    | `python` | | | ✓ | | ✓ | |
    | `engine` | | ✓ | ✓ | | | |
    | `amber-integration` | | | ✓ | | | |
    | `platform` | | | | ✓ | | |
    | `agent-service` | | | | | | ✓ |
    | `common` | | ✓ | ✓ | ✓ | | |
    | `ddl-change` | | ✓ | ✓ | ✓ | | |
    | `ci` | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
    
    A bare `python` label runs only `amber-integration + python` (no full
    Scala unit suite). Integration-spec-only PRs run only
    `amber-integration`. Pure-Scala amber changes run `amber +
    amber-integration`.
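    
    The routing above amounts to taking the union of the stacks mapped to
    every label on the PR. A minimal sketch, assuming a hypothetical helper
    `stacksForLabels` that is not part of the workflow itself; the mapping is
    copied from the table, and unmapped labels such as `docs` are assumed to
    contribute nothing:
    
    ```javascript
    // Label-to-stack mapping copied from the table above.
    const LABEL_STACKS = new Map([
      ["frontend", ["frontend"]],
      ["python", ["amber-integration", "python"]],
      ["engine", ["amber", "amber-integration"]],
      ["amber-integration", ["amber-integration"]],
      ["platform", ["platform"]],
      ["agent-service", ["agent-service"]],
      ["common", ["amber", "amber-integration", "platform"]],
      ["ddl-change", ["amber", "amber-integration", "platform"]],
      ["ci", ["frontend", "amber", "amber-integration", "platform", "python", "agent-service"]],
    ]);
    
    // Hypothetical helper (illustration only): union of the stacks for all
    // labels, sorted so the result is stable.
    function stacksForLabels(labels) {
      const stacks = new Set();
      for (const label of labels) {
        // Labels without a mapping (e.g. docs) add no stacks.
        for (const stack of LABEL_STACKS.get(label) ?? []) {
          stacks.add(stack);
        }
      }
      return [...stacks].sort();
    }
    
    console.log(stacksForLabels(["python"]));
    console.log(stacksForLabels(["engine", "docs"]));
    ```
    
    With `python` alone the sketch yields `amber-integration` and `python`,
    which matches the bare-`python` case described above.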
    
    ### Any related issues, documentation, discussions?
    
    Closes #4870. Remaining e2e specs (`DataProcessingSpec`, `PauseSpec`,
    `BatchSizePropagationSpec`, `PythonWorkflowWorkerSpec`) can move in
    follow-ups.
    
    ### How was this PR tested?
    
    YAML parses locally for `build.yml`, `required-checks.yml`,
    `labeler.yml`. The PR's own CI exercises the split — this PR carries
    `engine` + `python` + `ci` labels.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-7)
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .github/labeler.yml                                |  39 +++-
 .github/workflows/build.yml                        | 127 ++++++++++---
 .github/workflows/required-checks.yml              |  62 ++++---
 amber/build.sbt                                    |  22 +++
 .../e2e/ReconfigurationIntegrationSpec.scala}      | 158 ++--------------
 .../apache/texera/amber/tags/IntegrationTest.java  |  52 ++++++
 .../amber/engine/e2e/ReconfigurationSpec.scala     | 199 +--------------------
 .../apache/texera/amber/engine/e2e/TestUtils.scala | 112 +++++++++++-
 8 files changed, 394 insertions(+), 377 deletions(-)

diff --git a/.github/labeler.yml b/.github/labeler.yml
index 64551a1e0f..5a1b2db0d8 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -45,15 +45,52 @@ agent-service:
           - 'agent-service/**'
 
 engine:
+  # Non-Python, non-integration parts of amber/. Pure Python changes
+  # under amber/src/main/python/** intentionally fall through to the
+  # `python` label (which the labeler also matches via **/*.py), so
+  # they only trigger the python + amber-integration stacks rather
+  # than the full Scala-only `amber` stack. Integration specs live
+  # under amber/src/test/integration/** (added to sbt's Test sources
+  # via amber/build.sbt) and are caught by the `amber-integration`
+  # label below. Top-level Scala formatter / linter configs are
+  # included so a PR that only updates them still triggers the Scala
+  # stacks instead of producing an empty stack union.
   - changed-files:
       - any-glob-to-any-file:
-          - 'amber/**'
+          - 'amber/build.sbt'
+          - 'amber/.scalafmt.conf'
+          - 'amber/.scalafix.conf'
+          - 'amber/project/**'
+          - 'amber/src/main/scala/**'
+          - 'amber/src/main/java/**'
+          - 'amber/src/main/protobuf/**'
+          - 'amber/src/main/resources/**'
+          - 'amber/src/test/scala/**'
+          - 'amber/src/test/java/**'
+
+amber-integration:
+  # Scala specs that exercise both Scala and Python end-to-end. They
+  # live under amber/src/test/integration/** (sbt picks them up via
+  # `Test / unmanagedSourceDirectories += .../test/integration` in
+  # amber/build.sbt) and are tagged @org.apache.texera.amber.tags
+  # .IntegrationTest. This label maps to the amber-integration stack
+  # only, so a PR that touches just an integration spec does not pay
+  # for the full Scala-only `amber` job.
+  - changed-files:
+      - any-glob-to-any-file:
+          - 'amber/src/test/integration/**'
 
 python:
+  # Includes pip requirement manifests so dependency-only PRs still
+  # exercise the Python + amber-integration stacks. Without this a
+  # bumped requirements.txt would only get `dependencies` (no stack
+  # mapping) and silently skip CI for the very deps it's changing.
   - changed-files:
       - any-glob-to-any-file:
           - 'amber/src/main/python/**'
           - '**/*.py'
+          - '**/requirements.txt'
+          - '**/*-requirements.txt'
 
 docs:
   - changed-files:
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index ed59478747..06be31230e 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -44,6 +44,10 @@ on:
         required: false
         type: boolean
         default: true
+      run_amber_integration:
+        required: false
+        type: boolean
+        default: true
       run_platform:
         required: false
         type: boolean
@@ -172,12 +176,6 @@ jobs:
         with:
           distribution: "temurin"
           java-version: 11
-      - name: Setup Python for Scala tests
-        uses: actions/setup-python@v6
-        with:
-          python-version: "3.11"
-      - name: Show Python
-        run: python --version || python3 --version
       - name: Create Databases
         # Must run before any sbt compile step: the build's JOOQ source
         # generators connect to texera_db while compiling.
@@ -218,16 +216,6 @@ jobs:
             /tmp/dists/amber-*/lib || check_exit=$?
           ./bin/licensing/audit_jar_licenses.py /tmp/dists/amber-*/lib || true
           exit "$check_exit"
-      - name: Install dependencies
-        # Only the backend test step needs the python deps; install just
-        # before tests so a lint or dist failure does not pay for them.
-        # Use uv for speed — the scala job's binary license check is
-        # jar-only, so any pip-vs-uv resolver differences in the Python
-        # tree don't affect amber/LICENSE-binary-python here.
-        run: |
-          python -m pip install uv
-          if [ -f amber/requirements.txt ]; then uv pip install --system -r amber/requirements.txt; fi
-          if [ -f amber/operator-requirements.txt ]; then uv pip install --system -r amber/operator-requirements.txt; fi
       - name: Create texera_db_for_test_cases
         run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases -f sql/texera_ddl.sql
         env:
@@ -239,10 +227,16 @@ jobs:
       - name: Run amber and common module tests with coverage
         # 'jacoco' runs tests under sbt-jacoco's JVM agent and emits per-
         # module jacoco.xml that the codecov upload step picks up.
-        # `WorkflowExecutionService/jacoco` only runs that project's tests
-        # (sbt's `test` task does not transit dependsOn), so common
-        # modules' tests are listed explicitly here. Modules with no
-        # tests (Auth, Config) are skipped.
+        # `WorkflowExecutionService/jacoco` only runs that project's
+        # Test config (sbt's `test` task does not transit dependsOn),
+        # so common modules' tests are listed explicitly here. Modules
+        # with no tests (Auth, Config) are skipped.
+        #
+        # AMBER_TEST_FILTER=skip-integration tells amber/build.sbt to
+        # exclude @org.apache.texera.amber.tags.IntegrationTest specs;
+        # those run in the amber-integration job below.
+        env:
+          AMBER_TEST_FILTER: skip-integration
         run: |
           sbt "DAO/jacoco" \
               "PyBuilder/jacoco" \
@@ -258,6 +252,99 @@ jobs:
           flags: amber
           fail_ci_if_error: false
 
+  amber-integration:
+    # Runs Scala tests tagged @org.apache.texera.amber.tags.IntegrationTest —
+    # currently the e2e specs that spawn Python UDF workers. Provisions
+    # Python deps that the lighter `amber` job no longer installs. Cross-
+    # cutting lints (scalafmt / scalafix) and the amber dist + binary
+    # license check stay in `amber`; this job is tests-only.
+    if: ${{ inputs.run_amber_integration }}
+    strategy:
+      matrix:
+        os: [ubuntu-22.04]
+        java-version: [11]
+    runs-on: ${{ matrix.os }}
+    env:
+      JAVA_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
+      JVM_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
+    services:
+      postgres:
+        image: postgres
+        env:
+          POSTGRES_PASSWORD: postgres
+        ports:
+          - 5432:5432
+        options: >-
+          --health-cmd="pg_isready -U postgres"
+          --health-interval=10s
+          --health-timeout=5s
+          --health-retries=5
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v5
+        with:
+          ref: ${{ inputs.checkout_ref || github.sha }}
+          fetch-depth: 0
+      - name: Prepare backport workspace
+        if: ${{ inputs.backport_target_branch != '' }}
+        working-directory: ${{ github.workspace }}
+        run: bash ./.github/scripts/prepare-backport-checkout.sh "${{ inputs.backport_target_branch }}" "${{ inputs.backport_commit_range }}"
+      - name: Setup JDK
+        uses: actions/setup-java@v5
+        with:
+          distribution: "temurin"
+          java-version: 11
+      - name: Setup Python for Scala-Python integration tests
+        uses: actions/setup-python@v6
+        with:
+          python-version: "3.11"
+      - name: Show Python
+        run: python --version || python3 --version
+      - name: Install Python dependencies
+        # The integration tests spawn Python UDF workers; install
+        # everything they need on the host. uv for speed; no licensing
+        # concerns because no dist is built here.
+        run: |
+          python -m pip install uv
+          if [ -f amber/requirements.txt ]; then uv pip install --system -r amber/requirements.txt; fi
+          if [ -f amber/operator-requirements.txt ]; then uv pip install --system -r amber/operator-requirements.txt; fi
+      - name: Create Databases
+        run: |
+          psql -h localhost -U postgres -f sql/texera_ddl.sql
+          psql -h localhost -U postgres -f sql/iceberg_postgres_catalog.sql
+          psql -h localhost -U postgres -f sql/texera_lakefs.sql
+        env:
+          PGPASSWORD: postgres
+      - name: Setup sbt launcher
+        uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
+      - uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 # v8.1.0
+        with:
+          extraSbtFiles: '["*.sbt", "project/**.{scala,sbt}", "project/build.properties" ]'
+      - name: Create texera_db_for_test_cases
+        run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases -f sql/texera_ddl.sql
+        env:
+          PGPASSWORD: postgres
+      - name: Lint and run amber integration tests
+        # AMBER_TEST_FILTER=integration-only tells amber/build.sbt to
+        # keep only @org.apache.texera.amber.tags.IntegrationTest
+        # specs. The Java @TagAnnotation makes the marker visible to
+        # ScalaTest's reflection, so `-n TAG` correctly narrows the
+        # run.
+        #
+        # scalafmtCheckAll + scalafixAll --check are run here as well
+        # because an integration-only PR fires only the
+        # `amber-integration` label; the amber job's own cross-cutting
+        # lint would not run, and the change would otherwise land
+        # unlinted. Costs ~30s when amber also runs, which is fine.
+        # No jacoco — these specs exercise code paths already covered
+        # by amber's unit-test coverage.
+        env:
+          AMBER_TEST_FILTER: integration-only
+        run: |
+          sbt scalafmtCheckAll \
+              "scalafixAll --check" \
+              "WorkflowExecutionService/test"
+
   platform:
     # Per-service build, test, and license check for the non-amber Scala
     # services. Each matrix entry runs its own dist + test in isolation
diff --git a/.github/workflows/required-checks.yml b/.github/workflows/required-checks.yml
index 9046682bd7..af4fcc9ea8 100644
--- a/.github/workflows/required-checks.yml
+++ b/.github/workflows/required-checks.yml
@@ -60,6 +60,7 @@ jobs:
     outputs:
       run_frontend: ${{ steps.decide.outputs.run_frontend }}
       run_amber: ${{ steps.decide.outputs.run_amber }}
+      run_amber_integration: ${{ steps.decide.outputs.run_amber_integration }}
       run_platform: ${{ steps.decide.outputs.run_platform }}
       run_python: ${{ steps.decide.outputs.run_python }}
       run_agent_service: ${{ steps.decide.outputs.run_agent_service }}
@@ -115,35 +116,48 @@ jobs:
             // labeler matches lives under a component dir and is already
             // covered by that component's label.
             //
-            //   label          | frontend | amber | platform | python | agent-service
-            //   ---------------|----------|-------|----------|--------|--------------
-            //   frontend       |    x     |       |          |        |
-            //   python         |          |   x   |          |   x    |
-            //   engine         |          |   x   |          |   x    |
-            //   platform       |          |       |    x     |        |
-            //   agent-service  |          |       |          |        |     x
-            //   common         |          |   x   |    x     |        |  (also catches
-            //                                                            root scala
-            //                                                            build/lint
-            //                                                            config)
-            //   ddl-change     |          |   x   |    x     |        |
-            //   ci             |    x     |   x   |    x     |   x    |     x
-            //   docs / dev /   |          |       |          |        |
-            //   deps / release/|          |       |          |        |
-            //   * / branch     |          |       |          |        |
+            //   label             | frontend | amber | amber-integ | platform | python | agent-service
+            //   ------------------|----------|-------|-------------|----------|--------|--------------
+            //   frontend          |    x     |       |             |          |        |
+            //   python            |          |       |      x      |          |   x    |
+            //   engine            |          |   x   |      x      |          |        |
+            //   amber-integration |          |       |      x      |          |        |
+            //   platform          |          |       |             |    x     |        |
+            //   agent-service     |          |       |             |          |        |     x
+            //   common            |          |   x   |      x      |    x     |        |  (root
+            //                                                                             scala
+            //                                                                             build/lint
+            //                                                                             config)
+            //   ddl-change        |          |   x   |      x      |    x     |        |
+            //   ci                |    x     |   x   |      x      |    x     |   x    |     x
+            //   docs / dev /      |          |       |             |          |        |
+            //   deps / release/   |          |       |             |          |        |
+            //   * / branch        |          |       |             |          |        |
+            //
+            // amber-integration runs the Scala tests tagged
+            // @org.apache.texera.amber.tags.IntegrationTest (e2e specs that
+            // spawn Python UDF workers). The labeler attaches `python` to
+            // any *.py change (including amber/src/main/python/**), so
+            // `engine` does not need to fire the python stack itself —
+            // pure-Python amber changes pick up `python` directly. The
+            // `amber-integration` label catches *IntegrationSpec.scala
+            // edits so a test-only change does not trigger the full
+            // Scala-only amber stack.
             const LABEL_STACKS = {
               frontend: ["frontend"],
-              python: ["amber", "python"],          // pyamber drives amber integration tests too
-              engine: ["amber", "python"],          // amber/** spans both
-              platform: ["platform"],               // platform services
+              python: ["amber-integration", "python"],
+              engine: ["amber", "amber-integration"],
+              "amber-integration": ["amber-integration"],
+              platform: ["platform"],
               "agent-service": ["agent-service"],
-              common: ["amber", "platform"],        // common/** + root scala build/lint
-              "ddl-change": ["amber", "platform"],
-              ci: ["frontend", "amber", "platform", "python", "agent-service"],
+              common: ["amber", "amber-integration", "platform"],
+              "ddl-change": ["amber", "amber-integration", "platform"],
+              ci: ["frontend", "amber", "amber-integration", "platform", "python", "agent-service"],
             };
 
             let runFrontend = true;
             let runAmber = true;
+            let runAmberIntegration = true;
             let runPlatform = true;
             let runPython = true;
             let runAgentService = true;
@@ -157,6 +171,7 @@ jobs:
               }
               runFrontend = stacks.has("frontend");
               runAmber = stacks.has("amber");
+              runAmberIntegration = stacks.has("amber-integration");
               runPlatform = stacks.has("platform");
               runPython = stacks.has("python");
               runAgentService = stacks.has("agent-service");
@@ -167,6 +182,7 @@ jobs:
 
             core.setOutput("run_frontend", runFrontend ? "true" : "false");
             core.setOutput("run_amber", runAmber ? "true" : "false");
+            core.setOutput("run_amber_integration", runAmberIntegration ? "true" : "false");
             core.setOutput("run_platform", runPlatform ? "true" : "false");
             core.setOutput("run_python", runPython ? "true" : "false");
             core.setOutput("run_agent_service", runAgentService ? "true" : "false");
@@ -221,6 +237,7 @@ jobs:
     with:
       run_frontend: ${{ needs.precheck.outputs.run_frontend == 'true' }}
       run_amber: ${{ needs.precheck.outputs.run_amber == 'true' }}
+      run_amber_integration: ${{ needs.precheck.outputs.run_amber_integration == 'true' }}
       run_platform: ${{ needs.precheck.outputs.run_platform == 'true' }}
       run_python: ${{ needs.precheck.outputs.run_python == 'true' }}
       run_agent_service: ${{ needs.precheck.outputs.run_agent_service == 'true' }}
@@ -241,6 +258,7 @@ jobs:
       job_name_suffix: ""
       run_frontend: ${{ needs.precheck.outputs.run_frontend == 'true' }}
       run_amber: ${{ needs.precheck.outputs.run_amber == 'true' }}
+      run_amber_integration: ${{ needs.precheck.outputs.run_amber_integration == 'true' }}
       run_platform: ${{ needs.precheck.outputs.run_platform == 'true' }}
       run_python: ${{ needs.precheck.outputs.run_python == 'true' }}
       run_agent_service: ${{ needs.precheck.outputs.run_agent_service == 'true' }}
diff --git a/amber/build.sbt b/amber/build.sbt
index 4517d02d7e..09e4628ab8 100644
--- a/amber/build.sbt
+++ b/amber/build.sbt
@@ -54,6 +54,28 @@ concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)
 // add python as an additional source
 Compile / unmanagedSourceDirectories += baseDirectory.value / "src" / "main" / "python"
 
+// `amber/src/test/integration` holds Scala specs that exercise both
+// Scala and Python end-to-end (tagged @org.apache.texera.amber.tags.IntegrationTest).
+// Sits next to `src/test/scala` and `src/test/java`; a future `src/test/python`
+// can drop in the same way. Adding it to Test/unmanagedSourceDirectories means
+// scalafmtCheckAll / scalafixAll --check naturally cover these sources, and
+// the AMBER_TEST_FILTER env var below routes which tagged subset runs.
+Test / unmanagedSourceDirectories += baseDirectory.value / "src" / "test" / "integration"
+
+// Test-filter switch driven by the AMBER_TEST_FILTER env var so the
+// amber and amber-integration CI jobs select disjoint subsets without
+// each invocation having to embed a `set Tests.Argument(...)` prefix.
+//   skip-integration : exclude @IntegrationTest-tagged specs (amber job)
+//   integration-only : include only @IntegrationTest-tagged specs (amber-integration job)
+//   (unset)          : run everything (default for local sbt)
+Test / testOptions ++= (sys.env.get("AMBER_TEST_FILTER") match {
+  case Some("skip-integration") =>
+    Seq(Tests.Argument(TestFrameworks.ScalaTest, "-l", "org.apache.texera.amber.tags.IntegrationTest"))
+  case Some("integration-only") =>
+    Seq(Tests.Argument(TestFrameworks.ScalaTest, "-n", "org.apache.texera.amber.tags.IntegrationTest"))
+  case _ => Nil
+})
+
 // Excluding some proto files:
 PB.generate / excludeFilter := "scalapb.proto"
 
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
similarity index 57%
copy from amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
copy to amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
index a125c1be00..768caf079f 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++ b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
@@ -19,49 +19,40 @@
 
 package org.apache.texera.amber.engine.e2e
 
-import com.twitter.util.{Await, Duration, Promise}
 import com.typesafe.scalalogging.Logger
 import org.apache.pekko.actor.{ActorSystem, Props}
 import org.apache.pekko.testkit.{ImplicitSender, TestKit}
 import org.apache.pekko.util.Timeout
 import org.apache.texera.amber.clustering.SingleNodeListener
 import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.Tuple
 import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
 import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
-import org.apache.texera.amber.engine.architecture.controller.{
-  ControllerConfig,
-  ExecutionStateUpdate
-}
-import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
-  EmptyRequest,
-  UpdateExecutorRequest,
-  WorkflowReconfigureRequest
-}
-import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
-  COMPLETED,
-  PAUSED
-}
 import org.apache.texera.amber.engine.common.AmberRuntime
-import org.apache.texera.amber.engine.common.client.AmberClient
 import org.apache.texera.amber.engine.e2e.TestUtils.{
   cleanupWorkflowExecutionData,
   initiateTexeraDBForTestCases,
-  setUpWorkflowExecutionData,
-  stateReached
+  setUpWorkflowExecutionData
 }
 import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
-import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
+import org.apache.texera.amber.tags.IntegrationTest
 import org.apache.texera.workflow.LogicalLink
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
 import org.scalatest.flatspec.AnyFlatSpecLike
 
 import scala.concurrent.duration._
 
-class ReconfigurationSpec
-    extends TestKit(ActorSystem("ReconfigurationSpec", AmberRuntime.akkaConfig))
+/**
+  * E2E reconfiguration tests that spawn Python UDF workers. Routed to the
+  * `amber-integration` CI job via the class-level `@IntegrationTest` tag,
+  * which provisions Python deps; the lighter `amber` job excludes this tag.
+  *
+  * Pure-Scala reconfiguration tests live in [[ReconfigurationSpec]] and run
+  * in the regular `amber` job.
+  */
+@IntegrationTest
+class ReconfigurationIntegrationSpec
+    extends TestKit(ActorSystem("ReconfigurationIntegrationSpec", AmberRuntime.akkaConfig))
     with ImplicitSender
     with AnyFlatSpecLike
     with BeforeAndAfterAll
@@ -78,7 +69,7 @@ class ReconfigurationSpec
 
   implicit val timeout: Timeout = Timeout(5.seconds)
 
-  val logger = Logger("ReconfigurationSpecLogger")
+  val logger = Logger("ReconfigurationIntegrationSpecLogger")
   val ctx = new WorkflowContext()
 
   override protected def beforeEach(): Unit = {
@@ -101,83 +92,16 @@ class ReconfigurationSpec
     TestKit.shutdownActorSystem(system)
   }
 
+  // Thin wrapper around the shared TestUtils helper so call sites below stay
+  // ctx/system-implicit. The actual workflow-driver logic lives in TestUtils
+  // and is reused by ReconfigurationSpec.
   def shouldReconfigure(
       operators: List[LogicalOp],
       links: List[LogicalLink],
       targetOps: Seq[LogicalOp],
       newOpExecInitInfo: OpExecInitInfo
-  ): Map[OperatorIdentity, List[Tuple]] = {
-    val workflow =
-      TestUtils.buildWorkflow(operators, links, ctx)
-    val client =
-      new AmberClient(
-        system,
-        workflow.context,
-        workflow.physicalPlan,
-        ControllerConfig.default,
-        error => {}
-      )
-    val completion = Promise[Unit]()
-    var result: Map[OperatorIdentity, List[Tuple]] = null
-    client
-      .registerCallback[ExecutionStateUpdate](evt => {
-        if (evt.state == COMPLETED) {
-          result = workflow.logicalPlan.getTerminalOperatorIds
-            .filter(terminalOpId => {
-              val uri = getResultUriByLogicalPortId(
-                workflow.context.executionId,
-                terminalOpId,
-                PortIdentity()
-              )
-              uri.nonEmpty
-            })
-            .map(terminalOpId => {
-              val uri = getResultUriByLogicalPortId(
-                workflow.context.executionId,
-                terminalOpId,
-                PortIdentity()
-              ).get
-              terminalOpId -> DocumentFactory
-                .openDocument(uri)
-                ._1
-                .asInstanceOf[VirtualDocument[Tuple]]
-                .get()
-                .toList
-            })
-            .toMap
-          completion.setDone()
-        }
-      })
-    Await.result(
-      client.controllerInterface.startWorkflow(EmptyRequest(), ()),
-      Duration.fromSeconds(5)
-    )
-    val pausedReached = stateReached(client, PAUSED)
-    Await.result(
-      client.controllerInterface.pauseWorkflow(EmptyRequest(), ()),
-      Duration.fromSeconds(5)
-    )
-    Await.result(pausedReached, Duration.fromSeconds(10))
-    val physicalOps = targetOps.flatMap(op =>
-      workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
-    )
-    Await.result(
-      client.controllerInterface.reconfigureWorkflow(
-        WorkflowReconfigureRequest(
-          reconfiguration = physicalOps.map(op => UpdateExecutorRequest(op.id, newOpExecInitInfo)),
-          reconfigurationId = "test-reconfigure-1"
-        ),
-        ()
-      ),
-      Duration.fromSeconds(5)
-    )
-    Await.result(
-      client.controllerInterface.resumeWorkflow(EmptyRequest(), ()),
-      Duration.fromSeconds(5)
-    )
-    Await.result(completion, Duration.fromMinutes(1))
-    result
-  }
+  ): Map[OperatorIdentity, List[Tuple]] =
+    TestUtils.shouldReconfigure(system, ctx, operators, links, targetOps, newOpExecInitInfo)
 
   "Engine" should "be able to modify a python UDF worker in workflow" in {
     val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
@@ -210,50 +134,6 @@ class ReconfigurationSpec
     })
   }
 
-  "Engine" should "be able to modify a java operator in workflow" in {
-    val sourceOpDesc = TestOperators.mediumCsvScanOpDesc()
-    val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region", "ShouldMatchNone")
-    val keywordMatchManyOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
-    val result = shouldReconfigure(
-      List(sourceOpDesc, keywordMatchNoneOpDesc),
-      List(
-        LogicalLink(
-          sourceOpDesc.operatorIdentifier,
-          PortIdentity(),
-          keywordMatchNoneOpDesc.operatorIdentifier,
-          PortIdentity()
-        )
-      ),
-      Seq(keywordMatchNoneOpDesc),
-      keywordMatchManyOpDesc.getPhysicalOp(ctx.workflowId, ctx.executionId).opExecInitInfo
-    )
-    assert(result(keywordMatchNoneOpDesc.operatorIdentifier).nonEmpty)
-  }
-
-  "Engine" should "not be able to modify a source operator in workflow" in {
-    val sourceOpDesc = TestOperators.mediumCsvScanOpDesc()
-    val sourceOpDesc2 = TestOperators.mediumCsvScanOpDesc()
-    val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region", "ShouldMatchNone")
-    val ex = intercept[Throwable] {
-      shouldReconfigure(
-        List(sourceOpDesc, keywordMatchNoneOpDesc),
-        List(
-          LogicalLink(
-            sourceOpDesc.operatorIdentifier,
-            PortIdentity(),
-            keywordMatchNoneOpDesc.operatorIdentifier,
-            PortIdentity()
-          )
-        ),
-        Seq(sourceOpDesc),
-        sourceOpDesc2.getPhysicalOp(ctx.workflowId, ctx.executionId).opExecInitInfo
-      )
-    }
-    assert(
-      ex.getMessage == "java.lang.IllegalStateException: Reconfiguration cannot be applied to source operators"
-    )
-  }
-
   "Engine" should "propagate reconfiguration through a source operator in workflow" in {
     val sourceOpDesc = TestOperators.pythonSourceOpDesc(10000)
     val udfOpDesc = TestOperators.pythonOpDesc()
diff --git a/amber/src/test/integration/org/apache/texera/amber/tags/IntegrationTest.java b/amber/src/test/integration/org/apache/texera/amber/tags/IntegrationTest.java
new file mode 100644
index 0000000000..024b5594c9
--- /dev/null
+++ b/amber/src/test/integration/org/apache/texera/amber/tags/IntegrationTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.tags;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.scalatest.TagAnnotation;
+
+/**
+ * Class-level marker tag for ScalaTest specs that exercise both Scala
+ * and Python end-to-end. Routing to the {@code amber-integration} CI
+ * job is by ScalaTest tag filtering, controlled by the
+ * {@code AMBER_TEST_FILTER} env var in {@code amber/build.sbt}: the
+ * lighter {@code amber} job runs with {@code skip-integration} (which
+ * passes {@code -l org.apache.texera.amber.tags.IntegrationTest} to
+ * ScalaTest), and the {@code amber-integration} job runs with
+ * {@code integration-only} (which passes {@code -n} for the same tag).
+ * The {@code amber/src/test/integration} directory is added to sbt's
+ * {@code Test/unmanagedSourceDirectories} so these specs compile in
+ * the regular Test config; there is no separate sbt configuration.
+ *
+ * <p>Written in Java rather than Scala because ScalaTest detects tag
+ * annotations via {@code java.lang.annotation} reflection. A Scala
+ * {@code class extends StaticAnnotation} does not produce a JVM
+ * annotation interface that {@code @TagAnnotation} can attach to, so
+ * the tag would be invisible to ScalaTest at runtime.
+ */
+@TagAnnotation
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD, ElementType.TYPE})
+public @interface IntegrationTest {
+}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index a125c1be00..8cabf8684a 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -19,41 +19,22 @@
 
 package org.apache.texera.amber.engine.e2e
 
-import com.twitter.util.{Await, Duration, Promise}
 import com.typesafe.scalalogging.Logger
 import org.apache.pekko.actor.{ActorSystem, Props}
 import org.apache.pekko.testkit.{ImplicitSender, TestKit}
 import org.apache.pekko.util.Timeout
 import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.storage.model.VirtualDocument
+import org.apache.texera.amber.core.executor.OpExecInitInfo
 import org.apache.texera.amber.core.tuple.Tuple
 import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
 import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
-import org.apache.texera.amber.engine.architecture.controller.{
-  ControllerConfig,
-  ExecutionStateUpdate
-}
-import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
-  EmptyRequest,
-  UpdateExecutorRequest,
-  WorkflowReconfigureRequest
-}
-import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
-  COMPLETED,
-  PAUSED
-}
 import org.apache.texera.amber.engine.common.AmberRuntime
-import org.apache.texera.amber.engine.common.client.AmberClient
 import org.apache.texera.amber.engine.e2e.TestUtils.{
   cleanupWorkflowExecutionData,
   initiateTexeraDBForTestCases,
-  setUpWorkflowExecutionData,
-  stateReached
+  setUpWorkflowExecutionData
 }
 import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
-import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
 import org.apache.texera.workflow.LogicalLink
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
 import org.scalatest.flatspec.AnyFlatSpecLike
@@ -101,114 +82,16 @@ class ReconfigurationSpec
     TestKit.shutdownActorSystem(system)
   }
 
+  // Thin wrapper around the shared TestUtils helper so call sites below stay
+  // ctx/system-implicit. The actual workflow-driver logic lives in TestUtils
+  // and is reused by ReconfigurationIntegrationSpec.
   def shouldReconfigure(
       operators: List[LogicalOp],
       links: List[LogicalLink],
       targetOps: Seq[LogicalOp],
       newOpExecInitInfo: OpExecInitInfo
-  ): Map[OperatorIdentity, List[Tuple]] = {
-    val workflow =
-      TestUtils.buildWorkflow(operators, links, ctx)
-    val client =
-      new AmberClient(
-        system,
-        workflow.context,
-        workflow.physicalPlan,
-        ControllerConfig.default,
-        error => {}
-      )
-    val completion = Promise[Unit]()
-    var result: Map[OperatorIdentity, List[Tuple]] = null
-    client
-      .registerCallback[ExecutionStateUpdate](evt => {
-        if (evt.state == COMPLETED) {
-          result = workflow.logicalPlan.getTerminalOperatorIds
-            .filter(terminalOpId => {
-              val uri = getResultUriByLogicalPortId(
-                workflow.context.executionId,
-                terminalOpId,
-                PortIdentity()
-              )
-              uri.nonEmpty
-            })
-            .map(terminalOpId => {
-              val uri = getResultUriByLogicalPortId(
-                workflow.context.executionId,
-                terminalOpId,
-                PortIdentity()
-              ).get
-              terminalOpId -> DocumentFactory
-                .openDocument(uri)
-                ._1
-                .asInstanceOf[VirtualDocument[Tuple]]
-                .get()
-                .toList
-            })
-            .toMap
-          completion.setDone()
-        }
-      })
-    Await.result(
-      client.controllerInterface.startWorkflow(EmptyRequest(), ()),
-      Duration.fromSeconds(5)
-    )
-    val pausedReached = stateReached(client, PAUSED)
-    Await.result(
-      client.controllerInterface.pauseWorkflow(EmptyRequest(), ()),
-      Duration.fromSeconds(5)
-    )
-    Await.result(pausedReached, Duration.fromSeconds(10))
-    val physicalOps = targetOps.flatMap(op =>
-      workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
-    )
-    Await.result(
-      client.controllerInterface.reconfigureWorkflow(
-        WorkflowReconfigureRequest(
-          reconfiguration = physicalOps.map(op => UpdateExecutorRequest(op.id, newOpExecInitInfo)),
-          reconfigurationId = "test-reconfigure-1"
-        ),
-        ()
-      ),
-      Duration.fromSeconds(5)
-    )
-    Await.result(
-      client.controllerInterface.resumeWorkflow(EmptyRequest(), ()),
-      Duration.fromSeconds(5)
-    )
-    Await.result(completion, Duration.fromMinutes(1))
-    result
-  }
-
-  "Engine" should "be able to modify a python UDF worker in workflow" in {
-    val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
-    val udfOpDesc = TestOperators.pythonOpDesc()
-    val code = """
-                 |from pytexera import *
-                 |
-                 |class ProcessTupleOperator(UDFOperatorV2):
-                 |    @overrides
-                 |    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
-                 |        tuple_['Region'] = tuple_['Region'] + '_reconfigured'
-                 |        yield tuple_
-                 |""".stripMargin
-
-    val result = shouldReconfigure(
-      List(sourceOpDesc, udfOpDesc),
-      List(
-        LogicalLink(
-          sourceOpDesc.operatorIdentifier,
-          PortIdentity(),
-          udfOpDesc.operatorIdentifier,
-          PortIdentity()
-        )
-      ),
-      Seq(udfOpDesc),
-      OpExecWithCode(code, "python")
-    )
-    assert(result(udfOpDesc.operatorIdentifier).exists { t =>
-      t.getField("Region").asInstanceOf[String].contains("_reconfigured")
-    })
-  }
+  ): Map[OperatorIdentity, List[Tuple]] =
+    TestUtils.shouldReconfigure(system, ctx, operators, links, targetOps, newOpExecInitInfo)
 
   "Engine" should "be able to modify a java operator in workflow" in {
     val sourceOpDesc = TestOperators.mediumCsvScanOpDesc()
@@ -254,72 +137,4 @@ class ReconfigurationSpec
     )
   }
 
-  "Engine" should "propagate reconfiguration through a source operator in workflow" in {
-    val sourceOpDesc = TestOperators.pythonSourceOpDesc(10000)
-    val udfOpDesc = TestOperators.pythonOpDesc()
-    val code = """
-                 |from pytexera import *
-                 |
-                 |class ProcessTupleOperator(UDFOperatorV2):
-                 |    @overrides
-                 |    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
-                 |        tuple_['field_1'] = tuple_['field_1'] + '_reconfigured'
-                 |        yield tuple_
-                 |""".stripMargin
-    val result = shouldReconfigure(
-      List(sourceOpDesc, udfOpDesc),
-      List(
-        LogicalLink(
-          sourceOpDesc.operatorIdentifier,
-          PortIdentity(),
-          udfOpDesc.operatorIdentifier,
-          PortIdentity()
-        )
-      ),
-      Seq(udfOpDesc),
-      OpExecWithCode(code, "python")
-    )
-    assert(result(udfOpDesc.operatorIdentifier).exists { t =>
-      t.getField("field_1").asInstanceOf[String].contains("_reconfigured")
-    })
-  }
-
-  "Engine" should "be able to modify two python UDFs in workflow" in {
-    val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
-    val udfOpDesc1 = TestOperators.pythonOpDesc()
-    val udfOpDesc2 = TestOperators.pythonOpDesc()
-    val code = """
-                 |from pytexera import *
-                 |
-                 |class ProcessTupleOperator(UDFOperatorV2):
-                 |    @overrides
-                 |    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
-                 |        tuple_['Region'] = tuple_['Region'] + '_reconfigured'
-                 |        yield tuple_
-                 |""".stripMargin
-
-    val result = shouldReconfigure(
-      List(sourceOpDesc, udfOpDesc1, udfOpDesc2),
-      List(
-        LogicalLink(
-          sourceOpDesc.operatorIdentifier,
-          PortIdentity(),
-          udfOpDesc1.operatorIdentifier,
-          PortIdentity()
-        ),
-        LogicalLink(
-          udfOpDesc1.operatorIdentifier,
-          PortIdentity(),
-          udfOpDesc2.operatorIdentifier,
-          PortIdentity()
-        )
-      ),
-      Seq(udfOpDesc1, udfOpDesc2),
-      OpExecWithCode(code, "python")
-    )
-    assert(result(udfOpDesc2.operatorIdentifier).exists { t =>
-      t.getField("Region").asInstanceOf[String].contains("_reconfigured_reconfigured")
-    })
-  }
-
 }
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
index fab5d5a16c..bcc43b396b 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
@@ -19,11 +19,30 @@
 
 package org.apache.texera.amber.engine.e2e
 
-import com.twitter.util.{Promise, Return}
+import com.twitter.util.{Await, Duration, Promise, Return}
+import org.apache.pekko.actor.ActorSystem
 import org.apache.texera.amber.config.StorageConfig
-import org.apache.texera.amber.core.workflow.WorkflowContext
-import org.apache.texera.amber.engine.architecture.controller.{ExecutionStateUpdate, Workflow}
+import org.apache.texera.amber.core.executor.OpExecInitInfo
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.model.VirtualDocument
+import org.apache.texera.amber.core.tuple.Tuple
+import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.engine.architecture.controller.{
+  ControllerConfig,
+  ExecutionStateUpdate,
+  Workflow
+}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+  EmptyRequest,
+  UpdateExecutorRequest,
+  WorkflowReconfigureRequest
+}
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
+  COMPLETED,
+  PAUSED
+}
 import org.apache.texera.amber.engine.common.client.AmberClient
 import org.apache.texera.amber.operator.LogicalOp
 import org.apache.texera.dao.SqlServer
@@ -41,6 +60,7 @@ import org.apache.texera.dao.jooq.generated.tables.pojos.{
   Workflow => WorkflowPojo
 }
 import org.apache.texera.web.model.websocket.request.LogicalPlanPojo
+import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
 import org.apache.texera.workflow.{LogicalLink, WorkflowCompiler}
 
 object TestUtils {
@@ -139,6 +159,92 @@ object TestUtils {
     p
   }
 
+  /**
+    * Pause a freshly-started workflow, swap the executor for the given target
+    * operators via WorkflowReconfigureRequest, resume, and collect the
+    * terminal-port outputs once the run completes. Shared by ReconfigurationSpec
+    * (pure-Scala) and ReconfigurationIntegrationSpec (Python-tagged), so an
+    * earlier in-spec copy doesn't drift between the two as new e2e specs
+    * land. The caller passes its own `system` (TestKit) and `ctx`
+    * (WorkflowContext) since both are tied to the spec lifecycle.
+    */
+  def shouldReconfigure(
+      system: ActorSystem,
+      ctx: WorkflowContext,
+      operators: List[LogicalOp],
+      links: List[LogicalLink],
+      targetOps: Seq[LogicalOp],
+      newOpExecInitInfo: OpExecInitInfo
+  ): Map[OperatorIdentity, List[Tuple]] = {
+    val workflow = buildWorkflow(operators, links, ctx)
+    val client = new AmberClient(
+      system,
+      workflow.context,
+      workflow.physicalPlan,
+      ControllerConfig.default,
+      error => {}
+    )
+    val completion = Promise[Unit]()
+    var result: Map[OperatorIdentity, List[Tuple]] = null
+    client.registerCallback[ExecutionStateUpdate](evt => {
+      if (evt.state == COMPLETED) {
+        result = workflow.logicalPlan.getTerminalOperatorIds
+          .filter(terminalOpId => {
+            val uri = getResultUriByLogicalPortId(
+              workflow.context.executionId,
+              terminalOpId,
+              PortIdentity()
+            )
+            uri.nonEmpty
+          })
+          .map(terminalOpId => {
+            val uri = getResultUriByLogicalPortId(
+              workflow.context.executionId,
+              terminalOpId,
+              PortIdentity()
+            ).get
+            terminalOpId -> DocumentFactory
+              .openDocument(uri)
+              ._1
+              .asInstanceOf[VirtualDocument[Tuple]]
+              .get()
+              .toList
+          })
+          .toMap
+        completion.setDone()
+      }
+    })
+    Await.result(
+      client.controllerInterface.startWorkflow(EmptyRequest(), ()),
+      Duration.fromSeconds(5)
+    )
+    val pausedReached = stateReached(client, PAUSED)
+    Await.result(
+      client.controllerInterface.pauseWorkflow(EmptyRequest(), ()),
+      Duration.fromSeconds(5)
+    )
+    Await.result(pausedReached, Duration.fromSeconds(10))
+    val physicalOps = targetOps.flatMap(op =>
+      workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
+    )
+    Await.result(
+      client.controllerInterface.reconfigureWorkflow(
+        WorkflowReconfigureRequest(
+          reconfiguration = physicalOps.map(op => UpdateExecutorRequest(op.id, newOpExecInitInfo)),
+          reconfigurationId = "test-reconfigure-1"
+        ),
+        ()
+      ),
+      Duration.fromSeconds(5)
+    )
+    Await.result(
+      client.controllerInterface.resumeWorkflow(EmptyRequest(), ()),
+      Duration.fromSeconds(5)
+    )
+    Await.result(completion, Duration.fromMinutes(1))
+    result
+  }
+
   def cleanupWorkflowExecutionData(): Unit = {
     val dslConfig = SqlServer.getInstance().context.configuration()
     val userDao = new UserDao(dslConfig)
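
Note on the `IntegrationTest.java` Javadoc above: it states that tag annotations are discovered through `java.lang.annotation` reflection, which is why the tag is a Java `@interface` with runtime retention. The following is a minimal, self-contained sketch of that general mechanism only. It is not ScalaTest's actual discovery code: `TagMarker` is a hypothetical stand-in for `org.scalatest.TagAnnotation`, and `IntegrationTag`, `TaggedSpec`, `PlainSpec`, and `tagNames` are illustrative names that do not appear in the commit.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

public class TagDiscoverySketch {

    // Hypothetical stand-in for org.scalatest.TagAnnotation (a meta-annotation).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.ANNOTATION_TYPE)
    public @interface TagMarker {}

    // Same shape as IntegrationTest in the diff, but marked with the stand-in.
    @TagMarker
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.TYPE})
    public @interface IntegrationTag {}

    // Hypothetical spec classes: one tagged at class level, one untagged.
    @IntegrationTag
    public static class TaggedSpec {}

    public static class PlainSpec {}

    // Returns the names of all annotations on `cls` whose annotation type is
    // itself annotated with TagMarker. Only RUNTIME-retained annotations are
    // visible here, which is why the retention policy matters.
    public static List<String> tagNames(Class<?> cls) {
        List<String> names = new ArrayList<>();
        for (Annotation a : cls.getAnnotations()) {
            if (a.annotationType().isAnnotationPresent(TagMarker.class)) {
                names.add(a.annotationType().getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("TaggedSpec tags: " + tagNames(TaggedSpec.class));
        System.out.println("PlainSpec tags:  " + tagNames(PlainSpec.class));
    }
}
```

Under these assumptions, the tagged class reports one tag name and the untagged class reports none. An annotation without `RetentionPolicy.RUNTIME` would not be returned by `getAnnotations()` at all, so it would be invisible to any reflection-based filter.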
