LantaoJin opened a new issue, #95:
URL: https://github.com/apache/datafusion-java/issues/95
### Is your feature request related to a problem or challenge?
PR **#65** shipped a Java-implemented `TableProvider` and
`SessionContext.registerTable(String, TableProvider)`. That covered the
**pull** shape: DataFusion calls `scan(BufferAllocator)` and reads the returned
`ArrowReader`.
But it does **not** cover the **push** shape that event-driven batch sources
need:
- A coordinator that reduces over shard responses arriving incrementally --
the producer can't materialise an `ArrowReader` because the next batch hasn't
arrived yet.
- A Flight stream feeding into a query -- same problem; the producer is
event-driven.
- Any in-process producer that emits batches as side-effects of other work
and doesn't know in advance how many will arrive.
To bridge these into PR #65 today, callers have to write a
`BlockingArrowReader` adapter that buffers pushed batches and serves them
through the pull interface. That's a serialisation point: the producer blocks
waiting for `loadNextBatch()` to be called, or DataFusion blocks waiting for
the next batch -- the two ends can never run truly concurrently. The adapter
also has to invent its own backpressure semantics, error propagation,
end-of-stream signalling, and thread-safety story.
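To make that cost concrete, here is a minimal sketch of what such a hand-rolled adapter tends to look like. It is not code from PR #65 or from any embedder: `BlockingPullAdapter`, `push`, `finish`, `pullNext` and the `Item` envelope are hypothetical names, and a generic `T` stands in for an Arrow batch so the sketch runs with the JDK alone. `pullNext()` plays the role of `loadNextBatch()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical hand-rolled adapter; T stands in for a record batch. */
final class BlockingPullAdapter<T> {
    /** Envelope so data, EOF and errors share one queue and stay ordered. */
    private static final class Item<T> {
        final T batch;
        final Throwable error;
        final boolean eof;

        Item(T batch, Throwable error, boolean eof) {
            this.batch = batch;
            this.error = error;
            this.eof = eof;
        }
    }

    private final BlockingQueue<Item<T>> queue;
    private boolean finished; // only touched by the consumer thread

    BlockingPullAdapter(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Producer side: each call blocks while the queue is full (ad hoc backpressure).
    void push(T batch) { put(new Item<>(batch, null, false)); }
    void finish() { put(new Item<>(null, null, true)); }
    void fail(Throwable cause) { put(new Item<>(null, cause, false)); }

    /** Consumer side, standing in for loadNextBatch(); empty means end-of-stream. */
    Optional<T> pullNext() {
        if (finished) {
            return Optional.empty();
        }
        Item<T> item;
        try {
            item = queue.take(); // blocks until the producer has pushed something
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a batch", e);
        }
        if (item.error != null) {
            finished = true;
            throw new RuntimeException(item.error);
        }
        if (item.eof) {
            finished = true;
            return Optional.empty();
        }
        return Optional.of(item.batch);
    }

    private void put(Item<T> item) {
        try {
            queue.put(item);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while pushing", e);
        }
    }

    /** Demo: a producer thread pushes three batches through a one-slot queue. */
    static List<String> demo() {
        BlockingPullAdapter<String> adapter = new BlockingPullAdapter<>(1);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                adapter.push("b" + i);
            }
            adapter.finish();
        });
        producer.start();
        List<String> seen = new ArrayList<>();
        Optional<String> next;
        while ((next = adapter.pullNext()).isPresent()) {
            seen.add(next.get());
        }
        return seen;
    }
}
```

Every choice in this sketch (queue capacity, the EOF marker, how errors are wrapped, what happens on interrupt) is something each embedder currently decides independently, and none of it handles cancellation from the query side.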
DataFusion itself solves this on the Rust side with `StreamingTable` +
`PartitionStream` plus an mpsc channel: producer pushes `Result<RecordBatch>`
into the sender, the consumer (DataFusion's `StreamingTableExec`) polls the
receiver as part of normal query execution. The two ends decouple via the
channel buffer, with the runtime providing backpressure and cancellation
propagation.
### Describe the solution you'd like
One new method on `SessionContext` returning a `TableSink`:
```java
TableSink sink = ctx.registerStreamingTable("shard_results", schema, capacity);
// Producer thread (any thread, including outside any Tokio runtime):
try {
    while (hasMoreInput()) {
        sink.write(batch);   // backpressures when channel is full
    }
    sink.close();            // EOF: queries see end-of-stream cleanly
} catch (Throwable t) {
    sink.fail(t);            // signal error: queries see RuntimeException
}
```
```java
// API outline only; method bodies omitted.
public final class TableSink implements AutoCloseable {
    public void write(VectorSchemaRoot batch);  // exports via Data.exportVectorSchemaRoot
    public void close();                        // EOF
    public void fail(Throwable cause);          // error propagated to readers
}
```
After registration the table can be referenced like any other registered
table:
```java
DataFrame df = ctx.sql("SELECT count(*) FROM shard_results");
ArrowReader r = df.executeStream(allocator);
// Producer thread continues writing as r.loadNextBatch() drains.
```
**Single-scan semantics.** The registered table can only be queried *once*.
After that scan completes (or is cancelled), the sink is no longer usable and
the table cannot be re-scanned. This is the natural semantic for an
event-driven producer -- the data is consumed as it arrives -- and matches what
every downstream Substrait/Calcite plan that uses streaming tables already
assumes. The restriction will be documented prominently in the Javadoc for
`registerStreamingTable`, and trying to re-execute against the same
registration throws.
This is intentional. Re-scannable streaming would require buffering every
batch internally, which defeats the streaming use case. Callers who need to
re-scan the same data should use the existing `registerTable` /
`SimpleTableProvider` pull shape (PR #65) instead.
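One way the single-scan rule could be enforced is an atomic "already claimed" flag checked at scan time. The sketch below is an assumption about the mechanism, not the proposed implementation: `SingleScanSource` and `scan()` are hypothetical names, the exception type is a guess, and an `Iterator<T>` stands in for the batch stream.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model of a registration that may be scanned at most once. */
final class SingleScanSource<T> {
    private final Iterator<T> batches;
    private final AtomicBoolean scanned = new AtomicBoolean(false);

    SingleScanSource(Iterator<T> batches) {
        this.batches = batches;
    }

    /** The first caller gets the stream; every later caller gets an exception. */
    Iterator<T> scan() {
        // compareAndSet makes the claim atomic, so two racing scans cannot both win.
        if (!scanned.compareAndSet(false, true)) {
            throw new IllegalStateException("streaming table has already been scanned");
        }
        return batches;
    }

    /** Demo helper: reports whether a second scan is rejected. */
    static boolean secondScanRejected() {
        SingleScanSource<String> source =
                new SingleScanSource<>(Arrays.asList("b0", "b1").iterator());
        source.scan(); // first scan succeeds
        try {
            source.scan(); // second scan must fail
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }
}
```

Failing at scan time rather than silently returning an empty result keeps an accidental re-execution visible to the caller.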
### Describe alternatives you've considered
- **`BlockingArrowReader` adapter on top of PR #65's `registerTable`.** What
every caller currently has to hand-roll. It works but pushes the channel +
backpressure + EOF + error story onto every embedder. Bridging via the
upstream-canonical `StreamingTable` shape is strictly less code and gets
cancellation propagation for free.
- **Backpressure-free `try_write`.** A non-blocking variant that returns
`false` when the channel is full. Easy to add later as a follow-up if anyone
wants it; not in scope here. Default `write` blocks, which is the contract
every Java I/O caller expects.
- **Reuse PR #65's `TableProvider` interface and wrap mpsc internally.**
Considered. The problem: PR #65's `scan(BufferAllocator) -> ArrowReader`
returns synchronously, so an mpsc-backed implementation has to block on
`loadNextBatch()` waiting for the producer -- exactly the serialisation point
we're trying to avoid. Going direct to `StreamingTable` + `PartitionStream` is
the right layer.
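For the non-blocking variant mentioned in the second alternative, the difference from the blocking `write` can be modelled with a bounded queue. This is only an illustration of the two contracts under assumed names: `BoundedSinkModel`, `tryWrite` and `poll` are hypothetical, and the real sink would sit on the native channel rather than a Java queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical model of a bounded sink; T stands in for a record batch. */
final class BoundedSinkModel<T> {
    private final BlockingQueue<T> channel;

    BoundedSinkModel(int capacity) {
        this.channel = new ArrayBlockingQueue<>(capacity);
    }

    /** Blocking contract: waits until the channel has free space. */
    void write(T batch) throws InterruptedException {
        channel.put(batch);
    }

    /** Non-blocking contract: returns false immediately when the channel is full. */
    boolean tryWrite(T batch) {
        return channel.offer(batch);
    }

    /** Consumer side of the model: removes one batch, or returns null if empty. */
    T poll() {
        return channel.poll();
    }

    /** Demo: a one-slot channel accepts, rejects, then accepts again after a drain. */
    static boolean[] demo() {
        BoundedSinkModel<String> sink = new BoundedSinkModel<>(1);
        boolean first = sink.tryWrite("b0");  // slot is free
        boolean second = sink.tryWrite("b1"); // slot is occupied
        sink.poll();                          // consumer drains one batch
        boolean third = sink.tryWrite("b2");  // slot is free again
        return new boolean[] {first, second, third};
    }
}
```

A caller using the non-blocking form has to decide what to do on `false` (retry, drop, or spill), which is why blocking is the simpler default.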
### Additional context
- The OpenSearch backend's `rust/src/api.rs:572` `register_partition_stream`
is the prior-art template; it does almost exactly this. The Java side there
uses a hand-rolled FFM bridge (`sender_send`) that can be replaced with this
surface as soon as it lands.