Hello everyone, I am excited to announce that the GoSDK is finally ready enough for people other than me to use and try out.
I said in the keynote "as soon as I get back to my laptop", and, well, that turned into a week off and then noticing the docs needed updating, so here we are.

There are a few things to note about this beta release:

- As with all beta "releases" of Apache Airflow, it is not a formal release according to ASF rules. It is a preview for members of the development community to try out and give feedback on.
- This is a beta for a reason. While it works On My Machine (TM), it has not been extensively tested under various error conditions, and it will undoubtedly have rough edges.
- DAGs must still be defined in Python files. To make this work, the new `@task.stub` decorator has been added to the standard provider (already released before Airflow Summit).
- The TaskFlow API does not yet support passing XComs or static values directly to stub tasks. (This is not complex to fix, but unfortunately it needs both provider and API server changes, so I'm not sure when it will be available.)
- This only works with the Edge Executor right now.
- Some reference API docs from godoc are available at https://pkg.go.dev/github.com/apache/airflow/go-sdk (but that page is not yet showing the tagged version; I might have missed a step somewhere).

A note on the high-level architecture of the GoSDK: there are two components involved on the worker side. The first is the "airflow-go-edge-worker", which contains no user code. It speaks to the Edge Worker API to get tasks to execute, then hands off to the second component, the DAG bundle. The bundle is a pre-compiled Go binary "plugin" (using the hashicorp/go-plugin framework right now) which runs the actual tasks.

Feedback of any kind is always super valuable: things that don't work, things that are hard to understand, or just improvements you'd like. For now, please either reply to this email or ping me on Slack. Soon we will add a new issue type to the repo.
An example DAG (taken from the readme[1]):

```python
from airflow.sdk import dag, task


@task.stub(queue="golang")
def extract(): ...


@task.stub(queue="golang")
def transform(): ...


@dag()
def simple_dag():
    extract() >> transform()


simple_dag()
```

And here are the task functions, and some of the registration code, from the example bundle in the repo[2]:

```go
import (
	"context"
	"log/slog"
	"runtime"
	"time"
	// Plus the go-sdk packages that provide v1, bundlev1server and sdk;
	// see the example bundle [2] for the exact import paths.
)

func (m *myBundle) RegisterDags(dagbag v1.Registry) error {
	tutorial_dag := dagbag.AddDag("tutorial_dag")
	tutorial_dag.AddTask(extract)
	tutorial_dag.AddTask(transform)
	tutorial_dag.AddTask(load)
	return nil
}

func main() {
	bundlev1server.Serve(&myBundle{})
}

func extract(ctx context.Context, client sdk.Client, log *slog.Logger) (any, error) {
	for range 10 {
		// Once per loop, check if we've been asked to cancel!
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}
		log.Info("After the beep the time will be", "time", time.Now())
		time.Sleep(2 * time.Second)
	}
	log.Info("Goodbye from task")

	ret := map[string]any{
		"go_version": runtime.Version(),
	}
	return ret, nil
}

func transform(ctx context.Context, client sdk.VariableClient, log *slog.Logger) error {
	key := "my_variable"
	val, err := client.GetVariable(ctx, key)
	if err != nil {
		return err
	}
	log.Info("Obtained variable", key, val)
	return nil
}
```

Cheers,
Ash

[1]: https://github.com/apache/airflow/tree/go-sdk/v1.0.0-beta1/go-sdk
[2]: https://github.com/apache/airflow/blob/go-sdk/v1.0.0-beta1/go-sdk/example/bundle/main.go
