wiedld commented on code in PR #7379:
URL: https://github.com/apache/arrow-datafusion/pull/7379#discussion_r1353222471
##########
datafusion/physical-plan/src/coalesce_partitions.rs:
##########
@@ -145,12 +145,14 @@ impl ExecutionPlan for CoalescePartitionsExec {
// least one result in an attempt to maximize
// parallelism.
let mut builder =
- RecordBatchReceiverStream::builder(self.schema(),
input_partitions);
+ ReceiverStream::builder(self.schema(), input_partitions);
+ let input =
+
Arc::new(RecordBatchReceiverStreamAdaptor::new(self.input.clone()));
Review Comment:
`RecordBatchReceiverStream` was made generic [in this
commit](https://github.com/apache/arrow-datafusion/pull/7379/commits/0e9573d3950d9dbc005ddb79aa30078db6a0e0f8),
such that it could handle a buffered stream of record_batches, or the
sort_orders (yielded per each merge node).
In order to make generic, did the following:
* create a StreamAdapter trait, with the `StreamAdapter::call()` to be used
for `ReceiverStream::run_input()`.
* impl a `RecordBatchReceiverStreamAdaptor` that is used for record batches
Please let me know if I should have structured this differently.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]