This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 20d169c8e fix: PartitionBuffers should not have their own MemoryConsumer (#1496)
20d169c8e is described below

commit 20d169c8e3926cb36bff74bbe7e6e04a85a0084d
Author: Emily Matheys <[email protected]>
AuthorDate: Tue Mar 11 18:12:00 2025 +0200

    fix: PartitionBuffers should not have their own MemoryConsumer (#1496)
    
    * fix: #1495
    
    * fmt
    
    ---------
    
    Co-authored-by: Emily Matheys <[email protected]>
---
 .../core/src/execution/shuffle/shuffle_writer.rs   | 32 ++++++++++++++--------
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index 8b34cd478..f1c2b4d78 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -186,6 +186,7 @@ impl ExecutionPlan for ShuffleWriterExec {
             futures::stream::once(
                 external_shuffle(
                     input,
+                    partition,
                     self.output_data_file.clone(),
                     self.output_index_file.clone(),
                     self.partitioning.clone(),
@@ -204,6 +205,7 @@ impl ExecutionPlan for ShuffleWriterExec {
 #[allow(clippy::too_many_arguments)]
 async fn external_shuffle(
     mut input: SendableRecordBatchStream,
+    partition: usize,
     output_data_file: String,
     output_index_file: String,
     partitioning: Partitioning,
@@ -214,6 +216,7 @@ async fn external_shuffle(
 ) -> Result<SendableRecordBatchStream> {
     let schema = input.schema();
     let mut repartitioner = ShuffleRepartitioner::try_new(
+        partition,
         output_data_file,
         output_index_file,
         Arc::clone(&schema),
@@ -301,6 +304,7 @@ struct ShuffleRepartitioner {
 impl ShuffleRepartitioner {
     #[allow(clippy::too_many_arguments)]
     pub fn try_new(
+        partition: usize,
         output_data_file: String,
         output_index_file: String,
         schema: SchemaRef,
@@ -323,17 +327,21 @@ impl ShuffleRepartitioner {
             partition_ids.set_len(batch_size);
         }
 
+        // This will be split into each PartitionBuffer Reservations, and does not need to be kept itself
+        let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition))
+            .with_can_spill(true)
+            .register(&runtime.memory_pool);
+
         Ok(Self {
             output_data_file,
             output_index_file,
             schema: Arc::clone(&schema),
             buffered_partitions: (0..num_output_partitions)
-                .map(|partition_id| {
+                .map(|_| {
                     PartitionBuffer::try_new(
                         Arc::clone(&schema),
                         batch_size,
-                        partition_id,
-                        &runtime,
+                        reservation.new_empty(),
                         codec.clone(),
                         enable_fast_encoding,
                     )
@@ -473,8 +481,9 @@ impl ShuffleRepartitioner {
             Partitioning::UnknownPartitioning(n) if *n == 1 => {
                 let buffered_partitions = &mut self.buffered_partitions;
 
-                assert!(
-                    buffered_partitions.len() == 1,
+                assert_eq!(
+                    buffered_partitions.len(),
+                    1,
                     "Expected 1 partition but got {}",
                     buffered_partitions.len()
                 );
@@ -708,14 +717,10 @@ impl PartitionBuffer {
     fn try_new(
         schema: SchemaRef,
         batch_size: usize,
-        partition_id: usize,
-        runtime: &Arc<RuntimeEnv>,
+        reservation: MemoryReservation,
         codec: CompressionCodec,
         enable_fast_encoding: bool,
     ) -> Result<Self> {
-        let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", partition_id))
-            .with_can_spill(true)
-            .register(&runtime.memory_pool);
         let shuffle_block_writer =
             ShuffleBlockWriter::try_new(schema.as_ref(), enable_fast_encoding, codec)?;
         let active_slots_mem_size = schema
@@ -1008,11 +1013,13 @@ mod test {
     fn partition_buffer_memory() {
         let batch = create_batch(900);
         let runtime_env = create_runtime(128 * 1024);
+        let reservation = MemoryConsumer::new("ShuffleRepartitioner[0]")
+            .with_can_spill(true)
+            .register(&runtime_env.memory_pool);
         let mut buffer = PartitionBuffer::try_new(
             batch.schema(),
             1024,
-            0,
-            &runtime_env,
+            reservation,
             CompressionCodec::Lz4Frame,
             true,
         )
@@ -1081,6 +1088,7 @@ mod test {
         let runtime_env = create_runtime(memory_limit);
         let metrics_set = ExecutionPlanMetricsSet::new();
         let mut repartitioner = ShuffleRepartitioner::try_new(
+            0,
             "/tmp/data.out".to_string(),
             "/tmp/index.out".to_string(),
             batch.schema(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to