This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1aa5fa3fab8 Add Go-SDK e2e test (#67956)
1aa5fa3fab8 is described below
commit 1aa5fa3fab8646f240bf567d9ebc7882fb80248e
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Mon Jun 8 13:46:38 2026 +0800
Add Go-SDK e2e test (#67956)
* Add Go-SDK e2e test
* Fix e2e test registration
---
.github/workflows/additional-prod-image-tests.yml | 15 ++
.github/workflows/airflow-e2e-tests.yml | 4 +-
.github/workflows/ci-amd.yml | 2 +
.github/workflows/ci-arm.yml | 2 +
airflow-e2e-tests/docker/go.yml | 37 ++++
.../tests/airflow_e2e_tests/conftest.py | 109 ++++++++++++
.../tests/airflow_e2e_tests/constants.py | 17 ++
.../airflow_e2e_tests/go_sdk_tests/__init__.py | 16 ++
.../go_sdk_tests/test_go_sdk_dag.py | 188 +++++++++++++++++++++
.../images/output_testing_airflow-e2e-tests.svg | 2 +-
.../images/output_testing_airflow-e2e-tests.txt | 2 +-
.../airflow_breeze/commands/testing_commands.py | 1 +
.../src/airflow_breeze/utils/selective_checks.py | 14 ++
dev/breeze/tests/test_selective_checks.py | 26 +++
go-sdk/dags/go_examples.py | 87 ++++++++++
go-sdk/example/bundle/main.go | 1 +
16 files changed, 519 insertions(+), 4 deletions(-)
diff --git a/.github/workflows/additional-prod-image-tests.yml
b/.github/workflows/additional-prod-image-tests.yml
index 62fc9436070..5d31ab33edd 100644
--- a/.github/workflows/additional-prod-image-tests.yml
+++ b/.github/workflows/additional-prod-image-tests.yml
@@ -56,6 +56,10 @@ on: # yamllint disable-line rule:truthy
description: "Whether to run Java SDK e2e tests (true/false)"
required: true
type: string
+ run-go-sdk-e2e-tests:
+ description: "Whether to run Go SDK e2e tests (true/false)"
+ required: true
+ type: string
constraints-branch:
description: "Branch used to construct constraints URL from."
required: true
@@ -305,6 +309,17 @@ jobs:
use-uv: ${{ inputs.use-uv }}
e2e_test_mode: "java_sdk"
if: inputs.canary-run == 'true' || inputs.run-java-sdk-e2e-tests == 'true'
+ test-e2e-integration-tests-go-sdk:
+ name: "Go SDK e2e tests with PROD image"
+ uses: ./.github/workflows/airflow-e2e-tests.yml
+ with:
+ workflow-name: "Go SDK e2e test"
+ runners: ${{ inputs.runners }}
+ platform: ${{ inputs.platform }}
+ default-python-version: "${{ inputs.default-python-version }}"
+ use-uv: ${{ inputs.use-uv }}
+ e2e_test_mode: "go_sdk"
+ if: inputs.canary-run == 'true' || inputs.run-go-sdk-e2e-tests == 'true'
test-ui-e2e-chromium:
name: "Chromium UI e2e tests with PROD image"
diff --git a/.github/workflows/airflow-e2e-tests.yml
b/.github/workflows/airflow-e2e-tests.yml
index 425defc4f97..1b19133c3e4 100644
--- a/.github/workflows/airflow-e2e-tests.yml
+++ b/.github/workflows/airflow-e2e-tests.yml
@@ -49,7 +49,7 @@ on: # yamllint disable-line rule:truthy
type: string
required: true
e2e_test_mode:
- description: "Test mode - basic, remote_log, remote_log_elasticsearch,
remote_log_opensearch, xcom_object_storage, event_driven, or java_sdk" #
yamllint disable-line rule:line-length
+ description: "Test mode - basic, remote_log, remote_log_elasticsearch,
remote_log_opensearch, xcom_object_storage, event_driven, java_sdk, or go_sdk"
# yamllint disable-line rule:line-length
type: string
default: "basic"
@@ -80,7 +80,7 @@ on: # yamllint disable-line rule:truthy
type: string
default: ""
e2e_test_mode:
- description: "Test mode - basic, remote_log, remote_log_elasticsearch,
remote_log_opensearch, xcom_object_storage, event_driven, or java_sdk" #
yamllint disable-line rule:line-length
+ description: "Test mode - basic, remote_log, remote_log_elasticsearch,
remote_log_opensearch, xcom_object_storage, event_driven, java_sdk, or go_sdk"
# yamllint disable-line rule:line-length
type: string
default: "basic"
diff --git a/.github/workflows/ci-amd.yml b/.github/workflows/ci-amd.yml
index a4f82608655..f411d43d2b3 100644
--- a/.github/workflows/ci-amd.yml
+++ b/.github/workflows/ci-amd.yml
@@ -141,6 +141,7 @@ jobs:
run-remote-logging-s3-e2e-tests: ${{
steps.selective-checks.outputs.run-remote-logging-s3-e2e-tests }}
run-event-driven-e2e-tests: ${{
steps.selective-checks.outputs.run-event-driven-e2e-tests }}
run-java-sdk-e2e-tests: ${{
steps.selective-checks.outputs.run-java-sdk-e2e-tests }}
+ run-go-sdk-e2e-tests: ${{
steps.selective-checks.outputs.run-go-sdk-e2e-tests }}
run-system-tests: ${{ steps.selective-checks.outputs.run-system-tests }}
run-task-sdk-tests: ${{
steps.selective-checks.outputs.run-task-sdk-tests }}
run-task-sdk-integration-tests: ${{
steps.selective-checks.outputs.run-task-sdk-integration-tests }}
@@ -880,6 +881,7 @@ jobs:
run-remote-logging-s3-e2e-tests: ${{
needs.build-info.outputs.run-remote-logging-s3-e2e-tests }}
run-event-driven-e2e-tests: ${{
needs.build-info.outputs.run-event-driven-e2e-tests }}
run-java-sdk-e2e-tests: ${{
needs.build-info.outputs.run-java-sdk-e2e-tests }}
+ run-go-sdk-e2e-tests: ${{ needs.build-info.outputs.run-go-sdk-e2e-tests
}}
use-uv: ${{ needs.build-info.outputs.use-uv }}
run-ui-e2e-tests: ${{ needs.build-info.outputs.run-ui-e2e-tests }}
run-airflow-ctl-integration-tests: ${{
needs.build-info.outputs.run-airflow-ctl-integration-tests }}
diff --git a/.github/workflows/ci-arm.yml b/.github/workflows/ci-arm.yml
index 7816892ce7e..732d1e261ca 100644
--- a/.github/workflows/ci-arm.yml
+++ b/.github/workflows/ci-arm.yml
@@ -131,6 +131,7 @@ jobs:
run-remote-logging-s3-e2e-tests: ${{
steps.selective-checks.outputs.run-remote-logging-s3-e2e-tests }}
run-event-driven-e2e-tests: ${{
steps.selective-checks.outputs.run-event-driven-e2e-tests }}
run-java-sdk-e2e-tests: ${{
steps.selective-checks.outputs.run-java-sdk-e2e-tests }}
+ run-go-sdk-e2e-tests: ${{
steps.selective-checks.outputs.run-go-sdk-e2e-tests }}
run-system-tests: ${{ steps.selective-checks.outputs.run-system-tests }}
run-task-sdk-tests: ${{
steps.selective-checks.outputs.run-task-sdk-tests }}
run-task-sdk-integration-tests: ${{
steps.selective-checks.outputs.run-task-sdk-integration-tests }}
@@ -870,6 +871,7 @@ jobs:
run-remote-logging-s3-e2e-tests: ${{
needs.build-info.outputs.run-remote-logging-s3-e2e-tests }}
run-event-driven-e2e-tests: ${{
needs.build-info.outputs.run-event-driven-e2e-tests }}
run-java-sdk-e2e-tests: ${{
needs.build-info.outputs.run-java-sdk-e2e-tests }}
+ run-go-sdk-e2e-tests: ${{ needs.build-info.outputs.run-go-sdk-e2e-tests
}}
use-uv: ${{ needs.build-info.outputs.use-uv }}
run-ui-e2e-tests: ${{ needs.build-info.outputs.run-ui-e2e-tests }}
run-airflow-ctl-integration-tests: ${{
needs.build-info.outputs.run-airflow-ctl-integration-tests }}
diff --git a/airflow-e2e-tests/docker/go.yml b/airflow-e2e-tests/docker/go.yml
new file mode 100644
index 00000000000..c1426963765
--- /dev/null
+++ b/airflow-e2e-tests/docker/go.yml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Docker Compose override for go_sdk E2E test mode.
+#
+# The Go bundle compiled by airflow-go-pack
(conftest._setup_go_sdk_integration)
+# is a self-contained, statically linked executable, so the stock Airflow
worker
+# image can exec it directly -- no extra runtime needs to be installed. We only
+# bind-mount the bundle into the directory the ExecutableCoordinator scans and
+# point the worker at the "golang" queue where @task.stub tasks are routed.
+---
+services:
+ airflow-worker:
+ # The bundle is built with CGO_ENABLED=0, so the SDK's user.Current() call
at
+ # init falls back to Go's pure-Go resolver, which reads $USER / $HOME and
+ # panics if either is empty. The supervisor launches the bundle with the
+ # worker's environment, so set both here.
+ environment:
+ USER: airflow
+ HOME: /home/airflow
+ volumes:
+ - ./go-bundles:/opt/airflow/go-bundles:ro
+ command: celery worker -q golang,default
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
index aa4e51d7f54..afef387b5a2 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
@@ -37,6 +37,12 @@ from airflow_e2e_tests.constants import (
E2E_DAGS_FOLDER,
E2E_TEST_MODE,
ELASTICSEARCH_PATH,
+ GO_BUILDER_IMAGE,
+ GO_COMPOSE_PATH,
+ GO_SDK_BIN_PATH,
+ GO_SDK_BUNDLE_NAME,
+ GO_SDK_DAGS_PATH,
+ GO_SDK_EXAMPLE_BUNDLE_PKG,
JAVA_COMPOSE_PATH,
JAVA_DOCKERFILE_PATH,
JAVA_SDK_DAGS_PATH,
@@ -333,6 +339,106 @@ def _setup_java_sdk_integration(dot_env_file, tmp_dir):
os.environ["ENV_FILE_PATH"] = str(dot_env_file)
+def _setup_go_sdk_integration(dot_env_file, tmp_dir):
+ """Set up the go_sdk E2E test mode.
+
+ Compiles the Go SDK example bundle into a self-contained executable bundle
+ via the ``airflow-go-pack`` tooling, drops it into the directory the
+ ``ExecutableCoordinator`` scans, copies the Python stub Dag, and writes the
+ coordinator configuration.
+
+ The packed bundle is a statically linked native executable (built with
+ ``CGO_ENABLED=0``), so the stock Airflow worker image can exec it directly
+ without a Go toolchain or any extra runtime installed -- see ``go.yml``.
+ """
+ # Build + pack the example bundle inside an ephemeral Go container so the
+ # host does not need Go installed.
+ #
+ # --user keeps build outputs owned by the current user (not root).
+ # HOME points at a writable, gitignored dir under go-sdk/bin so the Go
build
+ # and module caches persist between runs (first run downloads modules once;
+ # subsequent runs skip straight to compilation).
+ # CGO_ENABLED=0 yields a fully static binary that runs on the stock worker.
+ # USER/HOME must be set because the SDK calls user.Current() at init; with
+ # cgo disabled Go's pure-Go resolver reads those env vars instead of libc,
+ # and panics if either is empty (the same vars are set on the worker in
+ # go.yml so the packed binary runs the same way at execution time).
+ # `go tool airflow-go-pack` builds the bundle package, reads its
+ # --airflow-metadata, and appends the source + airflow-metadata.yaml + the
+ # AFBNDL01 trailer, writing a single self-contained executable bundle.
+ go_cache_home = "/repo/go-sdk/bin/.home"
+ bundle_out = f"/repo/go-sdk/bin/{GO_SDK_BUNDLE_NAME}"
+ console.print(f"[yellow]Building Go SDK example bundle
({GO_BUILDER_IMAGE})...")
+ subprocess.run(
+ [
+ "docker",
+ "run",
+ "--rm",
+ "--user",
+ f"{os.getuid()}:{os.getgid()}",
+ "-e",
+ f"HOME={go_cache_home}",
+ "-e",
+ "USER=airflow",
+ "-e",
+ "CGO_ENABLED=0",
+ # Mount the repo so the whole go-sdk module (go.mod, tool
directive,
+ # example sources) is visible to `go tool`.
+ "-v",
+ f"{AIRFLOW_ROOT_PATH}:/repo",
+ "-w",
+ "/repo/go-sdk",
+ GO_BUILDER_IMAGE,
+ "go",
+ "tool",
+ "airflow-go-pack",
+ "--output",
+ bundle_out,
+ GO_SDK_EXAMPLE_BUNDLE_PKG,
+ ],
+ check=True,
+ )
+
+ # Copy the compose override into the temp directory.
+ copyfile(GO_COMPOSE_PATH, tmp_dir / "go.yml")
+
+ # Place the packed bundle where the compose bind-mount (./go-bundles)
exposes
+ # it to the worker at /opt/airflow/go-bundles. The bundle scanner requires
+ # the file to be executable, so preserve the exec bit.
+ go_bundles_dir = tmp_dir / "go-bundles"
+ go_bundles_dir.mkdir()
+ packed_bundle = go_bundles_dir / GO_SDK_BUNDLE_NAME
+ copyfile(GO_SDK_BIN_PATH / GO_SDK_BUNDLE_NAME, packed_bundle)
+ os.chmod(packed_bundle, 0o755)
+
+ # Copy the Go SDK example stub Dag so Airflow can discover and serialize
it.
+ copyfile(GO_SDK_DAGS_PATH / "go_examples.py", tmp_dir / "dags" /
"go_examples.py")
+
+ # Coordinator registry: maps the logical name "go-sdk" to
ExecutableCoordinator,
+ # which scans executables_root for the packed bundle by dag_id.
+ # Queue mapping: routes tasks on the "golang" queue to "go-sdk".
+ coordinator_config = json.dumps(
+ {
+ "go-sdk": {
+ "classpath":
"airflow.sdk.coordinators.executable.ExecutableCoordinator",
+ "kwargs": {"executables_root": ["/opt/airflow/go-bundles"]},
+ }
+ }
+ )
+ queue_to_coordinator = json.dumps({"golang": "go-sdk"})
+
+ dot_env_file.write_text(
+ f"AIRFLOW_UID={os.getuid()}\n"
+ # Single-quote the JSON values so Docker Compose reads them literally.
+ f"AIRFLOW__SDK__COORDINATORS='{coordinator_config}'\n"
+ f"AIRFLOW__SDK__QUEUE_TO_COORDINATOR='{queue_to_coordinator}'\n"
+ # Connection and variable read by the Go example bundle tasks.
+ "AIRFLOW_CONN_TEST_HTTP=http://test:[email protected]/\n"
+ "AIRFLOW_VAR_MY_VARIABLE=test_value\n"
+ )
+ os.environ["ENV_FILE_PATH"] = str(dot_env_file)
+
+
def spin_up_airflow_environment(tmp_path_factory: pytest.TempPathFactory):
tmp_dir = tmp_path_factory.mktemp("breeze-airflow-e2e-tests")
@@ -377,6 +483,9 @@ def spin_up_airflow_environment(tmp_path_factory:
pytest.TempPathFactory):
elif E2E_TEST_MODE == "java_sdk":
compose_file_names.append("java.yml")
_setup_java_sdk_integration(dot_env_file, tmp_dir)
+ elif E2E_TEST_MODE == "go_sdk":
+ compose_file_names.append("go.yml")
+ _setup_go_sdk_integration(dot_env_file, tmp_dir)
#
# Please Do not use this Fernet key in any deployments! Please generate
your own key.
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
index a81eeb0d6a0..259f8256fd8 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
@@ -57,6 +57,23 @@ JAVA_SDK_EXAMPLE_LIBS_PATH = JAVA_SDK_ROOT_PATH / "example"
/ "build" / "install
JAVA_COMPOSE_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" /
"java.yml"
JAVA_DOCKERFILE_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" /
"Dockerfile.java"
+# Go SDK E2E test paths
+GO_SDK_ROOT_PATH = AIRFLOW_ROOT_PATH / "go-sdk"
+GO_SDK_DAGS_PATH = GO_SDK_ROOT_PATH / "dags"
+# Package directory holding func main() for the example bundle; airflow-go-pack
+# builds and packs this into a self-contained executable bundle.
+GO_SDK_EXAMPLE_BUNDLE_PKG = "./example/bundle"
+# Name of the packed bundle binary (matches the example bundle's package dir
name).
+GO_SDK_BUNDLE_NAME = "example_dags"
+# Where airflow-go-pack writes the packed bundle inside the repo (go-sdk/bin
is gitignored).
+GO_SDK_BIN_PATH = GO_SDK_ROOT_PATH / "bin"
+GO_COMPOSE_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / "go.yml"
+# Go toolchain image used to build the bundle; must satisfy go-sdk/go.mod's
toolchain.
+# The Alpine variant is ~7x smaller than the Debian one and is safe here
because the
+# bundle is built with CGO_ENABLED=0 (a fully static binary, independent of
musl/glibc)
+# and module fetches go through the HTTPS proxy (no git/gcc needed).
+GO_BUILDER_IMAGE = os.environ.get("GO_BUILDER_IMAGE", "golang:1.24-alpine")
+
# Local provider sources are mounted into the airflow containers under this
directory so
# ``_PIP_ADDITIONAL_REQUIREMENTS`` can install the in-tree (latest, possibly
unreleased)
# provider rather than the published one from PyPI.
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/go_sdk_tests/__init__.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/go_sdk_tests/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/go_sdk_tests/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/airflow-e2e-tests/tests/airflow_e2e_tests/go_sdk_tests/test_go_sdk_dag.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/go_sdk_tests/test_go_sdk_dag.py
new file mode 100644
index 00000000000..6cd7ff0779d
--- /dev/null
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/go_sdk_tests/test_go_sdk_dag.py
@@ -0,0 +1,188 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""E2E tests for the Go SDK via the compiled example bundle.
+
+Run with::
+
+ E2E_TEST_MODE=go_sdk uv run --project airflow-e2e-tests pytest \\
+ tests/airflow_e2e_tests/go_sdk_tests/ -xvs
+
+What is verified
+----------------
+``conftest._setup_go_sdk_integration`` compiles ``go-sdk/example/bundle`` into
a
+self-contained executable bundle with the ``airflow-go-pack`` tooling and drops
+it into the directory the ``ExecutableCoordinator`` scans. The ``simple_dag``
+Dag (``go-sdk/dags/go_examples.py``) sandwiches the Go tasks between two native
+Python tasks::
+
+ python_task_1 >> extract >> transform >> [load, python_task_2]
+
+* ``extract`` / ``transform`` / ``load`` are ``@task.stub(queue="golang")``
tasks
+ whose implementations live in the Go bundle (``main.go``). The ``golang``
queue
+ is routed to ``ExecutableCoordinator``, which locates the bundle by dag_id,
+ launches the binary with ``--comm`` / ``--logs``, and drives it through the
+ msgpack-over-IPC coordinator protocol.
+* ``python_task_1`` (Python) pushes an XCom; ``extract`` (Go) fetches the
+ ``test_http`` connection and returns ``{go_version, timestamp}``;
``transform``
+ (Go) reads ``my_variable``; ``load`` (Go) returns an error on purpose;
+ ``python_task_2`` (Python) pulls and re-emits the Go ``extract`` task's XCom.
+
+The Dag is triggered exactly once by the module-scoped ``completed_run``
fixture;
+each test asserts a different facet of that single run. Together they confirm,
+end-to-end:
+
+1. ``ExecutableCoordinator`` discovers the AFBNDL01 bundle by dag_id and runs
the
+ binary in coordinator mode for every Go task, reporting ``SucceedTask`` for
+ extract/transform and a failed ``TaskState`` for load.
+2. Connection / Variable reads and XCom writes work through the Task Execution
+ API, XCom values keep their types (the ``timestamp`` stays an ``int``), and
+ XCom crosses the Python <-> Go boundary in both directions.
+3. Structured task logs emitted by the Go binary over the coordinator logs
+ channel reach Airflow's task-log store.
+"""
+
+from __future__ import annotations
+
+import time
+from dataclasses import dataclass
+from datetime import datetime, timezone
+
+import pytest
+
+from airflow_e2e_tests.e2e_test_utils.clients import AirflowClient
+
+# The Go extract task sleeps ~20 s + coordinator startup; allow plenty of room.
+_GO_TASK_TIMEOUT = 600
+# Task logs are written when the task finishes; allow a little slack for them
to
+# become retrievable through the API after the run reaches a terminal state.
+_LOG_FETCH_TIMEOUT = 120
+
+_DAG_ID = "simple_dag"
+
+
+@dataclass
+class _CompletedRun:
+ """The single ``simple_dag`` run shared across the assertions in this
module."""
+
+ client: AirflowClient
+ run_id: str
+ state: str
+ ti_states: dict[str, str]
+
+ def xcom(self, task_id: str, key: str = "return_value"):
+ return self.client.get_xcom_value(dag_id=_DAG_ID, task_id=task_id,
run_id=self.run_id, key=key).get(
+ "value"
+ )
+
+ def logs(self, task_id: str, try_number: int = 1) -> str:
+ """Return the concatenated task-log records for *task_id*, retrying
until present."""
+ deadline = time.monotonic() + _LOG_FETCH_TIMEOUT
+ while True:
+ resp = self.client.get_task_logs(
+ dag_id=_DAG_ID, run_id=self.run_id, task_id=task_id,
try_number=try_number
+ )
+ text = "\n".join(str(entry) for entry in resp.get("content", []))
+ if text.strip() or time.monotonic() > deadline:
+ return text
+ time.sleep(3)
+
+
[email protected](scope="module")
+def completed_run() -> _CompletedRun:
+ """Trigger ``simple_dag`` once and wait for it to finish.
+
+ Module-scoped so the (multi-minute) Go run happens a single time; every
test
+ in this module inspects the resulting states, XComs, and logs.
+ """
+ client = AirflowClient()
+ resp = client.trigger_dag(_DAG_ID, json={"logical_date":
datetime.now(timezone.utc).isoformat()})
+ run_id = resp["dag_run_id"]
+ state = client.wait_for_dag_run(dag_id=_DAG_ID, run_id=run_id,
timeout=_GO_TASK_TIMEOUT)
+ ti_resp = client.get_task_instances(dag_id=_DAG_ID, run_id=run_id)
+ ti_states = {ti["task_id"]: ti.get("state") for ti in
ti_resp.get("task_instances", [])}
+ return _CompletedRun(client=client, run_id=run_id, state=state,
ti_states=ti_states)
+
+
+def test_task_states(completed_run: _CompletedRun):
+ """Every task ends in its expected state (the Go ``load`` task fails on
purpose)."""
+ expected = {
+ "python_task_1": "success",
+ "extract": "success",
+ "transform": "success",
+ "load": "failed",
+ "python_task_2": "success",
+ }
+ for task_id, want in expected.items():
+ assert completed_run.ti_states.get(task_id) == want, (
+ f"{task_id!r} expected {want!r}. all task states:
{completed_run.ti_states}"
+ )
+
+
+def test_dag_run_failed(completed_run: _CompletedRun):
+ """The failing ``load`` leaf makes the overall run fail."""
+ assert completed_run.state == "failed", (
+ f"expected the run to fail because 'load' fails; got
{completed_run.state!r}. "
+ f"task states: {completed_run.ti_states}"
+ )
+
+
+def test_python_task_1_pushes_xcom(completed_run: _CompletedRun):
+ """The upstream Python task's XCom is available (Python -> XCom)."""
+ assert completed_run.xcom("python_task_1") == "value_from_python_task_1"
+
+
+def test_extract_xcom_has_go_version_and_int_timestamp(completed_run:
_CompletedRun):
+ """The map returned by the Go 'extract' task round-trips through XCom (Go
-> XCom)."""
+ value = completed_run.xcom("extract")
+ assert isinstance(value, dict), (
+ f"Expected 'extract' XCom to be a mapping, got {value!r}
({type(value).__name__})"
+ )
+ assert str(value.get("go_version", "")).startswith("go"), (
+ f"Expected go_version to look like a Go runtime version, got
{value.get('go_version')!r}"
+ )
+ timestamp = value.get("timestamp")
+ assert isinstance(timestamp, int), (
+ f"Expected 'timestamp' to be an int, got {timestamp!r}
({type(timestamp).__name__})"
+ )
+ assert timestamp > 0, f"Expected a positive nanosecond timestamp, got
{timestamp!r}"
+
+
+def test_xcom_crosses_go_to_python(completed_run: _CompletedRun):
+ """python_task_2 pulled the Go 'extract' XCom and re-emitted it unchanged
(Go -> Python)."""
+ assert completed_run.xcom("python_task_2") == completed_run.xcom("extract")
+
+
+def test_extract_logs_show_beep_loop(completed_run: _CompletedRun):
+ """The Go 'extract' task's structured logs reach Airflow's task log."""
+ logs = completed_run.logs("extract")
+ beeps = logs.count("After the beep the time will be")
+ assert beeps == 10, f"expected 10 'After the beep the time will be' log
lines from extract, got {beeps}"
+ assert "Goodbye from task" in logs, "extract task should log 'Goodbye from
task'"
+
+
+def test_transform_logs_show_variable_read(completed_run: _CompletedRun):
+ """The Go 'transform' task logs the variable it read."""
+ assert "Obtained variable" in completed_run.logs("transform"), (
+ "transform task should log 'Obtained variable'"
+ )
+
+
+def test_load_logs_show_failure(completed_run: _CompletedRun):
+ """The Go 'load' task's error surfaces in its task log."""
+ assert "Please fail" in completed_run.logs("load"), (
+ "load task log should contain its failure message 'Please fail'"
+ )
diff --git a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
index 290e96c8a32..0ba82f2e913 100644
--- a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
+++ b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
@@ -135,7 +135,7 @@
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="312.8"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">│</text><text
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="312.8"
textLength="73.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">(TEXT)</text><text
class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="312.8"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">│</text><text
class="breez [...]
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="337.2"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-13)">│</text><text
class="breeze-testing-airflow-e2e-tests-r4" x="24.4" y="337.2" textLength="366"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-13)">--e2e-test-mode               </text><text
class="breeze-testing-airflow-e2e-tests-r1" x="463.6" y="337.2" textLen [...]
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="361.6"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">│</text><text
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="361.6"
textLength="976"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">(basic|remote_log|remote_log_elasticsearch|remote_log_opensearch|xcom_object_sto</text><text
class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="361.6"
textLength="12.2" clip-path="u [...]
-</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="386"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</text><text
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="386"
textLength="329.4"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">rage|event_driven|java_sdk)</text><text
class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="386"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</text><t [...]
+</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="386"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</text><text
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="386"
textLength="414.8"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">rage|event_driven|java_sdk|go_sdk)</text><text
class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="386"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</ [...]
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="410.4"
textLength="1464"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-16)">╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯</text><text
class="breeze-testing-airflow-e2e-tests-r1" x="1464" y="410.4"
textLength="12.2" clip-path="url(#breeze-testing-airflow-e2e-tests-line-16)">
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="434.8"
textLength="24.4"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">╭─</text><text
class="breeze-testing-airflow-e2e-tests-r5" x="24.4" y="434.8"
textLength="195.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)"> Common options </text><text
class="breeze-testing-airflow-e2e-tests-r5" x="219.6" y="434.8"
textLength="1220" clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">─
[...]
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="459.2"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-18)">│</text><text
class="breeze-testing-airflow-e2e-tests-r4" x="24.4" y="459.2"
textLength="109.8"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-18)">--verbose</text><text
class="breeze-testing-airflow-e2e-tests-r6" x="158.6" y="459.2"
textLength="24.4"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-18)">-v</text><text
class="br [...]
diff --git a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
index e855c1591bd..58ae5ef1832 100644
--- a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
+++ b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
@@ -1 +1 @@
-bf6ebaaf5870518bae6d39a46326a769
+518a65f4380756c2c7b0f9c79aa3e3bc
diff --git a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
index eb06d8fd967..cc48ee0fb99 100644
--- a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
@@ -1421,6 +1421,7 @@ option_e2e_test_mode = click.option(
"xcom_object_storage",
"event_driven",
"java_sdk",
+ "go_sdk",
],
case_sensitive=False,
),
diff --git a/dev/breeze/src/airflow_breeze/utils/selective_checks.py
b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
index 481454f6387..022096624d5 100644
--- a/dev/breeze/src/airflow_breeze/utils/selective_checks.py
+++ b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
@@ -127,6 +127,7 @@ class FileGroupForCi(Enum):
REMOTE_LOGGING_E2E_OPENSEARCH_FILES = auto()
EVENT_DRIVEN_E2E_FILES = auto()
JAVA_SDK_E2E_FILES = auto()
+ GO_SDK_E2E_FILES = auto()
ALL_PYPROJECT_TOML_FILES = auto()
ALL_PYTHON_FILES = auto()
ALL_SOURCE_FILES = auto()
@@ -224,8 +225,16 @@ CI_FILE_GROUP_MATCHES: HashableDict[FileGroupForCi] =
HashableDict(
r"^airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/.*",
r"^airflow-e2e-tests/docker/java\.yml$",
r"^airflow-e2e-tests/docker/Dockerfile\.java$",
+ r"^task-sdk/src/airflow/sdk/coordinators/_subprocess\.py$",
r"^task-sdk/src/airflow/sdk/coordinators/java/.*",
],
+ FileGroupForCi.GO_SDK_E2E_FILES: [
+ r"^go-sdk/.*",
+ r"^airflow-e2e-tests/tests/airflow_e2e_tests/go_sdk_tests/.*",
+ r"^airflow-e2e-tests/docker/go\.yml$",
+ r"^task-sdk/src/airflow/sdk/coordinators/_subprocess\.py$",
+ r"^task-sdk/src/airflow/sdk/coordinators/executable/.*",
+ ],
FileGroupForCi.PYTHON_PRODUCTION_FILES: [
# Production Python source the runtime ships — excludes tests,
docs,
# dev tooling, and generated files within those trees. Used by
@@ -988,6 +997,10 @@ class SelectiveChecks:
def run_java_sdk_e2e_tests(self) -> bool:
return self._should_be_run(FileGroupForCi.JAVA_SDK_E2E_FILES)
+ @cached_property
+ def run_go_sdk_e2e_tests(self) -> bool:
+ return self._should_be_run(FileGroupForCi.GO_SDK_E2E_FILES)
+
@cached_property
def run_amazon_tests(self) -> bool:
if self.providers_test_types_list_as_strings_in_json == "[]":
@@ -1105,6 +1118,7 @@ class SelectiveChecks:
or self.run_remote_logging_opensearch_e2e_tests
or self.run_event_driven_e2e_tests
or self.run_java_sdk_e2e_tests
+ or self.run_go_sdk_e2e_tests
or self.run_ui_e2e_tests
)
diff --git a/dev/breeze/tests/test_selective_checks.py
b/dev/breeze/tests/test_selective_checks.py
index 1e52b80dea4..7615a3db3b6 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -1443,6 +1443,32 @@ def assert_outputs_are_printed(expected_outputs:
dict[str, str], stderr: str):
},
id="Run java e2e tests when JavaCoordinator changes",
),
+ pytest.param(
+ ("go-sdk/sdk/variable.go",),
+ {
+ "run-go-sdk-tests": "true",
+ "run-go-sdk-e2e-tests": "true",
+ "prod-image-build": "true",
+ },
+ id="Run go unit and e2e tests for go-sdk source change",
+ ),
+ pytest.param(
+ ("airflow-e2e-tests/docker/go.yml",),
+ {
+ "run-go-sdk-tests": "false",
+ "run-go-sdk-e2e-tests": "true",
+ "prod-image-build": "true",
+ },
+ id="Run go e2e tests when go compose override changes",
+ ),
+ pytest.param(
+
("task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py",),
+ {
+ "run-go-sdk-e2e-tests": "true",
+ "prod-image-build": "true",
+ },
+ id="Run go e2e tests when ExecutableCoordinator changes",
+ ),
(
pytest.param(
("devel-common/pyproject.toml",),
diff --git a/go-sdk/dags/go_examples.py b/go-sdk/dags/go_examples.py
new file mode 100644
index 00000000000..523b6b8bf23
--- /dev/null
+++ b/go-sdk/dags/go_examples.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Python stub Dag mirroring the Go SDK example bundle
(``go-sdk/example/bundle``).
+
+The graph sandwiches the Go tasks between two native Python tasks so the run
+exercises XCom across the language boundary, the same way
+``java-sdk/dags/java_examples.py`` does for the Java SDK::
+
+ python_task_1 >> extract >> transform >> [load, python_task_2]
+
+* ``python_task_1`` (Python) pushes an XCom.
+* ``extract`` / ``transform`` / ``load`` are ``@task.stub(queue="golang")``
tasks
+ whose implementations live in the compiled Go bundle. The ``golang`` queue is
+ routed to the ``ExecutableCoordinator``, which locates the bundle by dag_id
and
+ runs the binary in coordinator mode. ``extract`` returns a map (pushed as its
+ ``return_value`` XCom); ``transform`` reads the ``my_variable`` variable.
+* ``load`` returns an error on purpose. It is a leaf (not upstream of
+ ``python_task_2``) so its failure is observable while leaving the Go ->
Python
+ XCom hop intact.
+* ``python_task_2`` (Python) pulls the Go ``extract`` task's XCom and re-emits
+ it, demonstrating the Go -> Python direction end-to-end.
+
+The dag_id and the Go task ids MUST match the identities the Go bundle exposes
+via ``--airflow-metadata`` so the coordinator's bundle scanner can locate the
+binary by dag_id and look up each task by id. The Python task ids run on the
+default Python executor and are independent of the bundle.
+"""
+
+from __future__ import annotations
+
+from airflow.sdk import dag, task
+
+
+@task()
+def python_task_1():
+ print("python_task_1")
+ print("Push Python Task 'python_task_1' XCom:")
+ return "value_from_python_task_1"
+
+
[email protected](queue="golang")
+def extract(): ...
+
+
[email protected](queue="golang")
+def transform(): ...
+
+
[email protected](queue="golang")
+def load(): ...
+
+
+@task()
+def python_task_2(extracted):
+ print("python_task_2")
+ print("Pull Go Task 'extract' XCom:")
+ print(extracted)
+ return extracted
+
+
+@dag(dag_id="simple_dag")
+def simple_dag():
+ extracted = extract()
+ transformed = transform()
+ python_task_1() >> extracted >> transformed
+ # ``load`` fails on purpose; keep it a leaf (not upstream of python_task_2)
+ # so the failure is observable without skipping the Python task that pulls
+ # the Go XCom.
+ transformed >> [load(), python_task_2(extracted)]
+
+
+simple_dag()
diff --git a/go-sdk/example/bundle/main.go b/go-sdk/example/bundle/main.go
index 350c101b8b0..f57ec0b67f3 100644
--- a/go-sdk/example/bundle/main.go
+++ b/go-sdk/example/bundle/main.go
@@ -90,6 +90,7 @@ func extract(ctx context.Context, client sdk.Client, log
*slog.Logger) (any, err
ret := map[string]any{
"go_version": runtime.Version(),
+ "timestamp": time.Now().UnixNano(),
}
return ret, nil