This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 43048cec906 Go-SDK: Make go-sdk docs up to date with Coordinator change (#68221)
43048cec906 is described below
commit 43048cec906b162825b9e4b97ce636d25e08b6e7
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Tue Jun 9 09:32:37 2026 +0800
Go-SDK: Make go-sdk docs up to date with Coordinator change (#68221)
* Go-SDK: Refactor README to include the Coordinator protocol
* Go SDK: Document user-facing SDK and bundle authoring APIs
Expand GoDoc on the symbols a bundle author touches so the API teaches
its own use: the sdk package overview and Client/Connection interfaces,
and the bundlev1 BundleProvider, Registry, and Dag authoring surface
(including the task-function injection and return-value contract). Also
fixes a ConnectinNotFound typo in an example snippet.
* Go-SDK: Tidy docs, link bundle spec, and pin pkgsite
- Fix the AIRFLOW__SECTION__KEY env var template (double underscore) and
reword the coordinator placement note in the README
- Cross-link the formal executable-bundle-spec from the README
- Move the env-prefix const docs onto per-constant comments so they
satisfy Go doc conventions / revive
- Pin the docs recipe's pkgsite to v0.1.0 for reproducible previews
* Go-SDK: Fix NotFound docstring grammar and use absolute bundle-spec link
- Correct "communication issues to the API server" -> "with" across the
Variable/Connection/XCom NotFound sentinels
- Make the bundle-spec link absolute so it resolves on pkg.go.dev, where
relative cross-module paths break
---
go-sdk/Justfile | 5 +
go-sdk/README.md | 301 ++++++++++++++++++++----
go-sdk/bundle/bundlev1/bundlev1server/server.go | 3 +
go-sdk/bundle/bundlev1/registry.go | 43 +++-
go-sdk/bundle/bundlev1/schemas.go | 30 ++-
go-sdk/bundle/bundlev1/task.go | 4 +
go-sdk/sdk/client.go | 4 +
go-sdk/sdk/connection.go | 8 +
go-sdk/sdk/doc.go | 26 +-
go-sdk/sdk/errors.go | 8 +-
go-sdk/sdk/sdk.go | 29 ++-
11 files changed, 402 insertions(+), 59 deletions(-)
diff --git a/go-sdk/Justfile b/go-sdk/Justfile
index fd4edf266cf..c83f5c1b47f 100644
--- a/go-sdk/Justfile
+++ b/go-sdk/Justfile
@@ -38,6 +38,11 @@ test:
@echo "Running unit tests..."
go run gotest.tools/[email protected] -f dots-v2 ./...
+# Build and serve Go package documentation locally
+docs port="6060":
+ @echo "Serving Go docs at http://localhost:{{port}}/github.com/apache/airflow/go-sdk"
+ go run golang.org/x/pkgsite/cmd/pkgsite@v0.1.0 -http=localhost:{{port}} .
+
# Generate protocol buffers
protoc:
@prek run -a protoc
diff --git a/go-sdk/README.md b/go-sdk/README.md
index c84d5529fbf..5ceb1afb4cc 100644
--- a/go-sdk/README.md
+++ b/go-sdk/README.md
@@ -19,38 +19,215 @@
# Apache Airflow Go Task SDK
-The Go SDK uses the Task Execution Interface (TEI or Task API) introduced in
AIP-72 with Airflow 3.0.0 to give
-Task functions written in Go full access to the Airflow "model", natively in
go.
+The Go SDK is a Go implementation of the Airflow Task SDK. It lets you write
task functions in Go that
+have native access to the Airflow "model" (Variables, Connections, and XCom),
instead of writing them in
+Python.
-The Task API however does not provide a means to get the `ExecuteTaskWorkload`
to the go worker itself. For
-that we use the Edge Executor API.
-Longer term we will likely need to stabilize the Edge Executor API and add
versioning to it.
+It is built on the Task Execution Interface (TEI, a.k.a. the Task API)
introduced by AIP-72 in Airflow
+3.0.0. AIP-72 standardised how a task runtime talks to Airflow over an HTTP
Execution API, which decoupled
+the language a task is written in from the Airflow core. The Go SDK is one
such runtime; the Java SDK is
+another.
-Since Go is a compiled language (putting aside projects such as
[YAEGI](https://github.com/traefik/yaegi) that allow Go to be interpreted), all
tasks must:
+> [!WARNING]
+> This is an **experimental** feature. The SDK is under active development and
its APIs, wire protocols,
+> and tooling may change between releases without notice.
-1. Be compiled into a binary ahead of time, and
-2. Be registered inside the worker process in order to be executed.
+## The compiled-language constraint
+Python tasks are imported and run in-process. Go is compiled, so the model is
different.
-> [!NOTE]
-> This Golang SDK is under active development and is not ready for prime-time
yet.
+A single binary that bundles one or more Dags' task functions is called a
**bundle**. You build one with
+the SDK's packer, `airflow-go-pack`, which compiles your code and appends a
metadata footer (the manifest
+of `dag_id`s and `task_id`s, plus the Dag source) to the executable. The
result is a **self-contained
+executable bundle**: a single runnable file that *is* the bundle, with no
separate manifest or archive to
+ship alongside it.
-## Quickstart
+## You still need a Python stub Dag (for now)
-- See [`example/bundle/main.go`](./example/bundle/main.go) for an example dag
bundle where you can define your task functions
+The Task API does not yet carry Dag *structure* for non-Python languages, so
the Dag's shape and task
+dependencies are still declared in a small Python file using `@task.stub`:
-- Compile this into a binary:
+```python
+from airflow.sdk import dag, task
+
+
+@task.stub(queue="golang")
+def extract(): ...
+
+
+@task.stub(queue="golang")
+def transform(): ...
+
+
+@dag()
+def simple_dag():
+ extract() >> transform()
+
+
+simple_dag()
+```
+
+`@task.stub` tells the Dag parser the "shape" of the Go tasks (their names and
dependencies) without any
+Python implementation. The `queue=` value routes the task to the Go runtime.
This Python requirement is a
+known limitation.
+
+
+## Authoring a bundle
+
+Implement `bundlev1.BundleProvider`, register your Dags and tasks, and `main`
is one line. From
+[`example/bundle/main.go`](./example/bundle/main.go):
+
+```go
+type myBundle struct{}
+
+var _ v1.BundleProvider = (*myBundle)(nil)
+
+func (m *myBundle) GetBundleVersion() v1.BundleInfo {
+ return v1.BundleInfo{Name: bundleName, Version: &bundleVersion}
+}
+
+func (m *myBundle) RegisterDags(dagbag v1.Registry) error {
+ simpleDag := dagbag.AddDag("simple_dag")
+ simpleDag.AddTask(extract)
+ simpleDag.AddTask(transform)
+ return nil
+}
+
+func main() {
+ if err := bundlev1server.Serve(&myBundle{}); err != nil {
+ log.Fatal(err)
+ }
+}
+```
+
+A task is an ordinary Go function. The runtime inspects its signature and
injects arguments by type:
+`context.Context`, `*slog.Logger`, and an `sdk.Client` (or a narrower
interface such as
+`sdk.VariableClient`). An optional `(any, error)` return becomes the task's
XCom; an `error` return marks
+the task failed.
+
+```go
+func extract(ctx context.Context, client sdk.Client, log *slog.Logger) (any, error) {
+ conn, err := client.GetConnection(ctx, "test_http")
+ // ... do work, honour ctx cancellation ...
+ return map[string]any{"go_version": runtime.Version()}, nil
+}
+
+func transform(ctx context.Context, client sdk.VariableClient, log *slog.Logger) error {
+ val, err := client.GetVariable(ctx, "my_variable")
+ if err != nil {
+ return err
+ }
+ log.Info("Obtained variable", "my_variable", val)
+ return nil
+}
+```
+
+Asking for the narrowest interface a task needs (e.g. `sdk.VariableClient`
instead of `sdk.Client`) makes
+unit testing easier and documents which Airflow features the task touches.
`RegisterDags` is the single
+source of truth for which `dag_id`s and `task_id`s a bundle can run.
+
+## Deployment modes
+
+A bundle can run in two ways. The same bundle binary works in both; you pick
one per deployment:
+
+1. **Coordinator** (recommended)
+2. **Edge Worker**
+
+For the protocol details behind each, see [How it works](#how-it-works).
+
+### Coordinator (recommended)
+
+A Python task runner executes the Go task directly, with no separate Go worker
process to run on the host.
+This is the same coordinator mechanism the Java SDK uses.
+
+**Why this is recommended:** the mature Python supervisor handles the
Airflow-facing concerns, so this path
+inherits its capabilities (remote task logs to S3/GCS, the full range of task
states, and alternate XCom
+backends) rather than reimplementing them in Go. These are exactly the
features the Edge Worker path is
+still missing (see [Known limitations](#known-limitations)).
+
+#### Quickstart
+
+- Build and pack your bundle with `airflow-go-pack`. The packer compiles the
bundle and appends an
+ embedded metadata footer so the coordinator can read its `dag_id`s without
executing the binary,
+ producing a single runnable file:
```bash
- go build -o ./bin/sample-dag-bundle ./example/bundle
+ go tool airflow-go-pack ./example/bundle -- -trimpath -tags=prod
+ ```
+
+ Use `--output <path>` to write the packed bundle straight into a directory
the coordinator scans
+ (`executables_root`), and pass extra `go build` flags after `--`.
+
+ For cross-compiling (e.g. deploy to a Linux host from an Apple-silicon
(darwin/arm64) machine), pass `--goos`/`--goarch` and the
+ packer cross-builds for you:
+
+ ```bash
+ go tool airflow-go-pack --goos linux --goarch amd64 \
+ --output ~/airflow/executable-bundles/sample-dag-bundle \
+ ./example/bundle
+ ```
+
+ Alternatively, use `--executable`/`--source`. The packer normally execs the
binary to read
+ its metadata; a cross-compiled binary cannot run on the host, so generate
the metadata on a machine that
+ can run it and pass the file with `--airflow-metadata`:
+
+ ```bash
+ # on linux/amd64 machine:
+ go build -o my-bundle ./example/bundle
+ ./my-bundle --airflow-metadata > airflow-metadata.yaml
+
+ # on darwin/arm64 machine:
+ go tool airflow-go-pack --executable ./my-bundle --source main.go --airflow-metadata airflow-metadata.yaml
+ ```
+
+ > [!NOTE]
+ > The packer ships via the Go 1.24 `tool` directive, so there is no global
install: add
+ > `tool github.com/apache/airflow/go-sdk/cmd/airflow-go-pack` to your bundle
module's `go.mod` and run
+ > it with `go tool airflow-go-pack`. This pins the packer version per
project.
+
+- Register the coordinator and route the queue to it, under `[sdk]` in
`airflow.cfg` (or the equivalent
+ `AIRFLOW__SDK__*` env vars):
+
+ ```ini
+ [sdk]
+ coordinators = {"go": {"classpath": "airflow.sdk.coordinators.executable.ExecutableCoordinator", "kwargs": {"executables_root": ["~/airflow/executable-bundles"]}}}
+ queue_to_coordinator = {"golang": "go"}
```
- (or see the [`Justfile`](./example/bundle/Justfile) for how you can build it
and specify they bundle version number at build time.)
+ `executables_root` is one or more directories the coordinator scans for
bundles; `queue_to_coordinator`
+ routes stub tasks with `queue="golang"` to this Go coordinator.
+
+ > [!IMPORTANT]
+ > The coordinator is part of the Airflow worker, so the `[sdk]` config (and
the bundle files in
+ > `executables_root`) only need to be present wherever tasks actually
execute. With `CeleryExecutor`,
+ > setting it on the Celery workers is sufficient. With `LocalExecutor`,
tasks run inside the scheduler
+ > process, so it must be set where the scheduler can read it. The API server
and Dag processor do not
+ > need it.
+
+- Deploy the matching Python stub Dag (above) into Airflow. There is no
separate Go worker to run: the
+ Airflow worker forks the bundle binary once per task instance.
+
+### Edge Worker (go-plugin)
+
+A long-running Go worker process (`airflow-go-edge-worker`) polls Airflow for
work and runs your bundle,
+with no Python in the data path. This path runs end-to-end today, but is
missing the features listed under
+[Known limitations](#known-limitations).
+
+#### Quickstart
-- Configure the go edge worker, by editing `$AIRFLOW_HOME/go-sdk.yaml`:
+- See [`example/bundle/main.go`](./example/bundle/main.go) for an example Dag
bundle.
- These config values need tweaking, especially the ports and secrets. The
ports are the default assuming
- airflow is running locally via `airflow standalone`.
+- Compile it into a binary:
+
+ ```bash
+ go build -o ./bin/sample-dag-bundle ./example/bundle
+ ```
+
+ (or see the [`Justfile`](./example/bundle/Justfile) for how to build it and
set the bundle version at
+ build time.)
+
+- Configure the Go edge worker by editing `$AIRFLOW_HOME/go-sdk.yaml`. The
ports below are the defaults
+ assuming Airflow runs locally via `airflow standalone`; tweak the ports and
secrets to match your setup:
```yaml
edge:
@@ -74,63 +251,97 @@ Since Go is a compiled language (putting aside projects
such as [YAEGI](https://
secret_key: "u0ZDb2ccINAbhzNmvYzclw=="
```
- You can also set these options via environment variables of
`AIRFLOW__${section}_${key}`, for example `AIRFLOW__API_AUTH__SECRET_KEY`.
+ You can also set these options via environment variables of
`AIRFLOW__${SECTION}__${KEY}`, for example
+ `AIRFLOW__API_AUTH__SECRET_KEY`.
-- Install the worker
+- Install the worker:
```bash
go install github.com/apache/airflow/go-sdk/cmd/airflow-go-edge-worker@latest
```
-- Run it!
+- Run it:
```bash
airflow-go-edge-worker run --queues golang
```
-### Example Dag:
+- Deploy the matching Python stub Dag (above) into Airflow.
-You will need to create a python Dag and deploy it in to the Airflow
+## Known limitations
-```python
-from airflow.sdk import dag, task
+A non-exhaustive list of features the **Edge Worker (go-plugin)** path has yet
to implement. These are the
+main reason the coordinator-based path is recommended: in that mode the Python
supervisor handles these
+concerns, so they are not limitations there.
+- Putting tasks into states other than success or failed/up-for-retry
(deferred,
+ failed-without-retries, etc.).
+- Remote task logs (i.e. S3/GCS etc.).
+- XCom reading/writing through non-default XCom backends.
-@task.stub(queue="golang")
-def extract(): ...
+## How it works
+The same bundle binary speaks two different protocols; which one it uses is
decided at launch by the CLI
+flags it was invoked with. User code (`func main`) is identical either way.
-@task.stub(queue="golang")
-def transform(): ...
+### Coordinator protocol
+```
+Python supervisor / task runner
+ │ finds + validates the bundle, then forks it:
+ ▼
+ <bundle binary> --comm=127.0.0.1:P1 --logs=127.0.0.1:P2
+ │ binary dials BACK over TCP loopback (msgpack-over-IPC)
+ ▼
+ GetConnection / GetVariable / GetXCom / SetXCom ... → SucceedTask / TaskState
+```
-@dag()
-def simple_dag():
+- The Python `ExecutableCoordinator` forks the bundle binary with
`--comm`/`--logs` addresses it is already
+ listening on. The binary dials back (it never listens) and speaks a
length-prefixed msgpack-over-IPC wire
+ protocol on the comm socket, with structured JSON-line logs on the logs
socket.
+- The Python runtime is the worker. It proxies every `GetConnection` /
`GetVariable` / `GetXCom` /
+ `SetXCom` call through to the Execution API. The Go binary just runs the
task function.
- extract() >> transform()
+The Go side of the protocol is implemented in `pkg/execution/`. On the Python
side it is the
+`ExecutableCoordinator` in
`task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py`.
+### Edge Worker protocol
-simple_dag()
+```
+Airflow scheduler ──Edge Executor API──► airflow-go-edge-worker ──go-plugin/gRPC──► bundle binary
+ (ExecuteTaskWorkload) (long-running Go process) (child process)
```
-Here we see the `@task.stub` which tells the Dag parser about the "shape" of
the go tasks, and lets us define
-the relationships between them
-
-> [!NOTE]
-> Yes, you still have to have a python Dag file for now. This is a known
limitation at the moment.
+- `airflow-go-edge-worker` is a long-running Go process. It registers with the
scheduler, polls the Edge
+ Executor API for `ExecuteTaskWorkload`s, and heartbeats.
+- For each workload it execs the bundle binary as a child and connects over
HashiCorp
+ [`go-plugin`](https://github.com/hashicorp/go-plugin) (gRPC over a
handshake-gated socket).
+- The Task API itself has no way to deliver an `ExecuteTaskWorkload` to a Go
worker, so the Edge Executor
+ API fills that gap. Longer term that API will likely need stabilising and
versioning.
-## Known missing features
+## Architectural decisions
-A non-exhaustive list of features we have yet to implement
+The [`adr/`](./adr) directory records the design decisions behind the SDK:
-- Support for putting tasks into state other than success or
failed/up-for-retry (deferred, failed-without-retries etc.)
-- Remote task logs (i.e. S3/GCS etc)
-- XCom reading/writing from other XCom backends
+- [ADR 0001](./adr/0001-bundle-packing-options.md): bundle-packing options.
+- [ADR 0002](./adr/0002-use-go-tool-directive-for-bundle-packer.md): deliver
the bundle packer via the
+ Go 1.24 `tool` directive.
+- [ADR 0003](./adr/0003-coordinator-protocol-msgpack-ipc.md): dual-mode
coordinator protocol, where one
+ binary speaks both go-plugin gRPC (Edge Worker) and msgpack-over-IPC (Python
coordinator).
+- [ADR 0004](./adr/0004-self-contained-executable-bundle.md): the
self-contained executable bundle, where
+ the executable *is* the bundle.
+The normative, language-agnostic on-disk bundle format (the footer layout,
manifest fields, and what the
+`ExecutableCoordinator` reads) is specified in
+[`executable-bundle-spec.rst`](https://github.com/apache/airflow/blob/main/task-sdk/docs/executable-bundle-spec.rst).
+`airflow-go-pack` produces bundles conforming to that spec.
## Future Direction
This is more of an "it would be nice to have" than any plan or commitment, and
a place to record ideas.
-- The ability to run Airflow tasks "in" an existing code base - i.e. being
able to define an Airflow task function that runs (in a goroutine) inside an
existing code base an app.
-- Do the task function reflection ahead of time, not for each Execute call.
+- Defining the whole Dag in the Go SDK, so the Python stub Dag is no longer
required and a bundle's
+ structure and task dependencies can be declared natively in Go.
+- The ability to run Airflow tasks "in" an existing code base, i.e. defining
an Airflow task function that
+ runs (in a goroutine) inside an existing app.
+- Doing the task function reflection ahead of time, rather than for each
Execute call.
diff --git a/go-sdk/bundle/bundlev1/bundlev1server/server.go
b/go-sdk/bundle/bundlev1/bundlev1server/server.go
index fde5314d332..2b39392db9c 100644
--- a/go-sdk/bundle/bundlev1/bundlev1server/server.go
+++ b/go-sdk/bundle/bundlev1/bundlev1server/server.go
@@ -89,6 +89,9 @@ func (s serveConfigFunc) ApplyServeOpt(in *ServerConfig)
error {
return s(in)
}
+// ServerConfig holds settings that ServeOpt values apply before the bundle
+// server starts. It is currently empty; it exists so options can be added
later
+// without changing Serve's signature.
type ServerConfig struct{}
// serveMode tags the protocol the binary will speak this run.
diff --git a/go-sdk/bundle/bundlev1/registry.go
b/go-sdk/bundle/bundlev1/registry.go
index 59cad631125..8e74a0c6708 100644
--- a/go-sdk/bundle/bundlev1/registry.go
+++ b/go-sdk/bundle/bundlev1/registry.go
@@ -28,18 +28,50 @@ import (
)
type (
- Task = worker.Task
+ // Task is one registered task: something the runtime can Execute.
Bundle
+ // authors do not implement this directly; Dag.AddTask wraps a plain Go
+ // function into a Task for you.
+ Task = worker.Task
+
+ // Bundle is the execution-time view of a registry: it looks up a task
by
+ // dag_id and task_id. Registry embeds it so the object built during
+ // RegisterDags can also serve tasks when they run.
Bundle = worker.Bundle
+ // Dag is the handle returned by Registry.AddDag. Use it to attach the
Go
+ // functions that implement the dag's tasks.
Dag interface {
+ // AddTask registers fn as a task, deriving the task_id from
fn's own
+ // name (so it must match the @task.stub name in the Python
dag).
+ //
+ // fn is an ordinary Go function whose parameters are injected
by type
+ // and may appear in any order. Recognised parameters are:
+ // - context.Context: cancelled when the task is asked to stop
+ // - *slog.Logger: writes to the task's Airflow log
+ // - sdk.Client (or a narrower sdk.VariableClient /
sdk.ConnectionClient):
+ // access to Variables, Connections, and XCom
+ //
+ // fn must return either error or (result, error): a non-nil
error fails
+ // the task, and a non-nil first result is pushed as the task's
+ // return-value XCom. Passing a non-function, or a function
whose return
+ // signature does not match, panics at registration time.
AddTask(fn any)
+
+ // AddTaskWithName is like AddTask but sets task_id explicitly
instead of
+ // deriving it from the function name. Use it when the Go
function name
+ // cannot match the Python @task.stub id, for example for an
anonymous
+ // function or a differing name.
AddTaskWithName(taskId string, fn any)
}
- // Registry defines the interface that lets user code add dags and
tasks, and extends Bundle for execution
- // time
+ // Registry is the recorder passed to BundleProvider.RegisterDags. Use
it to
+ // declare the dags this bundle can run; it also extends Bundle so the
same
+ // object serves task lookups at execution time.
Registry interface {
Bundle
+ // AddDag registers a dag by its dag_id (matching the Python
stub dag)
+ // and returns a Dag handle for attaching tasks. Registering
the same
+ // dag_id twice panics.
AddDag(dagId string) Dag
}
@@ -83,7 +115,10 @@ func (d dagShim) AddTaskWithName(taskId string, fn any) {
d.registry.registerTaskWithName(d.dagId, taskId, fn)
}
-// Function New creates a new bundle on which Dag and Tasks can be registered
+// New returns an empty Registry on which dags and tasks can be registered. The
+// runtime creates one and hands it to BundleProvider.RegisterDags, so bundle
+// authors rarely call this directly; it is handy for unit-testing a
+// RegisterDags implementation.
func New() Registry {
return &registry{
taskFuncMap: make(map[string]map[string]Task),
diff --git a/go-sdk/bundle/bundlev1/schemas.go
b/go-sdk/bundle/bundlev1/schemas.go
index f5710e44504..46bf5cc2879 100644
--- a/go-sdk/bundle/bundlev1/schemas.go
+++ b/go-sdk/bundle/bundlev1/schemas.go
@@ -22,29 +22,49 @@ import (
"github.com/apache/airflow/go-sdk/pkg/api"
)
+// BundleProvider is the single interface a bundle author implements. Construct
+// one in main and pass it to bundlev1server.Serve; the runtime calls
+// GetBundleVersion to identify the bundle and RegisterDags to load its tasks.
type BundleProvider interface {
// GetBundleVersion returns upfront information about the bundle name
and version without needing to load
// the full dag and task information, which could be memory intensive.
GetBundleVersion() BundleInfo
- // RegisterDags is called to populate the Task functions in the
registry in order to execute them.
+ // RegisterDags declares every dag and task in this bundle on the
supplied
+ // Registry, for example:
//
- // You should populate all dags and tasks in the bundle.
+ // func (m *myBundle) RegisterDags(dagbag bundlev1.Registry) error
{
+ // dag := dagbag.AddDag("simple_dag")
+ // dag.AddTask(extract)
+ // dag.AddTask(transform)
+ // return nil
+ // }
//
- // This will be called once-per-process-per-bundle and cached
internally. You do not have to cache this
- // yourself
+ // Register all dags and tasks here. It is called once per process per
+ // bundle and cached internally, so you do not have to cache it
yourself.
RegisterDags(Registry) error
}
-// BundleInfo Schema for telling task which bundle to run with.
+// BundleInfo identifies a bundle by name and version. It is returned by
+// BundleProvider.GetBundleVersion and tells Airflow which bundle to run a task
+// with.
type BundleInfo = api.BundleInfo
+// TaskInstance identifies the running task by dag_id, run_id, task_id, and
+// map_index. It is an alias for the Execution-API type and is what
+// XComClient.PushXCom takes.
type TaskInstance = api.TaskInstance
+// GetMetadataResponse is the runtime reply that carries a bundle's BundleInfo
+// back over the go-plugin transport. It is plumbing between the worker and the
+// bundle, not something authors construct.
type GetMetadataResponse struct {
Bundle BundleInfo
}
+// ExecuteTaskWorkload is the runtime payload describing one task to run: its
+// TaskInstance, the bundle to load, and an optional log path. The worker
+// delivers it to the bundle; authors do not build it themselves.
type ExecuteTaskWorkload struct {
Token string `json:"token"`
diff --git a/go-sdk/bundle/bundlev1/task.go b/go-sdk/bundle/bundlev1/task.go
index 5277f40681c..0a3aca70666 100644
--- a/go-sdk/bundle/bundlev1/task.go
+++ b/go-sdk/bundle/bundlev1/task.go
@@ -36,6 +36,10 @@ type taskFunction struct {
var _ Task = (*taskFunction)(nil)
+// NewTaskFunction wraps a plain Go function as a Task, validating its
signature
+// (injectable parameters, and a return of error or (result, error)). Bundle
+// authors normally use Dag.AddTask, which calls this for them; use it directly
+// only when building a Task outside the registry.
func NewTaskFunction(fn any) (Task, error) {
v := reflect.ValueOf(fn)
fullName := runtime.FuncForPC(v.Pointer()).Name()
diff --git a/go-sdk/sdk/client.go b/go-sdk/sdk/client.go
index f719fdc4787..61a8a56f8bb 100644
--- a/go-sdk/sdk/client.go
+++ b/go-sdk/sdk/client.go
@@ -33,6 +33,10 @@ type client struct{}
var _ Client = (*client)(nil)
+// NewClient returns the default Client, which serves Variable, Connection, and
+// XCom calls from the Execution API (reading the per-task API client from the
+// context). Task functions are handed a Client by the runtime and rarely need
+// this; it is exported mainly for tests and advanced setups.
func NewClient() Client {
return &client{}
}
diff --git a/go-sdk/sdk/connection.go b/go-sdk/sdk/connection.go
index 1d0bcf30b6f..07d9b1cd2ca 100644
--- a/go-sdk/sdk/connection.go
+++ b/go-sdk/sdk/connection.go
@@ -26,6 +26,10 @@ import (
"github.com/apache/airflow/go-sdk/pkg/api"
)
+// Connection is an Airflow Connection resolved for a task: the endpoint and
+// credentials registered in Airflow under a conn_id. Obtain one with
+// ConnectionClient.GetConnection. Optional fields are pointers so an absent
+// value (nil) is distinct from an empty one.
type Connection struct {
ID string
@@ -47,6 +51,10 @@ type Connection struct {
Extra map[string]any
}
+// GetURI renders the connection as a URL of the form
+// scheme://login:password@host:port/path?extra. It is the Go equivalent of
+// Python's Connection.get_uri and is handy when a client library wants a
single
+// connection string rather than individual fields.
func (c Connection) GetURI() *url.URL {
uri := &url.URL{}
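
The GetURI comment above describes a URL of the form scheme://login:password@host:port/path?extra. As a rough illustration of how such a URL can be assembled with the standard `net/url` package, here is a minimal sketch. The `conn` struct and its field names are hypothetical, trimmed-down stand-ins and do not mirror `sdk.Connection`, whose optional fields are pointers.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// conn is a hypothetical connection record used only for this sketch.
type conn struct {
	Type, Host, Login, Password, Path string
	Port                              int
	Extra                             map[string]string
}

// uri renders the record as scheme://login:password@host:port/path?extra.
func (c conn) uri() *url.URL {
	q := url.Values{}
	for k, v := range c.Extra {
		q.Set(k, v)
	}
	return &url.URL{
		Scheme:   c.Type,
		User:     url.UserPassword(c.Login, c.Password),
		Host:     net.JoinHostPort(c.Host, strconv.Itoa(c.Port)),
		Path:     c.Path,
		RawQuery: q.Encode(),
	}
}

func main() {
	c := conn{
		Type: "postgres", Host: "db.internal", Port: 5432,
		Login: "app", Password: "secret", Path: "/mydb",
		Extra: map[string]string{"sslmode": "require"},
	}
	fmt.Println(c.uri()) // postgres://app:secret@db.internal:5432/mydb?sslmode=require
}
```

Building the value as a `url.URL` rather than by string concatenation leaves escaping of the credentials and query parameters to the standard library.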
diff --git a/go-sdk/sdk/doc.go b/go-sdk/sdk/doc.go
index 82aa91d0bc2..edb9f45b211 100644
--- a/go-sdk/sdk/doc.go
+++ b/go-sdk/sdk/doc.go
@@ -16,6 +16,30 @@
// under the License.
/*
-Package sdk provides access to the Airflow objects (Variables, Connection,
XCom etc) during run time for tasks.
+Package sdk gives task functions access to the Airflow "model" (Variables,
+Connections, and XCom) at run time.
+
+A task function does not construct a client itself. The runtime inspects the
+function's parameters and injects one by type, so you declare the narrowest
+interface you need and use it:
+
+ func mytask(ctx context.Context, client sdk.Client, log *slog.Logger)
error {
+ val, err := client.GetVariable(ctx, "my_variable")
+ if err != nil {
+ return err
+ }
+ log.Info("got variable", "value", val)
+ return nil
+ }
+
+Ask for [Client] for full access, or a narrower interface such as
+[VariableClient] or [ConnectionClient] when the task only reads one kind of
+object. The narrower type documents what the task touches and makes it easy to
+pass a fake in unit tests.
+
+To publish a result, return a value from the task function: the runtime pushes
+it as the task's return-value XCom, so most tasks never call [XComClient]
+directly. Lookups that miss return a wrapped sentinel error
([VariableNotFound],
+[ConnectionNotFound], [XComNotFound]) you can test for with errors.Is.
*/
package sdk
diff --git a/go-sdk/sdk/errors.go b/go-sdk/sdk/errors.go
index d8b071f0944..cba5fdccfa6 100644
--- a/go-sdk/sdk/errors.go
+++ b/go-sdk/sdk/errors.go
@@ -22,15 +22,19 @@ import (
)
// VariableNotFound is an error value used to signal that a variable could not
be found (and that there were
-// no communication issues to the API server).
+// no communication issues with the API server).
//
// See the “GetVariable“ method of [VariableClient] for an example
var VariableNotFound = errors.New("variable not found")
// ConnectionNotFound is an error value used to signal that a connection could
not be found (and that there were
-// no communication issues to the API server).
+// no communication issues with the API server).
//
// See the “GetConnection“ method of [ConnectionClient] for an example
var ConnectionNotFound = errors.New("connection not found")
+// XComNotFound is an error value used to signal that an XCom value could not
be found (and that there were
+// no communication issues with the API server).
+//
+// See the “GetXCom“ method of [XComClient] for an example
var XComNotFound = errors.New("xcom not found")
diff --git a/go-sdk/sdk/sdk.go b/go-sdk/sdk/sdk.go
index e4f5a61a331..cda13878fd9 100644
--- a/go-sdk/sdk/sdk.go
+++ b/go-sdk/sdk/sdk.go
@@ -24,7 +24,15 @@ import (
)
const (
- VariableEnvPrefix = "AIRFLOW_VAR_"
+ // VariableEnvPrefix is the environment-variable prefix used as a local
+ // fallback for Variable lookups. GetVariable first checks the process
+ // environment for VariableEnvPrefix plus the uppercased key (so key
+ // "my_var" is read from AIRFLOW_VAR_MY_VAR) before asking the API
server,
+ // mirroring the Python SDK and making local development and tests easy.
+ VariableEnvPrefix = "AIRFLOW_VAR_"
+
+ // ConnectionEnvPrefix is the matching prefix for Connections. The
+ // connection env fallback is not wired up yet, so it is currently
unused.
ConnectionEnvPrefix = "AIRFLOW_CONN_"
)
@@ -62,13 +70,14 @@ type VariableClient interface {
UnmarshalJSONVariable(ctx context.Context, key string, pointer any)
error
}
+// ConnectionClient reads Airflow Connections.
type ConnectionClient interface {
// GetConnection returns the value of an Airflow Connection.
//
// If the conn is not found error will be a wrapped
``ConnectionNotFound``:
//
// conn, err := client.GetConnection(ctx, "my-db")
- // if errors.Is(err, ConnectinNotFound) {
+ // if errors.Is(err, ConnectionNotFound) {
// // Handle not found, set default,
return custom error etc
// } else {
// // Other errors here, such as http
network timeouts etc.
@@ -76,7 +85,17 @@ type ConnectionClient interface {
GetConnection(ctx context.Context, connID string) (Connection, error)
}
+// XComClient reads and writes XCom values. Most tasks never need this: to
+// publish a result, return a value from the task function and the runtime
+// pushes it as the return-value XCom. Reach for these methods only to read
+// another task's XCom, or to push under a custom key.
type XComClient interface {
+ // GetXCom returns the value stored under key by the task identified by
+ // dagId/runId/taskId. For a mapped task instance pass its mapIndex,
+ // otherwise pass nil. If no value exists the error wraps XComNotFound.
+ //
+ // value is reserved for future typed decoding and is currently
ignored; the
+ // stored value is returned as the first result instead.
GetXCom(
ctx context.Context,
dagId, runId, taskId string,
@@ -84,9 +103,15 @@ type XComClient interface {
key string,
value any,
) (any, error)
+
+ // PushXCom stores value under key for the given task instance ti.
PushXCom(ctx context.Context, ti api.TaskInstance, key string, value
any) error
}
+// Client is the full task-facing API: read Variables and Connections, and
+// read/write XCom. A task that declares an sdk.Client parameter is handed one
+// by the runtime. If a task needs only one capability, ask for the narrower
+// VariableClient, ConnectionClient, or XComClient instead.
type Client interface {
VariableClient
ConnectionClient