dtenedor commented on PR #55657:
URL: https://github.com/apache/spark/pull/55657#issuecomment-4422623788
## PR Review: SPARK-56413 — gRPC UDF execution protocol
### Summary
The PR adds `udf/worker/proto/src/main/protobuf/udf_protocol.proto`, swaps
the placeholder `InitMessage` case class on `WorkerSession` for the generated
`Init` proto, fixes two typos in `common.proto`, and updates the README and a
test. The protocol defines:
- `service Worker { rpc Execute(stream UdfRequest) returns (stream
UdfResponse); rpc Manage(WorkerRequest) returns (WorkerResponse); }`
- `Execute` wire order: `Init` → `PayloadChunk*` →
(`DataRequest`/`DataResponse`)* → exactly one `Finish` or `Cancel`.
- Manage operations: `Heartbeat`, `ShutdownRequest`.
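The `Execute` ordering can be expressed as a minimal engine-side order check, sketched below in Scala. `EngineMsg`, its case objects and `ExecuteOrder` are hypothetical stand-ins for the generated `UdfRequest` branches and are not code from the PR. Response messages are left out for brevity.

```scala
// Hypothetical stand-ins for the engine -> worker branches of UdfRequest.
sealed trait EngineMsg
case object InitMsg extends EngineMsg
case object ChunkMsg extends EngineMsg
case object DataMsg extends EngineMsg
case object FinishMsg extends EngineMsg
case object CancelMsg extends EngineMsg

object ExecuteOrder {
  // Phases of the engine -> worker half of one Execute stream.
  private sealed trait Phase
  private case object Start extends Phase    // nothing sent yet
  private case object Chunking extends Phase // after Init; PayloadChunks may follow
  private case object Sending extends Phase  // after the first DataRequest
  private case object Done extends Phase     // after Finish or Cancel

  /** Right(()) if `msgs` is a complete, well-ordered stream; Left(reason) otherwise. */
  def validate(msgs: List[EngineMsg]): Either[String, Unit] = {
    @annotation.tailrec
    def loop(phase: Phase, rest: List[EngineMsg], idx: Int): Either[String, Unit] =
      rest match {
        case Nil =>
          if (phase == Done) Right(()) else Left("stream ended without Finish or Cancel")
        case msg :: tail =>
          val next: Option[Phase] = (phase, msg) match {
            case (Start, InitMsg)                            => Some(Chunking)
            case (Chunking, ChunkMsg)                        => Some(Chunking)
            case (Chunking | Sending, DataMsg)               => Some(Sending)
            case (Chunking | Sending, FinishMsg | CancelMsg) => Some(Done)
            case _                                           => None // includes anything after Done
          }
          next match {
            case Some(p) => loop(p, tail, idx + 1)
            case None    => Left(s"message $idx ($msg) not allowed in phase $phase")
          }
      }
    loop(Start, msgs, 0)
  }
}
```

For example, `Init, PayloadChunk, DataRequest, PayloadChunk, Finish` is rejected, because the first `DataRequest` closes the chunk phase.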
Overall this is a high-quality, well-documented wire contract. The doc
comments are unusually thorough (lifecycle, ordering, "Required/Optional" tags,
escape-hatch conventions, reserved ranges), and the engine/client split — typed
engine-side fields vs. opaque `UdfPayload` carrying everything the client packs
— is a good factoring.
Below are mostly questions / suggested clarifications; only a couple are
blocking-ish.
---
### Strengths
- Clear separation of envelope (`UdfRequest`/`UdfResponse`) vs. control
(`UdfControlRequest`/`UdfControlResponse`) vs. data
(`DataRequest`/`DataResponse`).
- Top-level `bytes data` on `DataRequest`/`DataResponse` (not nested) —
explicit copy-avoidance rationale is great.
- Reserved field range `8 to 99` for graduated `session_conf` keys, with the
`timezone` precedent already showing the pattern.
- `>= 100` convention for opaque escape-hatch fields (`parameters`, future
siblings).
- `PayloadChunk` semantics (concatenation order, single `InitResponse`
covering Init + all chunks, first `DataRequest` ending the chunk phase) are
well-specified.
- Scala-side: the new cancel-lifecycle doc + idempotency of `cancel`/`close`
is exactly the kind of contract that pays off in practice.
---
### Concerns / suggestions
#### 1. Cancel-vs-finish race contradicts wire-level "mutually exclusive" wording (medium)
`Finish` doc in `udf_protocol.proto` says:
```
// Exactly one of [[Finish]] or [[Cancel]] is sent per Execute stream;
// they are mutually exclusive. If the engine has already sent
// [[Finish]] it MUST NOT send [[Cancel]] afterwards (and vice versa).
message Finish {}
```
…but the new Scala doc on `WorkerSession` describes a benign race:
```
* Cancel-vs-finish race: when the session driver has finished
* sending input (and therefore queued an implicit finish on the
* underlying transport) and a [[cancel]] arrives concurrently, both
* are valid stream-terminating actions; the response side carries
* either a `FinishResponse` or a `CancelResponse` depending on which
* the worker observes first, and either is acceptable to the caller.
```
If `Finish` is already queued on the transport and `cancel()` then writes
`Cancel`, the engine has sent both on the wire, which violates the proto's MUST
NOT. Two reconciliations are possible; please pick one and state it clearly in
the proto:
- a) The wire really does forbid both, so `cancel()` after a buffered
`Finish` is a no-op on the transport (just an early gRPC half-close). Reword
the Scala "queued an implicit finish" sentence to match.
- b) The wire tolerates a `Cancel` after `Finish` *only* before any
`FinishResponse` has been observed (the engine raced with itself, not with
another client). In that case the proto's MUST NOT needs softening to "MUST NOT
send `Cancel` after observing `FinishResponse`" or similar.
As written, the Scala doc invites engine implementations that the proto's
strict reading prohibits.
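Option (a) could look roughly like the sketch below. The finish path and `cancel()` both race to claim the stream, and only the winner writes a terminal message. `StreamTerminator`, `Terminal` and the `write` callback are hypothetical names and are not part of the PR.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical terminal messages for one Execute stream.
sealed trait Terminal
case object SentFinish extends Terminal
case object SentCancel extends Terminal

/** Guarantees at most one terminal message reaches `write`, whichever caller wins the race. */
final class StreamTerminator(write: Terminal => Unit) {
  private val claimed = new AtomicReference[Option[Terminal]](None)

  // compareAndSet compares by reference; `None` is a singleton, so only the first claim succeeds.
  private def claim(t: Terminal): Boolean = claimed.compareAndSet(None, Some(t))

  /** Input exhausted: returns true iff this call wrote Finish. */
  def finish(): Boolean = claim(SentFinish) && { write(SentFinish); true }

  /** Returns true iff this call wrote Cancel; a no-op on the wire once Finish has been claimed. */
  def cancel(): Boolean = claim(SentCancel) && { write(SentCancel); true }

  def terminal: Option[Terminal] = claimed.get
}
```

With this shape, the Scala doc's "both are valid stream-terminating actions" becomes "whichever claims the stream first terminates it". That matches the proto's strict reading.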
#### 2. `InitResponse` timing vs. `PayloadChunk` is ambiguous (low/medium)
`PayloadChunk` says "The single `InitResponse` covers Init plus all of its
chunks together", implying the worker must wait until chunking completes. But
the order diagram doesn't say this explicitly, and it interacts with the
optional `PayloadChunk.last` early-completion hint:
```
bytes data = 1;
// (Optional) Set to true on the final chunk. Receivers MAY use
// this as an early signal that the payload is complete and
// decoding can begin; receivers that prefer to wait for the
// first [[DataRequest]] (which marks the end of the chunking
// phase) MAY ignore this. When unset, the receiver determines
// completeness by the arrival of the first [[DataRequest]].
optional bool last = 2;
```
Two questions worth pinning down:
- Is the worker permitted to send `InitResponse` before observing all
`PayloadChunk` messages (e.g., as soon as it's decoded enough to start)? Or
must it wait until the chunking phase is complete?
- If `last` is unset, the receiver can only detect end-of-chunking by the
first `DataRequest`. That's fine for the worker, but it means `InitResponse`
can never be sent before the first `DataRequest` arrives, i.e., the engine
must not wait for `InitResponse` before sending data. If that's the intent,
say so; otherwise consider requiring `last = true` on the final chunk whenever
chunking is used.
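Here is a hypothetical worker-side sketch of the chunking phase. `PayloadAssembler` accumulates the inline payload plus the chunks, and it reports completeness either on `last = true` or on the first `DataRequest`. In this model the worker could only send `InitResponse` once `offer` returns `true`, which shows why an unset `last` pushes `InitResponse` past the first `DataRequest`. All names are assumptions.

```scala
import java.io.ByteArrayOutputStream

// Hypothetical worker-side view of the messages that can follow Init.
sealed trait Inbound
// `last` models the optional PayloadChunk.last field: None means unset.
final case class Chunk(data: Array[Byte], last: Option[Boolean]) extends Inbound
final case class DataReq(data: Array[Byte]) extends Inbound

/** Accumulates the inline payload from Init plus all PayloadChunks, in arrival order. */
final class PayloadAssembler(inlinePayload: Array[Byte]) {
  private val buf = new ByteArrayOutputStream()
  buf.write(inlinePayload, 0, inlinePayload.length)
  private var complete = false

  /** Feeds one message; returns true once the payload is known to be complete. */
  def offer(msg: Inbound): Boolean = {
    msg match {
      case Chunk(bytes, last) =>
        require(!complete, "PayloadChunk received after the chunking phase ended")
        buf.write(bytes, 0, bytes.length)
        if (last.contains(true)) complete = true // early signal from the sender
      case DataReq(_) =>
        complete = true // the first DataRequest always ends the chunking phase
    }
    complete
  }

  /** Only valid once complete; in this model, the earliest point InitResponse could be sent. */
  def payload: Array[Byte] = {
    require(complete, "payload not complete yet")
    buf.toByteArray
  }
}
```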
#### 3. No protocol/version field on `Init` (medium)
`WorkerCapabilities` (in `worker_spec.proto`) presumably handles up-front
capability negotiation, but a `protocol_version` (or `min_required_version`) on
`Init` would make per-stream rollout and rollback much simpler, especially
before the engine has fully read the worker spec. Worth at least reserving a
field number for it now, even if not populated.
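The sketch below shows why reserving the field early helps. A worker could resolve the requested version against its supported range and read the proto3 default `0` as "not populated", mapping it to a baseline. Both the `protocol_version` field and the version numbers here are hypothetical.

```scala
// Hypothetical: the PR defines no protocol_version field; names and numbers are illustrative.
object ProtocolVersion {
  val MinSupported = 1
  val MaxSupported = 2

  /** proto3 scalars default to 0, so 0 is read as "not populated" and treated as the baseline, 1. */
  def resolve(requested: Int): Either[String, Int] = {
    val v = if (requested == 0) 1 else requested
    if (v >= MinSupported && v <= MaxSupported) Right(v)
    else Left(s"unsupported protocol_version $v; worker supports $MinSupported to $MaxSupported")
  }
}
```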
#### 4. Unknown `UDFWorkerDataFormat` values (low)
`Init` says receivers MUST reject `UDF_WORKER_DATA_FORMAT_UNSPECIFIED`,
which is good, but doesn't say what to do with values outside the worker's
supported set (e.g., a newer client picks `PARQUET=2` that the worker doesn't
speak). proto3 will pass unknown enums through as numeric values; the doc
should say the worker MUST reject any value not in its declared
`WorkerCapabilities.supported_data_formats`.
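Here is a sketch of the suggested rule. It assumes the worker checks the raw numeric value: in protobuf-java, the generated `get<Field>Value()` accessor returns it, while the enum accessor returns `UNRECOGNIZED` for values the generated code doesn't know. Apart from `UNSPECIFIED = 0` (the proto3 convention), the numbering is illustrative.

```scala
// Hypothetical check on the raw numeric data_format value from Init.
object DataFormatCheck {
  val Unspecified = 0 // assumed value of UDF_WORKER_DATA_FORMAT_UNSPECIFIED (proto3 convention)

  /** `supported` stands in for WorkerCapabilities.supported_data_formats as numeric values. */
  def validate(raw: Int, supported: Set[Int]): Either[String, Int] =
    if (raw == Unspecified) Left("data_format is UNSPECIFIED")
    else if (!supported.contains(raw)) Left(s"data_format $raw is not in supported_data_formats")
    else Right(raw)
}
```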
#### 5. `WorkerResponse` should require matching branch (low)
```
message WorkerResponse {
// Exactly one branch MUST be set, mirroring the request oneof.
oneof manage {
HeartbeatResponse heartbeat = 1;
ShutdownResponse shutdown = 2;
}
}
```
"Mirroring" is implicit. Suggest tightening to "The worker MUST respond with
the oneof branch that matches the request's branch; the engine treats a
mismatched or unset branch as a protocol error."
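On the engine side, the tightened rule reduces to a small check. `ManageBranch` and `ManageCheck` are hypothetical stand-ins. In protobuf-java, the generated `getManageCase()` accessor would supply the branch, including the not-set case.

```scala
// Hypothetical stand-in for the generated oneof case of WorkerRequest/WorkerResponse.
sealed trait ManageBranch
case object HeartbeatBranch extends ManageBranch
case object ShutdownBranch extends ManageBranch

object ManageCheck {
  /** `response` is None when the worker left the oneof unset. */
  def check(request: ManageBranch, response: Option[ManageBranch]): Either[String, ManageBranch] =
    response match {
      case Some(b) if b == request => Right(b)
      case Some(b)                 => Left(s"protocol error: $request answered with $b")
      case None                    => Left(s"protocol error: $request answered with no branch set")
    }
}
```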
#### 6. Empty `DataRequest`/`DataResponse` (low)
```
message DataRequest {
// (Required, non-empty.) Encoded data bytes for one batch in the
// session-declared format.
bytes data = 1;
}
// Worker -> Engine per-batch payload. The worker emits zero or more
// [[DataResponse]]s between [[InitResponse]] and [[FinishResponse]] /
// [[CancelResponse]]. Sink-style UDFs (which consume input but
// produce no output rows on the data plane) emit exactly zero.
message DataResponse {
// (Required, non-empty.) Encoded data bytes for one batch in the
// session-declared format.
bytes data = 1;
}
```
For Arrow this is fine (an empty IPC batch still has a header). For future
formats that allow truly empty batches, requiring non-empty may be
inconvenient. Either narrow the wording to "non-empty as defined by the
session's `data_format`" or accept zero-length payloads and let the decoder
reject. Not blocking.
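If the second option is chosen, the transport would accept any length and defer to the format. The sketch below is hypothetical: `BatchCodec` and both codec objects are illustrative and are not Spark APIs. The Arrow-like check is deliberately simplified.

```scala
// Hypothetical: the transport accepts any length and the session's format decides validity.
trait BatchCodec {
  def isValidBatch(bytes: Array[Byte]): Boolean
}

// Illustrative Arrow-like codec: an IPC batch always carries a header, so zero bytes is invalid.
object ArrowLikeCodec extends BatchCodec {
  def isValidBatch(bytes: Array[Byte]): Boolean = bytes.nonEmpty
}

// Illustrative format in which a zero-length payload encodes an empty batch.
object EmptyAllowedCodec extends BatchCodec {
  def isValidBatch(bytes: Array[Byte]): Boolean = true
}

object DataCheck {
  def accept(bytes: Array[Byte], codec: BatchCodec): Either[String, Array[Byte]] =
    if (codec.isValidBatch(bytes)) Right(bytes)
    else Left(s"${bytes.length}-byte batch is invalid for the session's data_format")
}
```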
#### 7. Behavior of `cancel()` between init and process (low)
```
* '''Lifecycle:''' [[cancel]] is idempotent and safe at any point in
* the session's life:
* - before [[init]] -- nothing has been sent on the transport yet,
* so [[cancel]] is a no-op (the session may still be closed
* normally via [[close]]).
* - between [[init]] and [[process]] -- transitions the session
* into a cancelled state; subsequent [[process]] calls observe
* the cancellation.
```
"Subsequent `process` calls observe the cancellation": how? By throwing (a
generic exception, or a dedicated exception class)? By returning an empty
iterator? Worth nailing down. My read of the surrounding contract is that
callers expect an exception, since `process` returns an iterator and a silent
empty result would mask cancellations.
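The exception-based reading can be modelled minimally. `SketchSession` and `SessionCancelledException` are hypothetical and only mirror the quoted lifecycle: a cancel before `init` is a no-op, and a cancel after `init` makes `process` throw.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical exception type; the PR does not define one.
final class SessionCancelledException extends RuntimeException("UDF worker session was cancelled")

// Minimal model of the quoted lifecycle, not the PR's WorkerSession.
final class SketchSession {
  private val initialized = new AtomicBoolean(false)
  private val cancelled = new AtomicBoolean(false)

  def init(): Unit = initialized.set(true)

  // Before init nothing is on the transport, so cancel is a no-op (per the quoted doc).
  def cancel(): Unit = if (initialized.get) cancelled.set(true)

  def process(input: Iterator[Int]): Iterator[Int] = {
    if (cancelled.get) throw new SessionCancelledException
    input.map(_ * 2) // stand-in for the round trip to the worker
  }
}
```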
#### 8. `Manage(Shutdown)` while `Execute` streams are active (low)
The proto says `Manage` is "independent of any per-execution stream" but
doesn't say what the worker is supposed to do if `ShutdownRequest` arrives
while Execute streams are still open. Options:
- Reject (return an error in `ShutdownResponse`?)
- Accept and let active streams drain.
- Accept and cancel active streams.
Worth specifying. Today there's no error channel on `ShutdownResponse`; if
the worker can reject, you may want one.
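One possible shape for the "accept and let active streams drain" option, with cancellation as a fallback, is sketched below. The worker stops admitting new `Execute` streams, waits up to a timeout for the active ones to finish, and reports whether it drained. `DrainingWorker` is hypothetical. A `false` result is the kind of outcome an error field on `ShutdownResponse` could carry.

```scala
// Hypothetical sketch of "accept and drain"; a false result from shutdown could trigger cancellation.
final class DrainingWorker {
  private var active = 0
  private var shuttingDown = false

  /** Returns false (reject the new Execute stream) once shutdown has been requested. */
  def openStream(): Boolean = synchronized {
    if (shuttingDown) false else { active += 1; true }
  }

  def closeStream(): Unit = synchronized {
    active -= 1
    notifyAll() // wake a waiting shutdown
  }

  /** Stops admitting streams, then waits up to `timeoutMs` for active ones; true if fully drained. */
  def shutdown(timeoutMs: Long): Boolean = synchronized {
    shuttingDown = true
    val deadline = System.currentTimeMillis() + timeoutMs
    while (active > 0 && System.currentTimeMillis() < deadline)
      wait(math.max(1L, deadline - System.currentTimeMillis()))
    active == 0
  }
}
```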
#### 9. `PayloadChunk` size validation vs. `payload_size` (nit)
`UdfPayload.payload_size` is documented for buffer pre-allocation. It would
also be useful for chunk validation (sum of chunk bytes + inline payload ==
payload_size). Worth mentioning that validation use, or adding a sentence to
`PayloadChunk` that the receiver MAY validate against `payload_size`.
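The validation could be as small as the sketch below. The names are hypothetical, and it assumes `payload_size` is a 64-bit count, so lengths are summed as `Long`.

```scala
// Hypothetical receiver-side check of inline payload plus chunks against payload_size.
object PayloadSizeCheck {
  /** `inlineLen`: payload bytes carried on Init; `chunkLens`: PayloadChunk sizes in arrival order. */
  def validate(payloadSize: Long, inlineLen: Long, chunkLens: Seq[Long]): Either[String, Unit] = {
    val total = inlineLen + chunkLens.sum
    if (total == payloadSize) Right(())
    else Left(s"received $total payload bytes but payload_size declared $payloadSize")
  }
}
```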
#### 10. Service naming (nit)
`service Worker` is quite generic in a project the size of Spark; another
service named `Worker` could land elsewhere and confuse generated code lookups.
Consider `UdfWorker` (matches `UDFWorkerSpecification`, `UDFWorkerDataFormat`).
#### 11. Builder usage in README example (nit)
```scala
.setUdf(UdfPayload.newBuilder()
.setPayload(ByteString.copyFrom(serializedFunction))
.setFormat(payloadFormat)) // worker-recognised tag
```
This works because `setUdf` accepts a builder, but Spark's other
proto-builder snippets tend to call `.build()` explicitly. Either pattern is
fine, just suggest picking one for consistency across docs.
#### 12. `eval_type` shape (discussion, not blocking)
```
// (Optional) Worker / language-specific dispatch hint. A
// free-form string the worker uses to pick the code path that
// handles this payload. The protocol does not enumerate eval
// types because they are language-specific; the client side of
// the protocol and the worker agree on the namespace and the
// values.
```
This deliberately mirrors PySpark's `PythonEvalType`. The "any string both
sides agree on" approach is flexible, but in practice every worker will
hard-code recognised values, so this is effectively an enum with no central
registry. Worth at least linking from here to a doc that lists the recognised
values per worker once the first worker lands, so users don't have to grep
server source.
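A single per-worker registry would keep the recognised values in one place and let the worker reject unknown values with a useful message. The sketch below is one possibility; the eval-type strings and handler signatures are placeholders and are not values from the PR.

```scala
// Hypothetical per-worker registry; eval-type strings and handlers are placeholders.
object EvalTypeRegistry {
  type Handler = Seq[Int] => Seq[Int]

  private val handlers: Map[String, Handler] = Map(
    "example.scalar"    -> ((xs: Seq[Int]) => xs.map(_ + 1)),
    "example.aggregate" -> ((xs: Seq[Int]) => Seq(xs.sum))
  )

  /** Single source for docs that list the values this worker recognises. */
  def recognised: Set[String] = handlers.keySet

  def dispatch(evalType: String): Either[String, Handler] =
    handlers.get(evalType).toRight(
      s"unrecognised eval_type '$evalType'; expected one of ${recognised.toSeq.sorted.mkString(", ")}")
}
```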
#### 13. Coupling of abstract `WorkerSession` to generated proto class (discussion)
`doInit(message: Init)` now binds every `WorkerSession` implementation
directly to the generated class. Pros: zero conversion overhead, single source
of truth. Cons: any proto-incompatible change ripples into every
implementation. Given the `@Experimental` annotation this is fine for now, but
it does mean we should be conservative about breaking changes in `Init` going
forward (more conservative than the proto itself would suggest).
---
### Nits
- `common.proto`: typo fixes look good.
- The doc-link style `[[Init]]` / `[[FinishResponse]]` in proto comments
isn't standard `protoc-gen-doc` markup but is harmless plain text — fine.
- `Heartbeat`/`HeartbeatResponse` empty messages: if you'd like richer
health probes later (e.g., a server-side load hint), reserve a field number now
to make additive evolution easy.
- Consider an explicit note on `Heartbeat` about its relationship to gRPC
keepalive — they aren't the same thing, and users will ask.
---
### Verdict
LGTM in shape; the protocol is well-thought-through and the docs are
noticeably above average. I'd want at least the cancel-vs-finish wire
contradiction (#1) reconciled and the unknown-`data_format` rejection rule (#4)
made explicit before merge; the rest are doc clarifications and minor questions.