jason810496 opened a new pull request, #67318:
URL: https://github.com/apache/airflow/pull/67318

   # go-sdk: Implement coordinator-mode runtime entry point and task runner
   
   - **Depends on https://github.com/apache/airflow/pull/TBD being merged first** 
(refactor/go-sdk/coordinator-comms)
   - **Diff for early review**: 
https://github.com/jason810496/airflow/compare/refactor/go-sdk/coordinator-comms...jason810496:refactor/go-sdk/coordinator-runtime-server
   - related: split out of https://github.com/apache/airflow/pull/67154 to land 
it in three reviewable PRs (protocol primitives -> comms layer -> runtime entry 
point).
   
   ## Why
   
   Third and final PR in the stack carved out of #67154. With the protocol 
primitives merged in PR1 and the dispatcher / logger / client merged in PR2, 
this PR wires the entry point: the same bundle binary that today serves 
go-plugin can now also be launched directly by the Python supervisor, dial the 
supervisor's comm and logs sockets, and run a single TaskInstance.
   
   Coordinator mode lets the Python supervisor schedule Go tasks without 
standing up a separate worker process. The supervisor launches the bundle 
binary as a child, hands it two socket addresses on the CLI, and speaks the 
msgpack-over-IPC protocol directly, so a Go-task DagRun looks operationally 
indistinguishable from a Python-task DagRun on the supervisor side.
   
   This is the smallest PR in the stack (~650 LOC) because all the heavy 
lifting (frame I/O, dispatcher, slog handler, `sdk.Client` re-implementation) 
already landed in PR1 and PR2. Dag-file parsing over the coordinator protocol 
is intentionally not part of this stack and will land in a follow-up once that 
protocol settles.
   
   ## How
   
   - `pkg/execution/server.go` -- `execution.Serve(bundle, commAddr, logsAddr)` 
dials both supervisor sockets, defers a `Close` on each, installs 
`SocketLogHandler` as the slog default before any user code runs, constructs a 
`CoordinatorComm` over the comm socket, reads the initial `StartupDetails`, and 
dispatches to `task_runner.Run`. If `Serve` itself errors before the dispatcher 
spins up, the deferred close still releases the dialed sockets so the 
supervisor doesn't see a stuck child.
   - `pkg/execution/task_runner.go` -- runs a single task. Builds a context 
carrying the `CoordinatorClient` under `sdkcontext.SdkClientContextKey` (PR1 
added the injection site in `bundlev1.taskFunction.Execute`), invokes 
`bundle.LookupTask(dag, task).Execute`, and sends the resulting `TaskStateMsg` 
back through the dispatcher. Terminal-state delivery is `ctx.Err()`-aware so a 
cancelled supervisor doesn't leave the runtime blocked on a send.
   - `pkg/execution/integration_test.go` -- end-to-end test that runs a fake 
supervisor against the real `Serve` over an in-memory socket pair, exercises 
GetVariable / XCom push / deferral, and asserts the emitted `TaskStateMsg`.
   - `bundle/bundlev1/bundlev1server/server.go` -- splits `Serve` into a 
`decideMode` switch over `(--bundle-metadata | --comm/--logs | <none>)` so the 
same binary still serves go-plugin when no coordinator flags are present. 
Partial use of `--comm` / `--logs` is a hard error 
(`ErrCoordinatorFlagsIncomplete`), returned to `main` so the caller exits 
non-zero with usage rather than silently falling back to go-plugin.
   - `example/bundle/main.go` -- propagates `bundlev1server.Serve`'s error via 
`log.Fatal`, and tightens the example connection-log to log only non-sensitive 
fields, matching the masker TODOs PR1 added on `sdk.Client.GetConnection`.
   
   ## What
   
   - Add `go-sdk/pkg/execution/{server,task_runner,integration_test}.go`.
   - Extend `go-sdk/bundle/bundlev1/bundlev1server/server.go` with 
coordinator-mode dispatch and `ErrCoordinatorFlagsIncomplete`.
   - Update `go-sdk/example/bundle/main.go` to propagate `Serve`'s error and 
redact the connection log.
   
   ## Next
   
   - (none -- last PR of the stack)
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [x] Yes, with the help of Claude Code Opus 4.7 following [the 
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
   

