alamb commented on code in PR #7841:
URL: https://github.com/apache/arrow-datafusion/pull/7841#discussion_r1365699151


##########
datafusion/common/src/config.rs:
##########
@@ -255,6 +255,12 @@ config_namespace! {
         /// Number of files to read in parallel when inferring schema and statistics
         pub meta_fetch_concurrency: usize, default = 32
 
+        /// Guarentees a minimum level of output files running in parallel.
+        /// RecordBatches will be distributed in round robin fashion to each
+        /// parallel writer. Each writer is closed and a new file opened once
+        /// soft_max_rows_per_output_file is reached.
+        pub minimum_parallel_output_files: usize, default = 4

Review Comment:
   What do you think about defaulting to the number of cores (perhaps by 
treating a value of `0` as "use the number of cores")?



##########
datafusion/common/src/config.rs:
##########
@@ -255,6 +255,12 @@ config_namespace! {
         /// Number of files to read in parallel when inferring schema and statistics
         pub meta_fetch_concurrency: usize, default = 32
 
+        /// Guarentees a minimum level of output files running in parallel.

Review Comment:
   ```suggestion
           /// Guarantees a minimum level of output files running in parallel.
   ```



##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -493,38 +492,57 @@ async fn create_new_file_stream(
 async fn row_count_demuxer(
     mut input: SendableRecordBatchStream,
     base_output_path: ListingTableUrl,
+    context: Arc<TaskContext>,
     file_extension: String,
     single_file_output: bool,
-    max_rows_per_file: usize,
-    max_buffered_batches: usize,
     mut tx: Sender<(Path, Receiver<RecordBatch>)>,
 ) -> Result<()> {
-    let mut total_rows_current_file = 0;
+    let exec_options = &context.session_config().options().execution;
+
+    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+    let max_buffered_batches = 
exec_options.max_buffered_batches_per_output_file;
+    let minimum_parallel_files = exec_options.minimum_parallel_output_files;
     let mut part_idx = 0;
     let write_id =
         rand::distributions::Alphanumeric.sample_string(&mut 
rand::thread_rng(), 16);
 
-    let mut tx_file = create_new_file_stream(
-        &base_output_path,
-        &write_id,
-        part_idx,
-        &file_extension,
-        single_file_output,
-        max_buffered_batches,
-        &mut tx,
-    )
-    .await?;
-    part_idx += 1;
+    let mut open_file_streams = Vec::with_capacity(minimum_parallel_files);
 
-    while let Some(rb) = input.next().await.transpose()? {
-        total_rows_current_file += rb.num_rows();
-        tx_file.send(rb).await.map_err(|_| {
-            DataFusionError::Execution("Error sending RecordBatch to file 
stream!".into())
-        })?;
+    let mut next_send_steam = 0;
+    let mut row_counts = Vec::with_capacity(minimum_parallel_files);
 
-        if total_rows_current_file >= max_rows_per_file && !single_file_output 
{
-            total_rows_current_file = 0;
-            tx_file = create_new_file_stream(
+    // Overrides if single_file_output is set
+    let minimum_parallel_files = if single_file_output {
+        1
+    } else {
+        minimum_parallel_files
+    };
+
+    let max_rows_per_file = if single_file_output {
+        usize::MAX
+    } else {
+        max_rows_per_file
+    };
+
+    while let Some(rb) = input.next().await.transpose()? {
+        if open_file_streams.len() < minimum_parallel_files {

Review Comment:
   ```suggestion
           // ensure we have at least minimum_parallel_files open
           if open_file_streams.len() < minimum_parallel_files {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to