gabotechs commented on code in PR #16093: URL: https://github.com/apache/datafusion/pull/16093#discussion_r2096083785
########## datafusion/physical-plan/src/repartition/mod.rs: ##########
@@ -158,28 +225,17 @@ impl RepartitionExecState {
             ));
             spawned_tasks.push(wait_for_task);
         }
-
-        Self {
+        *self = Self::ConsumingInputStreams(ConsumingInputStreamsState {
             channels,
             abort_helper: Arc::new(spawned_tasks),
+        });
+        match self {
+            RepartitionExecState::ConsumingInputStreams(value) => Ok(value),
+            _ => unreachable!(),
         }
     }
 }

-/// Lazily initialized state
-///
-/// Note that the state is initialized ONCE for all partitions by a single task(thread).
-/// This may take a short while. It is also like that multiple threads
-/// call execute at the same time, because we have just started "target partitions" tasks
-/// which is commonly set to the number of CPU cores and all call execute at the same time.
-///
-/// Thus, use a **tokio** `OnceCell` for this initialization so as not to waste CPU cycles
-/// in a mutex lock but instead allow other threads to do something useful.
-///
-/// Uses a parking_lot `Mutex` to control other accesses as they are very short duration
-/// (e.g. removing channels on completion) where the overhead of `await` is not warranted.
-type LazyState = Arc<tokio::sync::OnceCell<Mutex<RepartitionExecState>>>;

Review Comment:

Actually, one of the main points of this PR is removing `LazyState`. The problem is that `LazyState::get_or_init()` is an async method, so it has to be awaited from an async context. Since `ExecutionPlan::execute` is not async, we are forced to initialize the `LazyState` inside the `futures::stream::once(async move { ... })` block. That means `LazyState::get_or_init()` is not called until the stream is polled for the first time, which in turn delays the `.execute()` call on the child input.
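To make the ordering problem described above concrete, here is a minimal std-only sketch. It assumes a synchronous `Iterator` as a stand-in for an async stream (the real operator uses `futures::stream::once` and tokio), and `Child`, `execute_lazy` and `execute_eager` are hypothetical names. Work placed inside `std::iter::once_with` only runs when the first item is pulled, much like the body of `stream::once(async move { ... })` only runs on the first poll, so the child's `execute` is deferred; the eager variant calls it during the parent's own `execute`.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical child plan: records whether its `execute` has been called.
struct Child {
    executed: Rc<Cell<bool>>,
}

impl Child {
    fn execute(&self) -> std::vec::IntoIter<u32> {
        self.executed.set(true);
        vec![1, 2, 3].into_iter()
    }
}

/// Lazy (stand-in for the `stream::once(async move { .. })` pattern, an
/// analogy only): the child's `execute` sits in a closure that runs only
/// when the first item is requested.
fn execute_lazy(child: Child) -> impl Iterator<Item = u32> {
    std::iter::once_with(move || child.execute()).flatten()
}

/// Eager: the child's `execute` runs synchronously inside the parent's call.
fn execute_eager(child: &Child) -> std::vec::IntoIter<u32> {
    child.execute()
}

fn main() {
    // Lazy version: returning the "stream" does not touch the child yet.
    let lazy_flag = Rc::new(Cell::new(false));
    let mut lazy = execute_lazy(Child { executed: Rc::clone(&lazy_flag) });
    assert!(!lazy_flag.get());
    // Only pulling the first item triggers the child's execute.
    assert_eq!(lazy.next(), Some(1));
    assert!(lazy_flag.get());

    // Eager version: the child has already executed when the parent returns.
    let eager_flag = Rc::new(Cell::new(false));
    let child = Child { executed: Rc::clone(&eager_flag) };
    let eager = execute_eager(&child);
    assert!(eager_flag.get());
    assert_eq!(eager.collect::<Vec<_>>(), vec![1, 2, 3]);

    println!("lazy defers the child; eager does not");
}
```

In the lazy case the caller of the parent's `execute` gets a handle back immediately, but the child only starts once someone polls it, which is the delay the comment is about.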
I see that `LazyState` was introduced in https://github.com/apache/datafusion/pull/10009 to reduce lock contention in `RepartitionExec::execute` calls, but my guess is that this can be solved more simply by checking an `AtomicBool` so that the state is locked only once, letting every other thread continue without taking locks in `RepartitionExec::execute`. That would also allow us to call `input.execute()` synchronously within a `RepartitionExec::execute` call. I'm not sure whether there is a middle-ground solution that lets us keep `LazyState`; I'll try to think of something, but otherwise I'm happy to wait for @crepererum's input next week.
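A rough sketch of the `AtomicBool` idea, using only `std` and hypothetical names (`SharedState`, `ensure_initialized`, and a `Vec<usize>` standing in for the channels). It is double-checked locking: callers first read the flag, and only if it is unset do they take the mutex, re-check, and run the one-time initialization, which is where `input.execute()` would be called synchronously. Once the flag is set, later callers skip the initialization lock entirely. This covers only the initialization path; per-partition work such as taking a channel out of the state would still need its own short access.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the shared state of a repartition operator.
struct SharedState {
    /// Set once initialization has completed; read without locking.
    initialized: AtomicBool,
    /// Hypothetical "channels", one entry per output partition.
    channels: Mutex<Option<Vec<usize>>>,
    /// How many times the one-time initialization ran (should end up as 1).
    init_runs: AtomicUsize,
    /// How many times a caller had to take the mutex on the init path.
    lock_acquisitions: AtomicUsize,
}

impl SharedState {
    fn new() -> Self {
        SharedState {
            initialized: AtomicBool::new(false),
            channels: Mutex::new(None),
            init_runs: AtomicUsize::new(0),
            lock_acquisitions: AtomicUsize::new(0),
        }
    }

    /// Synchronous, so it could be called directly from a non-async `execute`.
    fn ensure_initialized(&self, num_partitions: usize) {
        // Fast path: already initialized, no lock taken.
        if self.initialized.load(Ordering::Acquire) {
            return;
        }
        // Slow path: lock, then re-check, because another thread may have
        // finished initializing while we were waiting for the mutex.
        let mut guard = self.channels.lock().unwrap();
        self.lock_acquisitions.fetch_add(1, Ordering::Relaxed);
        if guard.is_none() {
            // Hypothetical placeholder: in the real operator this is roughly
            // where input.execute() would be called synchronously.
            *guard = Some((0..num_partitions).collect());
            self.init_runs.fetch_add(1, Ordering::Relaxed);
            self.initialized.store(true, Ordering::Release);
        }
    }
}

fn main() {
    let shared = Arc::new(SharedState::new());

    // Simulate several partitions calling `execute` at the same time.
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let s = Arc::clone(&shared);
            thread::spawn(move || s.ensure_initialized(4))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    // Initialization ran exactly once, however the threads interleaved.
    assert_eq!(shared.init_runs.load(Ordering::Relaxed), 1);

    // After initialization, further calls take the fast path and never lock.
    let locks_before = shared.lock_acquisitions.load(Ordering::Relaxed);
    shared.ensure_initialized(4);
    assert_eq!(shared.lock_acquisitions.load(Ordering::Relaxed), locks_before);

    println!("initialization ran once");
}
```

The `Release` store paired with the `Acquire` load is the conventional pattern for a publish flag like this; since the data itself stays behind the `Mutex`, any later reads of it still go through the lock.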