Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2981051106

   > > I agree the existing benchmarks are not perfect
   > 
   > Not trying to criticize the benchmarks, just saying I'm having trouble 
using it as a guide. @alamb Note that by default the current branch is using 
the local counter method @zhuqi-lucas implemented for `YieldStream`. 
Performance should be identical to `main` in other words.
   
   I totally get it -- we'll keep improving the benchmarks / hopefully making 
them easier to use / more stable over time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2981029248

   🤖: Benchmark completed
   
   Details
   
   
   
   ```
   Comparing HEAD and task_budget
   
   Benchmark clickbench_extended.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        HEAD ┃ task_budget ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0     │  1929.02 ms │  1971.93 ms │     no change │
   │ QQuery 1     │   704.63 ms │   718.11 ms │     no change │
   │ QQuery 2     │  1412.62 ms │  1368.78 ms │     no change │
   │ QQuery 3     │   647.44 ms │   680.23 ms │  1.05x slower │
   │ QQuery 4     │  1400.23 ms │  1358.38 ms │     no change │
   │ QQuery 5     │ 14995.43 ms │ 15226.83 ms │     no change │
   │ QQuery 6     │  1985.14 ms │  2087.35 ms │  1.05x slower │
   │ QQuery 7     │  1989.35 ms │  1969.02 ms │     no change │
   │ QQuery 8     │   785.58 ms │   786.62 ms │     no change │
   └──────────────┴─────────────┴─────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary          ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)          │ 25849.44ms │
   │ Total Time (task_budget)   │ 26167.24ms │
   │ Average Time (HEAD)        │  2872.16ms │
   │ Average Time (task_budget) │  2907.47ms │
   │ Queries Faster             │          0 │
   │ Queries Slower             │          2 │
   │ Queries with No Change     │          7 │
   │ Queries with Failure       │          0 │
   └────────────────────────────┴────────────┘
   
   Benchmark clickbench_partitioned.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        HEAD ┃ task_budget ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0     │    15.70 ms │    15.32 ms │     no change │
   │ QQuery 1     │    33.40 ms │    32.29 ms │     no change │
   │ QQuery 2     │    78.71 ms │    81.60 ms │     no change │
   │ QQuery 3     │    94.83 ms │    96.30 ms │     no change │
   │ QQuery 4     │   590.16 ms │   586.19 ms │     no change │
   │ QQuery 5     │   839.78 ms │   884.55 ms │  1.05x slower │
   │ QQuery 6     │    23.62 ms │    24.22 ms │     no change │
   │ QQuery 7     │    35.97 ms │    35.71 ms │     no change │
   │ QQuery 8     │   845.86 ms │   836.34 ms │     no change │
   │ QQuery 9     │  1155.51 ms │  1124.37 ms │     no change │
   │ QQuery 10    │   255.94 ms │   258.01 ms │     no change │
   │ QQuery 11    │   283.03 ms │   285.17 ms │     no change │
   │ QQuery 12    │   861.95 ms │   884.44 ms │     no change │
   │ QQuery 13    │  1256.74 ms │  1274.54 ms │     no change │
   │ QQuery 14    │   801.84 ms │   819.99 ms │     no change │
   │ QQuery 15    │   786.28 ms │   775.94 ms │     no change │
   │ QQuery 16    │  1616.39 ms │  1622.78 ms │     no change │
   │ QQuery 17    │  1591.05 ms │  1593.83 ms │     no change │
   │ QQuery 18    │  2857.24 ms │  2902.54 ms │     no change │
   │ QQuery 19    │    89.43 ms │    84.53 ms │ +1.06x faster │
   │ QQuery 20    │  1125.85 ms │  1171.56 ms │     no change │
   │ QQuery 21    │  1266.95 ms │  1307.77 ms │     no change │
   │ QQuery 22    │  2089.10 ms │  2189.53 ms │     no change │
   │ QQuery 23    │  7574.01 ms │  7914.18 ms │     no change │
   │ QQuery 24    │   444.06 ms │   466.35 ms │  1.05x slower │
   │ QQuery 25    │   378.44 ms │   397.31 ms │     no change │
   │ QQuery 26    │   521.01 ms │   525.65 ms │     no change │
   │ QQuery 27    │  1503.49 ms │  1548.92 ms │     no change │
   │ QQuery 28    │ 11900.61 ms │ 11913.35 ms │     no change │
   │ QQuery 29    │   534.80 ms │   540.04 ms │     no change │
   │ QQuery 30    │   764.78 ms │   790.34 ms │     no change │
   │ QQuery 31    │   812.47 ms │   825.30 ms │     no change │
   │ QQuery 32    │  2541.75 ms │  2545.00 ms │     no change │
   │ QQuery 33    │  3179.10 ms │  3206.02 ms │     no change │
   │ QQuery 34    │  3202.13 ms │  3242.56 ms │     no change │
   │ QQuery 35    │  1219.46 ms │  1221.75 ms │     no change │
   │ QQuery 36    │   121.25 ms │   126.01 ms │     no change │
   │ QQuery 37    │    54.86 ms │    56.17 ms │     no change │
   │ QQuery 38    │   119.17 ms │   125.52 ms │  1.05x slower │
   │ QQuery 39    │   192.11 ms │   198.37 ms │     no change │
   │ QQuery 40    │    47.73 ms │    50.41 ms │  1.06x slower │
   │ QQuery 41    │    43.85 ms │    45.09 ms │     no change │
   │ QQuery 42    │    37.48 ms │    38.16 ms │     no change │
   └──────────────┴─────────────┴─────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary          ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)          │ 53787.93ms │
   │ Total Time (task_budget)   │ 54664.0

Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980961262

   🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
   Linux aal-dev 6.11.0-1015-gcp #15~24.04.1-Ubuntu SMP Thu Apr 24 20:41:05 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing task_budget (a749e8f60cbcdf5542b4b69a0a2f94c7790fab45) to 06631c25316ed2fc20ab8114d0dcc801f353fbad [diff](https://github.com/apache/datafusion/compare/06631c25316ed2fc20ab8114d0dcc801f353fbad..a749e8f60cbcdf5542b4b69a0a2f94c7790fab45)
   Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
   Results will be posted here when complete
   





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980961066

   🤖: Benchmark completed
   
   Details
   
   
   
   ```
   Comparing HEAD and task_budget
   
   Benchmark clickbench_1.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        HEAD ┃ task_budget ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0     │    51.47 ms │    46.15 ms │ +1.12x faster │
   │ QQuery 1     │    75.49 ms │    72.84 ms │     no change │
   │ QQuery 2     │   109.24 ms │   108.00 ms │     no change │
   │ QQuery 3     │   127.42 ms │   119.32 ms │ +1.07x faster │
   │ QQuery 4     │   633.00 ms │   685.11 ms │  1.08x slower │
   │ QQuery 5     │   853.72 ms │   863.05 ms │     no change │
   │ QQuery 6     │    56.74 ms │    54.54 ms │     no change │
   │ QQuery 7     │    80.42 ms │    82.07 ms │     no change │
   │ QQuery 8     │   881.04 ms │   884.51 ms │     no change │
   │ QQuery 9     │  1194.27 ms │  1202.55 ms │     no change │
   │ QQuery 10    │   300.62 ms │   297.89 ms │     no change │
   │ QQuery 11    │   318.31 ms │   323.44 ms │     no change │
   │ QQuery 12    │   879.94 ms │   888.82 ms │     no change │
   │ QQuery 13    │  1245.61 ms │  1252.15 ms │     no change │
   │ QQuery 14    │   803.27 ms │   814.41 ms │     no change │
   │ QQuery 15    │   803.96 ms │   822.93 ms │     no change │
   │ QQuery 16    │  1630.13 ms │  1686.71 ms │     no change │
   │ QQuery 17    │  1584.79 ms │  1608.29 ms │     no change │
   │ QQuery 18    │  2894.48 ms │  2899.79 ms │     no change │
   │ QQuery 19    │   125.96 ms │   119.63 ms │ +1.05x faster │
   │ QQuery 20    │  1140.75 ms │  1206.46 ms │  1.06x slower │
   │ QQuery 21    │  1337.94 ms │  1390.68 ms │     no change │
   │ QQuery 22    │  2323.74 ms │  2408.18 ms │     no change │
   │ QQuery 23    │  7822.32 ms │  8103.67 ms │     no change │
   │ QQuery 24    │   489.54 ms │   507.76 ms │     no change │
   │ QQuery 25    │   406.25 ms │   432.43 ms │  1.06x slower │
   │ QQuery 26    │   545.86 ms │   568.63 ms │     no change │
   │ QQuery 27    │  1638.18 ms │  1683.81 ms │     no change │
   │ QQuery 28    │ 12379.17 ms │ 12511.87 ms │     no change │
   │ QQuery 29    │   565.53 ms │   581.52 ms │     no change │
   │ QQuery 30    │   803.29 ms │   816.97 ms │     no change │
   │ QQuery 31    │   843.85 ms │   872.76 ms │     no change │
   │ QQuery 32    │  2547.32 ms │  2563.59 ms │     no change │
   │ QQuery 33    │  3256.21 ms │  3307.21 ms │     no change │
   │ QQuery 34    │  3266.60 ms │  3323.18 ms │     no change │
   │ QQuery 35    │  1258.28 ms │  1288.71 ms │     no change │
   │ QQuery 36    │   173.64 ms │   181.16 ms │     no change │
   │ QQuery 37    │   101.14 ms │   101.04 ms │     no change │
   │ QQuery 38    │   171.00 ms │   179.89 ms │  1.05x slower │
   │ QQuery 39    │   252.45 ms │   266.95 ms │  1.06x slower │
   │ QQuery 40    │    70.71 ms │    88.09 ms │  1.25x slower │
   │ QQuery 41    │    87.06 ms │    86.11 ms │     no change │
   │ QQuery 42    │    78.55 ms │    78.27 ms │     no change │
   └──────────────┴─────────────┴─────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary          ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)          │ 56209.26ms │
   │ Total Time (task_budget)   │ 57381.15ms │
   │ Average Time (HEAD)        │  1307.19ms │
   │ Average Time (task_budget) │  1334.45ms │
   │ Queries Faster             │          3 │
   │ Queries Slower             │          6 │
   │ Queries with No Change     │         34 │
   │ Queries with Failure       │          0 │
   └────────────────────────────┴────────────┘
   ```
   
   
   
   
   
   





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


alamb commented on code in PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#discussion_r2152534664


##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -912,25 +948,7 @@ impl PlanProperties {
 /// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
 /// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
 pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
-    if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>() {
-        !matches!(
-            repartition.properties().output_partitioning(),
-            Partitioning::RoundRobinBatch(_)
-        )
-    } else if let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>()
-    {
-        coalesce.input().output_partitioning().partition_count() > 1
-    } else if let Some(sort_preserving_merge) =
-        plan.as_any().downcast_ref::<SortPreservingMergeExec>()
-    {
-        sort_preserving_merge
-            .input()
-            .output_partitioning()
-            .partition_count()
-            > 1
-    } else {
-        false
-    }
+    plan.properties().evaluation_type == EvaluationType::Lazy

Review Comment:
   that is certainly a lot nicer and easier to understand



##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -743,6 +733,38 @@ pub enum EmissionType {
     Both,
 }
 
+/// Represents whether an operator's `Stream` has been implemented to actively cooperate with the
+/// Tokio scheduler or not.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum SchedulingType {
+    /// The stream generated by [`execute`](ExecutionPlan::execute) does not actively participate in
+    /// cooperative scheduling. This means the implementation of the `Stream` returned by
+    /// [`ExecutionPlan::execute`] does not contain explicit cooperative yield points.
+    Blocking,
+    /// The stream generated by [`execute`](ExecutionPlan::execute) actively participates in
+    /// cooperative scheduling by consuming task budget when it was able to produce a
+    /// [`RecordBatch`]. Please refer to the [`coop`](crate::coop) module for more details.
+    Cooperative,
+}
+
+/// Represents how an operator's `Stream` implementation generates `RecordBatch`es.
+///
+/// Most operators in DataFusion generate `RecordBatch`es when asked to do so by a call to
+/// `Stream::poll_next`. This is known as demand-driven or lazy evaluation.
+///
+/// Some operators like `Repartition` need to drive `RecordBatch` generation themselves though. This
+/// is known as data-driven or eager evaluation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum EvaluationType {
+    /// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch`
+    /// instances when it is demanded by invoking `Stream::poll_next`.
+    Lazy,
+    /// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch`
+    /// in one or more spawned Tokio tasks. Eager evaluation is only started the first time
+    /// `Stream::poll_next` is called.

Review Comment:
   ```suggestion
   /// `Stream::poll_next` is called. Hash aggregation and HashJoin are examples of such operators
   ```



##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -743,6 +733,38 @@ pub enum EmissionType {
     Both,
 }
 
+/// Represents whether an operator's `Stream` has been implemented to actively cooperate with the
+/// Tokio scheduler or not.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum SchedulingType {
+    /// The stream generated by [`execute`](ExecutionPlan::execute) does not actively participate in
+    /// cooperative scheduling. This means the implementation of the `Stream` returned by
+    /// [`ExecutionPlan::execute`] does not contain explicit cooperative yield points.

Review Comment:
   I think it would also be helpful here to give an explicit example of what a 
cooperative yield point is. For example:
   ```suggestion
   /// [`ExecutionPlan::execute`] does not contain explicit cooperative yield points such as
   /// `await` or [`tokio::task::yield_now`].
   ```
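
   To make "cooperative yield point" concrete, here is a minimal, std-only sketch. It is not DataFusion or Tokio code: `YieldNow` is a hypothetical stand-in for what `tokio::task::yield_now` does (return `Pending` once after requesting a wake-up), and `count_yields` and `sum_with_yields` are illustrative helpers that poll a future by hand and count how often it handed control back.

   ```rust
   use std::future::Future;
   use std::pin::Pin;
   use std::sync::Arc;
   use std::task::{Context, Poll, Wake, Waker};

   /// Hypothetical stand-in for `tokio::task::yield_now`:
   /// pending on the first poll, ready on the second.
   struct YieldNow {
       yielded: bool,
   }

   impl Future for YieldNow {
       type Output = ();

       fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
           if self.yielded {
               return Poll::Ready(());
           }
           self.yielded = true;
           // Ask to be polled again, then hand control back to the caller.
           cx.waker().wake_by_ref();
           Poll::Pending
       }
   }

   /// Waker that does nothing; sufficient when polling by hand.
   struct NoopWake;

   impl Wake for NoopWake {
       fn wake(self: Arc<Self>) {}
   }

   /// Polls `fut` to completion and reports how often it returned `Pending`.
   fn count_yields<F: Future>(fut: F) -> (F::Output, usize) {
       let waker = Waker::from(Arc::new(NoopWake));
       let mut cx = Context::from_waker(&waker);
       let mut fut = Box::pin(fut);
       let mut pending = 0;
       loop {
           match fut.as_mut().poll(&mut cx) {
               Poll::Ready(out) => return (out, pending),
               Poll::Pending => pending += 1,
           }
       }
   }

   /// Sums 1..=n with an explicit yield point after every `every` items.
   async fn sum_with_yields(n: u64, every: u64) -> u64 {
       let mut total = 0;
       for i in 1..=n {
           total += i;
           if i % every == 0 {
               YieldNow { yielded: false }.await;
           }
       }
       total
   }

   fn main() {
       let (total, yields) = count_yields(sum_with_yields(10, 4));
       println!("total = {total}, yields = {yields}");
   }
   ```

   A loop without the `YieldNow` await would complete in a single poll, which is the situation the `Blocking` variant describes.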



##
datafusion/core/tests/execution/coop.rs:
##
@@ -0,0 +1,747 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+

Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980925371

   > 🤖: Benchmark completed
   
   
   Will see if it is reproducible





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980898342

   🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
   Linux aal-dev 6.11.0-1015-gcp #15~24.04.1-Ubuntu SMP Thu Apr 24 20:41:05 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing task_budget (a749e8f60cbcdf5542b4b69a0a2f94c7790fab45) to 06631c25316ed2fc20ab8114d0dcc801f353fbad [diff](https://github.com/apache/datafusion/compare/06631c25316ed2fc20ab8114d0dcc801f353fbad..a749e8f60cbcdf5542b4b69a0a2f94c7790fab45)
   Benchmarks: clickbench_1
   Results will be posted here when complete
   





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980898244

   🤖: Benchmark completed
   
   Details
   
   
   
   ```
   Comparing HEAD and task_budget
   
   Benchmark clickbench_extended.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        HEAD ┃ task_budget ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0     │  1938.69 ms │  1955.98 ms │     no change │
   │ QQuery 1     │   728.67 ms │   732.71 ms │     no change │
   │ QQuery 2     │  1393.40 ms │  1380.16 ms │     no change │
   │ QQuery 3     │   654.25 ms │   667.02 ms │     no change │
   │ QQuery 4     │  1374.93 ms │  1368.90 ms │     no change │
   │ QQuery 5     │ 14843.62 ms │ 14967.16 ms │     no change │
   │ QQuery 6     │  2001.52 ms │  2082.06 ms │     no change │
   │ QQuery 7     │  1889.83 ms │  1950.81 ms │     no change │
   │ QQuery 8     │   805.12 ms │   809.25 ms │     no change │
   └──────────────┴─────────────┴─────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary          ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)          │ 25630.03ms │
   │ Total Time (task_budget)   │ 25914.06ms │
   │ Average Time (HEAD)        │  2847.78ms │
   │ Average Time (task_budget) │  2879.34ms │
   │ Queries Faster             │          0 │
   │ Queries Slower             │          0 │
   │ Queries with No Change     │          9 │
   │ Queries with Failure       │          0 │
   └────────────────────────────┴────────────┘
   
   Benchmark clickbench_partitioned.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        HEAD ┃ task_budget ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0     │    15.26 ms │    15.20 ms │     no change │
   │ QQuery 1     │    33.51 ms │    32.70 ms │     no change │
   │ QQuery 2     │    81.78 ms │    80.44 ms │     no change │
   │ QQuery 3     │   100.21 ms │    96.71 ms │     no change │
   │ QQuery 4     │   578.10 ms │   585.73 ms │     no change │
   │ QQuery 5     │   838.83 ms │   887.89 ms │  1.06x slower │
   │ QQuery 6     │    24.33 ms │    22.32 ms │ +1.09x faster │
   │ QQuery 7     │    38.56 ms │    36.90 ms │     no change │
   │ QQuery 8     │   850.10 ms │   856.20 ms │     no change │
   │ QQuery 9     │  1151.01 ms │  1146.42 ms │     no change │
   │ QQuery 10    │   252.88 ms │   263.74 ms │     no change │
   │ QQuery 11    │   283.58 ms │   290.15 ms │     no change │
   │ QQuery 12    │   869.70 ms │   866.73 ms │     no change │
   │ QQuery 13    │  1255.69 ms │  1286.10 ms │     no change │
   │ QQuery 14    │   809.83 ms │   812.02 ms │     no change │
   │ QQuery 15    │   777.07 ms │   786.77 ms │     no change │
   │ QQuery 16    │  1613.49 ms │  1629.44 ms │     no change │
   │ QQuery 17    │  1567.97 ms │  1580.52 ms │     no change │
   │ QQuery 18    │  2886.23 ms │  2891.01 ms │     no change │
   │ QQuery 19    │    87.29 ms │    84.38 ms │     no change │
   │ QQuery 20    │  1113.26 ms │  1152.62 ms │     no change │
   │ QQuery 21    │  1252.92 ms │  1328.12 ms │  1.06x slower │
   │ QQuery 22    │  2069.04 ms │  2168.76 ms │     no change │
   │ QQuery 23    │  7539.31 ms │  7957.48 ms │  1.06x slower │
   │ QQuery 24    │   443.12 ms │   464.35 ms │     no change │
   │ QQuery 25    │   379.12 ms │   404.81 ms │  1.07x slower │
   │ QQuery 26    │   503.89 ms │   530.15 ms │  1.05x slower │
   │ QQuery 27    │  1500.52 ms │  1566.86 ms │     no change │
   │ QQuery 28    │ 12013.51 ms │ 11817.68 ms │     no change │
   │ QQuery 29    │   527.40 ms │   525.63 ms │     no change │
   │ QQuery 30    │   758.72 ms │   791.22 ms │     no change │
   │ QQuery 31    │   811.10 ms │   829.72 ms │     no change │
   │ QQuery 32    │  2485.30 ms │  2553.71 ms │     no change │
   │ QQuery 33    │  3181.91 ms │  3185.99 ms │     no change │
   │ QQuery 34    │  3176.40 ms │  3209.67 ms │     no change │
   │ QQuery 35    │  1228.64 ms │  1223.48 ms │     no change │
   │ QQuery 36    │   120.17 ms │   127.76 ms │  1.06x slower │
   │ QQuery 37    │    57.02 ms │    56.75 ms │     no change │
   │ QQuery 38    │   120.93 ms │   127.28 ms │  1.05x slower │
   │ QQuery 39    │   191.04 ms │   202.25 ms │  1.06x slower │
   │ QQuery 40    │    47.95 ms │    47.32 ms │     no change │
   │ QQuery 41    │    45.18 ms │    45.53 ms │     no change │
   │ QQuery 42    │    36.63 ms │    39.44 ms │  1.08x slower │
   └──────────────┴─────────────┴─────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary          ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)          │ 53718.51ms │
   │ Total Time (task_budget)   │ 54607.96ms │
   │ Average Time (HEAD)        │

Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


pepijnve commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980783229

   > I agree the existing benchmarks are not perfect
   
   Not trying to criticize the benchmarks, just saying I'm having trouble using 
it as a guide.
   @alamb Note that by default the current branch is using the local counter 
method @zhuqi-lucas implemented for `YieldStream`. Performance should be 
identical to `main` in other words.
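
   For readers unfamiliar with the "local counter method", the following is a rough, std-only sketch of the idea, not the actual `YieldStream` code. The `BatchSource` trait, `Counter`, `LocalCounterYield` and `drain` are all hypothetical names, and the waker handling a real stream needs is omitted. The wrapper keeps its own per-stream count of consecutive ready batches and returns `Pending` once that count reaches a fixed frequency, in contrast to a budget that is shared by the whole task.

   ```rust
   use std::task::Poll;

   /// Hypothetical, simplified stand-in for a poll-based batch stream.
   trait BatchSource {
       fn poll_next(&mut self) -> Poll<Option<u32>>;
   }

   /// Always-ready source producing the values `next..limit`.
   struct Counter {
       next: u32,
       limit: u32,
   }

   impl BatchSource for Counter {
       fn poll_next(&mut self) -> Poll<Option<u32>> {
           if self.next < self.limit {
               self.next += 1;
               Poll::Ready(Some(self.next - 1))
           } else {
               Poll::Ready(None)
           }
       }
   }

   /// Wrapper that yields after `frequency` consecutive ready batches.
   struct LocalCounterYield<S> {
       inner: S,
       frequency: usize,
       ready_streak: usize,
   }

   impl<S> LocalCounterYield<S> {
       fn new(inner: S, frequency: usize) -> Self {
           Self { inner, frequency, ready_streak: 0 }
       }
   }

   impl<S: BatchSource> BatchSource for LocalCounterYield<S> {
       fn poll_next(&mut self) -> Poll<Option<u32>> {
           if self.ready_streak >= self.frequency {
               // A real stream would also wake the current task here.
               self.ready_streak = 0;
               return Poll::Pending;
           }
           match self.inner.poll_next() {
               Poll::Ready(Some(batch)) => {
                   self.ready_streak += 1;
                   Poll::Ready(Some(batch))
               }
               other => {
                   // The inner source yielded or finished: restart the count.
                   self.ready_streak = 0;
                   other
               }
           }
       }
   }

   /// Polls until exhaustion; returns the batches and the number of yields.
   fn drain<S: BatchSource>(mut source: S) -> (Vec<u32>, usize) {
       let mut batches = Vec::new();
       let mut pending = 0;
       loop {
           match source.poll_next() {
               Poll::Ready(Some(batch)) => batches.push(batch),
               Poll::Ready(None) => return (batches, pending),
               Poll::Pending => pending += 1,
           }
       }
   }

   fn main() {
       let source = LocalCounterYield::new(Counter { next: 0, limit: 10 }, 4);
       let (batches, pending) = drain(source);
       println!("{} batches, {} yields", batches.len(), pending);
   }
   ```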





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980750548

   🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
   Linux aal-dev 6.11.0-1015-gcp #15~24.04.1-Ubuntu SMP Thu Apr 24 20:41:05 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing task_budget (5bfff9798b2815377ef8d40dfaffd26947c51385) to 06631c25316ed2fc20ab8114d0dcc801f353fbad [diff](https://github.com/apache/datafusion/compare/06631c25316ed2fc20ab8114d0dcc801f353fbad..5bfff9798b2815377ef8d40dfaffd26947c51385)
   Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
   Results will be posted here when complete
   





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980748403

   > I've been trying to measure the impact of these changes, but for the life 
of me I can't get useful results out of the benchmark comparison script. The 
sample below is actually comparing the exact same commit on a mac mini M4 and 
yet I still see large fluctuations. Not sure what else I can do 🤷‍♂️.
   
   I agree the existing benchmarks are not perfect -- figuring out how to make 
them better would be good, though as you probably know it is a challenge of 
benchmarking in general. I'll kick off some runs against this PR with my 
scripts to see what it shows.





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


pepijnve commented on code in PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#discussion_r2152509411


##
datafusion/datasource/src/source.rs:
##
@@ -261,11 +261,7 @@ impl ExecutionPlan for DataSourceExec {
     ) -> Result<SendableRecordBatchStream> {
         self.data_source
             .open(partition, Arc::clone(&context))
-            .map(|stream| wrap_yield_stream(stream, &context, self.cooperative))
-    }
-
-    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
-        self.cooperative.then_some(self)
+            .map(|stream| make_cooperative(stream))

Review Comment:
   I've gone ahead and made that change and removed the dyn wrapper while I was 
at it.






Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


pepijnve commented on code in PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#discussion_r2152483273


##
datafusion/datasource/src/source.rs:
##
@@ -261,11 +261,7 @@ impl ExecutionPlan for DataSourceExec {
     ) -> Result<SendableRecordBatchStream> {
         self.data_source
             .open(partition, Arc::clone(&context))
-            .map(|stream| wrap_yield_stream(stream, &context, self.cooperative))
-    }
-
-    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
-        self.cooperative.then_some(self)
+            .map(|stream| make_cooperative(stream))

Review Comment:
   That would make sense indeed. `ExecutionPlan` fulfills the role of a factory 
of `Stream`s. `DataSourceExec` delegates that to the `DataSource` itself, so 
`DataSource` is actually the factory. It would make sense to delegate the 'will 
you create something cooperative?' query as well.
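
   A small sketch of what that delegation could look like, under loudly stated assumptions: the `DataSource` trait below is a toy with only two methods, `is_cooperative` is a hypothetical name for the query, and plain iterators stand in for record batch streams. None of this is the real DataFusion API.

   ```rust
   /// Hypothetical, heavily simplified `DataSource`; the real trait differs.
   trait DataSource {
       /// Factory method: creates the stream (an iterator in this sketch).
       fn open(&self) -> Box<dyn Iterator<Item = u32>>;

       /// Hypothetical query: does `open` already return a cooperative stream?
       fn is_cooperative(&self) -> bool {
           false
       }
   }

   /// Source whose stream has no yield points of its own.
   struct PlainSource;

   impl DataSource for PlainSource {
       fn open(&self) -> Box<dyn Iterator<Item = u32>> {
           Box::new(0..3)
       }
   }

   /// Source that declares its stream to be cooperative already.
   struct CoopSource;

   impl DataSource for CoopSource {
       fn open(&self) -> Box<dyn Iterator<Item = u32>> {
           Box::new(0..3)
       }

       fn is_cooperative(&self) -> bool {
           true
       }
   }

   /// Stand-in for a cooperative wrapper; it only forwards items here.
   struct Wrapped {
       inner: Box<dyn Iterator<Item = u32>>,
   }

   impl Iterator for Wrapped {
       type Item = u32;

       fn next(&mut self) -> Option<u32> {
           self.inner.next()
       }
   }

   /// Toy `DataSourceExec` that delegates both creation and the query.
   struct DataSourceExec<D> {
       source: D,
   }

   impl<D: DataSource> DataSourceExec<D> {
       /// Returns the stream and whether this exec had to add the wrapper.
       fn execute(&self) -> (Box<dyn Iterator<Item = u32>>, bool) {
           let stream = self.source.open();
           if self.source.is_cooperative() {
               (stream, false)
           } else {
               (Box::new(Wrapped { inner: stream }), true)
           }
       }
   }

   fn main() {
       let (_, wrapped) = DataSourceExec { source: PlainSource }.execute();
       println!("plain source wrapped: {wrapped}");
       let (_, wrapped) = DataSourceExec { source: CoopSource }.execute();
       println!("cooperative source wrapped: {wrapped}");
   }
   ```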






Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


ozankabak commented on code in PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#discussion_r2152472886


##
datafusion/datasource/src/source.rs:
##
@@ -261,11 +261,7 @@ impl ExecutionPlan for DataSourceExec {
     ) -> Result<SendableRecordBatchStream> {
         self.data_source
             .open(partition, Arc::clone(&context))
-            .map(|stream| wrap_yield_stream(stream, &context, self.cooperative))
-    }
-
-    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
-        self.cooperative.then_some(self)
+            .map(|stream| make_cooperative(stream))

Review Comment:
   I don't think it is critical performance-wise, but now I wonder if we should 
make any additions to the `DataSource` trait to signal built-in cooperative 
behavior to `DataSourceExec`?






Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


pepijnve commented on code in PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#discussion_r2152379108


##
datafusion/datasource/src/source.rs:
##
@@ -261,11 +261,7 @@ impl ExecutionPlan for DataSourceExec {
     ) -> Result<SendableRecordBatchStream> {
         self.data_source
             .open(partition, Arc::clone(&context))
-            .map(|stream| wrap_yield_stream(stream, &context, self.cooperative))
-    }
-
-    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
-        self.cooperative.then_some(self)
+            .map(|stream| make_cooperative(stream))

Review Comment:
   We could avoid a virtual call here by pushing the cooperative wrapper down 
to `FileScanConfig::open`. That does lose the benefit of `DataSourceExec` being 
cooperative for any `DataSource`. Is it worth shaving off one virtual call per 
record batch?






Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


pepijnve commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980397594

   I've been trying to measure the impact of these changes, but for the life of 
me I can't get useful results out of the benchmark comparison script. The 
sample below is actually comparing the exact same commit on a mac mini M4 and 
yet I still see large fluctuations. Not sure what else I can do 🤷‍♂️.
   
   
   
   ```
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃   baseline ┃     branch ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0     │    9.71 ms │    9.87 ms │     no change │
   │ QQuery 1     │   15.07 ms │   15.36 ms │     no change │
   │ QQuery 2     │   47.76 ms │   47.81 ms │     no change │
   │ QQuery 3     │   46.78 ms │   47.13 ms │     no change │
   │ QQuery 4     │  426.60 ms │  426.87 ms │     no change │
   │ QQuery 5     │  557.13 ms │  559.79 ms │     no change │
   │ QQuery 6     │   11.37 ms │   11.30 ms │     no change │
   │ QQuery 7     │   16.17 ms │   15.99 ms │     no change │
   │ QQuery 8     │  518.13 ms │  516.98 ms │     no change │
   │ QQuery 9     │  671.89 ms │  669.00 ms │     no change │
   │ QQuery 10    │  146.75 ms │  149.95 ms │     no change │
   │ QQuery 11    │  162.79 ms │  164.02 ms │     no change │
   │ QQuery 12    │  567.28 ms │  569.63 ms │     no change │
   │ QQuery 13    │  799.84 ms │  796.18 ms │     no change │
   │ QQuery 14    │  542.10 ms │  535.99 ms │     no change │
   │ QQuery 15    │  502.33 ms │  500.89 ms │     no change │
   │ QQuery 16    │ 1151.00 ms │ 1150.96 ms │     no change │
   │ QQuery 17    │ 1127.41 ms │ 1116.08 ms │     no change │
   │ QQuery 18    │ 2195.19 ms │ 2182.97 ms │     no change │
   │ QQuery 19    │   39.96 ms │   40.64 ms │     no change │
   │ QQuery 20    │  858.48 ms │  859.71 ms │     no change │
   │ QQuery 21    │  988.68 ms │ 1048.57 ms │  1.06x slower │
   │ QQuery 22    │ 1594.48 ms │ 2153.36 ms │  1.35x slower │
   │ QQuery 23    │ 6399.31 ms │ 6905.44 ms │  1.08x slower │
   │ QQuery 24    │  305.36 ms │  299.88 ms │     no change │
   │ QQuery 25    │  274.50 ms │  276.00 ms │     no change │
   │ QQuery 26    │  338.09 ms │  346.69 ms │     no change │
   │ QQuery 27    │ 1140.21 ms │ 1213.95 ms │  1.06x slower │
   │ QQuery 28    │ 8881.31 ms │ 9061.62 ms │     no change │
   │ QQuery 29    │  417.02 ms │  414.85 ms │     no change │
   │ QQuery 30    │  487.07 ms │  486.73 ms │     no change │
   │ QQuery 31    │  507.31 ms │  508.16 ms │     no change │
   │ QQuery 32    │ 2154.40 ms │ 1917.22 ms │ +1.12x faster │
   │ QQuery 33    │ 2498.71 ms │ 2467.38 ms │     no change │
   │ QQuery 34    │ 2678.02 ms │ 2802.00 ms │     no change │
   │ QQuery 35    │  751.17 ms │  772.78 ms │     no change │
   │ QQuery 36    │   61.54 ms │   62.50 ms │     no change │
   │ QQuery 37    │   25.67 ms │   25.56 ms │     no change │
   │ QQuery 38    │   61.80 ms │   62.39 ms │     no change │
   │ QQuery 39    │  101.51 ms │  102.13 ms │     no change │
   │ QQuery 40    │   18.26 ms │   19.09 ms │     no change │
   │ QQuery 41    │   17.70 ms │   17.77 ms │     no change │
   │ QQuery 42    │   14.84 ms │   15.32 ms │     no change │
   └──────────────┴────────────┴────────────┴───────────────┘
   ```
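
   For reading the Change column, here is a minimal sketch of how such a 
label could be derived from two timings. The 5% noise threshold and the 
function name `classify` are assumptions inferred from the rows in this 
thread, not taken from the comparison script itself.

```rust
/// Labels a timing change. Ratios inside the noise threshold are reported as
/// "no change". The threshold is an assumption, not the script's actual value.
fn classify(baseline_ms: f64, branch_ms: f64, threshold: f64) -> String {
    let slower = branch_ms / baseline_ms;
    let faster = baseline_ms / branch_ms;
    if slower > 1.0 + threshold {
        format!("{:.2}x slower", slower)
    } else if faster > 1.0 + threshold {
        format!("+{:.2}x faster", faster)
    } else {
        "no change".to_string()
    }
}

fn main() {
    // Timings for QQuery 22 and QQuery 32 from the table above.
    println!("{}", classify(1594.48, 2153.36, 0.05));
    println!("{}", classify(2154.40, 1917.22, 0.05));
}
```

   With a threshold like this, a 1.35x difference between two runs of the 
same commit is well outside what the label treats as noise, which is why the 
table is hard to use as a guide.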
   
   





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


ozankabak commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2979894941

   I took a quick look at this yesterday and it was looking good -- I will 
review in more detail in a day or two. Thanks





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


pepijnve commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2979874048

   The current Tokio code is only an approximation of the final code. For the 
final version we depend on a Tokio PR getting merged and released.
   To unblock this PR and allow review to proceed, I've added a couple of 
feature flags that disable the Tokio-based code by default.
   The intention is to allow comparing the two variants from a performance 
point of view, and to consider the revised cooperative API for merging already.





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-17 Thread via GitHub


pepijnve commented on code in PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#discussion_r2151897561


##
datafusion/core/tests/execution/coop.rs:
##
@@ -0,0 +1,722 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Int64Array, RecordBatch};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow_schema::SortOptions;
+use datafusion::functions_aggregate::sum;
+use datafusion::physical_expr::aggregate::AggregateExprBuilder;
+use datafusion::physical_plan;
+use datafusion::physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion::physical_plan::execution_plan::Boundedness;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
+use datafusion_common::{JoinType, ScalarValue};
+use datafusion_execution::TaskContext;
+use datafusion_expr_common::operator::Operator;
+use datafusion_expr_common::operator::Operator::Gt;
+use datafusion_functions_aggregate::min_max;
+use datafusion_physical_expr::expressions::{
+    binary, col, lit, BinaryExpr, Column, Literal,
+};
+use datafusion_physical_expr::Partitioning;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
+use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
+use datafusion_physical_plan::projection::ProjectionExec;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::union::InterleaveExec;
+use futures::StreamExt;
+use parking_lot::RwLock;
+use rstest::rstest;
+use std::error::Error;
+use std::fmt::Formatter;
+use std::ops::Range;
+use std::sync::Arc;
+use std::task::Poll;
+use tokio::runtime::{Handle, Runtime};
+use tokio::select;
+
+#[derive(Debug)]
+struct RangeBatchGenerator {
+    schema: SchemaRef,
+    value_range: Range<i64>,
+    boundedness: Boundedness,
+    batch_size: usize,
+    poll_count: usize,
+}
+
+impl std::fmt::Display for RangeBatchGenerator {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        // Display current counter
+        write!(f, "InfiniteGenerator(counter={})", self.poll_count)
+    }
+}
+
+impl LazyBatchGenerator for RangeBatchGenerator {
+    fn boundedness(&self) -> Boundedness {
+        self.boundedness
+    }
+
+    /// Generate the next RecordBatch.
+    fn generate_next_batch(&mut self) -> datafusion_common::Result<Option<RecordBatch>> {
+        self.poll_count += 1;
+
+        let mut builder = Int64Array::builder(self.batch_size);
+        for _ in 0..self.batch_size {
+            match self.value_range.next() {
+                None => break,
+                Some(v) => builder.append_value(v),
+            }
+        }
+        let array = builder.finish();
+
+        if array.is_empty() {
+            return Ok(None);
+        }
+
+        let batch =
+            RecordBatch::try_new(Arc::clone(&self.schema), vec![Arc::new(array)])?;
+        Ok(Some(batch))
+    }
+}
+
+fn make_lazy_exec(column_name: &str, pretend_infinite: bool) -> LazyMemoryExec {
+    make_lazy_exec_with_range(column_name, i64::MIN..i64::MAX, pretend_infinite)
+}
+
+fn make_lazy_exec_with_range(
+    column_name: &str,
+    range: Range<i64>,
+    pretend_infinite: bool,
+) -> LazyMemoryExec {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        column_name,
+        DataType::Int64,
+        false,
+    )]));
+
+    let boundedness = if pretend_infinite {
+        Boundedness::Unbounded {
+            requires_infinite_memory: false,
+        }
+    } else {
+        Boundedness::Bounded
+    };
+
+    // Instantiate the generator with the batch and limit
+    let gen = RangeBatchGenerator {
+        schema: Arc::clone(&schema),
+        boundedness,
+        value_range: range,
+        batch_

Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-16 Thread via GitHub


zhuqi-lucas commented on code in PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#discussion_r2151253373


##
datafusion/core/tests/execution/coop.rs:
##
@@ -0,0 +1,722 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Int64Array, RecordBatch};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow_schema::SortOptions;
+use datafusion::functions_aggregate::sum;
+use datafusion::physical_expr::aggregate::AggregateExprBuilder;
+use datafusion::physical_plan;
+use datafusion::physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion::physical_plan::execution_plan::Boundedness;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
+use datafusion_common::{JoinType, ScalarValue};
+use datafusion_execution::TaskContext;
+use datafusion_expr_common::operator::Operator;
+use datafusion_expr_common::operator::Operator::Gt;
+use datafusion_functions_aggregate::min_max;
+use datafusion_physical_expr::expressions::{
+    binary, col, lit, BinaryExpr, Column, Literal,
+};
+use datafusion_physical_expr::Partitioning;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
+use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
+use datafusion_physical_plan::projection::ProjectionExec;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::union::InterleaveExec;
+use futures::StreamExt;
+use parking_lot::RwLock;
+use rstest::rstest;
+use std::error::Error;
+use std::fmt::Formatter;
+use std::ops::Range;
+use std::sync::Arc;
+use std::task::Poll;
+use tokio::runtime::{Handle, Runtime};
+use tokio::select;
+
+#[derive(Debug)]
+struct RangeBatchGenerator {
+    schema: SchemaRef,
+    value_range: Range<i64>,
+    boundedness: Boundedness,
+    batch_size: usize,
+    poll_count: usize,
+}
+
+impl std::fmt::Display for RangeBatchGenerator {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        // Display current counter
+        write!(f, "InfiniteGenerator(counter={})", self.poll_count)
+    }
+}
+
+impl LazyBatchGenerator for RangeBatchGenerator {
+    fn boundedness(&self) -> Boundedness {
+        self.boundedness
+    }
+
+    /// Generate the next RecordBatch.
+    fn generate_next_batch(&mut self) -> datafusion_common::Result<Option<RecordBatch>> {
+        self.poll_count += 1;
+
+        let mut builder = Int64Array::builder(self.batch_size);
+        for _ in 0..self.batch_size {
+            match self.value_range.next() {
+                None => break,
+                Some(v) => builder.append_value(v),
+            }
+        }
+        let array = builder.finish();
+
+        if array.is_empty() {
+            return Ok(None);
+        }
+
+        let batch =
+            RecordBatch::try_new(Arc::clone(&self.schema), vec![Arc::new(array)])?;
+        Ok(Some(batch))
+    }
+}
+
+fn make_lazy_exec(column_name: &str, pretend_infinite: bool) -> LazyMemoryExec {
+    make_lazy_exec_with_range(column_name, i64::MIN..i64::MAX, pretend_infinite)
+}
+
+fn make_lazy_exec_with_range(
+    column_name: &str,
+    range: Range<i64>,
+    pretend_infinite: bool,
+) -> LazyMemoryExec {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        column_name,
+        DataType::Int64,
+        false,
+    )]));
+
+    let boundedness = if pretend_infinite {
+        Boundedness::Unbounded {
+            requires_infinite_memory: false,
+        }
+    } else {
+        Boundedness::Bounded
+    };
+
+    // Instantiate the generator with the batch and limit
+    let gen = RangeBatchGenerator {
+        schema: Arc::clone(&schema),
+        boundedness,
+        value_range: range,
+        bat

Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-15 Thread via GitHub


pepijnve commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2973548782

   I've added a commit to this PR that:
   - Makes the tests more robust. Rather than hanging when there's an issue, 
they will fail.
   - Removes duplication from the tests.
   - Removes repartitions and coalesces in most tests to ensure we're actually 
testing the right thing. I'm trying to get the tests to set up the challenging 
situations rather than massaging things so that they work.
   - Merges in some of the extra test cases I still had lying around.
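
   As an illustration of the first point, failing instead of hanging, here 
is a minimal std-only sketch. The PR's tests are async and import Tokio's 
`select!`, so this is not how they are written; the helper name 
`run_with_deadline` and the thread-plus-channel approach are assumptions made 
purely for the example.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs `work` on a separate thread and returns its result, or `None` if it
/// has not finished within `limit`. Hypothetical helper, not part of the PR.
fn run_with_deadline<T, F>(work: F, limit: Duration) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore the send error: the receiver may already have given up.
        let _ = tx.send(work());
    });
    // A timeout becomes `None`, which a test can turn into a failure.
    rx.recv_timeout(limit).ok()
}

fn main() {
    // Work that completes in time yields its result.
    let done = run_with_deadline(|| 1 + 1, Duration::from_secs(1));
    assert_eq!(done, Some(2));

    // Work that takes too long is reported instead of blocking the caller.
    let stuck = run_with_deadline(
        || {
            thread::sleep(Duration::from_secs(2));
            0
        },
        Duration::from_millis(50),
    );
    assert_eq!(stuck, None);
}
```

   Note that the worker thread in this sketch is detached and keeps running 
after the deadline; it only guarantees that the caller gets control back.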





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-14 Thread via GitHub


ozankabak commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2973514966

   > Thinking about it some more. The evaluation type is intended to describe 
how the operator computes record batches itself: lazy on demand, or by driving 
things itself. I'm kind of trying to refer to the terminology from the Volcano 
paper. That talks about demand-driven and data-driven operators. I had first 
called this 'drive type' with values 'demand' and 'data', but that felt a bit 
awkward. Since this is actually a property of how the operator prepares its 
output, one value per operator is probably fine after all.
   
   OK, let's try it that way; we can change it later if it turns out to be 
insufficient.





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-14 Thread via GitHub


pepijnve commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2973036248

   Open to suggestions on better names for these properties.





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-14 Thread via GitHub


pepijnve commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2973035858

   Thinking about it some more: the evaluation type is intended to describe how 
the operator computes record batches itself, either lazily on demand or by 
driving things itself. I'm kind of trying to refer to the terminology from the 
Volcano paper, which talks about demand-driven and data-driven operators.
   Since this is actually a property of how the operator prepares its output, 
one value per operator is probably fine after all.





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-14 Thread via GitHub


ozankabak commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2972621416

   > While I was writing this I started wondering if evaluation type should be 
a per child thing. In my spawn experiment branch for instance hash join is 
eager for the build side, but lazy for the probe side. Perhaps it would be best 
to leave room for that.
   
   This is in alignment with what I was thinking; let's do it that way.





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-13 Thread via GitHub


pepijnve commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2970729760

   @ozankabak I've pushed the optimizer rule changes I had in mind. This 
introduces two new execution plan properties that capture the evaluation type 
(eager vs. lazy) and the scheduling type (blocking vs. cooperative). With those 
two combined, the tree can be rewritten in a bottom-up fashion. Every leaf that 
is not cooperative gets wrapped as before. Additionally, any eagerly evaluating 
nodes (i.e. exchanges) that are not cooperative are wrapped. This should ensure 
the entire plan participates in cooperative scheduling.
   
   The only caveat that remains is dynamic stream creation. Operators that do 
that need to take the necessary precautions themselves. I already updated the 
spill manager for this in the previous commit.
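
   The rewrite described above can be sketched on a toy plan tree. This is a 
simplified model under stated assumptions, not DataFusion's actual types or 
the PR's implementation: `Plan`, `EvaluationType`, `SchedulingType`, `wrap`, 
`ensure_cooperative`, `render` and the `Coop` node name are all hypothetical.

```rust
// Toy model of a physical plan. All names here are hypothetical.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EvaluationType {
    Lazy,
    Eager,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SchedulingType {
    Blocking,
    Cooperative,
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct Plan {
    name: String,
    evaluation: EvaluationType,
    scheduling: SchedulingType,
    children: Vec<Plan>,
}

impl Plan {
    fn new(
        name: &str,
        evaluation: EvaluationType,
        scheduling: SchedulingType,
        children: Vec<Plan>,
    ) -> Self {
        Plan { name: name.to_string(), evaluation, scheduling, children }
    }
}

/// Wraps `plan` in a cooperative wrapper node.
fn wrap(plan: Plan) -> Plan {
    Plan {
        name: "Coop".to_string(),
        evaluation: plan.evaluation,
        scheduling: SchedulingType::Cooperative,
        children: vec![plan],
    }
}

/// Bottom-up rewrite: children are rewritten first, then the node itself is
/// wrapped if it is blocking and is either a leaf or eagerly evaluating.
fn ensure_cooperative(plan: Plan) -> Plan {
    let Plan { name, evaluation, scheduling, children } = plan;
    let children: Vec<Plan> = children.into_iter().map(ensure_cooperative).collect();
    let is_leaf = children.is_empty();
    let node = Plan { name, evaluation, scheduling, children };
    let needs_wrap = node.scheduling == SchedulingType::Blocking
        && (is_leaf || node.evaluation == EvaluationType::Eager);
    if needs_wrap {
        wrap(node)
    } else {
        node
    }
}

/// Renders the tree as a compact string such as `Filter(Scan)`.
fn render(plan: &Plan) -> String {
    if plan.children.is_empty() {
        plan.name.clone()
    } else {
        let inner: Vec<String> = plan.children.iter().map(render).collect();
        format!("{}({})", plan.name, inner.join(", "))
    }
}

fn main() {
    use EvaluationType::*;
    use SchedulingType::*;
    let scan = Plan::new("Scan", Lazy, Blocking, vec![]);
    let exchange = Plan::new("Exchange", Eager, Blocking, vec![scan]);
    let filter = Plan::new("Filter", Lazy, Blocking, vec![exchange]);
    // Prints: Filter(Coop(Exchange(Coop(Scan))))
    println!("{}", render(&ensure_cooperative(filter)));
}
```

   In this model the lazy `Filter` is left alone because it only does work 
when its wrapped input is polled. Streams created dynamically at execution 
time are not visible to a plan rewrite like this, which matches the caveat 
above.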





Re: [PR] Use Tokio's task budget consistently [datafusion]

2025-06-13 Thread via GitHub


ozankabak commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2970506512

   Thanks for the draft -- this is in line with my understanding from your 
description. I think it will inch us closer to a good, lasting solution 
(especially after your upstream Tokio PR also merges). Feel free to ping me for 
a more detailed review once you are done with it.

