This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fdb5454930 Move source repartitioning into 
`ExecutionPlan::repartition` (#7936)
fdb5454930 is described below

commit fdb54549301ccbede0b48788ff0fa057dd799d63
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Oct 30 13:17:04 2023 -0400

    Move source repartitioning into `ExecutionPlan::repartition` (#7936)
    
    * Move source repartitioning into ExecutionPlan::repartition
    
    * cleanup
    
    * update test
    
    * update test
    
    * refine docs
    
    * fix merge
---
 .../core/src/datasource/physical_plan/csv.rs       | 58 ++++++-------
 .../core/src/datasource/physical_plan/mod.rs       | 96 ++++++++++------------
 .../core/src/datasource/physical_plan/parquet.rs   | 41 ++++-----
 .../src/physical_optimizer/enforce_distribution.rs | 28 ++-----
 datafusion/physical-plan/src/lib.rs                | 30 ++++++-
 5 files changed, 130 insertions(+), 123 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs 
b/datafusion/core/src/datasource/physical_plan/csv.rs
index 8117e101ea..82163da64a 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -46,6 +46,7 @@ use datafusion_physical_expr::{
 };
 
 use bytes::{Buf, Bytes};
+use datafusion_common::config::ConfigOptions;
 use futures::{ready, StreamExt, TryStreamExt};
 use object_store::{GetOptions, GetResultPayload, ObjectStore};
 use tokio::io::AsyncWriteExt;
@@ -117,34 +118,6 @@ impl CsvExec {
     pub fn escape(&self) -> Option<u8> {
         self.escape
     }
-
-    /// Redistribute files across partitions according to their size
-    /// See comments on `repartition_file_groups()` for more detail.
-    ///
-    /// Return `None` if can't get repartitioned(empty/compressed file).
-    pub fn get_repartitioned(
-        &self,
-        target_partitions: usize,
-        repartition_file_min_size: usize,
-    ) -> Option<Self> {
-        // Parallel execution on compressed CSV file is not supported yet.
-        if self.file_compression_type.is_compressed() {
-            return None;
-        }
-
-        let repartitioned_file_groups_option = 
FileScanConfig::repartition_file_groups(
-            self.base_config.file_groups.clone(),
-            target_partitions,
-            repartition_file_min_size,
-        );
-
-        if let Some(repartitioned_file_groups) = 
repartitioned_file_groups_option {
-            let mut new_plan = self.clone();
-            new_plan.base_config.file_groups = repartitioned_file_groups;
-            return Some(new_plan);
-        }
-        None
-    }
 }
 
 impl DisplayAs for CsvExec {
@@ -205,6 +178,35 @@ impl ExecutionPlan for CsvExec {
         Ok(self)
     }
 
+    /// Redistribute files across partitions according to their size
+    /// See comments on `repartition_file_groups()` for more detail.
+    ///
+    /// Return `None` if it can't be repartitioned (empty or compressed file).
+    fn repartitioned(
+        &self,
+        target_partitions: usize,
+        config: &ConfigOptions,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let repartition_file_min_size = 
config.optimizer.repartition_file_min_size;
+        // Parallel execution on compressed CSV file is not supported yet.
+        if self.file_compression_type.is_compressed() {
+            return Ok(None);
+        }
+
+        let repartitioned_file_groups_option = 
FileScanConfig::repartition_file_groups(
+            self.base_config.file_groups.clone(),
+            target_partitions,
+            repartition_file_min_size,
+        );
+
+        if let Some(repartitioned_file_groups) = 
repartitioned_file_groups_option {
+            let mut new_plan = self.clone();
+            new_plan.base_config.file_groups = repartitioned_file_groups;
+            return Ok(Some(Arc::new(new_plan)));
+        }
+        Ok(None)
+    }
+
     fn execute(
         &self,
         partition: usize,
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs 
b/datafusion/core/src/datasource/physical_plan/mod.rs
index 6643e4127d..ea0a9698ff 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -527,6 +527,7 @@ mod tests {
     };
     use arrow_schema::Field;
     use chrono::Utc;
+    use datafusion_common::config::ConfigOptions;
 
     use crate::physical_plan::{DefaultDisplay, VerboseDisplay};
 
@@ -828,11 +829,7 @@ mod tests {
                 None,
             );
 
-            let partitioned_file = parquet_exec
-                .get_repartitioned(4, 0)
-                .base_config()
-                .file_groups
-                .clone();
+            let partitioned_file = repartition_with_size(&parquet_exec, 4, 0);
 
             assert!(partitioned_file[0][0].range.is_none());
         }
@@ -893,13 +890,8 @@ mod tests {
                         None,
                     );
 
-                    let actual = file_groups_to_vec(
-                        parquet_exec
-                            .get_repartitioned(n_partition, 10)
-                            .base_config()
-                            .file_groups
-                            .clone(),
-                    );
+                    let actual =
+                        repartition_with_size_to_vec(&parquet_exec, 
n_partition, 10);
 
                     assert_eq!(expected, &actual);
                 }
@@ -927,13 +919,7 @@ mod tests {
                 None,
             );
 
-            let actual = file_groups_to_vec(
-                parquet_exec
-                    .get_repartitioned(4, 10)
-                    .base_config()
-                    .file_groups
-                    .clone(),
-            );
+            let actual = repartition_with_size_to_vec(&parquet_exec, 4, 10);
             let expected = vec![
                 (0, "a".to_string(), 0, 31),
                 (1, "a".to_string(), 31, 62),
@@ -964,13 +950,7 @@ mod tests {
                 None,
             );
 
-            let actual = file_groups_to_vec(
-                parquet_exec
-                    .get_repartitioned(96, 5)
-                    .base_config()
-                    .file_groups
-                    .clone(),
-            );
+            let actual = repartition_with_size_to_vec(&parquet_exec, 96, 5);
             let expected = vec![
                 (0, "a".to_string(), 0, 1),
                 (1, "a".to_string(), 1, 2),
@@ -1007,13 +987,7 @@ mod tests {
                 None,
             );
 
-            let actual = file_groups_to_vec(
-                parquet_exec
-                    .get_repartitioned(3, 10)
-                    .base_config()
-                    .file_groups
-                    .clone(),
-            );
+            let actual = repartition_with_size_to_vec(&parquet_exec, 3, 10);
             let expected = vec![
                 (0, "a".to_string(), 0, 34),
                 (1, "a".to_string(), 34, 40),
@@ -1046,13 +1020,7 @@ mod tests {
                 None,
             );
 
-            let actual = file_groups_to_vec(
-                parquet_exec
-                    .get_repartitioned(2, 10)
-                    .base_config()
-                    .file_groups
-                    .clone(),
-            );
+            let actual = repartition_with_size_to_vec(&parquet_exec, 2, 10);
             let expected = vec![
                 (0, "a".to_string(), 0, 40),
                 (0, "b".to_string(), 0, 10),
@@ -1086,11 +1054,7 @@ mod tests {
                 None,
             );
 
-            let actual = parquet_exec
-                .get_repartitioned(65, 10)
-                .base_config()
-                .file_groups
-                .clone();
+            let actual = repartition_with_size(&parquet_exec, 65, 10);
             assert_eq!(2, actual.len());
         }
 
@@ -1115,17 +1079,47 @@ mod tests {
                 None,
             );
 
-            let actual = parquet_exec
-                .get_repartitioned(65, 500)
+            let actual = repartition_with_size(&parquet_exec, 65, 500);
+            assert_eq!(1, actual.len());
+        }
+
+        /// Calls `ParquetExec.repartitioned` with the specified
+        /// `target_partitions` and `repartition_file_min_size`, returning the
+        /// resulting `PartitionedFile`s
+        fn repartition_with_size(
+            parquet_exec: &ParquetExec,
+            target_partitions: usize,
+            repartition_file_min_size: usize,
+        ) -> Vec<Vec<PartitionedFile>> {
+            let mut config = ConfigOptions::new();
+            config.optimizer.repartition_file_min_size = 
repartition_file_min_size;
+
+            parquet_exec
+                .repartitioned(target_partitions, &config)
+                .unwrap() // unwrap Result
+                .unwrap() // unwrap Option
+                .as_any()
+                .downcast_ref::<ParquetExec>()
+                .unwrap()
                 .base_config()
                 .file_groups
-                .clone();
-            assert_eq!(1, actual.len());
+                .clone()
         }
 
-        fn file_groups_to_vec(
-            file_groups: Vec<Vec<PartitionedFile>>,
+        /// Calls `repartition_with_size` and returns a tuple for each output 
`PartitionedFile`:
+        ///
+        /// `(partition index, file path, start, end)`
+        fn repartition_with_size_to_vec(
+            parquet_exec: &ParquetExec,
+            target_partitions: usize,
+            repartition_file_min_size: usize,
         ) -> Vec<(usize, String, i64, i64)> {
+            let file_groups = repartition_with_size(
+                parquet_exec,
+                target_partitions,
+                repartition_file_min_size,
+            );
+
             file_groups
                 .iter()
                 .enumerate()
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs 
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 3a2459bec8..f6e999f602 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -259,26 +259,6 @@ impl ParquetExec {
         self.enable_bloom_filter
             .unwrap_or(config_options.execution.parquet.bloom_filter_enabled)
     }
-
-    /// Redistribute files across partitions according to their size
-    /// See comments on `get_file_groups_repartitioned()` for more detail.
-    pub fn get_repartitioned(
-        &self,
-        target_partitions: usize,
-        repartition_file_min_size: usize,
-    ) -> Self {
-        let repartitioned_file_groups_option = 
FileScanConfig::repartition_file_groups(
-            self.base_config.file_groups.clone(),
-            target_partitions,
-            repartition_file_min_size,
-        );
-
-        let mut new_plan = self.clone();
-        if let Some(repartitioned_file_groups) = 
repartitioned_file_groups_option {
-            new_plan.base_config.file_groups = repartitioned_file_groups;
-        }
-        new_plan
-    }
 }
 
 impl DisplayAs for ParquetExec {
@@ -349,6 +329,27 @@ impl ExecutionPlan for ParquetExec {
         Ok(self)
     }
 
+    /// Redistribute files across partitions according to their size
+    /// See comments on `get_file_groups_repartitioned()` for more detail.
+    fn repartitioned(
+        &self,
+        target_partitions: usize,
+        config: &ConfigOptions,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let repartition_file_min_size = 
config.optimizer.repartition_file_min_size;
+        let repartitioned_file_groups_option = 
FileScanConfig::repartition_file_groups(
+            self.base_config.file_groups.clone(),
+            target_partitions,
+            repartition_file_min_size,
+        );
+
+        let mut new_plan = self.clone();
+        if let Some(repartitioned_file_groups) = 
repartitioned_file_groups_option {
+            new_plan.base_config.file_groups = repartitioned_file_groups;
+        }
+        Ok(Some(Arc::new(new_plan)))
+    }
+
     fn execute(
         &self,
         partition_index: usize,
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 12df9efbbc..6de39db7d5 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -26,9 +26,6 @@ use std::fmt::Formatter;
 use std::sync::Arc;
 
 use crate::config::ConfigOptions;
-use crate::datasource::physical_plan::CsvExec;
-#[cfg(feature = "parquet")]
-use crate::datasource::physical_plan::ParquetExec;
 use crate::error::Result;
 use crate::physical_optimizer::utils::{
     add_sort_above, get_children_exectrees, get_plan_string, 
is_coalesce_partitions,
@@ -1188,7 +1185,6 @@ fn ensure_distribution(
     // When `false`, round robin repartition will not be added to increase 
parallelism
     let enable_round_robin = config.optimizer.enable_round_robin_repartition;
     let repartition_file_scans = config.optimizer.repartition_file_scans;
-    let repartition_file_min_size = config.optimizer.repartition_file_min_size;
     let batch_size = config.execution.batch_size;
     let is_unbounded = unbounded_output(&dist_context.plan);
     // Use order preserving variants either of the conditions true
@@ -1265,25 +1261,13 @@ fn ensure_distribution(
                 // Unless partitioning doesn't increase the partition count, 
it is not beneficial:
                 && child.output_partitioning().partition_count() < 
target_partitions
             {
-                // When `repartition_file_scans` is set, leverage source 
operators
-                // (`ParquetExec`, `CsvExec` etc.) to increase parallelism at 
the source.
+                // When `repartition_file_scans` is set, attempt to increase
+                // parallelism at the source.
                 if repartition_file_scans {
-                    #[cfg(feature = "parquet")]
-                    if let Some(parquet_exec) =
-                        child.as_any().downcast_ref::<ParquetExec>()
+                    if let Some(new_child) =
+                        child.repartitioned(target_partitions, config)?
                     {
-                        child = Arc::new(parquet_exec.get_repartitioned(
-                            target_partitions,
-                            repartition_file_min_size,
-                        ));
-                    }
-                    if let Some(csv_exec) = 
child.as_any().downcast_ref::<CsvExec>() {
-                        if let Some(csv_exec) = csv_exec.get_repartitioned(
-                            target_partitions,
-                            repartition_file_min_size,
-                        ) {
-                            child = Arc::new(csv_exec);
-                        }
+                        child = new_child;
                     }
                 }
                 // Increase parallelism by adding round-robin repartitioning
@@ -1644,8 +1628,8 @@ mod tests {
     use 
crate::datasource::file_format::file_compression_type::FileCompressionType;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
-    use crate::datasource::physical_plan::FileScanConfig;
     use crate::datasource::physical_plan::ParquetExec;
+    use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
     use crate::physical_optimizer::enforce_sorting::EnforceSorting;
     use crate::physical_optimizer::output_requirements::OutputRequirements;
     use crate::physical_plan::aggregates::{
diff --git a/datafusion/physical-plan/src/lib.rs 
b/datafusion/physical-plan/src/lib.rs
index b2f81579f8..3ada2fa163 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -76,6 +76,7 @@ pub use crate::metrics::Metric;
 pub use crate::topk::TopK;
 pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};
 
+use datafusion_common::config::ConfigOptions;
 pub use datafusion_common::hash_utils;
 pub use datafusion_common::utils::project_schema;
 pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
@@ -209,7 +210,32 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>>;
 
-    /// creates an iterator
+    /// If supported, attempt to increase the partitioning of this 
`ExecutionPlan` to
+    /// produce `target_partitions` partitions.
+    ///
+    /// If the `ExecutionPlan` does not support changing its partitioning,
+    /// returns `Ok(None)` (the default).
+    ///
+    /// If the `ExecutionPlan` can increase its partitioning, but not to the
+    /// `target_partitions`, it may return an ExecutionPlan with fewer
+    /// partitions. This might happen, for example, if each new partition would
+    /// be too small to be efficiently processed individually.
+    ///
+    /// The DataFusion optimizer attempts to use as many threads as possible by
+    /// repartitioning its inputs to match the target number of threads
+    /// available (`target_partitions`). Some data sources, such as the built-in
+    /// CSV and Parquet readers, implement this method as they are able to read
+    /// from their input files in parallel, regardless of how the source data 
is
+    /// split amongst files.
+    fn repartitioned(
+        &self,
+        _target_partitions: usize,
+        _config: &ConfigOptions,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        Ok(None)
+    }
+
+    /// Begin execution of `partition`, returning a stream of 
[`RecordBatch`]es.
     fn execute(
         &self,
         partition: usize,
@@ -217,7 +243,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ) -> Result<SendableRecordBatchStream>;
 
     /// Return a snapshot of the set of [`Metric`]s for this
-    /// [`ExecutionPlan`].
+    /// [`ExecutionPlan`]. If no `Metric`s are available, return None.
     ///
     /// While the values of the metrics in the returned
     /// [`MetricsSet`]s may change as execution progresses, the

Reply via email to