This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch release/v1.1.0-incubating
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/release/v1.1.0-incubating by
this push:
new 5b3b76c24d ci: split amber tests into amber + amber-integration jobs
5b3b76c24d is described below
commit 5b3b76c24dd7dee3d7d43be04d02495ff8a84c41
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 14:46:16 2026 -0700
ci: split amber tests into amber + amber-integration jobs
Backport of #4871 (head f49285d8cc) onto release/v1.1.0-incubating.
Adds a dedicated amber-integration CI job for Scala specs that
exercise both Scala and Python end-to-end. The lighter amber job
no longer installs Python.
- Layout: integration specs live under amber/src/test/integration/
(parallel to scala/ and java/, room for a future python/).
- amber/build.sbt: Test/unmanagedSourceDirectories +=
src/test/integration so scalafmtCheckAll / scalafixAll --check
cover the integration sources naturally; AMBER_TEST_FILTER
env-var-driven testOptions add -l / -n
org.apache.texera.amber.tags.IntegrationTest.
- Annotation: IntegrationTest.java (Java @interface with
@TagAnnotation). A Scala class extending StaticAnnotation would not
be visible to ScalaTest's reflection. Path-based labels only decide
which CI jobs fire; because both jobs compile src/test/integration,
the tag is what actually includes/excludes these specs at test time.
- TestUtils.scala: extract shouldReconfigure helper so both
ReconfigurationSpec and ReconfigurationIntegrationSpec can call
it without duplicating the workflow-driver logic.
- Test split: ReconfigurationSpec keeps two pure-Scala tests
(Java operator + source-operator guardrail);
ReconfigurationIntegrationSpec carries the three Python-UDF
tests, annotated @IntegrationTest.
- build.yml:
- amber job: drops setup-python + uv install + the docker-java
step. Test step sets AMBER_TEST_FILTER=skip-integration.
- new amber-integration job: mirrors amber's JDK + sbt + Postgres
setup, adds Python install, runs scalafmtCheckAll +
scalafixAll --check + WorkflowExecutionService/test under
AMBER_TEST_FILTER=integration-only. No jacoco — these specs
cover code paths already in amber's unit-test coverage.
- labeler.yml:
- engine enumerates non-Python amber subdirs (excludes
src/main/python/** and src/test/integration/**).
- new amber-integration label matches src/test/integration/**.
- python label additionally matches **/requirements.txt and
**/*-requirements.txt so dep-only PRs still hit the python +
amber-integration stacks.
- required-checks.yml LABEL_STACKS: adds amber-integration as its
own stack. python -> [amber-integration, python] (no full Scala
unit suite). engine -> [amber, amber-integration]. common /
ddl-change -> [amber, amber-integration, platform].
amber-integration -> [amber-integration].
(backported from commit f49285d8cc0ce8cf73987d04b6e013e1d66c5ba2)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.github/labeler.yml | 39 +++-
.github/workflows/build.yml | 127 ++++++++++---
.github/workflows/required-checks.yml | 62 ++++---
amber/build.sbt | 22 +++
.../e2e/ReconfigurationIntegrationSpec.scala} | 158 ++--------------
.../apache/texera/amber/tags/IntegrationTest.java | 52 ++++++
.../amber/engine/e2e/ReconfigurationSpec.scala | 199 +--------------------
.../apache/texera/amber/engine/e2e/TestUtils.scala | 112 +++++++++++-
8 files changed, 394 insertions(+), 377 deletions(-)
diff --git a/.github/labeler.yml b/.github/labeler.yml
index 64551a1e0f..5a1b2db0d8 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -45,15 +45,52 @@ agent-service:
- 'agent-service/**'
engine:
+ # Non-Python, non-integration parts of amber/. Pure Python changes
+ # under amber/src/main/python/** intentionally fall through to the
+ # `python` label (which the labeler also matches via **/*.py), so
+ # they only trigger the python + amber-integration stacks rather
+ # than the full Scala-only `amber` stack. Integration specs live
+ # under amber/src/test/integration/** (added to sbt's Test sources
+ # via amber/build.sbt) and are caught by the `amber-integration`
+ # label below. Top-level Scala formatter / linter configs are
+ # included so a PR that only updates them still triggers the Scala
+ # stacks instead of producing an empty stack union.
+ #
+ # This enumeration replaces the former catch-all 'amber/**', so an
+ # amber path that is NOT listed here no longer triggers the `amber`
+ # job. Test fixtures (amber/src/test/resources/**) and the amber
+ # LICENSE* files (the amber job runs the binary license check) are
+ # listed for that reason; extend this list when adding new amber
+ # subdirectories that the Scala build or its checks consume.
- changed-files:
- any-glob-to-any-file:
- - 'amber/**'
+ - 'amber/build.sbt'
+ - 'amber/.scalafmt.conf'
+ - 'amber/.scalafix.conf'
+ - 'amber/project/**'
+ - 'amber/src/main/scala/**'
+ - 'amber/src/main/java/**'
+ - 'amber/src/main/protobuf/**'
+ - 'amber/src/main/resources/**'
+ - 'amber/src/test/scala/**'
+ - 'amber/src/test/java/**'
+ - 'amber/src/test/resources/**'
+ - 'amber/LICENSE*'
+
+amber-integration:
+ # Scala specs that exercise both Scala and Python end-to-end. They
+ # live under amber/src/test/integration/** (sbt picks them up via
+ # `Test / unmanagedSourceDirectories += .../test/integration` in
+ # amber/build.sbt) and are tagged @org.apache.texera.amber.tags
+ # .IntegrationTest. This label maps to the amber-integration stack
+ # only, so a PR that touches just an integration spec does not pay
+ # for the full Scala-only `amber` job.
+ - changed-files:
+ - any-glob-to-any-file:
+ - 'amber/src/test/integration/**'
python:
+ # Includes pip requirement manifests so dependency-only PRs still
+ # exercise the Python + amber-integration stacks. Without this a
+ # bumped requirements.txt would only get `dependencies` (no stack
+ # mapping) and silently skip CI for the very deps it's changing.
- changed-files:
- any-glob-to-any-file:
- 'amber/src/main/python/**'
- '**/*.py'
+ - '**/requirements.txt'
+ - '**/*-requirements.txt'
docs:
- changed-files:
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index ec18dd0b6c..8bc07eedb1 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -44,6 +44,10 @@ on:
required: false
type: boolean
default: true
+ run_amber_integration:
+ required: false
+ type: boolean
+ default: true
run_platform:
required: false
type: boolean
@@ -174,12 +178,6 @@ jobs:
with:
distribution: "temurin"
java-version: 11
- - name: Setup Python for Scala tests
- uses: actions/setup-python@v6
- with:
- python-version: "3.11"
- - name: Show Python
- run: python --version || python3 --version
- name: Create Databases
# Must run before any sbt compile step: the build's JOOQ source
# generators connect to texera_db while compiling.
@@ -220,16 +218,6 @@ jobs:
/tmp/dists/amber-*/lib || check_exit=$?
./bin/licensing/audit_jar_licenses.py /tmp/dists/amber-*/lib || true
exit "$check_exit"
- - name: Install dependencies
- # Only the backend test step needs the python deps; install just
- # before tests so a lint or dist failure does not pay for them.
- # Use uv for speed — the scala job's binary license check is
- # jar-only, so any pip-vs-uv resolver differences in the Python
- # tree don't affect amber/LICENSE-binary-python here.
- run: |
- python -m pip install uv
- if [ -f amber/requirements.txt ]; then uv pip install --system -r
amber/requirements.txt; fi
- if [ -f amber/operator-requirements.txt ]; then uv pip install
--system -r amber/operator-requirements.txt; fi
- name: Create texera_db_for_test_cases
run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases
-f sql/texera_ddl.sql
env:
@@ -241,10 +229,16 @@ jobs:
- name: Run amber and common module tests with coverage
# 'jacoco' runs tests under sbt-jacoco's JVM agent and emits per-
# module jacoco.xml that the codecov upload step picks up.
- # `WorkflowExecutionService/jacoco` only runs that project's tests
- # (sbt's `test` task does not transit dependsOn), so common
- # modules' tests are listed explicitly here. Modules with no
- # tests (Auth, Config) are skipped.
+ # `WorkflowExecutionService/jacoco` only runs that project's
+ # Test config (sbt's `test` task does not transit dependsOn),
+ # so common modules' tests are listed explicitly here. Modules
+ # with no tests (Auth, Config) are skipped.
+ #
+ # AMBER_TEST_FILTER=skip-integration tells amber/build.sbt to
+ # exclude @org.apache.texera.amber.tags.IntegrationTest specs;
+ # those run in the amber-integration job below.
+ env:
+ AMBER_TEST_FILTER: skip-integration
run: |
sbt "DAO/jacoco" \
"PyBuilder/jacoco" \
@@ -260,6 +254,99 @@ jobs:
flags: amber
fail_ci_if_error: false
+ amber-integration:
+ # Runs Scala tests tagged @org.apache.texera.amber.tags.IntegrationTest —
+ # currently the e2e specs that spawn Python UDF workers. Provisions
+ # Python deps that the lighter `amber` job no longer installs. The
+ # amber dist + binary license check stays in `amber`; besides the
+ # tagged tests, this job re-runs the scalafmt / scalafix checks so that
+ # integration-only PRs (which do not trigger `amber`) are still linted.
+ if: ${{ inputs.run_amber_integration }}
+ strategy:
+ matrix:
+ os: [ubuntu-22.04]
+ java-version: [11]
+ runs-on: ${{ matrix.os }}
+ env:
+ JAVA_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M
-Dfile.encoding=UTF-8
+ JVM_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M
-Dfile.encoding=UTF-8
+ services:
+ postgres:
+ image: postgres
+ env:
+ POSTGRES_PASSWORD: postgres
+ ports:
+ - 5432:5432
+ options: >-
+ --health-cmd="pg_isready -U postgres"
+ --health-interval=10s
+ --health-timeout=5s
+ --health-retries=5
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v5
+ with:
+ ref: ${{ inputs.checkout_ref || github.sha }}
+ fetch-depth: 0
+ - name: Prepare backport workspace
+ if: ${{ inputs.backport_target_branch != '' }}
+ working-directory: ${{ github.workspace }}
+ run: bash ./.github/scripts/prepare-backport-checkout.sh "${{
inputs.backport_target_branch }}" "${{ inputs.backport_commit_range }}"
+ - name: Setup JDK
+ uses: actions/setup-java@v5
+ with:
+ distribution: "temurin"
+ java-version: 11
+ - name: Setup Python for Scala-Python integration tests
+ uses: actions/setup-python@v6
+ with:
+ python-version: "3.11"
+ - name: Show Python
+ run: python --version || python3 --version
+ - name: Install Python dependencies
+ # The integration tests spawn Python UDF workers; install
+ # everything they need on the host. uv for speed; no licensing
+ # concerns because no dist is built here.
+ run: |
+ python -m pip install uv
+ if [ -f amber/requirements.txt ]; then uv pip install --system -r
amber/requirements.txt; fi
+ if [ -f amber/operator-requirements.txt ]; then uv pip install
--system -r amber/operator-requirements.txt; fi
+ - name: Create Databases
+ run: |
+ psql -h localhost -U postgres -f sql/texera_ddl.sql
+ psql -h localhost -U postgres -f sql/iceberg_postgres_catalog.sql
+ psql -h localhost -U postgres -f sql/texera_lakefs.sql
+ env:
+ PGPASSWORD: postgres
+ - name: Setup sbt launcher
+ uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
+ - uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 #
v8.1.0
+ with:
+ extraSbtFiles: '["*.sbt", "project/**.{scala,sbt}",
"project/build.properties" ]'
+ - name: Create texera_db_for_test_cases
+ run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases
-f sql/texera_ddl.sql
+ env:
+ PGPASSWORD: postgres
+ - name: Lint and run amber integration tests
+ # AMBER_TEST_FILTER=integration-only tells amber/build.sbt to
+ # keep only @org.apache.texera.amber.tags.IntegrationTest
+ # specs. The Java @TagAnnotation makes the marker visible to
+ # ScalaTest's reflection, so `-n TAG` correctly narrows the
+ # run.
+ #
+ # scalafmtCheckAll + scalafixAll --check are run here as well
+ # because an integration-only PR fires only the
+ # `amber-integration` label; the amber job's own cross-cutting
+ # lint would not run, and the change would otherwise land
+ # unlinted. Costs ~30s when amber also runs, which is fine.
+ # No jacoco — these specs exercise code paths already covered
+ # by amber's unit-test coverage.
+ env:
+ AMBER_TEST_FILTER: integration-only
+ run: |
+ sbt scalafmtCheckAll \
+ "scalafixAll --check" \
+ "WorkflowExecutionService/test"
+
platform:
# Per-service build, test, and license check for the non-amber Scala
# services. Each matrix entry runs its own dist + test in isolation
diff --git a/.github/workflows/required-checks.yml
b/.github/workflows/required-checks.yml
index 9046682bd7..af4fcc9ea8 100644
--- a/.github/workflows/required-checks.yml
+++ b/.github/workflows/required-checks.yml
@@ -60,6 +60,7 @@ jobs:
outputs:
run_frontend: ${{ steps.decide.outputs.run_frontend }}
run_amber: ${{ steps.decide.outputs.run_amber }}
+ run_amber_integration: ${{ steps.decide.outputs.run_amber_integration }}
run_platform: ${{ steps.decide.outputs.run_platform }}
run_python: ${{ steps.decide.outputs.run_python }}
run_agent_service: ${{ steps.decide.outputs.run_agent_service }}
@@ -115,35 +116,48 @@ jobs:
// labeler matches lives under a component dir and is already
// covered by that component's label.
//
- // label | frontend | amber | platform | python |
agent-service
- //
---------------|----------|-------|----------|--------|--------------
- // frontend | x | | | |
- // python | | x | | x |
- // engine | | x | | x |
- // platform | | | x | |
- // agent-service | | | | | x
- // common | | x | x | |
(also catches
- // root
scala
- //
build/lint
- //
config)
- // ddl-change | | x | x | |
- // ci | x | x | x | x | x
- // docs / dev / | | | | |
- // deps / release/| | | | |
- // * / branch | | | | |
+ // label             | frontend | amber | amber-integ | platform | python | agent-service
+ // ------------------|----------|-------|-------------|----------|--------|--------------
+ // frontend          |    x     |       |             |          |        |
+ // python            |          |       |      x      |          |   x    |
+ // engine            |          |   x   |      x      |          |        |
+ // amber-integration |          |       |      x      |          |        |
+ // platform          |          |       |             |    x     |        |
+ // agent-service     |          |       |             |          |        |      x
+ // common            |          |   x   |      x      |    x     |        |    (also catches root
+ //                   |          |       |             |          |        |     scala build/lint config)
+ // ddl-change        |          |   x   |      x      |    x     |        |
+ // ci                |    x     |   x   |      x      |    x     |   x    |      x
+ // docs / dev /      |          |       |             |          |        |
+ // deps / release/   |          |       |             |          |        |
+ // * / branch        |          |       |             |          |        |
+ //
+ // amber-integration runs the Scala tests tagged
+ // @org.apache.texera.amber.tags.IntegrationTest (e2e specs that
+ // spawn Python UDF workers). The labeler attaches `python` to
+ // any *.py change (including amber/src/main/python/**), so
+ // `engine` does not need to fire the python stack itself —
+ // pure-Python amber changes pick up `python` directly. The
+ // `amber-integration` label catches edits under
+ // amber/src/test/integration/** so a test-only change does not
+ // trigger the full Scala-only amber stack.
const LABEL_STACKS = {
frontend: ["frontend"],
- python: ["amber", "python"], // pyamber drives amber
integration tests too
- engine: ["amber", "python"], // amber/** spans both
- platform: ["platform"], // platform services
+ python: ["amber-integration", "python"],
+ engine: ["amber", "amber-integration"],
+ "amber-integration": ["amber-integration"],
+ platform: ["platform"],
"agent-service": ["agent-service"],
- common: ["amber", "platform"], // common/** + root scala
build/lint
- "ddl-change": ["amber", "platform"],
- ci: ["frontend", "amber", "platform", "python", "agent-service"],
+ common: ["amber", "amber-integration", "platform"],
+ "ddl-change": ["amber", "amber-integration", "platform"],
+ ci: ["frontend", "amber", "amber-integration", "platform",
"python", "agent-service"],
};
let runFrontend = true;
let runAmber = true;
+ let runAmberIntegration = true;
let runPlatform = true;
let runPython = true;
let runAgentService = true;
@@ -157,6 +171,7 @@ jobs:
}
runFrontend = stacks.has("frontend");
runAmber = stacks.has("amber");
+ runAmberIntegration = stacks.has("amber-integration");
runPlatform = stacks.has("platform");
runPython = stacks.has("python");
runAgentService = stacks.has("agent-service");
@@ -167,6 +182,7 @@ jobs:
core.setOutput("run_frontend", runFrontend ? "true" : "false");
core.setOutput("run_amber", runAmber ? "true" : "false");
+ core.setOutput("run_amber_integration", runAmberIntegration ?
"true" : "false");
core.setOutput("run_platform", runPlatform ? "true" : "false");
core.setOutput("run_python", runPython ? "true" : "false");
core.setOutput("run_agent_service", runAgentService ? "true" :
"false");
@@ -221,6 +237,7 @@ jobs:
with:
run_frontend: ${{ needs.precheck.outputs.run_frontend == 'true' }}
run_amber: ${{ needs.precheck.outputs.run_amber == 'true' }}
+ run_amber_integration: ${{ needs.precheck.outputs.run_amber_integration
== 'true' }}
run_platform: ${{ needs.precheck.outputs.run_platform == 'true' }}
run_python: ${{ needs.precheck.outputs.run_python == 'true' }}
run_agent_service: ${{ needs.precheck.outputs.run_agent_service ==
'true' }}
@@ -241,6 +258,7 @@ jobs:
job_name_suffix: ""
run_frontend: ${{ needs.precheck.outputs.run_frontend == 'true' }}
run_amber: ${{ needs.precheck.outputs.run_amber == 'true' }}
+ run_amber_integration: ${{ needs.precheck.outputs.run_amber_integration
== 'true' }}
run_platform: ${{ needs.precheck.outputs.run_platform == 'true' }}
run_python: ${{ needs.precheck.outputs.run_python == 'true' }}
run_agent_service: ${{ needs.precheck.outputs.run_agent_service ==
'true' }}
diff --git a/amber/build.sbt b/amber/build.sbt
index 4517d02d7e..09e4628ab8 100644
--- a/amber/build.sbt
+++ b/amber/build.sbt
@@ -54,6 +54,28 @@ concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)
// add python as an additional source
Compile / unmanagedSourceDirectories += baseDirectory.value / "src" / "main" /
"python"
+// `amber/src/test/integration` holds Scala specs that exercise both
+// Scala and Python end-to-end (tagged @org.apache.texera.amber.tags.IntegrationTest).
+// Sits next to `src/test/scala` and `src/test/java`; a future `src/test/python`
+// can drop in the same way. Adding it to Test/unmanagedSourceDirectories means
+// scalafmtCheckAll / scalafixAll --check naturally cover these sources, and
+// the AMBER_TEST_FILTER env var below routes which tagged subset runs.
+Test / unmanagedSourceDirectories += baseDirectory.value / "src" / "test" / "integration"
+
+// Test-filter switch driven by the AMBER_TEST_FILTER env var so the
+// amber and amber-integration CI jobs select disjoint subsets without
+// each invocation having to embed a `set Tests.Argument(...)` prefix.
+//   skip-integration : exclude @IntegrationTest-tagged specs (amber job)
+//   integration-only : include only @IntegrationTest-tagged specs (amber-integration job)
+//   (unset or blank) : run everything (default for local sbt)
+// Any other value aborts project loading. Silently falling back to
+// "run everything" on a typo would re-mix the two subsets — e.g. the
+// amber job, which has no Python installed, would run the Python specs.
+Test / testOptions ++= {
+  val integrationTag = "org.apache.texera.amber.tags.IntegrationTest"
+  sys.env.get("AMBER_TEST_FILTER").map(_.trim).filter(_.nonEmpty) match {
+    case Some("skip-integration") =>
+      Seq(Tests.Argument(TestFrameworks.ScalaTest, "-l", integrationTag))
+    case Some("integration-only") =>
+      Seq(Tests.Argument(TestFrameworks.ScalaTest, "-n", integrationTag))
+    case Some(other) =>
+      sys.error(
+        s"Unrecognized AMBER_TEST_FILTER='$other'; expected 'skip-integration', " +
+          "'integration-only', or unset"
+      )
+    case None => Nil
+  }
+}
+
// Excluding some proto files:
PB.generate / excludeFilter := "scalapb.proto"
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
similarity index 57%
copy from
amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
copy to
amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
index a125c1be00..768caf079f 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
@@ -19,49 +19,40 @@
package org.apache.texera.amber.engine.e2e
-import com.twitter.util.{Await, Duration, Promise}
import com.typesafe.scalalogging.Logger
import org.apache.pekko.actor.{ActorSystem, Props}
import org.apache.pekko.testkit.{ImplicitSender, TestKit}
import org.apache.pekko.util.Timeout
import org.apache.texera.amber.clustering.SingleNodeListener
import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.storage.model.VirtualDocument
import org.apache.texera.amber.core.tuple.Tuple
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
-import org.apache.texera.amber.engine.architecture.controller.{
- ControllerConfig,
- ExecutionStateUpdate
-}
-import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
- EmptyRequest,
- UpdateExecutorRequest,
- WorkflowReconfigureRequest
-}
-import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
- COMPLETED,
- PAUSED
-}
import org.apache.texera.amber.engine.common.AmberRuntime
-import org.apache.texera.amber.engine.common.client.AmberClient
import org.apache.texera.amber.engine.e2e.TestUtils.{
cleanupWorkflowExecutionData,
initiateTexeraDBForTestCases,
- setUpWorkflowExecutionData,
- stateReached
+ setUpWorkflowExecutionData
}
import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
-import
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
+import org.apache.texera.amber.tags.IntegrationTest
import org.apache.texera.workflow.LogicalLink
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
import org.scalatest.flatspec.AnyFlatSpecLike
import scala.concurrent.duration._
-class ReconfigurationSpec
- extends TestKit(ActorSystem("ReconfigurationSpec",
AmberRuntime.akkaConfig))
+/**
+ * E2E reconfiguration tests that spawn Python UDF workers. Routed to the
+ * `amber-integration` CI job via the class-level `@IntegrationTest` tag,
+ * which provisions Python deps; the lighter `amber` job excludes this tag.
+ *
+ * Pure-Scala reconfiguration tests live in [[ReconfigurationSpec]] and run
+ * in the regular `amber` job.
+ */
+@IntegrationTest
+class ReconfigurationIntegrationSpec
+ extends TestKit(ActorSystem("ReconfigurationIntegrationSpec",
AmberRuntime.akkaConfig))
with ImplicitSender
with AnyFlatSpecLike
with BeforeAndAfterAll
@@ -78,7 +69,7 @@ class ReconfigurationSpec
implicit val timeout: Timeout = Timeout(5.seconds)
- val logger = Logger("ReconfigurationSpecLogger")
+ val logger = Logger("ReconfigurationIntegrationSpecLogger")
val ctx = new WorkflowContext()
override protected def beforeEach(): Unit = {
@@ -101,83 +92,16 @@ class ReconfigurationSpec
TestKit.shutdownActorSystem(system)
}
+ // Thin wrapper around the shared TestUtils helper so call sites below stay
+ // ctx/system-implicit. The actual workflow-driver logic lives in TestUtils
+ // and is reused by ReconfigurationSpec.
def shouldReconfigure(
operators: List[LogicalOp],
links: List[LogicalLink],
targetOps: Seq[LogicalOp],
newOpExecInitInfo: OpExecInitInfo
- ): Map[OperatorIdentity, List[Tuple]] = {
- val workflow =
- TestUtils.buildWorkflow(operators, links, ctx)
- val client =
- new AmberClient(
- system,
- workflow.context,
- workflow.physicalPlan,
- ControllerConfig.default,
- error => {}
- )
- val completion = Promise[Unit]()
- var result: Map[OperatorIdentity, List[Tuple]] = null
- client
- .registerCallback[ExecutionStateUpdate](evt => {
- if (evt.state == COMPLETED) {
- result = workflow.logicalPlan.getTerminalOperatorIds
- .filter(terminalOpId => {
- val uri = getResultUriByLogicalPortId(
- workflow.context.executionId,
- terminalOpId,
- PortIdentity()
- )
- uri.nonEmpty
- })
- .map(terminalOpId => {
- val uri = getResultUriByLogicalPortId(
- workflow.context.executionId,
- terminalOpId,
- PortIdentity()
- ).get
- terminalOpId -> DocumentFactory
- .openDocument(uri)
- ._1
- .asInstanceOf[VirtualDocument[Tuple]]
- .get()
- .toList
- })
- .toMap
- completion.setDone()
- }
- })
- Await.result(
- client.controllerInterface.startWorkflow(EmptyRequest(), ()),
- Duration.fromSeconds(5)
- )
- val pausedReached = stateReached(client, PAUSED)
- Await.result(
- client.controllerInterface.pauseWorkflow(EmptyRequest(), ()),
- Duration.fromSeconds(5)
- )
- Await.result(pausedReached, Duration.fromSeconds(10))
- val physicalOps = targetOps.flatMap(op =>
- workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
- )
- Await.result(
- client.controllerInterface.reconfigureWorkflow(
- WorkflowReconfigureRequest(
- reconfiguration = physicalOps.map(op => UpdateExecutorRequest(op.id,
newOpExecInitInfo)),
- reconfigurationId = "test-reconfigure-1"
- ),
- ()
- ),
- Duration.fromSeconds(5)
- )
- Await.result(
- client.controllerInterface.resumeWorkflow(EmptyRequest(), ()),
- Duration.fromSeconds(5)
- )
- Await.result(completion, Duration.fromMinutes(1))
- result
- }
+ ): Map[OperatorIdentity, List[Tuple]] =
+ TestUtils.shouldReconfigure(system, ctx, operators, links, targetOps,
newOpExecInitInfo)
"Engine" should "be able to modify a python UDF worker in workflow" in {
val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
@@ -210,50 +134,6 @@ class ReconfigurationSpec
})
}
- "Engine" should "be able to modify a java operator in workflow" in {
- val sourceOpDesc = TestOperators.mediumCsvScanOpDesc()
- val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region",
"ShouldMatchNone")
- val keywordMatchManyOpDesc = TestOperators.keywordSearchOpDesc("Region",
"Asia")
- val result = shouldReconfigure(
- List(sourceOpDesc, keywordMatchNoneOpDesc),
- List(
- LogicalLink(
- sourceOpDesc.operatorIdentifier,
- PortIdentity(),
- keywordMatchNoneOpDesc.operatorIdentifier,
- PortIdentity()
- )
- ),
- Seq(keywordMatchNoneOpDesc),
- keywordMatchManyOpDesc.getPhysicalOp(ctx.workflowId,
ctx.executionId).opExecInitInfo
- )
- assert(result(keywordMatchNoneOpDesc.operatorIdentifier).nonEmpty)
- }
-
- "Engine" should "not be able to modify a source operator in workflow" in {
- val sourceOpDesc = TestOperators.mediumCsvScanOpDesc()
- val sourceOpDesc2 = TestOperators.mediumCsvScanOpDesc()
- val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region",
"ShouldMatchNone")
- val ex = intercept[Throwable] {
- shouldReconfigure(
- List(sourceOpDesc, keywordMatchNoneOpDesc),
- List(
- LogicalLink(
- sourceOpDesc.operatorIdentifier,
- PortIdentity(),
- keywordMatchNoneOpDesc.operatorIdentifier,
- PortIdentity()
- )
- ),
- Seq(sourceOpDesc),
- sourceOpDesc2.getPhysicalOp(ctx.workflowId,
ctx.executionId).opExecInitInfo
- )
- }
- assert(
- ex.getMessage == "java.lang.IllegalStateException: Reconfiguration
cannot be applied to source operators"
- )
- }
-
"Engine" should "propagate reconfiguration through a source operator in
workflow" in {
val sourceOpDesc = TestOperators.pythonSourceOpDesc(10000)
val udfOpDesc = TestOperators.pythonOpDesc()
diff --git
a/amber/src/test/integration/org/apache/texera/amber/tags/IntegrationTest.java
b/amber/src/test/integration/org/apache/texera/amber/tags/IntegrationTest.java
new file mode 100644
index 0000000000..024b5594c9
--- /dev/null
+++
b/amber/src/test/integration/org/apache/texera/amber/tags/IntegrationTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.tags;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.scalatest.TagAnnotation;
+
+/**
+ * Marks ScalaTest specs that drive the Scala engine and Python workers
+ * together, end-to-end.
+ *
+ * <p>Whether such specs run is decided by ScalaTest tag filtering, which
+ * {@code amber/build.sbt} configures from the {@code AMBER_TEST_FILTER}
+ * environment variable:
+ * <ul>
+ *   <li>{@code skip-integration} passes
+ *       {@code -l org.apache.texera.amber.tags.IntegrationTest}, excluding
+ *       these specs (used by the {@code amber} CI job);</li>
+ *   <li>{@code integration-only} passes {@code -n} with the same tag name,
+ *       running only these specs (used by the {@code amber-integration}
+ *       CI job).</li>
+ * </ul>
+ * Sources under {@code amber/src/test/integration} are appended to sbt's
+ * {@code Test/unmanagedSourceDirectories}, so they compile in the ordinary
+ * Test configuration; there is no dedicated sbt configuration for them.
+ *
+ * <p>This is deliberately a Java annotation. ScalaTest discovers tag
+ * annotations through {@code java.lang.annotation} reflection, and a Scala
+ * {@code class ... extends StaticAnnotation} does not compile to a JVM
+ * annotation interface that {@code @TagAnnotation} can mark, so ScalaTest
+ * would never see the tag at runtime.
+ *
+ * <p>Intended for class-level use; {@code METHOD} is also accepted as a
+ * target.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.METHOD})
+@TagAnnotation
+public @interface IntegrationTest {}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index a125c1be00..8cabf8684a 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -19,41 +19,22 @@
package org.apache.texera.amber.engine.e2e
-import com.twitter.util.{Await, Duration, Promise}
import com.typesafe.scalalogging.Logger
import org.apache.pekko.actor.{ActorSystem, Props}
import org.apache.pekko.testkit.{ImplicitSender, TestKit}
import org.apache.pekko.util.Timeout
import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.storage.model.VirtualDocument
+import org.apache.texera.amber.core.executor.OpExecInitInfo
import org.apache.texera.amber.core.tuple.Tuple
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
-import org.apache.texera.amber.engine.architecture.controller.{
- ControllerConfig,
- ExecutionStateUpdate
-}
-import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
- EmptyRequest,
- UpdateExecutorRequest,
- WorkflowReconfigureRequest
-}
-import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
- COMPLETED,
- PAUSED
-}
import org.apache.texera.amber.engine.common.AmberRuntime
-import org.apache.texera.amber.engine.common.client.AmberClient
import org.apache.texera.amber.engine.e2e.TestUtils.{
cleanupWorkflowExecutionData,
initiateTexeraDBForTestCases,
- setUpWorkflowExecutionData,
- stateReached
+ setUpWorkflowExecutionData
}
import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
-import
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
import org.apache.texera.workflow.LogicalLink
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
import org.scalatest.flatspec.AnyFlatSpecLike
@@ -101,114 +82,16 @@ class ReconfigurationSpec
TestKit.shutdownActorSystem(system)
}
+ // Thin wrapper around TestUtils.shouldReconfigure that supplies this spec's
+ // TestKit `system` and `ctx`, so call sites below pass only the workflow
+ // under test. The driver logic is shared with ReconfigurationIntegrationSpec.
def shouldReconfigure(
operators: List[LogicalOp],
links: List[LogicalLink],
targetOps: Seq[LogicalOp],
newOpExecInitInfo: OpExecInitInfo
- ): Map[OperatorIdentity, List[Tuple]] = {
- val workflow =
- TestUtils.buildWorkflow(operators, links, ctx)
- val client =
- new AmberClient(
- system,
- workflow.context,
- workflow.physicalPlan,
- ControllerConfig.default,
- error => {}
- )
- val completion = Promise[Unit]()
- var result: Map[OperatorIdentity, List[Tuple]] = null
- client
- .registerCallback[ExecutionStateUpdate](evt => {
- if (evt.state == COMPLETED) {
- result = workflow.logicalPlan.getTerminalOperatorIds
- .filter(terminalOpId => {
- val uri = getResultUriByLogicalPortId(
- workflow.context.executionId,
- terminalOpId,
- PortIdentity()
- )
- uri.nonEmpty
- })
- .map(terminalOpId => {
- val uri = getResultUriByLogicalPortId(
- workflow.context.executionId,
- terminalOpId,
- PortIdentity()
- ).get
- terminalOpId -> DocumentFactory
- .openDocument(uri)
- ._1
- .asInstanceOf[VirtualDocument[Tuple]]
- .get()
- .toList
- })
- .toMap
- completion.setDone()
- }
- })
- Await.result(
- client.controllerInterface.startWorkflow(EmptyRequest(), ()),
- Duration.fromSeconds(5)
- )
- val pausedReached = stateReached(client, PAUSED)
- Await.result(
- client.controllerInterface.pauseWorkflow(EmptyRequest(), ()),
- Duration.fromSeconds(5)
- )
- Await.result(pausedReached, Duration.fromSeconds(10))
- val physicalOps = targetOps.flatMap(op =>
- workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
- )
- Await.result(
- client.controllerInterface.reconfigureWorkflow(
- WorkflowReconfigureRequest(
- reconfiguration = physicalOps.map(op => UpdateExecutorRequest(op.id, newOpExecInitInfo)),
- reconfigurationId = "test-reconfigure-1"
- ),
- ()
- ),
- Duration.fromSeconds(5)
- )
- Await.result(
- client.controllerInterface.resumeWorkflow(EmptyRequest(), ()),
- Duration.fromSeconds(5)
- )
- Await.result(completion, Duration.fromMinutes(1))
- result
- }
-
- "Engine" should "be able to modify a python UDF worker in workflow" in {
- val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
- val udfOpDesc = TestOperators.pythonOpDesc()
- val code = """
- |from pytexera import *
- |
- |class ProcessTupleOperator(UDFOperatorV2):
- | @overrides
- | def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
- | tuple_['Region'] = tuple_['Region'] + '_reconfigured'
- | yield tuple_
- |""".stripMargin
-
- val result = shouldReconfigure(
- List(sourceOpDesc, udfOpDesc),
- List(
- LogicalLink(
- sourceOpDesc.operatorIdentifier,
- PortIdentity(),
- udfOpDesc.operatorIdentifier,
- PortIdentity()
- )
- ),
- Seq(udfOpDesc),
- OpExecWithCode(code, "python")
- )
- assert(result(udfOpDesc.operatorIdentifier).exists { t =>
- t.getField("Region").asInstanceOf[String].contains("_reconfigured")
- })
- }
+ ): Map[OperatorIdentity, List[Tuple]] =
+ TestUtils.shouldReconfigure(system, ctx, operators, links, targetOps, newOpExecInitInfo)
"Engine" should "be able to modify a java operator in workflow" in {
val sourceOpDesc = TestOperators.mediumCsvScanOpDesc()
@@ -254,72 +137,4 @@ class ReconfigurationSpec
)
}
- "Engine" should "propagate reconfiguration through a source operator in workflow" in {
- val sourceOpDesc = TestOperators.pythonSourceOpDesc(10000)
- val udfOpDesc = TestOperators.pythonOpDesc()
- val code = """
- |from pytexera import *
- |
- |class ProcessTupleOperator(UDFOperatorV2):
- | @overrides
- | def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
- | tuple_['field_1'] = tuple_['field_1'] + '_reconfigured'
- | yield tuple_
- |""".stripMargin
- val result = shouldReconfigure(
- List(sourceOpDesc, udfOpDesc),
- List(
- LogicalLink(
- sourceOpDesc.operatorIdentifier,
- PortIdentity(),
- udfOpDesc.operatorIdentifier,
- PortIdentity()
- )
- ),
- Seq(udfOpDesc),
- OpExecWithCode(code, "python")
- )
- assert(result(udfOpDesc.operatorIdentifier).exists { t =>
- t.getField("field_1").asInstanceOf[String].contains("_reconfigured")
- })
- }
-
- "Engine" should "be able to modify two python UDFs in workflow" in {
- val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
- val udfOpDesc1 = TestOperators.pythonOpDesc()
- val udfOpDesc2 = TestOperators.pythonOpDesc()
- val code = """
- |from pytexera import *
- |
- |class ProcessTupleOperator(UDFOperatorV2):
- | @overrides
- | def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
- | tuple_['Region'] = tuple_['Region'] + '_reconfigured'
- | yield tuple_
- |""".stripMargin
-
- val result = shouldReconfigure(
- List(sourceOpDesc, udfOpDesc1, udfOpDesc2),
- List(
- LogicalLink(
- sourceOpDesc.operatorIdentifier,
- PortIdentity(),
- udfOpDesc1.operatorIdentifier,
- PortIdentity()
- ),
- LogicalLink(
- udfOpDesc1.operatorIdentifier,
- PortIdentity(),
- udfOpDesc2.operatorIdentifier,
- PortIdentity()
- )
- ),
- Seq(udfOpDesc1, udfOpDesc2),
- OpExecWithCode(code, "python")
- )
- assert(result(udfOpDesc2.operatorIdentifier).exists { t =>
- t.getField("Region").asInstanceOf[String].contains("_reconfigured_reconfigured")
- })
- }
-
}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
index fab5d5a16c..bcc43b396b 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
@@ -19,11 +19,30 @@
package org.apache.texera.amber.engine.e2e
-import com.twitter.util.{Promise, Return}
+import com.twitter.util.{Await, Duration, Promise, Return}
+import org.apache.pekko.actor.ActorSystem
import org.apache.texera.amber.config.StorageConfig
-import org.apache.texera.amber.core.workflow.WorkflowContext
-import
org.apache.texera.amber.engine.architecture.controller.{ExecutionStateUpdate,
Workflow}
+import org.apache.texera.amber.core.executor.OpExecInitInfo
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.model.VirtualDocument
+import org.apache.texera.amber.core.tuple.Tuple
+import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.engine.architecture.controller.{
+ ControllerConfig,
+ ExecutionStateUpdate,
+ Workflow
+}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+ EmptyRequest,
+ UpdateExecutorRequest,
+ WorkflowReconfigureRequest
+}
import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
+import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
+ COMPLETED,
+ PAUSED
+}
import org.apache.texera.amber.engine.common.client.AmberClient
import org.apache.texera.amber.operator.LogicalOp
import org.apache.texera.dao.SqlServer
@@ -41,6 +60,7 @@ import org.apache.texera.dao.jooq.generated.tables.pojos.{
Workflow => WorkflowPojo
}
import org.apache.texera.web.model.websocket.request.LogicalPlanPojo
+import
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
import org.apache.texera.workflow.{LogicalLink, WorkflowCompiler}
object TestUtils {
@@ -139,6 +159,92 @@ object TestUtils {
p
}
+  /**
+   * Start a workflow, pause it, swap the executor of every physical operator
+   * behind `targetOps` via a WorkflowReconfigureRequest, resume it, and return
+   * the terminal-port outputs once the run reports COMPLETED.
+   *
+   * Shared by ReconfigurationSpec (pure-Scala) and
+   * ReconfigurationIntegrationSpec (Python-tagged) so the workflow-driver
+   * logic lives in one place and cannot drift between the two specs. The
+   * caller passes its own `system` (TestKit) and `ctx` (WorkflowContext)
+   * since both are tied to the spec lifecycle.
+   *
+   * Each control call is awaited for at most 5 seconds, the PAUSED state for
+   * 10 seconds, and completion for 1 minute; exceeding any of these makes
+   * `Await.result` throw.
+   *
+   * @param system            actor system the AmberClient is created on
+   * @param ctx               context the workflow is built with
+   * @param operators         logical operators of the workflow under test
+   * @param links             logical links between `operators`
+   * @param targetOps         logical operators whose physical operators get
+   *                          the new executor
+   * @param newOpExecInitInfo executor init info sent to every targeted
+   *                          physical operator
+   * @return for each terminal operator that has a result URI on its default
+   *         output port, the tuples read back from that result document
+   */
+  def shouldReconfigure(
+      system: ActorSystem,
+      ctx: WorkflowContext,
+      operators: List[LogicalOp],
+      links: List[LogicalLink],
+      targetOps: Seq[LogicalOp],
+      newOpExecInitInfo: OpExecInitInfo
+  ): Map[OperatorIdentity, List[Tuple]] = {
+    val workflow = buildWorkflow(operators, links, ctx)
+    // Client-reported errors are ignored here; a run that never reaches
+    // COMPLETED surfaces as the completion timeout at the end of this method.
+    val client = new AmberClient(
+      system,
+      workflow.context,
+      workflow.physicalPlan,
+      ControllerConfig.default,
+      error => {}
+    )
+    // Carries the collected results from the state callback to this thread.
+    // updateIfEmpty (instead of a set* call) keeps a repeated COMPLETED
+    // update from throwing inside the callback.
+    val results = Promise[Map[OperatorIdentity, List[Tuple]]]()
+    client.registerCallback[ExecutionStateUpdate](evt => {
+      if (evt.state == COMPLETED) {
+        val collected = workflow.logicalPlan.getTerminalOperatorIds
+          .flatMap(terminalOpId =>
+            // Resolve the result URI once; operators without one are skipped.
+            getResultUriByLogicalPortId(
+              workflow.context.executionId,
+              terminalOpId,
+              PortIdentity()
+            ).map(uri =>
+              terminalOpId -> DocumentFactory
+                .openDocument(uri)
+                ._1
+                .asInstanceOf[VirtualDocument[Tuple]]
+                .get()
+                .toList
+            )
+          )
+          .toMap
+        results.updateIfEmpty(Return(collected))
+      }
+    })
+    Await.result(
+      client.controllerInterface.startWorkflow(EmptyRequest(), ()),
+      Duration.fromSeconds(5)
+    )
+    // Obtain the PAUSED future before requesting the pause (presumably so the
+    // transition cannot be missed -- confirm against stateReached).
+    val pausedReached = stateReached(client, PAUSED)
+    Await.result(
+      client.controllerInterface.pauseWorkflow(EmptyRequest(), ()),
+      Duration.fromSeconds(5)
+    )
+    Await.result(pausedReached, Duration.fromSeconds(10))
+    val physicalOps = targetOps.flatMap(op =>
+      workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
+    )
+    Await.result(
+      client.controllerInterface.reconfigureWorkflow(
+        WorkflowReconfigureRequest(
+          reconfiguration = physicalOps.map(op =>
+            UpdateExecutorRequest(op.id, newOpExecInitInfo)
+          ),
+          reconfigurationId = "test-reconfigure-1"
+        ),
+        ()
+      ),
+      Duration.fromSeconds(5)
+    )
+    Await.result(
+      client.controllerInterface.resumeWorkflow(EmptyRequest(), ()),
+      Duration.fromSeconds(5)
+    )
+    Await.result(results, Duration.fromMinutes(1))
+  }
+
def cleanupWorkflowExecutionData(): Unit = {
val dslConfig = SqlServer.getInstance().context.configuration()
val userDao = new UserDao(dslConfig)