This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 03bcdb00dd implement tree rendering for StreamingTableExec (#15085)
03bcdb00dd is described below
commit 03bcdb00ddad4a6ba72e0a8a40ebadfa7d2523a5
Author: Alan Tang <[email protected]>
AuthorDate: Mon Mar 10 18:46:03 2025 +0800
implement tree rendering for StreamingTableExec (#15085)
* feat: support tree rendering for StreamingTableExec
Signed-off-by: Alan Tang <[email protected]>
* feat: simpler expr for streamingExec
Signed-off-by: Alan Tang <[email protected]>
chore: Describe more precisely
Signed-off-by: Alan Tang <[email protected]>
---------
Signed-off-by: Alan Tang <[email protected]>
---
datafusion/physical-plan/src/streaming.rs | 12 +-
.../sqllogictest/test_files/explain_tree.slt | 229 ++++++++++++++++++++-
2 files changed, 237 insertions(+), 4 deletions(-)
diff --git a/datafusion/physical-plan/src/streaming.rs
b/datafusion/physical-plan/src/streaming.rs
index 8104e8acf1..18c472a7e1 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -209,8 +209,16 @@ impl DisplayAs for StreamingTableExec {
Ok(())
}
DisplayFormatType::TreeRender => {
- // TODO: collect info
- write!(f, "")
+ if self.infinite {
+ writeln!(f, "infinite={}", self.infinite)?;
+ }
+ if let Some(limit) = self.limit {
+ write!(f, "limit={limit}")?;
+ } else {
+ write!(f, "limit=None")?;
+ }
+
+ Ok(())
}
}
}
diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt
b/datafusion/sqllogictest/test_files/explain_tree.slt
index 689816ff1f..f54cbb600a 100644
--- a/datafusion/sqllogictest/test_files/explain_tree.slt
+++ b/datafusion/sqllogictest/test_files/explain_tree.slt
@@ -1011,7 +1011,10 @@ physical_plan
08)└─────────────┬─────────────┘
09)┌─────────────┴─────────────┐
10)│ StreamingTableExec │
-11)└───────────────────────────┘
+11)│ -------------------- │
+12)│ infinite: true │
+13)│ limit: None │
+14)└───────────────────────────┘
query TT
EXPLAIN SELECT *
@@ -1035,7 +1038,10 @@ physical_plan
10)└─────────────┬─────────────┘
11)┌─────────────┴─────────────┐
12)│ StreamingTableExec │
-13)└───────────────────────────┘
+13)│ -------------------- │
+14)│ infinite: true │
+15)│ limit: None │
+16)└───────────────────────────┘
# Query with hash join.
query TT
@@ -1271,3 +1277,222 @@ drop table table4;
statement ok
drop table table5;
+
+# Test on StreamingTableExec
+# prepare table
+statement ok
+CREATE UNBOUNDED EXTERNAL TABLE data (
+ "date" DATE,
+ "ticker" VARCHAR,
+ "time" TIMESTAMP,
+) STORED AS CSV
+WITH ORDER ("date", "ticker", "time")
+LOCATION './a.parquet';
+
+
+# constant ticker, order by date, time
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A'
+ORDER BY "date", "time";
+----
+logical_plan
+01)Sort: data.date ASC NULLS LAST, data.time ASC NULLS LAST
+02)--Filter: data.ticker = Utf8("A")
+03)----TableScan: data projection=[date, ticker, time]
+physical_plan
+01)┌───────────────────────────┐
+02)│ SortPreservingMergeExec │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│ CoalesceBatchesExec │
+06)└─────────────┬─────────────┘
+07)┌─────────────┴─────────────┐
+08)│ FilterExec │
+09)│ -------------------- │
+10)│ predicate: │
+11)│ ticker@1 = A │
+12)└─────────────┬─────────────┘
+13)┌─────────────┴─────────────┐
+14)│ RepartitionExec │
+15)└─────────────┬─────────────┘
+16)┌─────────────┴─────────────┐
+17)│ StreamingTableExec │
+18)│ -------------------- │
+19)│ infinite: true │
+20)│ limit: None │
+21)└───────────────────────────┘
+
+
+# constant ticker, CAST(time AS DATE) = date, order by time
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "time"
+----
+logical_plan
+01)Sort: data.time ASC NULLS LAST
+02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+03)----TableScan: data projection=[date, ticker, time]
+physical_plan
+01)┌───────────────────────────┐
+02)│ SortPreservingMergeExec │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│ CoalesceBatchesExec │
+06)└─────────────┬─────────────┘
+07)┌─────────────┴─────────────┐
+08)│ FilterExec │
+09)│ -------------------- │
+10)│ predicate: │
+11)│ ticker@1 = A AND CAST(time│
+12)│ @2 AS Date32) = date@0 │
+13)└─────────────┬─────────────┘
+14)┌─────────────┴─────────────┐
+15)│ RepartitionExec │
+16)└─────────────┬─────────────┘
+17)┌─────────────┴─────────────┐
+18)│ StreamingTableExec │
+19)│ -------------------- │
+20)│ infinite: true │
+21)│ limit: None │
+22)└───────────────────────────┘
+
+# same thing but order by date
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "date"
+----
+logical_plan
+01)Sort: data.date ASC NULLS LAST
+02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+03)----TableScan: data projection=[date, ticker, time]
+physical_plan
+01)┌───────────────────────────┐
+02)│ SortPreservingMergeExec │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│ CoalesceBatchesExec │
+06)└─────────────┬─────────────┘
+07)┌─────────────┴─────────────┐
+08)│ FilterExec │
+09)│ -------------------- │
+10)│ predicate: │
+11)│ ticker@1 = A AND CAST(time│
+12)│ @2 AS Date32) = date@0 │
+13)└─────────────┬─────────────┘
+14)┌─────────────┴─────────────┐
+15)│ RepartitionExec │
+16)└─────────────┬─────────────┘
+17)┌─────────────┴─────────────┐
+18)│ StreamingTableExec │
+19)│ -------------------- │
+20)│ infinite: true │
+21)│ limit: None │
+22)└───────────────────────────┘
+
+# same thing but order by ticker
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "ticker"
+----
+logical_plan
+01)Sort: data.ticker ASC NULLS LAST
+02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+03)----TableScan: data projection=[date, ticker, time]
+physical_plan
+01)┌───────────────────────────┐
+02)│ CoalescePartitionsExec │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│ CoalesceBatchesExec │
+06)└─────────────┬─────────────┘
+07)┌─────────────┴─────────────┐
+08)│ FilterExec │
+09)│ -------------------- │
+10)│ predicate: │
+11)│ ticker@1 = A AND CAST(time│
+12)│ @2 AS Date32) = date@0 │
+13)└─────────────┬─────────────┘
+14)┌─────────────┴─────────────┐
+15)│ RepartitionExec │
+16)└─────────────┬─────────────┘
+17)┌─────────────┴─────────────┐
+18)│ StreamingTableExec │
+19)│ -------------------- │
+20)│ infinite: true │
+21)│ limit: None │
+22)└───────────────────────────┘
+
+
+# same thing but order by time, date
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "time", "date";
+----
+logical_plan
+01)Sort: data.time ASC NULLS LAST, data.date ASC NULLS LAST
+02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+03)----TableScan: data projection=[date, ticker, time]
+physical_plan
+01)┌───────────────────────────┐
+02)│ SortPreservingMergeExec │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│ CoalesceBatchesExec │
+06)└─────────────┬─────────────┘
+07)┌─────────────┴─────────────┐
+08)│ FilterExec │
+09)│ -------------------- │
+10)│ predicate: │
+11)│ ticker@1 = A AND CAST(time│
+12)│ @2 AS Date32) = date@0 │
+13)└─────────────┬─────────────┘
+14)┌─────────────┴─────────────┐
+15)│ RepartitionExec │
+16)└─────────────┬─────────────┘
+17)┌─────────────┴─────────────┐
+18)│ StreamingTableExec │
+19)│ -------------------- │
+20)│ infinite: true │
+21)│ limit: None │
+22)└───────────────────────────┘
+
+
+
+
+# constant date, order by ticker, time
+query TT
+explain SELECT * FROM data
+WHERE date = '2006-01-02'
+ORDER BY "ticker", "time";
+----
+logical_plan
+01)Sort: data.ticker ASC NULLS LAST, data.time ASC NULLS LAST
+02)--Filter: data.date = Date32("2006-01-02")
+03)----TableScan: data projection=[date, ticker, time]
+physical_plan
+01)┌───────────────────────────┐
+02)│ SortPreservingMergeExec │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│ CoalesceBatchesExec │
+06)└─────────────┬─────────────┘
+07)┌─────────────┴─────────────┐
+08)│ FilterExec │
+09)│ -------------------- │
+10)│ predicate: │
+11)│ date@0 = 2006-01-02 │
+12)└─────────────┬─────────────┘
+13)┌─────────────┴─────────────┐
+14)│ RepartitionExec │
+15)└─────────────┬─────────────┘
+16)┌─────────────┴─────────────┐
+17)│ StreamingTableExec │
+18)│ -------------------- │
+19)│ infinite: true │
+20)│ limit: None │
+21)└───────────────────────────┘
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]