alamb commented on code in PR #11345:
URL: https://github.com/apache/datafusion/pull/11345#discussion_r1671045290
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -1013,27 +1041,39 @@ async fn concatenate_parallel_row_groups(
)?;
while let Some(task) = serialize_rx.recv().await {
+ let mut rg_reservation =
+
MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(&pool);
+
let result = task.join_unwind().await;
let mut rg_out = parquet_writer.next_row_group()?;
let (serialized_columns, _cnt) = result?;
- for chunk in serialized_columns {
+ for (chunk, col_reservation) in serialized_columns {
Review Comment:
I spent a non-trivial amount of time trying to convince myself this code is right. I
think it is correct, but I also think it is hard to see why it is correct
(and thus maybe I am wrong about it being correct).
I think we could make the implementation simpler by making the reservations
align more closely with the structures that are storing memory.
For example, tracking `col_reservation` here makes sense to me because it
tracks the data in `chunk` (though perhaps we could name them consistently --
`col` / `col_reservation` or `chunk` / `chunk_reservation`).
However, the `rg_reservation` is somewhat confusing to me, as
```rust
chunk.append_to_row_group(&mut rg_out)?;
```
eventually [writes out to the
buffer](https://docs.rs/parquet/latest/src/parquet/file/writer.rs.html#593-655),
which in this case is `merged_buff`, which is tracked by `file_reservation`.
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -1013,27 +1041,39 @@ async fn concatenate_parallel_row_groups(
)?;
while let Some(task) = serialize_rx.recv().await {
+ let mut rg_reservation =
+
MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(&pool);
+
let result = task.join_unwind().await;
let mut rg_out = parquet_writer.next_row_group()?;
let (serialized_columns, _cnt) = result?;
- for chunk in serialized_columns {
+ for (chunk, col_reservation) in serialized_columns {
chunk.append_to_row_group(&mut rg_out)?;
+ rg_reservation.grow(col_reservation.size());
+ drop(col_reservation);
+
let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
object_store_writer
.write_all(buff_to_flush.as_slice())
.await?;
+ rg_reservation.shrink(buff_to_flush.len());
Review Comment:
Something doesn't seem quite right to me here. The `rg_reservation` is grown
by an estimate of the encoded size but is shrunk by the actual size written.
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -848,7 +859,13 @@ fn spawn_column_parallel_row_group_writer(
mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
col_array_channels.push(send_array);
- let task = SpawnedTask::spawn(column_serializer_task(recieve_array,
writer));
+ let reservation =
+
MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);
Review Comment:
The name is attached to the reservation, so if we ever did something like
#6934, more specific names could help.
I would recommend leaving this as is unless we have a reason to make it
fancier.
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -749,14 +750,19 @@ impl DataSink for ParquetSink {
parquet_props.writer_options().clone(),
)
.await?;
+ let mut reservation =
+ MemoryConsumer::new(format!("ParquetSink[{}]", path))
+ .register(context.memory_pool());
file_write_tasks.spawn(async move {
while let Some(batch) = rx.recv().await {
writer.write(&batch).await?;
+ reservation.try_resize(writer.memory_size())?;
}
let file_metadata = writer
.close()
.await
.map_err(DataFusionError::ParquetError)?;
+ drop(reservation);
Review Comment:
Is this `drop` needed? Since the closure returns on the next line, I think
the reservation would be dropped automatically anyway.
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -2177,4 +2222,105 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn parquet_sink_write_memory_reservation() -> Result<()> {
+ async fn test_memory_reservation(global: ParquetOptions) -> Result<()>
{
+ let field_a = Field::new("a", DataType::Utf8, false);
+ let field_b = Field::new("b", DataType::Utf8, false);
+ let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+ let object_store_url = ObjectStoreUrl::local_filesystem();
+
+ let file_sink_config = FileSinkConfig {
+ object_store_url: object_store_url.clone(),
+ file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+ table_paths: vec![ListingTableUrl::parse("file:///")?],
+ output_schema: schema.clone(),
+ table_partition_cols: vec![],
+ overwrite: true,
+ keep_partition_by_columns: false,
+ };
+ let parquet_sink = Arc::new(ParquetSink::new(
+ file_sink_config,
+ TableParquetOptions {
+ key_value_metadata: std::collections::HashMap::from([
+ ("my-data".to_string(), Some("stuff".to_string())),
+ ("my-data-bool-key".to_string(), None),
+ ]),
+ global,
+ ..Default::default()
+ },
+ ));
+
+ // create data
+ let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo",
"bar"]));
+ let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz",
"baz"]));
+ let batch =
+ RecordBatch::try_from_iter(vec![("a", col_a), ("b",
col_b)]).unwrap();
+
+ // create task context
+ let task_context = build_ctx(object_store_url.as_ref());
+ assert_eq!(
+ task_context.memory_pool().reserved(),
+ 0,
+ "no bytes are reserved yet"
+ );
+
+ let mut write_task = parquet_sink.write_all(
+ Box::pin(RecordBatchStreamAdapter::new(
+ schema,
+ bounded_stream(batch, 1000),
+ )),
+ &task_context,
+ );
+
+ // incrementally poll and check for memory reservation
Review Comment:
Another way to potentially test this code is to verify that the write fails
when you set a memory limit.
There are tests that do that here:
https://github.com/apache/datafusion/tree/main/datafusion/core/tests/memory_limit
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -818,14 +826,16 @@ impl DataSink for ParquetSink {
async fn column_serializer_task(
mut rx: Receiver<ArrowLeafColumn>,
mut writer: ArrowColumnWriter,
-) -> Result<ArrowColumnWriter> {
+ mut reservation: MemoryReservation,
+) -> Result<(ArrowColumnWriter, MemoryReservation)> {
while let Some(col) = rx.recv().await {
writer.write(&col)?;
+ reservation.try_resize(writer.memory_size())?;
}
- Ok(writer)
+ Ok((writer, reservation))
Review Comment:
I agree it should be documented and on a struct. However, I don't think
there is a good one yet, as these are all free functions.
Maybe as a follow-on PR we could refactor some of the logic / state into
structs that can be documented better and be easier to read.
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -1002,9 +1026,13 @@ async fn concatenate_parallel_row_groups(
schema: Arc<Schema>,
writer_props: Arc<WriterProperties>,
mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
+ pool: Arc<dyn MemoryPool>,
) -> Result<FileMetaData> {
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
+ let mut file_reservation =
+
MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);
Review Comment:
answered above
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]