Copilot commented on code in PR #67154: URL: https://github.com/apache/airflow/pull/67154#discussion_r3285594567
########## go-sdk/pkg/execution/server.go: ########## @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package execution implements the SDK coordinator-protocol runtime +// (msgpack-over-IPC). It is the second mode of bundlev1server.Serve: when +// the bundle binary is launched with --comm/--logs by the Airflow supervisor +// (Python ExecutableCoordinator), bundlev1server.Serve dispatches here. +// +// The first inbound frame on the comm socket selects between two +// sub-protocols: +// +// - DagFileParseRequest: one-shot, returns DagFileParsingResult and exits. +// - StartupDetails: multi-round task execution. +// +// See go-sdk/adr/0003-coordinator-protocol-msgpack-ipc.md. +package execution + +import ( + "fmt" + "log/slog" + "net" + "sync" + "time" + + "github.com/apache/airflow/go-sdk/bundle/bundlev1" +) + +// dialTimeout bounds how long execution.Serve waits to reach the supervisor's +// comm and logs sockets. The supervisor opens the listeners before spawning +// the bundle binary, so the dials normally succeed in milliseconds; the +// timeout exists so an unreachable address fails fast instead of hanging the +// runtime indefinitely. 
+const dialTimeout = 30 * time.Second + +// Serve runs the bundle binary in coordinator mode. It dials the supervisor's +// comm and logs sockets, installs an slog handler that writes JSON-line +// records to the logs connection, and dispatches on the first frame. +// +// Serve returns nil on a clean shutdown (one-shot DAG parse or task execution +// completed); a non-nil error indicates a protocol-level failure (connection +// loss, malformed frames, unknown first message type). +func Serve(provider bundlev1.BundleProvider, commAddr, logsAddr string) error { + if commAddr == "" { + return fmt.Errorf("missing --comm=host:port argument") + } + if logsAddr == "" { + return fmt.Errorf("missing --logs=host:port argument") + } + + // Buffer log records until the logs socket is connected. Anything the + // runtime emits between Connect-time and the first frame still gets + // flushed. + logHandler := NewSocketLogHandler(nil, slog.LevelDebug) + logger := slog.New(logHandler) + slog.SetDefault(logger) + + // Connect to both sockets concurrently so the supervisor can accept them + // in either order. + dialer := &net.Dialer{Timeout: dialTimeout} + var commConn, logsConn net.Conn + var commErr, logsErr error + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + commConn, commErr = dialer.Dial("tcp", commAddr) + }() + go func() { + defer wg.Done() + logsConn, logsErr = dialer.Dial("tcp", logsAddr) + }() + wg.Wait() + + if commErr != nil { + return fmt.Errorf("connecting to comm socket %s: %w", commAddr, commErr) + } + defer commConn.Close() + if logsErr != nil { + return fmt.Errorf("connecting to logs socket %s: %w", logsAddr, logsErr) + } + defer logsConn.Close() Review Comment: When the comm dial fails, the logs dial may still have succeeded. Returning immediately without closing `logsConn` can leak an open TCP connection (and similarly, `commConn` can be non-nil even when `commErr` is set). Close any non-nil conns on the error paths before returning. 
########## go-sdk/pkg/execution/frames.go: ########## @@ -0,0 +1,249 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package execution + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + + "github.com/vmihailenco/msgpack/v5" +) + +// maxFrameSize caps the payload length a single frame may declare. A +// malformed length prefix from a corrupted stream (or hostile peer) would +// otherwise let readFrame allocate up to 4 GiB before the read failed. +// 64 MiB is far above any legitimate StartupDetails or XCom payload while +// still preventing accidental OOM. +const maxFrameSize = 64 * 1024 * 1024 + +// IncomingFrame represents a decoded frame received from the comm socket. +type IncomingFrame struct { + ID int + Body map[string]any + Err map[string]any // non-nil only for response frames (3-element arrays) +} + +// encodeRequest encodes a request frame (2-element msgpack array: [id, body]). 
+func encodeRequest(id int, body map[string]any) ([]byte, error) { + var buf bytes.Buffer + enc := msgpack.NewEncoder(&buf) + enc.UseCompactInts(true) + + if err := enc.EncodeArrayLen(2); err != nil { + return nil, err + } + if err := enc.EncodeInt(int64(id)); err != nil { + return nil, err + } + if err := enc.Encode(body); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +// writeFrame writes a length-prefixed msgpack payload to the writer. +// Format: [4-byte big-endian length][payload bytes] +func writeFrame(w io.Writer, payload []byte) error { + prefix := make([]byte, 4) + binary.BigEndian.PutUint32(prefix, uint32(len(payload))) + if _, err := w.Write(prefix); err != nil { + return fmt.Errorf("writing length prefix: %w", err) + } + if _, err := w.Write(payload); err != nil { + return fmt.Errorf("writing payload: %w", err) + } + return nil Review Comment: `writeFrame` discards the byte count returned by `io.Writer.Write`. The `io.Writer` contract requires `Write` to return a non-nil error whenever `n < len(p)`, so a conforming writer (including `net.Conn`) cannot short-write silently and the existing error checks already cover that case. A non-conforming writer that returned a short count with `err == nil` would corrupt framing on the wire, so checking `n` and surfacing `io.ErrShortWrite` (or looping until the full buffer is written) is optional defensive hardening rather than a required fix. ########## go-sdk/pkg/execution/frames.go: ########## @@ -0,0 +1,249 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package execution + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + + "github.com/vmihailenco/msgpack/v5" +) + +// maxFrameSize caps the payload length a single frame may declare. A +// malformed length prefix from a corrupted stream (or hostile peer) would +// otherwise let readFrame allocate up to 4 GiB before the read failed. +// 64 MiB is far above any legitimate StartupDetails or XCom payload while +// still preventing accidental OOM. +const maxFrameSize = 64 * 1024 * 1024 + +// IncomingFrame represents a decoded frame received from the comm socket. +type IncomingFrame struct { + ID int + Body map[string]any + Err map[string]any // non-nil only for response frames (3-element arrays) +} + +// encodeRequest encodes a request frame (2-element msgpack array: [id, body]). +func encodeRequest(id int, body map[string]any) ([]byte, error) { + var buf bytes.Buffer + enc := msgpack.NewEncoder(&buf) + enc.UseCompactInts(true) + + if err := enc.EncodeArrayLen(2); err != nil { + return nil, err + } + if err := enc.EncodeInt(int64(id)); err != nil { + return nil, err + } + if err := enc.Encode(body); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +// writeFrame writes a length-prefixed msgpack payload to the writer. +// Format: [4-byte big-endian length][payload bytes] +func writeFrame(w io.Writer, payload []byte) error { + prefix := make([]byte, 4) + binary.BigEndian.PutUint32(prefix, uint32(len(payload))) + if _, err := w.Write(prefix); err != nil { + return fmt.Errorf("writing length prefix: %w", err) + } + if _, err := w.Write(payload); err != nil { + return fmt.Errorf("writing payload: %w", err) + } + return nil +} + +// readFrame reads one length-prefixed msgpack frame from the reader and decodes it. +func readFrame(r io.Reader) (IncomingFrame, error) { + // Read 4-byte big-endian length prefix. 
+ prefix := make([]byte, 4) + if _, err := io.ReadFull(r, prefix); err != nil { + return IncomingFrame{}, fmt.Errorf("reading length prefix: %w", err) + } + payloadLen := binary.BigEndian.Uint32(prefix) + if payloadLen > maxFrameSize { + return IncomingFrame{}, fmt.Errorf( + "frame payload length %d exceeds max %d", + payloadLen, + maxFrameSize, + ) + } + + // Read the payload. + payload := make([]byte, payloadLen) + if _, err := io.ReadFull(r, payload); err != nil { + return IncomingFrame{}, fmt.Errorf("reading payload (%d bytes): %w", payloadLen, err) + } Review Comment: `payloadLen` is a `uint32`. `make([]byte, payloadLen)` does compile, because Go's `make` accepts a size argument of any integer type, so this is not a build failure. Converting to `int(payloadLen)` after the max-size guard (so the conversion is safe) before allocating/reading the payload is an optional readability change that keeps the length in the same type that `len(payload)` returns. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
