This is an automated email from the ASF dual-hosted git repository.

jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 43048cec906 Go-SDK: Make go-sdk docs up to date with Coordinator change (#68221)
43048cec906 is described below

commit 43048cec906b162825b9e4b97ce636d25e08b6e7
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Tue Jun 9 09:32:37 2026 +0800

    Go-SDK: Make go-sdk docs up to date with Coordinator change (#68221)
    
    * Go-SDK: Refactor README to include the Coordinator protocol
    
    * Go SDK: Document user-facing SDK and bundle authoring APIs
    
    Expand GoDoc on the symbols a bundle author touches so the API teaches
    its own use: the sdk package overview and Client/Connection interfaces,
    and the bundlev1 BundleProvider, Registry, and Dag authoring surface
    (including the task-function injection and return-value contract). Also
    fixes a ConnectinNotFound typo in an example snippet.
    
    * Go-SDK: Tidy docs, link bundle spec, and pin pkgsite
    
    - Fix the AIRFLOW__SECTION__KEY env var template (double underscore) and
      reword the coordinator placement note in the README
    - Cross-link the formal executable-bundle-spec from the README
    - Move the env-prefix const docs onto per-constant comments so they
      satisfy Go doc conventions / revive
    - Pin the docs recipe's pkgsite to v0.1.0 for reproducible previews
    
    * Go-SDK: Fix NotFound docstring grammar and use absolute bundle-spec link
    
    - Correct "communication issues to the API server" -> "with" across the
      Variable/Connection/XCom NotFound sentinels
    - Make the bundle-spec link absolute so it resolves on pkg.go.dev, where
      relative cross-module paths break
---
 go-sdk/Justfile                                 |   5 +
 go-sdk/README.md                                | 301 ++++++++++++++++++++----
 go-sdk/bundle/bundlev1/bundlev1server/server.go |   3 +
 go-sdk/bundle/bundlev1/registry.go              |  43 +++-
 go-sdk/bundle/bundlev1/schemas.go               |  30 ++-
 go-sdk/bundle/bundlev1/task.go                  |   4 +
 go-sdk/sdk/client.go                            |   4 +
 go-sdk/sdk/connection.go                        |   8 +
 go-sdk/sdk/doc.go                               |  26 +-
 go-sdk/sdk/errors.go                            |   8 +-
 go-sdk/sdk/sdk.go                               |  29 ++-
 11 files changed, 402 insertions(+), 59 deletions(-)

diff --git a/go-sdk/Justfile b/go-sdk/Justfile
index fd4edf266cf..c83f5c1b47f 100644
--- a/go-sdk/Justfile
+++ b/go-sdk/Justfile
@@ -38,6 +38,11 @@ test:
   @echo "Running unit tests..."
   go run gotest.tools/[email protected] -f dots-v2 ./...
 
+# Build and serve Go package documentation locally
+docs port="6060":
+  @echo "Serving Go docs at http://localhost:{{port}}/github.com/apache/airflow/go-sdk";
+  go run golang.org/x/pkgsite/cmd/pkgsite@v0.1.0 -http=localhost:{{port}} .
+
 # Generate protocol buffers
 protoc:
   @prek run -a protoc
diff --git a/go-sdk/README.md b/go-sdk/README.md
index c84d5529fbf..5ceb1afb4cc 100644
--- a/go-sdk/README.md
+++ b/go-sdk/README.md
@@ -19,38 +19,215 @@
 
 # Apache Airflow Go Task SDK
 
-The Go SDK uses the Task Execution Interface (TEI or Task API) introduced in 
AIP-72 with Airflow 3.0.0 to give
-Task functions written in Go full access to the Airflow "model", natively in 
go.
+The Go SDK is a Go implementation of the Airflow Task SDK. It lets you write 
task functions in Go that
+have native access to the Airflow "model" (Variables, Connections, and XCom), 
instead of writing them in
+Python.
 
-The Task API however does not provide a means to get the `ExecuteTaskWorkload` 
to the go worker itself. For
-that we use the Edge Executor API.
-Longer term we will likely need to stabilize the Edge Executor API and add 
versioning to it.
+It is built on the Task Execution Interface (TEI, a.k.a. the Task API) 
introduced by AIP-72 in Airflow
+3.0.0. AIP-72 standardised how a task runtime talks to Airflow over an HTTP 
Execution API, which decoupled
+the language a task is written in from the Airflow core. The Go SDK is one 
such runtime; the Java SDK is
+another.
 
-Since Go is a compiled language (putting aside projects such as 
[YAEGI](https://github.com/traefik/yaegi) that allow Go to be interpreted), all 
tasks must:
+> [!WARNING]
+> This is an **experimental** feature. The SDK is under active development and 
its APIs, wire protocols,
+> and tooling may change between releases without notice.
 
-1. Be compiled into a binary ahead of time, and
-2. Be registered inside the worker process in order to be executed.
+## The compiled-language constraint
 
+Python tasks are imported and run in-process. Go is compiled, so the model is 
different.
 
-> [!NOTE]
-> This Golang SDK is under active development and is not ready for prime-time 
yet.
+A single binary that bundles one or more Dags' task functions is called a 
**bundle**. You build one with
+the SDK's packer, `airflow-go-pack`, which compiles your code and appends a 
metadata footer (the manifest
+of `dag_id`s and `task_id`s, plus the Dag source) to the executable. The 
result is a **self-contained
+executable bundle**: a single runnable file that *is* the bundle, with no 
separate manifest or archive to
+ship alongside it.
 
-## Quickstart
+## You still need a Python stub Dag (for now)
 
-- See [`example/bundle/main.go`](./example/bundle/main.go) for an example dag 
bundle where you can define your task functions
+The Task API does not yet carry Dag *structure* for non-Python languages, so 
the Dag's shape and task
+dependencies are still declared in a small Python file using `@task.stub`:
 
-- Compile this into a binary:
+```python
+from airflow.sdk import dag, task
+
+
+@task.stub(queue="golang")
+def extract(): ...
+
+
+@task.stub(queue="golang")
+def transform(): ...
+
+
+@dag()
+def simple_dag():
+    extract() >> transform()
+
+
+simple_dag()
+```
+
+`@task.stub` tells the Dag parser the "shape" of the Go tasks (their names and 
dependencies) without any
+Python implementation. The `queue=` value routes the task to the Go runtime. 
This Python requirement is a
+known limitation.
+
+
+## Authoring a bundle
+
+Implement `bundlev1.BundleProvider`, register your Dags and tasks, and `main` 
is one line. From
+[`example/bundle/main.go`](./example/bundle/main.go):
+
+```go
+type myBundle struct{}
+
+var _ v1.BundleProvider = (*myBundle)(nil)
+
+func (m *myBundle) GetBundleVersion() v1.BundleInfo {
+    return v1.BundleInfo{Name: bundleName, Version: &bundleVersion}
+}
+
+func (m *myBundle) RegisterDags(dagbag v1.Registry) error {
+    simpleDag := dagbag.AddDag("simple_dag")
+    simpleDag.AddTask(extract)
+    simpleDag.AddTask(transform)
+    return nil
+}
+
+func main() {
+    if err := bundlev1server.Serve(&myBundle{}); err != nil {
+        log.Fatal(err)
+    }
+}
+```
+
+A task is an ordinary Go function. The runtime inspects its signature and 
injects arguments by type:
+`context.Context`, `*slog.Logger`, and an `sdk.Client` (or a narrower 
interface such as
+`sdk.VariableClient`). An optional `(any, error)` return becomes the task's 
XCom; an `error` return marks
+the task failed.
+
+```go
+func extract(ctx context.Context, client sdk.Client, log *slog.Logger) (any, error) {
+    conn, err := client.GetConnection(ctx, "test_http")
+    // ... do work, honour ctx cancellation ...
+    return map[string]any{"go_version": runtime.Version()}, nil
+}
+
+func transform(ctx context.Context, client sdk.VariableClient, log *slog.Logger) error {
+    val, err := client.GetVariable(ctx, "my_variable")
+    if err != nil {
+        return err
+    }
+    log.Info("Obtained variable", "my_variable", val)
+    return nil
+}
+```
+
+Asking for the narrowest interface a task needs (e.g. `sdk.VariableClient` 
instead of `sdk.Client`) makes
+unit testing easier and documents which Airflow features the task touches. 
`RegisterDags` is the single
+source of truth for which `dag_id`s and `task_id`s a bundle can run.
+
+## Deployment modes
+
+A bundle can run in two ways. The same bundle binary works in both; you pick 
one per deployment:
+
+1. **Coordinator** (recommended)
+2. **Edge Worker**
+
+For the protocol details behind each, see [How it works](#how-it-works).
+
+### Coordinator (recommended)
+
+A Python task runner executes the Go task directly, with no separate Go worker 
process to run on the host.
+This is the same coordinator mechanism the Java SDK uses.
+
+**Why this is recommended:** the mature Python supervisor handles the 
Airflow-facing concerns, so this path
+inherits its capabilities (remote task logs to S3/GCS, the full range of task 
states, and alternate XCom
+backends) rather than reimplementing them in Go. These are exactly the 
features the Edge Worker path is
+still missing (see [Known limitations](#known-limitations)).
+
+#### Quickstart
+
+- Build and pack your bundle with `airflow-go-pack`. The packer compiles the 
bundle and appends an
+  embedded metadata footer so the coordinator can read its `dag_id`s without 
executing the binary,
+  producing a single runnable file:
 
   ```bash
-  go build -o ./bin/sample-dag-bundle ./example/bundle
+  go tool airflow-go-pack ./example/bundle -- -trimpath -tags=prod
+  ```
+
+  Use `--output <path>` to write the packed bundle straight into a directory 
the coordinator scans
+  (`executables_root`), and pass extra `go build` flags after `--`.
+
+  For cross-compiling (e.g. deploy to a Linux host from an Apple-silicon 
(darwin/arm64) machine), pass `--goos`/`--goarch` and the
+  packer cross-builds for you:
+
+  ```bash
+  go tool airflow-go-pack --goos linux --goarch amd64 \
+    --output ~/airflow/executable-bundles/sample-dag-bundle \
+    ./example/bundle
+  ```
+
+  Alternatively, use `--executable`/`--source`. The packer normally execs the 
binary to read
+  its metadata; a cross-compiled binary cannot run on the host, so generate 
the metadata on a machine that
+  can run it and pass the file with `--airflow-metadata`:
+
+  ```bash
+  # on linux/amd64 machine:
+  go build -o my-bundle ./example/bundle
+  ./my-bundle --airflow-metadata > airflow-metadata.yaml
+
+  # on darwin/arm64 machine:
+  go tool airflow-go-pack --executable ./my-bundle --source main.go --airflow-metadata airflow-metadata.yaml
+  ```
+
+  > [!NOTE]
+  > The packer ships via the Go 1.24 `tool` directive, so there is no global 
install: add
+  > `tool github.com/apache/airflow/go-sdk/cmd/airflow-go-pack` to your bundle 
module's `go.mod` and run
+  > it with `go tool airflow-go-pack`. This pins the packer version per 
project.
+
+- Register the coordinator and route the queue to it, under `[sdk]` in 
`airflow.cfg` (or the equivalent
+  `AIRFLOW__SDK__*` env vars):
+
+  ```ini
+  [sdk]
+  coordinators = {"go": {"classpath": "airflow.sdk.coordinators.executable.ExecutableCoordinator", "kwargs": {"executables_root": ["~/airflow/executable-bundles"]}}}
+  queue_to_coordinator = {"golang": "go"}
   ```
 
-  (or see the [`Justfile`](./example/bundle/Justfile) for how you can build it 
and specify they bundle version number at build time.)
+  `executables_root` is one or more directories the coordinator scans for 
bundles; `queue_to_coordinator`
+  routes stub tasks with `queue="golang"` to this Go coordinator.
+
+  > [!IMPORTANT]
+  > The coordinator is part of the Airflow worker, so the `[sdk]` config (and 
the bundle files in
+  > `executables_root`) only need to be present wherever tasks actually 
execute. With `CeleryExecutor`,
+  > setting it on the Celery workers is sufficient. With `LocalExecutor`, 
tasks run inside the scheduler
+  > process, so it must be set where the scheduler can read it. The API server 
and Dag processor do not
+  > need it.
+
+- Deploy the matching Python stub Dag (above) into Airflow. There is no 
separate Go worker to run: the
+  Airflow worker forks the bundle binary once per task instance.
+
+### Edge Worker (go-plugin)
+
+A long-running Go worker process (`airflow-go-edge-worker`) polls Airflow for 
work and runs your bundle,
+with no Python in the data path. This path runs end-to-end today, but is 
missing the features listed under
+[Known limitations](#known-limitations).
+
+#### Quickstart
 
-- Configure the go edge worker, by editing `$AIRFLOW_HOME/go-sdk.yaml`:
+- See [`example/bundle/main.go`](./example/bundle/main.go) for an example Dag 
bundle.
 
-  These config values need tweaking, especially the ports and secrets. The 
ports are the default assuming
-  airflow is running locally via `airflow standalone`.
+- Compile it into a binary:
+
+  ```bash
+  go build -o ./bin/sample-dag-bundle ./example/bundle
+  ```
+
+  (or see the [`Justfile`](./example/bundle/Justfile) for how to build it and 
set the bundle version at
+  build time.)
+
+- Configure the Go edge worker by editing `$AIRFLOW_HOME/go-sdk.yaml`. The 
ports below are the defaults
+  assuming Airflow runs locally via `airflow standalone`; tweak the ports and 
secrets to match your setup:
 
   ```yaml
   edge:
@@ -74,63 +251,97 @@ Since Go is a compiled language (putting aside projects such as [YAEGI](https://
     secret_key: "u0ZDb2ccINAbhzNmvYzclw=="
   ```
 
-  You can also set these options via environment variables of 
`AIRFLOW__${section}_${key}`, for example `AIRFLOW__API_AUTH__SECRET_KEY`.
+  You can also set these options via environment variables of 
`AIRFLOW__${SECTION}__${KEY}`, for example
+  `AIRFLOW__API_AUTH__SECRET_KEY`.
 
-- Install the worker
+- Install the worker:
 
   ```bash
   go install github.com/apache/airflow/go-sdk/cmd/airflow-go-edge-worker@latest
   ```
 
-- Run it!
+- Run it:
 
   ```bash
   airflow-go-edge-worker run --queues golang
   ```
 
-### Example Dag:
+- Deploy the matching Python stub Dag (above) into Airflow.
 
-You will need to create a python Dag and deploy it in to the Airflow
+## Known limitations
 
-```python
-from airflow.sdk import dag, task
+A non-exhaustive list of features the **Edge Worker (go-plugin)** path has yet 
to implement. These are the
+main reason the coordinator-based path is recommended: in that mode the Python 
supervisor handles these
+concerns, so they are not limitations there.
 
+- Putting tasks into states other than success or failed/up-for-retry 
(deferred,
+  failed-without-retries, etc.).
+- Remote task logs (i.e. S3/GCS etc.).
+- XCom reading/writing through non-default XCom backends.
 
-@task.stub(queue="golang")
-def extract(): ...
+## How it works
 
+The same bundle binary speaks two different protocols; which one it uses is 
decided at launch by the CLI
+flags it was invoked with. User code (`func main`) is identical either way.
 
-@task.stub(queue="golang")
-def transform(): ...
+### Coordinator protocol
 
+```
+Python supervisor / task runner
+        │  finds + validates the bundle, then forks it:
+        ▼
+  <bundle binary> --comm=127.0.0.1:P1 --logs=127.0.0.1:P2
+        │  binary dials BACK over TCP loopback (msgpack-over-IPC)
+        ▼
+  GetConnection / GetVariable / GetXCom / SetXCom ... → SucceedTask / TaskState
+```
 
-@dag()
-def simple_dag():
+- The Python `ExecutableCoordinator` forks the bundle binary with 
`--comm`/`--logs` addresses it is already
+  listening on. The binary dials back (it never listens) and speaks a 
length-prefixed msgpack-over-IPC wire
+  protocol on the comm socket, with structured JSON-line logs on the logs 
socket.
+- The Python runtime is the worker. It proxies every `GetConnection` / 
`GetVariable` / `GetXCom` /
+  `SetXCom` call through to the Execution API. The Go binary just runs the 
task function.
 
-    extract() >> transform()
+The Go side of the protocol is implemented in `pkg/execution/`. On the Python 
side it is the
+`ExecutableCoordinator` in 
`task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py`.
 
+### Edge Worker protocol
 
-simple_dag()
+```
+Airflow scheduler ──Edge Executor API──► airflow-go-edge-worker ──go-plugin/gRPC──► bundle binary
+   (ExecuteTaskWorkload)                  (long-running Go process)                  (child process)
 ```
 
-Here we see the `@task.stub` which tells the Dag parser about the "shape" of 
the go tasks, and lets us define
-the relationships between them
-
-> [!NOTE]
-> Yes, you still have to have a python Dag file for now. This is a known 
limitation at the moment.
+- `airflow-go-edge-worker` is a long-running Go process. It registers with the 
scheduler, polls the Edge
+  Executor API for `ExecuteTaskWorkload`s, and heartbeats.
+- For each workload it execs the bundle binary as a child and connects over 
HashiCorp
+  [`go-plugin`](https://github.com/hashicorp/go-plugin) (gRPC over a 
handshake-gated socket).
+- The Task API itself has no way to deliver an `ExecuteTaskWorkload` to a Go 
worker, so the Edge Executor
+  API fills that gap. Longer term that API will likely need stabilising and 
versioning.
 
-## Known missing features
+## Architectural decisions
 
-A non-exhaustive list of features we have yet to implement
+The [`adr/`](./adr) directory records the design decisions behind the SDK:
 
-- Support for putting tasks into state other than success or 
failed/up-for-retry (deferred, failed-without-retries etc.)
-- Remote task logs (i.e. S3/GCS etc)
-- XCom reading/writing from other XCom backends
+- [ADR 0001](./adr/0001-bundle-packing-options.md): bundle-packing options.
+- [ADR 0002](./adr/0002-use-go-tool-directive-for-bundle-packer.md): deliver 
the bundle packer via the
+  Go 1.24 `tool` directive.
+- [ADR 0003](./adr/0003-coordinator-protocol-msgpack-ipc.md): dual-mode 
coordinator protocol, where one
+  binary speaks both go-plugin gRPC (Edge Worker) and msgpack-over-IPC (Python 
coordinator).
+- [ADR 0004](./adr/0004-self-contained-executable-bundle.md): the 
self-contained executable bundle, where
+  the executable *is* the bundle.
 
+The normative, language-agnostic on-disk bundle format (the footer layout, 
manifest fields, and what the
+`ExecutableCoordinator` reads) is specified in
+[`executable-bundle-spec.rst`](https://github.com/apache/airflow/blob/main/task-sdk/docs/executable-bundle-spec.rst).
+`airflow-go-pack` produces bundles conforming to that spec.
 
 ## Future Direction
 
 This is more of an "it would be nice to have" than any plan or commitment, and 
a place to record ideas.
 
-- The ability to run Airflow tasks "in" an existing code base - i.e. being 
able to define an Airflow task function that runs (in a goroutine) inside an 
existing code base an app.
-- Do the task function reflection ahead of time, not for each Execute call.
+- Defining the whole Dag in the Go SDK, so the Python stub Dag is no longer 
required and a bundle's
+  structure and task dependencies can be declared natively in Go.
+- The ability to run Airflow tasks "in" an existing code base, i.e. defining 
an Airflow task function that
+  runs (in a goroutine) inside an existing app.
+- Doing the task function reflection ahead of time, rather than for each 
Execute call.
diff --git a/go-sdk/bundle/bundlev1/bundlev1server/server.go b/go-sdk/bundle/bundlev1/bundlev1server/server.go
index fde5314d332..2b39392db9c 100644
--- a/go-sdk/bundle/bundlev1/bundlev1server/server.go
+++ b/go-sdk/bundle/bundlev1/bundlev1server/server.go
@@ -89,6 +89,9 @@ func (s serveConfigFunc) ApplyServeOpt(in *ServerConfig) error {
        return s(in)
 }
 
+// ServerConfig holds settings that ServeOpt values apply before the bundle
+// server starts. It is currently empty; it exists so options can be added 
later
+// without changing Serve's signature.
 type ServerConfig struct{}
 
 // serveMode tags the protocol the binary will speak this run.
diff --git a/go-sdk/bundle/bundlev1/registry.go b/go-sdk/bundle/bundlev1/registry.go
index 59cad631125..8e74a0c6708 100644
--- a/go-sdk/bundle/bundlev1/registry.go
+++ b/go-sdk/bundle/bundlev1/registry.go
@@ -28,18 +28,50 @@ import (
 )
 
 type (
-       Task   = worker.Task
+       // Task is one registered task: something the runtime can Execute. 
Bundle
+       // authors do not implement this directly; Dag.AddTask wraps a plain Go
+       // function into a Task for you.
+       Task = worker.Task
+
+       // Bundle is the execution-time view of a registry: it looks up a task 
by
+       // dag_id and task_id. Registry embeds it so the object built during
+       // RegisterDags can also serve tasks when they run.
        Bundle = worker.Bundle
 
+       // Dag is the handle returned by Registry.AddDag. Use it to attach the 
Go
+       // functions that implement the dag's tasks.
        Dag interface {
+               // AddTask registers fn as a task, deriving the task_id from 
fn's own
+               // name (so it must match the @task.stub name in the Python 
dag).
+               //
+               // fn is an ordinary Go function whose parameters are injected 
by type
+               // and may appear in any order. Recognised parameters are:
+               //   - context.Context: cancelled when the task is asked to stop
+               //   - *slog.Logger: writes to the task's Airflow log
+               //   - sdk.Client (or a narrower sdk.VariableClient / 
sdk.ConnectionClient):
+               //     access to Variables, Connections, and XCom
+               //
+               // fn must return either error or (result, error): a non-nil 
error fails
+               // the task, and a non-nil first result is pushed as the task's
+               // return-value XCom. Passing a non-function, or a function 
whose return
+               // signature does not match, panics at registration time.
                AddTask(fn any)
+
+               // AddTaskWithName is like AddTask but sets task_id explicitly 
instead of
+               // deriving it from the function name. Use it when the Go 
function name
+               // cannot match the Python @task.stub id, for example for an 
anonymous
+               // function or a differing name.
                AddTaskWithName(taskId string, fn any)
        }
 
-       // Registry defines the interface that lets user code add dags and 
tasks, and extends Bundle for execution
-       // time
+       // Registry is the recorder passed to BundleProvider.RegisterDags. Use 
it to
+       // declare the dags this bundle can run; it also extends Bundle so the 
same
+       // object serves task lookups at execution time.
        Registry interface {
                Bundle
+               // AddDag registers a dag by its dag_id (matching the Python 
stub dag)
+               // and returns a Dag handle for attaching tasks. Registering 
the same
+               // dag_id twice panics.
                AddDag(dagId string) Dag
        }
 
@@ -83,7 +115,10 @@ func (d dagShim) AddTaskWithName(taskId string, fn any) {
        d.registry.registerTaskWithName(d.dagId, taskId, fn)
 }
 
-// Function New creates a new bundle on which Dag and Tasks can be registered
+// New returns an empty Registry on which dags and tasks can be registered. The
+// runtime creates one and hands it to BundleProvider.RegisterDags, so bundle
+// authors rarely call this directly; it is handy for unit-testing a
+// RegisterDags implementation.
 func New() Registry {
        return &registry{
                taskFuncMap: make(map[string]map[string]Task),
diff --git a/go-sdk/bundle/bundlev1/schemas.go b/go-sdk/bundle/bundlev1/schemas.go
index f5710e44504..46bf5cc2879 100644
--- a/go-sdk/bundle/bundlev1/schemas.go
+++ b/go-sdk/bundle/bundlev1/schemas.go
@@ -22,29 +22,49 @@ import (
        "github.com/apache/airflow/go-sdk/pkg/api"
 )
 
+// BundleProvider is the single interface a bundle author implements. Construct
+// one in main and pass it to bundlev1server.Serve; the runtime calls
+// GetBundleVersion to identify the bundle and RegisterDags to load its tasks.
 type BundleProvider interface {
        // GetBundleVersion returns upfront information about the bundle name 
and version without needing to load
        // the full dag and task information, which could be memory intensive.
        GetBundleVersion() BundleInfo
 
-       // RegisterDags is called to populate the Task functions in the 
registry in order to execute them.
+       // RegisterDags declares every dag and task in this bundle on the 
supplied
+       // Registry, for example:
        //
-       // You should populate all dags and tasks in the bundle.
+       //      func (m *myBundle) RegisterDags(dagbag bundlev1.Registry) error {
+       //              dag := dagbag.AddDag("simple_dag")
+       //              dag.AddTask(extract)
+       //              dag.AddTask(transform)
+       //              return nil
+       //      }
        //
-       // This will be called once-per-process-per-bundle and cached 
internally. You do not have to cache this
-       // yourself
+       // Register all dags and tasks here. It is called once per process per
+       // bundle and cached internally, so you do not have to cache it 
yourself.
        RegisterDags(Registry) error
 }
 
-// BundleInfo Schema for telling task which bundle to run with.
+// BundleInfo identifies a bundle by name and version. It is returned by
+// BundleProvider.GetBundleVersion and tells Airflow which bundle to run a task
+// with.
 type BundleInfo = api.BundleInfo
 
+// TaskInstance identifies the running task by dag_id, run_id, task_id, and
+// map_index. It is an alias for the Execution-API type and is what
+// XComClient.PushXCom takes.
 type TaskInstance = api.TaskInstance
 
+// GetMetadataResponse is the runtime reply that carries a bundle's BundleInfo
+// back over the go-plugin transport. It is plumbing between the worker and the
+// bundle, not something authors construct.
 type GetMetadataResponse struct {
        Bundle BundleInfo
 }
 
+// ExecuteTaskWorkload is the runtime payload describing one task to run: its
+// TaskInstance, the bundle to load, and an optional log path. The worker
+// delivers it to the bundle; authors do not build it themselves.
 type ExecuteTaskWorkload struct {
        Token string `json:"token"`
 
diff --git a/go-sdk/bundle/bundlev1/task.go b/go-sdk/bundle/bundlev1/task.go
index 5277f40681c..0a3aca70666 100644
--- a/go-sdk/bundle/bundlev1/task.go
+++ b/go-sdk/bundle/bundlev1/task.go
@@ -36,6 +36,10 @@ type taskFunction struct {
 
 var _ Task = (*taskFunction)(nil)
 
+// NewTaskFunction wraps a plain Go function as a Task, validating its 
signature
+// (injectable parameters, and a return of error or (result, error)). Bundle
+// authors normally use Dag.AddTask, which calls this for them; use it directly
+// only when building a Task outside the registry.
 func NewTaskFunction(fn any) (Task, error) {
        v := reflect.ValueOf(fn)
        fullName := runtime.FuncForPC(v.Pointer()).Name()
diff --git a/go-sdk/sdk/client.go b/go-sdk/sdk/client.go
index f719fdc4787..61a8a56f8bb 100644
--- a/go-sdk/sdk/client.go
+++ b/go-sdk/sdk/client.go
@@ -33,6 +33,10 @@ type client struct{}
 
 var _ Client = (*client)(nil)
 
+// NewClient returns the default Client, which serves Variable, Connection, and
+// XCom calls from the Execution API (reading the per-task API client from the
+// context). Task functions are handed a Client by the runtime and rarely need
+// this; it is exported mainly for tests and advanced setups.
 func NewClient() Client {
        return &client{}
 }
diff --git a/go-sdk/sdk/connection.go b/go-sdk/sdk/connection.go
index 1d0bcf30b6f..07d9b1cd2ca 100644
--- a/go-sdk/sdk/connection.go
+++ b/go-sdk/sdk/connection.go
@@ -26,6 +26,10 @@ import (
        "github.com/apache/airflow/go-sdk/pkg/api"
 )
 
+// Connection is an Airflow Connection resolved for a task: the endpoint and
+// credentials registered in Airflow under a conn_id. Obtain one with
+// ConnectionClient.GetConnection. Optional fields are pointers so an absent
+// value (nil) is distinct from an empty one.
 type Connection struct {
        ID string
 
@@ -47,6 +51,10 @@ type Connection struct {
        Extra map[string]any
 }
 
+// GetURI renders the connection as a URL of the form
+// scheme://login:password@host:port/path?extra. It is the Go equivalent of
+// Python's Connection.get_uri and is handy when a client library wants a 
single
+// connection string rather than individual fields.
 func (c Connection) GetURI() *url.URL {
        uri := &url.URL{}
 
diff --git a/go-sdk/sdk/doc.go b/go-sdk/sdk/doc.go
index 82aa91d0bc2..edb9f45b211 100644
--- a/go-sdk/sdk/doc.go
+++ b/go-sdk/sdk/doc.go
@@ -16,6 +16,30 @@
 // under the License.
 
 /*
-Package sdk provides access to the Airflow objects (Variables, Connection, 
XCom etc) during run time for tasks.
+Package sdk gives task functions access to the Airflow "model" (Variables,
+Connections, and XCom) at run time.
+
+A task function does not construct a client itself. The runtime inspects the
+function's parameters and injects one by type, so you declare the narrowest
+interface you need and use it:
+
+       func mytask(ctx context.Context, client sdk.Client, log *slog.Logger) error {
+               val, err := client.GetVariable(ctx, "my_variable")
+               if err != nil {
+                       return err
+               }
+               log.Info("got variable", "value", val)
+               return nil
+       }
+
+Ask for [Client] for full access, or a narrower interface such as
+[VariableClient] or [ConnectionClient] when the task only reads one kind of
+object. The narrower type documents what the task touches and makes it easy to
+pass a fake in unit tests.
+
+To publish a result, return a value from the task function: the runtime pushes
+it as the task's return-value XCom, so most tasks never call [XComClient]
+directly. Lookups that miss return a wrapped sentinel error 
([VariableNotFound],
+[ConnectionNotFound], [XComNotFound]) you can test for with errors.Is.
 */
 package sdk
diff --git a/go-sdk/sdk/errors.go b/go-sdk/sdk/errors.go
index d8b071f0944..cba5fdccfa6 100644
--- a/go-sdk/sdk/errors.go
+++ b/go-sdk/sdk/errors.go
@@ -22,15 +22,19 @@ import (
 )
 
 // VariableNotFound is an error value used to signal that a variable could not 
be found (and that there were
-// no communication issues to the API server).
+// no communication issues with the API server).
 //
 // See the “GetVariable“ method of [VariableClient] for an example
 var VariableNotFound = errors.New("variable not found")
 
 // ConnectionNotFound is an error value used to signal that a connection could 
not be found (and that there were
-// no communication issues to the API server).
+// no communication issues with the API server).
 //
 // See the “GetConnection“ method of [ConnectionClient] for an example
 var ConnectionNotFound = errors.New("connection not found")
 
+// XComNotFound is an error value used to signal that an XCom value could not 
be found (and that there were
+// no communication issues with the API server).
+//
+// See the “GetXCom“ method of [XComClient] for an example
 var XComNotFound = errors.New("xcom not found")
diff --git a/go-sdk/sdk/sdk.go b/go-sdk/sdk/sdk.go
index e4f5a61a331..cda13878fd9 100644
--- a/go-sdk/sdk/sdk.go
+++ b/go-sdk/sdk/sdk.go
@@ -24,7 +24,15 @@ import (
 )
 
 const (
-       VariableEnvPrefix   = "AIRFLOW_VAR_"
+       // VariableEnvPrefix is the environment-variable prefix used as a local
+       // fallback for Variable lookups. GetVariable first checks the process
+       // environment for VariableEnvPrefix plus the uppercased key (so key
+       // "my_var" is read from AIRFLOW_VAR_MY_VAR) before asking the API 
server,
+       // mirroring the Python SDK and making local development and tests easy.
+       VariableEnvPrefix = "AIRFLOW_VAR_"
+
+       // ConnectionEnvPrefix is the matching prefix for Connections. The
+       // connection env fallback is not wired up yet, so it is currently 
unused.
        ConnectionEnvPrefix = "AIRFLOW_CONN_"
 )
 
@@ -62,13 +70,14 @@ type VariableClient interface {
        UnmarshalJSONVariable(ctx context.Context, key string, pointer any) error
 }
 
+// ConnectionClient reads Airflow Connections.
 type ConnectionClient interface {
        // GetConnection returns the value of an Airflow Connection.
        //
        // If the conn is not found error will be a wrapped 
``ConnectionNotFound``:
        //
        //              conn, err := client.GetConnection(ctx, "my-db")
-       //              if errors.Is(err, ConnectinNotFound) {
+       //              if errors.Is(err, ConnectionNotFound) {
        //                              // Handle not found, set default, 
return custom error etc
        //              } else {
        //                              // Other errors here, such as http 
network timeouts etc.
@@ -76,7 +85,17 @@ type ConnectionClient interface {
        GetConnection(ctx context.Context, connID string) (Connection, error)
 }
 
+// XComClient reads and writes XCom values. Most tasks never need this: to
+// publish a result, return a value from the task function and the runtime
+// pushes it as the return-value XCom. Reach for these methods only to read
+// another task's XCom, or to push under a custom key.
 type XComClient interface {
+       // GetXCom returns the value stored under key by the task identified by
+       // dagId/runId/taskId. For a mapped task instance pass its mapIndex,
+       // otherwise pass nil. If no value exists the error wraps XComNotFound.
+       //
+       // value is reserved for future typed decoding and is currently 
ignored; the
+       // stored value is returned as the first result instead.
        GetXCom(
                ctx context.Context,
                dagId, runId, taskId string,
@@ -84,9 +103,15 @@ type XComClient interface {
                key string,
                value any,
        ) (any, error)
+
+       // PushXCom stores value under key for the given task instance ti.
        PushXCom(ctx context.Context, ti api.TaskInstance, key string, value any) error
 }
 
+// Client is the full task-facing API: read Variables and Connections, and
+// read/write XCom. A task that declares an sdk.Client parameter is handed one
+// by the runtime. If a task needs only one capability, ask for the narrower
+// VariableClient, ConnectionClient, or XComClient instead.
 type Client interface {
        VariableClient
        ConnectionClient
