andygrove opened a new issue, #4383:
URL: https://github.com/apache/datafusion-comet/issues/4383
## What is the problem the feature request solves?
PR #4234 streams Comet's columnar batches to the Python worker via
`CometColumnarPythonInput.writeNextBatchToArrowStream`. For each batch, the current
path makes two copies of every buffer:
1. **Comet's `FieldVector` → destination IPC root.** `copyVector` walks the
source/destination trees in lockstep, allocates destination child vectors sized
from the source, and `ArrowBuf.setBytes`-copies each buffer. The destination
root is a child of `ArrowUtils.rootAllocator`, set up by Spark's
`PythonArrowInput` trait.
2. **Destination IPC root → pipe.** `VectorUnloader.getRecordBatch` walks
the root and produces an `ArrowRecordBatch`; `MessageSerializer.serialize`
writes bytes to the `DataOutputStream` that feeds the Python worker's stdin.

Copy 2 is structural: Spark's transport to Python is fork + pipe + Arrow
IPC, so the bytes must reach the pipe at least once. Copy 1 is droppable: we
can build the IPC stream directly from Comet's vectors and skip the
intermediate root entirely. After this change, the path sits at the single-copy
floor for JVM-to-Python Arrow transport.
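
A toy model of the copy accounting follows. It assumes each Arrow buffer can be stood in for by a plain `byte[]`, and the pipe to the Python worker by a `ByteArrayOutputStream`. `CopyCountSketch`, `twoCopy` and `oneCopy` are hypothetical names, not Comet or Arrow APIs. The model shows that staging through freshly allocated destination buffers changes how many bytes get copied, but not which bytes reach the pipe:

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Hypothetical model: byte[] stands in for ArrowBuf, ByteArrayOutputStream for the pipe.
class CopyCountSketch {
    // Bytes moved by the intermediate staging copy (the analogue of copy 1).
    static long stagedBytes = 0;

    // Two-copy path: allocate a destination buffer per source buffer, copy into it,
    // then write the destination buffer to the pipe.
    static byte[] twoCopy(byte[][] source) {
        ByteArrayOutputStream pipe = new ByteArrayOutputStream();
        for (byte[] src : source) {
            byte[] dst = new byte[src.length];            // per-child allocation
            System.arraycopy(src, 0, dst, 0, src.length); // copy 1
            stagedBytes += src.length;
            pipe.write(dst, 0, dst.length);               // copy 2
        }
        return pipe.toByteArray();
    }

    // One-copy path: write the source buffers to the pipe directly.
    static byte[] oneCopy(byte[][] source) {
        ByteArrayOutputStream pipe = new ByteArrayOutputStream();
        for (byte[] src : source) {
            pipe.write(src, 0, src.length);               // the only copy
        }
        return pipe.toByteArray();
    }

    public static void main(String[] args) {
        byte[][] source = { {1, 2, 3, 4}, {5, 6}, {7, 8, 9} };
        boolean same = Arrays.equals(twoCopy(source), oneCopy(source));
        System.out.println(same + " " + stagedBytes); // prints "true 9"
    }
}
```

The wire bytes are identical in both paths. Only the staging copy and its allocations disappear.
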

The cross-allocator constraint discussed in #4294 is independent of this.
Even after copy 1 is dropped, JVM-side zero-copy via `TransferPair` is blocked
because Comet's source `FieldVector`s are imported from native code via the Arrow C Data
Interface (their buffers route `release` through FFI), while Spark's
destination IPC root is a child of `ArrowUtils.rootAllocator`. The two
reference managers cannot share buffers. The path proposed here doesn't try to share them: it
bypasses the intermediate root entirely.
## Describe the potential solution
`spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala:217-241`
(`serializeBatches`) already does the equivalent walk for the shuffle path:
build a `VectorSchemaRoot` from Comet's `FieldVector`s, run
`ArrowStreamWriter.writeBatch` against it, write the resulting bytes to a
stream. The wire format is the same Arrow IPC the Python worker reads.

For `CometColumnarPythonInput`, the shape is slightly different, because
Spark's `PythonArrowInput` trait owns the `ArrowStreamWriter` and writes the
schema header once via `writer.start()`. The per-batch contract is therefore "append one
`RecordBatch` to the already-open stream", not "write a full IPC stream". The
implementation outline:
- The trait's destination `root` is still constructed (its schema is what
  `writer.start()` serialises into the stream header). It is never populated.
- Per batch, build an `ArrowRecordBatch` directly from Comet's source
  vectors:
  - Allocate one small validity buffer for the wrapping struct (all-`0xff`,
    i.e. every row non-null).
  - Collect `ArrowFieldNode`s and `ArrowBuf`s in the same depth-first order
    `VectorUnloader.appendNodes` uses, with Comet's `FieldVector` buffers in place
    of the destination root's.
- Pass the assembled batch to `MessageSerializer.serialize`.
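
A dependency-free model of the node/buffer ordering follows. It assumes that Spark's destination schema is a single non-null struct whose children are Comet's columns, which is what the wrapping-struct validity buffer implies. `Vec`, `Node` and the string buffer labels are hypothetical stand-ins for `FieldVector`, `ArrowFieldNode` and `ArrowBuf`. The recursive walk mirrors the pre-order traversal of `VectorUnloader.appendNodes`: first the vector's node, then its own buffers, then each child in turn.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RecordBatchLayoutSketch {
    // Hypothetical stand-in for a FieldVector; buffers are identified by label only.
    record Vec(String name, int valueCount, int nullCount, List<String> buffers, List<Vec> children) {}

    // Hypothetical stand-in for ArrowFieldNode(length, nullCount).
    record Node(String name, int length, int nullCount) {}

    // Pre-order walk: node for this vector, its own buffers, then each child recursively.
    static void appendNodes(Vec v, List<Node> nodes, List<String> buffers) {
        nodes.add(new Node(v.name(), v.valueCount(), v.nullCount()));
        buffers.addAll(v.buffers());
        for (Vec child : v.children()) {
            appendNodes(child, nodes, buffers);
        }
    }

    // A validity bitmap uses one bit per row, rounded up to whole bytes.
    static int validityBytes(int numRows) {
        return (numRows + 7) / 8;
    }

    // All-0xff bitmap: every row of the wrapping struct is marked non-null.
    static byte[] allValid(int numRows) {
        byte[] bitmap = new byte[validityBytes(numRows)];
        Arrays.fill(bitmap, (byte) 0xff);
        return bitmap;
    }

    // Lays out one batch as children of a non-null struct: the struct contributes
    // one node (no nulls) and one validity buffer, then each column follows depth-first.
    static void layoutBatch(List<Vec> columns, int numRows, List<Node> nodes, List<String> buffers) {
        nodes.add(new Node("struct", numRows, 0));
        buffers.add("struct.validity");
        for (Vec col : columns) {
            appendNodes(col, nodes, buffers);
        }
    }

    public static void main(String[] args) {
        // Column a: int, 3 rows, 1 null. Column b: list<int>, 3 rows, 5 child items.
        Vec item = new Vec("b.item", 5, 0, List.of("b.item.validity", "b.item.data"), List.of());
        Vec b = new Vec("b", 3, 0, List.of("b.validity", "b.offsets"), List.of(item));
        Vec a = new Vec("a", 3, 1, List.of("a.validity", "a.data"), List.of());
        List<Node> nodes = new ArrayList<>();
        List<String> buffers = new ArrayList<>();
        layoutBatch(List.of(a, b), 3, nodes, buffers);
        System.out.println(buffers);
    }
}
```

In the real path, the labels would presumably be each source vector's `getFieldBuffers()`. The two lists would then back the `ArrowRecordBatch` handed to `MessageSerializer.serialize`.
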

Net result: the per-buffer `setBytes` copies and per-child `allocateNew` calls go away. The
struct validity buffer (one bit per row, i.e. `ceil(numRows / 8)` bytes per batch) is the only remaining JVM-side
allocation on the input path.
## Additional context
- Reference comments live in
  `spark/src/main/spark-4.x/org/apache/spark/sql/execution/python/CometColumnarPythonInput.scala`
  and `docs/source/user-guide/latest/pyarrow-udfs.md` (PR #4234). Both are already
  framed as "two copies, one droppable", so the doc updates will be local.
- Benchmark targets to beat:
  `spark/src/test/resources/pyspark/benchmark_pyarrow_udf.py`. The wide-row case
  (1.42x today) is where copy 1 dominates and should see the largest delta.
  The narrow and mixed cases (1.92x / 1.79x today) are bottlenecked on Python fork/IPC and
  should improve modestly.
- Related #4234 (introduces the operator), #4294 (proposes a more invasive
allocator-tree change to enable `TransferPair`-based zero-copy; the path
proposed here is orthogonal and lands the larger share of the win without that
refactor).
- Originally surfaced in @mbutrovich's review on #4234 (item 3, "two copies
→ one copy").