wiedld commented on code in PR #7379:
URL: https://github.com/apache/arrow-datafusion/pull/7379#discussion_r1353222471


##########
datafusion/physical-plan/src/coalesce_partitions.rs:
##########
@@ -145,12 +145,14 @@ impl ExecutionPlan for CoalescePartitionsExec {
                 // least one result in an attempt to maximize
                 // parallelism.
                 let mut builder =
-                    RecordBatchReceiverStream::builder(self.schema(), 
input_partitions);
+                    ReceiverStream::builder(self.schema(), input_partitions);
+                let input =
+                    
Arc::new(RecordBatchReceiverStreamAdaptor::new(self.input.clone()));

Review Comment:
   `RecordBatchReceiverStream` was made generic [in this 
commit](https://github.com/apache/arrow-datafusion/pull/7379/commits/0e9573d3950d9dbc005ddb79aa30078db6a0e0f8),
 such that it could handle a buffered stream of record_batches, or the 
sort_orders (yielded per each merge node). 
   
   In order to make generic, did the following:
   * create a StreamAdapter trait, with the `StreamAdapter::call()` to be used 
for `ReceiverStream::run_input()`.
   * impl a `RecordBatchReceiverStreamAdaptor` that is used for record batches
   
   Please let me know if I should have structured this differently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to