alamb commented on code in PR #18014:
URL: https://github.com/apache/datafusion/pull/18014#discussion_r2447643653


##########
datafusion/core/tests/memory_limit/repartition_mem_limit.rs:
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int32Array, RecordBatch};
+use datafusion::{
+    assert_batches_sorted_eq,
+    prelude::{SessionConfig, SessionContext},
+};
+use datafusion_catalog::MemTable;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_execution::runtime_env::RuntimeEnvBuilder;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use futures::TryStreamExt;
+use itertools::Itertools;
+
+/// End to end test for spilling in RepartitionExec.
+/// The idea is to make a real world query with a relatively low memory limit 
and
+/// then drive one partition at a time, simulating dissimilar execution speed 
in partitions.
+/// Just as some examples of real world scenarios where this can happen 
consider
+/// lopsided groups in a group by especially if one partitions spills and 
others don't,
+/// or in distributed systems if one upstream node is slower than others.
+#[tokio::test]

Review Comment:
   This is a neat test. However, for some reason it takes over 5 seconds to run 
on my laptop.
   
   ```
   andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ cargo test --test 
core_integration -- repartition_memory
       Finished `test` profile [unoptimized + debuginfo] target(s) in 0.21s
        Running tests/core_integration.rs 
(target/debug/deps/core_integration-97767bbb2ea8803a)
   
   running 1 test
   test memory_limit::repartition_mem_limit::test_repartition_memory_limit ... 
ok
   
   test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 704 filtered 
out; finished in 4.99s. <--- 5 seconds!!!
   ```
   
   I suspect it is because it is creating something like 1M / 32 = 31,250 files
   
   Is there any way we can reduce the runtime (like maybe only use 1,000 
rows, or 10,000 rows)?



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1007,12 +1071,37 @@ impl RepartitionExec {
 
                 let timer = metrics.send_time[partition].timer();
                 // if there is still a receiver, send to it
-                if let Some((tx, reservation)) = 
output_channels.get_mut(&partition) {
-                    reservation.lock().try_grow(size)?;
-
-                    if tx.send(Some(Ok(batch))).await.is_err() {
+                if let Some((tx, reservation, spill_manager)) =
+                    output_channels.get_mut(&partition)
+                {
+                    let (batch_to_send, is_memory_batch) =
+                        match reservation.lock().try_grow(size) {
+                            Ok(_) => {
+                                // Memory available - send in-memory batch
+                                (RepartitionBatch::Memory(batch), true)
+                            }
+                            Err(_) => {
+                                // We're memory limited - spill this single 
batch to its own file

Review Comment:
   > One alternative could be to do a spill file per-channel and have some sort 
of gc process where we say "if the spill file exceeds XGB and/or we have more 
than YGB of junk space we pay the price of copying over into a new spill file 
to keep disk usage from blowing up".
   
   You could avoid a copying gc process maybe via:
   1. Keep a FIFO list of spill files per channel
   2. Write new batches to the end of the most recent file until its size 
exceeds some threshold (e.g., 100MB). When the threshold is exceeded, make a new 
file
   3. Read from the oldest file until all batches have been read. Once all 
batches have been read from a file it can be deleted. 



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1183,21 +1337,63 @@ impl Stream for PerPartitionStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        match self.receiver.recv().poll_unpin(cx) {
-            Poll::Ready(Some(Some(v))) => {
-                if let Ok(batch) = &v {
-                    self.reservation
-                        .lock()
-                        .shrink(batch.get_array_memory_size());
+        loop {
+            match &mut self.state {
+                RepartitionStreamState::ReceivingFromChannel => {
+                    match self.receiver.recv().poll_unpin(cx) {
+                        Poll::Ready(Some(Some(v))) => {

Review Comment:
   same comment here about using the `ready!` macro to simplify this loop



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1121,33 +1229,73 @@ impl Stream for RepartitionStream {
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
         loop {
-            match self.input.recv().poll_unpin(cx) {
-                Poll::Ready(Some(Some(v))) => {
-                    if let Ok(batch) = &v {
-                        self.reservation
-                            .lock()
-                            .shrink(batch.get_array_memory_size());
+            match &mut self.state {
+                RepartitionStreamState::ReceivingFromChannel => {
+                    match self.input.recv().poll_unpin(cx) {
+                        Poll::Ready(Some(Some(v))) => {

Review Comment:
   I think you could reduce the level of indenting here using the `ready!` 
macro which would make it more readable



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1712,14 +1908,14 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn oom() -> Result<()> {

Review Comment:
   I think it is important to leave the `oom` test here to verify that the 
system will still get an OOM error when no disk manager is enabled



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -66,22 +68,42 @@ use parking_lot::Mutex;
 
 mod distributor_channels;
 
-type MaybeBatch = Option<Result<RecordBatch>>;
+/// A batch in the repartition queue - either in memory or spilled to disk
+#[derive(Debug)]
+enum RepartitionBatch {
+    /// Batch held in memory (counts against memory reservation)
+    Memory(RecordBatch),
+    /// Batch spilled to disk (one file per batch for queue semantics)
+    /// File automatically deleted when dropped via reference counting
+    /// The size field stores the original batch size for validation when 
reading back
+    Spilled {
+        spill_file: RefCountedTempFile,
+        size: usize,
+    },
+}
+
+type MaybeBatch = Option<Result<RepartitionBatch>>;
 type InputPartitionsToCurrentPartitionSender = 
Vec<DistributionSender<MaybeBatch>>;
 type InputPartitionsToCurrentPartitionReceiver = 
Vec<DistributionReceiver<MaybeBatch>>;
 
+/// Channels and resources for a single output partition
+#[derive(Debug)]
+struct PartitionChannels {
+    /// Senders for each input partition to send data to this output partition
+    tx: InputPartitionsToCurrentPartitionSender,
+    /// Receivers for each input partition sending data to this output 
partition
+    rx: InputPartitionsToCurrentPartitionReceiver,
+    /// Memory reservation for this output partition
+    reservation: SharedMemoryReservation,
+    /// Spill manager for handling disk spills for this output partition
+    spill_manager: Arc<SpillManager>,
+}
+
 #[derive(Debug)]
 struct ConsumingInputStreamsState {
     /// Channels for sending batches from input partitions to output 
partitions.
     /// Key is the partition number.
-    channels: HashMap<
-        usize,
-        (
-            InputPartitionsToCurrentPartitionSender,
-            InputPartitionsToCurrentPartitionReceiver,
-            SharedMemoryReservation,
-        ),
-    >,
+    channels: HashMap<usize, PartitionChannels>,

Review Comment:
   this is much nicer



##########
datafusion/core/tests/memory_limit/repartition_mem_limit.rs:
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int32Array, RecordBatch};
+use datafusion::{
+    assert_batches_sorted_eq,
+    prelude::{SessionConfig, SessionContext},
+};
+use datafusion_catalog::MemTable;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_execution::runtime_env::RuntimeEnvBuilder;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use futures::TryStreamExt;
+use itertools::Itertools;
+
+/// End to end test for spilling in RepartitionExec.
+/// The idea is to make a real world query with a relatively low memory limit 
and
+/// then drive one partition at a time, simulating dissimilar execution speed 
in partitions.
+/// Just as some examples of real world scenarios where this can happen 
consider
+/// lopsided groups in a group by especially if one partitions spills and 
others don't,
+/// or in distributed systems if one upstream node is slower than others.
+#[tokio::test]
+async fn test_repartition_memory_limit() {
+    let runtime = RuntimeEnvBuilder::new()
+        .with_memory_limit(1024 * 1024, 1.0)
+        .build()
+        .unwrap();
+    let config = SessionConfig::new()
+        .with_batch_size(32)
+        .with_target_partitions(2);
+    let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));
+    let batches = vec![RecordBatch::try_from_iter(vec![(
+        "c1",
+        Arc::new(Int32Array::from_iter_values(
+            (0..10).cycle().take(1_000_000),
+        )) as ArrayRef,
+    )])
+    .unwrap()];
+    let table = Arc::new(MemTable::try_new(batches[0].schema(), 
vec![batches]).unwrap());
+    ctx.register_table("t", table).unwrap();
+    let plan = ctx
+        .state()
+        .create_logical_plan("SELECT c1, count(*) as c FROM t GROUP BY c1;")
+        .await
+        .unwrap();
+    let plan = ctx.state().create_physical_plan(&plan).await.unwrap();

Review Comment:
   I recommend also verifying here that there are exactly two partitions in the 
final plan 
   
   ```suggestion
       let plan = ctx.state().create_physical_plan(&plan).await.unwrap();
       assert_eq!(plan.output_partitioning().partition_count(), 2);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to