Copilot commented on code in PR #67154:
URL: https://github.com/apache/airflow/pull/67154#discussion_r3280509008


##########
go-sdk/sdk/sdk.go:
##########
@@ -43,6 +52,12 @@ type VariableClient interface {
        //                              // Other errors here, such as http 
network timeouts etc.
        //              }
        GetVariable(ctx context.Context, key string) (string, error)
+
+       // UnmarshalJSONVariable fetches a variable and json.Unmarshal's its 
value
+       // into pointer. Use this when the variable was stored as a JSON object,
+       // array, or number; for plain string variables call GetVariable 
directly.

Review Comment:
   Docstring grammar: "json.Unmarshal's its value" reads like a possessive and is 
not idiomatic GoDoc. Consider rephrasing with a plain verb, e.g. "UnmarshalJSONVariable 
fetches a variable and unmarshals its value into pointer using json.Unmarshal."
   



##########
go-sdk/pkg/execution/task_runner.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+       "context"
+       "fmt"
+       "log/slog"
+       "runtime/debug"
+       "time"
+
+       "github.com/google/uuid"
+
+       "github.com/apache/airflow/go-sdk/bundle/bundlev1"
+       "github.com/apache/airflow/go-sdk/pkg/api"
+       "github.com/apache/airflow/go-sdk/pkg/sdkcontext"
+       "github.com/apache/airflow/go-sdk/sdk"
+)
+
+// RunTask executes a task based on StartupDetails received from the 
supervisor.
+//
+// It looks up the task in the bundle, creates a CoordinatorClient for SDK
+// calls, executes the task, and returns a terminal message body
+// (SucceedTaskMsg or TaskStateMsg) ready to ship as the final response frame.
+//
+// The supervisor owns the Execution-API state transitions in coordinator
+// mode, so we deliberately bypass worker.ExecuteTaskWorkload (which drives
+// Run / UpdateState itself) and only invoke the user's task function.
+func RunTask(
+       bundle bundlev1.Bundle,
+       details *StartupDetails,
+       comm *CoordinatorComm,
+       logger *slog.Logger,
+) map[string]any {
+       task, exists := bundle.LookupTask(details.TI.DagID, details.TI.TaskID)
+       if !exists {
+               logger.Error("Task not registered",
+                       "dag_id", details.TI.DagID,
+                       "task_id", details.TI.TaskID,
+               )
+               return TaskStateMsg{State: TaskStateRemoved, EndDate: 
time.Now().UTC()}.toMap()
+       }
+
+       client := NewCoordinatorClient(comm, details)
+
+       // taskFunction.sendXcom reads the workload from context to get the task
+       // instance ids; populate it the same shape the gRPC path uses.
+       tiUUID, err := uuid.Parse(details.TI.ID)
+       if err != nil {
+               logger.Error("Invalid task instance UUID from supervisor",
+                       "dag_id", details.TI.DagID,
+                       "task_id", details.TI.TaskID,
+                       "ti_id", details.TI.ID,
+                       "error", err,
+               )
+               return TaskStateMsg{State: TaskStateFailed, EndDate: 
time.Now().UTC()}.toMap()
+       }
+       mapIndex := details.TI.MapIndex
+       workload := api.ExecuteTaskWorkload{
+               TI: api.TaskInstance{
+                       Id:        tiUUID,
+                       DagId:     details.TI.DagID,
+                       RunId:     details.TI.RunID,
+                       TaskId:    details.TI.TaskID,
+                       TryNumber: details.TI.TryNumber,
+                       MapIndex:  &mapIndex,
+               },
+               BundleInfo: api.BundleInfo{
+                       Name:    details.BundleInfo.Name,
+                       Version: &details.BundleInfo.Version,
+               },
+       }
+
+       ctx := context.Background()
+       ctx = context.WithValue(ctx, sdkcontext.WorkloadContextKey, workload)
+       ctx = context.WithValue(ctx, sdkcontext.SdkClientContextKey, 
sdk.Client(client))
+
+       return executeTask(ctx, task, logger)
+}
+
+// executeTask runs the task and handles success, failure, and panics.
+func executeTask(
+       ctx context.Context,
+       task bundlev1.Task,
+       logger *slog.Logger,
+) (result map[string]any) {
+       defer func() {
+               if r := recover(); r != nil {
+                       logger.Error("Recovered panic in task",
+                               "error", r,
+                               "stack", string(debug.Stack()),
+                       )
+                       result = TaskStateMsg{
+                               State:   TaskStateFailed,
+                               EndDate: time.Now().UTC(),
+                       }.toMap()
+               }
+       }()
+
+       if err := task.Execute(ctx, logger); err != nil {
+               logger.Error("Task failed", "error", fmt.Sprintf("%v", err))

Review Comment:
   Logging the error via fmt.Sprintf("%v", err) flattens it to a string and loses 
the structured error value; slog can take the error directly. Pass err itself as the 
attribute value (and ideally use ErrorContext with ctx), e.g. 
logger.ErrorContext(ctx, "Task failed", "error", err), so downstream handlers can 
format/redact consistently.
   



##########
go-sdk/pkg/execution/client.go:
##########
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+
+       "github.com/apache/airflow/go-sdk/pkg/api"
+       "github.com/apache/airflow/go-sdk/sdk"
+)
+
+// CoordinatorClient implements sdk.Client by communicating with the Airflow 
supervisor
+// over the comm socket using msgpack-framed IPC instead of HTTP.
+type CoordinatorClient struct {
+       comm    *CoordinatorComm
+       details *StartupDetails
+}
+
+var _ sdk.Client = (*CoordinatorClient)(nil)
+
+// NewCoordinatorClient creates a new client backed by the comm socket.
+func NewCoordinatorClient(comm *CoordinatorComm, details *StartupDetails) 
*CoordinatorClient {
+       return &CoordinatorClient{
+               comm:    comm,
+               details: details,
+       }
+}
+
+// GetVariable requests a variable value from the supervisor.
+func (c *CoordinatorClient) GetVariable(_ context.Context, key string) 
(string, error) {
+       resp, err := c.comm.Communicate(GetVariableMsg{Key: key}.toMap())

Review Comment:
   CoordinatorClient methods currently ignore the provided context (e.g. 
GetVariable takes `_ context.Context`). If the task context is cancelled or has 
a deadline, SDK calls can still block indefinitely waiting on the comm socket. 
Thread ctx through to CoordinatorComm (or at least check ctx.Done before/while 
waiting for the response) so cancellations propagate cleanly.
   



##########
go-sdk/pkg/execution/comms.go:
##########
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+       "errors"
+       "fmt"
+       "io"
+       "log/slog"
+       "sync"
+       "sync/atomic"
+)
+
+// CoordinatorComm manages bidirectional communication with the Airflow
+// supervisor over a length-prefixed msgpack socket connection.
+//
+// Reads are multiplexed by frame ID. A single background reader goroutine,
+// lazily started on the first Communicate call, consumes inbound frames and
+// dispatches each one to the Communicate caller whose request used that ID.
+// This lets multiple goroutines call Communicate concurrently without
+// serialising the full send-then-read round trip behind a single mutex, and
+// guarantees the response a caller receives matches the request it sent.
+//
+// The supervisor's initial frame (DagFileParseRequest or StartupDetails)
+// arrives unsolicited, before any client request is in flight, and is read
+// synchronously via ReadMessage; ReadMessage must not be called after the
+// dispatcher has been started.
+type CoordinatorComm struct {
+       reader io.Reader
+       writer io.Writer
+       nextID atomic.Int64
+       logger *slog.Logger
+
+       wmu sync.Mutex // serialises writes
+
+       // Multiplexer state. mu protects every field below.
+       mu      sync.Mutex
+       pending map[int]chan frameResult
+       started bool
+       readErr error
+}
+
+// frameResult is the value the dispatcher delivers to a Communicate caller.
+type frameResult struct {
+       frame IncomingFrame
+       err   error
+}
+
var (
	// ErrDispatcherClosed is wrapped (with %w) into the errors Communicate
	// returns after the background reader goroutine has exited, usually
	// because the supervisor closed the comm socket. Callers can detect it
	// with errors.Is.
	ErrDispatcherClosed = errors.New("coordinator comm: dispatcher closed")
)
+
+// NewCoordinatorComm creates a new communication channel.
+func NewCoordinatorComm(reader io.Reader, writer io.Writer, logger 
*slog.Logger) *CoordinatorComm {
+       return &CoordinatorComm{
+               reader:  reader,
+               writer:  writer,
+               logger:  logger,
+               pending: make(map[int]chan frameResult),
+       }
+}
+
+// ReadMessage reads and decodes one frame directly from the comm socket.
+// It is used to read the supervisor's initial frame before any 
request/response
+// traffic begins. Calling it after the dispatcher has started would race the
+// reader goroutine for input bytes, so it returns an error in that case.
+func (c *CoordinatorComm) ReadMessage() (IncomingFrame, error) {
+       c.mu.Lock()
+       started := c.started
+       c.mu.Unlock()
+       if started {
+               return IncomingFrame{}, errors.New(
+                       "coordinator comm: ReadMessage cannot be used after the 
dispatcher has started",
+               )
+       }
+
+       frame, err := readFrame(c.reader)
+       if err != nil {
+               return IncomingFrame{}, fmt.Errorf("reading frame: %w", err)
+       }
+       c.logger.Debug("Received frame", "id", frame.ID)
+       return frame, nil
+}
+
+// SendRequest writes a request frame (2-element [id, body]) to the supervisor.
+// Concurrent calls are serialised so frames are never interleaved on the wire.
+func (c *CoordinatorComm) SendRequest(id int, body map[string]any) error {
+       payload, err := encodeRequest(id, body)
+       if err != nil {
+               return fmt.Errorf("encoding request: %w", err)
+       }
+       c.logger.Debug("Sending request", "id", id)
+       c.wmu.Lock()
+       defer c.wmu.Unlock()
+       return writeFrame(c.writer, payload)
+}
+
+// Communicate sends a request and blocks until the supervisor's response with
+// the matching frame ID is delivered by the dispatcher. Safe to call
+// concurrently from multiple goroutines.
+//
+// If the response carries an error (either as the third element of a 3-tuple
+// frame or as a body whose "type" is "ErrorResponse") it is returned as an
+// *ApiError. If the dispatcher's read loop has terminated, the underlying read
+// error is returned wrapped in ErrDispatcherClosed.
+func (c *CoordinatorComm) Communicate(body map[string]any) (map[string]any, 
error) {
+       id := int(c.nextID.Add(1) - 1)
+       ch := make(chan frameResult, 1)
+
+       // Register the waiter under the same lock the dispatcher uses, and 
before
+       // sending the request, so the dispatcher can never deliver the response
+       // (or a terminal read error) before this caller is ready to receive it.
+       c.mu.Lock()
+       if !c.started {
+               c.started = true
+               go c.readLoop()
+       }
+       if c.readErr != nil {
+               err := c.readErr
+               c.mu.Unlock()
+               return nil, fmt.Errorf("%w: %w", ErrDispatcherClosed, err)
+       }
+       c.pending[id] = ch
+       c.mu.Unlock()
+
+       if err := c.SendRequest(id, body); err != nil {
+               c.mu.Lock()
+               delete(c.pending, id)
+               c.mu.Unlock()
+               return nil, err
+       }
+
+       result := <-ch
+       if result.err != nil {
+               return nil, fmt.Errorf("%w: %w", ErrDispatcherClosed, 
result.err)

Review Comment:
   CoordinatorComm.Communicate blocks on `<-ch` with no way to respect 
cancellation/timeouts. Since higher-level SDK methods accept a context, consider 
adding `ctx context.Context` to Communicate and selecting on `ctx.Done()` to avoid 
permanently stuck goroutines when the supervisor is slow/unresponsive. On 
cancellation, also remove the request ID from `c.pending` so abandoned waiters do 
not accumulate in the map.



##########
go-sdk/pkg/execution/frames.go:
##########
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+       "io"
+
+       "github.com/vmihailenco/msgpack/v5"
+)
+
// maxFrameSize is the largest payload length a single inbound frame may
// declare. Without this cap, a corrupted or hostile length prefix could make
// readFrame allocate up to 4 GiB before the read fails. 64 MiB comfortably
// exceeds any legitimate StartupDetails or XCom payload while still guarding
// against accidental OOM.
const maxFrameSize = 64 << 20 // 64 MiB
+
// IncomingFrame is one decoded frame read from the comm socket.
type IncomingFrame struct {
	// ID identifies the frame; a response carries the ID of its request.
	ID int
	// Body is the decoded message element of the frame.
	Body map[string]any
	// Err is the error element of a 3-element response frame, and nil for
	// 2-element frames.
	Err map[string]any
}
+
+// encodeRequest encodes a request frame (2-element msgpack array: [id, body]).
+func encodeRequest(id int, body map[string]any) ([]byte, error) {
+       var buf bytes.Buffer
+       enc := msgpack.NewEncoder(&buf)
+       enc.UseCompactInts(true)
+
+       if err := enc.EncodeArrayLen(2); err != nil {
+               return nil, err
+       }
+       if err := enc.EncodeInt(int64(id)); err != nil {
+               return nil, err
+       }
+       if err := enc.Encode(body); err != nil {
+               return nil, err
+       }
+       return buf.Bytes(), nil
+}
+
// writeFrame writes payload to w as a single length-prefixed frame:
//
//	[4-byte big-endian payload length][payload bytes]
//
// The prefix is only 32 bits wide. A payload of 2^32 bytes or more is rejected
// with an error instead of having its length silently truncated, because a
// truncated prefix would desynchronise the peer's frame boundaries for the
// rest of the stream.
func writeFrame(w io.Writer, payload []byte) error {
	if uint64(len(payload)) > uint64(^uint32(0)) {
		return fmt.Errorf("payload length %d exceeds 32-bit frame length limit", len(payload))
	}

	// A fixed-size array avoids a heap allocation for the prefix.
	var prefix [4]byte
	binary.BigEndian.PutUint32(prefix[:], uint32(len(payload)))
	if _, err := w.Write(prefix[:]); err != nil {
		return fmt.Errorf("writing length prefix: %w", err)
	}
	if _, err := w.Write(payload); err != nil {
		return fmt.Errorf("writing payload: %w", err)
	}
	return nil
}
+
+// readFrame reads one length-prefixed msgpack frame from the reader and 
decodes it.
+func readFrame(r io.Reader) (IncomingFrame, error) {
+       // Read 4-byte big-endian length prefix.
+       prefix := make([]byte, 4)
+       if _, err := io.ReadFull(r, prefix); err != nil {
+               return IncomingFrame{}, fmt.Errorf("reading length prefix: %w", 
err)
+       }
+       payloadLen := binary.BigEndian.Uint32(prefix)
+       if payloadLen > maxFrameSize {
+               return IncomingFrame{}, fmt.Errorf(
+                       "frame payload length %d exceeds max %d",
+                       payloadLen,
+                       maxFrameSize,
+               )
+       }
+
+       // Read the payload.
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return IncomingFrame{}, fmt.Errorf("reading payload (%d bytes): 
%w", payloadLen, err)
+       }
+
+       return decodeFrame(payload)
+}
+
+// decodeFrame decodes a msgpack payload into an IncomingFrame.
+func decodeFrame(data []byte) (IncomingFrame, error) {
+       dec := msgpack.NewDecoder(bytes.NewReader(data))
+
+       arrLen, err := dec.DecodeArrayLen()
+       if err != nil {
+               return IncomingFrame{}, fmt.Errorf("decoding array header: %w", 
err)
+       }
+       if arrLen < 2 {
+               return IncomingFrame{}, fmt.Errorf("unexpected frame arity %d, 
need at least 2", arrLen)
+       }
+
+       id64, err := dec.DecodeInt64()
+       if err != nil {
+               return IncomingFrame{}, fmt.Errorf("decoding frame id: %w", err)
+       }
+
+       // Decode the body element.
+       bodyRaw, err := dec.DecodeInterface()
+       if err != nil {
+               return IncomingFrame{}, fmt.Errorf("decoding body: %w", err)
+       }
+       body, _ := toStringMap(bodyRaw)
+
+       // For response frames (3-element), decode the error element.
+       var errMap map[string]any
+       if arrLen >= 3 {
+               errRaw, err := dec.DecodeInterface()
+               if err != nil {
+                       return IncomingFrame{}, fmt.Errorf("decoding error 
element: %w", err)
+               }
+               errMap, _ = toStringMap(errRaw)
+       }

Review Comment:
   decodeFrame currently ignores failed map conversions (`body, _ := 
toStringMap(bodyRaw)` / `errMap, _ = toStringMap(errRaw)`), silently turning a 
non-map payload into nil. This can mask protocol bugs and lead to confusing 
downstream errors. Consider returning a decode error when bodyRaw/errRaw is 
non-nil but not a map.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to