alamb commented on code in PR #6801:
URL: https://github.com/apache/arrow-datafusion/pull/6801#discussion_r1253475065


##########
datafusion/core/src/datasource/physical_plan/mod.rs:
##########
@@ -240,6 +242,81 @@ impl FileScanConfig {
                 .collect()
         })
     }
+
+    /// Repartition all input files into `target_partitions` partitions, if 
total file size exceed
+    /// `repartition_file_min_size`
+    /// `target_partitions` and `repartition_file_min_size` directly come from 
configuration.
+    ///
+    /// This function only try to partition file byte range evenly, and let 
specific `FileOpener` to
+    /// do actual partition on specific data source type. (e.g. `CsvOpener` 
will only read lines
+    /// overlap with byte range but also handle boundaries to ensure all lines 
will be read exactly once)
+    pub fn repartition_file_groups(
+        file_groups: Vec<Vec<PartitionedFile>>,
+        target_partitions: usize,
+        repartition_file_min_size: usize,
+    ) -> Option<Vec<Vec<PartitionedFile>>> {
+        let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
+
+        // Perform redistribution only in case all files should be read from 
beginning to end
+        let has_ranges = flattened_files.iter().any(|f| f.range.is_some());
+        if has_ranges {
+            return None;
+        }
+
+        let total_size = flattened_files
+            .iter()
+            .map(|f| f.object_meta.size as i64)
+            .sum::<i64>();
+        if total_size < (repartition_file_min_size as i64) || total_size == 0 {
+            return None;
+        }
+
+        let target_partition_size =
+            (total_size as usize + (target_partitions) - 1) / 
(target_partitions);
+
+        let current_partition_index: usize = 0;
+        let current_partition_size: usize = 0;
+
+        // Partition byte range evenly for all `PartitionedFile`s
+        let repartitioned_files = flattened_files

Review Comment:
   One thing I was thinking about for the range based approach is that it isn't 
likely to work for streaming compressed files (as you need to decompress the 
data linearly)
   
   I wonder if you have considered that 🤔 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to