LantaoJin opened a new issue, #50:
URL: https://github.com/apache/datafusion-java/issues/50

   ### Is your feature request related to a problem or challenge?
   
   `DataFrame.collect(allocator)` 
(`core/src/main/java/org/apache/datafusion/DataFrame.java`) is the only way to 
retrieve query results today. Internally, the native side calls 
`df.collect().await` (`native/src/lib.rs`) which materializes the entire result 
set into a `Vec<RecordBatch>` on the Rust heap *before* the first batch crosses 
the FFI boundary into Java.
   
   For TB-scale queries, or any query whose result set exceeds the native heap 
budget, this OOMs the Rust side regardless of how memory accounting is 
configured downstream. The Java caller can drain batches one at a time from the 
returned `ArrowReader`, but by the time the reader exists, every batch is 
already buffered in native memory.
   
   `SessionContext::execute_stream` already exists upstream — it returns a 
`SendableRecordBatchStream` (`Pin<Box<dyn RecordBatchStream + Send>>`) that 
yields one batch at a time. The Java binding just needs to wire that to the 
existing `FFI_ArrowArrayStream` path and hand the resulting reader back to Java.
   
   Streaming is what most embedders running production-shaped workloads need by 
default; `collect()` is the right API for small results but the wrong default 
for analytics-scale queries.
   
   ### Describe the solution you'd like
   
   A new method on `DataFrame`, peer to `collect`:
   
   ```java
   try (DataFrame df = ctx.sql("SELECT ... FROM big_table");
        ArrowReader reader = df.executeStream(allocator)) {
       while (reader.loadNextBatch()) {
           VectorSchemaRoot batch = reader.getVectorSchemaRoot();
           // process one batch at a time; nothing else materialized
       }
   }
   ```
   
   Same return type as `collect` (`ArrowReader`), same lifecycle rules (the 
caller closes the reader, and the allocator must outlive it). The semantic 
difference is purely "one batch is in memory at a time" instead of "all batches 
are in memory before the first is yielded."
   
   ### Native shape
   
   The Rust side already exports record batches via 
`FFI_ArrowArrayStream::new(Box::new(iter))` (see the existing 
`Java_org_apache_datafusion_DataFrame_collectDataFrame` handler in 
`native/src/lib.rs`). The streaming variant substitutes a `RecordBatchReader` 
that pulls from `df.execute_stream()` lazily:
   
   ```rust
   use futures::StreamExt; // for `.next()` on the stream
   
   struct StreamingReader {
       schema: SchemaRef,
       // SendableRecordBatchStream is already Pin<Box<dyn RecordBatchStream + Send>>,
       // so no extra Pin<Box<..>> wrapper is needed.
       stream: SendableRecordBatchStream,
   }
   
   impl Iterator for StreamingReader {
       type Item = Result<RecordBatch, ArrowError>;
       fn next(&mut self) -> Option<Self::Item> {
           runtime().block_on(self.stream.next())
               .map(|r| r.map_err(|e| ArrowError::ExternalError(Box::new(e))))
       }
   }
   
   impl RecordBatchReader for StreamingReader {
       fn schema(&self) -> SchemaRef { self.schema.clone() }
   }
   ```
   
   Each `loadNextBatch()` call from Java drives one 
`runtime().block_on(stream.next())` on the Rust side. No `Vec<RecordBatch>` is 
ever built; memory pressure stays bounded by what the executor's pipeline holds 
plus one batch in flight.
   
   ### Lifecycle
   
   - Like `collect`, `executeStream` *consumes* the `DataFrame`: the native 
plan handle is released as the stream is established. Subsequent 
`executeStream` or `collect` on the same instance throws 
`IllegalStateException`.
   - `close()` on a successfully-streamed `DataFrame` is a no-op; the 
`ArrowReader` is the new owner of native resources.
   - If construction of the stream fails (e.g. plan error), the native handle 
must be cleaned up so the original `DataFrame.close()` doesn't double-free.
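   The consume-once rule could be enforced with a small guard around the native 
handle; a minimal sketch under assumed names (`NativeHandle`, `take`, and the 
raw-pointer field are illustrative, not the actual binding code):
   
   ```java
   // Illustrative consume-once guard: the first take() transfers ownership of
   // the native pointer (e.g. to the new ArrowReader); later calls fail fast,
   // and close() after a successful take() is a no-op.
   final class NativeHandle {
       private long pointer;     // hypothetical raw native plan pointer
       private boolean consumed;
   
       NativeHandle(long pointer) {
           this.pointer = pointer;
       }
   
       synchronized long take() {
           if (consumed) {
               throw new IllegalStateException("DataFrame already consumed");
           }
           consumed = true;
           long p = pointer;
           pointer = 0;
           return p;
       }
   
       synchronized void close() {
           if (!consumed && pointer != 0) {
               // the real binding would free the native plan here
               pointer = 0;
           }
       }
   }
   ```
   
   This also covers the failure path: if stream construction throws before 
`take()` succeeds, `close()` still owns the handle and frees it exactly once.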
   
   ### Coexistence with `collect`
   
   Both methods stay. `collect` remains the right call for small results where 
you want a single owning buffer; `executeStream` is the right call for 
unbounded or large results. After this PR lands, the two paths could share a 
single Rust implementation (`collect` becomes `executeStream` + `concat`), but 
that re-implementation is out of scope here — keeping `collect` on its current 
code path keeps the diff small and lets a follow-up consolidate.
   
   ### Describe alternatives you've considered
   
   1. **Replace `collect` with `executeStream` and offer 
`collectAll(allocator)` for the small-result case.** Cleaner long-term but 
breaks an unreleased-but-shipping API.
   
   2. **Add a `BatchSize` knob that limits how much `collect` materializes 
before yielding control.** Doesn't actually fix the OOM — the executor still 
buffers everything; it just makes the memory behavior harder to reason about.
   
   3. **Expose `execute_stream_partitioned`.** Useful for parallel consumers, 
but a single-stream API is the v1 most callers want. Partitioned can be a 
follow-up.
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to