Re: [PR] Use Tokio's task budget consistently [datafusion]
alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2981051106

> > I agree the existing benchmarks are not perfect
>
> Not trying to criticize the benchmarks, just saying I'm having trouble using them as a guide.
>
> @alamb Note that by default the current branch is using the local counter method @zhuqi-lucas implemented for `YieldStream`. Performance should be identical to `main` in other words.

I totally get it -- we'll keep improving the benchmarks and hopefully making them easier to use and more stable over time.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]

---
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] Use Tokio's task budget consistently [datafusion]
alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2981029248

🤖: Benchmark completed

Details

```
Comparing HEAD and task_budget

Benchmark clickbench_extended.json
Query       HEAD           task_budget    Change
QQuery 0     1929.02 ms     1971.93 ms    no change
QQuery 1      704.63 ms      718.11 ms    no change
QQuery 2     1412.62 ms     1368.78 ms    no change
QQuery 3      647.44 ms      680.23 ms    1.05x slower
QQuery 4     1400.23 ms     1358.38 ms    no change
QQuery 5    14995.43 ms    15226.83 ms    no change
QQuery 6     1985.14 ms     2087.35 ms    1.05x slower
QQuery 7     1989.35 ms     1969.02 ms    no change
QQuery 8      785.58 ms      786.62 ms    no change

Benchmark Summary
Total Time (HEAD)            25849.44ms
Total Time (task_budget)     26167.24ms
Average Time (HEAD)           2872.16ms
Average Time (task_budget)    2907.47ms
Queries Faster                        0
Queries Slower                        2
Queries with No Change                7
Queries with Failure                  0

Benchmark clickbench_partitioned.json
Query       HEAD           task_budget    Change
QQuery 0       15.70 ms       15.32 ms    no change
QQuery 1       33.40 ms       32.29 ms    no change
QQuery 2       78.71 ms       81.60 ms    no change
QQuery 3       94.83 ms       96.30 ms    no change
QQuery 4      590.16 ms      586.19 ms    no change
QQuery 5      839.78 ms      884.55 ms    1.05x slower
QQuery 6       23.62 ms       24.22 ms    no change
QQuery 7       35.97 ms       35.71 ms    no change
QQuery 8      845.86 ms      836.34 ms    no change
QQuery 9     1155.51 ms     1124.37 ms    no change
QQuery 10     255.94 ms      258.01 ms    no change
QQuery 11     283.03 ms      285.17 ms    no change
QQuery 12     861.95 ms      884.44 ms    no change
QQuery 13    1256.74 ms     1274.54 ms    no change
QQuery 14     801.84 ms      819.99 ms    no change
QQuery 15     786.28 ms      775.94 ms    no change
QQuery 16    1616.39 ms     1622.78 ms    no change
QQuery 17    1591.05 ms     1593.83 ms    no change
QQuery 18    2857.24 ms     2902.54 ms    no change
QQuery 19      89.43 ms       84.53 ms    +1.06x faster
QQuery 20    1125.85 ms     1171.56 ms    no change
QQuery 21    1266.95 ms     1307.77 ms    no change
QQuery 22    2089.10 ms     2189.53 ms    no change
QQuery 23    7574.01 ms     7914.18 ms    no change
QQuery 24     444.06 ms      466.35 ms    1.05x slower
QQuery 25     378.44 ms      397.31 ms    no change
QQuery 26     521.01 ms      525.65 ms    no change
QQuery 27    1503.49 ms     1548.92 ms    no change
QQuery 28   11900.61 ms    11913.35 ms    no change
QQuery 29     534.80 ms      540.04 ms    no change
QQuery 30     764.78 ms      790.34 ms    no change
QQuery 31     812.47 ms      825.30 ms    no change
QQuery 32    2541.75 ms     2545.00 ms    no change
QQuery 33    3179.10 ms     3206.02 ms    no change
QQuery 34    3202.13 ms     3242.56 ms    no change
QQuery 35    1219.46 ms     1221.75 ms    no change
QQuery 36     121.25 ms      126.01 ms    no change
QQuery 37      54.86 ms       56.17 ms    no change
QQuery 38     119.17 ms      125.52 ms    1.05x slower
QQuery 39     192.11 ms      198.37 ms    no change
QQuery 40      47.73 ms       50.41 ms    1.06x slower
QQuery 41      43.85 ms       45.09 ms    no change
QQuery 42      37.48 ms       38.16 ms    no change

Benchmark Summary
Total Time (HEAD)            53787.93ms
Total Time (task_budget)     54664.0
```
Re: [PR] Use Tokio's task budget consistently [datafusion]
alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980961262

🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh)

Running Linux aal-dev 6.11.0-1015-gcp #15~24.04.1-Ubuntu SMP Thu Apr 24 20:41:05 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux

Comparing task_budget (a749e8f60cbcdf5542b4b69a0a2f94c7790fab45) to 06631c25316ed2fc20ab8114d0dcc801f353fbad [diff](https://github.com/apache/datafusion/compare/06631c25316ed2fc20ab8114d0dcc801f353fbad..a749e8f60cbcdf5542b4b69a0a2f94c7790fab45)

Benchmarks: tpch_mem clickbench_partitioned clickbench_extended

Results will be posted here when complete
Re: [PR] Use Tokio's task budget consistently [datafusion]
alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980961066

🤖: Benchmark completed

Details

```
Comparing HEAD and task_budget

Benchmark clickbench_1.json
Query       HEAD           task_budget    Change
QQuery 0       51.47 ms       46.15 ms    +1.12x faster
QQuery 1       75.49 ms       72.84 ms    no change
QQuery 2      109.24 ms      108.00 ms    no change
QQuery 3      127.42 ms      119.32 ms    +1.07x faster
QQuery 4      633.00 ms      685.11 ms    1.08x slower
QQuery 5      853.72 ms      863.05 ms    no change
QQuery 6       56.74 ms       54.54 ms    no change
QQuery 7       80.42 ms       82.07 ms    no change
QQuery 8      881.04 ms      884.51 ms    no change
QQuery 9     1194.27 ms     1202.55 ms    no change
QQuery 10     300.62 ms      297.89 ms    no change
QQuery 11     318.31 ms      323.44 ms    no change
QQuery 12     879.94 ms      888.82 ms    no change
QQuery 13    1245.61 ms     1252.15 ms    no change
QQuery 14     803.27 ms      814.41 ms    no change
QQuery 15     803.96 ms      822.93 ms    no change
QQuery 16    1630.13 ms     1686.71 ms    no change
QQuery 17    1584.79 ms     1608.29 ms    no change
QQuery 18    2894.48 ms     2899.79 ms    no change
QQuery 19     125.96 ms      119.63 ms    +1.05x faster
QQuery 20    1140.75 ms     1206.46 ms    1.06x slower
QQuery 21    1337.94 ms     1390.68 ms    no change
QQuery 22    2323.74 ms     2408.18 ms    no change
QQuery 23    7822.32 ms     8103.67 ms    no change
QQuery 24     489.54 ms      507.76 ms    no change
QQuery 25     406.25 ms      432.43 ms    1.06x slower
QQuery 26     545.86 ms      568.63 ms    no change
QQuery 27    1638.18 ms     1683.81 ms    no change
QQuery 28   12379.17 ms    12511.87 ms    no change
QQuery 29     565.53 ms      581.52 ms    no change
QQuery 30     803.29 ms      816.97 ms    no change
QQuery 31     843.85 ms      872.76 ms    no change
QQuery 32    2547.32 ms     2563.59 ms    no change
QQuery 33    3256.21 ms     3307.21 ms    no change
QQuery 34    3266.60 ms     3323.18 ms    no change
QQuery 35    1258.28 ms     1288.71 ms    no change
QQuery 36     173.64 ms      181.16 ms    no change
QQuery 37     101.14 ms      101.04 ms    no change
QQuery 38     171.00 ms      179.89 ms    1.05x slower
QQuery 39     252.45 ms      266.95 ms    1.06x slower
QQuery 40      70.71 ms       88.09 ms    1.25x slower
QQuery 41      87.06 ms       86.11 ms    no change
QQuery 42      78.55 ms       78.27 ms    no change

Benchmark Summary
Total Time (HEAD)            56209.26ms
Total Time (task_budget)     57381.15ms
Average Time (HEAD)           1307.19ms
Average Time (task_budget)    1334.45ms
Queries Faster                        3
Queries Slower                        6
Queries with No Change               34
Queries with Failure                  0
```
Re: [PR] Use Tokio's task budget consistently [datafusion]
alamb commented on code in PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#discussion_r2152534664
##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -912,25 +948,7 @@ impl PlanProperties {
 /// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
 /// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
 pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
-    if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>() {
-        !matches!(
-            repartition.properties().output_partitioning(),
-            Partitioning::RoundRobinBatch(_)
-        )
-    } else if let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>() {
-        coalesce.input().output_partitioning().partition_count() > 1
-    } else if let Some(sort_preserving_merge) =
-        plan.as_any().downcast_ref::<SortPreservingMergeExec>()
-    {
-        sort_preserving_merge
-            .input()
-            .output_partitioning()
-            .partition_count()
-            > 1
-    } else {
-        false
-    }
+    plan.properties().evaluation_type == EvaluationType::Lazy
Review Comment:
that is certainly a lot nicer and easier to understand
##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -743,6 +733,38 @@ pub enum EmissionType {
Both,
}
+/// Represents whether an operator's `Stream` has been implemented to actively cooperate with the
+/// Tokio scheduler or not.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum SchedulingType {
+    /// The stream generated by [`execute`](ExecutionPlan::execute) does not actively participate in
+    /// cooperative scheduling. This means the implementation of the `Stream` returned by
+    /// [`ExecutionPlan::execute`] does not contain explicit cooperative yield points.
+    Blocking,
+    /// The stream generated by [`execute`](ExecutionPlan::execute) actively participates in
+    /// cooperative scheduling by consuming task budget when it was able to produce a
+    /// [`RecordBatch`]. Please refer to the [`coop`](crate::coop) module for more details.
+    Cooperative,
+}
+
+/// Represents how an operator's `Stream` implementation generates `RecordBatch`es.
+///
+/// Most operators in DataFusion generate `RecordBatch`es when asked to do so by a call to
+/// `Stream::poll_next`. This is known as demand-driven or lazy evaluation.
+///
+/// Some operators like `Repartition` need to drive `RecordBatch` generation themselves though. This
+/// is known as data-driven or eager evaluation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum EvaluationType {
+    /// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch`
+    /// instances when it is demanded by invoking `Stream::poll_next`.
+    Lazy,
+    /// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch`
+    /// in one or more spawned Tokio tasks. Eager evaluation is only started the first time
+    /// `Stream::poll_next` is called.
Review Comment:
```suggestion
    /// `Stream::poll_next` is called. Hash aggregation and HashJoin are examples of such operators
```
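The lazy/eager distinction in the suggested doc text can be illustrated outside DataFusion. The sketch below uses plain Rust threads and channels with made-up `LazySource`/`EagerSource` types (not DataFusion or Tokio APIs): both produce the same values, but the lazy source does its work when the consumer pulls, while the eager source spawns a producer on first pull, analogous to an operator spawning Tokio tasks.

```rust
// Illustrative sketch only: lazy (demand-driven) vs eager (data-driven)
// value production, modeled with std threads instead of Tokio tasks.
use std::sync::mpsc;
use std::thread;

/// Lazy: each value is computed only when the consumer asks for it,
/// analogous to a stream doing its work inside `Stream::poll_next`.
struct LazySource {
    next: i64,
    end: i64,
}

impl Iterator for LazySource {
    type Item = i64;
    fn next(&mut self) -> Option<i64> {
        if self.next >= self.end {
            return None;
        }
        let v = self.next * 10; // "work" happens here, on demand
        self.next += 1;
        Some(v)
    }
}

/// Eager: production runs on its own thread, started on the first pull,
/// analogous to an operator spawning tasks that push into a channel.
struct EagerSource {
    end: i64,
    rx: Option<mpsc::Receiver<i64>>,
}

impl Iterator for EagerSource {
    type Item = i64;
    fn next(&mut self) -> Option<i64> {
        let end = self.end;
        let rx = self.rx.get_or_insert_with(|| {
            let (tx, rx) = mpsc::channel();
            thread::spawn(move || {
                for i in 0..end {
                    // "work" happens here, independent of consumer demand
                    if tx.send(i * 10).is_err() {
                        break;
                    }
                }
            });
            rx
        });
        rx.recv().ok()
    }
}

fn main() {
    let lazy: Vec<i64> = LazySource { next: 0, end: 4 }.collect();
    let eager: Vec<i64> = EagerSource { end: 4, rx: None }.collect();
    // Both produce the same data; only who drives the work differs.
    assert_eq!(lazy, eager);
    println!("{lazy:?}"); // prints [0, 10, 20, 30]
}
```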
##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -743,6 +733,38 @@ pub enum EmissionType {
Both,
}
+/// Represents whether an operator's `Stream` has been implemented to actively cooperate with the
+/// Tokio scheduler or not.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum SchedulingType {
+    /// The stream generated by [`execute`](ExecutionPlan::execute) does not actively participate in
+    /// cooperative scheduling. This means the implementation of the `Stream` returned by
+    /// [`ExecutionPlan::execute`] does not contain explicit cooperative yield points.
Review Comment:
I think it would also be helpful here to give an explicit example of what a cooperative yield point is. For example
```suggestion
    /// [`ExecutionPlan::execute`] does not contain explicit cooperative yield points such as
    /// `await` or [`tokio::task::yield_now`].
```
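For readers following the review, the counter-based flavor of such a yield point (the "local counter method" mentioned elsewhere in this thread) can be sketched without any async machinery. Everything below (`BudgetedIter`, `Step`, the budget of 2) is illustrative, not DataFusion code; where this sketch returns `Step::Yield`, a real stream would return `Poll::Pending` and wake itself so the Tokio scheduler can run other tasks.

```rust
// Illustrative sketch of a per-stream "budget" counter acting as a
// cooperative yield point, modeled with a plain enum so it runs
// without an async runtime.

#[derive(Debug, PartialEq)]
enum Step<T> {
    Ready(T), // produced a value, budget was available
    Yield,    // budget exhausted: a real stream would return Poll::Pending
    Done,
}

struct BudgetedIter<I> {
    inner: I,
    budget: u32,
    remaining: u32,
}

impl<I: Iterator> BudgetedIter<I> {
    fn new(inner: I, budget: u32) -> Self {
        Self { inner, budget, remaining: budget }
    }

    fn next_step(&mut self) -> Step<I::Item> {
        if self.remaining == 0 {
            // Yield point: reset the budget and give the scheduler a turn.
            self.remaining = self.budget;
            return Step::Yield;
        }
        match self.inner.next() {
            Some(v) => {
                self.remaining -= 1; // consume one unit of budget per item
                Step::Ready(v)
            }
            None => Step::Done,
        }
    }
}

fn main() {
    let mut s = BudgetedIter::new(0..5, 2);
    let mut log = Vec::new();
    loop {
        match s.next_step() {
            Step::Ready(v) => log.push(format!("ready {v}")),
            Step::Yield => log.push("yield".to_string()),
            Step::Done => break,
        }
    }
    // Every 2 items the wrapper forces a yield.
    assert_eq!(
        log,
        ["ready 0", "ready 1", "yield", "ready 2", "ready 3", "yield", "ready 4"]
    );
    println!("{log:?}");
}
```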
##
datafusion/core/tests/execution/coop.rs:
##
@@ -0,0 +1,747 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
Re: [PR] Use Tokio's task budget consistently [datafusion]
alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980925371

> 🤖: Benchmark completed

Will see if it is reproducible
Re: [PR] Use Tokio's task budget consistently [datafusion]
alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980898342

🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh)

Running Linux aal-dev 6.11.0-1015-gcp #15~24.04.1-Ubuntu SMP Thu Apr 24 20:41:05 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux

Comparing task_budget (a749e8f60cbcdf5542b4b69a0a2f94c7790fab45) to 06631c25316ed2fc20ab8114d0dcc801f353fbad [diff](https://github.com/apache/datafusion/compare/06631c25316ed2fc20ab8114d0dcc801f353fbad..a749e8f60cbcdf5542b4b69a0a2f94c7790fab45)

Benchmarks: clickbench_1

Results will be posted here when complete
Re: [PR] Use Tokio's task budget consistently [datafusion]
alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980898244

🤖: Benchmark completed

Details

```
Comparing HEAD and task_budget

Benchmark clickbench_extended.json
Query       HEAD           task_budget    Change
QQuery 0     1938.69 ms     1955.98 ms    no change
QQuery 1      728.67 ms      732.71 ms    no change
QQuery 2     1393.40 ms     1380.16 ms    no change
QQuery 3      654.25 ms      667.02 ms    no change
QQuery 4     1374.93 ms     1368.90 ms    no change
QQuery 5    14843.62 ms    14967.16 ms    no change
QQuery 6     2001.52 ms     2082.06 ms    no change
QQuery 7     1889.83 ms     1950.81 ms    no change
QQuery 8      805.12 ms      809.25 ms    no change

Benchmark Summary
Total Time (HEAD)            25630.03ms
Total Time (task_budget)     25914.06ms
Average Time (HEAD)           2847.78ms
Average Time (task_budget)    2879.34ms
Queries Faster                        0
Queries Slower                        0
Queries with No Change                9
Queries with Failure                  0

Benchmark clickbench_partitioned.json
Query       HEAD           task_budget    Change
QQuery 0       15.26 ms       15.20 ms    no change
QQuery 1       33.51 ms       32.70 ms    no change
QQuery 2       81.78 ms       80.44 ms    no change
QQuery 3      100.21 ms       96.71 ms    no change
QQuery 4      578.10 ms      585.73 ms    no change
QQuery 5      838.83 ms      887.89 ms    1.06x slower
QQuery 6       24.33 ms       22.32 ms    +1.09x faster
QQuery 7       38.56 ms       36.90 ms    no change
QQuery 8      850.10 ms      856.20 ms    no change
QQuery 9     1151.01 ms     1146.42 ms    no change
QQuery 10     252.88 ms      263.74 ms    no change
QQuery 11     283.58 ms      290.15 ms    no change
QQuery 12     869.70 ms      866.73 ms    no change
QQuery 13    1255.69 ms     1286.10 ms    no change
QQuery 14     809.83 ms      812.02 ms    no change
QQuery 15     777.07 ms      786.77 ms    no change
QQuery 16    1613.49 ms     1629.44 ms    no change
QQuery 17    1567.97 ms     1580.52 ms    no change
QQuery 18    2886.23 ms     2891.01 ms    no change
QQuery 19      87.29 ms       84.38 ms    no change
QQuery 20    1113.26 ms     1152.62 ms    no change
QQuery 21    1252.92 ms     1328.12 ms    1.06x slower
QQuery 22    2069.04 ms     2168.76 ms    no change
QQuery 23    7539.31 ms     7957.48 ms    1.06x slower
QQuery 24     443.12 ms      464.35 ms    no change
QQuery 25     379.12 ms      404.81 ms    1.07x slower
QQuery 26     503.89 ms      530.15 ms    1.05x slower
QQuery 27    1500.52 ms     1566.86 ms    no change
QQuery 28   12013.51 ms    11817.68 ms    no change
QQuery 29     527.40 ms      525.63 ms    no change
QQuery 30     758.72 ms      791.22 ms    no change
QQuery 31     811.10 ms      829.72 ms    no change
QQuery 32    2485.30 ms     2553.71 ms    no change
QQuery 33    3181.91 ms     3185.99 ms    no change
QQuery 34    3176.40 ms     3209.67 ms    no change
QQuery 35    1228.64 ms     1223.48 ms    no change
QQuery 36     120.17 ms      127.76 ms    1.06x slower
QQuery 37      57.02 ms       56.75 ms    no change
QQuery 38     120.93 ms      127.28 ms    1.05x slower
QQuery 39     191.04 ms      202.25 ms    1.06x slower
QQuery 40      47.95 ms       47.32 ms    no change
QQuery 41      45.18 ms       45.53 ms    no change
QQuery 42      36.63 ms       39.44 ms    1.08x slower

Benchmark Summary
Total Time (HEAD)            53718.51ms
Total Time (task_budget)     54607.96ms
Average Time (HEAD)
```
Re: [PR] Use Tokio's task budget consistently [datafusion]
pepijnve commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980783229

> I agree the existing benchmarks are not perfect

Not trying to criticize the benchmarks, just saying I'm having trouble using them as a guide.

@alamb Note that by default the current branch is using the local counter method @zhuqi-lucas implemented for `YieldStream`. Performance should be identical to `main` in other words.
Re: [PR] Use Tokio's task budget consistently [datafusion]
alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980750548

🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh)

Running Linux aal-dev 6.11.0-1015-gcp #15~24.04.1-Ubuntu SMP Thu Apr 24 20:41:05 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux

Comparing task_budget (5bfff9798b2815377ef8d40dfaffd26947c51385) to 06631c25316ed2fc20ab8114d0dcc801f353fbad [diff](https://github.com/apache/datafusion/compare/06631c25316ed2fc20ab8114d0dcc801f353fbad..5bfff9798b2815377ef8d40dfaffd26947c51385)

Benchmarks: tpch_mem clickbench_partitioned clickbench_extended

Results will be posted here when complete
Re: [PR] Use Tokio's task budget consistently [datafusion]
alamb commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980748403

> I've been trying to measure the impact of these changes, but for the life of me I can't get useful results out of the benchmark comparison script. The sample below is actually comparing the exact same commit on a mac mini M4 and yet I still see large fluctuations. Not sure what else I can do 🤷‍♂️.

I agree the existing benchmarks are not perfect -- figuring out how to make them better would be good, though as you probably know it is a challenge of benchmarking in general.

I'll kick off some runs against this PR with my scripts to see what it shows.
Re: [PR] Use Tokio's task budget consistently [datafusion]
pepijnve commented on code in PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#discussion_r2152509411
##
datafusion/datasource/src/source.rs:
##
@@ -261,11 +261,7 @@ impl ExecutionPlan for DataSourceExec {
     ) -> Result<SendableRecordBatchStream> {
         self.data_source
             .open(partition, Arc::clone(&context))
-            .map(|stream| wrap_yield_stream(stream, &context, self.cooperative))
-    }
-
-    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
-        self.cooperative.then_some(self)
+            .map(|stream| make_cooperative(stream))
Review Comment:
I've gone ahead and made that change and removed the dyn wrapper while I was
at it.
Re: [PR] Use Tokio's task budget consistently [datafusion]
pepijnve commented on code in PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#discussion_r2152483273
##
datafusion/datasource/src/source.rs:
##
@@ -261,11 +261,7 @@ impl ExecutionPlan for DataSourceExec {
     ) -> Result<SendableRecordBatchStream> {
         self.data_source
             .open(partition, Arc::clone(&context))
-            .map(|stream| wrap_yield_stream(stream, &context, self.cooperative))
-    }
-
-    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
-        self.cooperative.then_some(self)
+            .map(|stream| make_cooperative(stream))
Review Comment:
That would make sense indeed. ExecutionPlan fulfills the role of a factory of Streams. DataSourceExec delegates that to the DataSource itself, so DataSource is actually the factory. It would make sense to delegate the "will you create something cooperative?" query as well.
Re: [PR] Use Tokio's task budget consistently [datafusion]
ozankabak commented on code in PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#discussion_r2152472886
##
datafusion/datasource/src/source.rs:
##
@@ -261,11 +261,7 @@ impl ExecutionPlan for DataSourceExec {
     ) -> Result<SendableRecordBatchStream> {
         self.data_source
             .open(partition, Arc::clone(&context))
-            .map(|stream| wrap_yield_stream(stream, &context, self.cooperative))
-    }
-
-    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
-        self.cooperative.then_some(self)
+            .map(|stream| make_cooperative(stream))
Review Comment:
I don't think it is critical performance-wise, but now I wonder if we should make any additions to the `DataSource` trait to signal built-in cooperative behavior to `DataSourceExec`?
Re: [PR] Use Tokio's task budget consistently [datafusion]
pepijnve commented on code in PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#discussion_r2152379108
##
datafusion/datasource/src/source.rs:
##
@@ -261,11 +261,7 @@ impl ExecutionPlan for DataSourceExec {
     ) -> Result<SendableRecordBatchStream> {
         self.data_source
             .open(partition, Arc::clone(&context))
-            .map(|stream| wrap_yield_stream(stream, &context, self.cooperative))
-    }
-
-    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
-        self.cooperative.then_some(self)
+            .map(|stream| make_cooperative(stream))
Review Comment:
We could avoid a virtual call here by pushing the cooperative wrapper down to `FileScanConfig::open`. That does lose the benefit of `DataSourceExec` being cooperative for any `DataSource`. Is it worth shaving off one virtual call per record batch?
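The tradeoff described in this comment can be sketched in isolation. The names below (`Batches`, `Counter`, `Cooperative`) are illustrative, not DataFusion APIs: when the wrapper's type parameter is a concrete type the inner call is statically dispatched and inlinable, while wrapping a boxed trait object pays one virtual call per batch.

```rust
// Sketch of static vs dynamic dispatch through a cooperative-style wrapper.

trait Batches {
    fn next_batch(&mut self) -> Option<u64>;
}

struct Counter {
    n: u64,
    end: u64,
}

impl Batches for Counter {
    fn next_batch(&mut self) -> Option<u64> {
        if self.n < self.end {
            self.n += 1;
            Some(self.n)
        } else {
            None
        }
    }
}

// Forward the trait through a box so a trait object can also be wrapped.
impl Batches for Box<dyn Batches> {
    fn next_batch(&mut self) -> Option<u64> {
        (**self).next_batch()
    }
}

/// Wrapper standing in for a cooperative stream adapter. With a concrete `S`
/// the inner call is a static call; with `S = Box<dyn Batches>` every batch
/// goes through the vtable.
struct Cooperative<S> {
    inner: S,
}

impl<S: Batches> Batches for Cooperative<S> {
    fn next_batch(&mut self) -> Option<u64> {
        // A real adapter would also do budget accounting here.
        self.inner.next_batch()
    }
}

fn main() {
    // Static dispatch: wrapper over the concrete source.
    let mut fast = Cooperative { inner: Counter { n: 0, end: 2 } };
    // Dynamic dispatch: wrapper over a boxed trait object.
    let mut boxed = Cooperative {
        inner: Box::new(Counter { n: 0, end: 2 }) as Box<dyn Batches>,
    };
    assert_eq!(fast.next_batch(), Some(1));
    assert_eq!(boxed.next_batch(), Some(1));
    assert_eq!(fast.next_batch(), Some(2));
    assert_eq!(boxed.next_batch(), Some(2));
    assert_eq!(fast.next_batch(), None);
    assert_eq!(boxed.next_batch(), None);
    println!("both paths produce the same batches");
}
```

Both paths behave identically; the question in the thread is purely whether the per-batch vtable hop is worth optimizing away.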
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] Use Tokio's task budget consistently [datafusion]
pepijnve commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2980397594

I've been trying to measure the impact of these changes, but for the life of me I can't get useful results out of the benchmark comparison script. The sample below is actually comparing the exact same commit on a mac mini M4 and yet I still see large fluctuations. Not sure what else I can do 🤷‍♂️.

```
Query       baseline       branch         Change
QQuery 0        9.71 ms        9.87 ms    no change
QQuery 1       15.07 ms       15.36 ms    no change
QQuery 2       47.76 ms       47.81 ms    no change
QQuery 3       46.78 ms       47.13 ms    no change
QQuery 4      426.60 ms      426.87 ms    no change
QQuery 5      557.13 ms      559.79 ms    no change
QQuery 6       11.37 ms       11.30 ms    no change
QQuery 7       16.17 ms       15.99 ms    no change
QQuery 8      518.13 ms      516.98 ms    no change
QQuery 9      671.89 ms      669.00 ms    no change
QQuery 10     146.75 ms      149.95 ms    no change
QQuery 11     162.79 ms      164.02 ms    no change
QQuery 12     567.28 ms      569.63 ms    no change
QQuery 13     799.84 ms      796.18 ms    no change
QQuery 14     542.10 ms      535.99 ms    no change
QQuery 15     502.33 ms      500.89 ms    no change
QQuery 16    1151.00 ms     1150.96 ms    no change
QQuery 17    1127.41 ms     1116.08 ms    no change
QQuery 18    2195.19 ms     2182.97 ms    no change
QQuery 19      39.96 ms       40.64 ms    no change
QQuery 20     858.48 ms      859.71 ms    no change
QQuery 21     988.68 ms     1048.57 ms    1.06x slower
QQuery 22    1594.48 ms     2153.36 ms    1.35x slower
QQuery 23    6399.31 ms     6905.44 ms    1.08x slower
QQuery 24     305.36 ms      299.88 ms    no change
QQuery 25     274.50 ms      276.00 ms    no change
QQuery 26     338.09 ms      346.69 ms    no change
QQuery 27    1140.21 ms     1213.95 ms    1.06x slower
QQuery 28    8881.31 ms     9061.62 ms    no change
QQuery 29     417.02 ms      414.85 ms    no change
QQuery 30     487.07 ms      486.73 ms    no change
QQuery 31     507.31 ms      508.16 ms    no change
QQuery 32    2154.40 ms     1917.22 ms    +1.12x faster
QQuery 33    2498.71 ms     2467.38 ms    no change
QQuery 34    2678.02 ms     2802.00 ms    no change
QQuery 35     751.17 ms      772.78 ms    no change
QQuery 36      61.54 ms       62.50 ms    no change
QQuery 37      25.67 ms       25.56 ms    no change
QQuery 38      61.80 ms       62.39 ms    no change
QQuery 39     101.51 ms      102.13 ms    no change
QQuery 40      18.26 ms       19.09 ms    no change
QQuery 41      17.70 ms       17.77 ms    no change
QQuery 42      14.84 ms       15.32 ms    no change
```
Re: [PR] Use Tokio's task budget consistently [datafusion]
ozankabak commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2979894941

I took a quick look at this yesterday and it was looking good -- I will review in more detail in a day or two. Thanks
Re: [PR] Use Tokio's task budget consistently [datafusion]
pepijnve commented on PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2979874048

The current Tokio code is only an approximation of the final code. For the final version we're dependent on a Tokio PR getting merged and released. In order to unblock this PR and allow review to proceed, I've added a couple of feature flags that disable the Tokio based code by default. The intention is to allow comparing the two variants from a performance point of view and to consider the revised cooperative API for merging already.
Re: [PR] Use Tokio's task budget consistently [datafusion]
pepijnve commented on code in PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#discussion_r2151897561
##
datafusion/core/tests/execution/coop.rs:
##
@@ -0,0 +1,722 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Int64Array, RecordBatch};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow_schema::SortOptions;
+use datafusion::functions_aggregate::sum;
+use datafusion::physical_expr::aggregate::AggregateExprBuilder;
+use datafusion::physical_plan;
+use datafusion::physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion::physical_plan::execution_plan::Boundedness;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
+use datafusion_common::{JoinType, ScalarValue};
+use datafusion_execution::TaskContext;
+use datafusion_expr_common::operator::Operator;
+use datafusion_expr_common::operator::Operator::Gt;
+use datafusion_functions_aggregate::min_max;
+use datafusion_physical_expr::expressions::{
+    binary, col, lit, BinaryExpr, Column, Literal,
+};
+use datafusion_physical_expr::Partitioning;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
+use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
+use datafusion_physical_plan::projection::ProjectionExec;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::union::InterleaveExec;
+use futures::StreamExt;
+use parking_lot::RwLock;
+use rstest::rstest;
+use std::error::Error;
+use std::fmt::Formatter;
+use std::ops::Range;
+use std::sync::Arc;
+use std::task::Poll;
+use tokio::runtime::{Handle, Runtime};
+use tokio::select;
+
+#[derive(Debug)]
+struct RangeBatchGenerator {
+    schema: SchemaRef,
+    value_range: Range<i64>,
+    boundedness: Boundedness,
+    batch_size: usize,
+    poll_count: usize,
+}
+
+impl std::fmt::Display for RangeBatchGenerator {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        // Display current counter
+        write!(f, "InfiniteGenerator(counter={})", self.poll_count)
+    }
+}
+
+impl LazyBatchGenerator for RangeBatchGenerator {
+    fn boundedness(&self) -> Boundedness {
+        self.boundedness
+    }
+
+    /// Generate the next RecordBatch.
+    fn generate_next_batch(&mut self) -> datafusion_common::Result<Option<RecordBatch>> {
+        self.poll_count += 1;
+
+        let mut builder = Int64Array::builder(self.batch_size);
+        for _ in 0..self.batch_size {
+            match self.value_range.next() {
+                None => break,
+                Some(v) => builder.append_value(v),
+            }
+        }
+        let array = builder.finish();
+
+        if array.is_empty() {
+            return Ok(None);
+        }
+
+        let batch =
+            RecordBatch::try_new(Arc::clone(&self.schema), vec![Arc::new(array)])?;
+        Ok(Some(batch))
+    }
+}
+
+fn make_lazy_exec(column_name: &str, pretend_infinite: bool) -> LazyMemoryExec {
+    make_lazy_exec_with_range(column_name, i64::MIN..i64::MAX, pretend_infinite)
+}
+
+fn make_lazy_exec_with_range(
+    column_name: &str,
+    range: Range<i64>,
+    pretend_infinite: bool,
+) -> LazyMemoryExec {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        column_name,
+        DataType::Int64,
+        false,
+    )]));
+
+    let boundedness = if pretend_infinite {
+        Boundedness::Unbounded {
+            requires_infinite_memory: false,
+        }
+    } else {
+        Boundedness::Bounded
+    };
+
+    // Instantiate the generator with the batch and limit
+    let gen = RangeBatchGenerator {
+        schema: Arc::clone(&schema),
+        boundedness,
+        value_range: range,
+        batch_
Re: [PR] Use Tokio's task budget consistently [datafusion]
zhuqi-lucas commented on code in PR #16398:
URL: https://github.com/apache/datafusion/pull/16398#discussion_r2151253373
##
datafusion/core/tests/execution/coop.rs:
##
@@ -0,0 +1,722 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Int64Array, RecordBatch};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow_schema::SortOptions;
+use datafusion::functions_aggregate::sum;
+use datafusion::physical_expr::aggregate::AggregateExprBuilder;
+use datafusion::physical_plan;
+use datafusion::physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion::physical_plan::execution_plan::Boundedness;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
+use datafusion_common::{JoinType, ScalarValue};
+use datafusion_execution::TaskContext;
+use datafusion_expr_common::operator::Operator;
+use datafusion_expr_common::operator::Operator::Gt;
+use datafusion_functions_aggregate::min_max;
+use datafusion_physical_expr::expressions::{
+    binary, col, lit, BinaryExpr, Column, Literal,
+};
+use datafusion_physical_expr::Partitioning;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
+use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
+use datafusion_physical_plan::projection::ProjectionExec;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::union::InterleaveExec;
+use futures::StreamExt;
+use parking_lot::RwLock;
+use rstest::rstest;
+use std::error::Error;
+use std::fmt::Formatter;
+use std::ops::Range;
+use std::sync::Arc;
+use std::task::Poll;
+use tokio::runtime::{Handle, Runtime};
+use tokio::select;
+
+#[derive(Debug)]
+struct RangeBatchGenerator {
+    schema: SchemaRef,
+    value_range: Range<i64>,
+    boundedness: Boundedness,
+    batch_size: usize,
+    poll_count: usize,
+}
+
+impl std::fmt::Display for RangeBatchGenerator {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        // Display current counter
+        write!(f, "InfiniteGenerator(counter={})", self.poll_count)
+    }
+}
+
+impl LazyBatchGenerator for RangeBatchGenerator {
+    fn boundedness(&self) -> Boundedness {
+        self.boundedness
+    }
+
+    /// Generate the next RecordBatch.
+    fn generate_next_batch(&mut self) -> datafusion_common::Result<Option<RecordBatch>> {
+        self.poll_count += 1;
+
+        let mut builder = Int64Array::builder(self.batch_size);
+        for _ in 0..self.batch_size {
+            match self.value_range.next() {
+                None => break,
+                Some(v) => builder.append_value(v),
+            }
+        }
+        let array = builder.finish();
+
+        if array.is_empty() {
+            return Ok(None);
+        }
+
+        let batch =
+            RecordBatch::try_new(Arc::clone(&self.schema), vec![Arc::new(array)])?;
+        Ok(Some(batch))
+    }
+}
+
+fn make_lazy_exec(column_name: &str, pretend_infinite: bool) -> LazyMemoryExec {
+    make_lazy_exec_with_range(column_name, i64::MIN..i64::MAX, pretend_infinite)
+}
+
+fn make_lazy_exec_with_range(
+    column_name: &str,
+    range: Range<i64>,
+    pretend_infinite: bool,
+) -> LazyMemoryExec {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        column_name,
+        DataType::Int64,
+        false,
+    )]));
+
+    let boundedness = if pretend_infinite {
+        Boundedness::Unbounded {
+            requires_infinite_memory: false,
+        }
+    } else {
+        Boundedness::Bounded
+    };
+
+    // Instantiate the generator with the batch and limit
+    let gen = RangeBatchGenerator {
+        schema: Arc::clone(&schema),
+        boundedness,
+        value_range: range,
+        bat
Re: [PR] Use Tokio's task budget consistently [datafusion]
pepijnve commented on PR #16398: URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2973548782 I've added a commit to this PR that: - Makes the tests more robust. Rather than hanging when there's an issue they will fail. - Removes duplication from the tests - Removes repartitions and coalesces in most tests to ensure we're actually testing the right thing. I'm trying to get the tests to set up the challenging situations rather than massaging things so that they work. - Merges in some of the extra test cases I still had lying around
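The "fail instead of hang" idea can be expressed in many ways; the PR's tests use Tokio machinery, but a minimal std-only sketch of the pattern (names hypothetical, not the PR's actual helpers) is to run the work on a thread and bound the wait with `recv_timeout`:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Run `work` on a separate thread and fail the test with a panic if it
/// does not complete within `timeout`, instead of hanging the run forever.
/// Hypothetical helper name, illustrating the pattern only.
fn run_with_deadline<T: Send + 'static>(
    timeout: Duration,
    work: impl FnOnce() -> T + Send + 'static,
) -> T {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Send the result back; if the receiver timed out, the send is ignored.
        let _ = tx.send(work());
    });
    rx.recv_timeout(timeout)
        .expect("test did not complete within the deadline")
}

fn main() {
    // A task that finishes promptly passes through unchanged.
    let v = run_with_deadline(Duration::from_secs(1), || 41 + 1);
    println!("{v}");
}
```

A task that never sends (e.g. a stream that never yields) would trip the `recv_timeout` and produce a test failure with a clear message rather than a stuck CI job.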
Re: [PR] Use Tokio's task budget consistently [datafusion]
ozankabak commented on PR #16398: URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2973514966 > Thinking about it some more. The evaluation type is intended to describe how the operator computes record batches itself: lazy on demand, or by driving things itself. Iβm kind of trying to refer to the terminology from the volcano paper. That talks about demand-driven and data-driven operators. I had first called this 'drive type' with values 'demand' and 'data', but that felt a bit awkward. Since this is actually a property of how the operator prepares its output, one value per operator is probably fine after all. OK - let's try that way, we can change later if it turns out to be insufficient
Re: [PR] Use Tokio's task budget consistently [datafusion]
pepijnve commented on PR #16398: URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2973036248 Open to suggestions on better names for these properties.
Re: [PR] Use Tokio's task budget consistently [datafusion]
pepijnve commented on PR #16398: URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2973035858 Thinking about it some more. The evaluation type is intended to describe how the operator computes record batches itself: lazily on demand, or by driving things itself. I'm kind of trying to refer to the terminology from the Volcano paper, which talks about demand-driven and data-driven operators. Since this is actually a property of how the operator prepares its output, one value per operator is probably fine after all.
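The two properties being discussed could be modeled as plain enums; the names below are illustrative stand-ins (not DataFusion's actual API), mirroring the demand-driven vs. data-driven distinction from the Volcano paper:

```rust
// Illustrative only: hypothetical names for the two plan properties
// discussed above, not the actual DataFusion types.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EvaluationType {
    /// Demand-driven: produces batches only when its consumer polls it.
    Lazy,
    /// Data-driven: drives its input(s) itself, e.g. an exchange operator.
    Eager,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SchedulingType {
    /// May poll its inputs to completion without ever returning Pending.
    Blocking,
    /// Periodically yields control back to the async runtime.
    Cooperative,
}

fn main() {
    // E.g. a filter would be demand-driven; whether it is cooperative
    // depends on whether its input participates in the task budget.
    let filter = (EvaluationType::Lazy, SchedulingType::Cooperative);
    println!("{filter:?}");
}
```

Keeping the two axes separate is what lets the optimizer rule decide per node whether a yield wrapper is needed.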
Re: [PR] Use Tokio's task budget consistently [datafusion]
ozankabak commented on PR #16398: URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2972621416 > While I was writing this I started wondering if evaluation type should be a per child thing. In my spawn experiment branch for instance hash join is eager for the build side, but lazy for the probe side. Perhaps it would be best to leave room for that. This is in alignment with what I was thinking, let's do it that way
Re: [PR] Use Tokio's task budget consistently [datafusion]
pepijnve commented on PR #16398: URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2970729760 @ozankabak I've pushed the optimizer rule changes I had in mind. This introduces two new execution plan properties that capture the evaluation type (eager vs lazy) and the scheduling type (blocking vs cooperative). With those two combined the tree can be rewritten in a bottom-up fashion. Every leaf that is not cooperative gets wrapped as before. Additionally, any eagerly evaluating node (i.e. an exchange) that is not cooperative is wrapped. This should ensure the entire plan participates in cooperative scheduling. The only caveat that remains is dynamic stream creation. Operators that do that need to take the necessary precautions themselves. I already updated the spill manager for this in the previous commit.
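The wrapping decision described here can be sketched in a few lines; the types and names below are hypothetical simplifications of the rule (not the actual `EnsureCooperative` implementation), assuming each node exposes its eager/cooperative flags:

```rust
// Simplified sketch of the bottom-up rewrite described above.
// `Node` and `ensure_cooperative` are hypothetical, not DataFusion's API.

#[derive(Debug)]
enum Node {
    Plain {
        eager: bool,
        cooperative: bool,
        children: Vec<Node>,
    },
    /// Stand-in for the yield-injecting wrapper around a stream.
    YieldWrapper(Box<Node>),
}

/// Bottom-up rewrite: wrap every non-cooperative leaf, and every
/// non-cooperative eager (exchange-like) node, in a yield wrapper.
fn ensure_cooperative(node: Node) -> Node {
    match node {
        Node::Plain { eager, cooperative, children } => {
            let is_leaf = children.is_empty();
            // Recurse first so children are handled before their parent.
            let children: Vec<Node> =
                children.into_iter().map(ensure_cooperative).collect();
            let rewritten = Node::Plain { eager, cooperative, children };
            if !cooperative && (is_leaf || eager) {
                Node::YieldWrapper(Box::new(rewritten))
            } else {
                rewritten
            }
        }
        wrapped => wrapped,
    }
}

fn main() {
    // A non-cooperative leaf (e.g. a blocking scan) gets wrapped.
    let leaf = Node::Plain { eager: false, cooperative: false, children: vec![] };
    let out = ensure_cooperative(leaf);
    println!("{}", matches!(out, Node::YieldWrapper(_)));
}
```

Lazy cooperative nodes pass through untouched, which is why the rule leaves most of a typical plan alone and only instruments the leaves and exchanges.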
Re: [PR] Use Tokio's task budget consistently [datafusion]
ozankabak commented on PR #16398: URL: https://github.com/apache/datafusion/pull/16398#issuecomment-2970506512 Thanks for the draft -- this is in line with my understanding from your description. I think it will inch us closer to a good, lasting solution (especially after your upstream Tokio PR also merges). Feel free to ping me for a more detailed review once you are done with it
