alamb commented on code in PR #19304:
URL: https://github.com/apache/datafusion/pull/19304#discussion_r2628676775
##########
datafusion/common/src/config.rs:
##########
@@ -1000,6 +1000,34 @@ config_namespace! {
/// ```
pub repartition_sorts: bool, default = true
+ /// Partition count threshold for subset satisfaction optimization.
+ ///
+ /// When the current partition count is >= this threshold, DataFusion will
+ /// skip repartitioning if the current partitioning expressions are a subset
+ /// of the required partitioning expressions, e.g. Hash(a) satisfies Hash(a, b).
+ ///
+ /// When the current partition count is < this threshold, DataFusion will
+ /// repartition to increase parallelism even when subset satisfaction applies.
+ ///
+ /// Set to 0 to always repartition (disable subset satisfaction optimization).
+ /// Set to a high value to always use subset satisfaction.
+ ///
+ /// Example (subset_satisfaction_partition_threshold = 4):
+ /// ```text
+ /// Hash([a]) satisfies Hash([a, b]) because [a] is a subset of [a, b]
+ ///
+ /// If current partitions (3) < threshold (4), repartition:
+ ///   AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)]
+ ///     RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3
+ ///       AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)]
+ ///         DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3)
+ ///
+ /// If current partitions (8) >= threshold (4), use subset satisfaction:
+ ///   AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)]
+ ///     DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8)
+ /// ```
+ pub subset_satisfaction_partition_threshold: usize, default = 4
Review Comment:
that is quite a name. Subset satisfaction. I like it
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -339,4 +414,403 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_partitioning_satisfy_by_subset() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int64, false),
+ Field::new("b", DataType::Int64, false),
+ Field::new("c", DataType::Int64, false),
+ ]));
+
+ let col_a: Arc<dyn PhysicalExpr> =
+ Arc::new(Column::new_with_schema("a", &schema)?);
+ let col_b: Arc<dyn PhysicalExpr> =
+ Arc::new(Column::new_with_schema("b", &schema)?);
+ let col_c: Arc<dyn PhysicalExpr> =
+ Arc::new(Column::new_with_schema("c", &schema)?);
+ let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
+
+ let test_cases = vec![
+ (
+ "Hash([a]) vs Hash([a, b])",
+ Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
+ Distribution::HashPartitioned(vec![
+ Arc::clone(&col_a),
+ Arc::clone(&col_b),
+ ]),
+ PartitioningSatisfaction::Subset,
+ PartitioningSatisfaction::NotSatisfied,
+ ),
+ (
+ "Hash([a]) vs Hash([a, b, c])",
+ Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
+ Distribution::HashPartitioned(vec![
+ Arc::clone(&col_a),
+ Arc::clone(&col_b),
+ Arc::clone(&col_c),
+ ]),
+ PartitioningSatisfaction::Subset,
+ PartitioningSatisfaction::NotSatisfied,
+ ),
+ (
+ "Hash([a, b]) vs Hash([a, b, c])",
Review Comment:
Thank you for these comments ❤️
Can you also please add cases for
* `Hash([b]) vs Hash([a, b, c])`
* `Hash([b, a]) vs Hash([a, b, c])`
##########
datafusion/common/src/config.rs:
##########
@@ -1000,6 +1000,34 @@ config_namespace! {
/// ```
pub repartition_sorts: bool, default = true
+ /// Partition count threshold for subset satisfaction optimization.
Review Comment:
Something I don't fully understand after reading this PR is the rationale for defaulting this to 4.
It seems like this optimization will potentially slow things down only when there is large skew across partitions, so that hashing on more columns would even out the work.
Is this something to do with trading off repartitioning to get more partitions and more cores involved?
A little rationale here might help readers understand that.
##########
datafusion/common/src/config.rs:
##########
@@ -1000,6 +1000,34 @@ config_namespace! {
/// ```
pub repartition_sorts: bool, default = true
+ /// Partition count threshold for subset satisfaction optimization.
+ ///
+ /// When the current partition count is >= this threshold, DataFusion will
+ /// skip repartitioning if the current partitioning expressions are a subset
+ /// of the required partitioning expressions, e.g. Hash(a) satisfies Hash(a, b).
+ ///
+ /// When the current partition count is < this threshold, DataFusion will
+ /// repartition to increase parallelism even when subset satisfaction applies.
+ ///
+ /// Set to 0 to always repartition (disable subset satisfaction optimization).
Review Comment:
this is great to document how to turn the feature off
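For example, assuming the `datafusion.optimizer` namespace used in the new sqllogictest file, the feature can be switched off per session with:
```sql
-- 0 disables subset satisfaction, so DataFusion always repartitions
SET datafusion.optimizer.subset_satisfaction_partition_threshold = 0;
```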
##########
datafusion/sqllogictest/test_files/aggregate.slt:
##########
@@ -6193,14 +6193,12 @@ logical_plan
04)------TableScan: aggregate_test_100 projection=[c1, c3]
physical_plan
01)CoalescePartitionsExec: fetch=5
-02)--AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3,
min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-03)----RepartitionExec: partitioning=Hash([c3@0,
min(aggregate_test_100.c1)@1], 4), input_partitions=4
-04)------AggregateExec: mode=Partial, gby=[c3@0 as c3,
min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-05)--------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3],
aggr=[min(aggregate_test_100.c1)]
-06)----------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
-07)------------AggregateExec: mode=Partial, gby=[c3@1 as c3],
aggr=[min(aggregate_test_100.c1)]
-08)--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-09)----------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]},
projection=[c1, c3], file_type=csv, has_header=true
+02)--AggregateExec: mode=SinglePartitioned, gby=[c3@0 as c3,
min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
Review Comment:
that is certainly nicer -- it cuts out an entire repartition
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1212,6 +1225,14 @@ pub fn ensure_distribution(
plan = updated_window;
};
+ // For joins in partitioned mode, we need exact hash matching between
Review Comment:
Can you please add some comments that explain **why** joins need exact hash matching? This comment explains what the code does (which you can get from reading the code, though this is a nice summary), but you can't get the intent from the comments.
I don't think it is for correctness, as my understanding is that a subset of a hash will still ensure that all matching join keys are in the corresponding partitions. So is it performance related?
Also, what about the other join types (like CrossJoin, NestedLoopsJoin)?
##########
datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt:
##########
@@ -0,0 +1,526 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for Subset Partitioning Optimization
+#
+# Subset partitioning allows Hash([a]) to satisfy Hash([a, b]) requirements
+# when the current partitioning expressions are a strict subset of the
+# required partitioning expressions.
+##########
+
+##########
+# SETUP: Configuration and Data Generation
+##########
+
+statement ok
+set datafusion.optimizer.enable_round_robin_repartition = false;
+
+statement ok
+set datafusion.execution.target_partitions = 3;
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+# Create fact table partitioned by f_dkey (3 partitions)
+# Each partition has data sorted by timestamp
+# Partition: f_dkey=A
+statement ok
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 95.5),
+ (TIMESTAMP '2023-01-01T09:00:10', 102.3),
+ (TIMESTAMP '2023-01-01T09:00:20', 98.7),
+ (TIMESTAMP '2023-01-01T09:12:20', 105.1),
+ (TIMESTAMP '2023-01-01T09:12:30', 100.0),
+ (TIMESTAMP '2023-01-01T09:12:40', 150.0),
+ (TIMESTAMP '2023-01-01T09:12:50', 120.8)
+))
+TO 'test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet'
+STORED AS PARQUET;
+
+# Partition: f_dkey=B
+statement ok
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 75.2),
+ (TIMESTAMP '2023-01-01T09:00:10', 82.4),
+ (TIMESTAMP '2023-01-01T09:00:20', 78.9),
+ (TIMESTAMP '2023-01-01T09:00:30', 85.6),
+ (TIMESTAMP '2023-01-01T09:12:30', 80.0),
+ (TIMESTAMP '2023-01-01T09:12:40', 120.0),
+ (TIMESTAMP '2023-01-01T09:12:50', 92.3)
+))
+TO 'test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet'
+STORED AS PARQUET;
+
+# Partition: f_dkey=C
+statement ok
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 300.5),
+ (TIMESTAMP '2023-01-01T09:00:10', 285.7),
+ (TIMESTAMP '2023-01-01T09:00:20', 310.2),
+ (TIMESTAMP '2023-01-01T09:00:30', 295.8),
+ (TIMESTAMP '2023-01-01T09:00:40', 300.0),
+ (TIMESTAMP '2023-01-01T09:12:40', 250.0),
+ (TIMESTAMP '2023-01-01T09:12:50', 275.4)
+))
+TO 'test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet'
+STORED AS PARQUET;
+
+# Create dimension table partitioned by d_dkey (4 partitions)
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+ ('dev', 'log', 'ma')
+))
+TO 'test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+ ('prod', 'log', 'ma')
+))
+TO 'test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+ ('prod', 'log', 'vim')
+))
+TO 'test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+ ('prod', 'trace', 'vim')
+))
+TO 'test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+##########
+# TABLE DECLARATIONS
+##########
+
+# Fact table with ordering
+statement ok
+CREATE EXTERNAL TABLE fact_table_ordered (timestamp TIMESTAMP, value DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+WITH ORDER (f_dkey ASC, timestamp ASC)
+LOCATION 'test_files/scratch/repartition_subset_satisfaction/fact/';
+
+# Dimension table (for join tests)
+statement ok
+CREATE EXTERNAL TABLE dimension_table (env STRING, service STRING, host STRING)
+STORED AS PARQUET
+PARTITIONED BY (d_dkey STRING)
+LOCATION 'test_files/scratch/repartition_subset_satisfaction/dimension/';
+
+##########
+# TEST 1: Basic Aggregate with Subset Partitioning
+# Demonstrates that GROUP BY [f_dkey, time_bin] can use
+# file partitioning on just [f_dkey]
+##########
+
+# Threshold (4) > current partitions (3): repartitioning is forced (subset optimization does not apply)
+statement ok
+set datafusion.optimizer.subset_satisfaction_partition_threshold = 4;
+
+query TT
+EXPLAIN SELECT f_dkey, date_bin(INTERVAL '30 seconds', timestamp) as time_bin,
+ COUNT(*), AVG(value)
+FROM fact_table_ordered
+GROUP BY f_dkey, date_bin(INTERVAL '30 seconds', timestamp)
+ORDER BY f_dkey, time_bin;
+----
+logical_plan
+01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST, time_bin ASC NULLS LAST
+02)--Projection: fact_table_ordered.f_dkey,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp) AS time_bin,
count(Int64(1)) AS count(*), avg(fact_table_ordered.value)
+03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"), fact_table_ordered.timestamp)]],
aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]]
+04)------TableScan: fact_table_ordered projection=[timestamp, value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, time_bin@1 ASC NULLS
LAST]
+02)--ProjectionExec: expr=[f_dkey@0 as f_dkey,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as time_bin,
count(Int64(1))@2 as count(*), avg(fact_table_ordered.value)@3 as
avg(fact_table_ordered.value)]
+03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 ASC NULLS LAST],
preserve_partitioning=[true]
+05)--------RepartitionExec: partitioning=Hash([f_dkey@0,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1], 3),
input_partitions=3
+06)----------AggregateExec: mode=Partial, gby=[f_dkey@2 as f_dkey,
date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 },
timestamp@0) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months:
0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+07)------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]},
projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS
LAST, timestamp@0 ASC NULLS LAST], file_type=parquet
+
+# Verify results without subset satisfaction
+query TPIR rowsort
+SELECT f_dkey, date_bin(INTERVAL '30 seconds', timestamp) as time_bin,
+ COUNT(*), AVG(value)
+FROM fact_table_ordered
+GROUP BY f_dkey, date_bin(INTERVAL '30 seconds', timestamp)
+ORDER BY f_dkey, time_bin;
+----
+A 2023-01-01T09:00:00 3 98.833333333333
+A 2023-01-01T09:12:00 1 105.1
+A 2023-01-01T09:12:30 3 123.6
+B 2023-01-01T09:00:00 3 78.833333333333
+B 2023-01-01T09:00:30 1 85.6
+B 2023-01-01T09:12:30 3 97.433333333333
+C 2023-01-01T09:00:00 3 298.8
+C 2023-01-01T09:00:30 2 297.9
+C 2023-01-01T09:12:30 2 262.7
+
+# Threshold (1) <= current partitions (3): subset satisfaction optimization applies
+statement ok
+set datafusion.optimizer.subset_satisfaction_partition_threshold = 1;
+
+query TT
+EXPLAIN SELECT f_dkey, date_bin(INTERVAL '30 seconds', timestamp) as time_bin,
+ COUNT(*), AVG(value)
+FROM fact_table_ordered
+GROUP BY f_dkey, date_bin(INTERVAL '30 seconds', timestamp)
+ORDER BY f_dkey, time_bin;
+----
+logical_plan
+01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST, time_bin ASC NULLS LAST
+02)--Projection: fact_table_ordered.f_dkey,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp) AS time_bin,
count(Int64(1)) AS count(*), avg(fact_table_ordered.value)
+03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"), fact_table_ordered.timestamp)]],
aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]]
+04)------TableScan: fact_table_ordered projection=[timestamp, value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, time_bin@1 ASC NULLS
LAST]
Review Comment:
a nice single grouping 👌
##########
datafusion/sqllogictest/test_files/tpch/plans/q18.slt.part:
##########
@@ -69,21 +69,19 @@ logical_plan
physical_plan
01)SortPreservingMergeExec: [o_totalprice@4 DESC, o_orderdate@3 ASC NULLS LAST]
02)--SortExec: expr=[o_totalprice@4 DESC, o_orderdate@3 ASC NULLS LAST],
preserve_partitioning=[true]
-03)----AggregateExec: mode=FinalPartitioned, gby=[c_name@0 as c_name,
c_custkey@1 as c_custkey, o_orderkey@2 as o_orderkey, o_orderdate@3 as
o_orderdate, o_totalprice@4 as o_totalprice], aggr=[sum(lineitem.l_quantity)]
-04)------RepartitionExec: partitioning=Hash([c_name@0, c_custkey@1,
o_orderkey@2, o_orderdate@3, o_totalprice@4], 4), input_partitions=4
-05)--------AggregateExec: mode=Partial, gby=[c_name@1 as c_name, c_custkey@0
as c_custkey, o_orderkey@2 as o_orderkey, o_orderdate@4 as o_orderdate,
o_totalprice@3 as o_totalprice], aggr=[sum(lineitem.l_quantity)]
-06)----------HashJoinExec: mode=Partitioned, join_type=LeftSemi,
on=[(o_orderkey@2, l_orderkey@0)]
-07)------------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(o_orderkey@2, l_orderkey@0)], projection=[c_custkey@0, c_name@1,
o_orderkey@2, o_totalprice@3, o_orderdate@4, l_quantity@6]
-08)--------------RepartitionExec: partitioning=Hash([o_orderkey@2], 4),
input_partitions=4
-09)----------------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(c_custkey@0, o_custkey@1)], projection=[c_custkey@0, c_name@1,
o_orderkey@2, o_totalprice@4, o_orderdate@5]
-10)------------------RepartitionExec: partitioning=Hash([c_custkey@0], 4),
input_partitions=1
-11)--------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]},
projection=[c_custkey, c_name], file_type=csv, has_header=false
-12)------------------RepartitionExec: partitioning=Hash([o_custkey@1], 4),
input_partitions=4
-13)--------------------DataSourceExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]},
projection=[o_orderkey, o_custkey, o_totalprice, o_orderdate], file_type=csv,
has_header=false
-14)--------------RepartitionExec: partitioning=Hash([l_orderkey@0], 4),
input_partitions=4
-15)----------------DataSourceExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
projection=[l_orderkey, l_quantity], file_type=csv, has_header=false
-16)------------FilterExec: sum(lineitem.l_quantity)@1 > Some(30000),25,2,
projection=[l_orderkey@0]
-17)--------------AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as
l_orderkey], aggr=[sum(lineitem.l_quantity)]
-18)----------------RepartitionExec: partitioning=Hash([l_orderkey@0], 4),
input_partitions=4
-19)------------------AggregateExec: mode=Partial, gby=[l_orderkey@0 as
l_orderkey], aggr=[sum(lineitem.l_quantity)]
-20)--------------------DataSourceExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
projection=[l_orderkey, l_quantity], file_type=csv, has_header=false
+03)----AggregateExec: mode=SinglePartitioned, gby=[c_name@1 as c_name,
c_custkey@0 as c_custkey, o_orderkey@2 as o_orderkey, o_orderdate@4 as
o_orderdate, o_totalprice@3 as o_totalprice], aggr=[sum(lineitem.l_quantity)]
Review Comment:
this is the same improvement as in q16 -- there is no repartitioning for the final AggregateExec
##########
datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part:
##########
@@ -71,19 +71,17 @@ physical_plan
04)------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand,
p_type@1 as p_type, p_size@2 as p_size], aggr=[count(alias1)]
05)--------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1, p_size@2],
4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[p_brand@0 as p_brand, p_type@1
as p_type, p_size@2 as p_size], aggr=[count(alias1)]
-07)------------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as
p_brand, p_type@1 as p_type, p_size@2 as p_size, alias1@3 as alias1], aggr=[]
-08)--------------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1,
p_size@2, alias1@3], 4), input_partitions=4
-09)----------------AggregateExec: mode=Partial, gby=[p_brand@1 as p_brand,
p_type@2 as p_type, p_size@3 as p_size, ps_suppkey@0 as alias1], aggr=[]
-10)------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti,
on=[(ps_suppkey@0, s_suppkey@0)]
-11)--------------------RepartitionExec: partitioning=Hash([ps_suppkey@0], 4),
input_partitions=4
-12)----------------------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(ps_partkey@0, p_partkey@0)], projection=[ps_suppkey@1, p_brand@3,
p_type@4, p_size@5]
-13)------------------------RepartitionExec: partitioning=Hash([ps_partkey@0],
4), input_partitions=4
-14)--------------------------DataSourceExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]},
projection=[ps_partkey, ps_suppkey], file_type=csv, has_header=false
-15)------------------------RepartitionExec: partitioning=Hash([p_partkey@0],
4), input_partitions=4
-16)--------------------------FilterExec: p_brand@1 != Brand#45 AND p_type@2
NOT LIKE MEDIUM POLISHED% AND p_size@3 IN (SET) ([49, 14, 23, 45, 19, 3, 36, 9])
-17)----------------------------RepartitionExec:
partitioning=RoundRobinBatch(4), input_partitions=1
-18)------------------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl]]},
projection=[p_partkey, p_brand, p_type, p_size], file_type=csv, has_header=false
-19)--------------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4),
input_partitions=4
-20)----------------------FilterExec: s_comment@1 LIKE %Customer%Complaints%,
projection=[s_suppkey@0]
-21)------------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-22)--------------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]},
projection=[s_suppkey, s_comment], file_type=csv, has_header=false
+07)------------AggregateExec: mode=SinglePartitioned, gby=[p_brand@1 as
p_brand, p_type@2 as p_type, p_size@3 as p_size, ps_suppkey@0 as alias1],
aggr=[]
Review Comment:
this plan looks significantly better as it removes one layer of hash
repartitioning / partial grouping.
I also double checked that I think it is correct (my reasoning is below)
<details>
<summary>details</summary>
The first group is grouping on
```
GROUP BY p_brand, p_type, p_size, ps_suppkey
```
However, the join is partitioned on `ps_partkey` and `p_partkey`
So that means any particular key will only ever be in one partition
</details>
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -148,51 +170,94 @@ impl Partitioning {
}
}
- /// Returns true when the guarantees made by this [`Partitioning`] are sufficient to
- /// satisfy the partitioning scheme mandated by the `required` [`Distribution`].
+ /// Returns true if `subset_exprs` is a strict subset of `superset_exprs`.
+ /// For example, Hash(a) satisfies Hash(a, b) since a partition with all occurrences of
+ /// a distinct (a) must also contain all occurrences of each distinct (a, b) with the same (a).
+ fn is_subset_partitioning(
+ subset_exprs: &[Arc<dyn PhysicalExpr>],
+ superset_exprs: &[Arc<dyn PhysicalExpr>],
+ ) -> bool {
+ // Require strict subset: fewer expressions, not equal
+ if subset_exprs.is_empty() || subset_exprs.len() >= superset_exprs.len() {
+ return false;
+ }
+
+ subset_exprs.iter().all(|subset_expr| {
+ superset_exprs
+ .iter()
+ .any(|superset_expr| subset_expr.eq(superset_expr))
+ })
+ }
+
+ /// Returns how this [`Partitioning`] satisfies the partitioning scheme mandated
+ /// by the `required` [`Distribution`].
pub fn satisfy(
&self,
required: &Distribution,
eq_properties: &EquivalenceProperties,
- ) -> bool {
+ allow_subset: bool,
Review Comment:
this is technically an API change (which is fine)
However, a nicer change for downstream consumers would be to leave the existing API in place, deprecate it, and add a new function as described in https://datafusion.apache.org/contributor-guide/api-health.html#deprecation-guidelines
Something like
```rust
#[deprecated(since = "52.0.0", note = "Use satisfaction instead")]
pub fn satisfy(
    &self,
    required: &Distribution,
    eq_properties: &EquivalenceProperties,
) -> bool {
    self.satisfaction(required, eq_properties, false) == PartitioningSatisfaction::Exact
}

/// Returns how this [`Partitioning`] satisfies the partitioning scheme mandated
/// by the `required` [`Distribution`].
pub fn satisfaction(
    &self,
    required: &Distribution,
    eq_properties: &EquivalenceProperties,
    allow_subset: bool,
) -> PartitioningSatisfaction {
    ....
```
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -339,4 +414,403 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_partitioning_satisfy_by_subset() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int64, false),
+ Field::new("b", DataType::Int64, false),
+ Field::new("c", DataType::Int64, false),
+ ]));
+
+ let col_a: Arc<dyn PhysicalExpr> =
+ Arc::new(Column::new_with_schema("a", &schema)?);
+ let col_b: Arc<dyn PhysicalExpr> =
+ Arc::new(Column::new_with_schema("b", &schema)?);
+ let col_c: Arc<dyn PhysicalExpr> =
+ Arc::new(Column::new_with_schema("c", &schema)?);
+ let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
+
+ let test_cases = vec![
+ (
+ "Hash([a]) vs Hash([a, b])",
+ Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
+ Distribution::HashPartitioned(vec![
+ Arc::clone(&col_a),
+ Arc::clone(&col_b),
+ ]),
+ PartitioningSatisfaction::Subset,
+ PartitioningSatisfaction::NotSatisfied,
+ ),
+ (
+ "Hash([a]) vs Hash([a, b, c])",
+ Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
+ Distribution::HashPartitioned(vec![
+ Arc::clone(&col_a),
+ Arc::clone(&col_b),
+ Arc::clone(&col_c),
+ ]),
+ PartitioningSatisfaction::Subset,
+ PartitioningSatisfaction::NotSatisfied,
+ ),
+ (
+ "Hash([a, b]) vs Hash([a, b, c])",
+ Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
+ Distribution::HashPartitioned(vec![
+ Arc::clone(&col_a),
+ Arc::clone(&col_b),
+ Arc::clone(&col_c),
+ ]),
+ PartitioningSatisfaction::Subset,
+ PartitioningSatisfaction::NotSatisfied,
+ ),
+ ];
+
+ for (desc, partition, required, expected_with_subset, expected_without_subset) in
+ test_cases
+ {
+ let result = partition.satisfy(&required, &eq_properties, true);
+ assert_eq!(
+ result, expected_with_subset,
+ "Failed for {desc} with subset enabled"
+ );
+
+ let result = partition.satisfy(&required, &eq_properties, false);
+ assert_eq!(
+ result, expected_without_subset,
+ "Failed for {desc} with subset disabled"
+ );
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_partitioning_current_superset() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int64, false),
+ Field::new("b", DataType::Int64, false),
+ Field::new("c", DataType::Int64, false),
+ ]));
+
+ let col_a: Arc<dyn PhysicalExpr> =
+ Arc::new(Column::new_with_schema("a", &schema)?);
+ let col_b: Arc<dyn PhysicalExpr> =
+ Arc::new(Column::new_with_schema("b", &schema)?);
+ let col_c: Arc<dyn PhysicalExpr> =
+ Arc::new(Column::new_with_schema("c", &schema)?);
+ let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
+
+ let test_cases = vec![
+ (
+ "Hash([a, b]) vs Hash([a])",
+ Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
+ Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
+ PartitioningSatisfaction::NotSatisfied,
+ PartitioningSatisfaction::NotSatisfied,
+ ),
+ (
+ "Hash([a, b, c]) vs Hash([a])",
+ Partitioning::Hash(
+ vec![Arc::clone(&col_a), Arc::clone(&col_b), Arc::clone(&col_c)],
+ 4,
+ ),
+ Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
+ PartitioningSatisfaction::NotSatisfied,
+ PartitioningSatisfaction::NotSatisfied,
+ ),
+ (
+ "Hash([a, b, c]) vs Hash([a, b])",
+ Partitioning::Hash(
+ vec![Arc::clone(&col_a), Arc::clone(&col_b), Arc::clone(&col_c)],
+ 4,
+ ),
+ Distribution::HashPartitioned(vec![
+ Arc::clone(&col_a),
+ Arc::clone(&col_b),
+ ]),
+ PartitioningSatisfaction::NotSatisfied,
+ PartitioningSatisfaction::NotSatisfied,
+ ),
+ ];
+
+ for (desc, partition, required, expected_with_subset, expected_without_subset) in
Review Comment:
There seems to be a lot of duplication in these tests (the input schemas are the same and the loop over cases is the same). Is there a reason not to just make one large list of cases in a single function? It would probably be significantly less verbose.
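Something like the following (an untested sketch: it reuses the `satisfy` signature and `PartitioningSatisfaction` variants from this PR, assumes the same imports as the existing test module, and abbreviates the case list):
```rust
#[test]
fn test_partitioning_satisfaction_cases() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
        Field::new("c", DataType::Int64, false),
    ]));
    let col_a: Arc<dyn PhysicalExpr> = Arc::new(Column::new_with_schema("a", &schema)?);
    let col_b: Arc<dyn PhysicalExpr> = Arc::new(Column::new_with_schema("b", &schema)?);
    let col_c: Arc<dyn PhysicalExpr> = Arc::new(Column::new_with_schema("c", &schema)?);
    let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

    // (description, current partitioning, required distribution,
    //  expected with allow_subset = true, expected with allow_subset = false)
    let test_cases = vec![
        (
            "Hash([a]) vs Hash([a, b])",
            Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
            Distribution::HashPartitioned(vec![Arc::clone(&col_a), Arc::clone(&col_b)]),
            PartitioningSatisfaction::Subset,
            PartitioningSatisfaction::NotSatisfied,
        ),
        (
            "Hash([a, b]) vs Hash([a, b, c])",
            Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
            Distribution::HashPartitioned(vec![
                Arc::clone(&col_a),
                Arc::clone(&col_b),
                Arc::clone(&col_c),
            ]),
            PartitioningSatisfaction::Subset,
            PartitioningSatisfaction::NotSatisfied,
        ),
        (
            "Hash([a, b]) vs Hash([a])",
            Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
            Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
            PartitioningSatisfaction::NotSatisfied,
            PartitioningSatisfaction::NotSatisfied,
        ),
        // ... all remaining subset / superset / reordered cases go here
    ];

    for (desc, partitioning, required, with_subset, without_subset) in test_cases {
        assert_eq!(
            partitioning.satisfy(&required, &eq_properties, true),
            with_subset,
            "Failed for {desc} with subset enabled"
        );
        assert_eq!(
            partitioning.satisfy(&required, &eq_properties, false),
            without_subset,
            "Failed for {desc} with subset disabled"
        );
    }
    Ok(())
}
```
New cases (like the `Hash([b])` ones requested above) would then just be extra entries in the list.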
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]