jason810496 opened a new pull request, #67317:
URL: https://github.com/apache/airflow/pull/67317

   # go-sdk: Add concurrent-safe coordinator comms, log handler, and client
   
   - **Depends on https://github.com/apache/airflow/pull/TBD being merged first** 
(refactor/go-sdk/coordinator-protocol)
   - **Diff for early review**: 
https://github.com/jason810496/airflow/compare/refactor/go-sdk/coordinator-protocol...jason810496:refactor/go-sdk/coordinator-comms
   - Related: split out of https://github.com/apache/airflow/pull/67154 so it 
can land as three reviewable PRs (protocol primitives -> comms layer -> runtime 
entry point).
   
   ## Why
   
   Second PR in a 3-PR stack carved out of #67154. The coordinator protocol is 
full-duplex: the supervisor can send a `StartupDetails` or an inbound XCom 
value at any time, and the runtime can send a `GetVariable` request and await 
the matching reply at any time. A naive read-one / write-one loop would 
therefore either serialise everything (no concurrent task code) or require 
every caller to multiplex frames by hand.
   
   `CoordinatorComm` solves this once. A single dispatcher goroutine demuxes 
inbound frames to per-request reply channels keyed by a monotonic id; 
outstanding requests receive `ctx.Err()` on cancellation, and pending requests 
are torn down cleanly on `SendRequest` failure.
   
   `SocketLogHandler` exists so the supervisor demuxes task logs the same way 
it demuxes everything else: as structured JSON over the dedicated logs socket, 
instead of having to parse stderr.
   
   `CoordinatorClient` lets the task runner in the next PR reuse the existing 
`sdk.Client` API by re-implementing it on top of the dispatcher, so tasks 
written against `sdk.Client` run unchanged under coordinator mode.
   
   The comm-layer dispatcher pattern is inspired by 
https://github.com/apache/airflow/pull/66412. This PR still has no entry 
point (that arrives in the third PR of the stack), so it has no user-visible 
effect on its own.
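
For reviewers who want the shape of the pattern without opening the diff, here is a minimal, self-contained sketch of the dispatcher idea described above. It is not the code in `comms.go`: the `frame` and `comm` types, the string payloads, the `send` callback, the in-process fake supervisor, and the use of `io.ErrUnexpectedEOF` for a closed stream are all assumptions made for illustration.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"sync"
	"sync/atomic"
)

// frame is a hypothetical wire unit; the PR's real envelopes are not shown here.
type frame struct{ ID int64; Body string }

// comm is an illustrative stand-in for CoordinatorComm, not its actual code.
type comm struct {
	nextID  atomic.Int64
	mu      sync.Mutex
	pending map[int64]chan string // nil once the inbound stream has ended
	send    func(frame) error     // writes one frame towards the supervisor
}

// dispatch is the only reader of inbound. It hands each frame to the waiter
// registered under the same id and, once the stream ends, wakes the rest.
func (c *comm) dispatch(inbound <-chan frame) {
	for f := range inbound {
		c.mu.Lock()
		ch, ok := c.pending[f.ID]
		delete(c.pending, f.ID)
		c.mu.Unlock()
		if ok {
			ch <- f.Body // capacity 1, so this never blocks
		}
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, ch := range c.pending {
		close(ch) // a closed channel tells the waiter the stream is gone
	}
	c.pending = nil
}

// SendRequest registers a reply channel, writes the frame, then waits for
// the reply or for ctx to end, whichever comes first.
func (c *comm) SendRequest(ctx context.Context, body string) (string, error) {
	id, ch := c.nextID.Add(1), make(chan string, 1)
	c.mu.Lock()
	if c.pending == nil {
		c.mu.Unlock()
		return "", io.ErrUnexpectedEOF
	}
	c.pending[id] = ch
	c.mu.Unlock()
	// Whichever path returns, drop the registration (a no-op if already gone).
	defer func() { c.mu.Lock(); delete(c.pending, id); c.mu.Unlock() }()
	if err := c.send(frame{ID: id, Body: body}); err != nil {
		return "", err
	}
	select {
	case r, ok := <-ch:
		if !ok {
			return "", io.ErrUnexpectedEOF
		}
		return r, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// matched fires n concurrent requests and counts callers that got their own reply.
func matched(n int) int64 {
	in := make(chan frame)
	// Fake supervisor: answers every frame asynchronously, echoing its id.
	send := func(f frame) error {
		go func() { in <- frame{ID: f.ID, Body: "re:" + f.Body} }()
		return nil
	}
	c := &comm{pending: map[int64]chan string{}, send: send}
	go c.dispatch(in)
	var ok atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			body := fmt.Sprintf("req-%d", i)
			got, err := c.SendRequest(context.Background(), body)
			if err == nil && got == "re:"+body {
				ok.Add(1)
			}
		}(i)
	}
	wg.Wait()
	return ok.Load()
}

func main() { fmt.Println("matched replies:", matched(50)) }
```

In this sketch the replies arrive in whatever order the fake supervisor's goroutines run, yet every caller still receives the frame carrying its own id, which is the property the dispatcher exists to provide. Passing a context that is cancelled before a reply arrives makes `SendRequest` return `ctx.Err()` and drop its map entry.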
   
   ## How
   
   - `pkg/execution/comms.go` -- `CoordinatorComm` owns the inbound read loop, 
an `atomic.Int64` request-id counter (wide enough to avoid wraparound on a 
long-running runtime), and a `map[int64]chan reply` guarded by a mutex. 
`SendRequest` registers the reply channel, writes the frame, then waits for 
either the reply or `ctx.Err()`; whichever comes first cleans up the channel. 
When the inbound stream returns `io.EOF`, the dispatcher fails any 
still-pending requests with the connection error and then exits.
   - `pkg/execution/logger.go` -- `SocketLogHandler` implements `slog.Handler`, 
resolves `slog.Value`s before JSON marshaling so lazy / `LogValuer` values 
render correctly, and writes one JSON line per record onto the logs socket. 
It survives a writer that goes away by swallowing the write error rather than 
panicking, because log loss is preferable to task failure here.
   - `pkg/execution/client.go` -- `CoordinatorClient` implements `sdk.Client` 
by serialising each call into the matching `pkg/execution/messages.go` 
envelope, routing it through `CoordinatorComm.SendRequest`, and decoding the 
reply. `GetVariable` honours `AIRFLOW_VAR_*` environment overrides before 
hitting the supervisor (matching Python's 
`airflow.models.variable.Variable.get` order). `GetConnection` and 
`GetVariable` translate supervisor "not found" replies into the SDK's sentinel 
errors (`sdk.ErrConnectionNotFound`, `sdk.ErrVariableNotFound`) so callers can 
`errors.Is` on them.
   - Tests cover the concurrent paths explicitly: race-free dispatch under N 
concurrent requests, cleanup when `SendRequest` is called with a cancelled 
context, EOF teardown, and round-tripping every message variant through the 
client.
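
To make the log-handler behaviour above concrete, here is a rough sketch of an `slog.Handler` that resolves values before marshaling and ignores write errors. It is an illustration only, not the contents of `logger.go`: the `lineHandler` type, the `level` and `event` field names, the flat `WithGroup`, and the `secret` and `brokenWriter` helpers are all assumptions.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"sync"
)

// lineHandler is an illustrative stand-in for SocketLogHandler. The JSON
// field names it emits ("level", "event") are assumptions, not the PR's schema.
type lineHandler struct {
	mu    *sync.Mutex // shared by handlers derived via WithAttrs
	w     io.Writer
	attrs []slog.Attr
}

func (h *lineHandler) Enabled(context.Context, slog.Level) bool { return true }

func (h *lineHandler) Handle(_ context.Context, r slog.Record) error {
	line := map[string]any{"level": r.Level.String(), "event": r.Message}
	add := func(a slog.Attr) bool {
		// Resolve first so LogValuer values render as what they choose to log.
		line[a.Key] = a.Value.Resolve().Any()
		return true
	}
	for _, a := range h.attrs {
		add(a)
	}
	r.Attrs(add)
	b, err := json.Marshal(line)
	if err != nil {
		return nil // drop an unmarshalable record instead of failing the task
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	_, _ = h.w.Write(append(b, '\n')) // a dead writer loses the line, nothing more
	return nil
}

func (h *lineHandler) WithAttrs(as []slog.Attr) slog.Handler {
	merged := append(append([]slog.Attr{}, h.attrs...), as...)
	return &lineHandler{mu: h.mu, w: h.w, attrs: merged}
}

// WithGroup is left flat to keep the sketch short; a full handler would nest keys.
func (h *lineHandler) WithGroup(string) slog.Handler { return h }

// secret shows why resolving matters: its LogValue decides what gets logged.
type secret string

func (secret) LogValue() slog.Value { return slog.StringValue("***") }

// brokenWriter simulates a logs socket whose peer has gone away.
type brokenWriter struct{}

func (brokenWriter) Write([]byte) (int, error) { return 0, errors.New("broken pipe") }

// logTo sends one record through a fresh handler into w.
func logTo(w io.Writer) {
	log := slog.New(&lineHandler{mu: &sync.Mutex{}, w: w})
	log.Info("task started", "token", secret("hunter2"))
}

// render returns the line the handler emits for that record.
func render() string {
	var buf bytes.Buffer
	logTo(&buf)
	return buf.String()
}

func main() {
	fmt.Print(render())    // one JSON object followed by a newline
	logTo(brokenWriter{}) // the write error is swallowed; nothing panics
}
```

The `secret` value is logged as `***` rather than its underlying string because the handler calls `Resolve()` before marshaling. Without that call, `encoding/json` would serialise the raw value.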
   
   ## What
   
   - Add `go-sdk/pkg/execution/{comms,logger,client}.go` plus matching unit 
tests.
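
The lookup order and error translation that `client.go` adds for `GetVariable` can be sketched roughly as follows. This is a simplified illustration under stated assumptions: `ask` is a hypothetical callback standing in for the round trip through `CoordinatorComm.SendRequest`, the local `ErrVariableNotFound` stands in for `sdk.ErrVariableNotFound`, and upper-casing the key follows Airflow's `AIRFLOW_VAR_{NAME}` convention but may not match the Go implementation exactly.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strings"
)

// ErrVariableNotFound stands in for sdk.ErrVariableNotFound in this sketch.
var ErrVariableNotFound = errors.New("variable not found")

// ask is a hypothetical stand-in for the supervisor round trip;
// found=false models a "not found" reply.
type ask func(key string) (value string, found bool, err error)

// getVariable checks the AIRFLOW_VAR_<KEY> environment override first and
// only then asks the supervisor, wrapping "not found" in the sentinel error.
func getVariable(key string, supervisor ask) (string, error) {
	if v, ok := os.LookupEnv("AIRFLOW_VAR_" + strings.ToUpper(key)); ok {
		return v, nil
	}
	v, found, err := supervisor(key)
	if err != nil {
		return "", err
	}
	if !found {
		return "", fmt.Errorf("%w: %q", ErrVariableNotFound, key)
	}
	return v, nil
}

// demo exercises both paths against a supervisor that knows no variables.
func demo() (fromEnv string, missing bool) {
	empty := func(string) (string, bool, error) { return "", false, nil }
	os.Setenv("AIRFLOW_VAR_REGION", "eu-west-1")
	defer os.Unsetenv("AIRFLOW_VAR_REGION")
	fromEnv, _ = getVariable("region", empty)
	_, err := getVariable("absent_key_for_demo", empty)
	return fromEnv, errors.Is(err, ErrVariableNotFound)
}

func main() {
	fromEnv, missing := demo()
	fmt.Println(fromEnv, missing)
}
```

Wrapping the sentinel with `%w` keeps the key in the error message while still letting callers match it with `errors.Is`.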
   
   ## Next
   
   - refactor/go-sdk/coordinator-runtime-server (PR #TBD, opened after this one 
merges)
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [x] Yes, with the help of Claude Code Opus 4.7 following [the 
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
   

