This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 20d169c8e fix: PartitionBuffers should not have their own MemoryConsumer (#1496)
20d169c8e is described below
commit 20d169c8e3926cb36bff74bbe7e6e04a85a0084d
Author: Emily Matheys <[email protected]>
AuthorDate: Tue Mar 11 18:12:00 2025 +0200
fix: PartitionBuffers should not have their own MemoryConsumer (#1496)
* fix: #1495
* fmt
---------
Co-authored-by: Emily Matheys <[email protected]>
---
.../core/src/execution/shuffle/shuffle_writer.rs | 32 ++++++++++++++--------
1 file changed, 20 insertions(+), 12 deletions(-)
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index 8b34cd478..f1c2b4d78 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -186,6 +186,7 @@ impl ExecutionPlan for ShuffleWriterExec {
futures::stream::once(
external_shuffle(
input,
+ partition,
self.output_data_file.clone(),
self.output_index_file.clone(),
self.partitioning.clone(),
@@ -204,6 +205,7 @@ impl ExecutionPlan for ShuffleWriterExec {
#[allow(clippy::too_many_arguments)]
async fn external_shuffle(
mut input: SendableRecordBatchStream,
+ partition: usize,
output_data_file: String,
output_index_file: String,
partitioning: Partitioning,
@@ -214,6 +216,7 @@ async fn external_shuffle(
) -> Result<SendableRecordBatchStream> {
let schema = input.schema();
let mut repartitioner = ShuffleRepartitioner::try_new(
+ partition,
output_data_file,
output_index_file,
Arc::clone(&schema),
@@ -301,6 +304,7 @@ struct ShuffleRepartitioner {
impl ShuffleRepartitioner {
#[allow(clippy::too_many_arguments)]
pub fn try_new(
+ partition: usize,
output_data_file: String,
output_index_file: String,
schema: SchemaRef,
@@ -323,17 +327,21 @@ impl ShuffleRepartitioner {
partition_ids.set_len(batch_size);
}
+ // This will be split into each PartitionBuffer Reservations, and does
not need to be kept itself
+ let reservation =
MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition))
+ .with_can_spill(true)
+ .register(&runtime.memory_pool);
+
Ok(Self {
output_data_file,
output_index_file,
schema: Arc::clone(&schema),
buffered_partitions: (0..num_output_partitions)
- .map(|partition_id| {
+ .map(|_| {
PartitionBuffer::try_new(
Arc::clone(&schema),
batch_size,
- partition_id,
- &runtime,
+ reservation.new_empty(),
codec.clone(),
enable_fast_encoding,
)
@@ -473,8 +481,9 @@ impl ShuffleRepartitioner {
Partitioning::UnknownPartitioning(n) if *n == 1 => {
let buffered_partitions = &mut self.buffered_partitions;
- assert!(
- buffered_partitions.len() == 1,
+ assert_eq!(
+ buffered_partitions.len(),
+ 1,
"Expected 1 partition but got {}",
buffered_partitions.len()
);
@@ -708,14 +717,10 @@ impl PartitionBuffer {
fn try_new(
schema: SchemaRef,
batch_size: usize,
- partition_id: usize,
- runtime: &Arc<RuntimeEnv>,
+ reservation: MemoryReservation,
codec: CompressionCodec,
enable_fast_encoding: bool,
) -> Result<Self> {
- let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]",
partition_id))
- .with_can_spill(true)
- .register(&runtime.memory_pool);
let shuffle_block_writer =
ShuffleBlockWriter::try_new(schema.as_ref(), enable_fast_encoding,
codec)?;
let active_slots_mem_size = schema
@@ -1008,11 +1013,13 @@ mod test {
fn partition_buffer_memory() {
let batch = create_batch(900);
let runtime_env = create_runtime(128 * 1024);
+ let reservation = MemoryConsumer::new("ShuffleRepartitioner[0]")
+ .with_can_spill(true)
+ .register(&runtime_env.memory_pool);
let mut buffer = PartitionBuffer::try_new(
batch.schema(),
1024,
- 0,
- &runtime_env,
+ reservation,
CompressionCodec::Lz4Frame,
true,
)
@@ -1081,6 +1088,7 @@ mod test {
let runtime_env = create_runtime(memory_limit);
let metrics_set = ExecutionPlanMetricsSet::new();
let mut repartitioner = ShuffleRepartitioner::try_new(
+ 0,
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
batch.schema(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]