jason810496 commented on code in PR #67156:
URL: https://github.com/apache/airflow/pull/67156#discussion_r3353761862
##########
go-sdk/bundle/bundlev1/bundlev1server/server.go:
##########
@@ -114,9 +127,21 @@ func Serve(bundle bundlev1.BundleProvider, opts
...ServeOpt) error {
c.ApplyServeOpt(serveConfig)
}
- switch decideMode() {
- case modeMetadataDump:
- return dumpBundleMetadata(bundle)
Review Comment:
Intentionally removed the `dumpBundleMetadata` utility, since we don't care
about the bundle metadata at all.
##########
go-sdk/bundle/bundlev1/registry.go:
##########
@@ -43,9 +43,30 @@ type (
AddDag(dagId string) Dag
}
	// TaskInfo describes a registered task by its user-visible id.
	TaskInfo struct {
		ID string
	}

	// DagInfo describes a registered dag together with its tasks in
	// registration order.
	DagInfo struct {
		DagID string
		Tasks []TaskInfo
	}

	// EnumerableBundle exposes the dag/task identity recorded by RegisterDags.
	// The default registry implements it; airflow-go-pack relies on it to read
	// a bundle's dag/task ids without executing any task.
	EnumerableBundle interface {
		// OrderedDags returns every registered dag with its tasks.
		// NOTE(review): the name suggests dags come back in registration
		// order (as DagInfo.Tasks does) — confirm against the registry
		// implementation before depending on it.
		OrderedDags() []DagInfo
	}
+
Review Comment:
The same `EnumerableBundle` interface will also be used by the upcoming "define the
whole Dag in the Go SDK" feature: https://github.com/apache/airflow/pull/67155
##########
go-sdk/cmd/airflow-go-pack/pack.go:
##########
@@ -0,0 +1,666 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "go/ast"
+ "go/parser"
+ "go/token"
+ "io"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "runtime"
+ "sort"
+ "strings"
+
+ "gopkg.in/yaml.v3"
+
+ "github.com/apache/airflow/go-sdk/internal/airflowmetadata"
+ "github.com/apache/airflow/go-sdk/internal/bundlefooter"
+)
+
// packOptions are the flags accepted by the root pack command.
//
// runPack rejects executable combined with buildArgs, goos, or goarch, and
// requires source whenever executable is set.
type packOptions struct {
	pkg             string   // target package (default ".")
	source          string   // override the auto-detected DAG source file
	executable      string   // pack a pre-built binary instead of building
	output          string   // override the default <bundleName> output path
	airflowMetadata string   // path to a pre-captured --airflow-metadata manifest (JSON or YAML)
	goos            string   // target GOOS for the deployable build (falls back to env GOOS, then host)
	goarch          string   // target GOARCH for the deployable build (falls back to env GOARCH, then host)
	buildArgs       []string // extra `go build` arguments as captured after "--" (leading "--" included); buildPackage drops every literal "--" before forwarding
}
+
+func runPack(stdout, stderr io.Writer, opts *packOptions) error {
+ if opts.executable != "" && len(opts.buildArgs) > 0 {
+ return fmt.Errorf("--executable is mutually exclusive with go
build flags after \"--\"")
+ }
+ if opts.executable != "" && (opts.goos != "" || opts.goarch != "") {
+ return fmt.Errorf(
+ "--executable is mutually exclusive with
--goos/--goarch: --executable packs the " +
+ "binary as-is and never builds, so it cannot
cross-compile. To cross-build a " +
+ "bundle, drop --executable and pass
--goos/--goarch with the package path",
+ )
+ }
+
+ // Resolve the DAG source file for both modes up front. --executable
requires
+ // it explicitly; the build path falls back to discovery.
+ sourcePath := opts.source
+ if opts.executable != "" {
+ if sourcePath == "" {
+ return fmt.Errorf(
+ "--executable requires --source: cannot infer
the DAG source for a pre-built binary",
+ )
+ }
+ } else if sourcePath == "" {
+ // --source is the documented escape hatch for packages whose
main file
+ // cannot be auto-detected: it may be selected by build tags or
GOOS, or
+ // the package may have several files with func main().
Discovery runs a
+ // plain `go list` without the forwarded build flags, so it can
fail or
+ // pick the wrong file for such packages. Only fall back to
discovery
+ // when --source was not supplied, so an explicit --source
always wins.
+ discovered, err := discoverMainSource(opts.pkg)
+ if err != nil {
+ return fmt.Errorf("locating DAG source file: %w", err)
+ }
+ sourcePath = discovered
+ }
+ if _, err := os.Stat(sourcePath); err != nil {
+ return fmt.Errorf("source file %s: %w", sourcePath, err)
+ }
+
+ output := opts.output
+ if output == "" {
+ defaultPath, err := defaultOutputPath(sourcePath)
+ if err != nil {
+ return fmt.Errorf("determining default output path:
%w", err)
+ }
+ output = defaultPath
+ }
+
+ // The bundle is finalised by renaming a temp file onto output, which
fails
+ // if output is an existing directory. Catch that here: the default
output is
+ // the package directory's name, so packing ./foo from a dir that
already has
+ // a ./foo directory collides. Report it with the fix instead of a bare
+ // "rename ...: file exists" from os.Rename. Done before any build so a
+ // misconfigured output fails fast rather than after a (cross) go build.
+ if info, err := os.Stat(output); err == nil && info.IsDir() {
+ return fmt.Errorf(
+ "output path %q is an existing directory (the bundle
output defaults to the "+
+ "package directory's name); pass --output to
write the bundle to a file path",
+ output,
+ )
+ }
+
+ // execPath is the binary that receives the footer (the deployable
artefact,
+ // which MAY be cross-compiled). introspectPath is the binary
obtainMetadata
+ // reads --airflow-metadata from. By default that means exec'ing it on
the
+ // host (so it must be host-runnable, hence the cross-compile sidecar
below),
+ // but --airflow-metadata bypasses it entirely.
+ var execPath, introspectPath string
+ cleanupExec := func() {}
+ defer func() { cleanupExec() }()
+
+ if opts.executable != "" {
+ execPath = opts.executable
+ introspectPath = opts.executable
+ } else {
+ targetGOOS, targetGOARCH := targetPlatform(opts)
+ artifact, cleanup, err := buildPackage(stderr, opts.pkg,
opts.buildArgs, targetGOOS, targetGOARCH)
+ if err != nil {
+ return err
+ }
+ execPath = artifact
+ cleanupExec = cleanup
+ introspectPath = artifact
+
+ // Reading the manifest means exec'ing the binary, so it must
be a
+ // host-native build. When cross-compiling, the artefact cannot
run
+ // here; build a throwaway host binary from the same sources
and the
+ // same forwarded `--` build flags (DAG/task identity is
arch-independent)
+ // solely to introspect. This sidecar is unnecessary when
+ // --airflow-metadata supplies the manifest directly.
+ crossCompiling := targetGOOS != runtime.GOOS || targetGOARCH !=
runtime.GOARCH
+ if crossCompiling && opts.airflowMetadata == "" {
+ hostBin, cleanupHost, err := buildPackage(stderr,
opts.pkg, opts.buildArgs, runtime.GOOS, runtime.GOARCH)
+ if err != nil {
+ return fmt.Errorf("building host binary for
metadata introspection: %w", err)
+ }
+ prevCleanup := cleanupExec
+ cleanupExec = func() { cleanupHost(); prevCleanup() }
+ introspectPath = hostBin
+ }
+ }
+
+ if _, err := os.Stat(execPath); err != nil {
+ return fmt.Errorf("executable %s: %w", execPath, err)
+ }
+
+ if err := rejectOutputAlias(output, execPath, sourcePath); err != nil {
+ return err
+ }
+
+ meta, err := obtainMetadata(opts, introspectPath)
+ if err != nil {
+ return err
+ }
+ if len(meta.Dags) == 0 {
+ return fmt.Errorf("bundle exposes no dags: nothing to pack")
+ }
+ for dagID, dag := range meta.Dags {
+ if len(dag.Tasks) == 0 {
+ fmt.Fprintf(stderr, "warning: dag %q has no tasks\n",
dagID)
+ }
+ }
+
+ manifest, err := renderManifest(meta, filepath.Base(sourcePath))
+ if err != nil {
+ return fmt.Errorf("rendering manifest: %w", err)
+ }
+ sourceBytes, err := os.ReadFile(sourcePath)
+ if err != nil {
+ return fmt.Errorf("reading source file: %w", err)
+ }
+
+ // Assemble the bundle through a temp file and atomically move it into
+ // place: we never mutate the build artefact or the user-supplied
+ // --executable, and a failed pack never leaves a truncated or
half-written
+ // file at output.
+ if err := writeBundle(execPath, output, sourceBytes, manifest); err !=
nil {
+ return err
+ }
+
+ fmt.Fprintf(stdout, "Wrote bundle %s (sdk=%s/%s, dags=%d)\n",
+ output, meta.SDK.Language, meta.SDK.Version, len(meta.Dags))
+ return nil
+}
+
// defaultOutputPath names the bundle after the directory that holds the DAG
// source file. That directory is the bundle's main package, so its base name
// is the binary name `go build` would choose. The ".exe" suffix is added based
// on the host the packer runs on (runtime.GOOS), not on any cross-compile
// target.
func defaultOutputPath(sourcePath string) (string, error) {
	absSource, err := filepath.Abs(sourcePath)
	if err != nil {
		return "", err
	}
	pkgDir := filepath.Dir(absSource)

	suffix := ""
	if runtime.GOOS == "windows" {
		suffix = ".exe"
	}
	return filepath.Base(pkgDir) + suffix, nil
}
+
+// discoverMainSource locates the file in the given package whose AST contains
+// a top-level `func main()`. Returns an error if the package has zero or
+// more than one such file, mirroring ADR 0002's discovery contract.
+func discoverMainSource(pkg string) (string, error) {
+ cmd := exec.Command("go", "list", "-f", "{{.Dir}}\n{{range
.GoFiles}}{{.}}\n{{end}}", pkg)
+ var stdout, stderr bytes.Buffer
+ cmd.Stdout = &stdout
+ cmd.Stderr = &stderr
+ if err := cmd.Run(); err != nil {
+ return "", fmt.Errorf(
+ "go list %s: %w: %s\n"+
+ "airflow-go-pack packs the Go package in the
current directory by default; "+
+ "pass your bundle's package path (e.g.
`airflow-go-pack ./path/to/bundle`) "+
+ "or --source to point at the DAG source file
directly",
+ pkg, err, strings.TrimSpace(stderr.String()),
+ )
+ }
+
+ lines := splitNonEmpty(stdout.String())
+ if len(lines) < 2 {
+ return "", fmt.Errorf("package %s has no Go source files", pkg)
+ }
+ dir := lines[0]
+ files := lines[1:]
+
+ fset := token.NewFileSet()
+ var matches []string
+ for _, name := range files {
+ full := filepath.Join(dir, name)
+ f, err := parser.ParseFile(fset, full, nil,
parser.SkipObjectResolution)
+ if err != nil {
+ return "", fmt.Errorf("parsing %s: %w", full, err)
+ }
+ if hasMainFunc(f) {
+ matches = append(matches, full)
+ }
+ }
+ switch len(matches) {
+ case 0:
+ return "", fmt.Errorf("no file in package %s defines func
main()", pkg)
+ case 1:
+ return matches[0], nil
+ default:
+ return "", fmt.Errorf(
+ "multiple files in package %s define func main(): %v;
use --source to disambiguate",
+ pkg,
+ matches,
+ )
+ }
+}
+
// hasMainFunc reports whether f declares a package-level (receiver-less)
// function named main that takes no parameters. Methods named main and main
// functions with parameters are not counted.
func hasMainFunc(f *ast.File) bool {
	for _, d := range f.Decls {
		fd, isFunc := d.(*ast.FuncDecl)
		if isFunc && fd.Recv == nil && fd.Name.Name == "main" &&
			(fd.Type.Params == nil || len(fd.Type.Params.List) == 0) {
			return true
		}
	}
	return false
}
+
// splitNonEmpty breaks s on newlines, trims surrounding whitespace from each
// piece, and keeps only the pieces that are non-empty after trimming. It
// returns nil when no such piece exists.
func splitNonEmpty(s string) []string {
	var lines []string
	for _, raw := range strings.Split(s, "\n") {
		line := strings.TrimSpace(raw)
		if line == "" {
			continue
		}
		lines = append(lines, line)
	}
	return lines
}
+
+// targetPlatform resolves the GOOS/GOARCH the deployable bundle is built for:
+// the --goos/--goarch flags win, then the ambient GOOS/GOARCH env, then the
+// host. The flags exist because `go tool airflow-go-pack` builds the packer
+// using the ambient GOOS/GOARCH — setting those in the env to cross-compile a
+// bundle would instead cross-build the packer itself and fail to exec it on
the
+// host. Passing the target via flags keeps the env (and the packer build)
+// host-native while still cross-building the bundle.
+func targetPlatform(opts *packOptions) (goos, goarch string) {
+ goos = runtime.GOOS
+ if env := os.Getenv("GOOS"); env != "" {
+ goos = env
+ }
+ if opts.goos != "" {
+ goos = opts.goos
+ }
+ goarch = runtime.GOARCH
+ if env := os.Getenv("GOARCH"); env != "" {
+ goarch = env
+ }
+ if opts.goarch != "" {
+ goarch = opts.goarch
+ }
+ return goos, goarch
+}
+
// buildPackage compiles pkg with `go build [extraArgs...] -o <tmpdir>/bundle
// <pkg>` for the given goos/goarch. It returns the path of the built
// executable ("bundle", or "bundle.exe" when goos is windows) and a cleanup
// func that removes the temp directory. On error it returns an empty path and
// a nil cleanup, having already removed the temp directory.
//
// extraArgs is the slice captured after "--" on the airflow-go-pack command
// line. Every argument that is exactly "--" is dropped before forwarding.
// GOOS/GOARCH are set explicitly, overriding any ambient values, so the caller
// controls the target: the deployable build passes the resolved target
// platform, and the introspection sidecar passes the host. `go build` output
// (stdout and stderr) is streamed to stderr.
func buildPackage(
	stderr io.Writer,
	pkg string,
	extraArgs []string,
	goos, goarch string,
) (string, func(), error) {
	workDir, err := os.MkdirTemp("", "airflow-go-pack-*")
	if err != nil {
		return "", nil, fmt.Errorf("creating temp dir: %w", err)
	}
	removeWorkDir := func() { _ = os.RemoveAll(workDir) } // best-effort cleanup

	exeName := "bundle"
	if goos == "windows" {
		exeName = "bundle.exe"
	}
	binPath := filepath.Join(workDir, exeName)

	goArgs := make([]string, 0, len(extraArgs)+4)
	goArgs = append(goArgs, "build")
	for _, arg := range extraArgs {
		if arg != "--" {
			goArgs = append(goArgs, arg)
		}
	}
	goArgs = append(goArgs, "-o", binPath, pkg)

	build := exec.Command("go", goArgs...)
	// os/exec keeps the last value for a duplicated key, so these entries
	// override any ambient GOOS/GOARCH (which `go tool` used to build the packer).
	build.Env = append(os.Environ(), "GOOS="+goos, "GOARCH="+goarch)
	build.Stdout, build.Stderr = stderr, stderr

	if runErr := build.Run(); runErr != nil {
		removeWorkDir()
		return "", nil, fmt.Errorf("go build failed: %w", runErr)
	}
	return binPath, removeWorkDir, nil
}
+
+func readAirflowMetadata(execPath string) (airflowmetadata.Manifest, error) {
+ out, err := runIntrospect(execPath, "--airflow-metadata")
+ if err != nil {
+ return airflowmetadata.Manifest{}, err
+ }
+ // Decode with a YAML decoder: it reads the binary's YAML default and
its
+ // --format json output alike (JSON is a subset of YAML).
+ var meta airflowmetadata.Manifest
+ if err := yaml.Unmarshal(out, &meta); err != nil {
+ return airflowmetadata.Manifest{}, fmt.Errorf(
+ "decoding --airflow-metadata output (YAML/JSON): %w",
+ err,
+ )
+ }
+ return meta, nil
+}
+
+// obtainMetadata resolves the bundle manifest either from an explicit
+// --airflow-metadata file or by exec'ing a host-runnable introspection binary.
+// In --executable mode a binary that cannot be exec'd on the host is a hard
+// error with remediation guidance: --executable expects a same-platform
binary,
+// and the packer never silently rebuilds a host binary, because a rebuild from
+// unknown inputs (the original build tags, ldflags, and GOOS/GOARCH-specific
+// files are not known here, and build flags are rejected in --executable mode)
+// can advertise a different DAG/task set than the artefact actually shipped.
+func obtainMetadata(opts *packOptions, introspectPath string)
(airflowmetadata.Manifest, error) {
+ if opts.airflowMetadata != "" {
+ meta, err := readMetadataFile(opts.airflowMetadata)
+ if err != nil {
+ return airflowmetadata.Manifest{}, fmt.Errorf(
+ "--airflow-metadata %s: %w",
+ opts.airflowMetadata,
+ err,
+ )
+ }
+ return meta, nil
+ }
+
+ meta, err := readAirflowMetadata(introspectPath)
+ if err == nil {
+ return meta, nil
+ }
+ if opts.executable != "" && errors.Is(err, errExecNotStartable) {
+ return airflowmetadata.Manifest{}, fmt.Errorf(
Review Comment:
Actionable error messages that tell the user how to handle a cross-platform build.
##########
go-sdk/pkg/execution/metadata.go:
##########
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+ "encoding/json"
+ "fmt"
+ "os"
+ "runtime/debug"
+
+ "gopkg.in/yaml.v3"
+
+ "github.com/apache/airflow/go-sdk/bundle/bundlev1"
+ "github.com/apache/airflow/go-sdk/internal/airflowmetadata"
+)
+
// sdkModulePath is the import path of the SDK module. Used to identify the
// SDK version from the bundle binary's build info dependencies.
// NOTE(review): presumably compared against module paths from
// runtime/debug build info — must stay in sync with go-sdk's go.mod module
// line; confirm.
const sdkModulePath = "github.com/apache/airflow/go-sdk"
+
// MetadataFormat names the encoding DumpAirflowMetadata writes to stdout when
// a bundle binary is invoked with --airflow-metadata.
type MetadataFormat string

// Supported MetadataFormat values.
const (
	// MetadataFormatYAML is the default. It matches the airflow-metadata.yaml
	// embedded in a bundle, so `mybundle --airflow-metadata > airflow-metadata.yaml`
	// produces a file that can be used directly.
	MetadataFormatYAML MetadataFormat = "yaml"
	// MetadataFormatJSON is selected explicitly with --format json.
	MetadataFormatJSON MetadataFormat = "json"
)

// ParseMetadataFormat converts a --format flag value into a MetadataFormat.
// The empty string selects YAML. Matching is case-sensitive, and any value
// other than "yaml" or "json" yields an error.
func ParseMetadataFormat(s string) (MetadataFormat, error) {
	requested := MetadataFormat(s)
	if requested == "" {
		requested = MetadataFormatYAML
	}
	if requested == MetadataFormatYAML || requested == MetadataFormatJSON {
		return requested, nil
	}
	return "", fmt.Errorf(
		"unsupported --airflow-metadata format %q: want %q or %q",
		s, MetadataFormatYAML, MetadataFormatJSON,
	)
}
+
+// DumpAirflowMetadata writes the bundle's airflow-metadata manifest to stdout
+// (YAML by default, JSON when format is MetadataFormatJSON). It runs
+// RegisterDags against an in-memory recorder only — no gRPC server, no
external
+// services. airflow-go-pack execs the binary with --airflow-metadata and
+// decodes this output to build the embedded manifest.
+func DumpAirflowMetadata(bundle bundlev1.BundleProvider, format
MetadataFormat) error {
Review Comment:
The call chain: `executable --airflow-metadata` -> `DumpAirflowMetadata` ->
`bundle.RegisterDags` -> `EnumerableBundle`.
##########
go-sdk/internal/airflowmetadata/airflowmetadata.go:
##########
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package airflowmetadata defines the airflow-metadata manifest wire shape
+// shared between the producer (a bundle binary's --airflow-metadata flag,
+// emitted from pkg/execution) and the consumer (airflow-go-pack, which decodes
+// it and renders the embedded airflow-metadata.yaml). Keeping the definition
in
+// one place stops the two sides from drifting. The canonical schema is
+// airflow-metadata.schema.json in the Task SDK docs.
+package airflowmetadata
+
// FormatVersion is the bundle-spec version emitted manifests conform to.
const FormatVersion = "1.0"

// Manifest is the shape printed by a bundle binary's --airflow-metadata flag
// (YAML by default, JSON under --format json). It mirrors
// airflow-metadata.schema.json minus the source field, which only the packer
// can resolve from the build inputs. The yaml tags let the packer's
// --airflow-metadata flag decode a captured manifest in either JSON or YAML
// (the airflow-metadata.yaml in a bundle).
//
// Dags is keyed by dag id.
// NOTE(review): AirflowBundleMetadataVersion is presumably populated with
// FormatVersion by the producer in pkg/execution — confirm.
type Manifest struct {
	AirflowBundleMetadataVersion string         `json:"airflow_bundle_metadata_version" yaml:"airflow_bundle_metadata_version"`
	SDK                          SDK            `json:"sdk" yaml:"sdk"`
	Dags                         map[string]Dag `json:"dags" yaml:"dags"`
}

// SDK identifies the SDK that produced the bundle.
type SDK struct {
	Language                string `json:"language" yaml:"language"`
	Version                 string `json:"version" yaml:"version"`
	SupervisorSchemaVersion string `json:"supervisor_schema_version" yaml:"supervisor_schema_version"`
}

// Dag is the static description of a single DAG declared in the bundle.
//
// NOTE(review): Tasks presumably holds task ids in registration order,
// mirroring bundlev1.DagInfo.Tasks — confirm against the producer.
type Dag struct {
	Tasks []string `json:"tasks" yaml:"tasks"`
}
Review Comment:
These types are shared by both the `airflow-go-pack` tool and the Go SDK.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]