wiedld commented on code in PR #11345:
URL: https://github.com/apache/datafusion/pull/11345#discussion_r1669599846
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -818,14 +826,16 @@ impl DataSink for ParquetSink {
async fn column_serializer_task(
mut rx: Receiver<ArrowLeafColumn>,
mut writer: ArrowColumnWriter,
-) -> Result<ArrowColumnWriter> {
+ mut reservation: MemoryReservation,
+) -> Result<(ArrowColumnWriter, MemoryReservation)> {
while let Some(col) = rx.recv().await {
writer.write(&col)?;
+ reservation.try_resize(writer.memory_size())?;
}
- Ok(writer)
+ Ok((writer, reservation))
Review Comment:
**Steps for tracking memory usage in the parallelized write:**
In the function above (`column_serializer_task`):
1. track memory as data is written to each `ArrowColumnWriter`
2. return the reservation alongside the writer (so it isn't dropped
and freed early)
In `spawn_rg_join_and_finalize_task`:
3. this closes each `ArrowColumnWriter` and yields the encoded
`ColumnChunk`.
=> Therefore resize the column reservation to the encoded size.
In `concatenate_parallel_row_groups`:
4. columns are appended into row groups:
=> Therefore add the encoded size to the `rg_reservation`, and
drop each `col_reservation`.
5. row groups are appended into the file:
=> On row-group `writer.close()`, move the tracking of the remaining
bytes to the `file_reservation`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]