devinjdangelo commented on code in PR #7791:
URL: https://github.com/apache/arrow-datafusion/pull/7791#discussion_r1356838877


##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -390,29 +391,101 @@ async fn serialize_rb_stream_to_object_store(
             ))
         }
     };
-    Ok((serializer, writer, row_count as u64))
+    Ok((writer, row_count as u64))
 }
 
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = Receiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible to monitor
+/// the demux task for errors and abort accordingly. The single_file_ouput 
parameter
+/// overrides all other settings to force only a single file to be written.
+/// partition_by parameter will additionally split the input based on the 
unique
+/// values of a specific column 
`<https://github.com/apache/arrow-datafusion/issues/7744>``
+pub(crate) fn start_demuxer_task(
+    mut input: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    _partition_by: Option<&str>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
+    let exec_options = &context.session_config().options().execution;
+
+    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+    let max_parallel_files = exec_options.max_parallel_ouput_files;
+    let max_buffered_recordbatches = 
exec_options.max_buffered_batches_per_output_file;
+
+    let (tx, rx) = tokio::sync::mpsc::channel(max_parallel_files);
+    let task: JoinHandle<std::result::Result<(), DataFusionError>> =
+        tokio::spawn(async move {
+            let mut total_rows_current_file = 0;
+            let mut part_idx = 0;
+            let write_id = rand::distributions::Alphanumeric
+                .sample_string(&mut rand::thread_rng(), 16);
+            let file_path = if !single_file_output {
+                base_output_path
+                    .prefix()
+                    .child(format!("{}_{}.{}", write_id, part_idx, 
file_extension))
+            } else {
+                base_output_path.prefix().to_owned()
+            };
+
+            let (mut tx_file, mut rx_file) =
+                tokio::sync::mpsc::channel(max_buffered_recordbatches / 2);

Review Comment:
   There are actually two buffers that hold RecordBatches and they are moved 
between them, so the effective maximum buffered batches is 
2*max_buffered_recordbatches. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to