jason810496 commented on code in PR #67153: URL: https://github.com/apache/airflow/pull/67153#discussion_r3296402393
##########
go-sdk/adr/0003-coordinator-protocol-msgpack-ipc.md:
##########
@@ -0,0 +1,381 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+# 3. Dual-mode bundle binary: msgpack-over-IPC coordinator protocol alongside the existing go-plugin/Edge-Worker path
+
+Date: 2026-04-30
+
+## Status
+
+Accepted.
+
+## Context
+
+A Go SDK bundle binary today (the artefact built from
+[`go-sdk/example/bundle/main.go`](../example/bundle/main.go) via
+`bundlev1server.Serve`) speaks exactly one protocol: HashiCorp
+[`go-plugin`](https://github.com/hashicorp/go-plugin) gRPC over a
+stdio-negotiated socket, gated by the magic-cookie handshake declared in
+[`pkg/bundles/shared/handshake.go`](../pkg/bundles/shared/handshake.go).
+The Airflow Go *Edge Worker*
+([`cmd/airflow-go-edge-worker`](../cmd/airflow-go-edge-worker/main.go),
+[`edge/`](../edge)) is the consumer of that protocol — it execs the
+bundle binary as a child process, completes the go-plugin handshake,
+opens the `DagBundle` gRPC client, and drives `GetMetadata`/`Execute`
+([`bundle/bundlev1/bundlev1server/impl/plugin.go`](../bundle/bundlev1/bundlev1server/impl/plugin.go)).
+The bundle binary never listens on a public socket; the protocol is
+local-process only.
+
+Meanwhile, the Python side of Airflow has standardised on a different
+wire protocol for non-Python language runtimes — the *coordinator
+protocol* — pioneered by the Java SDK and described in
+[java-sdk ADR 0002](../../java-sdk/adr/0002-dag-parsing.md)
+and
+[java-sdk ADR 0003](../../java-sdk/adr/0003-workload-execution.md).
+Its shape is:
+
+- The runtime is launched with `--comm=<host:port>` and
+  `--logs=<host:port>` CLI arguments.
+- It connects out (TCP, loopback) to both addresses.
+- Frames on the comm channel are length-prefixed msgpack: a 4-byte
+  big-endian length followed by the msgpack payload. Requests are
+  `[id, body]`; responses are `[id, body, error]`.
+- Two workloads share one channel, distinguished by the first inbound
+  frame: `DagFileParseRequest` (one-shot, returns
+  `DagFileParsingResult` and exits) or `StartupDetails` (multi-round
+  task execution: the runtime sends `GetConnection` / `GetVariable` /
+  `GetXCom` / `SetXCom` and terminates with `SucceedTask` or
+  `TaskState`).
+- The logs channel carries structured JSON log records emitted by the
+  runtime.
+
+The Python-side launcher is
+[`ExecutableCoordinator`](../../task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py),
+which already builds command lines of the form
+`<binary> --comm=<addr> --logs=<addr>` for both `dag_parsing_runtime_cmd`
+and `task_execution_runtime_cmd`. The bundle-spec contract
+([`task-sdk/docs/bundle-spec.rst`](../../task-sdk/docs/bundle-spec.rst))
+ratifies that any compiled SDK shipping a ZIP bundle "MUST honour the
+SDK coordinator protocol (`--comm=<addr>` / `--logs=<addr>`
+socket-based IPC)". The Java SDK satisfies this contract; the Go SDK
+currently does not.
+
+The two protocols target different deployment shapes:
+
+- **go-plugin / Edge Worker.** The Go-native worker is itself a long-running
+  process that loads bundles in-process and dispatches tasks to them
+  over gRPC. It is the only consumer that speaks go-plugin to a Go
+  bundle today, and it owns the full task-runtime stack on the worker
+  host (no Python in the data path). This is the path
+  [`go-sdk/example/bundle/main.go`](../example/bundle/main.go) was
+  written for and the path that
+  [`pkg/worker`](../pkg/worker) drives.
+- **Coordinator / `ExecutableCoordinator`.** The Python task
+  runner forks a child that runs `<binary> --comm=… --logs=…`,
+  bridges its socket to the Airflow supervisor's fd 0, and proxies
+  Airflow service calls (`GetConnection`, `GetVariable`, ...) through
+  to the Execution API. This is how Airflow runs non-Python tasks
+  *without* a per-language worker — the same way Java runs today, and
+  the same way Rust/C++/Zig will run in the future. It is also the
+  only path the executable provider's bundle spec recognises.
+
+Today these two paths require two different binaries, even though the
+DAG/task definitions, the registry, the worker plumbing, and the
+serialisation surfaces overlap almost entirely. That is the gap this
+ADR closes.
+
+The user-written `main()` is one line —
+`bundlev1server.Serve(&myBundle{})` — and we want to keep it one line.
+Whichever protocol the binary should speak must be decided inside
+`Serve` based on how it was invoked, not by branching in user code.
+
+## Decision
+
+Make the SDK bundle binary **dual-mode**. A single
+`bundlev1server.Serve(bundle, opts...)` call dispatches to one of two
+protocol servers based on its CLI arguments and process environment.
+User code does not change.
+
+### Invocation matrix
+
+`Serve` parses flags first, then chooses a mode in this order:
+
+| Trigger                                                | Mode            | Behaviour |
+|--------------------------------------------------------|-----------------|-----------|
+| `--bundle-metadata`                                    | metadata-dump   | Existing flag (ADR 0001 / `server.go:37`). Prints `BundleInfo` JSON and exits. |
+| `--dump-bundle-spec`                                   | spec-dump       | Existing flag added by [ADR 0002](0002-use-go-tool-directive-for-bundle-packer.md). Prints the full bundle spec JSON (`sdk`, `dags`) used by `airflow-go-pack`. |
+| `--comm=<host:port> --logs=<host:port>`                | **coordinator** | New. Speaks the msgpack-over-IPC coordinator protocol. Both flags are required; partial use is a hard error. |
+| `AIRFLOW_BUNDLE_MAGIC_COOKIE` env var present (default) | go-plugin      | Existing behaviour. Hands off to `plugin.Serve` which performs the handshake and serves `DagBundle` gRPC to the Edge Worker. |
+| Otherwise                                              | error           | Print usage to stderr and exit non-zero. Today this case implicitly errors via go-plugin's failed handshake; we make the diagnostic explicit so authors running the binary directly get a clear message. |
+
+The two server modes share the same `bundlev1.BundleProvider`
+implementation and the same lazy `RegisterDags` recorder cache that
+`impl.server` already maintains (`impl/plugin.go:99-121`). Only the
+front door changes.
+
+### Coordinator mode: protocol details
+
+When `Serve` enters coordinator mode it:
+
+1. **Parses and validates the addresses.** Both `--comm` and `--logs`
+   are `host:port` strings. `127.0.0.1` is the only host the coordinator
+   protocol is designed for, but we do not pin it — the value is whatever
+   `_runtime_subprocess_entrypoint` chose on the Python side.
+
+2. **Connects out** to the comm address, then to the logs address. Both
+   are TCP. We dial; we do not listen. The launcher already has both
+   listeners up before exec'ing the binary
+   ([java-sdk ADR 0002, "What the Base Class Handles Automatically"](../../java-sdk/adr/0002-dag-parsing.md#what-the-base-class-handles-automatically)).
+
+3. **Routes structured logs to the logs socket.** A new
+   `slog.Handler` writes JSON-line records (one record per line, UTF-8,
+   newline-terminated) to the logs connection, replacing the
+   `hclog`/stderr handler used in go-plugin mode. `slog.SetDefault` is
+   called before any user code runs so `log` arguments injected into
+   tasks land on the right channel. On disconnect the handler falls
+   back to stderr so the binary never deadlocks on a closed sink.
+
+4. **Reads the first comm frame and dispatches by message type.** The
+   first frame's body has a `type` field per the Java SDK's encoding
+   ([java-sdk ADR 0003, "Task SDK Protocol Messages"](../../java-sdk/adr/0003-workload-execution.md#task-sdk-protocol-messages)).
+   Two values are valid here:
+
+   - `DagFileParseRequest` → DAG-parsing one-shot.
+   - `StartupDetails` → task execution.
+
+   Any other type is an error frame back to the supervisor and
+   `os.Exit(1)`.
+
+#### DAG-parsing path (`DagFileParseRequest` → `DagFileParsingResult`)
+
+```text
+Supervisor                              Bundle binary (Go)
+│                                       │
+├── [4B len][msgpack: id, ─────────────►│
+│     {type: "DagFileParseRequest",     │
+│      file: "<bundle path>"}]          │
+│                                       │
+│                                       ├── BundleProvider.RegisterDags(reg)
+│                                       │     (cached, same as gRPC path)
+│                                       │
+│                                       ├── serialise(reg) →
+│                                       │     DagFileParsingResult
+│                                       │     in DagSerialization v3 JSON
+│                                       │     (see java-sdk ADR-0002)
+│                                       │
+│◄────────────────[4B len][msgpack: ────┤
+│     id, {type: "DagFileParsingResult",
+│          fileloc: "...",
+│          serialized_dags: [...] }]    │
+│                                       │
+│                                       └── close + exit(0)
+```
+
+The serialised DAG payload must match Python's `SerializedDAG.serialize_dag`
+output **exactly**, including the `__type` / `__var` wrapping rules,
+unwrapping of "non-decorated" fields (`start_date`, `end_date`, `tags`),
+and the timetable encoding listed in
+[java-sdk ADR 0002, "DagFileParsingResult Format"](../../java-sdk/adr/0002-dag-parsing.md#dagfileparsingresult-format).
+The Go SDK gains a `serde` package that performs this encoding from
+`bundlev1.Bundle` / `bundlev1.Task`, validated against
+`validation/serialization/test_dags.yaml` (the same fixture set the Java
+SDK uses), so the Go and Java outputs are byte-identical for shared
+inputs.
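
The `[4B len]` prefix shown in the quoted diagram (a 4-byte big-endian length followed by the payload) can be sketched in Go using only the standard library. This is a minimal illustration under stated assumptions, not code from the PR: `writeFrame`, `readFrame`, `roundTrip`, and the `maxFrameSize` cap are hypothetical names, and the payload is treated as opaque bytes rather than decoded msgpack.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// maxFrameSize is a hypothetical safety cap; the quoted ADR text does not specify one.
const maxFrameSize = 16 << 20

// writeFrame writes a 4-byte big-endian length prefix followed by the payload.
func writeFrame(w io.Writer, payload []byte) error {
	if len(payload) > maxFrameSize {
		return errors.New("frame too large")
	}
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(payload)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame reads one length-prefixed frame and returns its payload bytes.
func readFrame(r io.Reader) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	n := binary.BigEndian.Uint32(hdr[:])
	if n > maxFrameSize {
		return nil, fmt.Errorf("frame of %d bytes exceeds cap", n)
	}
	payload := make([]byte, n)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// roundTrip writes payload as one frame into an in-memory buffer and reads it back.
func roundTrip(payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	if err := writeFrame(&buf, payload); err != nil {
		return nil, err
	}
	return readFrame(&buf)
}

func main() {
	got, err := roundTrip([]byte("opaque msgpack bytes"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", got)
}
```

In a real runtime the reader and writer would be the TCP connection dialled from `--comm`, and the payload would be the msgpack-encoded `[id, body]` array; `io.ReadFull` is used so that a short read on the socket cannot be mistaken for a complete frame.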
+
+#### Task-execution path (`StartupDetails` → multi-round → `SucceedTask` / `TaskState`)
+
+```text
+Supervisor                              Bundle binary (Go)
+│                                       │
+├── StartupDetails ────────────────────►│
+│     (ti, dag_rel_path, bundle_info,   │
+│      start_date, ti_context)          │
+│                                       │
+│                                       ├── lookup task:
+│                                       │     bundle.dags[ti.dag_id]
+│                                       │       .tasks[ti.task_id]
+│                                       │     (returns TaskState{state:"removed"}
+│                                       │      if not found, mirroring Java)
+│                                       │
+│                                       ├── construct sdk.Client whose
+│                                       │     GetConnection / GetVariable /
+│                                       │     GetXCom / SetXCom calls block on
+│                                       │     request/response over the
+│                                       │     comm socket
+│                                       │
+│◄── GetConnection(conn_id) ────────────┤
+├── ConnectionResult ──────────────────►│
+│◄── GetVariable(key) ──────────────────┤
+├── VariableResult ────────────────────►│
+│◄── GetXCom(...) ──────────────────────┤
+├── XComResult ────────────────────────►│
+│◄── SetXCom(...) ──────────────────────┤
+├── (empty response) ──────────────────►│
+│                                       │
+│                                       ├── task fn returns:
+│                                       │     err == nil → SucceedTask
+│                                       │     err != nil → TaskState{"failed"}
+│                                       │     (panic recovered → "failed")

Review Comment:
   Panic recovery has existed since the Edge Worker (`pkg/worker`); it is not coordinator-only. Noted in 0e3b6e0336 as well.
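
The behaviour under discussion, a panic inside the task function being recovered and reported as a failed task, can be sketched as follows. This is a minimal illustration and not the code in `pkg/worker`: `runTask`, `errBoom`, and the `"success"` / `"failed"` labels are hypothetical stand-ins for the `SucceedTask` and `TaskState{"failed"}` outcomes in the quoted diagram.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a hypothetical error used only for the demonstration below.
var errBoom = errors.New("boom")

// runTask is a hypothetical helper. It calls fn and maps the outcome to a
// terminal state label. A panic inside fn is recovered in the deferred
// function and reported as "failed", the same as a returned error.
func runTask(fn func() error) (state string) {
	defer func() {
		if r := recover(); r != nil {
			state = "failed"
		}
	}()
	if err := fn(); err != nil {
		return "failed"
	}
	return "success"
}

func main() {
	fmt.Println(runTask(func() error { return nil }))          // success
	fmt.Println(runTask(func() error { return errBoom }))      // failed
	fmt.Println(runTask(func() error { panic("unexpected") })) // failed
}
```

Because the mapping lives in one wrapper around the task function, it can sit below both protocol front doors, which is consistent with the point that the recovery is not specific to coordinator mode.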
