jason810496 commented on code in PR #67156:
URL: https://github.com/apache/airflow/pull/67156#discussion_r3353761862


##########
go-sdk/bundle/bundlev1/bundlev1server/server.go:
##########
@@ -114,9 +127,21 @@ func Serve(bundle bundlev1.BundleProvider, opts 
...ServeOpt) error {
                c.ApplyServeOpt(serveConfig)
        }
 
-       switch decideMode() {
-       case modeMetadataDump:
-               return dumpBundleMetadata(bundle)

Review Comment:
   This intentionally removes the `dumpBundleMetadata` utility, since we don't 
care about the bundle metadata at all.



##########
go-sdk/bundle/bundlev1/registry.go:
##########
@@ -43,9 +43,30 @@ type (
                AddDag(dagId string) Dag
        }
 
+       // TaskInfo describes a registered task by its user-visible id.
+       TaskInfo struct {
+               ID string
+       }
+
+       // DagInfo describes a registered dag together with its tasks in
+       // registration order.
+       DagInfo struct {
+               DagID string
+               Tasks []TaskInfo
+       }
+
+       // EnumerableBundle exposes the dag/task identity recorded by 
RegisterDags.
+       // The default registry implements it; airflow-go-pack relies on it to 
read
+       // a bundle's dag/task ids without executing any task.
+       EnumerableBundle interface {
+               OrderedDags() []DagInfo
+       }
+

Review Comment:
   The same `EnumerableBundle` interface will also be used by the follow-up 
"defining the whole Dag in the Go SDK" feature: https://github.com/apache/airflow/pull/67155



##########
go-sdk/cmd/airflow-go-pack/pack.go:
##########
@@ -0,0 +1,666 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+       "bytes"
+       "errors"
+       "fmt"
+       "go/ast"
+       "go/parser"
+       "go/token"
+       "io"
+       "os"
+       "os/exec"
+       "path/filepath"
+       "runtime"
+       "sort"
+       "strings"
+
+       "gopkg.in/yaml.v3"
+
+       "github.com/apache/airflow/go-sdk/internal/airflowmetadata"
+       "github.com/apache/airflow/go-sdk/internal/bundlefooter"
+)
+
+// packOptions are the flags accepted by the root pack command.
+type packOptions struct {
+       pkg             string   // target package (default ".")
+       source          string   // override the auto-detected DAG source file
+       executable      string   // pack a pre-built binary instead of building
+       output          string   // override the default <bundleName> output 
path
+       airflowMetadata string   // path to a pre-captured --airflow-metadata 
manifest (JSON or YAML)
+       goos            string   // target GOOS for the deployable build (falls 
back to env GOOS, then host)
+       goarch          string   // target GOARCH for the deployable build 
(falls back to env GOARCH, then host)
+       buildArgs       []string // forwarded verbatim to `go build` (already 
includes the leading "--")
+}
+
+func runPack(stdout, stderr io.Writer, opts *packOptions) error {
+       if opts.executable != "" && len(opts.buildArgs) > 0 {
+               return fmt.Errorf("--executable is mutually exclusive with go 
build flags after \"--\"")
+       }
+       if opts.executable != "" && (opts.goos != "" || opts.goarch != "") {
+               return fmt.Errorf(
+                       "--executable is mutually exclusive with 
--goos/--goarch: --executable packs the " +
+                               "binary as-is and never builds, so it cannot 
cross-compile. To cross-build a " +
+                               "bundle, drop --executable and pass 
--goos/--goarch with the package path",
+               )
+       }
+
+       // Resolve the DAG source file for both modes up front. --executable 
requires
+       // it explicitly; the build path falls back to discovery.
+       sourcePath := opts.source
+       if opts.executable != "" {
+               if sourcePath == "" {
+                       return fmt.Errorf(
+                               "--executable requires --source: cannot infer 
the DAG source for a pre-built binary",
+                       )
+               }
+       } else if sourcePath == "" {
+               // --source is the documented escape hatch for packages whose 
main file
+               // cannot be auto-detected: it may be selected by build tags or 
GOOS, or
+               // the package may have several files with func main(). 
Discovery runs a
+               // plain `go list` without the forwarded build flags, so it can 
fail or
+               // pick the wrong file for such packages. Only fall back to 
discovery
+               // when --source was not supplied, so an explicit --source 
always wins.
+               discovered, err := discoverMainSource(opts.pkg)
+               if err != nil {
+                       return fmt.Errorf("locating DAG source file: %w", err)
+               }
+               sourcePath = discovered
+       }
+       if _, err := os.Stat(sourcePath); err != nil {
+               return fmt.Errorf("source file %s: %w", sourcePath, err)
+       }
+
+       output := opts.output
+       if output == "" {
+               defaultPath, err := defaultOutputPath(sourcePath)
+               if err != nil {
+                       return fmt.Errorf("determining default output path: 
%w", err)
+               }
+               output = defaultPath
+       }
+
+       // The bundle is finalised by renaming a temp file onto output, which 
fails
+       // if output is an existing directory. Catch that here: the default 
output is
+       // the package directory's name, so packing ./foo from a dir that 
already has
+       // a ./foo directory collides. Report it with the fix instead of a bare
+       // "rename ...: file exists" from os.Rename. Done before any build so a
+       // misconfigured output fails fast rather than after a (cross) go build.
+       if info, err := os.Stat(output); err == nil && info.IsDir() {
+               return fmt.Errorf(
+                       "output path %q is an existing directory (the bundle 
output defaults to the "+
+                               "package directory's name); pass --output to 
write the bundle to a file path",
+                       output,
+               )
+       }
+
+       // execPath is the binary that receives the footer (the deployable 
artefact,
+       // which MAY be cross-compiled). introspectPath is the binary 
obtainMetadata
+       // reads --airflow-metadata from. By default that means exec'ing it on 
the
+       // host (so it must be host-runnable, hence the cross-compile sidecar 
below),
+       // but --airflow-metadata bypasses it entirely.
+       var execPath, introspectPath string
+       cleanupExec := func() {}
+       defer func() { cleanupExec() }()
+
+       if opts.executable != "" {
+               execPath = opts.executable
+               introspectPath = opts.executable
+       } else {
+               targetGOOS, targetGOARCH := targetPlatform(opts)
+               artifact, cleanup, err := buildPackage(stderr, opts.pkg, 
opts.buildArgs, targetGOOS, targetGOARCH)
+               if err != nil {
+                       return err
+               }
+               execPath = artifact
+               cleanupExec = cleanup
+               introspectPath = artifact
+
+               // Reading the manifest means exec'ing the binary, so it must 
be a
+               // host-native build. When cross-compiling, the artefact cannot 
run
+               // here; build a throwaway host binary from the same sources 
and the
+               // same forwarded `--` build flags (DAG/task identity is 
arch-independent)
+               // solely to introspect. This sidecar is unnecessary when
+               // --airflow-metadata supplies the manifest directly.
+               crossCompiling := targetGOOS != runtime.GOOS || targetGOARCH != 
runtime.GOARCH
+               if crossCompiling && opts.airflowMetadata == "" {
+                       hostBin, cleanupHost, err := buildPackage(stderr, 
opts.pkg, opts.buildArgs, runtime.GOOS, runtime.GOARCH)
+                       if err != nil {
+                               return fmt.Errorf("building host binary for 
metadata introspection: %w", err)
+                       }
+                       prevCleanup := cleanupExec
+                       cleanupExec = func() { cleanupHost(); prevCleanup() }
+                       introspectPath = hostBin
+               }
+       }
+
+       if _, err := os.Stat(execPath); err != nil {
+               return fmt.Errorf("executable %s: %w", execPath, err)
+       }
+
+       if err := rejectOutputAlias(output, execPath, sourcePath); err != nil {
+               return err
+       }
+
+       meta, err := obtainMetadata(opts, introspectPath)
+       if err != nil {
+               return err
+       }
+       if len(meta.Dags) == 0 {
+               return fmt.Errorf("bundle exposes no dags: nothing to pack")
+       }
+       for dagID, dag := range meta.Dags {
+               if len(dag.Tasks) == 0 {
+                       fmt.Fprintf(stderr, "warning: dag %q has no tasks\n", 
dagID)
+               }
+       }
+
+       manifest, err := renderManifest(meta, filepath.Base(sourcePath))
+       if err != nil {
+               return fmt.Errorf("rendering manifest: %w", err)
+       }
+       sourceBytes, err := os.ReadFile(sourcePath)
+       if err != nil {
+               return fmt.Errorf("reading source file: %w", err)
+       }
+
+       // Assemble the bundle through a temp file and atomically move it into
+       // place: we never mutate the build artefact or the user-supplied
+       // --executable, and a failed pack never leaves a truncated or 
half-written
+       // file at output.
+       if err := writeBundle(execPath, output, sourceBytes, manifest); err != 
nil {
+               return err
+       }
+
+       fmt.Fprintf(stdout, "Wrote bundle %s (sdk=%s/%s, dags=%d)\n",
+               output, meta.SDK.Language, meta.SDK.Version, len(meta.Dags))
+       return nil
+}
+
+// defaultOutputPath derives the default bundle output path from the directory
+// that owns the DAG source file. That directory is the bundle's main package,
+// so its base name is what `go build` itself would name the binary. On Windows
+// the .exe suffix is appended.
+func defaultOutputPath(sourcePath string) (string, error) {
+       abs, err := filepath.Abs(sourcePath)
+       if err != nil {
+               return "", err
+       }
+       name := filepath.Base(filepath.Dir(abs))
+       if runtime.GOOS == "windows" {
+               name += ".exe"
+       }
+       return name, nil
+}
+
+// discoverMainSource locates the file in the given package whose AST contains
+// a top-level `func main()`. Returns an error if the package has zero or
+// more than one such file, mirroring ADR 0002's discovery contract.
+func discoverMainSource(pkg string) (string, error) {
+       cmd := exec.Command("go", "list", "-f", "{{.Dir}}\n{{range 
.GoFiles}}{{.}}\n{{end}}", pkg)
+       var stdout, stderr bytes.Buffer
+       cmd.Stdout = &stdout
+       cmd.Stderr = &stderr
+       if err := cmd.Run(); err != nil {
+               return "", fmt.Errorf(
+                       "go list %s: %w: %s\n"+
+                               "airflow-go-pack packs the Go package in the 
current directory by default; "+
+                               "pass your bundle's package path (e.g. 
`airflow-go-pack ./path/to/bundle`) "+
+                               "or --source to point at the DAG source file 
directly",
+                       pkg, err, strings.TrimSpace(stderr.String()),
+               )
+       }
+
+       lines := splitNonEmpty(stdout.String())
+       if len(lines) < 2 {
+               return "", fmt.Errorf("package %s has no Go source files", pkg)
+       }
+       dir := lines[0]
+       files := lines[1:]
+
+       fset := token.NewFileSet()
+       var matches []string
+       for _, name := range files {
+               full := filepath.Join(dir, name)
+               f, err := parser.ParseFile(fset, full, nil, 
parser.SkipObjectResolution)
+               if err != nil {
+                       return "", fmt.Errorf("parsing %s: %w", full, err)
+               }
+               if hasMainFunc(f) {
+                       matches = append(matches, full)
+               }
+       }
+       switch len(matches) {
+       case 0:
+               return "", fmt.Errorf("no file in package %s defines func 
main()", pkg)
+       case 1:
+               return matches[0], nil
+       default:
+               return "", fmt.Errorf(
+                       "multiple files in package %s define func main(): %v; 
use --source to disambiguate",
+                       pkg,
+                       matches,
+               )
+       }
+}
+
+func hasMainFunc(f *ast.File) bool {
+       for _, decl := range f.Decls {
+               fn, ok := decl.(*ast.FuncDecl)
+               if !ok {
+                       continue
+               }
+               if fn.Recv != nil {
+                       continue
+               }
+               if fn.Name.Name != "main" {
+                       continue
+               }
+               if fn.Type.Params != nil && len(fn.Type.Params.List) != 0 {
+                       continue
+               }
+               return true
+       }
+       return false
+}
+
+func splitNonEmpty(s string) []string {
+       var out []string
+       for line := range strings.SplitSeq(s, "\n") {
+               if t := strings.TrimSpace(line); t != "" {
+                       out = append(out, t)
+               }
+       }
+       return out
+}
+
+// targetPlatform resolves the GOOS/GOARCH the deployable bundle is built for:
+// the --goos/--goarch flags win, then the ambient GOOS/GOARCH env, then the
+// host. The flags exist because `go tool airflow-go-pack` builds the packer
+// using the ambient GOOS/GOARCH — setting those in the env to cross-compile a
+// bundle would instead cross-build the packer itself and fail to exec it on 
the
+// host. Passing the target via flags keeps the env (and the packer build)
+// host-native while still cross-building the bundle.
+func targetPlatform(opts *packOptions) (goos, goarch string) {
+       goos = runtime.GOOS
+       if env := os.Getenv("GOOS"); env != "" {
+               goos = env
+       }
+       if opts.goos != "" {
+               goos = opts.goos
+       }
+       goarch = runtime.GOARCH
+       if env := os.Getenv("GOARCH"); env != "" {
+               goarch = env
+       }
+       if opts.goarch != "" {
+               goarch = opts.goarch
+       }
+       return goos, goarch
+}
+
+// buildPackage runs `go build [extraArgs...] -o <tmp>/bundle <pkg>` for the
+// given GOOS/GOARCH and returns the path to the freshly built executable plus 
a
+// cleanup function. extraArgs is the slice that comes after the "--" separator
+// on the airflow-go-pack command line; we drop the leading "--" before
+// forwarding. GOOS/GOARCH are set explicitly (overriding any ambient env) so
+// the caller controls the target: the deployable build uses the resolved 
target
+// platform, the introspection sidecar uses the host.
+func buildPackage(
+       stderr io.Writer,
+       pkg string,
+       extraArgs []string,
+       goos, goarch string,
+) (string, func(), error) {
+       tmpDir, err := os.MkdirTemp("", "airflow-go-pack-*")
+       if err != nil {
+               return "", nil, fmt.Errorf("creating temp dir: %w", err)
+       }
+       cleanup := func() { _ = os.RemoveAll(tmpDir) }
+
+       binName := "bundle"
+       if goos == "windows" {
+               binName += ".exe"
+       }
+       outPath := filepath.Join(tmpDir, binName)
+
+       args := []string{"build"}
+       for _, a := range extraArgs {
+               if a == "--" {
+                       continue
+               }
+               args = append(args, a)
+       }
+       args = append(args, "-o", outPath, pkg)
+
+       cmd := exec.Command("go", args...)
+       // Later duplicate keys win in os/exec, so these override any ambient
+       // GOOS/GOARCH (which `go tool` already used to build this packer).
+       cmd.Env = append(os.Environ(), "GOOS="+goos, "GOARCH="+goarch)
+       cmd.Stdout = stderr
+       cmd.Stderr = stderr
+       if err := cmd.Run(); err != nil {
+               cleanup()
+               return "", nil, fmt.Errorf("go build failed: %w", err)
+       }
+       return outPath, cleanup, nil
+}
+
+func readAirflowMetadata(execPath string) (airflowmetadata.Manifest, error) {
+       out, err := runIntrospect(execPath, "--airflow-metadata")
+       if err != nil {
+               return airflowmetadata.Manifest{}, err
+       }
+       // Decode with a YAML decoder: it reads the binary's YAML default and 
its
+       // --format json output alike (JSON is a subset of YAML).
+       var meta airflowmetadata.Manifest
+       if err := yaml.Unmarshal(out, &meta); err != nil {
+               return airflowmetadata.Manifest{}, fmt.Errorf(
+                       "decoding --airflow-metadata output (YAML/JSON): %w",
+                       err,
+               )
+       }
+       return meta, nil
+}
+
+// obtainMetadata resolves the bundle manifest either from an explicit
+// --airflow-metadata file or by exec'ing a host-runnable introspection binary.
+// In --executable mode a binary that cannot be exec'd on the host is a hard
+// error with remediation guidance: --executable expects a same-platform 
binary,
+// and the packer never silently rebuilds a host binary, because a rebuild from
+// unknown inputs (the original build tags, ldflags, and GOOS/GOARCH-specific
+// files are not known here, and build flags are rejected in --executable mode)
+// can advertise a different DAG/task set than the artefact actually shipped.
+func obtainMetadata(opts *packOptions, introspectPath string) 
(airflowmetadata.Manifest, error) {
+       if opts.airflowMetadata != "" {
+               meta, err := readMetadataFile(opts.airflowMetadata)
+               if err != nil {
+                       return airflowmetadata.Manifest{}, fmt.Errorf(
+                               "--airflow-metadata %s: %w",
+                               opts.airflowMetadata,
+                               err,
+                       )
+               }
+               return meta, nil
+       }
+
+       meta, err := readAirflowMetadata(introspectPath)
+       if err == nil {
+               return meta, nil
+       }
+       if opts.executable != "" && errors.Is(err, errExecNotStartable) {
+               return airflowmetadata.Manifest{}, fmt.Errorf(

Review Comment:
   This error tells the user how to fix the cross-platform case, where `--executable` points at a binary that cannot be run on the host.



##########
go-sdk/pkg/execution/metadata.go:
##########
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+       "encoding/json"
+       "fmt"
+       "os"
+       "runtime/debug"
+
+       "gopkg.in/yaml.v3"
+
+       "github.com/apache/airflow/go-sdk/bundle/bundlev1"
+       "github.com/apache/airflow/go-sdk/internal/airflowmetadata"
+)
+
+// sdkModulePath is the import path of the SDK module. Used to identify the
+// SDK version from the bundle binary's build info dependencies.
+const sdkModulePath = "github.com/apache/airflow/go-sdk"
+
+// MetadataFormat selects the encoding DumpAirflowMetadata writes to stdout for
+// the bundle binary's --airflow-metadata flag.
+type MetadataFormat string
+
+const (
+       // MetadataFormatYAML is the default; it matches the 
airflow-metadata.yaml a
+       // bundle embeds, so `mybundle --airflow-metadata > 
airflow-metadata.yaml`
+       // yields a ready-to-use file.
+       MetadataFormatYAML MetadataFormat = "yaml"
+       // MetadataFormatJSON is opt-in via --format json.
+       MetadataFormatJSON MetadataFormat = "json"
+)
+
+// ParseMetadataFormat validates a --format value and returns the matching
+// MetadataFormat. An empty value defaults to YAML.
+func ParseMetadataFormat(s string) (MetadataFormat, error) {
+       switch MetadataFormat(s) {
+       case "", MetadataFormatYAML:
+               return MetadataFormatYAML, nil
+       case MetadataFormatJSON:
+               return MetadataFormatJSON, nil
+       default:
+               return "", fmt.Errorf(
+                       "unsupported --airflow-metadata format %q: want %q or 
%q",
+                       s, MetadataFormatYAML, MetadataFormatJSON,
+               )
+       }
+}
+
+// DumpAirflowMetadata writes the bundle's airflow-metadata manifest to stdout
+// (YAML by default, JSON when format is MetadataFormatJSON). It runs
+// RegisterDags against an in-memory recorder only — no gRPC server, no 
external
+// services. airflow-go-pack execs the binary with --airflow-metadata and
+// decodes this output to build the embedded manifest.
+func DumpAirflowMetadata(bundle bundlev1.BundleProvider, format 
MetadataFormat) error {

Review Comment:
   The call chain is: `executable --airflow-metadata` -> `DumpAirflowMetadata` -> 
`bundle.RegisterDags` -> `EnumerableBundle`.



##########
go-sdk/internal/airflowmetadata/airflowmetadata.go:
##########
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package airflowmetadata defines the airflow-metadata manifest wire shape
+// shared between the producer (a bundle binary's --airflow-metadata flag,
+// emitted from pkg/execution) and the consumer (airflow-go-pack, which decodes
+// it and renders the embedded airflow-metadata.yaml). Keeping the definition 
in
+// one place stops the two sides from drifting. The canonical schema is
+// airflow-metadata.schema.json in the Task SDK docs.
+package airflowmetadata
+
+// FormatVersion is the bundle-spec version emitted manifests conform to.
+const FormatVersion = "1.0"
+
+// Manifest is the shape printed by a bundle binary's --airflow-metadata flag
+// (YAML by default, JSON under --format json). It mirrors
+// airflow-metadata.schema.json minus the source field, which only the packer
+// can resolve from the build inputs. The yaml tags let the packer's
+// --airflow-metadata flag decode a captured manifest in either JSON or YAML
+// (the airflow-metadata.yaml in a bundle).
+type Manifest struct {
+       AirflowBundleMetadataVersion string         
`json:"airflow_bundle_metadata_version" yaml:"airflow_bundle_metadata_version"`
+       SDK                          SDK            `json:"sdk"                 
            yaml:"sdk"`
+       Dags                         map[string]Dag `json:"dags"                
            yaml:"dags"`
+}
+
+// SDK identifies the SDK that produced the bundle.
+type SDK struct {
+       Language                string `json:"language"                  
yaml:"language"`
+       Version                 string `json:"version"                   
yaml:"version"`
+       SupervisorSchemaVersion string `json:"supervisor_schema_version" 
yaml:"supervisor_schema_version"`
+}
+
+// Dag is the static description of a single DAG declared in the bundle.
+type Dag struct {
+       Tasks []string `json:"tasks" yaml:"tasks"`
+}

Review Comment:
   These types are shared by both the `airflow-go-pack` tool and the Go SDK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to