dtenedor commented on PR #55657:
URL: https://github.com/apache/spark/pull/55657#issuecomment-4422623788

   ## PR Review: SPARK-56413 — gRPC UDF execution protocol
   
   ### Summary
   
   The PR adds `udf/worker/proto/src/main/protobuf/udf_protocol.proto`, swaps 
the placeholder `InitMessage` case class on `WorkerSession` for the generated 
`Init` proto, fixes two typos in `common.proto`, and updates the README and a 
test. The protocol defines:
   
   - `service Worker { rpc Execute(stream UdfRequest) returns (stream 
UdfResponse); rpc Manage(WorkerRequest) returns (WorkerResponse); }`
   - `Execute` wire order: `Init` → `PayloadChunk*` → 
(`DataRequest`/`DataResponse`)* → exactly one `Finish` or `Cancel`.
   - Manage operations: `Heartbeat`, `ShutdownRequest`.
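
A minimal Scala sketch of the request side of that ordering, written as a small state machine. The case objects are hypothetical stand-ins for the generated `UdfRequest` branches. The grammar it checks, `Init PayloadChunk* DataRequest* (Finish | Cancel)`, is my reading of the summary above with the worker's responses left out.

```scala
object ExecuteOrder {
  // Hypothetical stand-ins for the UdfRequest oneof branches; the real
  // messages are generated from udf_protocol.proto.
  sealed trait Req
  case object Init extends Req
  case object PayloadChunk extends Req
  case object DataRequest extends Req
  case object Finish extends Req
  case object Cancel extends Req

  // Request-side phases of one Execute stream.
  private sealed trait Phase
  private case object Start extends Phase  // nothing received yet
  private case object Chunks extends Phase // Init seen; chunks still allowed
  private case object Data extends Phase   // first DataRequest seen
  private case object Done extends Phase   // Finish or Cancel seen

  /** None if `msgs` matches Init PayloadChunk* DataRequest* (Finish | Cancel);
    * otherwise Some(index) of the first message that breaks the order, or
    * Some(msgs.length) if the stream ends without a terminal message. */
  def firstViolation(msgs: Seq[Req]): Option[Int] = {
    var phase: Phase = Start
    var i = 0
    while (i < msgs.length) {
      val next: Option[Phase] = (phase, msgs(i)) match {
        case (Start, Init)                    => Some(Chunks)
        case (Chunks, PayloadChunk)           => Some(Chunks)
        case (Chunks | Data, DataRequest)     => Some(Data)
        case (Chunks | Data, Finish | Cancel) => Some(Done)
        case _                                => None // includes anything after Done
      }
      next match {
        case Some(p) => phase = p
        case None    => return Some(i)
      }
      i += 1
    }
    if (phase == Done) None else Some(msgs.length)
  }
}
```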
   
   Overall this is a high-quality, well-documented wire contract. The doc 
comments are unusually thorough (lifecycle, ordering, "Required/Optional" tags, 
escape-hatch conventions, reserved ranges), and the engine/client split — typed 
engine-side fields vs. opaque `UdfPayload` carrying everything the client packs 
— is a good factoring.
   
   Below are mostly questions / suggested clarifications; only a couple are 
blocking-ish.
   
   ---
   
   ### Strengths
   
   - Clear separation of envelope (`UdfRequest`/`UdfResponse`) vs. control 
(`UdfControlRequest`/`UdfControlResponse`) vs. data 
(`DataRequest`/`DataResponse`).
   - Top-level `bytes data` on `DataRequest`/`DataResponse` (not nested) — 
explicit copy-avoidance rationale is great.
   - Reserved field range `8 to 99` for graduated `session_conf` keys, with the 
`timezone` precedent already showing the pattern.
   - `>= 100` convention for opaque escape-hatch fields (`parameters`, future 
siblings).
   - `PayloadChunk` semantics (concatenation order, single `InitResponse` 
covering Init + all chunks, first `DataRequest` ending the chunk phase) are 
well-specified.
   - Scala-side: the new cancel-lifecycle doc + idempotency of `cancel`/`close` 
is exactly the kind of contract that pays off in practice.
   
   ---
   
   ### Concerns / suggestions
   
   #### 1. Cancel-vs-finish race contradicts wire-level "mutually exclusive" wording (medium)
   
   `Finish` doc in `udf_protocol.proto` says:
   
   ```
   // Exactly one of [[Finish]] or [[Cancel]] is sent per Execute stream;
   // they are mutually exclusive. If the engine has already sent
   // [[Finish]] it MUST NOT send [[Cancel]] afterwards (and vice versa).
   message Finish {}
   ```
   
   …but the new Scala doc on `WorkerSession` describes a benign race:
   
   ```
    * Cancel-vs-finish race: when the session driver has finished
    * sending input (and therefore queued an implicit finish on the
    * underlying transport) and a [[cancel]] arrives concurrently, both
    * are valid stream-terminating actions; the response side carries
    * either a `FinishResponse` or a `CancelResponse` depending on which
    * the worker observes first, and either is acceptable to the caller.
   ```
   
   If `Finish` is already queued on the transport and `cancel()` then writes `Cancel`, the engine has sent both on the wire, violating the proto's MUST NOT. Two reconciliations are possible; please pick one and state it clearly in the proto:
   
   - a) The wire really does forbid both, so `cancel()` after a buffered 
`Finish` is a no-op on the transport (just an early gRPC half-close). Reword 
the Scala "queued an implicit finish" sentence to match.
   - b) The wire tolerates a `Cancel` after `Finish` *only* before any `FinishResponse` has been observed (the engine raced with itself, not with another client). The proto's MUST NOT then needs softening to "MUST NOT send Cancel after observing FinishResponse" or similar.
   
   As written, the Scala doc invites engine implementations that the proto's 
strict reading prohibits.
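
A minimal Scala sketch of what option (a) could look like on the engine side. It assumes the finish and cancel paths share a single atomic state. `StreamTerminator` and its `send` callback are hypothetical and do not appear in the PR. The point is that whichever terminal call wins the compare-and-set is the only one that touches the transport, so a late `cancel()` becomes a no-op on the wire.

```scala
import java.util.concurrent.atomic.AtomicReference

object TerminalGuard {
  sealed trait State
  case object Open extends State
  case object Finished extends State
  case object Cancelled extends State

  /** Hypothetical engine-side helper: at most one terminal message ever
    * reaches the transport. `send` stands in for the gRPC request observer
    * and is not an API from the PR. */
  final class StreamTerminator(send: String => Unit) {
    private val state = new AtomicReference[State](Open)

    /** True if this call wrote Finish to the transport. */
    def finish(): Boolean = terminate(Finished, "Finish")

    /** True if this call wrote Cancel; false means Finish (or an earlier
      * Cancel) was already written, so this cancel is a no-op on the wire. */
    def cancel(): Boolean = terminate(Cancelled, "Cancel")

    def current: State = state.get()

    private def terminate(target: State, wireMessage: String): Boolean =
      if (state.compareAndSet(Open, target)) { send(wireMessage); true }
      else false
  }
}
```

Under option (b), the guard would need a fourth state, for example "Finish written, no `FinishResponse` observed yet". In that state a late `cancel()` could still write `Cancel`.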
   
   #### 2. `InitResponse` timing vs. `PayloadChunk` is ambiguous (low/medium)
   
   `PayloadChunk` says "The single `InitResponse` covers Init plus all of its 
chunks together", implying the worker must wait until chunking completes. But 
the order diagram doesn't say this explicitly, and it interacts with the 
optional `PayloadChunk.last` early-completion hint:
   
   ```
       bytes data = 1;
   
       // (Optional) Set to true on the final chunk. Receivers MAY use
       // this as an early signal that the payload is complete and
       // decoding can begin; receivers that prefer to wait for the
       // first [[DataRequest]] (which marks the end of the chunking
       // phase) MAY ignore this. When unset, the receiver determines
       // completeness by the arrival of the first [[DataRequest]].
       optional bool last = 2;
   ```
   
   Two questions worth pinning down:
   
   - Is the worker permitted to send `InitResponse` before observing all 
`PayloadChunk` messages (e.g., as soon as it's decoded enough to start)? Or 
must it wait until the chunking phase is complete?
   - If `last` is unset, the receiver can only detect end-of-chunking by the first `DataRequest`. That's fine for the worker, but it means `InitResponse` can never be sent until the first `DataRequest` arrives, i.e., the engine cannot block sending data on receiving `InitResponse`. If that's the intent, say so; otherwise consider making `last = true` on the final chunk mandatory whenever chunking is used.
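
A small Scala sketch of the worker's end-of-chunking check makes the dependency visible. It uses hypothetical case classes in place of the generated `PayloadChunk`/`DataRequest` messages, with `last` modelled as `Option[Boolean]` to mirror a proto3 `optional bool`. If no chunk carries `last = true`, the earliest point at which the worker knows the payload is complete is the first `DataRequest`. Under the "covers all chunks" reading, that is also the earliest point at which it could send `InitResponse`.

```scala
object ChunkPhase {
  // Hypothetical stand-ins for the messages that can follow Init.
  sealed trait Req
  final case class PayloadChunk(data: Array[Byte], last: Option[Boolean]) extends Req
  final case class DataRequest(data: Array[Byte]) extends Req

  /** Index of the message at which the worker first knows the payload is
    * complete, or None if neither signal has arrived yet. */
  def completeAt(msgs: Seq[Req]): Option[Int] = {
    val idx = msgs.indexWhere {
      case PayloadChunk(_, Some(true)) => true  // early hint on the final chunk
      case DataRequest(_)              => true  // first DataRequest ends chunking
      case _                           => false // ordinary chunk, keep waiting
    }
    if (idx >= 0) Some(idx) else None
  }
}
```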
   
   #### 3. No protocol/version field on `Init` (medium)
   
   `WorkerCapabilities` (in `worker_spec.proto`) presumably handles up-front 
capability negotiation, but a `protocol_version` (or `min_required_version`) on 
`Init` would make per-stream rollout and rollback much simpler, especially 
before the engine has fully read the worker spec. Worth at least reserving a 
field number for it now, even if not populated.
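
If such a field were added, the per-stream check could be as small as this Scala sketch. `protocolVersion`, `Supported`, and the rule that an unset version means version 1 are all hypothetical; nothing like them exists in the PR today.

```scala
object VersionGate {
  // Hypothetical: the PR's Init has no version field; this models one.
  final case class Init(protocolVersion: Option[Int])

  // Inclusive range of protocol versions a worker build can speak.
  final case class Supported(min: Int, max: Int)

  /** Right(version) to accept the Execute stream, Left(reason) to reject it.
    * Assumption: an unset field means the oldest version, `defaultVersion`. */
  def check(init: Init, supported: Supported, defaultVersion: Int = 1): Either[String, Int] = {
    val v = init.protocolVersion.getOrElse(defaultVersion)
    if (v < supported.min || v > supported.max)
      Left(s"unsupported protocol_version $v; worker supports ${supported.min} to ${supported.max}")
    else Right(v)
  }
}
```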
   
   #### 4. Unknown `UDFWorkerDataFormat` values (low)
   
   `Init` says receivers MUST reject `UDF_WORKER_DATA_FORMAT_UNSPECIFIED`, 
which is good, but doesn't say what to do with values outside the worker's 
supported set (e.g., a newer client picks `PARQUET=2` that the worker doesn't 
speak). proto3 will pass unknown enums through as numeric values; the doc 
should say the worker MUST reject any value not in its declared 
`WorkerCapabilities.supported_data_formats`.
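
A minimal Scala sketch of the suggested rule. It assumes the worker reads the raw enum number (proto3 keeps unrecognised values) and has its declared `supported_data_formats` available as a set of numbers. The names and numbers are illustrative, not taken from the PR.

```scala
object DataFormatCheck {
  // proto3 requires the first enum value to be 0; here that is assumed to be
  // UDF_WORKER_DATA_FORMAT_UNSPECIFIED.
  val Unspecified: Int = 0

  /** Right(format) if the worker can serve it, Left(reason) if Init must be
    * rejected. `rawFormat` is the enum's numeric value, which proto3 preserves
    * even when the worker's generated code has no constant for it. */
  def validate(rawFormat: Int, supported: Set[Int]): Either[String, Int] =
    if (rawFormat == Unspecified) Left("data_format must not be UNSPECIFIED")
    else if (!supported.contains(rawFormat))
      Left(s"data_format $rawFormat is not in supported_data_formats")
    else Right(rawFormat)
}
```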
   
   #### 5. `WorkerResponse` should require matching branch (low)
   
   ```
   message WorkerResponse {
       // Exactly one branch MUST be set, mirroring the request oneof.
       oneof manage {
           HeartbeatResponse heartbeat = 1;
           ShutdownResponse  shutdown  = 2;
       }
   }
   ```
   
   "Mirroring" is implicit. Suggest tightening to "The engine MUST receive a 
response whose oneof branch matches the request's branch; a mismatched response 
is a protocol error."
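
The tightened rule is cheap to enforce on the engine side. Below is a minimal Scala sketch with hypothetical case objects standing in for the two oneofs. `NotSet` models a response whose oneof was left empty.

```scala
object ManageBranch {
  // Hypothetical mirrors of the WorkerRequest / WorkerResponse oneofs.
  sealed trait WorkerRequest
  case object HeartbeatReq extends WorkerRequest
  case object ShutdownReq extends WorkerRequest

  sealed trait WorkerResponse
  case object HeartbeatResp extends WorkerResponse
  case object ShutdownResp extends WorkerResponse
  case object NotSet extends WorkerResponse // oneof left empty

  /** Right(resp) when the response branch matches the request branch;
    * anything else is treated as a protocol error. */
  def checkMatch(req: WorkerRequest, resp: WorkerResponse): Either[String, WorkerResponse] =
    (req, resp) match {
      case (HeartbeatReq, HeartbeatResp) | (ShutdownReq, ShutdownResp) => Right(resp)
      case _ => Left(s"protocol error: $req answered with $resp")
    }
}
```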
   
   #### 6. Empty `DataRequest`/`DataResponse` (low)
   
   ```
   message DataRequest {
       // (Required, non-empty.) Encoded data bytes for one batch in the
       // session-declared format.
       bytes data = 1;
   }
   
   // Worker -> Engine per-batch payload. The worker emits zero or more
   // [[DataResponse]]s between [[InitResponse]] and [[FinishResponse]] /
   // [[CancelResponse]]. Sink-style UDFs (which consume input but
   // produce no output rows on the data plane) emit exactly zero.
   message DataResponse {
       // (Required, non-empty.) Encoded data bytes for one batch in the
       // session-declared format.
       bytes data = 1;
   }
   ```
   
   For Arrow this is fine (an empty IPC batch still has a header). For future 
formats that allow truly empty batches, requiring non-empty may be 
inconvenient. Either narrow the wording to "non-empty as defined by the 
session's `data_format`" or accept zero-length payloads and let the decoder 
reject. Not blocking.
   
   #### 7. Behavior of `cancel()` between init and process (low)
   
   ```
    * '''Lifecycle:''' [[cancel]] is idempotent and safe at any point in
    * the session's life:
    *  - before [[init]] -- nothing has been sent on the transport yet,
    *    so [[cancel]] is a no-op (the session may still be closed
    *    normally via [[close]]).
    *  - between [[init]] and [[process]] -- transitions the session
    *    into a cancelled state; subsequent [[process]] calls observe
    *    the cancellation.
   ```
   
   "Subsequent `process` calls observe the cancellation" — by throwing? 
returning an empty iterator? And if throwing, with a dedicated exception type? Worth nailing down. My read of the surrounding contract is that callers expect an exception, since `process` returns an iterator and a silent empty result would mask cancellations.
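
For concreteness, here is a minimal Scala sketch of the "throw" option. `SessionCancelledException` and this toy `Session` are hypothetical. Only the cancel/process interaction is modelled, and a doubling map stands in for the UDF round trip.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object CancelAwareSession {
  // Hypothetical exception type; the PR does not define one.
  final class SessionCancelledException(msg: String) extends RuntimeException(msg)

  /** Toy session modelling only how process() reacts to cancel(). */
  final class Session {
    private val cancelled = new AtomicBoolean(false)

    /** Idempotent: repeated calls leave the session cancelled. */
    def cancel(): Unit = cancelled.set(true)

    def process(input: Iterator[Int]): Iterator[Int] = {
      // Fail fast rather than return an empty iterator, so a cancelled
      // session cannot be mistaken for a UDF that produced no rows.
      if (cancelled.get()) throw new SessionCancelledException("session was cancelled")
      input.map(_ * 2) // stand-in for the real UDF round trip
    }
  }
}
```

The sketch only checks the flag when `process` is called. A cancellation that arrives while an already-returned iterator is being consumed is a separate question.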
   
   #### 8. `Manage(Shutdown)` while `Execute` streams are active (low)
   
   The proto says `Manage` is "independent of any per-execution stream" but 
doesn't say what the worker is supposed to do if `ShutdownRequest` arrives 
while Execute streams are still open. Options:
   
   - Reject (return an error in `ShutdownResponse`?)
   - Accept and let active streams drain.
   - Accept and cancel active streams.
   
   Worth specifying. Today there's no error channel on `ShutdownResponse`; if 
the worker can reject, you may want one.
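
A Scala sketch of the decision the spec would need to fix, with all names hypothetical. Only the reject branch needs somewhere to put a reason, which is where the missing error channel on `ShutdownResponse` would come in.

```scala
object ShutdownPolicy {
  sealed trait Policy
  case object Reject extends Policy
  case object Drain extends Policy
  case object CancelActive extends Policy

  // Hypothetical outcome type; today's ShutdownResponse has no error field.
  sealed trait Outcome
  final case class Rejected(reason: String) extends Outcome
  final case class Accepted(streamsToCancel: Int, streamsToDrain: Int) extends Outcome

  /** What the worker does with a ShutdownRequest given its open Execute streams. */
  def onShutdown(policy: Policy, activeStreams: Int): Outcome =
    if (activeStreams == 0) Accepted(0, 0)
    else policy match {
      case Reject       => Rejected(s"$activeStreams Execute stream(s) still active")
      case Drain        => Accepted(streamsToCancel = 0, streamsToDrain = activeStreams)
      case CancelActive => Accepted(streamsToCancel = activeStreams, streamsToDrain = 0)
    }
}
```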
   
   #### 9. `PayloadChunk` size validation vs. `payload_size` (nit)
   
   `UdfPayload.payload_size` is documented for buffer pre-allocation. It would also be useful for chunk validation (inline payload bytes + sum of chunk bytes == `payload_size`). Worth documenting that use, or adding a sentence to `PayloadChunk` saying the receiver MAY validate against `payload_size`.
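
A minimal Scala sketch of that check. It assumes `payload_size` is optional, so an unset value skips validation. It also assumes the receiver holds the inline `UdfPayload` bytes and each chunk's `data` as byte arrays. The function name is hypothetical.

```scala
object PayloadSizeCheck {
  /** Right(total) when inline bytes plus all chunk bytes add up to the
    * declared payload_size (or when none was declared), Left(reason) otherwise. */
  def validate(
      inlineBytes: Array[Byte],
      chunks: Seq[Array[Byte]],
      payloadSize: Option[Long]): Either[String, Long] = {
    val total = inlineBytes.length.toLong + chunks.iterator.map(_.length.toLong).sum
    payloadSize match {
      case Some(expected) if expected != total =>
        Left(s"payload_size mismatch: declared $expected, received $total")
      case _ => Right(total) // matches, or no declared size to compare against
    }
  }
}
```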
   
   #### 10. Service naming (nit)
   
   `service Worker` is quite generic in a project the size of Spark; another 
service named `Worker` could land elsewhere and confuse generated code lookups. 
Consider `UdfWorker` (matches `UDFWorkerSpecification`, `UDFWorkerDataFormat`).
   
   #### 11. Builder usage in README example (nit)
   
   ```scala
   .setUdf(UdfPayload.newBuilder()
     .setPayload(ByteString.copyFrom(serializedFunction))
     .setFormat(payloadFormat))   // worker-recognised tag
   ```
   
   This works because `setUdf` accepts a builder, but Spark's other 
proto-builder snippets tend to call `.build()` explicitly. Either pattern is 
fine, just suggest picking one for consistency across docs.
   
   #### 12. `eval_type` shape (discussion, not blocking)
   
   ```
       // (Optional) Worker / language-specific dispatch hint. A
       // free-form string the worker uses to pick the code path that
       // handles this payload. The protocol does not enumerate eval
       // types because they are language-specific; the client side of
       // the protocol and the worker agree on the namespace and the
       // values.
   ```
   
   This deliberately mirrors PySpark's `PythonEvalType`. The "any string both 
sides agree on" approach is flexible, but in practice every worker will 
hard-code recognised values, so this is effectively an enum with no central 
registry. Worth at least linking from here to a doc that lists the recognised 
values per worker once the first worker lands, so users don't have to grep 
server source.
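
To illustrate the "effectively an enum" point: a worker's dispatch tends to collapse into a closed lookup table like the one in this Scala sketch. The eval-type strings and handlers are hypothetical placeholders. The sketch also treats `eval_type` as always set, even though the field is optional. The useful property is that the recognised set is a single expression, so a per-worker doc page could be generated from it.

```scala
object EvalTypeRegistry {
  type Handler = Array[Byte] => String

  // Hypothetical eval-type strings; each real worker defines its own namespace.
  private val handlers: Map[String, Handler] = Map(
    "scalar"      -> ((payload: Array[Byte]) => s"scalar UDF, ${payload.length} payload bytes"),
    "grouped_map" -> ((payload: Array[Byte]) => s"grouped-map UDF, ${payload.length} payload bytes")
  )

  /** The set of values this worker recognises. */
  def recognised: Set[String] = handlers.keySet

  def dispatch(evalType: String, payload: Array[Byte]): Either[String, String] =
    handlers.get(evalType) match {
      case Some(h) => Right(h(payload))
      case None =>
        Left(s"unrecognised eval_type '$evalType'; expected one of ${recognised.toSeq.sorted.mkString(", ")}")
    }
}
```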
   
   #### 13. Coupling of abstract `WorkerSession` to generated proto class (discussion)
   
   `doInit(message: Init)` now binds every `WorkerSession` implementation directly to the generated class. Pros: zero conversion overhead, single source of truth. Cons: any source-incompatible change to the generated class, including wire-compatible ones such as a field rename, ripples into every implementation. Given the `@Experimental` annotation this is fine for now, but it does mean we should be conservative about breaking changes in `Init` going forward (more conservative than the proto itself would suggest).
   
   ---
   
   ### Nits
   
   - `common.proto`: typo fixes look good.
   - The doc-link style `[[Init]]` / `[[FinishResponse]]` in proto comments 
isn't standard `protoc-gen-doc` markup but is harmless plain text — fine.
   - `Heartbeat`/`HeartbeatResponse` empty messages: if you'd like richer 
health probes later (e.g., a server-side load hint), reserve a field number now 
to make additive evolution easy.
   - Consider an explicit note on `Heartbeat` about its relationship to gRPC 
keepalive — they aren't the same thing, and users will ask.
   
   ---
   
   ### Verdict
   
   LGTM in shape; the protocol is well-thought-through and the docs are 
noticeably above average. I'd want at least the cancel-vs-finish wire 
contradiction (#1) reconciled and the unknown-`data_format` rejection rule (#4) 
made explicit before merge; the rest are doc clarifications and minor questions.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
