andygrove opened a new issue, #4383:
URL: https://github.com/apache/datafusion-comet/issues/4383

   ## What is the problem the feature request solves?
   
   PR #4234 streams Comet's columnar batches to the Python worker via 
`CometColumnarPythonInput.writeNextBatchToArrowStream`. Per batch, the current 
path does two copies of every buffer:
   
   1. **Comet's `FieldVector` → destination IPC root.** `copyVector` walks the 
source/destination trees in lockstep, allocates destination child vectors sized 
from the source, and `ArrowBuf.setBytes`-copies each buffer. The destination 
root is a child of `ArrowUtils.rootAllocator`, set up by Spark's 
`PythonArrowInput` trait.
   2. **Destination IPC root → pipe.** `VectorUnloader.getRecordBatch` walks 
the root and produces an `ArrowRecordBatch`; `MessageSerializer.serialize` 
writes bytes to the `DataOutputStream` that feeds the Python worker's stdin.
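A minimal sketch of the two copies described above, assuming Arrow Java on the classpath. Copy 1 is reduced to a flat `IntVector`; Comet's real `copyVector` walks nested trees and covers every type. `TwoCopyPath`, `copyFlat`, and `writeBatch` are hypothetical names, not Comet or Spark code.

```scala
import java.io.OutputStream
import java.nio.channels.Channels

import org.apache.arrow.vector.{IntVector, VectorSchemaRoot, VectorUnloader}
import org.apache.arrow.vector.ipc.WriteChannel
import org.apache.arrow.vector.ipc.message.MessageSerializer

// Hypothetical illustration of the current path; not Comet's implementation.
object TwoCopyPath {
  // Copy 1 (simplified to a flat IntVector): size the destination from the
  // source, then setBytes the validity and data buffers across.
  def copyFlat(src: IntVector, dst: IntVector): Unit = {
    val n = src.getValueCount
    dst.allocateNew(n)
    val validityLen = (n + 7) / 8L                 // one validity bit per row
    val dataLen = n.toLong * IntVector.TYPE_WIDTH  // 4 bytes per Int
    dst.getValidityBuffer.setBytes(0L, src.getValidityBuffer, 0L, validityLen)
    dst.getDataBuffer.setBytes(0L, src.getDataBuffer, 0L, dataLen)
    dst.setValueCount(n)
  }

  // Copy 2: unload the populated destination root into an ArrowRecordBatch
  // and serialize it as one IPC message onto the stream feeding Python.
  def writeBatch(root: VectorSchemaRoot, out: OutputStream): Unit = {
    val batch = new VectorUnloader(root).getRecordBatch
    try {
      MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch)
    } finally {
      batch.close() // release the references VectorUnloader retained
    }
  }
}
```

Everything `copyFlat` does is the work this proposal removes; `writeBatch` is the structural copy that stays.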
   
   Copy 2 is structural: Spark's transport to Python is fork + pipe + Arrow 
IPC, so the bytes must reach the pipe at least once. Copy 1 is droppable — we 
can build the IPC stream directly from Comet's vectors and skip the 
intermediate root entirely. After this change the path is at the single-copy 
floor for JVM-to-Python Arrow transport.
   
   The cross-allocator constraint discussed in #4294 is independent of this: 
even after copy 1 is dropped, JVM-side zero-copy via `TransferPair` is blocked 
because Comet's source `FieldVector`s are imported from native via the Arrow C 
Data Interface (their buffers route `release` through FFI), while Spark's 
destination IPC root is a child of `ArrowUtils.rootAllocator`. The two 
reference managers cannot share buffers. The path proposed here does not need 
them to: it never populates the intermediate root, so no buffer has to cross 
allocators.
   
   ## Describe the potential solution
   
   `spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala:217-241` 
(`serializeBatches`) already does the equivalent walk for the shuffle path: 
build a `VectorSchemaRoot` from Comet's `FieldVector`s, run 
`ArrowStreamWriter.writeBatch` against it, write the resulting bytes to a 
stream. The wire format is the same Arrow IPC the Python worker reads.
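A minimal sketch of that whole-stream pattern, assuming Arrow Java on the classpath. `FullStreamSketch` and `writeFullStream` are hypothetical names, and Comet's actual `serializeBatches` may differ in details. The point is that each call emits a *complete* stream (schema header, one batch, end-of-stream marker).

```scala
import java.io.OutputStream

import org.apache.arrow.vector.{FieldVector, VectorSchemaRoot}
import org.apache.arrow.vector.ipc.ArrowStreamWriter

// Hypothetical helper illustrating the pattern; not Comet's serializeBatches.
object FullStreamSketch {
  // Wraps already-populated vectors in a root (no buffer copy: the root only
  // references them) and writes one complete IPC stream to `out`.
  def writeFullStream(vectors: Seq[FieldVector], numRows: Int, out: OutputStream): Unit = {
    val root = VectorSchemaRoot.of(vectors: _*)
    root.setRowCount(numRows)
    // No dictionary-encoded columns assumed, hence the null DictionaryProvider.
    val writer = new ArrowStreamWriter(root, null, out)
    writer.start()      // schema message (the stream header)
    writer.writeBatch() // one record batch unloaded from the root's vectors
    writer.end()        // end-of-stream marker; `out` itself stays open
    // The root is not closed here: closing it would also close the vectors.
  }
}
```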
   
   For `CometColumnarPythonInput`, the shape is slightly different because 
Spark's `PythonArrowInput` trait owns the `ArrowStreamWriter` and writes the 
schema header once via `writer.start()`. The per-batch contract is "append one 
`RecordBatch` to the already-open stream", not "write a full IPC stream". The 
implementation outline:
   
   - The trait's destination `root` is still constructed (its schema is what 
`writer.start()` serialises into the stream header). It is never populated.
   - Per batch, build an `ArrowRecordBatch` directly from Comet's source 
vectors:
     - Allocate one small validity buffer for the wrapping struct (all-`0xff`).
     - Collect `ArrowFieldNode`s and `ArrowBuf`s in the same depth-first order 
`VectorUnloader.appendNodes` uses, with Comet's `FieldVector` buffers in place 
of the destination root's.
     - Pass the assembled batch to `MessageSerializer.serialize`.
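A minimal sketch of the outline above, assuming Arrow Java on the classpath and a destination schema consisting of a single non-null struct whose children match Comet's `columns` in order. `DirectBatchSketch` and `writeDirect` are hypothetical names; `out` stands in for the stream to which Spark's `ArrowStreamWriter` has already written the schema header via `writer.start()`. The sketch omits body compression and the variadic-buffer counts that Arrow's view types (`Utf8View`, `BinaryView`) require.

```scala
import java.io.OutputStream
import java.nio.channels.Channels
import java.util.{ArrayList => JArrayList, List => JList}

import scala.jdk.CollectionConverters._

import org.apache.arrow.memory.{ArrowBuf, BufferAllocator}
import org.apache.arrow.vector.FieldVector
import org.apache.arrow.vector.ipc.WriteChannel
import org.apache.arrow.vector.ipc.message.{ArrowFieldNode, ArrowRecordBatch, MessageSerializer}

// Hypothetical sketch; not Comet's implementation.
object DirectBatchSketch {
  // Depth-first pre-order, as in VectorUnloader.appendNodes: the vector's own
  // node, then its buffers, then each child in field order.
  private def appendNodes(v: FieldVector, nodes: JList[ArrowFieldNode], bufs: JList[ArrowBuf]): Unit = {
    nodes.add(new ArrowFieldNode(v.getValueCount.toLong, v.getNullCount.toLong))
    // getFieldBuffers sets reader/writer indices from the value count, so each
    // buffer's readableBytes() is the length the IPC body records.
    bufs.addAll(v.getFieldBuffers)
    v.getChildrenFromFields.asScala.foreach(appendNodes(_, nodes, bufs))
  }

  // Assumes the stream header (one struct of these columns) is already in `out`.
  def writeDirect(columns: Seq[FieldVector], numRows: Int,
                  allocator: BufferAllocator, out: OutputStream): Unit = {
    val nodes = new JArrayList[ArrowFieldNode]()
    val bufs = new JArrayList[ArrowBuf]()
    // Wrapping struct has no nulls, so its validity bitmap is all 0xff.
    val validityLen = (numRows + 7) / 8
    val validity = allocator.buffer(validityLen.toLong)
    try {
      var i = 0
      while (i < validityLen) { validity.setByte(i.toLong, 0xff); i += 1 }
      validity.writerIndex(validityLen.toLong)
      nodes.add(new ArrowFieldNode(numRows.toLong, 0L))
      bufs.add(validity)
      // Comet's buffers are referenced, not copied.
      columns.foreach(appendNodes(_, nodes, bufs))
      // The constructor retains every buffer; batch.close() releases them again.
      val batch = new ArrowRecordBatch(numRows, nodes, bufs)
      try {
        MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch)
      } finally {
        batch.close()
      }
    } finally {
      validity.close() // drop our own reference to the struct validity buffer
    }
  }
}
```

Because `ArrowRecordBatch` only retains Comet's buffers, `validity` is the only Arrow buffer allocated per batch; each column buffer's bytes are read once, when `MessageSerializer` writes them to `out`.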
   
   Net result: per-buffer `setBytes` and per-child `allocateNew` go away. The 
struct validity buffer (one bit per row, i.e. `(numRows + 7) / 8` bytes per 
batch) is the only remaining JVM-side buffer allocation on the input path.
   
   ## Additional context
   
   - Reference comments live in 
`spark/src/main/spark-4.x/org/apache/spark/sql/execution/python/CometColumnarPythonInput.scala`
and `docs/source/user-guide/latest/pyarrow-udfs.md` (PR #4234). Both are 
already framed as "two copies, one droppable", so the doc updates will be local.
   - Benchmark targets to beat: 
`spark/src/test/resources/pyspark/benchmark_pyarrow_udf.py`. Wide-row case 
(1.42x today) is where copy 1 dominates and should see the largest delta. 
Narrow / mixed (1.92x / 1.79x today) are bottlenecked on Python fork/IPC and 
should improve modestly.
   - Related #4234 (introduces the operator), #4294 (proposes a more invasive 
allocator-tree change to enable `TransferPair`-based zero-copy; the path 
proposed here is orthogonal and lands the larger share of the win without that 
refactor).
   - Originally surfaced in @mbutrovich's review on #4234 (item 3, "two copies 
→ one copy").


-- 
This is an automated message from the Apache Git Service.