Re: [PR] Propagate .execute() calls immediately in `RepartitionExec` [datafusion]
alamb commented on PR #16093: URL: https://github.com/apache/datafusion/pull/16093#issuecomment-2917050839

Looks all good to me, so let's go! 🚀
Re: [PR] Propagate .execute() calls immediately in `RepartitionExec` [datafusion]
alamb merged PR #16093: URL: https://github.com/apache/datafusion/pull/16093
Re: [PR] Propagate .execute() calls immediately in `RepartitionExec` [datafusion]
gabotechs commented on code in PR #16093: URL: https://github.com/apache/datafusion/pull/16093#discussion_r2107592672

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -595,30 +651,29 @@ impl ExecutionPlan for RepartitionExec {
         // Get existing ordering to use for merging
         let sort_exprs = self.sort_exprs().cloned().unwrap_or_default();
+        let state = Arc::clone(&self.state);
+        if !self.state_initialized.swap(true, Ordering::Relaxed) {
+            state.lock().ensure_input_streams_initialized(

Review Comment:

Pushed another small change that simplifies the setup further: https://github.com/apache/datafusion/pull/16093/commits/21db4d5b0cb56c026d311b47d3fa4f7b3341f041

It removes the `state_initialized: Arc<AtomicBool>` field in favor of optimistically locking the state: if the state is already locked, the caller just moves on without blocking; otherwise, the lock is acquired.
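For illustration, a minimal sketch of the optimistic-locking idea described in the comment above, assuming `parking_lot::Mutex::try_lock`; the type names and the body of `ensure_input_streams_initialized` are simplified placeholders, not the actual DataFusion implementation:

```rust
use parking_lot::Mutex;
use std::sync::Arc;

#[derive(Default)]
struct RepartitionState {
    input_streams_initialized: bool,
}

impl RepartitionState {
    /// Idempotent: only the first caller actually does the work.
    fn ensure_input_streams_initialized(&mut self) {
        if !self.input_streams_initialized {
            // This is roughly where input.execute() would run for each input partition.
            self.input_streams_initialized = true;
        }
    }
}

fn execute(state: &Arc<Mutex<RepartitionState>>) {
    // Optimistic lock: if another caller already holds the lock, it is doing this
    // same initialization, so skip it instead of blocking; the stream's polling
    // path takes the lock later and finds the work already done.
    if let Some(mut guard) = state.try_lock() {
        guard.ensure_input_streams_initialized();
    }
}

fn main() {
    let state = Arc::new(Mutex::new(RepartitionState::default()));
    execute(&state); // first call initializes
    execute(&state); // second call is a cheap no-op
}
```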
Re: [PR] Propagate .execute() calls immediately in `RepartitionExec` [datafusion]
gabotechs commented on code in PR #16093: URL: https://github.com/apache/datafusion/pull/16093#discussion_r2107594083

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -595,30 +651,29 @@ impl ExecutionPlan for RepartitionExec {
         // Get existing ordering to use for merging
         let sort_exprs = self.sort_exprs().cloned().unwrap_or_default();
+        let state = Arc::clone(&self.state);
+        if !self.state_initialized.swap(true, Ordering::Relaxed) {
+            state.lock().ensure_input_streams_initialized(

Review Comment:

WDYT @crepererum? I'm also very open to simpler approaches.
Re: [PR] Propagate .execute() calls immediately in `RepartitionExec` [datafusion]
gabotechs commented on code in PR #16093: URL: https://github.com/apache/datafusion/pull/16093#discussion_r2107588557

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -595,30 +651,29 @@ impl ExecutionPlan for RepartitionExec {
         // Get existing ordering to use for merging
         let sort_exprs = self.sort_exprs().cloned().unwrap_or_default();
+        let state = Arc::clone(&self.state);
+        if !self.state_initialized.swap(true, Ordering::Relaxed) {
+            state.lock().ensure_input_streams_initialized(

Review Comment:

Pushed a small change in https://github.com/apache/datafusion/pull/16093/commits/1b276cc15683d5b199ead4f5970c4e9c9cb0542b. Now:

1. Threads 1 and 2 call `.execute()` at the same time.
2. Thread 1 wins the atomic bool, but has not yet acquired the lock (it's about to, on the next line).
3. While Thread 1 is between lines 655 and 656, Thread 2:
   - skips the if statement
   - builds the stream
   - polls the stream
   - acquires the lock
4. Thread 1 finally moves from line 655 to 656 and tries to acquire the lock, but it's held by Thread 2, so it waits until it gets released.
5. Thread 2 does all the initialization work inside the stream polling and releases the lock.
6. Thread 1 acquires the lock, but there's nothing left to do, as all the initialization already happened, so it moves on.
Re: [PR] Propagate .execute() calls immediately in `RepartitionExec` [datafusion]
crepererum commented on code in PR #16093: URL: https://github.com/apache/datafusion/pull/16093#discussion_r2107279416

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -595,30 +651,29 @@ impl ExecutionPlan for RepartitionExec {
         // Get existing ordering to use for merging
         let sort_exprs = self.sort_exprs().cloned().unwrap_or_default();
+        let state = Arc::clone(&self.state);
+        if !self.state_initialized.swap(true, Ordering::Relaxed) {
+            state.lock().ensure_input_streams_initialized(

Review Comment:

> Seems highly unlikely (we've run this under high load for a while and it has never happened)

I don't fancy hunting Heisenbugs in production. Especially some cloud deployments with low/partial CPU allocations might be subject to this issue; you can easily stall a thread for a few milliseconds.

> I'm not sure if we can replicate this reliably in a unit test

Fair, but the fact that the code is strongly typed and still allows this to happen is reason enough to change it. I'm OK without a test then. The expected behavior would then be that you might need to block during stream polling (because thread 1 may still hold the lock while running the preparation step, or the poller itself needs to do that work).
Re: [PR] Propagate .execute() calls immediately in `RepartitionExec` [datafusion]
gabotechs commented on code in PR #16093: URL: https://github.com/apache/datafusion/pull/16093#discussion_r2107240985

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -595,30 +651,29 @@ impl ExecutionPlan for RepartitionExec {
         // Get existing ordering to use for merging
         let sort_exprs = self.sort_exprs().cloned().unwrap_or_default();
+        let state = Arc::clone(&self.state);
+        if !self.state_initialized.swap(true, Ordering::Relaxed) {
+            state.lock().ensure_input_streams_initialized(

Review Comment:

🤔 The only way I can imagine this being racy is:

1. Threads 1 and 2 call `.execute()` at the same time.
2. Thread 1 wins the atomic bool, but has not yet acquired the lock (it's about to, on the next line).
3. While Thread 1 is between lines 655 and 656, Thread 2:
   - skips the if statement
   - builds the stream
   - polls the stream
   - acquires the lock
   - fails
4. Thread 1 finally moves from line 655 to 656 and tries to acquire the lock, but it's too late now.

This seems highly unlikely (we've run this under high load for a while and it has never happened), and I'm not sure we can replicate it reliably in a unit test, but it is theoretically possible. A couple of options come to mind:

- Remove the AtomicBool, but then we are back to locking on every thread.
- Use a `tokio::sync::oneshot` channel that allows awaiting for something to be there during polling.

> which TBH might have been a somewhat unfortunate choice, I would rather call it once and return a vector of streams.

👍 This would greatly simplify the approach.
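For context, a minimal sketch of the `tokio::sync::oneshot` option floated above (a hypothetical alternative, not what the PR ended up doing): the initializing side publishes the per-partition channel through a oneshot, and the polling side awaits it instead of erroring or busy-waiting. All names and types here are illustrative.

```rust
use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() {
    // One pair per output partition; `execute()` would keep the sender and
    // hand the receiver to the output stream.
    let (init_tx, init_rx) = oneshot::channel::<mpsc::Receiver<u64>>();

    // Whichever thread performs the initialization builds the per-partition
    // batch channel and publishes it through the oneshot.
    let (batch_tx, batch_rx) = mpsc::channel(8);
    batch_tx.send(42).await.unwrap();
    init_tx.send(batch_rx).unwrap();

    // Polling side: await readiness of the initialization, then consume.
    let mut batches = init_rx.await.unwrap();
    assert_eq!(batches.recv().await, Some(42));
}
```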
Re: [PR] Propagate .execute() calls immediately in `RepartitionExec` [datafusion]
crepererum commented on code in PR #16093: URL: https://github.com/apache/datafusion/pull/16093#discussion_r2106955247

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -595,30 +651,29 @@ impl ExecutionPlan for RepartitionExec {
         // Get existing ordering to use for merging
         let sort_exprs = self.sort_exprs().cloned().unwrap_or_default();
+        let state = Arc::clone(&self.state);
+        if !self.state_initialized.swap(true, Ordering::Relaxed) {
+            state.lock().ensure_input_streams_initialized(

Review Comment:

The observation made by @gabotechs in https://github.com/apache/datafusion/pull/16093#discussion_r2096083785 is correct:

1. The original intention was to make `execute` not contend.
2. The delay is indeed a breakage of the API contract.

However, I want to broaden the discussion a bit: this is racy. The first thread could check the boolean but not get the lock, while other threads skip the IF body and start to poll, at which point they get the lock (around line 670 WITH the PR applied, around the comment "lock mutexes") and now you're either blocking the async IO runtime with initialization work, or, with your implementation, you just get the error `RepartitionExecState::init_input_streams must be called before consuming input streams` (this needs to be fixed before merging because it might happen under high load!).

And even if you found a non-racy API that combines the boolean with the lock, you still have the same semantic race. I think the question is: are we allowed to poll streams even if not all `execute` calls have finished? I would say yes. In theory you could even interleave:

1. execute partition 0
2. poll partition 0
3. execute partition 1
4. poll partition 1
5. ...

I think in general this problem cannot be fully fixed though: you either block during `execute` or you potentially block during `poll`, at least as long as the `execute` method needs to be called PER PARTITION -- which TBH might have been a somewhat unfortunate choice, I would rather call it once and return a vector of streams.

So the question is: is this PR better than the status quo? I would say yes, but I would like to see at least one additional test that simulates the race described above so it doesn't error.
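To make the race concrete, a simplified, hypothetical sketch of the check-then-lock pattern under discussion (the field names mirror the diff; everything else is a placeholder, and the flag and the lock are deliberately two separate synchronization points):

```rust
use parking_lot::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Stand-in for the shared RepartitionExec state: `None` means "not initialized".
struct Shared {
    state_initialized: AtomicBool,
    state: Mutex<Option<Vec<&'static str>>>,
}

fn execute(shared: &Arc<Shared>) {
    if !shared.state_initialized.swap(true, Ordering::Relaxed) {
        // RACE WINDOW: the flag is already true here, but the lock is not held
        // yet. Another thread can skip this branch, build its stream, poll it,
        // take the lock, and observe `state` still uninitialized.
        let mut state = shared.state.lock();
        *state = Some(vec!["input stream 0", "input stream 1"]);
    }
}

fn poll(shared: &Arc<Shared>) {
    let state = shared.state.lock();
    // In the pre-fix code this situation turned into an error; the discussion
    // above argues the poller should instead do (or wait for) the initialization.
    assert!(state.is_some(), "init_input_streams must be called first");
}

fn main() {
    let shared = Arc::new(Shared {
        state_initialized: AtomicBool::new(false),
        state: Mutex::new(None),
    });
    execute(&shared);
    poll(&shared);
}
```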
Re: [PR] Propagate .execute() calls immediately in `RepartitionExec` [datafusion]
gabotechs commented on code in PR #16093: URL: https://github.com/apache/datafusion/pull/16093#discussion_r2096093535

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -1496,7 +1539,14 @@ mod tests {
         });

         let batches_with_drop = crate::common::collect(output_stream1).await.unwrap();
-        assert_eq!(batches_without_drop, batches_with_drop);
+        fn sort(batch: Vec<RecordBatch>) -> Vec<RecordBatch> {
+            batch
+                .into_iter()
+                .sorted_by_key(|b| format!("{b:?}"))
+                .collect()
+        }
+
+        assert_eq!(sort(batches_without_drop), sort(batches_with_drop));

Review Comment:

🤔 They are, yeah; not 100% sure why they weren't before though.
Re: [PR] Propagate .execute() calls immediately in `RepartitionExec` [datafusion]
gabotechs commented on code in PR #16093: URL: https://github.com/apache/datafusion/pull/16093#discussion_r2096083785

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -158,28 +225,17 @@ impl RepartitionExecState {
             ));
             spawned_tasks.push(wait_for_task);
         }
-
-        Self {
+        *self = Self::ConsumingInputStreams(ConsumingInputStreamsState {
             channels,
             abort_helper: Arc::new(spawned_tasks),
+        });
+        match self {
+            RepartitionExecState::ConsumingInputStreams(value) => Ok(value),
+            _ => unreachable!(),
         }
     }
 }

-/// Lazily initialized state
-///
-/// Note that the state is initialized ONCE for all partitions by a single task(thread).
-/// This may take a short while. It is also like that multiple threads
-/// call execute at the same time, because we have just started "target partitions" tasks
-/// which is commonly set to the number of CPU cores and all call execute at the same time.
-///
-/// Thus, use a **tokio** `OnceCell` for this initialization so as not to waste CPU cycles
-/// in a mutex lock but instead allow other threads to do something useful.
-///
-/// Uses a parking_lot `Mutex` to control other accesses as they are very short duration
-/// (e.g. removing channels on completion) where the overhead of `await` is not warranted.
-type LazyState = Arc<tokio::sync::OnceCell<Mutex<RepartitionExecState>>>;

Review Comment:

Actually, one of the main points of the PR is removing this `LazyState`. The issue is that `LazyState::get_or_init()` is an async method and therefore needs to be called within an async context. As `PhysicalPlan::execute` is not async, we are forced to initialize the `LazyState` inside the `future::stream::once(async move { ... })` block, which means that `LazyState::get_or_init()` will not be called until the first message in the stream is polled, therefore delaying the `.execute()` call to the child input.

I see that the purpose of introducing `LazyState` in https://github.com/apache/datafusion/pull/10009 was to reduce lock contention in `RepartitionExec::execute` calls, but my guess is that this can be solved more simply by just checking an `AtomicBool` so the state is locked only once, letting all other threads continue their work without taking locks in `RepartitionExec::execute`, and therefore allowing us to call `input.execute()` synchronously upon a `RepartitionExec::execute` call.

Not sure if there's a middle-ground solution that allows us to keep the `LazyState`; I'll try to think of something, but otherwise I'm happy to wait for @crepererum's input next week.
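For illustration, a minimal sketch of why the async `get_or_init` forces the deferral described above; the types and names are simplified placeholders (a `String` standing in for the real state), not the actual `RepartitionExec` code:

```rust
use std::sync::Arc;
use tokio::sync::OnceCell;

// `get_or_init` is async, so the initialization can only run inside the
// stream; the child's `execute()` is therefore deferred to the first poll.
fn execute_lazily(state: Arc<OnceCell<String>>) -> impl futures::Stream<Item = String> {
    futures::stream::once(async move {
        state
            .get_or_init(|| async {
                // In the real operator, this is roughly where input.execute() ran.
                "initialized on first poll".to_string()
            })
            .await
            .clone()
    })
}

#[tokio::main]
async fn main() {
    use futures::StreamExt;

    let state = Arc::new(OnceCell::new());
    let mut stream = Box::pin(execute_lazily(Arc::clone(&state)));

    assert!(state.get().is_none()); // nothing has happened at "execute" time
    assert_eq!(
        stream.next().await.as_deref(),
        Some("initialized on first poll") // the child work happens on first poll
    );
    assert!(state.get().is_some());
}
```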
Re: [PR] Propagate .execute() calls immediately in `RepartitionExec` [datafusion]
gabotechs commented on code in PR #16093: URL: https://github.com/apache/datafusion/pull/16093#discussion_r2096086285

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -406,8 +462,12 @@ impl BatchPartitioner {
 pub struct RepartitionExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
-    /// Inner state that is initialized when the first output stream is created.
-    state: LazyState,
+    /// Inner state that is initialized when the parent calls .execute() on this node
+    /// and consumed as soon as the parent starts consuming this node.
+    state: Arc<Mutex<RepartitionExecState>>,
+    /// Stores whether the state has been initialized. Checking this AtomicBool is faster than

Review Comment:

Yes, exactly that; I'll clarify it in the comment.
Re: [PR] Propagate .execute() calls immediately in `RepartitionExec` [datafusion]
alamb commented on code in PR #16093: URL: https://github.com/apache/datafusion/pull/16093#discussion_r2095992822

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -406,8 +462,12 @@ impl BatchPartitioner {
 pub struct RepartitionExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
-    /// Inner state that is initialized when the first output stream is created.
-    state: LazyState,
+    /// Inner state that is initialized when the parent calls .execute() on this node
+    /// and consumed as soon as the parent starts consuming this node.
+    state: Arc<Mutex<RepartitionExecState>>,
+    /// Stores whether the state has been initialized. Checking this AtomicBool is faster than

Review Comment:

Could you be clear about what "initialized" means? Does it mean that `state` is in either `RepartitionExecState::InputStreamsInitialized` or `RepartitionExecState::ConsumingInputStreams`?

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -1298,15 +1347,9 @@ mod tests {
         let partitioning = Partitioning::RoundRobinBatch(1);
         let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();

-        // Note: this should pass (the stream can be created) but the
-        // error when the input is executed should get passed back
-        let output_stream = exec.execute(0, task_ctx).unwrap();
-
         // Expect that an error is returned
-        let result_string = crate::common::collect(output_stream)
-            .await
-            .unwrap_err()
-            .to_string();
+        let result_string = exec.execute(0, task_ctx).err().unwrap().to_string();

Review Comment:

I double checked and the `ErrorExec` throws an error on calls to `execute` (not when the stream is polled), so this change seems very reasonable to me.

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -158,28 +225,17 @@ impl RepartitionExecState {
             ));
             spawned_tasks.push(wait_for_task);
         }
-
-        Self {
+        *self = Self::ConsumingInputStreams(ConsumingInputStreamsState {
             channels,
             abort_helper: Arc::new(spawned_tasks),
+        });
+        match self {
+            RepartitionExecState::ConsumingInputStreams(value) => Ok(value),
+            _ => unreachable!(),
         }
     }
 }

-/// Lazily initialized state
-///
-/// Note that the state is initialized ONCE for all partitions by a single task(thread).
-/// This may take a short while. It is also like that multiple threads
-/// call execute at the same time, because we have just started "target partitions" tasks
-/// which is commonly set to the number of CPU cores and all call execute at the same time.
-///
-/// Thus, use a **tokio** `OnceCell` for this initialization so as not to waste CPU cycles
-/// in a mutex lock but instead allow other threads to do something useful.
-///
-/// Uses a parking_lot `Mutex` to control other accesses as they are very short duration
-/// (e.g. removing channels on completion) where the overhead of `await` is not warranted.
-type LazyState = Arc<tokio::sync::OnceCell<Mutex<RepartitionExecState>>>;

Review Comment:

Is there a reason to remove the `LazyState` approach? The scenario described in the comments still seems applicable (even during `execute()`, I think).

FWIW I believe @crepererum is out this week, so we would have to wait until next week for his input.

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -1496,7 +1539,14 @@ mod tests {
         });

         let batches_with_drop = crate::common::collect(output_stream1).await.unwrap();
-        assert_eq!(batches_without_drop, batches_with_drop);
+        fn sort(batch: Vec<RecordBatch>) -> Vec<RecordBatch> {
+            batch
+                .into_iter()
+                .sorted_by_key(|b| format!("{b:?}"))
+                .collect()
+        }
+
+        assert_eq!(sort(batches_without_drop), sort(batches_with_drop));

Review Comment:

Maybe now the streams are actually subject to tokio's scheduler, where before it was the first one that was executed 🤔
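For illustration, a hedged sketch of the state machine implied by the question above. The variant names `InputStreamsInitialized` and `ConsumingInputStreams` come from the diff and the review comment; the initial variant name, all field types, and their contents are assumptions, not the real DataFusion definitions:

```rust
enum RepartitionExecState {
    /// `execute()` has not been called on any partition yet (assumed variant).
    NotInitialized,
    /// `execute()` ran: `input.execute()` has been called for every input
    /// partition and the resulting streams are held until consumption starts.
    InputStreamsInitialized(Vec<String>),
    /// The first output stream was polled: background tasks were spawned and
    /// batches now flow through per-partition channels.
    ConsumingInputStreams(ConsumingInputStreamsState),
}

#[allow(dead_code)]
struct ConsumingInputStreamsState {
    channels: Vec<String>,     // placeholder for the per-partition channels
    abort_helper: Vec<String>, // placeholder for the spawned task handles
}

fn main() {
    // Typical lifecycle, in order.
    let lifecycle = [
        RepartitionExecState::NotInitialized,
        RepartitionExecState::InputStreamsInitialized(vec!["input stream 0".into()]),
        RepartitionExecState::ConsumingInputStreams(ConsumingInputStreamsState {
            channels: vec!["channel 0".into()],
            abort_helper: vec![],
        }),
    ];
    assert_eq!(lifecycle.len(), 3);
}
```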
Re: [PR] Propagate .execute() calls immediately in `RepartitionExec` [datafusion]
gabotechs commented on code in PR #16093: URL: https://github.com/apache/datafusion/pull/16093#discussion_r2095419949

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -158,28 +225,17 @@ impl RepartitionExecState {
             ));
             spawned_tasks.push(wait_for_task);
         }
-
-        Self {
+        *self = Self::ConsumingInputStreams(ConsumingInputStreamsState {
             channels,
             abort_helper: Arc::new(spawned_tasks),
+        });
+        match self {
+            RepartitionExecState::ConsumingInputStreams(value) => Ok(value),
+            _ => unreachable!(),
         }
     }
 }

-/// Lazily initialized state
-///
-/// Note that the state is initialized ONCE for all partitions by a single task(thread).
-/// This may take a short while. It is also like that multiple threads
-/// call execute at the same time, because we have just started "target partitions" tasks
-/// which is commonly set to the number of CPU cores and all call execute at the same time.
-///
-/// Thus, use a **tokio** `OnceCell` for this initialization so as not to waste CPU cycles
-/// in a mutex lock but instead allow other threads to do something useful.
-///
-/// Uses a parking_lot `Mutex` to control other accesses as they are very short duration
-/// (e.g. removing channels on completion) where the overhead of `await` is not warranted.
-type LazyState = Arc<tokio::sync::OnceCell<Mutex<RepartitionExecState>>>;

Review Comment:

cc @crepererum as you are the original author of this approach
Re: [PR] Propagate .execute() calls immediately in `RepartitionExec` [datafusion]
gabotechs commented on code in PR #16093: URL: https://github.com/apache/datafusion/pull/16093#discussion_r2095320415

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -1298,15 +1347,9 @@ mod tests {
         let partitioning = Partitioning::RoundRobinBatch(1);
         let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();

-        // Note: this should pass (the stream can be created) but the
-        // error when the input is executed should get passed back
-        let output_stream = exec.execute(0, task_ctx).unwrap();
-
         // Expect that an error is returned
-        let result_string = crate::common::collect(output_stream)
-            .await
-            .unwrap_err()
-            .to_string();
+        let result_string = exec.execute(0, task_ctx).err().unwrap().to_string();

Review Comment:

The error is now propagated immediately, rather than lazily upon the first message poll.

## datafusion/physical-plan/src/repartition/mod.rs:

@@ -1496,7 +1539,14 @@ mod tests {
         });

         let batches_with_drop = crate::common::collect(output_stream1).await.unwrap();
-        assert_eq!(batches_without_drop, batches_with_drop);
+        fn sort(batch: Vec<RecordBatch>) -> Vec<RecordBatch> {
+            batch
+                .into_iter()
+                .sorted_by_key(|b| format!("{b:?}"))
+                .collect()
+        }
+
+        assert_eq!(sort(batches_without_drop), sort(batches_with_drop));

Review Comment:

Found this test to be flaky after my changes. Just ordering the record batches removes the flakiness. Not sure why it only started being flaky now, though...