alamb commented on code in PR #16093:
URL: https://github.com/apache/datafusion/pull/16093#discussion_r2095992822

##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -406,8 +462,12 @@ impl BatchPartitioner {
 pub struct RepartitionExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
-    /// Inner state that is initialized when the first output stream is created.
-    state: LazyState,
+    /// Inner state that is initialized when the parent calls .execute() on this node
+    /// and consumed as soon as the parent starts consuming this node.
+    state: Arc<Mutex<RepartitionExecState>>,
+    /// Stores whether the state has been initialized. Checking this AtomicBool is faster than

Review Comment:
   Could you be clear about what "initialized" means? Does it mean that `state` is in either `RepartitionExecState::InputStreamsInitialized` or `RepartitionExecState::ConsumingInputStreams`?


##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1298,15 +1347,9 @@ mod tests {
         let partitioning = Partitioning::RoundRobinBatch(1);
         let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
-        // Note: this should pass (the stream can be created) but the
-        // error when the input is executed should get passed back
-        let output_stream = exec.execute(0, task_ctx).unwrap();
-        // Expect that an error is returned
-        let result_string = crate::common::collect(output_stream)
-            .await
-            .unwrap_err()
-            .to_string();
+        let result_string = exec.execute(0, task_ctx).err().unwrap().to_string();

Review Comment:
   I double-checked, and `ErrorExec` returns an error on calls to `execute` (not when the stream is polled), so this change seems very reasonable to me.


##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -158,28 +225,17 @@ impl RepartitionExecState {
             ));
             spawned_tasks.push(wait_for_task);
         }
-
-        Self {
+        *self = Self::ConsumingInputStreams(ConsumingInputStreamsState {
             channels,
             abort_helper: Arc::new(spawned_tasks),
+        });
+        match self {
+            RepartitionExecState::ConsumingInputStreams(value) => Ok(value),
+            _ => unreachable!(),
         }
     }
 }
 
-/// Lazily initialized state
-///
-/// Note that the state is initialized ONCE for all partitions by a single task(thread).
-/// This may take a short while. It is also like that multiple threads
-/// call execute at the same time, because we have just started "target partitions" tasks
-/// which is commonly set to the number of CPU cores and all call execute at the same time.
-///
-/// Thus, use a **tokio** `OnceCell` for this initialization so as not to waste CPU cycles
-/// in a mutex lock but instead allow other threads to do something useful.
-///
-/// Uses a parking_lot `Mutex` to control other accesses as they are very short duration
-/// (e.g. removing channels on completion) where the overhead of `await` is not warranted.
-type LazyState = Arc<tokio::sync::OnceCell<Mutex<RepartitionExecState>>>;

Review Comment:
   Is there a reason to remove the `LazyState` approach? The scenario described in the removed doc comment (many tasks calling `execute` at the same time) still seems applicable, even during `execute()` I think.

   FWIW I believe @crepererum is out this week, so we would have to wait until next week for his input.


##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1496,7 +1539,14 @@ mod tests {
         });
         let batches_with_drop = crate::common::collect(output_stream1).await.unwrap();
 
-        assert_eq!(batches_without_drop, batches_with_drop);
+        fn sort(batch: Vec<RecordBatch>) -> Vec<RecordBatch> {
+            batch
+                .into_iter()
+                .sorted_by_key(|b| format!("{b:?}"))
+                .collect()
+        }
+
+        assert_eq!(sort(batches_without_drop), sort(batches_with_drop));

Review Comment:
   Maybe the streams are now actually subject to tokio's scheduling, whereas before it was always the first one that was executed 🤔


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

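For readers following the first review comment, here is a minimal std-only Rust sketch of one possible reading of "initialized": the flag is set once the state has left its starting variant, i.e. it is in either `InputStreamsInitialized` or `ConsumingInputStreams`. This is an illustration under stated assumptions, not the PR's code. `Node`, `state_initialized`, the `NotInitialized` variant, the method names, and the `Vec<String>` payloads are all hypothetical stand-ins, and `std::sync::Mutex` is used in place of the parking_lot `Mutex` mentioned in the removed doc comment.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `RepartitionExecState`.
/// `NotInitialized` is an assumed name; the other two variants are named in the review.
#[derive(Debug)]
enum State {
    NotInitialized,
    InputStreamsInitialized(Vec<String>),
    ConsumingInputStreams(Vec<String>),
}

/// Hypothetical stand-in for the plan node holding the shared state.
struct Node {
    state: Arc<Mutex<State>>,
    /// Assumed meaning: true once `state` is no longer `NotInitialized`.
    state_initialized: AtomicBool,
}

impl Node {
    fn new() -> Self {
        Self {
            state: Arc::new(Mutex::new(State::NotInitialized)),
            state_initialized: AtomicBool::new(false),
        }
    }

    /// Cheap check that does not take the lock.
    fn is_initialized(&self) -> bool {
        self.state_initialized.load(Ordering::Acquire)
    }

    /// Models the work done in `execute()`: NotInitialized -> InputStreamsInitialized.
    fn ensure_input_streams_initialized(&self, num_inputs: usize) {
        // Fast path: a previous call already initialized the state.
        if self.is_initialized() {
            return;
        }
        let mut state = self.state.lock().unwrap();
        // Re-check under the lock so only one caller performs the transition.
        if matches!(*state, State::NotInitialized) {
            let streams = (0..num_inputs).map(|i| format!("input-{i}")).collect();
            *state = State::InputStreamsInitialized(streams);
        }
        self.state_initialized.store(true, Ordering::Release);
    }

    /// Models the first poll: InputStreamsInitialized -> ConsumingInputStreams.
    /// Returns the number of input streams being consumed.
    fn start_consuming(&self) -> Result<usize, String> {
        let mut state = self.state.lock().unwrap();
        match std::mem::replace(&mut *state, State::NotInitialized) {
            State::InputStreamsInitialized(streams)
            | State::ConsumingInputStreams(streams) => {
                let n = streams.len();
                *state = State::ConsumingInputStreams(streams);
                Ok(n)
            }
            State::NotInitialized => Err("execute() has not been called yet".to_string()),
        }
    }
}
```

Under this reading the `AtomicBool` only lets repeated `execute()` calls skip the lock; it says nothing about whether consumption has started, which is the distinction the comment asks the doc string to spell out.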
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org