This is an automated email from the ASF dual-hosted git repository.

jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1aa5fa3fab8 Add Go-SDK e2e test (#67956)
1aa5fa3fab8 is described below

commit 1aa5fa3fab8646f240bf567d9ebc7882fb80248e
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Mon Jun 8 13:46:38 2026 +0800

    Add Go-SDK e2e test (#67956)
    
    * Add Go-SDK e2e test
    
    * Fix e2e test registration
---
 .github/workflows/additional-prod-image-tests.yml  |  15 ++
 .github/workflows/airflow-e2e-tests.yml            |   4 +-
 .github/workflows/ci-amd.yml                       |   2 +
 .github/workflows/ci-arm.yml                       |   2 +
 airflow-e2e-tests/docker/go.yml                    |  37 ++++
 .../tests/airflow_e2e_tests/conftest.py            | 109 ++++++++++++
 .../tests/airflow_e2e_tests/constants.py           |  17 ++
 .../airflow_e2e_tests/go_sdk_tests/__init__.py     |  16 ++
 .../go_sdk_tests/test_go_sdk_dag.py                | 188 +++++++++++++++++++++
 .../images/output_testing_airflow-e2e-tests.svg    |   2 +-
 .../images/output_testing_airflow-e2e-tests.txt    |   2 +-
 .../airflow_breeze/commands/testing_commands.py    |   1 +
 .../src/airflow_breeze/utils/selective_checks.py   |  14 ++
 dev/breeze/tests/test_selective_checks.py          |  26 +++
 go-sdk/dags/go_examples.py                         |  87 ++++++++++
 go-sdk/example/bundle/main.go                      |   1 +
 16 files changed, 519 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/additional-prod-image-tests.yml 
b/.github/workflows/additional-prod-image-tests.yml
index 62fc9436070..5d31ab33edd 100644
--- a/.github/workflows/additional-prod-image-tests.yml
+++ b/.github/workflows/additional-prod-image-tests.yml
@@ -56,6 +56,10 @@ on:  # yamllint disable-line rule:truthy
         description: "Whether to run Java SDK e2e tests (true/false)"
         required: true
         type: string
+      run-go-sdk-e2e-tests:
+        description: "Whether to run Go SDK e2e tests (true/false)"
+        required: true
+        type: string
       constraints-branch:
         description: "Branch used to construct constraints URL from."
         required: true
@@ -305,6 +309,17 @@ jobs:
       use-uv: ${{ inputs.use-uv }}
       e2e_test_mode: "java_sdk"
     if: inputs.canary-run == 'true' || inputs.run-java-sdk-e2e-tests == 'true'
+  test-e2e-integration-tests-go-sdk:
+    name: "Go SDK e2e tests with PROD image"
+    uses: ./.github/workflows/airflow-e2e-tests.yml
+    with:
+      workflow-name: "Go SDK e2e test"
+      runners: ${{ inputs.runners }}
+      platform: ${{ inputs.platform }}
+      default-python-version: "${{ inputs.default-python-version }}"
+      use-uv: ${{ inputs.use-uv }}
+      e2e_test_mode: "go_sdk"
+    if: inputs.canary-run == 'true' || inputs.run-go-sdk-e2e-tests == 'true'
 
   test-ui-e2e-chromium:
     name: "Chromium UI e2e tests with PROD image"
diff --git a/.github/workflows/airflow-e2e-tests.yml 
b/.github/workflows/airflow-e2e-tests.yml
index 425defc4f97..1b19133c3e4 100644
--- a/.github/workflows/airflow-e2e-tests.yml
+++ b/.github/workflows/airflow-e2e-tests.yml
@@ -49,7 +49,7 @@ on:  # yamllint disable-line rule:truthy
         type: string
         required: true
       e2e_test_mode:
-        description: "Test mode - basic, remote_log, remote_log_elasticsearch, remote_log_opensearch, xcom_object_storage, event_driven, or java_sdk"  # yamllint disable-line rule:line-length
+        description: "Test mode - basic, remote_log, remote_log_elasticsearch, remote_log_opensearch, xcom_object_storage, event_driven, java_sdk, or go_sdk"  # yamllint disable-line rule:line-length
         type: string
         default: "basic"
 
@@ -80,7 +80,7 @@ on:  # yamllint disable-line rule:truthy
         type: string
         default: ""
       e2e_test_mode:
-        description: "Test mode - basic, remote_log, remote_log_elasticsearch, remote_log_opensearch, xcom_object_storage, event_driven, or java_sdk"  # yamllint disable-line rule:line-length
+        description: "Test mode - basic, remote_log, remote_log_elasticsearch, remote_log_opensearch, xcom_object_storage, event_driven, java_sdk, or go_sdk"  # yamllint disable-line rule:line-length
         type: string
         default: "basic"
 
diff --git a/.github/workflows/ci-amd.yml b/.github/workflows/ci-amd.yml
index a4f82608655..f411d43d2b3 100644
--- a/.github/workflows/ci-amd.yml
+++ b/.github/workflows/ci-amd.yml
@@ -141,6 +141,7 @@ jobs:
       run-remote-logging-s3-e2e-tests: ${{ 
steps.selective-checks.outputs.run-remote-logging-s3-e2e-tests }}
       run-event-driven-e2e-tests: ${{ 
steps.selective-checks.outputs.run-event-driven-e2e-tests }}
       run-java-sdk-e2e-tests: ${{ 
steps.selective-checks.outputs.run-java-sdk-e2e-tests }}
+      run-go-sdk-e2e-tests: ${{ 
steps.selective-checks.outputs.run-go-sdk-e2e-tests }}
       run-system-tests: ${{ steps.selective-checks.outputs.run-system-tests }}
       run-task-sdk-tests: ${{ 
steps.selective-checks.outputs.run-task-sdk-tests }}
       run-task-sdk-integration-tests: ${{ 
steps.selective-checks.outputs.run-task-sdk-integration-tests }}
@@ -880,6 +881,7 @@ jobs:
       run-remote-logging-s3-e2e-tests: ${{ 
needs.build-info.outputs.run-remote-logging-s3-e2e-tests }}
       run-event-driven-e2e-tests: ${{ 
needs.build-info.outputs.run-event-driven-e2e-tests }}
       run-java-sdk-e2e-tests: ${{ 
needs.build-info.outputs.run-java-sdk-e2e-tests }}
+      run-go-sdk-e2e-tests: ${{ needs.build-info.outputs.run-go-sdk-e2e-tests 
}}
       use-uv: ${{ needs.build-info.outputs.use-uv }}
       run-ui-e2e-tests: ${{ needs.build-info.outputs.run-ui-e2e-tests }}
       run-airflow-ctl-integration-tests: ${{ 
needs.build-info.outputs.run-airflow-ctl-integration-tests }}
diff --git a/.github/workflows/ci-arm.yml b/.github/workflows/ci-arm.yml
index 7816892ce7e..732d1e261ca 100644
--- a/.github/workflows/ci-arm.yml
+++ b/.github/workflows/ci-arm.yml
@@ -131,6 +131,7 @@ jobs:
       run-remote-logging-s3-e2e-tests: ${{ 
steps.selective-checks.outputs.run-remote-logging-s3-e2e-tests }}
       run-event-driven-e2e-tests: ${{ 
steps.selective-checks.outputs.run-event-driven-e2e-tests }}
       run-java-sdk-e2e-tests: ${{ 
steps.selective-checks.outputs.run-java-sdk-e2e-tests }}
+      run-go-sdk-e2e-tests: ${{ 
steps.selective-checks.outputs.run-go-sdk-e2e-tests }}
       run-system-tests: ${{ steps.selective-checks.outputs.run-system-tests }}
       run-task-sdk-tests: ${{ 
steps.selective-checks.outputs.run-task-sdk-tests }}
       run-task-sdk-integration-tests: ${{ 
steps.selective-checks.outputs.run-task-sdk-integration-tests }}
@@ -870,6 +871,7 @@ jobs:
       run-remote-logging-s3-e2e-tests: ${{ 
needs.build-info.outputs.run-remote-logging-s3-e2e-tests }}
       run-event-driven-e2e-tests: ${{ 
needs.build-info.outputs.run-event-driven-e2e-tests }}
       run-java-sdk-e2e-tests: ${{ 
needs.build-info.outputs.run-java-sdk-e2e-tests }}
+      run-go-sdk-e2e-tests: ${{ needs.build-info.outputs.run-go-sdk-e2e-tests 
}}
       use-uv: ${{ needs.build-info.outputs.use-uv }}
       run-ui-e2e-tests: ${{ needs.build-info.outputs.run-ui-e2e-tests }}
       run-airflow-ctl-integration-tests: ${{ 
needs.build-info.outputs.run-airflow-ctl-integration-tests }}
diff --git a/airflow-e2e-tests/docker/go.yml b/airflow-e2e-tests/docker/go.yml
new file mode 100644
index 00000000000..c1426963765
--- /dev/null
+++ b/airflow-e2e-tests/docker/go.yml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Docker Compose override for go_sdk E2E test mode.
+#
+# The Go bundle compiled by airflow-go-pack 
(conftest._setup_go_sdk_integration)
+# is a self-contained, statically linked executable, so the stock Airflow 
worker
+# image can exec it directly -- no extra runtime needs to be installed. We only
+# bind-mount the bundle into the directory the ExecutableCoordinator scans and
+# point the worker at the "golang" queue where @task.stub tasks are routed.
+---
+services:
+  airflow-worker:
+    # The bundle is built with CGO_ENABLED=0, so the SDK's user.Current() call 
at
+    # init falls back to Go's pure-Go resolver, which reads $USER / $HOME and
+    # panics if either is empty. The supervisor launches the bundle with the
+    # worker's environment, so set both here.
+    environment:
+      USER: airflow
+      HOME: /home/airflow
+    volumes:
+      - ./go-bundles:/opt/airflow/go-bundles:ro
+    command: celery worker -q golang,default
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py 
b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
index aa4e51d7f54..afef387b5a2 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
@@ -37,6 +37,12 @@ from airflow_e2e_tests.constants import (
     E2E_DAGS_FOLDER,
     E2E_TEST_MODE,
     ELASTICSEARCH_PATH,
+    GO_BUILDER_IMAGE,
+    GO_COMPOSE_PATH,
+    GO_SDK_BIN_PATH,
+    GO_SDK_BUNDLE_NAME,
+    GO_SDK_DAGS_PATH,
+    GO_SDK_EXAMPLE_BUNDLE_PKG,
     JAVA_COMPOSE_PATH,
     JAVA_DOCKERFILE_PATH,
     JAVA_SDK_DAGS_PATH,
@@ -333,6 +339,106 @@ def _setup_java_sdk_integration(dot_env_file, tmp_dir):
     os.environ["ENV_FILE_PATH"] = str(dot_env_file)
 
 
+def _setup_go_sdk_integration(dot_env_file, tmp_dir):
+    """Set up the go_sdk E2E test mode.
+
+    Compiles the Go SDK example bundle into a self-contained executable bundle
+    via the ``airflow-go-pack`` tooling, drops it into the directory the
+    ``ExecutableCoordinator`` scans, copies the Python stub Dag, and writes the
+    coordinator configuration.
+
+    The packed bundle is a statically linked native executable (built with
+    ``CGO_ENABLED=0``), so the stock Airflow worker image can exec it directly
+    without a Go toolchain or any extra runtime installed -- see ``go.yml``.
+    """
+    # Build + pack the example bundle inside an ephemeral Go container so the
+    # host does not need Go installed.
+    #
+    # --user keeps build outputs owned by the current user (not root).
+    # HOME points at a writable, gitignored dir under go-sdk/bin so the Go 
build
+    # and module caches persist between runs (first run downloads modules once;
+    # subsequent runs skip straight to compilation).
+    # CGO_ENABLED=0 yields a fully static binary that runs on the stock worker.
+    # USER/HOME must be set because the SDK calls user.Current() at init; with
+    # cgo disabled Go's pure-Go resolver reads those env vars instead of libc,
+    # and panics if either is empty (the same vars are set on the worker in
+    # go.yml so the packed binary runs the same way at execution time).
+    # `go tool airflow-go-pack` builds the bundle package, reads its
+    # --airflow-metadata, and appends the source + airflow-metadata.yaml + the
+    # AFBNDL01 trailer, writing a single self-contained executable bundle.
+    go_cache_home = "/repo/go-sdk/bin/.home"
+    bundle_out = f"/repo/go-sdk/bin/{GO_SDK_BUNDLE_NAME}"
+    console.print(f"[yellow]Building Go SDK example bundle ({GO_BUILDER_IMAGE})...")
+    subprocess.run(
+        [
+            "docker",
+            "run",
+            "--rm",
+            "--user",
+            f"{os.getuid()}:{os.getgid()}",
+            "-e",
+            f"HOME={go_cache_home}",
+            "-e",
+            "USER=airflow",
+            "-e",
+            "CGO_ENABLED=0",
+            # Mount the repo so the whole go-sdk module (go.mod, tool 
directive,
+            # example sources) is visible to `go tool`.
+            "-v",
+            f"{AIRFLOW_ROOT_PATH}:/repo",
+            "-w",
+            "/repo/go-sdk",
+            GO_BUILDER_IMAGE,
+            "go",
+            "tool",
+            "airflow-go-pack",
+            "--output",
+            bundle_out,
+            GO_SDK_EXAMPLE_BUNDLE_PKG,
+        ],
+        check=True,
+    )
+
+    # Copy the compose override into the temp directory.
+    copyfile(GO_COMPOSE_PATH, tmp_dir / "go.yml")
+
+    # Place the packed bundle where the compose bind-mount (./go-bundles) 
exposes
+    # it to the worker at /opt/airflow/go-bundles. The bundle scanner requires
+    # the file to be executable, so preserve the exec bit.
+    go_bundles_dir = tmp_dir / "go-bundles"
+    go_bundles_dir.mkdir()
+    packed_bundle = go_bundles_dir / GO_SDK_BUNDLE_NAME
+    copyfile(GO_SDK_BIN_PATH / GO_SDK_BUNDLE_NAME, packed_bundle)
+    os.chmod(packed_bundle, 0o755)
+
+    # Copy the Go SDK example stub Dag so Airflow can discover and serialize 
it.
+    copyfile(GO_SDK_DAGS_PATH / "go_examples.py", tmp_dir / "dags" / "go_examples.py")
+
+    # Coordinator registry: maps the logical name "go-sdk" to 
ExecutableCoordinator,
+    # which scans executables_root for the packed bundle by dag_id.
+    # Queue mapping: routes tasks on the "golang" queue to "go-sdk".
+    coordinator_config = json.dumps(
+        {
+            "go-sdk": {
+                "classpath": "airflow.sdk.coordinators.executable.ExecutableCoordinator",
+                "kwargs": {"executables_root": ["/opt/airflow/go-bundles"]},
+            }
+        }
+    )
+    queue_to_coordinator = json.dumps({"golang": "go-sdk"})
+
+    dot_env_file.write_text(
+        f"AIRFLOW_UID={os.getuid()}\n"
+        # Single-quote the JSON values so Docker Compose reads them literally.
+        f"AIRFLOW__SDK__COORDINATORS='{coordinator_config}'\n"
+        f"AIRFLOW__SDK__QUEUE_TO_COORDINATOR='{queue_to_coordinator}'\n"
+        # Connection and variable read by the Go example bundle tasks.
+        "AIRFLOW_CONN_TEST_HTTP=http://test:[email protected]/\n"
+        "AIRFLOW_VAR_MY_VARIABLE=test_value\n"
+    )
+    os.environ["ENV_FILE_PATH"] = str(dot_env_file)
+
+
 def spin_up_airflow_environment(tmp_path_factory: pytest.TempPathFactory):
     tmp_dir = tmp_path_factory.mktemp("breeze-airflow-e2e-tests")
 
@@ -377,6 +483,9 @@ def spin_up_airflow_environment(tmp_path_factory: 
pytest.TempPathFactory):
     elif E2E_TEST_MODE == "java_sdk":
         compose_file_names.append("java.yml")
         _setup_java_sdk_integration(dot_env_file, tmp_dir)
+    elif E2E_TEST_MODE == "go_sdk":
+        compose_file_names.append("go.yml")
+        _setup_go_sdk_integration(dot_env_file, tmp_dir)
 
     #
     # Please Do not use this Fernet key in any deployments! Please generate 
your own key.
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py 
b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
index a81eeb0d6a0..259f8256fd8 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
@@ -57,6 +57,23 @@ JAVA_SDK_EXAMPLE_LIBS_PATH = JAVA_SDK_ROOT_PATH / "example" 
/ "build" / "install
 JAVA_COMPOSE_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / 
"java.yml"
 JAVA_DOCKERFILE_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / 
"Dockerfile.java"
 
+# Go SDK E2E test paths
+GO_SDK_ROOT_PATH = AIRFLOW_ROOT_PATH / "go-sdk"
+GO_SDK_DAGS_PATH = GO_SDK_ROOT_PATH / "dags"
+# Package directory holding func main() for the example bundle; airflow-go-pack
+# builds and packs this into a self-contained executable bundle.
+GO_SDK_EXAMPLE_BUNDLE_PKG = "./example/bundle"
+# Name of the packed bundle binary (matches the example bundle's package dir 
name).
+GO_SDK_BUNDLE_NAME = "example_dags"
+# Where airflow-go-pack writes the packed bundle inside the repo (go-sdk/bin 
is gitignored).
+GO_SDK_BIN_PATH = GO_SDK_ROOT_PATH / "bin"
+GO_COMPOSE_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / "go.yml"
+# Go toolchain image used to build the bundle; must satisfy go-sdk/go.mod's 
toolchain.
+# The Alpine variant is ~7x smaller than the Debian one and is safe here 
because the
+# bundle is built with CGO_ENABLED=0 (a fully static binary, independent of 
musl/glibc)
+# and module fetches go through the HTTPS proxy (no git/gcc needed).
+GO_BUILDER_IMAGE = os.environ.get("GO_BUILDER_IMAGE", "golang:1.24-alpine")
+
 # Local provider sources are mounted into the airflow containers under this 
directory so
 # ``_PIP_ADDITIONAL_REQUIREMENTS`` can install the in-tree (latest, possibly 
unreleased)
 # provider rather than the published one from PyPI.
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/go_sdk_tests/__init__.py 
b/airflow-e2e-tests/tests/airflow_e2e_tests/go_sdk_tests/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/go_sdk_tests/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/airflow-e2e-tests/tests/airflow_e2e_tests/go_sdk_tests/test_go_sdk_dag.py 
b/airflow-e2e-tests/tests/airflow_e2e_tests/go_sdk_tests/test_go_sdk_dag.py
new file mode 100644
index 00000000000..6cd7ff0779d
--- /dev/null
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/go_sdk_tests/test_go_sdk_dag.py
@@ -0,0 +1,188 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""E2E tests for the Go SDK via the compiled example bundle.
+
+Run with::
+
+    E2E_TEST_MODE=go_sdk uv run --project airflow-e2e-tests pytest \\
+        tests/airflow_e2e_tests/go_sdk_tests/ -xvs
+
+What is verified
+----------------
+``conftest._setup_go_sdk_integration`` compiles ``go-sdk/example/bundle`` into 
a
+self-contained executable bundle with the ``airflow-go-pack`` tooling and drops
+it into the directory the ``ExecutableCoordinator`` scans. The ``simple_dag``
+Dag (``go-sdk/dags/go_examples.py``) sandwiches the Go tasks between two native
+Python tasks::
+
+    python_task_1 >> extract >> transform >> [load, python_task_2]
+
+* ``extract`` / ``transform`` / ``load`` are ``@task.stub(queue="golang")`` 
tasks
+  whose implementations live in the Go bundle (``main.go``). The ``golang`` 
queue
+  is routed to ``ExecutableCoordinator``, which locates the bundle by dag_id,
+  launches the binary with ``--comm`` / ``--logs``, and drives it through the
+  msgpack-over-IPC coordinator protocol.
+* ``python_task_1`` (Python) pushes an XCom; ``extract`` (Go) fetches the
+  ``test_http`` connection and returns ``{go_version, timestamp}``; 
``transform``
+  (Go) reads ``my_variable``; ``load`` (Go) returns an error on purpose;
+  ``python_task_2`` (Python) pulls and re-emits the Go ``extract`` task's XCom.
+
+The Dag is triggered exactly once by the module-scoped ``completed_run`` 
fixture;
+each test asserts a different facet of that single run. Together they confirm,
+end-to-end:
+
+1. ``ExecutableCoordinator`` discovers the AFBNDL01 bundle by dag_id and runs 
the
+   binary in coordinator mode for every Go task, reporting ``SucceedTask`` for
+   extract/transform and a failed ``TaskState`` for load.
+2. Connection / Variable reads and XCom writes work through the Task Execution
+   API, XCom values keep their types (the ``timestamp`` stays an ``int``), and
+   XCom crosses the Python <-> Go boundary in both directions.
+3. Structured task logs emitted by the Go binary over the coordinator logs
+   channel reach Airflow's task-log store.
+"""
+
+from __future__ import annotations
+
+import time
+from dataclasses import dataclass
+from datetime import datetime, timezone
+
+import pytest
+
+from airflow_e2e_tests.e2e_test_utils.clients import AirflowClient
+
+# The Go extract task sleeps ~20 s + coordinator startup; allow plenty of room.
+_GO_TASK_TIMEOUT = 600
+# Task logs are written when the task finishes; allow a little slack for them 
to
+# become retrievable through the API after the run reaches a terminal state.
+_LOG_FETCH_TIMEOUT = 120
+
+_DAG_ID = "simple_dag"
+
+
+@dataclass
+class _CompletedRun:
+    """The single ``simple_dag`` run shared across the assertions in this 
module."""
+
+    client: AirflowClient
+    run_id: str
+    state: str
+    ti_states: dict[str, str]
+
+    def xcom(self, task_id: str, key: str = "return_value"):
+        return self.client.get_xcom_value(dag_id=_DAG_ID, task_id=task_id, 
run_id=self.run_id, key=key).get(
+            "value"
+        )
+
+    def logs(self, task_id: str, try_number: int = 1) -> str:
+        """Return the concatenated task-log records for *task_id*, retrying 
until present."""
+        deadline = time.monotonic() + _LOG_FETCH_TIMEOUT
+        while True:
+            resp = self.client.get_task_logs(
+                dag_id=_DAG_ID, run_id=self.run_id, task_id=task_id, 
try_number=try_number
+            )
+            text = "\n".join(str(entry) for entry in resp.get("content", []))
+            if text.strip() or time.monotonic() > deadline:
+                return text
+            time.sleep(3)
+
+
+@pytest.fixture(scope="module")
+def completed_run() -> _CompletedRun:
+    """Trigger ``simple_dag`` once and wait for it to finish.
+
+    Module-scoped so the (multi-minute) Go run happens a single time; every 
test
+    in this module inspects the resulting states, XComs, and logs.
+    """
+    client = AirflowClient()
+    resp = client.trigger_dag(_DAG_ID, json={"logical_date": 
datetime.now(timezone.utc).isoformat()})
+    run_id = resp["dag_run_id"]
+    state = client.wait_for_dag_run(dag_id=_DAG_ID, run_id=run_id, 
timeout=_GO_TASK_TIMEOUT)
+    ti_resp = client.get_task_instances(dag_id=_DAG_ID, run_id=run_id)
+    ti_states = {ti["task_id"]: ti.get("state") for ti in 
ti_resp.get("task_instances", [])}
+    return _CompletedRun(client=client, run_id=run_id, state=state, 
ti_states=ti_states)
+
+
+def test_task_states(completed_run: _CompletedRun):
+    """Every task ends in its expected state (the Go ``load`` task fails on 
purpose)."""
+    expected = {
+        "python_task_1": "success",
+        "extract": "success",
+        "transform": "success",
+        "load": "failed",
+        "python_task_2": "success",
+    }
+    for task_id, want in expected.items():
+        assert completed_run.ti_states.get(task_id) == want, (
+            f"{task_id!r} expected {want!r}. all task states: {completed_run.ti_states}"
+        )
+
+
+def test_dag_run_failed(completed_run: _CompletedRun):
+    """The failing ``load`` leaf makes the overall run fail."""
+    assert completed_run.state == "failed", (
+        f"expected the run to fail because 'load' fails; got {completed_run.state!r}. "
+        f"task states: {completed_run.ti_states}"
+    )
+
+
+def test_python_task_1_pushes_xcom(completed_run: _CompletedRun):
+    """The upstream Python task's XCom is available (Python -> XCom)."""
+    assert completed_run.xcom("python_task_1") == "value_from_python_task_1"
+
+
+def test_extract_xcom_has_go_version_and_int_timestamp(completed_run: 
_CompletedRun):
+    """The map returned by the Go 'extract' task round-trips through XCom (Go 
-> XCom)."""
+    value = completed_run.xcom("extract")
+    assert isinstance(value, dict), (
+        f"Expected 'extract' XCom to be a mapping, got {value!r} ({type(value).__name__})"
+    )
+    assert str(value.get("go_version", "")).startswith("go"), (
+        f"Expected go_version to look like a Go runtime version, got {value.get('go_version')!r}"
+    )
+    timestamp = value.get("timestamp")
+    assert isinstance(timestamp, int), (
+        f"Expected 'timestamp' to be an int, got {timestamp!r} ({type(timestamp).__name__})"
+    )
+    assert timestamp > 0, f"Expected a positive nanosecond timestamp, got {timestamp!r}"
+
+
+def test_xcom_crosses_go_to_python(completed_run: _CompletedRun):
+    """python_task_2 pulled the Go 'extract' XCom and re-emitted it unchanged 
(Go -> Python)."""
+    assert completed_run.xcom("python_task_2") == completed_run.xcom("extract")
+
+
+def test_extract_logs_show_beep_loop(completed_run: _CompletedRun):
+    """The Go 'extract' task's structured logs reach Airflow's task log."""
+    logs = completed_run.logs("extract")
+    beeps = logs.count("After the beep the time will be")
+    assert beeps == 10, f"expected 10 'After the beep the time will be' log lines from extract, got {beeps}"
+    assert "Goodbye from task" in logs, "extract task should log 'Goodbye from task'"
+
+
+def test_transform_logs_show_variable_read(completed_run: _CompletedRun):
+    """The Go 'transform' task logs the variable it read."""
+    assert "Obtained variable" in completed_run.logs("transform"), (
+        "transform task should log 'Obtained variable'"
+    )
+
+
+def test_load_logs_show_failure(completed_run: _CompletedRun):
+    """The Go 'load' task's error surfaces in its task log."""
+    assert "Please fail" in completed_run.logs("load"), (
+        "load task log should contain its failure message 'Please fail'"
+    )
diff --git a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg 
b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
index 290e96c8a32..0ba82f2e913 100644
--- a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
+++ b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
@@ -135,7 +135,7 @@
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="312.8" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="312.8" 
textLength="73.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">(TEXT)</text><text 
class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="312.8" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">│</text><text 
class="breez [...]
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="337.2" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-13)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r4" x="24.4" y="337.2" textLength="366" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-13)">--e2e-test-mode&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;</text><text
 class="breeze-testing-airflow-e2e-tests-r1" x="463.6" y="337.2" textLen [...]
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="361.6" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="361.6" 
textLength="976" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">(basic|remote_log|remote_log_elasticsearch|remote_log_opensearch|xcom_object_sto</text><text
 class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="361.6" 
textLength="12.2" clip-path="u [...]
-</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="386" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="386" 
textLength="329.4" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">rage|event_driven|java_sdk)</text><text
 class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="386" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</text><t [...]
+</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="386" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="386" 
textLength="414.8" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">rage|event_driven|java_sdk|go_sdk)</text><text
 class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="386" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">│</ [...]
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="410.4" 
textLength="1464" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-16)">╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯</text><text
 class="breeze-testing-airflow-e2e-tests-r1" x="1464" y="410.4" 
textLength="12.2" clip-path="url(#breeze-testing-airflow-e2e-tests-line-16)">
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="434.8" 
textLength="24.4" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">╭─</text><text 
class="breeze-testing-airflow-e2e-tests-r5" x="24.4" y="434.8" 
textLength="195.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">&#160;Common&#160;options&#160;</text><text
 class="breeze-testing-airflow-e2e-tests-r5" x="219.6" y="434.8" 
textLength="1220" clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">─ 
[...]
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="459.2" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-18)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r4" x="24.4" y="459.2" 
textLength="109.8" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-18)">--verbose</text><text
 class="breeze-testing-airflow-e2e-tests-r6" x="158.6" y="459.2" 
textLength="24.4" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-18)">-v</text><text 
class="br [...]
diff --git a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt 
b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
index e855c1591bd..58ae5ef1832 100644
--- a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
+++ b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
@@ -1 +1 @@
-bf6ebaaf5870518bae6d39a46326a769
+518a65f4380756c2c7b0f9c79aa3e3bc
diff --git a/dev/breeze/src/airflow_breeze/commands/testing_commands.py 
b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
index eb06d8fd967..cc48ee0fb99 100644
--- a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
@@ -1421,6 +1421,7 @@ option_e2e_test_mode = click.option(
             "xcom_object_storage",
             "event_driven",
             "java_sdk",
+            "go_sdk",
         ],
         case_sensitive=False,
     ),
diff --git a/dev/breeze/src/airflow_breeze/utils/selective_checks.py b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
index 481454f6387..022096624d5 100644
--- a/dev/breeze/src/airflow_breeze/utils/selective_checks.py
+++ b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
@@ -127,6 +127,7 @@ class FileGroupForCi(Enum):
     REMOTE_LOGGING_E2E_OPENSEARCH_FILES = auto()
     EVENT_DRIVEN_E2E_FILES = auto()
     JAVA_SDK_E2E_FILES = auto()
+    GO_SDK_E2E_FILES = auto()
     ALL_PYPROJECT_TOML_FILES = auto()
     ALL_PYTHON_FILES = auto()
     ALL_SOURCE_FILES = auto()
@@ -224,8 +225,16 @@ CI_FILE_GROUP_MATCHES: HashableDict[FileGroupForCi] = HashableDict(
             r"^airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/.*",
             r"^airflow-e2e-tests/docker/java\.yml$",
             r"^airflow-e2e-tests/docker/Dockerfile\.java$",
+            r"^task-sdk/src/airflow/sdk/coordinators/_subprocess\.py$",
             r"^task-sdk/src/airflow/sdk/coordinators/java/.*",
         ],
+        FileGroupForCi.GO_SDK_E2E_FILES: [
+            r"^go-sdk/.*",
+            r"^airflow-e2e-tests/tests/airflow_e2e_tests/go_sdk_tests/.*",
+            r"^airflow-e2e-tests/docker/go\.yml$",
+            r"^task-sdk/src/airflow/sdk/coordinators/_subprocess\.py$",
+            r"^task-sdk/src/airflow/sdk/coordinators/executable/.*",
+        ],
         FileGroupForCi.PYTHON_PRODUCTION_FILES: [
             # Production Python source the runtime ships — excludes tests, docs,
             # dev tooling, and generated files within those trees. Used by
@@ -988,6 +997,10 @@ class SelectiveChecks:
     def run_java_sdk_e2e_tests(self) -> bool:
         return self._should_be_run(FileGroupForCi.JAVA_SDK_E2E_FILES)
 
+    @cached_property
+    def run_go_sdk_e2e_tests(self) -> bool:
+        return self._should_be_run(FileGroupForCi.GO_SDK_E2E_FILES)
+
     @cached_property
     def run_amazon_tests(self) -> bool:
         if self.providers_test_types_list_as_strings_in_json == "[]":
@@ -1105,6 +1118,7 @@ class SelectiveChecks:
             or self.run_remote_logging_opensearch_e2e_tests
             or self.run_event_driven_e2e_tests
             or self.run_java_sdk_e2e_tests
+            or self.run_go_sdk_e2e_tests
             or self.run_ui_e2e_tests
         )
 
diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py
index 1e52b80dea4..7615a3db3b6 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -1443,6 +1443,32 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
             },
             id="Run java e2e tests when JavaCoordinator changes",
         ),
+        pytest.param(
+            ("go-sdk/sdk/variable.go",),
+            {
+                "run-go-sdk-tests": "true",
+                "run-go-sdk-e2e-tests": "true",
+                "prod-image-build": "true",
+            },
+            id="Run go unit and e2e tests for go-sdk source change",
+        ),
+        pytest.param(
+            ("airflow-e2e-tests/docker/go.yml",),
+            {
+                "run-go-sdk-tests": "false",
+                "run-go-sdk-e2e-tests": "true",
+                "prod-image-build": "true",
+            },
+            id="Run go e2e tests when go compose override changes",
+        ),
+        pytest.param(
+            ("task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py",),
+            {
+                "run-go-sdk-e2e-tests": "true",
+                "prod-image-build": "true",
+            },
+            id="Run go e2e tests when ExecutableCoordinator changes",
+        ),
         (
             pytest.param(
                 ("devel-common/pyproject.toml",),
diff --git a/go-sdk/dags/go_examples.py b/go-sdk/dags/go_examples.py
new file mode 100644
index 00000000000..523b6b8bf23
--- /dev/null
+++ b/go-sdk/dags/go_examples.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Python stub Dag mirroring the Go SDK example bundle (``go-sdk/example/bundle``).
+
+The graph sandwiches the Go tasks between two native Python tasks so the run
+exercises XCom across the language boundary, the same way
+``java-sdk/dags/java_examples.py`` does for the Java SDK::
+
+    python_task_1 >> extract >> transform >> [load, python_task_2]
+
+* ``python_task_1`` (Python) pushes an XCom.
+* ``extract`` / ``transform`` / ``load`` are ``@task.stub(queue="golang")`` tasks
+  whose implementations live in the compiled Go bundle. The ``golang`` queue is
+  routed to the ``ExecutableCoordinator``, which locates the bundle by dag_id and
+  runs the binary in coordinator mode. ``extract`` returns a map (pushed as its
+  ``return_value`` XCom); ``transform`` reads the ``my_variable`` variable.
+* ``load`` returns an error on purpose. It is a leaf (not upstream of
+  ``python_task_2``) so its failure is observable while leaving the Go -> Python
+  XCom hop intact.
+* ``python_task_2`` (Python) pulls the Go ``extract`` task's XCom and re-emits
+  it, demonstrating the Go -> Python direction end-to-end.
+
+The dag_id and the Go task ids MUST match the identities the Go bundle exposes
+via ``--airflow-metadata`` so the coordinator's bundle scanner can locate the
+binary by dag_id and look up each task by id. The Python task ids run on the
+default Python executor and are independent of the bundle.
+"""
+
+from __future__ import annotations
+
+from airflow.sdk import dag, task
+
+
+@task()
+def python_task_1():
+    print("python_task_1")
+    print("Push Python Task 'python_task_1' XCom:")
+    return "value_from_python_task_1"
+
+
+@task.stub(queue="golang")
+def extract(): ...
+
+
+@task.stub(queue="golang")
+def transform(): ...
+
+
+@task.stub(queue="golang")
+def load(): ...
+
+
+@task()
+def python_task_2(extracted):
+    print("python_task_2")
+    print("Pull Go Task 'extract' XCom:")
+    print(extracted)
+    return extracted
+
+
+@dag(dag_id="simple_dag")
+def simple_dag():
+    extracted = extract()
+    transformed = transform()
+    python_task_1() >> extracted >> transformed
+    # ``load`` fails on purpose; keep it a leaf (not upstream of python_task_2)
+    # so the failure is observable without skipping the Python task that pulls
+    # the Go XCom.
+    transformed >> [load(), python_task_2(extracted)]
+
+
+simple_dag()
diff --git a/go-sdk/example/bundle/main.go b/go-sdk/example/bundle/main.go
index 350c101b8b0..f57ec0b67f3 100644
--- a/go-sdk/example/bundle/main.go
+++ b/go-sdk/example/bundle/main.go
@@ -90,6 +90,7 @@ func extract(ctx context.Context, client sdk.Client, log *slog.Logger) (any, err
 
        ret := map[string]any{
                "go_version": runtime.Version(),
+               "timestamp":  time.Now().UnixNano(),
        }
 
        return ret, nil


Reply via email to