alamb commented on code in PR #7791:
URL: https://github.com/apache/arrow-datafusion/pull/7791#discussion_r1356652459


##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -481,6 +479,81 @@ impl CsvSink {
     fn new(config: FileSinkConfig) -> Self {
         Self { config }
     }
+
+    async fn append_all(

Review Comment:
   this is a very nice cleanup / refactor



##########
datafusion/common/src/config.rs:
##########
@@ -254,6 +254,24 @@ config_namespace! {
 
         /// Number of files to read in parallel when inferring schema and statistics
         pub meta_fetch_concurrency: usize, default = 32
+
+        /// Target number of rows in output files when writing multiple.
+        /// This is a soft max, so it can be exceeded slightly. There also
+        /// will be one file smaller than the limit if the total
+        /// number of rows written is not roughly divisible by the soft max
+        pub soft_max_rows_per_output_file: usize, default = 50000000
+
+        /// This is the maximum number of output files being written
+        /// in parallel. Higher values can potentially give faster write
+        /// performance at the cost of higher peak memory consumption.
+        pub max_parallel_ouput_files: usize, default = 8
+
+        /// This is the maximum number of RecordBatches buffered
+        /// for each output file being worked. Higher values can potentially
+        /// give faster write performance at the cost of higher peak
+        /// memory consumption
+        pub max_buffered_batches_per_output_file: usize, default = 5000

Review Comment:
   This value seems far too high to me -- it seems like there is no reason to 
buffer more than 1-2 batches per output file -- the rationale being that it 
makes no sense to let a producer of data run so far ahead of the consumer 
(writer). 
   
   Adding some buffer makes sense so that a new input batch can be computed 
concurrently while writing the next output batch, but I don't see why it would 
make sense to buffer so many. 
   
   Maybe a value of `2` here would be good
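
To illustrate the backpressure argument above, here is a minimal sketch, not code from this PR. It uses the standard library's `std::sync::mpsc::sync_channel` as a stand-in for the tokio channel in the diff, and `Vec<u64>` as a stand-in for `RecordBatch`; the function name `max_producer_lead` is hypothetical. It measures how far a fast producer can get ahead of a slow writer when the channel capacity is small.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Runs a fast producer against a slow consumer over a bounded channel and
/// returns the largest number of batches the producer was observed to be
/// ahead of the consumer. (Hypothetical helper, for illustration only.)
fn max_producer_lead(capacity: usize, n_batches: usize) -> usize {
    // `Vec<u64>` stands in for a RecordBatch.
    let (tx, rx) = sync_channel::<Vec<u64>>(capacity);
    let sent = Arc::new(AtomicUsize::new(0));
    let sent_producer = Arc::clone(&sent);

    let producer = thread::spawn(move || {
        for i in 0..n_batches {
            // Blocks once `capacity` batches are already waiting.
            tx.send(vec![i as u64; 4]).unwrap();
            sent_producer.fetch_add(1, Ordering::SeqCst);
        }
        // `tx` is dropped here, which ends the consumer loop below.
    });

    let mut max_lead = 0;
    let mut received = 0;
    for _batch in rx {
        received += 1;
        let lead = sent.load(Ordering::SeqCst).saturating_sub(received);
        max_lead = max_lead.max(lead);
        // Stand-in for a slow write to the output file.
        thread::sleep(Duration::from_millis(1));
    }
    producer.join().unwrap();
    max_lead
}

fn main() {
    println!("max lead with capacity 2: {}", max_producer_lead(2, 20));
}
```

With a capacity of 2 the producer can never be more than two batches ahead, yet it can still compute the next batch while the writer is busy, which is the overlap the buffer is meant to provide.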



##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -390,29 +391,101 @@ async fn serialize_rb_stream_to_object_store(
             ))
         }
     };
-    Ok((serializer, writer, row_count as u64))
+    Ok((writer, row_count as u64))
 }
 
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = Receiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible to monitor
+/// the demux task for errors and abort accordingly. The single_file_ouput parameter
+/// overrides all other settings to force only a single file to be written.
+/// partition_by parameter will additionally split the input based on the unique
+/// values of a specific column `<https://github.com/apache/arrow-datafusion/issues/7744>``
+pub(crate) fn start_demuxer_task(
+    mut input: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    _partition_by: Option<&str>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
+    let exec_options = &context.session_config().options().execution;
+
+    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+    let max_parallel_files = exec_options.max_parallel_ouput_files;
+    let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;
+
+    let (tx, rx) = tokio::sync::mpsc::channel(max_parallel_files);
+    let task: JoinHandle<std::result::Result<(), DataFusionError>> =
+        tokio::spawn(async move {
+            let mut total_rows_current_file = 0;
+            let mut part_idx = 0;
+            let write_id = rand::distributions::Alphanumeric
+                .sample_string(&mut rand::thread_rng(), 16);
+            let file_path = if !single_file_output {
+                base_output_path
+                    .prefix()
+                    .child(format!("{}_{}.{}", write_id, part_idx, file_extension))
+            } else {
+                base_output_path.prefix().to_owned()
+            };
+
+            let (mut tx_file, mut rx_file) =
+                tokio::sync::mpsc::channel(max_buffered_recordbatches / 2);

Review Comment:
   Why is the max buffer size divided by 2?



##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -390,29 +391,101 @@ async fn serialize_rb_stream_to_object_store(
             ))
         }
     };
-    Ok((serializer, writer, row_count as u64))
+    Ok((writer, row_count as u64))
 }
 
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = Receiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible to monitor
+/// the demux task for errors and abort accordingly. The single_file_ouput parameter
+/// overrides all other settings to force only a single file to be written.
+/// partition_by parameter will additionally split the input based on the unique
+/// values of a specific column `<https://github.com/apache/arrow-datafusion/issues/7744>``
+pub(crate) fn start_demuxer_task(

Review Comment:
   Is there any way you can put the ASCII art picture from this PR description 
into this comment -- I think it would add a lot to the context
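
For readers without the PR description at hand, the "channels over a channel" design in the doc comment can be sketched roughly as follows. This is a simplified illustration, not the PR's implementation: it assumes standard-library threads and channels in place of tokio tasks, `Vec<u32>` in place of `RecordBatch`, plain `String` paths, and the hypothetical names `start_demux` and `collect_files`. It only models the row-count rollover, not `partition_by` or `single_file_output`.

```rust
use std::sync::mpsc::{channel, sync_channel, Receiver};
use std::thread;

type Batch = Vec<u32>; // stand-in for RecordBatch
type FileReceiver = Receiver<Batch>;

/// Spawns a demux thread. The outer channel carries one (path, inner
/// receiver) pair per output file; each inner channel carries the batches
/// that belong in that file. (Hypothetical sketch.)
fn start_demux(
    input: Vec<Batch>,
    soft_max_rows: usize,
) -> (thread::JoinHandle<()>, Receiver<(String, FileReceiver)>) {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        let mut part_idx = 0;
        let mut rows_in_file = 0;
        let (mut tx_file, rx_file) = sync_channel::<Batch>(2);
        tx.send((format!("part_{}.csv", part_idx), rx_file)).unwrap();

        for batch in input {
            // Roll over to a new file once the soft max has been reached.
            if rows_in_file >= soft_max_rows {
                part_idx += 1;
                rows_in_file = 0;
                let (new_tx, new_rx) = sync_channel::<Batch>(2);
                // Replacing the sender drops the old one, which closes
                // the previous file's inner channel.
                tx_file = new_tx;
                tx.send((format!("part_{}.csv", part_idx), new_rx)).unwrap();
            }
            rows_in_file += batch.len();
            tx_file.send(batch).unwrap();
        }
    });
    (handle, rx)
}

/// Drains every inner channel in turn and reports rows per output file.
fn collect_files(input: Vec<Batch>, soft_max_rows: usize) -> Vec<(String, usize)> {
    let (handle, rx) = start_demux(input, soft_max_rows);
    let mut files = Vec::new();
    for (path, rx_file) in rx {
        let rows: usize = rx_file.iter().map(|b| b.len()).sum();
        files.push((path, rows));
    }
    handle.join().unwrap();
    files
}

fn main() {
    // Five batches of 4 rows each with a soft max of 10 rows per file.
    for (path, rows) in collect_files(vec![vec![0; 4]; 5], 10) {
        println!("{path}: {rows} rows");
    }
}
```

In this sketch the first file ends up with 12 rows and the second with 8, which matches the config docs: the limit is a soft max that can be exceeded slightly, and the last file is smaller when the total does not divide evenly.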



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
