alamb commented on code in PR #11875:
URL: https://github.com/apache/datafusion/pull/11875#discussion_r1711494810
##########
datafusion/common/src/config.rs:
##########
@@ -333,6 +333,11 @@ config_namespace! {
        /// Number of input rows partial aggregation partition should process, before
        /// aggregation ratio check and trying to switch to skipping aggregation mode
        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
+
+        /// Should DataFusion use row number estimate at the input to decide whether increasing
+        /// parallelism is beneficial or not. By default, only exact row number (not estimates)
+        /// are used for decision. Setting this flag to `true` will more likely produce better plans.
Review Comment:
I think it might help people understand what this does a bit more, and when it would be good to turn it on
```suggestion
        /// are used for this decision. Setting this flag to `true` will more likely produce better plans
        /// if the source of statistics is accurate.
```
##########
datafusion/core/src/physical_optimizer/enforce_distribution.rs:
##########
@@ -1031,6 +1031,75 @@ fn replace_order_preserving_variants(
context.update_plan_from_children()
}
+/// A struct to keep track of RepartitionRequirement status for each child node.
+struct RepartitionRequirementStatus {
+    roundrobin_beneficial: bool,
+    hash_necessary: bool,
+}
+
+/// Calculates the `RepartitionRequirementStatus` for each children to generate consistent requirements.
+/// As an example, for hash exec left children might produce `RepartitionRequirementStatus{roundrobin_beneficial: true, hash_necessary: true}`
+/// and right children might produce: `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: false}`.
+/// When target partitions=4, left child might produce `Hash(vec![expr], 4)` and right child might produce `Hash(vec![expr], 4)`. However,
+/// for correct operation we need consistent hashes accross children. This util turns right child status:
+/// from `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: false}` into
+/// `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: true}` to generate consistent plan.
Review Comment:
```suggestion
/// Calculates the `RepartitionRequirementStatus` for each child to generate consistent requirements.
/// As an example, for a `HashJoinExec`, the left child might produce `RepartitionRequirementStatus{roundrobin_beneficial: true, hash_necessary: true}`
/// and the right child might produce: `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: false}`.
/// When target partitions=4, the left child might produce `Hash(vec![expr], 4)` and the right child might produce `Hash(vec![expr], 4)`.
///
/// However, for correct operation we need consistent hashes across children. This util turns the right child status
/// from `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: false}` into
/// `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: true}`
/// to generate a consistent plan.
```
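To make the consistency rule concrete, here is a minimal standalone sketch (not the PR's code; the struct mirrors the diff, the helper name `make_hash_consistent` is made up) of turning per-child statuses into a consistent set:
```rust
struct RepartitionRequirementStatus {
    roundrobin_beneficial: bool,
    hash_necessary: bool,
}

/// If any child with a hash requirement needs hash repartitioning, mark it
/// necessary for all of them, so every input of the join is hashed the same way.
fn make_hash_consistent(statuses: &mut [RepartitionRequirementStatus]) {
    if statuses.iter().any(|s| s.hash_necessary) {
        for status in statuses.iter_mut() {
            status.hash_necessary = true;
        }
    }
}

fn main() {
    let mut statuses = vec![
        RepartitionRequirementStatus { roundrobin_beneficial: true, hash_necessary: true },
        RepartitionRequirementStatus { roundrobin_beneficial: false, hash_necessary: false },
    ];
    make_hash_consistent(&mut statuses);
    assert!(statuses.iter().all(|s| s.hash_necessary));
}
```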
##########
datafusion/core/src/physical_optimizer/enforce_distribution.rs:
##########
@@ -1031,6 +1031,75 @@ fn replace_order_preserving_variants(
context.update_plan_from_children()
}
+/// A struct to keep track of RepartitionRequirement status for each child node.
+struct RepartitionRequirementStatus {
+    roundrobin_beneficial: bool,
+    hash_necessary: bool,
+}
+
+/// Calculates the `RepartitionRequirementStatus` for each children to generate consistent requirements.
+/// As an example, for hash exec left children might produce `RepartitionRequirementStatus{roundrobin_beneficial: true, hash_necessary: true}`
+/// and right children might produce: `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: false}`.
+/// When target partitions=4, left child might produce `Hash(vec![expr], 4)` and right child might produce `Hash(vec![expr], 4)`. However,
+/// for correct operation we need consistent hashes accross children. This util turns right child status:
+/// from `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: false}` into
+/// `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: true}` to generate consistent plan.
+fn get_repartition_status_flags(
+    requirements: &[Distribution],
+    children: &[&Arc<dyn ExecutionPlan>],
+    batch_size: usize,
+    should_use_estimates: bool,
+) -> Result<Vec<RepartitionRequirementStatus>> {
+    debug_assert_eq!(requirements.len(), children.len());
+    let mut repartition_status_flags = vec![];
+    for (child, requirement) in children.iter().zip(requirements) {
+        // Don't need to apply when the returned row count is not greater than batch size
+        let num_rows = child.statistics()?.num_rows;
+        let roundrobin_beneficial = if let Some(n_rows) = num_rows.get_value() {
+            // Row count estimate is larger than the batch size.
+            // Adding repartition is desirable for this case
+            // According to `should_use_estimates` flag, we can either use exact and inexact row numbers or only exact row numbers for this decision.
+            if should_use_estimates || num_rows.is_exact().unwrap() {
Review Comment:
Why is this `unwrap`? Is the rationale that if `num_rows.get_value()` returns `Some` then so will `is_exact`?
I think it might be more defensive (avoiding panics if future refactors change the invariant above) to explicitly check for `Some(true)`. Perhaps something like
```suggestion
            if should_use_estimates || num_rows.is_exact().unwrap_or(false) {
```
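For illustration, a tiny standalone sketch (using a plain `Option<bool>` as a stand-in, not DataFusion's `Precision` type) of why `unwrap_or(false)` degrades gracefully where `unwrap` would panic:
```rust
fn main() {
    // Stand-in for `num_rows.is_exact()`: Some(true) for exact statistics,
    // Some(false) for estimates, None when the precision is unknown.
    let is_exact: Option<bool> = None;

    // `is_exact.unwrap()` would panic on None; `unwrap_or(false)` instead
    // treats an unknown precision as "not exact" and carries on.
    let should_use_estimates = false;
    let use_row_count = should_use_estimates || is_exact.unwrap_or(false);
    assert!(!use_row_count);
}
```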
##########
datafusion/core/src/physical_optimizer/enforce_distribution.rs:
##########
@@ -1031,6 +1031,75 @@ fn replace_order_preserving_variants(
context.update_plan_from_children()
}
+/// A struct to keep track of RepartitionRequirement status for each child node.
+struct RepartitionRequirementStatus {
+    roundrobin_beneficial: bool,
+    hash_necessary: bool,
+}
+
+/// Calculates the `RepartitionRequirementStatus` for each children to generate consistent requirements.
+/// As an example, for hash exec left children might produce `RepartitionRequirementStatus{roundrobin_beneficial: true, hash_necessary: true}`
+/// and right children might produce: `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: false}`.
+/// When target partitions=4, left child might produce `Hash(vec![expr], 4)` and right child might produce `Hash(vec![expr], 4)`. However,
+/// for correct operation we need consistent hashes accross children. This util turns right child status:
+/// from `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: false}` into
+/// `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: true}` to generate consistent plan.
+fn get_repartition_status_flags(
Review Comment:
A minor stylistic thing is that this function returns something called `RepartitionRequirementStatus`, but it is called `status flags`.
Maybe it would be more consistent to name it
```suggestion
fn get_repartition_requirement_status(
```
##########
datafusion/core/src/physical_optimizer/enforce_distribution.rs:
##########
@@ -1031,6 +1031,75 @@ fn replace_order_preserving_variants(
context.update_plan_from_children()
}
+/// A struct to keep track of RepartitionRequirement status for each child node.
+struct RepartitionRequirementStatus {
+    roundrobin_beneficial: bool,
+    hash_necessary: bool,
+}
+
+/// Calculates the `RepartitionRequirementStatus` for each children to generate consistent requirements.
+/// As an example, for hash exec left children might produce `RepartitionRequirementStatus{roundrobin_beneficial: true, hash_necessary: true}`
+/// and right children might produce: `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: false}`.
+/// When target partitions=4, left child might produce `Hash(vec![expr], 4)` and right child might produce `Hash(vec![expr], 4)`. However,
+/// for correct operation we need consistent hashes accross children. This util turns right child status:
+/// from `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: false}` into
+/// `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: true}` to generate consistent plan.
+fn get_repartition_status_flags(
+    requirements: &[Distribution],
+    children: &[&Arc<dyn ExecutionPlan>],
+    batch_size: usize,
+    should_use_estimates: bool,
+) -> Result<Vec<RepartitionRequirementStatus>> {
+    debug_assert_eq!(requirements.len(), children.len());
+    let mut repartition_status_flags = vec![];
+    for (child, requirement) in children.iter().zip(requirements) {
+        // Don't need to apply when the returned row count is not greater than batch size
+        let num_rows = child.statistics()?.num_rows;
+        let roundrobin_beneficial = if let Some(n_rows) = num_rows.get_value() {
+            // Row count estimate is larger than the batch size.
+            // Adding repartition is desirable for this case
+            // According to `should_use_estimates` flag, we can either use exact and inexact row numbers or only exact row numbers for this decision.
+            if should_use_estimates || num_rows.is_exact().unwrap() {
+                *n_rows > batch_size
+            } else {
+                true
+            }
+        } else {
+            true
+        };
+        let is_hash = matches!(requirement, Distribution::HashPartitioned(_));
+        let mut hash_necessary = false;
+        if is_hash && child.output_partitioning().partition_count() > 1 {
+            // when input partitioning is larger than 1 for hash requirement.
+            // re-partitioning is desired
+            hash_necessary = true;
Review Comment:
This comment implies the hash is desired for performance, but when I see `necessary` I think it is needed for correctness. It might help to clarify when the hash repartitioning is needed for correctness vs. when it is an optimization.
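For example (purely an illustration of the distinction, not the PR's code), the condition could be documented along these lines:
```rust
// Hash repartitioning is needed for *correctness* when the child already has
// more than one partition: rows with equal keys may currently live in
// different partitions, and joins/aggregations require them to be co-located.
// With a single input partition all rows are already together, so adding a
// hash repartition there is purely a parallelism *optimization*.
fn hash_repartition_needed_for_correctness(partition_count: usize) -> bool {
    partition_count > 1
}

fn main() {
    assert!(hash_repartition_needed_for_correctness(2)); // multi-partition input
    assert!(!hash_repartition_needed_for_correctness(1)); // single partition: optimization only
}
```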
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -921,7 +922,13 @@ impl ExecutionPlan for SortExec {
    }
    fn statistics(&self) -> Result<Statistics> {
-        self.input.statistics()
+        statistics_with_fetch(
Review Comment:
The fact that another execution plan uses `statistics_with_fetch` is more reason in my mind to put `with_fetch` on `Statistics` itself (could totally be done as a follow-on PR)
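For instance, `SortExec::statistics` above could then read something like this (a sketch assuming the `Statistics::with_fetch` method proposed further down; the argument order mirrors `statistics_with_fetch` from this PR's diff, and `self.fetch` is assumed to be `SortExec`'s `Option<usize>` fetch field):
```rust
fn statistics(&self) -> Result<Statistics> {
    self.input.statistics()?.with_fetch(
        self.schema(),
        self.fetch,
        0,
        self.properties().partitioning.partition_count(),
    )
}
```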
##########
datafusion/sqllogictest/test_files/count_star_rule.slt:
##########
@@ -86,10 +86,8 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[a@0 as a, count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as count_a]
02)--WindowAggExec: wdw=[count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]
-03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-04)------CoalesceBatchesExec: target_batch_size=8192
-05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
-06)----------MemoryExec: partitions=1, partition_sizes=[1]
+03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]
Review Comment:
Is the reason this repartition was removed that the plan now uses the actual row count from `MemoryExec` (where it is known accurately)?
##########
datafusion/core/src/physical_optimizer/enforce_sorting.rs:
##########
@@ -1049,6 +1065,98 @@ mod tests {
        Ok(())
    }
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort6() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let input = Arc::new(
+            SortExec::new(vec![sort_expr("non_nullable_col", &schema)], source)
+                .with_fetch(Some(2)),
+        );
+        let physical_plan = sort_exec(
+            vec![
+                sort_expr("non_nullable_col", &schema),
+                sort_expr("nullable_col", &schema),
+            ],
+            input,
+        );
+
+        let expected_input = [
+            "SortExec: expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
+            "  SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
+            "    MemoryExec: partitions=1, partition_sizes=[0]",
+        ];
Review Comment:
I suggest also testing what happens with the reverse plan (where the TopK is above):
```rust
        let expected_input = [
            "SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
            "  SortExec: expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
            "    MemoryExec: partitions=1, partition_sizes=[0]",
        ];
```
I would expect the same optimized plan:
```rust
        let expected_optimized = [
            "SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
            "  MemoryExec: partitions=1, partition_sizes=[0]",
        ];
```
##########
datafusion/sqllogictest/test_files/order.slt:
##########
@@ -1148,3 +1148,21 @@ SELECT (SELECT c from ordered_table ORDER BY c LIMIT 1) UNION ALL (SELECT 23 as
----
0
23
+
+# Do not increase partition number after fetch 1. As this will be unnecessary.
Review Comment:
This comment implies to me that I wouldn't see any RepartitionExec that increases the number of partitions, but the plan has one after the `SortExec`.
Maybe we could clarify the comment to better explain what is expected (maybe that the sort input should not be repartitioned)?
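One possible wording (just a sketch of a clarified comment):
```suggestion
# The sort's input should not be repartitioned after `fetch=1`: with at most
# one row reaching the sort, increasing its input partition count is unnecessary.
```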
##########
datafusion/core/src/physical_optimizer/sort_pushdown.rs:
##########
@@ -41,38 +40,62 @@ use datafusion_physical_expr::{
/// This is a "data class" we use within the [`EnforceSorting`] rule to push
/// down [`SortExec`] in the plan. In some cases, we can reduce the total
/// computational cost by pushing down `SortExec`s through some executors. The
-/// object carries the parent required ordering as its data.
+/// object carries the parent required ordering, fetch value of the parent node as its data.
Review Comment:
```suggestion
/// object carries the parent required ordering and `fetch` value of the parent node as its data.
```
##########
datafusion/physical-plan/src/limit.rs:
##########
@@ -380,56 +321,110 @@ impl ExecutionPlan for LocalLimitExec {
    }
    fn statistics(&self) -> Result<Statistics> {
-        let input_stats = self.input.statistics()?;
-        let col_stats = Statistics::unknown_column(&self.schema());
-        let stats = match input_stats {
-            // if the input does not reach the limit globally, return input stats
-            Statistics {
-                num_rows: Precision::Exact(nr),
-                ..
-            }
-            | Statistics {
-                num_rows: Precision::Inexact(nr),
-                ..
-            } if nr <= self.fetch => input_stats,
-            // if the input is greater than the limit, the num_row will be greater
-            // than the limit because the partitions will be limited separately
-            // the statistic
-            Statistics {
-                num_rows: Precision::Exact(nr),
-                ..
-            } if nr > self.fetch => Statistics {
-                num_rows: Precision::Exact(self.fetch),
-                // this is not actually exact, but will be when GlobalLimit is applied
-                // TODO stats: find a more explicit way to vehiculate this information
-                column_statistics: col_stats,
-                total_byte_size: Precision::Absent,
-            },
-            Statistics {
-                num_rows: Precision::Inexact(nr),
-                ..
-            } if nr > self.fetch => Statistics {
-                num_rows: Precision::Inexact(self.fetch),
-                // this is not actually exact, but will be when GlobalLimit is applied
-                // TODO stats: find a more explicit way to vehiculate this information
-                column_statistics: col_stats,
-                total_byte_size: Precision::Absent,
-            },
-            _ => Statistics {
-                // the result output row number will always be no greater than the limit number
-                num_rows: Precision::Inexact(
-                    self.fetch * self.properties().output_partitioning().partition_count(),
-                ),
-
-                column_statistics: col_stats,
-                total_byte_size: Precision::Absent,
-            },
-        };
-        Ok(stats)
+        statistics_with_fetch(
+            self.input.statistics()?,
+            self.schema(),
+            Some(self.fetch),
+            0,
+            self.properties().partitioning.partition_count(),
+        )
+    }
+
+    fn fetch(&self) -> Option<usize> {
+        Some(self.fetch)
+    }
+
+    fn supports_limit_pushdown(&self) -> bool {
+        true
    }
}
+/// Calculates the statistics for the operator when fetch and skip is used in the operator
+/// (Output row count can be estimated in the presence of fetch and skip information).
+/// using the input statistics information.
+pub fn statistics_with_fetch(
Review Comment:
I recommend putting this as a method on the `Statistics` struct itself to make it easier to find -- as it would likely be useful for others implementing operators.
Something like (with the `input_stats` argument folded into `self`):
```rust
impl Statistics {
    pub fn with_fetch(
        self,
        schema: SchemaRef,
        fetch: Option<usize>,
        skip: usize,
        n_partitions: usize,
    ) -> Result<Self> {
        ...
    }
}
```
##########
datafusion/core/src/physical_optimizer/enforce_sorting.rs:
##########
@@ -1049,6 +1065,98 @@ mod tests {
        Ok(())
    }
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort6() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let input = Arc::new(
+            SortExec::new(vec![sort_expr("non_nullable_col", &schema)], source)
+                .with_fetch(Some(2)),
+        );
+        let physical_plan = sort_exec(
+            vec![
+                sort_expr("non_nullable_col", &schema),
+                sort_expr("nullable_col", &schema),
+            ],
+            input,
+        );
+
+        let expected_input = [
+            "SortExec: expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
+            "  SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
+            "    MemoryExec: partitions=1, partition_sizes=[0]",
+        ];
+        let expected_optimized = [
+            "SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
+            "  MemoryExec: partitions=1, partition_sizes=[0]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan, true);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort7() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let input = Arc::new(SortExec::new(
+            vec![sort_expr("non_nullable_col", &schema)],
+            source,
+        ));
+        let limit = Arc::new(LocalLimitExec::new(input, 2));
+        let physical_plan = sort_exec(
+            vec![
+                sort_expr("non_nullable_col", &schema),
+                sort_expr("nullable_col", &schema),
+            ],
+            limit,
+        );
+
+        let expected_input = [
+            "SortExec: expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
+            "  LocalLimitExec: fetch=2",
+            "    SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
+            "      MemoryExec: partitions=1, partition_sizes=[0]",
+        ];
+        let expected_optimized = [
+            "LocalLimitExec: fetch=2",
+            "  SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
+            "    MemoryExec: partitions=1, partition_sizes=[0]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan, true);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_do_not_pushdown_through_limit() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        // let input = sort_exec(vec![sort_expr("non_nullable_col", &schema)], source);
+        let input = Arc::new(SortExec::new(
+            vec![sort_expr("non_nullable_col", &schema)],
+            source,
+        ));
+        let limit = Arc::new(GlobalLimitExec::new(input, 0, Some(5))) as _;
+        let physical_plan = sort_exec(vec![sort_expr("nullable_col", &schema)], limit);
+
+        let expected_input = [
+            "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]",
+            "  GlobalLimitExec: skip=0, fetch=5",
+            "    SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
+            "      MemoryExec: partitions=1, partition_sizes=[0]",
+        ];
+        let expected_optimized = [
Review Comment:
I don't understand why this plan doesn't push the limit into the sort as well, like it does in the plan above.
Like why is it not
```
            "GlobalLimitExec: skip=0, fetch=5",
            "  SortExec: TopK(fetch=5), expr=[nullable_col@0 ASC], preserve_partitioning=[false]",
            "    MemoryExec: partitions=1, partition_sizes=[0]",
```
(is it the difference between GlobalLimitExec and LocalLimitExec 🤔 )
##########
datafusion/core/src/physical_optimizer/sort_pushdown.rs:
##########
@@ -41,38 +40,62 @@ use datafusion_physical_expr::{
/// This is a "data class" we use within the [`EnforceSorting`] rule to push
/// down [`SortExec`] in the plan. In some cases, we can reduce the total
/// computational cost by pushing down `SortExec`s through some executors. The
-/// object carries the parent required ordering as its data.
+/// object carries the parent required ordering, fetch value of the parent node as its data.
///
/// [`EnforceSorting`]: crate::physical_optimizer::enforce_sorting::EnforceSorting
-pub type SortPushDown = PlanContext<Option<Vec<PhysicalSortRequirement>>>;
+#[derive(Default, Clone)]
+pub struct ParentRequirements {
Review Comment:
💯 for a struct rather than a typedef
##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -3019,11 +3019,11 @@ mod tests {
assert_batches_sorted_eq!(
[
- "+-----+-----+----+-------+",
Review Comment:
I agree, it also makes sense: the previous test did a `sort` right after a select + filter, which will not produce a deterministic result. Doing the limit after the sort makes sense.
##########
datafusion/physical-plan/src/limit.rs:
##########
@@ -380,56 +321,110 @@ impl ExecutionPlan for LocalLimitExec {
    }
    fn statistics(&self) -> Result<Statistics> {
-        let input_stats = self.input.statistics()?;
-        let col_stats = Statistics::unknown_column(&self.schema());
-        let stats = match input_stats {
-            // if the input does not reach the limit globally, return input stats
-            Statistics {
-                num_rows: Precision::Exact(nr),
-                ..
-            }
-            | Statistics {
-                num_rows: Precision::Inexact(nr),
-                ..
-            } if nr <= self.fetch => input_stats,
-            // if the input is greater than the limit, the num_row will be greater
-            // than the limit because the partitions will be limited separately
-            // the statistic
-            Statistics {
-                num_rows: Precision::Exact(nr),
-                ..
-            } if nr > self.fetch => Statistics {
-                num_rows: Precision::Exact(self.fetch),
-                // this is not actually exact, but will be when GlobalLimit is applied
-                // TODO stats: find a more explicit way to vehiculate this information
-                column_statistics: col_stats,
-                total_byte_size: Precision::Absent,
-            },
-            Statistics {
-                num_rows: Precision::Inexact(nr),
-                ..
-            } if nr > self.fetch => Statistics {
-                num_rows: Precision::Inexact(self.fetch),
-                // this is not actually exact, but will be when GlobalLimit is applied
-                // TODO stats: find a more explicit way to vehiculate this information
-                column_statistics: col_stats,
-                total_byte_size: Precision::Absent,
-            },
-            _ => Statistics {
-                // the result output row number will always be no greater than the limit number
-                num_rows: Precision::Inexact(
-                    self.fetch * self.properties().output_partitioning().partition_count(),
-                ),
-
-                column_statistics: col_stats,
-                total_byte_size: Precision::Absent,
-            },
-        };
-        Ok(stats)
+        statistics_with_fetch(
Review Comment:
😍 for the refactor to reduce duplication
##########
datafusion/core/src/physical_optimizer/sort_pushdown.rs:
##########
@@ -132,6 +172,43 @@ fn pushdown_requirement_to_children(
            RequirementsCompatibility::Compatible(adjusted) => Ok(Some(vec![adjusted])),
            RequirementsCompatibility::NonCompatible => Ok(None),
        }
+    } else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
Review Comment:
I wonder if there is some general way to formulate these conditions about requirement pushdown that doesn't have to special-case the operators.
I worry that this logic basically can't be used for user-defined execution plan nodes.
Maybe we could pull the `pushdown_requirement_to_children` function or something like it into `ExecutionPlan`.
Not needed for this PR, I am just mentioning it.
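To illustrate the idea, here is a rough sketch of such a trait hook; the trait and method names are made up for this example and are not existing DataFusion APIs:
```rust
/// Illustrative-only stand-in for a method that could live on `ExecutionPlan`.
/// A conservative default keeps user-defined nodes correct without opting in;
/// operators like `SortExec` would override it with their own pushdown rules.
pub trait SortPushdownSupport {
    /// Whether a parent sort requirement (with an optional `fetch`) may be
    /// pushed through this node down to its children.
    fn supports_sort_pushdown(&self, _fetch: Option<usize>) -> bool {
        false
    }
}
```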
##########
datafusion/physical-plan/src/limit.rs:
##########
@@ -821,7 +816,7 @@ mod tests {
    #[tokio::test]
    async fn test_row_number_statistics_for_local_limit() -> Result<()> {
        let row_count = row_number_statistics_for_local_limit(4, 10).await?;
-        assert_eq!(row_count, Precision::Exact(10));
+        assert_eq!(row_count, Precision::Exact(40));
Review Comment:
This is interesting -- is the reason it changes that statistics represent data for all of a plan's partitions, and in this case there are 4 partitions, each with a limit of 10, so the total output row count is 40?
Maybe as a follow-on PR we could clarify that `Statistics` represent data for all partitions 🤔
https://docs.rs/datafusion/latest/datafusion/common/struct.Statistics.html
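As a tiny sanity check of that arithmetic (just the reasoning above in code form, not the test itself):
```rust
fn main() {
    let n_partitions = 4; // partitions in the test plan
    let fetch_per_partition = 10; // LocalLimitExec fetch applied per partition
    // Plan-wide row-count estimate if every partition hits its local limit.
    assert_eq!(n_partitions * fetch_per_partition, 40);
}
```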
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]