Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-12 Thread via GitHub


pepijnve commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2966838912

   I don't think it's necessary TBH. I applied this patch (which I think is 
what @berkaysynnada meant) and the test then fails in the way it's intended to.
   
   ```
   Index: datafusion/physical-plan/src/sorts/merge.rs
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===
   diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs
   --- a/datafusion/physical-plan/src/sorts/merge.rs (revision 31c570e3ee7fa830753b2bbab3ec1a635ef16a30)
   +++ b/datafusion/physical-plan/src/sorts/merge.rs (date 1749736195516)
   @@ -227,7 +227,8 @@
        // The polled stream is pending which means we're already set up to
        // be woken when necessary
        // Try the next stream
   -    idx += 1;
   +    // idx += 1;
   +    return Poll::Pending;
    }
    _ => {
        // The polled stream is ready
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-12 Thread via GitHub


alamb commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2966502775

   Is there an additional test we should write perhaps, to add the coverage 
@berkaysynnada suggests?





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-12 Thread via GitHub


alamb merged PR #16322:
URL: https://github.com/apache/datafusion/pull/16322





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-12 Thread via GitHub


pepijnve commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2965650342

   > if the pending rotation somehow breaks, since SortPreservingMergeStream 
never yields
   
   I'm not sure I understand what you mean @berkaysynnada. Looking at just the 
initial phase it should still yield after each round of polling if there are 
any pending streams.





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-12 Thread via GitHub


berkaysynnada commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2965238217

   Sorry for the late reply. @pepijnve Your diagnosis is spot on, and the 
proposed fix totally makes sense. I honestly can’t recall why I added the wake 
there but not in CongestedStream, most likely I just overlooked it. It's a 
really impactful gain, thank you. 
   One small note regarding this PR: if the pending rotation somehow breaks, 
since SortPreservingMergeStream never yields, tests relying on tokio::timeout 
(like test_spm_congestion) may fail to trigger the timeout signal. That said, I 
don’t think it’s a big deal to address right now.





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-12 Thread via GitHub


pepijnve commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2965788732

   I think I've got you covered. Try commenting out 
https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs#L1313
 and running the test. This doesn't really test the rotation per se, but it 
keeps all the streams congested. The timeout triggers and the test reports 
`SortPreservingMerge caused a deadlock` for me. Is that sufficient?
   
   I believe I had the test hang you described while I was working on this and 
adapted the test case a bit to make sure the safety net of the test stays in 
place.





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-12 Thread via GitHub


berkaysynnada commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2965717965

   > > if the pending rotation somehow breaks, since SortPreservingMergeStream 
never yields
   > 
   > I'm not sure I understand what you mean @berkaysynnada. Looking at just 
the initial phase it should still yield after each round of polling if there 
are any pending streams.
   
   I mean when we remove this increment (that's how I test that 
test_spm_congestion covers this rotation): 
   ```
   // The polled stream is pending which means we're already set up to
   // be woken when necessary
   // Try the next stream
   idx += 1;
   ```
   the test never finishes. But as I said, not a big deal as the test is 
written in that way.





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-11 Thread via GitHub


alamb commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2965063028

   Thanks again @pepijnve @Dandandan @ozankabak and @zhuqi-lucas 





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-11 Thread via GitHub


alamb commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2965062758

   > I just checked, and SortExec is also setting up RecordBatchReceiverStream. 
The worst case scenario in terms of elapsed time in the poll_next call is that 
all 10k streams are ready in one cycle. This will trigger 10k cursor 
initialization which does some non-trivial work converting the record batch. 
But the current code is doing exactly the same thing today already so it's no 
worse than the status quo as far as I can tell.
   
   For what it is worth, trying to merge 10k streams will be bad for a lot of 
reasons (merge is linear in the number of input streams)
   
   From my perspective this PR has concrete measurements that show it is faster 
and mostly theoretical conclusions that it is no worse as well.
   
   Therefore I think it is good to go and will merge it. We can adjust / change 
/ further optimize as we move on and get any additional information





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-11 Thread via GitHub


zhuqi-lucas commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2962001843

   > > they are ultimately based on Tokio's channel mechanism, which relies on 
Linux's epoll, so it's still very efficient
   > 
   > I'm pretty sure that's not the case. There are no file descriptors 
involved in this as far as I can tell, it's just a bunch of user-space mpsc 
queues and semaphores (see implementation of `RecordBatchReceiverStream`, 
`tokio::sync::mpsc::Receiver`, and `tokio::sync::mpsc::Chan`). Kernel-based 
polling isn't involved here.
   
   OK, I see. I had not checked the sort-merge spill-file case. If that is not 
involved, the CPU cost should be small.





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-11 Thread via GitHub


pepijnve commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2961836884

   > they are ultimately based on Tokio's channel mechanism, which relies on 
Linux's epoll, so it's still very efficient
   
   I'm pretty sure that's not the case. There are no file descriptors involved 
in this as far as I can tell, it's just a bunch of user-space mpsc queues and 
semaphores (see implementation of `RecordBatchReceiverStream`, 
`tokio::sync::mpsc::Receiver`, and `tokio::sync::mpsc::Chan`). Kernel-based 
polling isn't involved here.





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-11 Thread via GitHub


zhuqi-lucas commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2961694324

   > I just checked, and SortExec is also setting up RecordBatchReceiverStream. 
The worst case scenario in terms of elapsed time in the poll_next call is that 
all 10k streams are ready in one cycle. This will trigger 10k cursor 
initialization which does some non-trivial work converting the record batch. 
But the current code is doing exactly the same thing today already so it's no 
worse than the status quo as far as I can tell.
   
   @pepijnve This is a good explanation. So even when 10k streams are behind 
RecordBatchReceiverStream, it's Linux epoll based, so it's very efficient.
   






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-11 Thread via GitHub


zhuqi-lucas commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2961608771

   > > Especially in high-parallelism scenarios where polling all streams on 
every wake-up is unnecessarily expensive.
   > 
   > @zhuqi-lucas my assumption is that in high-parallelism situations all 
those streams are distinct tasks with a channel on the end. Is that incorrect? 
And is checking a channel for readiness such an expensive operation?
   > 
   > What we're doing here is, as far as I understand it, a bespoke version of 
`tokio::select!` on 10k channels in the end. That should work, shouldn't it?
   
   @pepijnve 
   I just saw the update about the max of 10k channels. I think it's harmless for 
performance, and I agree we can also run sort-tpch to see the result.





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-11 Thread via GitHub


Dandandan commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2961591627

   Perhaps we can run the sorting benchmarks (tpch_sort) as well just to be 
sure there are no regressions? AFAIU there shouldn't be but let's see





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-11 Thread via GitHub


pepijnve commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2961574986

   > Especially in high-parallelism scenarios where polling all streams on 
every wake-up is unnecessarily expensive.
   
   @zhuqi-lucas my assumption is that in high-parallelism situations all those 
streams are distinct tasks with a channel on the end. Is that incorrect? And is 
checking a channel for readiness such an expensive operation?





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-11 Thread via GitHub


zhuqi-lucas commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2961564872

   > > Previously, as soon as one stream returned Pending, the merge would 
short-circuit and return Pending, minimizing work per cycle. With the new 
approach, we now poll all N streams once before giving up, which can add extra 
CPU cost, especially when you have high parallelism
   > 
   > @zhuqi-lucas this is a really interesting topic and tricky tradeoff. The 
previous code would indeed do one poll per cycle and then do a self-wake yield. 
The new code polls each stream once and then yields without self-wake. In terms 
of the amount of work done there is no difference, so no extra CPU cost. On 
the contrary, removing the self-wake yield removes pure waste. Unless I'm missing 
something, total elapsed time should be less.
   > 
   > But the tradeoff is that we're going to extend the elapsed time per cycle. 
As a consequence it may take a bit longer to yield to the caller, and then the 
cancellation problem rears its ugly head again.
   > 
   > If this actually matters or not is entirely dependent on the streams being 
polled. SortPreservingMergeExec for instance sets things up to use 
`RecordBatchReceiverStream` instances as children. Polling those repeatedly 
until the first record batch arrives is quite wasteful because you're really 
just spinning in place. Checking each receiver in a loop is going to be super 
quick, and yielding in between every partition is not useful at all. If the 
child streams are blocking though, then it's a different matter. You probably 
don't want to actually drive the progress of each stream synchronously in a 
loop.
   > 
   > So... it's a complicated balancing act. Would love to hear how others look 
at this problem. PR #16319 ties into this. It's an experiment I'm doing to see 
if we can avoid exposing the potentially blocking portion of streams to the 
caller so that the problem described above kind of disappears. It's not yet 
clear if this can be achieved without a performance penalty.
   
   Thank you @pepijnve, one possible trade-off might be:
   
   ```text
   Trade-off Strategy: Batch Polling
   
   An approach that sits between the two extremes is batch polling.
   Instead of polling all streams every time, you divide the N child streams 
into several groups (e.g., 3 groups).
   
   On each poll cycle, you only poll one group of streams.
   
   On the next cycle, you move to the next group, and so on.
   
   This helps reduce the per-cycle polling cost while still making steady 
progress across all streams over time.
   
   Especially in high-parallelism scenarios where polling all streams on every 
wake-up is unnecessarily expensive.
   ```
   
   Also, I believe our TPC-H benchmark is not enough. We may need to mock those 
cases... But it's really hard.
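
A minimal sketch of the group rotation described above, assuming contiguous groups of roughly equal size. `group_for_cycle` and its parameters are hypothetical names used for illustration and are not part of DataFusion; the sketch only shows which streams would be selected on each cycle, not how they would be polled or woken.

```rust
use std::ops::Range;

/// Returns the indices of the child streams to poll on poll cycle `cycle`,
/// when `n` streams are split into `groups` contiguous groups that take turns.
/// Hypothetical helper, not DataFusion code.
fn group_for_cycle(n: usize, groups: usize, cycle: usize) -> Range<usize> {
    assert!(groups > 0, "need at least one group");
    // Ceiling division: the last group may be shorter than the others.
    let size = (n + groups - 1) / groups;
    let group = cycle % groups;
    let start = (group * size).min(n);
    let end = (start + size).min(n);
    start..end
}

fn main() {
    // 10 streams in 3 groups: each cycle selects only one group.
    for cycle in 0..4 {
        println!("cycle {cycle}: poll streams {:?}", group_for_cycle(10, 3, cycle));
    }
}
```

With 10 streams and 3 groups, the fourth cycle wraps around to the first group again.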





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-11 Thread via GitHub


pepijnve commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2961538845

   > Previously, as soon as one stream returned Pending, the merge would 
short-circuit and return Pending, minimizing work per cycle. With the new 
approach, we now poll all N streams once before giving up, which can add extra 
CPU cost, especially when you have high parallelism
   
   @zhuqi-lucas this is a really interesting topic and tricky tradeoff. The 
previous code would indeed do one poll per cycle and then do a self-wake yield. 
The new code polls each stream once and then yields without self-wake.
   In terms of the amount of work done there is no difference, so no extra 
CPU cost. On the contrary, removing the self-wake yield removes pure waste. Unless 
I'm missing something, total elapsed time should be less.
   
   But the tradeoff is that we're going to extend the elapsed time per cycle. 
As a consequence it may take a bit longer to yield to the caller, and then the 
cancellation problem rears its ugly head again.
   
   If this actually matters or not is entirely dependent on the streams being 
polled.
   SortPreservingMergeExec for instance sets things up to use 
`RecordBatchReceiverStream` instances as children. Polling those repeatedly 
until the first record batch arrives is quite wasteful because you're really 
just spinning in place. Checking each receiver in a loop is going to be super 
quick, and yielding in between every partition is not useful at all.
   If the child streams are blocking though, then it's a different matter. You 
probably don't want to actually drive the progress of each stream synchronously 
in a loop.
   
   So... it's a complicated balancing act. Would love to hear how others look 
at this problem.
   PR #16319 ties into this. It's an experiment I'm doing to see if we can 
avoid exposing the potentially blocking portion of streams to the caller so 
that the problem described above kind of disappears. It's not yet clear if this 
can be achieved without a performance penalty.
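
The difference between the two shapes can be modelled without a runtime. The sketch below is a simplified illustration under stated assumptions, not the code from the PR: `CountingWaker`, `poll_with_self_wake`, `poll_all_once`, and `self_wakes_per_pending_poll` are hypothetical names, and child streams are reduced to booleans. It only counts how often the task wakes itself while returning `Poll::Pending`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that only counts how often it is invoked.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Old shape: poll one child and, if it is pending, wake ourselves so that
/// the executor calls us again right away (a busy-wait).
fn poll_with_self_wake(cx: &mut Context<'_>, child_ready: bool) -> Poll<()> {
    if child_ready {
        Poll::Ready(())
    } else {
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// New shape: look at every child once. If any is pending, return Pending
/// without waking ourselves; real children would have stored the waker.
fn poll_all_once(_cx: &mut Context<'_>, children_ready: &[bool]) -> Poll<()> {
    if children_ready.iter().all(|ready| *ready) {
        Poll::Ready(())
    } else {
        Poll::Pending
    }
}

/// Runs one pending poll of the chosen shape and reports how many times the
/// task woke itself while doing so.
fn self_wakes_per_pending_poll(use_old_shape: bool) -> usize {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let result = if use_old_shape {
        poll_with_self_wake(&mut cx, false)
    } else {
        poll_all_once(&mut cx, &[false, true, false])
    };
    assert!(result.is_pending());
    counter.0.load(Ordering::SeqCst)
}

fn main() {
    println!("old shape: {} self-wake(s)", self_wakes_per_pending_poll(true));
    println!("new shape: {} self-wake(s)", self_wakes_per_pending_poll(false));
}
```

With a real executor, each self-wake puts the task straight back on the run queue even though no child has made progress, which is the busy-wait discussed in this thread.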





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-10 Thread via GitHub


Dandandan commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2137449500


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,50 @@ impl SortPreservingMergeStream {
         // Once all partitions have set their corresponding cursors for the loser tree,
         // we skip the following block. Until then, this function may be called multiple
         // times and can return Poll::Pending if any partition returns Poll::Pending.
+
         if self.loser_tree.is_empty() {
-            while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+            // Manual indexing since we're iterating over the vector and shrinking it in the loop
+            let mut idx = 0;
+            while idx < self.uninitiated_partitions.len() {

Review Comment:
   > I'm new to this, so maybe my mental model is completely wrong, but I don't 
think that's correct. This is not an async function. It will iterate over all 
the partitions once per call to poll_next_inner and return at line 257 unless 
uninitiated_partitions is empty.
   
   AFAIK, `uninitiated_partitions` is only constructed once for each input 
partition.
   So, if I understand correctly, during polling SPM, all input partitions are 
polled once (after this change) when they are needed first and removed from the 
list.
   
   So, I think we could slightly simplify it by just tracking a partition index 
rather than a `Vec`






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-10 Thread via GitHub


pepijnve commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2137478983


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,50 @@ impl SortPreservingMergeStream {
         // Once all partitions have set their corresponding cursors for the loser tree,
         // we skip the following block. Until then, this function may be called multiple
         // times and can return Poll::Pending if any partition returns Poll::Pending.
+
         if self.loser_tree.is_empty() {
-            while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+            // Manual indexing since we're iterating over the vector and shrinking it in the loop
+            let mut idx = 0;
+            while idx < self.uninitiated_partitions.len() {

Review Comment:
   I think you came to the same conclusion in the meantime, but I had typed 
this already. Just FYI for anyone following along.
   
   `SortPreservingMergeStream` has one `CursorStream` with `n` partitions. 
`SortPreservingMergeStream` has a single `uninitiated_partitions: Vec`; not one 
per partition. This vec contains the indices of the partitions, so `[0, 1, 2, 
3, ..., n - 1]`.
   
   `SortPreservingMergeStream::poll_next` has two states: waiting for all 
partitions ready (wait for short), and merge. The state transition condition 
from wait to merge is that all partitions have been polled and returned ready 
exactly once. Additionally you want to ensure you poll the partitions 
round-robin.
   
   So what the wait state does is iterate over every remaining partition index 
in `uninitiated_partitions` once. That gives you the round-robin property. It 
calls poll_next for each partition and if a partition returns ready it removes  
the partition index from `uninitiated_partitions`. If it returns pending the 
index is retained. Then we move on to the next partition. At the end of the 
loop you check if `uninitiated_partitions` is empty. If so, go to merge state, 
if not stay in wait and return pending.
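
The wait state described above can be sketched as follows. This is an illustrative model rather than the actual `merge.rs` code: `Partitions`, `poll_partition`, and `poll_wait_state` are hypothetical names, child streams are reduced to countdowns, and using `swap_remove` to drop a ready index is an assumption.

```rust
use std::task::Poll;

/// Hypothetical stand-in for the child streams: `ready_after[p]` is how many
/// polls partition `p` answers with `Pending` before it becomes ready.
struct Partitions {
    ready_after: Vec<u32>,
}

impl Partitions {
    fn poll_partition(&mut self, p: usize) -> Poll<()> {
        if self.ready_after[p] == 0 {
            Poll::Ready(())
        } else {
            self.ready_after[p] -= 1;
            Poll::Pending
        }
    }
}

/// One pass of the "wait" state: poll every remaining partition once.
/// Returns `Ready` once no uninitiated partitions are left.
fn poll_wait_state(streams: &mut Partitions, uninitiated: &mut Vec<usize>) -> Poll<()> {
    let mut idx = 0;
    while idx < uninitiated.len() {
        let partition = uninitiated[idx];
        match streams.poll_partition(partition) {
            // Still pending: keep the index and try the next one.
            Poll::Pending => idx += 1,
            // Ready: drop the index. The last element moves into slot `idx`,
            // so `idx` is deliberately not advanced.
            Poll::Ready(()) => {
                uninitiated.swap_remove(idx);
            }
        }
    }
    if uninitiated.is_empty() {
        Poll::Ready(())
    } else {
        Poll::Pending
    }
}

fn main() {
    let mut streams = Partitions { ready_after: vec![1, 0, 2] };
    let mut uninitiated: Vec<usize> = (0..3).collect();
    let mut pending_rounds = 0;
    while poll_wait_state(&mut streams, &mut uninitiated).is_pending() {
        pending_rounds += 1;
        println!("round {pending_rounds}: still waiting on {uninitiated:?}");
    }
    println!("all partitions ready after {pending_rounds} pending rounds");
}
```

Each call visits every remaining partition exactly once and never wakes itself; in this model it is up to the pending children to cause the next call.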






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-10 Thread via GitHub


pepijnve commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2137446995


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,50 @@ impl SortPreservingMergeStream {
         // Once all partitions have set their corresponding cursors for the loser tree,
         // we skip the following block. Until then, this function may be called multiple
         // times and can return Poll::Pending if any partition returns Poll::Pending.
+
         if self.loser_tree.is_empty() {
-            while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+            // Manual indexing since we're iterating over the vector and shrinking it in the loop
+            let mut idx = 0;
+            while idx < self.uninitiated_partitions.len() {

Review Comment:
   Suppose `uninitiated_partitions` is `[1, 2, 3]` and in the first round 1 
and 3 are pending, but 2 is ready. Next round you only want to poll 1 and 3 
again. How would you keep track of that with just a counter?
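
As a small illustration of that example (a sketch with a hypothetical helper, `remaining_after_round`), the partitions left over after one round need not form a contiguous range, which is why a list of indices is kept:

```rust
/// Returns the partitions that still need polling after a round in which the
/// partitions listed in `ready` produced their first batch.
/// Hypothetical helper for illustration only.
fn remaining_after_round(uninitiated: &[usize], ready: &[usize]) -> Vec<usize> {
    uninitiated
        .iter()
        .copied()
        .filter(|p| !ready.contains(p))
        .collect()
}

fn main() {
    // Round one: partitions 1 and 3 are pending, partition 2 is ready.
    let remaining = remaining_after_round(&[1, 2, 3], &[2]);
    // The survivors have a gap in the middle, so a single "next index"
    // counter cannot describe them.
    println!("poll again next round: {remaining:?}");
}
```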








Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-10 Thread via GitHub


Dandandan commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2137523815


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,50 @@ impl SortPreservingMergeStream {
         // Once all partitions have set their corresponding cursors for the loser tree,
         // we skip the following block. Until then, this function may be called multiple
         // times and can return Poll::Pending if any partition returns Poll::Pending.
+
         if self.loser_tree.is_empty() {
-            while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+            // Manual indexing since we're iterating over the vector and shrinking it in the loop
+            let mut idx = 0;
+            while idx < self.uninitiated_partitions.len() {

Review Comment:
   Thanks!






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-10 Thread via GitHub


pepijnve commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2137438301


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,50 @@ impl SortPreservingMergeStream {
         // Once all partitions have set their corresponding cursors for the loser tree,
         // we skip the following block. Until then, this function may be called multiple
         // times and can return Poll::Pending if any partition returns Poll::Pending.
+
         if self.loser_tree.is_empty() {
-            while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+            // Manual indexing since we're iterating over the vector and shrinking it in the loop
+            let mut idx = 0;
+            while idx < self.uninitiated_partitions.len() {

Review Comment:
   I'm new to this, so maybe my mental model is completely wrong, but I don't 
think that's correct. This is not an async function. It will iterate over all 
the partitions once per call to `poll_next_inner` and return at line 257.






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-10 Thread via GitHub


Dandandan commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2137423047


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,50 @@ impl SortPreservingMergeStream {
         // Once all partitions have set their corresponding cursors for the loser tree,
         // we skip the following block. Until then, this function may be called multiple
         // times and can return Poll::Pending if any partition returns Poll::Pending.
+
         if self.loser_tree.is_empty() {
-            while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+            // Manual indexing since we're iterating over the vector and shrinking it in the loop
+            let mut idx = 0;
+            while idx < self.uninitiated_partitions.len() {

Review Comment:
   I mean we basically iterate once through all the partitions when 
"initializing" this function.
   So instead of having an `uninitiated_partitions: Vec<usize>` we could e.g. 
have an `is_initialized: bool` and traverse all input partitions from left to 
right by `idx`.






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-10 Thread via GitHub


pepijnve commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2137478983


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,50 @@ impl SortPreservingMergeStream {
 // Once all partitions have set their corresponding cursors for the loser tree,
 // we skip the following block. Until then, this function may be called multiple
 // times and can return Poll::Pending if any partition returns Poll::Pending.
+
 if self.loser_tree.is_empty() {
-    while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+    // Manual indexing since we're iterating over the vector and shrinking it in the loop
+    let mut idx = 0;
+    while idx < self.uninitiated_partitions.len() {

Review Comment:
   I think you came to the same conclusion in the meantime, but I had typed 
this already. Just FYI for anyone following along.
   
   `SortPreservingMergeStream` has one `CursorStream` with `n` partitions. 
`SortPreservingMergeStream` has a single `uninitiated_partitions: Vec<usize>`, 
not one per partition. This vec contains the indices of the partitions, so 
`[0, 1, 2, ..., n - 1]`.
   
   `SortPreservingMergeStream::poll_next` has two states: waiting for all 
partitions to be ready (wait for short), and merge. The state transition 
condition from wait to merge is that each partition has returned ready exactly 
once. Additionally, you want to ensure you poll the partitions round-robin.
   
   So what the wait state does is iterate over every remaining partition index 
in `uninitiated_partitions` once. That gives you the round-robin property. It 
calls `poll_next` for the partition in question, and if that returns ready it 
removes the partition index from `uninitiated_partitions`. If it returns 
pending, the index is retained. Then we move on to the next partition. At the 
end of the loop you check whether `uninitiated_partitions` is empty. If so, go 
to the merge state; if not, stay in wait and return pending.
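
For anyone who prefers code, here is a minimal, self-contained sketch of one pass of the wait state described above. It is not the code in `merge.rs`: `poll_uninitiated_once` and the `poll_partition` closure are hypothetical stand-ins for the real per-partition poll, and error handling is left out.

```rust
use std::task::Poll;

/// One pass of the "wait" state: poll every partition still listed in
/// `uninitiated` exactly once and drop the ones that report ready.
/// `poll_partition` is a hypothetical stand-in for the per-partition poll.
fn poll_uninitiated_once<F>(uninitiated: &mut Vec<usize>, mut poll_partition: F) -> Poll<()>
where
    F: FnMut(usize) -> Poll<()>,
{
    let mut idx = 0;
    while idx < uninitiated.len() {
        let partition = uninitiated[idx];
        match poll_partition(partition) {
            // Still pending: keep the index and move on to the next partition.
            Poll::Pending => idx += 1,
            // Ready: remove the index. The next element shifts into `idx`,
            // so `idx` is not bumped here.
            Poll::Ready(()) => {
                uninitiated.remove(idx);
            }
        }
    }
    // Transition to "merge" only once nothing is left to initialize.
    if uninitiated.is_empty() {
        Poll::Ready(())
    } else {
        Poll::Pending
    }
}
```

With `uninitiated = [1, 2, 3]` and only partition 2 ready, one pass polls 1, 2 and 3 in that order, leaves `[1, 3]` behind and returns `Poll::Pending`.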






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-10 Thread via GitHub


Dandandan commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2137474689


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,50 @@ impl SortPreservingMergeStream {
 // Once all partitions have set their corresponding cursors for the loser tree,
 // we skip the following block. Until then, this function may be called multiple
 // times and can return Poll::Pending if any partition returns Poll::Pending.
+
 if self.loser_tree.is_empty() {
-    while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+    // Manual indexing since we're iterating over the vector and shrinking it in the loop
+    let mut idx = 0;
+    while idx < self.uninitiated_partitions.len() {

Review Comment:
   > Suppose uninitiated_partitions is [1, 2, 3] and in the first round 1 and 3 
are pending, but 2 is ready. Next round you only want to poll 1 and 3 again. 
How would you keep track of that with just a counter?
   
   Yeah, you're right
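
To make the quoted example concrete, here is a small sketch comparing what gets polled again under the two bookkeeping schemes. Both helpers are hypothetical and only model the bookkeeping; `repolled_with_counter` assumes a counter design that remembers the first partition that is not ready yet and resumes from there.

```rust
/// Partitions polled again by a hypothetical design that only remembers the
/// position of the first not-yet-ready partition and resumes from there.
fn repolled_with_counter(all: &[usize], ready: &[usize]) -> Vec<usize> {
    match all.iter().position(|p| !ready.contains(p)) {
        Some(first) => all[first..].to_vec(),
        None => Vec::new(),
    }
}

/// Partitions polled again when the not-yet-ready indices are kept in a list.
fn repolled_with_list(all: &[usize], ready: &[usize]) -> Vec<usize> {
    all.iter().copied().filter(|p| !ready.contains(p)).collect()
}
```

For partitions `[1, 2, 3]` with only 2 ready, the counter variant polls 2 a second time, while the list variant polls just 1 and 3.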






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-10 Thread via GitHub


pepijnve commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2137438301


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,50 @@ impl SortPreservingMergeStream {
 // Once all partitions have set their corresponding cursors for the loser tree,
 // we skip the following block. Until then, this function may be called multiple
 // times and can return Poll::Pending if any partition returns Poll::Pending.
+
 if self.loser_tree.is_empty() {
-    while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+    // Manual indexing since we're iterating over the vector and shrinking it in the loop
+    let mut idx = 0;
+    while idx < self.uninitiated_partitions.len() {

Review Comment:
   I'm new to this so maybe my mental model is completely wrong, but I don't 
think that's correct. This is not an async function. It will iterate over all 
the partitions once per call to `poll_next_inner` and return at line 257 unless 
`uninitiated_partitions` is empty.






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-10 Thread via GitHub


Dandandan commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2137439468


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,50 @@ impl SortPreservingMergeStream {
 // Once all partitions have set their corresponding cursors for the loser tree,
 // we skip the following block. Until then, this function may be called multiple
 // times and can return Poll::Pending if any partition returns Poll::Pending.
+
 if self.loser_tree.is_empty() {
-    while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+    // Manual indexing since we're iterating over the vector and shrinking it in the loop
+    let mut idx = 0;
+    while idx < self.uninitiated_partitions.len() {

Review Comment:
   hm I see that we also return partitions.
   
   Maybe then it could be simplified to `uninitiated_partition: usize` - which 
just increases the partition index instead of keeping a `Vec` 






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-10 Thread via GitHub


Dandandan commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2137384940


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,50 @@ impl SortPreservingMergeStream {
 // Once all partitions have set their corresponding cursors for the loser tree,
 // we skip the following block. Until then, this function may be called multiple
 // times and can return Poll::Pending if any partition returns Poll::Pending.
+
 if self.loser_tree.is_empty() {
-    while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+    // Manual indexing since we're iterating over the vector and shrinking it in the loop
+    let mut idx = 0;
+    while idx < self.uninitiated_partitions.len() {

Review Comment:
   I wonder - this is now only done once in the initial loop? - can't we 
simplify it to just poll once at the start for every input partition (so we 
don't have to keep track of `uninitiated_partitions`)?






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-10 Thread via GitHub


pepijnve commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2137392057


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,50 @@ impl SortPreservingMergeStream {
 // Once all partitions have set their corresponding cursors for the loser tree,
 // we skip the following block. Until then, this function may be called multiple
 // times and can return Poll::Pending if any partition returns Poll::Pending.
+
 if self.loser_tree.is_empty() {
-    while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+    // Manual indexing since we're iterating over the vector and shrinking it in the loop
+    let mut idx = 0;
+    while idx < self.uninitiated_partitions.len() {

Review Comment:
   I'm not sure I understand what you mean. When some streams are not ready, 
this function will still return pending and will be called again when one of 
the streams wakes the task. So the loop can still be hit multiple times and the 
current code intentionally does not poll streams that have already returned 
ready.
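
As an illustration of the difference (a sketch under assumptions, not DataFusion code): `count_polls` is a hypothetical toy executor that polls again only if the waker fired during the previous poll. A poll function that wakes itself before returning pending, as the removed `cx.waker().wake_by_ref()` line did, is re-polled until the cap is hit; one that relies on its inputs for wake-ups is polled once and then left alone.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that only records that a wake-up was requested.
struct WakeFlag(AtomicBool);

impl Wake for WakeFlag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical toy executor: polls `poll_fn` and polls it again only if the
/// waker fired during the previous poll. Returns the number of polls made,
/// capped at `max_polls`.
fn count_polls<F>(mut poll_fn: F, max_polls: usize) -> usize
where
    F: FnMut(&mut Context<'_>) -> Poll<()>,
{
    let flag = Arc::new(WakeFlag(AtomicBool::new(false)));
    let waker = Waker::from(flag.clone());
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if poll_fn(&mut cx).is_ready() || polls >= max_polls {
            return polls;
        }
        // No wake-up requested: a real runtime would park the task here
        // until one of its inputs wakes it.
        if !flag.0.swap(false, Ordering::SeqCst) {
            return polls;
        }
    }
}
```

Calling `count_polls(|cx| { cx.waker().wake_by_ref(); Poll::Pending }, 1000)` spins through all 1000 polls, whereas `count_polls(|_cx| Poll::Pending, 1000)` stops after the first one.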






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-10 Thread via GitHub


pepijnve commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2958304164

   Waiting for benchmark results here, so I have some time to write up my 
assessment of what was happening and what has changed. This is just to assist 
any reviewers, not to replace review or justify the changes.
   
   ### Initial test failure
   
   The congestion test sets up three streams to merge:
   1. returns Ready(None) and panics if polled again. This checks that None is 
respected as a terminal result
   2. returns Pending until stream 3 has been polled, then always returns 
Ready(None)
   3. clears congestion for 2 and always returns Ready(None)
   
   After the initial change the congestion test started failing. The poll 
sequence I observed was
   1. stream1.poll_next -> None
   2. stream2.poll_next -> Pending
   3. stream3.poll_next -> Ready(None)
   
   and then the test hung. This turned out to be caused by CongestedStream 
returning pending, but not holding on to the waker to wake it when the 
congestion clears. The initial phase of SPM was now assuming it would be woken 
up by any stream that returned pending and that wasn't happening.
   
   ### CongestedStream fix
   
   I don't believe it's valid for a Stream to return pending but not set up 
waking, so I fixed that and then got this poll sequence.
   
   1. stream1.poll_next -> None
   2. stream2.poll_next -> Pending
   3. stream3.poll_next -> Ready(None)
   4. stream2.poll_next -> Ready(None)
   
   and the test passed.
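
The contract behind this fix can be sketched with the standard library alone. The names below (`Congestion`, `poll_congested`, `clear_congestion`, `CountingWaker`) are made up for illustration and are not the test's actual `CongestedStream`; the point is only that the waker is stored before returning pending and used when the congestion clears.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Shared congestion flag plus the waker of whoever is waiting on it.
#[derive(Default)]
struct Congestion {
    cleared: bool,
    waiter: Option<Waker>,
}

/// Hypothetical stand-in for the congested stream's poll method.
fn poll_congested(state: &Mutex<Congestion>, cx: &mut Context<'_>) -> Poll<Option<()>> {
    let mut s = state.lock().unwrap();
    if s.cleared {
        Poll::Ready(None)
    } else {
        // Returning Pending is only valid once a wake-up has been arranged.
        s.waiter = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Called when the congestion clears; wakes the waiting task, if any.
fn clear_congestion(state: &Mutex<Congestion>) {
    let waiter = {
        let mut s = state.lock().unwrap();
        s.cleared = true;
        s.waiter.take()
    };
    if let Some(w) = waiter {
        w.wake();
    }
}

/// Waker that counts how often it was woken, to observe the behaviour.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

Polling while congested returns `Poll::Pending` and records the waker; `clear_congestion` then triggers exactly one wake-up, after which the next poll returns `Poll::Ready(None)`.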
   
   ### Using swap_remove
   
   Based on initial review feedback I restored the usage of swap_remove which 
results in the following sequence.
   
   1. stream1.poll_next -> None
   2. stream3.poll_next -> Ready(None)
   3. stream2.poll_next -> Ready(None)
   
   Due to swap_remove changing the poll order, stream 3 was getting polled 
before stream 2. As a consequence we never hit the congestion situation and the 
test case passes, but no longer tests what it says it does.
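
The reordering is a direct consequence of how `Vec::swap_remove` fills the hole with the last element. A small sketch, assuming for simplicity that every stream is ready on its first poll (`poll_order` is a hypothetical helper, not part of the PR):

```rust
/// Returns the order in which stream ids are polled when each polled stream
/// is immediately ready and removed from the list.
fn poll_order(mut streams: Vec<u32>, use_swap_remove: bool) -> Vec<u32> {
    let mut order = Vec::new();
    let idx = 0;
    while idx < streams.len() {
        order.push(streams[idx]);
        if use_swap_remove {
            // O(1): the last element is moved into the freed slot.
            streams.swap_remove(idx);
        } else {
            // O(n): the remaining elements shift left, order is preserved.
            streams.remove(idx);
        }
    }
    order
}
```

`poll_order(vec![1, 2, 3], false)` yields `[1, 2, 3]`, while `poll_order(vec![1, 2, 3], true)` yields `[1, 3, 2]`, matching the sequence listed above.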
   
   ### Testing congestion again
   
   I adapted CongestedStream to keep track of the set of streams that have been 
polled. Both stream 2 and 3 are now treated as congested unless all streams 
have been polled. This results in the following sequence.
   
   1. stream1.poll_next -> None
   2. stream3.poll_next -> Pending
   3. stream2.poll_next -> Ready(None)
   4. stream3.poll_next -> Ready(None)
   
   The test case now hits the congestion point again. As a bonus the test is no 
longer dependent on the exact order in which the streams are polled which 
decouples it from the implementation details a bit more.
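
A rough sketch of that bookkeeping, assuming a shared tracker that records which streams have been polled. `CongestionTracker` is a hypothetical name, and the waker registration discussed earlier is left out to keep the example short:

```rust
use std::collections::HashSet;
use std::task::Poll;

/// Hypothetical tracker shared by the congested streams of the test.
struct CongestionTracker {
    /// Streams that must all be polled before any of them is let through.
    congested: HashSet<usize>,
    /// Streams that have been polled at least once.
    polled: HashSet<usize>,
}

impl CongestionTracker {
    fn new(congested: impl IntoIterator<Item = usize>) -> Self {
        Self {
            congested: congested.into_iter().collect(),
            polled: HashSet::new(),
        }
    }

    /// Records the poll and stays pending until every congested stream has
    /// been polled. (A real stream would also store the waker before
    /// returning Pending.)
    fn poll(&mut self, stream: usize) -> Poll<Option<()>> {
        self.polled.insert(stream);
        if self.congested.is_subset(&self.polled) {
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    }
}
```

Whichever of streams 2 and 3 is polled first sees `Poll::Pending`; once both have been polled, each returns `Poll::Ready(None)`, so the outcome does not depend on the poll order.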





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-09 Thread via GitHub


ozankabak commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2955839724

   Since this changes the congestion behavior test, which I'm not deeply 
familiar with, let's hear from @berkaysynnada on this to make sure we are not 
losing anything. If he is OK with the change, a 10% performance improvement 
would be great.





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-09 Thread via GitHub


alamb commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2955730921

   🤖: Benchmark completed
   
   Details
   
   
   
   ```
   Comparing HEAD and issue_16321
   
   Benchmark clickbench_extended.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        HEAD ┃ issue_16321 ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0     │  1900.64 ms │  1941.03 ms │     no change │
   │ QQuery 1     │   698.77 ms │   685.55 ms │     no change │
   │ QQuery 2     │  1423.32 ms │  1336.60 ms │ +1.06x faster │
   │ QQuery 3     │   708.54 ms │   677.63 ms │     no change │
   │ QQuery 4     │  1450.18 ms │  1371.41 ms │ +1.06x faster │
   │ QQuery 5     │ 15851.43 ms │ 14804.05 ms │ +1.07x faster │
   │ QQuery 6     │  2016.34 ms │  2046.14 ms │     no change │
   │ QQuery 7     │  2124.47 ms │  1948.45 ms │ +1.09x faster │
   │ QQuery 8     │   845.96 ms │   828.21 ms │     no change │
   └──────────────┴─────────────┴─────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary          ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)          │ 27019.65ms │
   │ Total Time (issue_16321)   │ 25639.07ms │
   │ Average Time (HEAD)        │  3002.18ms │
   │ Average Time (issue_16321) │  2848.79ms │
   │ Queries Faster             │          4 │
   │ Queries Slower             │          0 │
   │ Queries with No Change     │          5 │
   │ Queries with Failure       │          0 │
   └────────────────────────────┴────────────┘
   
   Benchmark clickbench_partitioned.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        HEAD ┃ issue_16321 ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0     │    14.98 ms │    15.25 ms │     no change │
   │ QQuery 1     │    31.43 ms │    32.60 ms │     no change │
   │ QQuery 2     │    81.15 ms │    80.01 ms │     no change │
   │ QQuery 3     │    97.82 ms │    99.49 ms │     no change │
   │ QQuery 4     │   576.70 ms │   584.19 ms │     no change │
   │ QQuery 5     │   833.02 ms │   841.56 ms │     no change │
   │ QQuery 6     │    23.21 ms │    23.25 ms │     no change │
   │ QQuery 7     │    38.30 ms │    34.79 ms │ +1.10x faster │
   │ QQuery 8     │   913.51 ms │   898.29 ms │     no change │
   │ QQuery 9     │  1197.30 ms │  1126.00 ms │ +1.06x faster │
   │ QQuery 10    │   262.45 ms │   248.43 ms │ +1.06x faster │
   │ QQuery 11    │   297.77 ms │   282.01 ms │ +1.06x faster │
   │ QQuery 12    │   891.35 ms │   868.87 ms │     no change │
   │ QQuery 13    │  1321.37 ms │  1247.47 ms │ +1.06x faster │
   │ QQuery 14    │   826.48 ms │   809.29 ms │     no change │
   │ QQuery 15    │   808.37 ms │   754.75 ms │ +1.07x faster │
   │ QQuery 16    │  1722.67 ms │  1620.69 ms │ +1.06x faster │
   │ QQuery 17    │  1594.99 ms │  1593.08 ms │     no change │
   │ QQuery 18    │  3101.36 ms │  2892.25 ms │ +1.07x faster │
   │ QQuery 19    │    83.25 ms │    84.66 ms │     no change │
   │ QQuery 20    │  1121.24 ms │  1146.38 ms │     no change │
   │ QQuery 21    │  1313.87 ms │  1298.49 ms │     no change │
   │ QQuery 22    │  2157.02 ms │  2129.27 ms │     no change │
   │ QQuery 23    │  7965.30 ms │  7780.56 ms │     no change │
   │ QQuery 24    │   465.54 ms │   454.83 ms │     no change │
   │ QQuery 25    │   394.55 ms │   386.33 ms │     no change │
   │ QQuery 26    │   537.44 ms │   523.72 ms │     no change │
   │ QQuery 27    │  1593.11 ms │  1547.66 ms │     no change │
   │ QQuery 28    │ 13504.42 ms │ 11770.24 ms │ +1.15x faster │
   │ QQuery 29    │   535.75 ms │   535.55 ms │     no change │
   │ QQuery 30    │   791.38 ms │   784.48 ms │     no change │
   │ QQuery 31    │   837.24 ms │   809.86 ms │     no change │
   │ QQuery 32    │  2631.49 ms │  2501.55 ms │     no change │
   │ QQuery 33    │  3317.30 ms │  3148.53 ms │ +1.05x faster │
   │ QQuery 34    │  3329.56 ms │  3233.21 ms │     no change │
   │ QQuery 35    │  1265.38 ms │  1238.90 ms │     no change │
   │ QQuery 36    │   120.28 ms │   120.88 ms │     no change │
   │ QQuery 37    │    57.17 ms │    56.21 ms │     no change │
   │ QQuery 38    │   122.92 ms │   122.28 ms │     no change │
   │ QQuery 39    │   198.52 ms │   198.42 ms │     no change │
   │ QQuery 40    │    44.65 ms │    46.41 ms │     no change │
   │ QQuery 41    │    45.72 ms │    45.15 ms │     no change │
   │ QQuery 42    │    37.71 ms │    37.05 ms │     no change │
   └──────────────┴─────────────┴─────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary          ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)          │ 57105.04ms │
   │ Total Time (issue_16321

Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-09 Thread via GitHub


alamb commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2135636293


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,50 @@ impl SortPreservingMergeStream {
 // Once all partitions have set their corresponding cursors for the loser tree,
 // we skip the following block. Until then, this function may be called multiple
 // times and can return Poll::Pending if any partition returns Poll::Pending.
+
 if self.loser_tree.is_empty() {
-    while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+    // Manual indexing since we're iterating over the vector and shrinking it in the loop
+    let mut idx = 0;
+    while idx < self.uninitiated_partitions.len() {
+        let partition_idx = self.uninitiated_partitions[idx];
         match self.maybe_poll_stream(cx, partition_idx) {
             Poll::Ready(Err(e)) => {
                 self.aborted = true;
                 return Poll::Ready(Some(Err(e)));
             }
             Poll::Pending => {
-                // If a partition returns Poll::Pending, to avoid continuously polling it
-                // and potentially increasing upstream buffer sizes, we move it to the
-                // back of the polling queue.
-                self.uninitiated_partitions.rotate_left(1);
-
-                // This function could remain in a pending state, so we manually wake it here.
-                // However, this approach can be investigated further to find a more natural way
-                // to avoid disrupting the runtime scheduler.
-                cx.waker().wake_by_ref();
-                return Poll::Pending;
+                // The polled stream is pending which means we're already set up to
+                // be woken when necessary
+                // Try the next stream
+                idx += 1;
             }
             _ => {
-                // If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
-                // we remove this partition from the queue so it is not polled again.
-                self.uninitiated_partitions.pop_front();
+                // The polled stream is ready

Review Comment:
   thank you for these comments 👍






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-09 Thread via GitHub


alamb commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2955655026

   🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
   Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr  2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing issue_16321 (c3d5ae92660ffce77f67f32fdf19efcad00efec4) to 1daa5ed5cc51546904d45e23cc148601d973942a [diff](https://github.com/apache/datafusion/compare/1daa5ed5cc51546904d45e23cc148601d973942a..c3d5ae92660ffce77f67f32fdf19efcad00efec4)
   Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
   Results will be posted here when complete
   





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-09 Thread via GitHub


Dandandan commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2955401426

   I profiled some queries to verify it's no longer busy on the sorting thread:
   
   Main (TPC-H query 1):
   https://github.com/user-attachments/assets/a7236a88-c024-435b-8413-d98dcd8c8152
   
   PR (TPC-H query 1):
   https://github.com/user-attachments/assets/810749c1-fe25-462e-998f-714e48accb42
   
   The thread above is the thread executing SPM, you can see it does close to 
nothing after this PR (after reading metadata).
   





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-09 Thread via GitHub


Dandandan commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2135346103


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,49 @@ impl SortPreservingMergeStream {
 // Once all partitions have set their corresponding cursors for the loser tree,
 // we skip the following block. Until then, this function may be called multiple
 // times and can return Poll::Pending if any partition returns Poll::Pending.
+
 if self.loser_tree.is_empty() {
-    while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+    // Manual indexing since we're iterating over the vector and shrinking it in the loop
+    let mut idx = 0;
+    while idx < self.uninitiated_partitions.len() {
+        // unwrap is safe since we just checked the index
+        let &partition_idx = self.uninitiated_partitions.get(idx).unwrap();
         match self.maybe_poll_stream(cx, partition_idx) {
             Poll::Ready(Err(e)) => {
                 self.aborted = true;
                 return Poll::Ready(Some(Err(e)));
             }
             Poll::Pending => {
-                // If a partition returns Poll::Pending, to avoid continuously polling it
-                // and potentially increasing upstream buffer sizes, we move it to the
-                // back of the polling queue.
-                self.uninitiated_partitions.rotate_left(1);
-
-                // This function could remain in a pending state, so we manually wake it here.
-                // However, this approach can be investigated further to find a more natural way
-                // to avoid disrupting the runtime scheduler.
-                cx.waker().wake_by_ref();
-                return Poll::Pending;
+                // The polled stream is pending which means we're already set up to
+                // be woken when necessary
+                // Try the next stream
+                idx += 1;
             }
             _ => {
-                // If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
-                // we remove this partition from the queue so it is not polled again.
-                self.uninitiated_partitions.pop_front();
+                // The polled stream is ready
+                // Remove it from uninitiated_partitions
+                // Don't bump idx here, since a new element will have taken its
+                // place which we'll try in the next loop iteration
+                self.uninitiated_partitions.remove(idx);

Review Comment:
   I'll rerun the failed job






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-08 Thread via GitHub


pepijnve commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2134795452


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,49 @@ impl SortPreservingMergeStream {
 // Once all partitions have set their corresponding cursors for the loser tree,
 // we skip the following block. Until then, this function may be called multiple
 // times and can return Poll::Pending if any partition returns Poll::Pending.
+
 if self.loser_tree.is_empty() {
-    while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+    // Manual indexing since we're iterating over the vector and shrinking it in the loop
+    let mut idx = 0;
+    while idx < self.uninitiated_partitions.len() {
+        // unwrap is safe since we just checked the index
+        let &partition_idx = self.uninitiated_partitions.get(idx).unwrap();
         match self.maybe_poll_stream(cx, partition_idx) {
             Poll::Ready(Err(e)) => {
                 self.aborted = true;
                 return Poll::Ready(Some(Err(e)));
             }
             Poll::Pending => {
-                // If a partition returns Poll::Pending, to avoid continuously polling it
-                // and potentially increasing upstream buffer sizes, we move it to the
-                // back of the polling queue.
-                self.uninitiated_partitions.rotate_left(1);
-
-                // This function could remain in a pending state, so we manually wake it here.
-                // However, this approach can be investigated further to find a more natural way
-                // to avoid disrupting the runtime scheduler.
-                cx.waker().wake_by_ref();
-                return Poll::Pending;
+                // The polled stream is pending which means we're already set up to
+                // be woken when necessary
+                // Try the next stream
+                idx += 1;
             }
             _ => {
-                // If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
-                // we remove this partition from the queue so it is not polled again.
-                self.uninitiated_partitions.pop_front();
+                // The polled stream is ready
+                // Remove it from uninitiated_partitions
+                // Don't bump idx here, since a new element will have taken its
+                // place which we'll try in the next loop iteration
+                self.uninitiated_partitions.remove(idx);

Review Comment:
   Tests passed. Examples failed due to crates.io having issues.






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-08 Thread via GitHub


Dandandan commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2954193366

   > @Dandandan project newbie question, my daily practice at work is to handle 
code review comments using amend/force-push. Did so out of habit before 
thinking to ask. Is that ok in this project or does the community prefer 
fixup commits and squash before merging?
   
   IMO it doesn't matter too much (personally I like to create individual 
commits).
   
   We'll squash-merge the commits.





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-08 Thread via GitHub


pepijnve commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2954189088

   @Dandandan project newbie question, my daily practice at work is to handle 
code review comments using amend/force-push. Did so out of habit before 
thinking to ask. Is that ok in this project or does the community prefer 
fixup commits and squash before merging?





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-08 Thread via GitHub


pepijnve commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2134778315


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,49 @@ impl SortPreservingMergeStream {
 // Once all partitions have set their corresponding cursors for the loser tree,
 // we skip the following block. Until then, this function may be called multiple
 // times and can return Poll::Pending if any partition returns Poll::Pending.
+
 if self.loser_tree.is_empty() {
-while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+// Manual indexing since we're iterating over the vector and shrinking it in the loop
+let mut idx = 0;
+while idx < self.uninitiated_partitions.len() {
+// unwrap is safe since we just checked the index
+let &partition_idx = self.uninitiated_partitions.get(idx).unwrap();
 match self.maybe_poll_stream(cx, partition_idx) {
 Poll::Ready(Err(e)) => {
 self.aborted = true;
 return Poll::Ready(Some(Err(e)));
 }
 Poll::Pending => {
-// If a partition returns Poll::Pending, to avoid continuously polling it
-// and potentially increasing upstream buffer sizes, we move it to the
-// back of the polling queue.
-self.uninitiated_partitions.rotate_left(1);
-
-// This function could remain in a pending state, so we manually wake it here.
-// However, this approach can be investigated further to find a more natural way
-// to avoid disrupting the runtime scheduler.
-cx.waker().wake_by_ref();
-return Poll::Pending;
+// The polled stream is pending which means we're already set up to
+// be woken when necessary
+// Try the next stream
+idx += 1;
 }
 _ => {
-// If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
-// we remove this partition from the queue so it is not polled again.
-self.uninitiated_partitions.pop_front();
+// The polled stream is ready
+// Remove it from uninitiated_partitions
+// Don't bump idx here, since a new element will have taken its
+// place which we'll try in the next loop iteration
+self.uninitiated_partitions.remove(idx);

Review Comment:
   Will do. I'll need to adapt `test_spm_congestion` as well, since with that 
change it no longer really tests what it claims to test. I'll push the 
`swap_remove` change now to see whether the other test breaks.






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-08 Thread via GitHub


Dandandan commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2134777847


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,49 @@ impl SortPreservingMergeStream {
 // Once all partitions have set their corresponding cursors for the loser tree,
 // we skip the following block. Until then, this function may be called multiple
 // times and can return Poll::Pending if any partition returns Poll::Pending.
+
 if self.loser_tree.is_empty() {
-while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+// Manual indexing since we're iterating over the vector and shrinking it in the loop
+let mut idx = 0;
+while idx < self.uninitiated_partitions.len() {
+// unwrap is safe since we just checked the index
+let &partition_idx = self.uninitiated_partitions.get(idx).unwrap();
 match self.maybe_poll_stream(cx, partition_idx) {
 Poll::Ready(Err(e)) => {
 self.aborted = true;
 return Poll::Ready(Some(Err(e)));
 }
 Poll::Pending => {
-// If a partition returns Poll::Pending, to avoid continuously polling it
-// and potentially increasing upstream buffer sizes, we move it to the
-// back of the polling queue.
-self.uninitiated_partitions.rotate_left(1);
-
-// This function could remain in a pending state, so we manually wake it here.
-// However, this approach can be investigated further to find a more natural way
-// to avoid disrupting the runtime scheduler.
-cx.waker().wake_by_ref();
-return Poll::Pending;
+// The polled stream is pending which means we're already set up to
+// be woken when necessary
+// Try the next stream
+idx += 1;
 }
 _ => {
-// If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
-// we remove this partition from the queue so it is not polled again.
-self.uninitiated_partitions.pop_front();
+// The polled stream is ready
+// Remove it from uninitiated_partitions
+// Don't bump idx here, since a new element will have taken its
+// place which we'll try in the next loop iteration
+self.uninitiated_partitions.remove(idx);

Review Comment:
   It seems to me it might be an issue in the test (so I would prefer to use 
`swap_remove`)






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-08 Thread via GitHub


pepijnve commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2134773918


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,49 @@ impl SortPreservingMergeStream {
 // Once all partitions have set their corresponding cursors for the loser tree,
 // we skip the following block. Until then, this function may be called multiple
 // times and can return Poll::Pending if any partition returns Poll::Pending.
+
 if self.loser_tree.is_empty() {
-while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+// Manual indexing since we're iterating over the vector and shrinking it in the loop
+let mut idx = 0;
+while idx < self.uninitiated_partitions.len() {
+// unwrap is safe since we just checked the index
+let &partition_idx = self.uninitiated_partitions.get(idx).unwrap();

Review Comment:
   Rust newbie, sorry. I'll fix this.






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-08 Thread via GitHub


pepijnve commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2134773638


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,49 @@ impl SortPreservingMergeStream {
 // Once all partitions have set their corresponding cursors for the loser tree,
 // we skip the following block. Until then, this function may be called multiple
 // times and can return Poll::Pending if any partition returns Poll::Pending.
+
 if self.loser_tree.is_empty() {
-while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+// Manual indexing since we're iterating over the vector and shrinking it in the loop
+let mut idx = 0;
+while idx < self.uninitiated_partitions.len() {
+// unwrap is safe since we just checked the index
+let &partition_idx = self.uninitiated_partitions.get(idx).unwrap();
 match self.maybe_poll_stream(cx, partition_idx) {
 Poll::Ready(Err(e)) => {
 self.aborted = true;
 return Poll::Ready(Some(Err(e)));
 }
 Poll::Pending => {
-// If a partition returns Poll::Pending, to avoid continuously polling it
-// and potentially increasing upstream buffer sizes, we move it to the
-// back of the polling queue.
-self.uninitiated_partitions.rotate_left(1);
-
-// This function could remain in a pending state, so we manually wake it here.
-// However, this approach can be investigated further to find a more natural way
-// to avoid disrupting the runtime scheduler.
-cx.waker().wake_by_ref();
-return Poll::Pending;
+// The polled stream is pending which means we're already set up to
+// be woken when necessary
+// Try the next stream
+idx += 1;
 }
 _ => {
-// If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
-// we remove this partition from the queue so it is not polled again.
-self.uninitiated_partitions.pop_front();
+// The polled stream is ready
+// Remove it from uninitiated_partitions
+// Don't bump idx here, since a new element will have taken its
+// place which we'll try in the next loop iteration
+self.uninitiated_partitions.remove(idx);

Review Comment:
   Funny you mention it. I had done that originally thinking the exact poll 
order here is irrelevant, but I got a flipping test failure in one of the union 
test cases due to a difference in row order. I switched to remove to be 
conservative.
   Funny enough, switching to remove uncovered the missing waker bits in 
CongestedStream; swap_remove was hiding that because it caused the third stream 
in the test to get polled before the first one.
   Shall I switch this back to swap_remove and continue investigating the 
failing test case if it resurfaces?
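
To illustrate the ordering effect described above, here is a small standalone sketch (not code from the PR). `visit_order` mimics the index loop from the diff, and the `is_ready` closure is a hypothetical stand-in for `maybe_poll_stream` returning `Poll::Ready`; both names are invented for this example.

```rust
/// Walks `partitions` with the "manual index" loop from the diff and
/// returns (order in which partitions were visited, partitions left over).
///
/// `is_ready` is a hypothetical stand-in for a poll that returns Ready.
/// `swap` selects `Vec::swap_remove` instead of `Vec::remove`.
fn visit_order(
    mut partitions: Vec<usize>,
    is_ready: impl Fn(usize) -> bool,
    swap: bool,
) -> (Vec<usize>, Vec<usize>) {
    let mut visited = Vec::new();
    let mut idx = 0;
    while idx < partitions.len() {
        let partition = partitions[idx];
        visited.push(partition);
        if is_ready(partition) {
            // Do not bump idx: another element now occupies this slot.
            if swap {
                // O(1): the last element is moved into the hole.
                partitions.swap_remove(idx);
            } else {
                // O(n): later elements shift left, relative order is kept.
                partitions.remove(idx);
            }
        } else {
            // Still pending, move on to the next partition.
            idx += 1;
        }
    }
    (visited, partitions)
}
```

With partitions `[0, 1, 2, 3]` where only partition 0 is ready, `remove` visits them as `0, 1, 2, 3`, while `swap_remove` visits them as `0, 3, 1, 2` because the last partition is moved to the front as soon as partition 0 is removed.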






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-08 Thread via GitHub


Dandandan commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2954177103

   This seems really nice 🚀
   On my machine I get roughly 10% improvement on queries with SPM - which I 
think makes sense on a 10 core machine (with fewer cores it might busy-poll).
   
   ```
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃       main ┃ issue_16321 ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 1     │  809.46 ms │   733.99 ms │ +1.10x faster │
   │ QQuery 2     │  165.80 ms │   148.89 ms │ +1.11x faster │
   │ QQuery 3     │  432.56 ms │   386.04 ms │ +1.12x faster │
   │ QQuery 4     │  459.88 ms │   467.66 ms │     no change │
   │ QQuery 5     │  671.13 ms │   621.34 ms │ +1.08x faster │
   │ QQuery 6     │  181.36 ms │   184.77 ms │     no change │
   │ QQuery 7     │  959.96 ms │   871.05 ms │ +1.10x faster │
   │ QQuery 8     │  672.39 ms │   627.96 ms │ +1.07x faster │
   │ QQuery 9     │ 1101.98 ms │  1023.84 ms │ +1.08x faster │
   │ QQuery 10    │  638.41 ms │   552.55 ms │ +1.16x faster │
   │ QQuery 11    │  126.22 ms │   123.91 ms │     no change │
   │ QQuery 12    │  358.25 ms │   329.10 ms │ +1.09x faster │
   │ QQuery 13    │  720.99 ms │   656.95 ms │ +1.10x faster │
   │ QQuery 14    │  247.23 ms │   251.52 ms │     no change │
   │ QQuery 15    │  395.58 ms │   369.79 ms │ +1.07x faster │
   │ QQuery 16    │  110.37 ms │   103.96 ms │ +1.06x faster │
   │ QQuery 17    │ 1193.78 ms │  1206.00 ms │     no change │
   │ QQuery 18    │ 1846.58 ms │  1668.23 ms │ +1.11x faster │
   │ QQuery 19    │  412.76 ms │   400.33 ms │     no change │
   │ QQuery 20    │  421.08 ms │   392.58 ms │ +1.07x faster │
   │ QQuery 21    │ 1363.39 ms │  1275.33 ms │ +1.07x faster │
   │ QQuery 22    │  149.67 ms │   132.31 ms │ +1.13x faster │
   └──────────────┴────────────┴─────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary          ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (main)          │ 13438.84ms │
   │ Total Time (issue_16321)   │ 12528.08ms │
   │ Average Time (main)        │   610.86ms │
   │ Average Time (issue_16321) │   569.46ms │
   │ Queries Faster             │         16 │
   │ Queries Slower             │          0 │
   │ Queries with No Change     │          6 │
   │ Queries with Failure       │          0 │
   └────────────────────────────┴────────────┘
   
   ```
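
As a rough illustration of the busy-polling concern (a standalone sketch, not DataFusion code): the branch removed in this PR called `cx.waker().wake_by_ref()` before returning `Poll::Pending`, which asks the executor to poll again right away even though no input has changed. `SelfWaking`, `CountingWaker`, and `wakes_after_polls` are hypothetical names invented for this example, and it uses only the standard library.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that only counts how often a wake-up was requested.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical future that is never ready and wakes itself on every poll,
/// mirroring the `wake_by_ref` + `return Poll::Pending` pattern.
struct SelfWaking;

impl Future for SelfWaking {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Requests another poll immediately, although nothing has changed.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Polls `fut` exactly `n` times and returns how many wake-ups it requested.
fn wakes_after_polls<F: Future + Unpin>(mut fut: F, n: usize) -> usize {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    for _ in 0..n {
        let _ = Pin::new(&mut fut).poll(&mut cx);
    }
    counter.0.load(Ordering::SeqCst)
}
```

Polling `SelfWaking` five times requests five wake-ups, so a real executor would keep rescheduling the task. A future that simply stays pending, such as `std::future::pending::<()>()`, requests none and lets the thread do other work.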





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-08 Thread via GitHub


Dandandan commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2134772014


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,49 @@ impl SortPreservingMergeStream {
 // Once all partitions have set their corresponding cursors for the loser tree,
 // we skip the following block. Until then, this function may be called multiple
 // times and can return Poll::Pending if any partition returns Poll::Pending.
+
 if self.loser_tree.is_empty() {
-while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+// Manual indexing since we're iterating over the vector and shrinking it in the loop
+let mut idx = 0;
+while idx < self.uninitiated_partitions.len() {
+// unwrap is safe since we just checked the index
+let &partition_idx = self.uninitiated_partitions.get(idx).unwrap();

Review Comment:
   ```suggestion
   let partition_idx = self.uninitiated_partitions[idx];
   ```
   ?






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-08 Thread via GitHub


Dandandan commented on code in PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#discussion_r2134771715


##
datafusion/physical-plan/src/sorts/merge.rs:
##
@@ -216,36 +212,49 @@ impl SortPreservingMergeStream {
 // Once all partitions have set their corresponding cursors for the loser tree,
 // we skip the following block. Until then, this function may be called multiple
 // times and can return Poll::Pending if any partition returns Poll::Pending.
+
 if self.loser_tree.is_empty() {
-while let Some(&partition_idx) = self.uninitiated_partitions.front() {
+// Manual indexing since we're iterating over the vector and shrinking it in the loop
+let mut idx = 0;
+while idx < self.uninitiated_partitions.len() {
+// unwrap is safe since we just checked the index
+let &partition_idx = self.uninitiated_partitions.get(idx).unwrap();
 match self.maybe_poll_stream(cx, partition_idx) {
 Poll::Ready(Err(e)) => {
 self.aborted = true;
 return Poll::Ready(Some(Err(e)));
 }
 Poll::Pending => {
-// If a partition returns Poll::Pending, to avoid continuously polling it
-// and potentially increasing upstream buffer sizes, we move it to the
-// back of the polling queue.
-self.uninitiated_partitions.rotate_left(1);
-
-// This function could remain in a pending state, so we manually wake it here.
-// However, this approach can be investigated further to find a more natural way
-// to avoid disrupting the runtime scheduler.
-cx.waker().wake_by_ref();
-return Poll::Pending;
+// The polled stream is pending which means we're already set up to
+// be woken when necessary
+// Try the next stream
+idx += 1;
 }
 _ => {
-// If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
-// we remove this partition from the queue so it is not polled again.
-self.uninitiated_partitions.pop_front();
+// The polled stream is ready
+// Remove it from uninitiated_partitions
+// Don't bump idx here, since a new element will have taken its
+// place which we'll try in the next loop iteration
+self.uninitiated_partitions.remove(idx);

Review Comment:
   Can we use `swap_remove` here? Probably effect of `O(n)` is not measurable, 
but still...
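
For readers less familiar with the two `Vec` methods, a minimal sketch of the difference follows. The helper names `after_remove` and `after_swap_remove` are made up for this example: `Vec::remove` shifts every later element left (O(n), order preserved), while `Vec::swap_remove` moves the last element into the freed slot (O(1), order not preserved).

```rust
/// Returns `v` after `Vec::remove(idx)`: later elements shift left,
/// so the relative order of the remaining elements is preserved.
fn after_remove(mut v: Vec<usize>, idx: usize) -> Vec<usize> {
    v.remove(idx);
    v
}

/// Returns `v` after `Vec::swap_remove(idx)`: the last element is moved
/// into position `idx`, which is O(1) but changes the element order.
fn after_swap_remove(mut v: Vec<usize>, idx: usize) -> Vec<usize> {
    v.swap_remove(idx);
    v
}
```

For `vec![0, 1, 2, 3]` and index 0, `after_remove` yields `[1, 2, 3]` and `after_swap_remove` yields `[3, 1, 2]`. Both methods panic if `idx` is out of bounds.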






Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-08 Thread via GitHub


pepijnve commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2953693505

   @berkaysynnada I hope it's ok that I ping you directly; reaching out because 
I believe you are the author of the test case in question. I believe this PR 
surfaced a mistake in the `CongestedStream` implementation of the test case. 
That `Stream` was returning `Pending` without ensuring the waker was set up to 
be notified when the congestion condition cleared. I'm fairly confident that's 
not correct since the [trait contract](https://docs.rs/futures/latest/futures/prelude/trait.Stream.html#tymethod.poll_next) 
explicitly states
   > `Poll::Pending` means that this stream's next value is not ready yet. 
Implementations will ensure that the current task will be notified when the 
next value may be ready.
   
   I've fixed the test implementation which makes the test pass again, but 
would like to get your opinion on this. Maybe I'm missing something.
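
A minimal sketch of what that contract asks for, under stated assumptions: this is not the actual `CongestedStream` fix, and it uses `std::future::Future` instead of `futures::Stream` so that it stays self-contained (the `Poll::Pending` obligation is the same for both). `Gate`, `WaitForGate`, `open_gate`, and `CountingWaker` are hypothetical names invented for this example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical shared state standing in for the "congestion" condition.
#[derive(Default)]
struct Gate {
    open: bool,
    waker: Option<Waker>,
}

/// Future that becomes ready once the gate has been opened.
struct WaitForGate(Arc<Mutex<Gate>>);

impl Future for WaitForGate {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut gate = self.0.lock().unwrap();
        if gate.open {
            Poll::Ready(())
        } else {
            // Register the current task's waker BEFORE returning Pending,
            // otherwise nothing will ever trigger another poll.
            gate.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Clears the condition and notifies the task that was waiting on it, if any.
fn open_gate(gate: &Mutex<Gate>) {
    let waker = {
        let mut g = gate.lock().unwrap();
        g.open = true;
        g.waker.take()
    };
    // Wake outside the lock so the woken task can lock the gate immediately.
    if let Some(w) = waker {
        w.wake();
    }
}

/// Helper for trying the sketch out: counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

Polling `WaitForGate` while the gate is closed returns `Pending` and stores the waker; calling `open_gate` then wakes it exactly once, and the next poll returns `Ready`. Without the stored waker, the task would only make progress if something else happened to poll it again, which is what the self-wake in the old merge code was papering over.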





Re: [PR] bug: remove busy-wait while sort is ongoing [datafusion]

2025-06-07 Thread via GitHub


pepijnve commented on PR #16322:
URL: https://github.com/apache/datafusion/pull/16322#issuecomment-2953125280

   A sort preserving merge specific test case started failing. I'll dig deeper 
to better understand what's going on.

