alamb commented on code in PR #11345:
URL: https://github.com/apache/datafusion/pull/11345#discussion_r1671045290


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -1013,27 +1041,39 @@ async fn concatenate_parallel_row_groups(
     )?;
 
     while let Some(task) = serialize_rx.recv().await {
+        let mut rg_reservation =
+            
MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(&pool);
+
         let result = task.join_unwind().await;
         let mut rg_out = parquet_writer.next_row_group()?;
         let (serialized_columns, _cnt) = result?;
-        for chunk in serialized_columns {
+        for (chunk, col_reservation) in serialized_columns {

Review Comment:
   I spent a non-trivial amount of time trying to convince myself this code is 
right. I think it is correct, but I also think it is hard to see why it is 
correct (and thus maybe I am wrong about it being correct).
   
   I think we could make the implementation simpler by making the reservations 
more closely align to the structures that are storing memory
   
   For example, tracking `col_reservation` here makes sense to me because it is 
tracking the data in `chunk` (though perhaps we could call them similarly -- 
`col` / `col_reservation` or `chunk` / `chunk_reservation`)
   
   However, the rg_reservation is somewhat confusing to me as 
   
   ```rust
               chunk.append_to_row_group(&mut rg_out)?;
   ```
   
   eventually [writes out to the 
buffer](https://docs.rs/parquet/latest/src/parquet/file/writer.rs.html#593-655),
 which in this case is `merged_buff`, which is tracked by `file_reservation`
   
   



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -1013,27 +1041,39 @@ async fn concatenate_parallel_row_groups(
     )?;
 
     while let Some(task) = serialize_rx.recv().await {
+        let mut rg_reservation =
+            
MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(&pool);
+
         let result = task.join_unwind().await;
         let mut rg_out = parquet_writer.next_row_group()?;
         let (serialized_columns, _cnt) = result?;
-        for chunk in serialized_columns {
+        for (chunk, col_reservation) in serialized_columns {
             chunk.append_to_row_group(&mut rg_out)?;
+            rg_reservation.grow(col_reservation.size());
+            drop(col_reservation);
+
             let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
             if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                 object_store_writer
                     .write_all(buff_to_flush.as_slice())
                     .await?;
+                rg_reservation.shrink(buff_to_flush.len());

Review Comment:
   Something doesn't seem quite right to me here. The rg_reservation is grown 
by an estimate of the encoded size but is shrunk by the actual size written.
   
   



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -848,7 +859,13 @@ fn spawn_column_parallel_row_group_writer(
             mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
         col_array_channels.push(send_array);
 
-        let task = SpawnedTask::spawn(column_serializer_task(recieve_array, 
writer));
+        let reservation =
+            
MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);

Review Comment:
   The name is attached to the reservation, so if we ever did something like 
#6934, more specific names could help.
   
   I would recommend leaving this as is unless we have reason to make it 
fancier.



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -749,14 +750,19 @@ impl DataSink for ParquetSink {
                         parquet_props.writer_options().clone(),
                     )
                     .await?;
+                let mut reservation =
+                    MemoryConsumer::new(format!("ParquetSink[{}]", path))
+                        .register(context.memory_pool());
                 file_write_tasks.spawn(async move {
                     while let Some(batch) = rx.recv().await {
                         writer.write(&batch).await?;
+                        reservation.try_resize(writer.memory_size())?;
                     }
                     let file_metadata = writer
                         .close()
                         .await
                         .map_err(DataFusionError::ParquetError)?;
+                    drop(reservation);

Review Comment:
   Is this `drop` needed? Since the closure returns on the next line, I think 
the reservation would be dropped automatically anyway.



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -2177,4 +2222,105 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn parquet_sink_write_memory_reservation() -> Result<()> {
+        async fn test_memory_reservation(global: ParquetOptions) -> Result<()> 
{
+            let field_a = Field::new("a", DataType::Utf8, false);
+            let field_b = Field::new("b", DataType::Utf8, false);
+            let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+            let object_store_url = ObjectStoreUrl::local_filesystem();
+
+            let file_sink_config = FileSinkConfig {
+                object_store_url: object_store_url.clone(),
+                file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+                table_paths: vec![ListingTableUrl::parse("file:///")?],
+                output_schema: schema.clone(),
+                table_partition_cols: vec![],
+                overwrite: true,
+                keep_partition_by_columns: false,
+            };
+            let parquet_sink = Arc::new(ParquetSink::new(
+                file_sink_config,
+                TableParquetOptions {
+                    key_value_metadata: std::collections::HashMap::from([
+                        ("my-data".to_string(), Some("stuff".to_string())),
+                        ("my-data-bool-key".to_string(), None),
+                    ]),
+                    global,
+                    ..Default::default()
+                },
+            ));
+
+            // create data
+            let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", 
"bar"]));
+            let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", 
"baz"]));
+            let batch =
+                RecordBatch::try_from_iter(vec![("a", col_a), ("b", 
col_b)]).unwrap();
+
+            // create task context
+            let task_context = build_ctx(object_store_url.as_ref());
+            assert_eq!(
+                task_context.memory_pool().reserved(),
+                0,
+                "no bytes are reserved yet"
+            );
+
+            let mut write_task = parquet_sink.write_all(
+                Box::pin(RecordBatchStreamAdapter::new(
+                    schema,
+                    bounded_stream(batch, 1000),
+                )),
+                &task_context,
+            );
+
+            // incrementally poll and check for memory reservation

Review Comment:
   Another way to potentially test this code is to verify that, when you set a 
memory limit, the write fails.
   
   There are tests that do that here: 
https://github.com/apache/datafusion/tree/main/datafusion/core/tests/memory_limit



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -818,14 +826,16 @@ impl DataSink for ParquetSink {
 async fn column_serializer_task(
     mut rx: Receiver<ArrowLeafColumn>,
     mut writer: ArrowColumnWriter,
-) -> Result<ArrowColumnWriter> {
+    mut reservation: MemoryReservation,
+) -> Result<(ArrowColumnWriter, MemoryReservation)> {
     while let Some(col) = rx.recv().await {
         writer.write(&col)?;
+        reservation.try_resize(writer.memory_size())?;
     }
-    Ok(writer)
+    Ok((writer, reservation))

Review Comment:
   I agree it should be documented and on a struct. However, I don't think 
there is a good one yet as these are all free functions
   
   Maybe as a follow-on PR we could refactor some of the logic / state into 
structs that can be documented better / be easier to read.



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -1002,9 +1026,13 @@ async fn concatenate_parallel_row_groups(
     schema: Arc<Schema>,
     writer_props: Arc<WriterProperties>,
     mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
+    pool: Arc<dyn MemoryPool>,
 ) -> Result<FileMetaData> {
     let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
 
+    let mut file_reservation =
+        
MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);

Review Comment:
   answered above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to