LantaoJin opened a new issue, #50:
URL: https://github.com/apache/datafusion-java/issues/50
### Is your feature request related to a problem or challenge?
`DataFrame.collect(allocator)`
(`core/src/main/java/org/apache/datafusion/DataFrame.java`) is the only way to
retrieve query results today. Internally, the native side calls
`df.collect().await` (`native/src/lib.rs`) which materializes the entire result
set into a `Vec<RecordBatch>` on the Rust heap *before* the first batch crosses
the FFI boundary into Java.
For TB-scale queries, or any query whose result set exceeds the native heap
budget, this OOMs the Rust side regardless of how memory accounting is
configured downstream. The Java caller can drain batches one at a time from the
returned `ArrowReader`, but by the time the reader exists, every batch is
already buffered in native memory.
`SessionContext::execute_stream` already exists upstream — it returns a
`SendableRecordBatchStream` (`Pin<Box<dyn RecordBatchStream + Send>>`) that
yields one batch at a time. The Java binding just needs to wire that to the
existing `FFI_ArrowArrayStream` path and hand the resulting reader back to Java.
This is what most embedders running production-shaped workloads need by
default; `collect()` is the right API for small results but the wrong default
for analytics-scale queries.
### Describe the solution you'd like
A new method on `DataFrame`, peer to `collect`:
```java
try (DataFrame df = ctx.sql("SELECT ... FROM big_table");
     ArrowReader reader = df.executeStream(allocator)) {
  while (reader.loadNextBatch()) {
    VectorSchemaRoot batch = reader.getVectorSchemaRoot();
    // process one batch at a time; nothing else is materialized
  }
}
```
Same return type as `collect` (`ArrowReader`) and the same lifecycle rules: the
caller closes the reader, and the allocator must outlive it. The semantic
difference is purely "one batch is in memory at a time" instead of "all batches
in memory before the first is yielded."
### Native shape
The Rust side already exports record batches via
`FFI_ArrowArrayStream::new(Box::new(iter))` (see the existing
`Java_org_apache_datafusion_DataFrame_collectDataFrame` handler in
`native/src/lib.rs`). The streaming variant substitutes a `RecordBatchReader`
that pulls from `df.execute_stream()` lazily:
```rust
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

struct StreamingReader {
    schema: SchemaRef,
    // SendableRecordBatchStream is already Pin<Box<dyn RecordBatchStream + Send>>,
    // so no extra Pin<Box<..>> wrapper is needed here.
    stream: SendableRecordBatchStream,
}

impl Iterator for StreamingReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        // Block the calling (JNI) thread until the executor yields the next batch.
        runtime()
            .block_on(self.stream.next())
            .map(|r| r.map_err(|e| ArrowError::ExternalError(Box::new(e))))
    }
}

impl RecordBatchReader for StreamingReader {
    fn schema(&self) -> SchemaRef { self.schema.clone() }
}
```
Each `loadNextBatch()` call from Java drives one
`runtime().block_on(stream.next())` on the Rust side. No `Vec<RecordBatch>` is
ever built; memory pressure stays bounded by what the executor's pipeline holds
plus one batch in flight.
### Lifecycle
- Like `collect`, `executeStream` *consumes* the `DataFrame`: the native
plan handle is released as the stream is established. Subsequent
`executeStream` or `collect` on the same instance throws
`IllegalStateException`.
- `close()` on a successfully-streamed `DataFrame` is a no-op; the
`ArrowReader` is the new owner of native resources.
- If construction of the stream fails (e.g. plan error), the native handle
must be cleaned up so the original `DataFrame.close()` doesn't double-free.
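The rules above can be sketched as a consume-once guard on the Java side. Everything here is illustrative: `ConsumableHandle`, `consume`, `nativePointer`, and the commented-out `releaseNative` call are placeholders, not the binding's real names:

```java
// Minimal sketch of the consume-once lifecycle; all names are hypothetical.
public class ConsumableHandle implements AutoCloseable {
    private long nativePointer;   // stand-in for the native plan handle
    private boolean consumed = false;

    ConsumableHandle(long ptr) {
        this.nativePointer = ptr;
    }

    // Stands in for executeStream/collect: transfers ownership to the reader.
    public long consume() {
        if (consumed) {
            throw new IllegalStateException("DataFrame already consumed");
        }
        consumed = true;
        long ptr = nativePointer;
        nativePointer = 0;        // the reader now owns the native resources
        return ptr;
    }

    @Override
    public void close() {
        // No-op after a successful consume; otherwise release the handle so a
        // failed stream construction cannot double-free.
        if (!consumed && nativePointer != 0) {
            // releaseNative(nativePointer); // hypothetical JNI call
            nativePointer = 0;
        }
    }
}
```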
### Coexistence with `collect`
Both methods stay. `collect` remains the right call for small results where
you want a single owning buffer; `executeStream` is the right call for
unbounded or large results. After this PR lands, the two paths could share a
single Rust implementation (`collect` becomes `executeStream` + `concat`), but
that re-implementation is out of scope here — keeping `collect` on its current
code path keeps the diff small and lets a follow-up consolidate.
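That follow-up consolidation is essentially "drain the stream, then concatenate." A plain-Java sketch of the shape, where `int[]` stands in for `RecordBatch`, `System.arraycopy` stands in for Arrow's batch concatenation, and `collect` is a made-up helper rather than the real API:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class CollectViaStream {

    // collect() expressed as executeStream() + concat: drain every batch,
    // then merge them into a single owning buffer.
    static int[] collect(Iterator<int[]> stream) {
        List<int[]> batches = new ArrayList<>();
        int total = 0;
        while (stream.hasNext()) {
            int[] b = stream.next();
            batches.add(b);
            total += b.length;
        }
        int[] out = new int[total];
        int pos = 0;
        for (int[] b : batches) {
            System.arraycopy(b, 0, out, pos, b.length);
            pos += b.length;
        }
        return out;
    }
}
```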
### Describe alternatives you've considered
1. **Replace `collect` with `executeStream` and offer
`collectAll(allocator)` for the small-result case.** Cleaner long-term but
breaks an unreleased-but-shipping API.
2. **Add a `BatchSize` knob that limits how much `collect` materializes
before yielding control.** This doesn't actually fix the OOM: the executor
still buffers everything; it just makes memory behavior harder to reason about.
3. **Expose `execute_stream_partitioned`.** Useful for parallel consumers,
but a single-stream API is the v1 most callers want. Partitioned can be a
follow-up.
### Additional context
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]