alamb commented on code in PR #16093:
URL: https://github.com/apache/datafusion/pull/16093#discussion_r2095992822


##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -406,8 +462,12 @@ impl BatchPartitioner {
 pub struct RepartitionExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
-    /// Inner state that is initialized when the first output stream is created.
-    state: LazyState,
+    /// Inner state that is initialized when the parent calls .execute() on this node
+    /// and consumed as soon as the parent starts consuming this node.
+    state: Arc<Mutex<RepartitionExecState>>,
+    /// Stores whether the state has been initialized. Checking this AtomicBool is faster than

Review Comment:
   Could you clarify what "initialized" means here? Does it mean that `state` is in either `RepartitionExecState::InputStreamsInitialized` or `RepartitionExecState::ConsumingInputStreams`?
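   To make the question concrete, here is a minimal std-only sketch of one possible answer: "initialized" means the state has left a `NotInitialized` variant, and the `AtomicBool` mirrors that so callers can skip the lock on the fast path. The `State` and `Node` types, the `NotInitialized` variant, the `Vec<usize>` payloads and the method names are all hypothetical, not the PR's code.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for `RepartitionExecState`; the payloads are placeholders.
#[derive(Debug)]
enum State {
    /// `execute()` has not run yet.
    NotInitialized,
    /// Input streams exist, but nobody is polling the output yet.
    InputStreamsInitialized(Vec<usize>),
    /// Background tasks are feeding the output channels.
    ConsumingInputStreams(Vec<usize>),
}

impl State {
    /// One candidate meaning of "initialized": either of the two later variants.
    fn is_initialized(&self) -> bool {
        matches!(
            self,
            State::InputStreamsInitialized(_) | State::ConsumingInputStreams(_)
        )
    }
}

/// Hypothetical node holding the state plus a lock-free fast-path flag.
struct Node {
    state: Mutex<State>,
    /// Intended invariant: `true` exactly when `state.is_initialized()` is true.
    initialized: AtomicBool,
}

impl Node {
    fn new() -> Self {
        Self {
            state: Mutex::new(State::NotInitialized),
            initialized: AtomicBool::new(false),
        }
    }

    /// Called from `execute()`: create the input streams at most once.
    fn ensure_initialized(&self, num_inputs: usize) {
        // Fast path: once the flag is set, skip taking the lock entirely.
        if self.initialized.load(Ordering::Acquire) {
            return;
        }
        let mut state = self.state.lock().unwrap();
        if !state.is_initialized() {
            *state = State::InputStreamsInitialized((0..num_inputs).collect());
        }
        // Set the flag while still holding the lock, after `state` has changed.
        self.initialized.store(true, Ordering::Release);
    }

    /// Called when the parent starts polling: switch variants in place.
    fn start_consuming(&self) {
        let mut state = self.state.lock().unwrap();
        // Leave a placeholder so the old payload can be moved out, then write back.
        let previous = std::mem::replace(&mut *state, State::NotInitialized);
        *state = match previous {
            State::InputStreamsInitialized(inputs) => State::ConsumingInputStreams(inputs),
            other => other,
        };
    }
}
```

   With this definition the doc comment could simply say the flag is `true` once `state` is `InputStreamsInitialized` or `ConsumingInputStreams`, which is exactly what `is_initialized` checks.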



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1298,15 +1347,9 @@ mod tests {
         let partitioning = Partitioning::RoundRobinBatch(1);
         let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
 
-        // Note: this should pass (the stream can be created) but the
-        // error when the input is executed should get passed back
-        let output_stream = exec.execute(0, task_ctx).unwrap();
-
         // Expect that an error is returned
-        let result_string = crate::common::collect(output_stream)
-            .await
-            .unwrap_err()
-            .to_string();
+        let result_string = exec.execute(0, task_ctx).err().unwrap().to_string();

Review Comment:
   I double-checked, and `ErrorExec` returns an error from the call to `execute()` itself (not when the stream is polled), so this change seems very reasonable to me.
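   The distinction matters because an eager failure never produces a stream, so the old "execute, then collect and expect an error" test shape cannot work for it. A std-only sketch of the two failure modes, where `Plan`, `EagerError`, `LazyError`, `Batch`, `BatchStream` and `collect` are hypothetical stand-ins rather than DataFusion types. It also shows why `.err().unwrap()` is likely used instead of `unwrap_err()`: `Result::unwrap_err` requires the `Ok` type to implement `Debug`, which a boxed stream type typically does not.

```rust
/// Hypothetical std-only stand-ins; these are not DataFusion's types.
type Batch = Vec<i32>;
type BatchStream = Box<dyn Iterator<Item = Result<Batch, String>>>;

/// Shaped loosely like `ExecutionPlan::execute`: it can fail before any stream exists.
trait Plan {
    fn execute(&self, partition: usize) -> Result<BatchStream, String>;
}

/// Fails eagerly, which is how the comment above describes `ErrorExec`.
struct EagerError;

impl Plan for EagerError {
    fn execute(&self, _partition: usize) -> Result<BatchStream, String> {
        Err("execute failed".to_string())
    }
}

/// Fails lazily: `execute` succeeds and the error only appears when the stream is polled.
struct LazyError;

impl Plan for LazyError {
    fn execute(&self, _partition: usize) -> Result<BatchStream, String> {
        let stream: BatchStream =
            Box::new(std::iter::once(Err::<Batch, String>("poll failed".to_string())));
        Ok(stream)
    }
}

/// Drain a stream, stopping at the first error (similar in spirit to `common::collect`).
fn collect(stream: BatchStream) -> Result<Vec<Batch>, String> {
    stream.collect()
}
```

   Under this sketch, `EagerError.execute(0).err().unwrap()` yields the error directly, while `LazyError` needs the old two-step pattern: `execute` succeeds and `collect(...)` returns the error.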
   



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -158,28 +225,17 @@ impl RepartitionExecState {
             ));
             spawned_tasks.push(wait_for_task);
         }
-
-        Self {
+        *self = Self::ConsumingInputStreams(ConsumingInputStreamsState {
             channels,
             abort_helper: Arc::new(spawned_tasks),
+        });
+        match self {
+            RepartitionExecState::ConsumingInputStreams(value) => Ok(value),
+            _ => unreachable!(),
         }
     }
 }
 
-/// Lazily initialized state
-///
-/// Note that the state is initialized ONCE for all partitions by a single task(thread).
-/// This may take a short while.  It is also like that multiple threads
-/// call execute at the same time, because we have just started "target partitions" tasks
-/// which is commonly set to the number of CPU cores and all call execute at the same time.
-///
-/// Thus, use a **tokio** `OnceCell` for this initialization so as not to waste CPU cycles
-/// in a mutex lock but instead allow other threads to do something useful.
-///
-/// Uses a parking_lot `Mutex` to control other accesses as they are very short duration
-///  (e.g. removing channels on completion) where the overhead of `await` is not warranted.
-type LazyState = Arc<tokio::sync::OnceCell<Mutex<RepartitionExecState>>>;

Review Comment:
   Is there a reason to remove the `LazyState` approach? The scenario described in the comments still seems applicable (even during `execute()`, I think).
   
   FWIW, I believe @crepererum is out this week, so we would have to wait until next week for his input.
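   The property the removed `LazyState` comment relies on is that when many partitions call `execute()` at once, exactly one caller runs the (possibly slow) initialization and the others wait for its result. A minimal std-only sketch of that property, assuming `std::sync::OnceLock` as a blocking stand-in for `tokio::sync::OnceCell` (whose `get_or_init` takes an async closure, so waiting tasks yield instead of blocking a thread). The `Shared` type, `run_partitions` and the `channel-{p}` strings are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::thread;

/// Hypothetical shared state: one output channel slot per partition.
struct Shared {
    /// One-time setup guard; later short updates go through the inner `Mutex`.
    state: OnceLock<Mutex<Vec<Option<String>>>>,
    /// Counts how often the setup closure actually ran (for illustration only).
    init_calls: AtomicUsize,
}

impl Shared {
    fn new() -> Self {
        Self { state: OnceLock::new(), init_calls: AtomicUsize::new(0) }
    }

    /// Called once per output partition, possibly from many threads at once.
    fn execute(&self, partition: usize, partitions: usize) -> Option<String> {
        let state = self.state.get_or_init(|| {
            // Only one caller runs this; concurrent callers block until it finishes.
            self.init_calls.fetch_add(1, Ordering::SeqCst);
            let channels: Vec<Option<String>> =
                (0..partitions).map(|p| Some(format!("channel-{p}"))).collect();
            Mutex::new(channels)
        });
        // Short critical section: take this partition's channel out of the shared state.
        state.lock().unwrap()[partition].take()
    }
}

/// Simulate `partitions` tasks calling `execute` at roughly the same time.
fn run_partitions(partitions: usize) -> (Vec<Option<String>>, usize) {
    let shared = Arc::new(Shared::new());
    let handles: Vec<_> = (0..partitions)
        .map(|p| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || shared.execute(p, partitions))
        })
        .collect();
    let results: Vec<Option<String>> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    (results, shared.init_calls.load(Ordering::SeqCst))
}
```

   Here the blocked threads simply sleep on the `OnceLock`; the argument in the removed comment is that with tokio's `OnceCell` those callers would instead yield to the runtime, which is the part a plain `Mutex` around the state would give up.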



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1496,7 +1539,14 @@ mod tests {
         });
         let batches_with_drop = crate::common::collect(output_stream1).await.unwrap();
 
-        assert_eq!(batches_without_drop, batches_with_drop);
+        fn sort(batch: Vec<RecordBatch>) -> Vec<RecordBatch> {
+            batch
+                .into_iter()
+                .sorted_by_key(|b| format!("{b:?}"))
+                .collect()
+        }
+
+        assert_eq!(sort(batches_without_drop), sort(batches_with_drop));

Review Comment:
   Maybe the streams are now actually subject to tokio's scheduler, whereas before the order followed whichever stream was executed first 🤔
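   If scheduling now decides the order in which batches arrive, the output is only deterministic as a multiset, which is why the test sorts both sides before comparing. A std-only sketch of that normalization, with `Vec<i32>` as a hypothetical stand-in for `RecordBatch` and a hypothetical `sort_batches` helper. It uses the standard library's `sort_by_cached_key` in place of itertools' `sorted_by_key` and, like the diff, keys on the `Debug` rendering, so it assumes that rendering fully captures each batch's contents.

```rust
use std::fmt::Debug;

/// Order batches by their Debug text so that two outputs containing the same
/// batches in different arrival orders compare equal.
fn sort_batches<T: Debug>(mut batches: Vec<T>) -> Vec<T> {
    // Cache the formatted key: each batch is formatted once, not once per comparison.
    batches.sort_by_cached_key(|b| format!("{b:?}"));
    batches
}
```

   The string ordering is lexicographic (so `[10]` sorts before `[2]`), but that is fine here: the test only needs some canonical order applied identically to both sides.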



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

