tustvold commented on code in PR #4867:
URL: https://github.com/apache/arrow-datafusion/pull/4867#discussion_r1067038365


##########
datafusion/core/src/physical_plan/repartition/mod.rs:
##########
@@ -132,70 +135,103 @@ impl BatchPartitioner {
     where
         F: FnMut(usize, RecordBatch) -> Result<()>,
     {
-        match &mut self.state {
-            BatchPartitionerState::RoundRobin {
-                num_partitions,
-                next_idx,
-            } => {
-                let idx = *next_idx;
-                *next_idx = (*next_idx + 1) % *num_partitions;
-                f(idx, batch)?;
-            }
-            BatchPartitionerState::Hash {
-                random_state,
-                exprs,
-                num_partitions: partitions,
-                hash_buffer,
-            } => {
-                let mut timer = self.timer.timer();
-
-                let arrays = exprs
-                    .iter()
-                    .map(|expr| 
Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())))
-                    .collect::<Result<Vec<_>>>()?;
+        self.partition_iter(batch)?.try_for_each(|res| match res {
+            Ok((partition, batch)) => f(partition, batch),
+            Err(e) => Err(e),
+        })
+    }
 
-                hash_buffer.clear();
-                hash_buffer.resize(batch.num_rows(), 0);
+    /// Actual implementation of [`partition`](Self::partition).
+    ///
+    /// The reason this was pulled out is that we need to have a variant of 
`partition` that works w/ sync functions,
+    /// and one that works w/ async. Using an iterator as an intermediate 
representation was the best way to achieve
+    /// this (so we don't need to clone the entire implementation).
+    fn partition_iter(
+        &mut self,
+        batch: RecordBatch,
+    ) -> Result<impl Iterator<Item = Result<(usize, RecordBatch)>> + Send + 
'_> {
+        let it: Box<dyn Iterator<Item = Result<(usize, RecordBatch)>> + Send> =
+            match &mut self.state {
+                BatchPartitionerState::RoundRobin {
+                    num_partitions,
+                    next_idx,
+                } => {
+                    let idx = *next_idx;
+                    *next_idx = (*next_idx + 1) % *num_partitions;
+                    Box::new(std::iter::once(Ok((idx, batch))))
+                }
+                BatchPartitionerState::Hash {
+                    random_state,
+                    exprs,
+                    num_partitions: partitions,
+                    hash_buffer,
+                } => {
+                    let mut timer = self.timer.timer();
+
+                    let arrays = exprs
+                        .iter()
+                        .map(|expr| {
+                            
Ok(expr.evaluate(&batch)?.into_array(batch.num_rows()))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
 
-                create_hashes(&arrays, random_state, hash_buffer)?;
+                    hash_buffer.clear();
+                    hash_buffer.resize(batch.num_rows(), 0);
 
-                let mut indices: Vec<_> = (0..*partitions)
-                    .map(|_| UInt64Builder::with_capacity(batch.num_rows()))
-                    .collect();
+                    create_hashes(&arrays, random_state, hash_buffer)?;
 
-                for (index, hash) in hash_buffer.iter().enumerate() {
-                    indices[(*hash % *partitions as u64) as usize]
-                        .append_value(index as u64);
-                }
+                    let mut indices: Vec<_> = (0..*partitions)
+                        .map(|_| 
UInt64Builder::with_capacity(batch.num_rows()))
+                        .collect();
 
-                for (partition, mut indices) in 
indices.into_iter().enumerate() {
-                    let indices = indices.finish();
-                    if indices.is_empty() {
-                        continue;
+                    for (index, hash) in hash_buffer.iter().enumerate() {
+                        indices[(*hash % *partitions as u64) as usize]
+                            .append_value(index as u64);
                     }
 
-                    // Produce batches based on indices
-                    let columns = batch
-                        .columns()
-                        .iter()
-                        .map(|c| {
-                            arrow::compute::take(c.as_ref(), &indices, None)
-                                .map_err(DataFusionError::ArrowError)
+                    let it = indices
+                        .into_iter()
+                        .enumerate()
+                        .filter_map(|(partition, mut indices)| {
+                            let indices = indices.finish();
+                            (!indices.is_empty()).then_some((partition, 
indices))
                         })
-                        .collect::<Result<Vec<ArrayRef>>>()?;
-
-                    let batch = RecordBatch::try_new(batch.schema(), 
columns).unwrap();
-
-                    timer.stop();
-                    f(partition, batch)?;
-                    timer.restart();
+                        .map(move |(partition, indices)| {
+                            // Produce batches based on indices
+                            let columns = batch
+                                .columns()
+                                .iter()
+                                .map(|c| {
+                                    arrow::compute::take(c.as_ref(), &indices, 
None)
+                                        .map_err(DataFusionError::ArrowError)
+                                })
+                                .collect::<Result<Vec<ArrayRef>>>()?;
+
+                            let batch =
+                                RecordBatch::try_new(batch.schema(), 
columns).unwrap();
+
+                            timer.stop();
+                            timer.restart();
+
+                            Ok((partition, batch))
+                        });
+
+                    Box::new(it)
                 }
-            }
-        }
-        Ok(())
+            };
+
+        Ok(it)
     }
 }
 
+trait PartitiontFun {

Review Comment:
   ```suggestion
   trait PartitionFun {
   ```
   



##########
datafusion/core/src/physical_plan/repartition/mod.rs:
##########
@@ -132,70 +135,103 @@ impl BatchPartitioner {
     where
         F: FnMut(usize, RecordBatch) -> Result<()>,
     {
-        match &mut self.state {
-            BatchPartitionerState::RoundRobin {
-                num_partitions,
-                next_idx,
-            } => {
-                let idx = *next_idx;
-                *next_idx = (*next_idx + 1) % *num_partitions;
-                f(idx, batch)?;
-            }
-            BatchPartitionerState::Hash {
-                random_state,
-                exprs,
-                num_partitions: partitions,
-                hash_buffer,
-            } => {
-                let mut timer = self.timer.timer();
-
-                let arrays = exprs
-                    .iter()
-                    .map(|expr| 
Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())))
-                    .collect::<Result<Vec<_>>>()?;
+        self.partition_iter(batch)?.try_for_each(|res| match res {
+            Ok((partition, batch)) => f(partition, batch),
+            Err(e) => Err(e),
+        })
+    }
 
-                hash_buffer.clear();
-                hash_buffer.resize(batch.num_rows(), 0);
+    /// Actual implementation of [`partition`](Self::partition).
+    ///
+    /// The reason this was pulled out is that we need to have a variant of 
`partition` that works w/ sync functions,
+    /// and one that works w/ async. Using an iterator as an intermediate 
representation was the best way to achieve
+    /// this (so we don't need to clone the entire implementation).
+    fn partition_iter(
+        &mut self,
+        batch: RecordBatch,
+    ) -> Result<impl Iterator<Item = Result<(usize, RecordBatch)>> + Send + 
'_> {
+        let it: Box<dyn Iterator<Item = Result<(usize, RecordBatch)>> + Send> =
+            match &mut self.state {
+                BatchPartitionerState::RoundRobin {
+                    num_partitions,
+                    next_idx,
+                } => {
+                    let idx = *next_idx;
+                    *next_idx = (*next_idx + 1) % *num_partitions;
+                    Box::new(std::iter::once(Ok((idx, batch))))
+                }
+                BatchPartitionerState::Hash {
+                    random_state,
+                    exprs,
+                    num_partitions: partitions,
+                    hash_buffer,
+                } => {
+                    let mut timer = self.timer.timer();
+
+                    let arrays = exprs
+                        .iter()
+                        .map(|expr| {
+                            
Ok(expr.evaluate(&batch)?.into_array(batch.num_rows()))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
 
-                create_hashes(&arrays, random_state, hash_buffer)?;
+                    hash_buffer.clear();
+                    hash_buffer.resize(batch.num_rows(), 0);
 
-                let mut indices: Vec<_> = (0..*partitions)
-                    .map(|_| UInt64Builder::with_capacity(batch.num_rows()))
-                    .collect();
+                    create_hashes(&arrays, random_state, hash_buffer)?;
 
-                for (index, hash) in hash_buffer.iter().enumerate() {
-                    indices[(*hash % *partitions as u64) as usize]
-                        .append_value(index as u64);
-                }
+                    let mut indices: Vec<_> = (0..*partitions)
+                        .map(|_| 
UInt64Builder::with_capacity(batch.num_rows()))
+                        .collect();
 
-                for (partition, mut indices) in 
indices.into_iter().enumerate() {
-                    let indices = indices.finish();
-                    if indices.is_empty() {
-                        continue;
+                    for (index, hash) in hash_buffer.iter().enumerate() {
+                        indices[(*hash % *partitions as u64) as usize]
+                            .append_value(index as u64);
                     }
 
-                    // Produce batches based on indices
-                    let columns = batch
-                        .columns()
-                        .iter()
-                        .map(|c| {
-                            arrow::compute::take(c.as_ref(), &indices, None)
-                                .map_err(DataFusionError::ArrowError)
+                    let it = indices
+                        .into_iter()
+                        .enumerate()
+                        .filter_map(|(partition, mut indices)| {
+                            let indices = indices.finish();
+                            (!indices.is_empty()).then_some((partition, 
indices))
                         })
-                        .collect::<Result<Vec<ArrayRef>>>()?;
-
-                    let batch = RecordBatch::try_new(batch.schema(), 
columns).unwrap();
-
-                    timer.stop();
-                    f(partition, batch)?;
-                    timer.restart();
+                        .map(move |(partition, indices)| {
+                            // Produce batches based on indices
+                            let columns = batch
+                                .columns()
+                                .iter()
+                                .map(|c| {
+                                    arrow::compute::take(c.as_ref(), &indices, 
None)
+                                        .map_err(DataFusionError::ArrowError)
+                                })
+                                .collect::<Result<Vec<ArrayRef>>>()?;
+
+                            let batch =
+                                RecordBatch::try_new(batch.schema(), 
columns).unwrap();
+
+                            timer.stop();
+                            timer.restart();

Review Comment:
   ```suggestion
   ```
   This is no longer needed as f has been removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to