gabotechs commented on code in PR #16093:
URL: https://github.com/apache/datafusion/pull/16093#discussion_r2096083785


##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -158,28 +225,17 @@ impl RepartitionExecState {
             ));
             spawned_tasks.push(wait_for_task);
         }
-
-        Self {
+        *self = Self::ConsumingInputStreams(ConsumingInputStreamsState {
             channels,
             abort_helper: Arc::new(spawned_tasks),
+        });
+        match self {
+            RepartitionExecState::ConsumingInputStreams(value) => Ok(value),
+            _ => unreachable!(),
         }
     }
 }
 
-/// Lazily initialized state
-///
-/// Note that the state is initialized ONCE for all partitions by a single task(thread).
-/// This may take a short while.  It is also like that multiple threads
-/// call execute at the same time, because we have just started "target partitions" tasks
-/// which is commonly set to the number of CPU cores and all call execute at the same time.
-///
-/// Thus, use a **tokio** `OnceCell` for this initialization so as not to waste CPU cycles
-/// in a mutex lock but instead allow other threads to do something useful.
-///
-/// Uses a parking_lot `Mutex` to control other accesses as they are very short duration
-///  (e.g. removing channels on completion) where the overhead of `await` is not warranted.
-type LazyState = Arc<tokio::sync::OnceCell<Mutex<RepartitionExecState>>>;

Review Comment:
   Actually, one of the main points of this PR is removing this `LazyState`. The issue is that `LazyState::get_or_init()` is an async method, so it has to be called from within an async context.
   
   As `ExecutionPlan::execute` is not async, we are forced to initialize the `LazyState` inside the `futures::stream::once(async move { ... })` block. This means `LazyState::get_or_init()` is not called until the stream is first polled, which delays the `execute()` call on the child input.
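
   To make the timing issue concrete, here is a std-only sketch (no tokio or futures crate, so it runs standalone) showing that the body of an `async move` block does nothing until the future is first polled, while work done before building the future runs immediately. `child_execute`, `execute_lazily`, and `execute_eagerly` are hypothetical stand-ins, not DataFusion APIs, and the hand-rolled no-op waker replaces a real executor.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A waker that does nothing; enough to poll a future by hand without an executor.
fn noop_waker() -> Waker {
    unsafe fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: none of the vtable functions dereference the data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Hypothetical stand-in for the child's `execute()`; counts how often it runs.
fn child_execute(calls: &AtomicUsize) -> u32 {
    calls.fetch_add(1, Ordering::SeqCst);
    42
}

/// Lazy variant: the child call lives inside the async block, so it waits for the first poll.
fn execute_lazily(calls: Arc<AtomicUsize>) -> impl Future<Output = u32> {
    async move { child_execute(&calls) }
}

/// Eager variant: the child is executed synchronously before the future is returned.
fn execute_eagerly(calls: Arc<AtomicUsize>) -> impl Future<Output = u32> {
    let value = child_execute(&calls);
    async move { value }
}

fn main() {
    let calls = Arc::new(AtomicUsize::new(0));
    let mut fut = Box::pin(execute_lazily(Arc::clone(&calls)));
    // Building the future did not reach the child.
    assert_eq!(calls.load(Ordering::SeqCst), 0);

    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Ready(42));
    // Only the first poll triggered the child call.
    assert_eq!(calls.load(Ordering::SeqCst), 1);

    let eager_calls = Arc::new(AtomicUsize::new(0));
    let _fut = execute_eagerly(Arc::clone(&eager_calls));
    // The eager variant already called the child, before any poll.
    assert_eq!(eager_calls.load(Ordering::SeqCst), 1);
    println!("lazy: deferred until poll; eager: ran immediately");
}
```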
   
   I see that `LazyState` was introduced in https://github.com/apache/datafusion/pull/10009 to reduce lock contention in `RepartitionExec::execute` calls, but my guess is that this could be solved more simply by checking an `AtomicBool`: the state is locked only once, for initialization, and every other thread continues without taking a lock in `RepartitionExec::execute`. That would allow us to call `input.execute()` synchronously inside the `RepartitionExec::execute` call.
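
   A minimal sketch of the `AtomicBool` idea, assuming a double-checked initialization scheme: callers first read the flag without locking, and only callers that see it unset take the mutex, re-check the flag, and perform the one-time setup. `SharedState` and its fields are hypothetical (not DataFusion's `RepartitionExecState`), and the `init_calls` counter stands in for the synchronous `input.execute()` call.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical shared operator state; not DataFusion's real type.
struct SharedState {
    /// Set once the one-time setup has finished.
    initialized: AtomicBool,
    /// Stand-in for the state that needs the lock (e.g. the channels).
    streams: Mutex<Vec<usize>>,
    /// Counts how many times the (hypothetical) `input.execute()` ran.
    init_calls: AtomicUsize,
}

impl SharedState {
    fn new() -> Self {
        SharedState {
            initialized: AtomicBool::new(false),
            streams: Mutex::new(Vec::new()),
            init_calls: AtomicUsize::new(0),
        }
    }

    /// Synchronous `execute`: only callers racing the first initialization take the lock.
    fn execute(&self, partition: usize) {
        // Fast path: once initialized, return without locking.
        if self.initialized.load(Ordering::Acquire) {
            return;
        }
        let mut streams = self.streams.lock().unwrap();
        // Re-check under the lock: another thread may have initialized while we waited.
        if !self.initialized.load(Ordering::Acquire) {
            // Stand-in for calling `input.execute()` synchronously, right here.
            self.init_calls.fetch_add(1, Ordering::SeqCst);
            streams.push(partition);
            self.initialized.store(true, Ordering::Release);
        }
    }
}

fn main() {
    let state = Arc::new(SharedState::new());
    // One thread per output partition, all calling `execute` at roughly the same time.
    let handles: Vec<_> = (0..8)
        .map(|partition| {
            let state = Arc::clone(&state);
            thread::spawn(move || state.execute(partition))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    assert_eq!(state.init_calls.load(Ordering::SeqCst), 1);
    assert_eq!(state.streams.lock().unwrap().len(), 1);
    println!("input initialized exactly once");
}
```

   Since the fast path never touches the mutex, threads arriving after initialization proceed without contention. The standard library's `std::sync::OnceLock` gives the same run-once guarantee synchronously and may be a simpler alternative to a hand-rolled flag.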
   
   Not sure if there's a middle-ground solution that lets us keep the `LazyState`. I'll try to think of something, but otherwise I'm happy to wait for @crepererum's input next week.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

