This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ffc8444501 feat: eliminate redundant sorts on monotonic expressions
(#9813)
ffc8444501 is described below
commit ffc84445015eb69b2d3352304ec09fbc6267f282
Author: Matthew Cramerus <[email protected]>
AuthorDate: Fri Apr 12 16:39:06 2024 -0500
feat: eliminate redundant sorts on monotonic expressions (#9813)
* initial impl
* add comments & fix name
* even more comments
* add negative test
* updated sqllogictest
* make the test easier to read
* add collapse_monotonic_lex_req into collapse_lex_req
* more tests in sqllogictest
* another test in sqllogictest
* add yet another negative test case
---
datafusion/physical-expr/src/equivalence/mod.rs | 48 +++++++++-
.../physical-expr/src/equivalence/properties.rs | 82 +++++++++++++++++
.../test_files/filter_without_sort_exec.slt | 102 ++++++++++++++++++++-
3 files changed, 226 insertions(+), 6 deletions(-)
diff --git a/datafusion/physical-expr/src/equivalence/mod.rs
b/datafusion/physical-expr/src/equivalence/mod.rs
index 46909f2361..fd8123c45b 100644
--- a/datafusion/physical-expr/src/equivalence/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/mod.rs
@@ -18,6 +18,7 @@
use std::sync::Arc;
use crate::expressions::Column;
+use crate::sort_properties::SortProperties;
use crate::{LexRequirement, PhysicalExpr, PhysicalSortRequirement};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@@ -35,6 +36,10 @@ pub use properties::{join_equivalence_properties,
EquivalenceProperties};
/// This function constructs a duplicate-free `LexOrderingReq` by filtering out
/// duplicate entries that have same physical expression inside. For example,
/// `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a Some(ASC)]`.
+///
+/// It also filters out entries whose ordering is implied by the entry right
+/// after them; for instance, `vec![floor(a) Some(ASC), a Some(ASC)]` is
+/// collapsed to `vec![a Some(ASC)]`, since sorting by `a` already orders `floor(a)`.
pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement {
let mut output = Vec::<PhysicalSortRequirement>::new();
for item in input {
@@ -42,7 +47,48 @@ pub fn collapse_lex_req(input: LexRequirement) ->
LexRequirement {
output.push(item);
}
}
- output
+ collapse_monotonic_lex_req(output)
+}
+
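+/// This function constructs a normalized [`LexRequirement`] by filtering out
+/// entries whose ordering is implied by the entry immediately after them: an
+/// entry on a single-child expression `f(x)` is dropped when the next entry is
+/// on `x` and sorting by `x` that way leaves `f(x)` ordered exactly as the
+/// dropped entry requires.
+/// Used in `collapse_lex_req`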
+fn collapse_monotonic_lex_req(input: LexRequirement) -> LexRequirement {
+ // Each entry is compared with its successor in the original `input`
+ // (not with the already-filtered output); surviving entries are cloned
+ // into a new `LexRequirement`, keeping their original relative order.
+ input
+ .iter()
+ .enumerate()
+ .filter_map(|(i, item)| {
+ // If it's the last entry, there is no next entry
+ if i == input.len() - 1 {
+ return Some(item);
+ }
+ let next_expr = &input[i + 1];
+
+ // Only handle expressions with exactly one child, and only when that
+ // child is the very expression of the next entry
+ // TODO: it should be possible to handle expressions orderings
f(a, b, c), a, b, c
+ // if f is monotonic in all arguments
+ if !(item.expr.children().len() == 1
+ && item.expr.children()[0].eq(&next_expr.expr))
+ {
+ return Some(item);
+ }
+
+ // Without a sort direction on the next entry there is nothing to
+ // derive `item`'s ordering from, so keep it.
+ let opts = match next_expr.options {
+ None => return Some(item),
+ Some(opts) => opts,
+ };
+
+ // `item` is redundant when ordering its child with `opts` makes
+ // `item.expr` ordered with exactly the `SortOptions` that `item`
+ // requires. If `item.options` is `None`, the left side is `None`
+ // and never equals the `Some(..)` on the right, so the entry is kept.
+ if item.options.map(SortProperties::Ordered)
+ ==
Some(item.expr.get_ordering(&[SortProperties::Ordered(opts)]))
+ {
+ // Remove the redundant sort
+ return None;
+ }
+
+ Some(item)
+ })
+ // Only the surviving borrowed requirements are cloned.
+ .cloned()
+ .collect::<Vec<_>>()
}
/// Adds the `offset` value to `Column` indices inside `expr`. This function is
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs
b/datafusion/physical-expr/src/equivalence/properties.rs
index c14c88d6c6..58ef5ec797 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -2212,6 +2212,88 @@ mod tests {
);
}
+ Ok(())
+ }
+ /// Verifies that an existing `(a, b, c)` ordering satisfies a requested sort on
+ /// `c` alone when `b` is constant and `CAST(c AS Date32) = a`, and that it does
+ /// not when `b` is not constant (the equality by itself is not enough).
+ #[test]
+ fn test_eliminate_redundant_monotonic_sorts() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Date32, true),
+ Field::new("b", DataType::Utf8, true),
+ Field::new("c", DataType::Timestamp(TimeUnit::Nanosecond, None),
true),
+ ]));
+ // Base ordering: a, b, c — each ascending with nulls first.
+ let base_properties =
EquivalenceProperties::new(schema.clone()).with_reorder(
+ ["a", "b", "c"]
+ .into_iter()
+ .map(|c| {
+ col(c, schema.as_ref()).map(|expr| PhysicalSortExpr {
+ expr,
+ options: SortOptions {
+ descending: false,
+ nulls_first: true,
+ },
+ })
+ })
+ .collect::<Result<Vec<_>>>()?,
+ );
+
+ // One scenario: `constants` go to `add_constants`, each pair in
+ // `equal_conditions` goes to `add_equal_conditions`, and `sort_columns`
+ // form the requested sort whose satisfaction is checked.
+ struct TestCase {
+ name: &'static str,
+ constants: Vec<Arc<dyn PhysicalExpr>>,
+ equal_conditions: Vec<[Arc<dyn PhysicalExpr>; 2]>,
+ sort_columns: &'static [&'static str],
+ should_satisfy_ordering: bool,
+ }
+
+ let col_a = col("a", schema.as_ref())?;
+ let col_b = col("b", schema.as_ref())?;
+ let col_c = col("c", schema.as_ref())?;
+ // CAST(c AS Date32) has the same type as `a` (Date32), so the cases below
+ // can declare it equal to `a`.
+ let cast_c = Arc::new(CastExpr::new(col_c, DataType::Date32, None));
+
+ let cases = vec![
+ TestCase {
+ name: "(a, b, c) -> (c)",
+ // b is constant, so it should be removed from the sort order
+ constants: vec![col_b],
+ equal_conditions: vec![[cast_c.clone(), col_a.clone()]],
+ sort_columns: &["c"],
+ should_satisfy_ordering: true,
+ },
+ TestCase {
+ name: "not ordered because (b) is not constant",
+ // b is not constant anymore
+ constants: vec![],
+ // a and c are still compatible, but this is irrelevant since
the original ordering is (a, b, c)
+ equal_conditions: vec![[cast_c.clone(), col_a.clone()]],
+ sort_columns: &["c"],
+ should_satisfy_ordering: false,
+ },
+ ];
+
+ // Each case starts from a fresh clone of the base properties.
+ for case in cases {
+ let mut properties =
base_properties.clone().add_constants(case.constants);
+ for [left, right] in &case.equal_conditions {
+ properties.add_equal_conditions(left, right)
+ }
+
+ // Requested sort uses `SortOptions::default()`; presumably ascending
+ // with nulls first, matching the base ordering — TODO confirm.
+ let sort = case
+ .sort_columns
+ .iter()
+ .map(|&name| {
+ col(name, &schema).map(|col| PhysicalSortExpr {
+ expr: col,
+ options: SortOptions::default(),
+ })
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ assert_eq!(
+ properties.ordering_satisfy(&sort),
+ case.should_satisfy_ordering,
+ "failed test '{}'",
+ case.name
+ );
+ }
+
Ok(())
}
}
diff --git a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
index 05e622db8a..b2cc64e3a7 100644
--- a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
+++ b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
@@ -18,9 +18,9 @@
# prepare table
statement ok
CREATE UNBOUNDED EXTERNAL TABLE data (
- "date" VARCHAR,
+ "date" DATE,
"ticker" VARCHAR,
- "time" VARCHAR,
+ "time" TIMESTAMP,
) STORED AS CSV
WITH ORDER ("date", "ticker", "time")
LOCATION './a.parquet';
@@ -43,19 +43,111 @@ SortPreservingMergeExec: [date@0 ASC NULLS LAST,time@2 ASC
NULLS LAST]
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------StreamingTableExec: partition_sizes=1, projection=[date, ticker,
time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1
ASC NULLS LAST, time@2 ASC NULLS LAST]
+# constant ticker, CAST(time AS DATE) = date, order by time
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "time"
+----
+logical_plan
+Sort: data.time ASC NULLS LAST
+--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+----TableScan: data projection=[date, ticker, time]
+physical_plan
+SortPreservingMergeExec: [time@2 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------StreamingTableExec: partition_sizes=1, projection=[date, ticker,
time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1
ASC NULLS LAST, time@2 ASC NULLS LAST]
+
+# same thing but order by date
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "date"
+----
+logical_plan
+Sort: data.date ASC NULLS LAST
+--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+----TableScan: data projection=[date, ticker, time]
+physical_plan
+SortPreservingMergeExec: [date@0 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------StreamingTableExec: partition_sizes=1, projection=[date, ticker,
time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1
ASC NULLS LAST, time@2 ASC NULLS LAST]
+
+# same thing but order by ticker
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "ticker"
+----
+logical_plan
+Sort: data.ticker ASC NULLS LAST
+--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+----TableScan: data projection=[date, ticker, time]
+physical_plan
+CoalescePartitionsExec
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------StreamingTableExec: partition_sizes=1, projection=[date, ticker,
time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1
ASC NULLS LAST, time@2 ASC NULLS LAST]
+
+# same thing but order by time, date
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "time", "date";
+----
+logical_plan
+Sort: data.time ASC NULLS LAST, data.date ASC NULLS LAST
+--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+----TableScan: data projection=[date, ticker, time]
+physical_plan
+SortPreservingMergeExec: [time@2 ASC NULLS LAST,date@0 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------StreamingTableExec: partition_sizes=1, projection=[date, ticker,
time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1
ASC NULLS LAST, time@2 ASC NULLS LAST]
+
+# CAST(time AS DATE) <> date (should require a sort)
+# no physical plan due to sort breaking pipeline
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) <> date
+ORDER BY "time"
+----
+logical_plan
+Sort: data.time ASC NULLS LAST
+--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) != data.date
+----TableScan: data projection=[date, ticker, time]
+
+# no relation between time & date
+# should also be pipeline breaking
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A'
+ORDER BY "time"
+----
+logical_plan
+Sort: data.time ASC NULLS LAST
+--Filter: data.ticker = Utf8("A")
+----TableScan: data projection=[date, ticker, time]
+
# query
query TT
explain SELECT * FROM data
-WHERE date = 'A'
+WHERE date = '2006-01-02'
ORDER BY "ticker", "time";
----
logical_plan
Sort: data.ticker ASC NULLS LAST, data.time ASC NULLS LAST
---Filter: data.date = Utf8("A")
+--Filter: data.date = Date32("13150")
----TableScan: data projection=[date, ticker, time]
physical_plan
SortPreservingMergeExec: [ticker@1 ASC NULLS LAST,time@2 ASC NULLS LAST]
--CoalesceBatchesExec: target_batch_size=8192
-----FilterExec: date@0 = A
+----FilterExec: date@0 = 13150
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------StreamingTableExec: partition_sizes=1, projection=[date, ticker,
time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1
ASC NULLS LAST, time@2 ASC NULLS LAST]