This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 03bcdb00dd implement tree rendering for StreamingTableExec (#15085)
03bcdb00dd is described below

commit 03bcdb00ddad4a6ba72e0a8a40ebadfa7d2523a5
Author: Alan Tang <[email protected]>
AuthorDate: Mon Mar 10 18:46:03 2025 +0800

    implement tree rendering for StreamingTableExec (#15085)
    
    * feat: support tree rendering for StreamingTableExec
    
    Signed-off-by: Alan Tang <[email protected]>
    
    * feat: simpler expr for streamingExec
    
    Signed-off-by: Alan Tang <[email protected]>
    
    chore: Describe more precisely
    
    Signed-off-by: Alan Tang <[email protected]>
    
    ---------
    
    Signed-off-by: Alan Tang <[email protected]>
---
 datafusion/physical-plan/src/streaming.rs          |  12 +-
 .../sqllogictest/test_files/explain_tree.slt       | 229 ++++++++++++++++++++-
 2 files changed, 237 insertions(+), 4 deletions(-)

diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs
index 8104e8acf1..18c472a7e1 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -209,8 +209,16 @@ impl DisplayAs for StreamingTableExec {
                 Ok(())
             }
             DisplayFormatType::TreeRender => {
-                // TODO: collect info
-                write!(f, "")
+                if self.infinite {
+                    writeln!(f, "infinite={}", self.infinite)?;
+                }
+                if let Some(limit) = self.limit {
+                    write!(f, "limit={limit}")?;
+                } else {
+                    write!(f, "limit=None")?;
+                }
+
+                Ok(())
             }
         }
     }
diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt
index 689816ff1f..f54cbb600a 100644
--- a/datafusion/sqllogictest/test_files/explain_tree.slt
+++ b/datafusion/sqllogictest/test_files/explain_tree.slt
@@ -1011,7 +1011,10 @@ physical_plan
 08)└─────────────┬─────────────┘
 09)┌─────────────┴─────────────┐
 10)│     StreamingTableExec    │
-11)└───────────────────────────┘
+11)│    --------------------   │
+12)│       infinite: true      │
+13)│        limit: None        │
+14)└───────────────────────────┘
 
 query TT
 EXPLAIN SELECT *
@@ -1035,7 +1038,10 @@ physical_plan
 10)└─────────────┬─────────────┘
 11)┌─────────────┴─────────────┐
 12)│     StreamingTableExec    │
-13)└───────────────────────────┘
+13)│    --------------------   │
+14)│       infinite: true      │
+15)│        limit: None        │
+16)└───────────────────────────┘
 
 # Query with hash join.
 query TT
@@ -1271,3 +1277,222 @@ drop table table4;
 
 statement ok
 drop table table5;
+
+# Test on StreamingTableExec
+# prepare table
+statement ok
+CREATE UNBOUNDED EXTERNAL TABLE data (
+    "date"   DATE, 
+    "ticker" VARCHAR, 
+    "time"   TIMESTAMP,
+) STORED AS CSV
+WITH ORDER ("date", "ticker", "time")
+LOCATION './a.parquet';
+
+
+# query
+query TT
+explain SELECT * FROM data 
+WHERE ticker = 'A' 
+ORDER BY "date", "time";
+----
+logical_plan
+01)Sort: data.date ASC NULLS LAST, data.time ASC NULLS LAST
+02)--Filter: data.ticker = Utf8("A")
+03)----TableScan: data projection=[date, ticker, time]
+physical_plan
+01)┌───────────────────────────┐
+02)│  SortPreservingMergeExec  │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│    CoalesceBatchesExec    │
+06)└─────────────┬─────────────┘
+07)┌─────────────┴─────────────┐
+08)│         FilterExec        │
+09)│    --------------------   │
+10)│         predicate:        │
+11)│        ticker@1 = A       │
+12)└─────────────┬─────────────┘
+13)┌─────────────┴─────────────┐
+14)│      RepartitionExec      │
+15)└─────────────┬─────────────┘
+16)┌─────────────┴─────────────┐
+17)│     StreamingTableExec    │
+18)│    --------------------   │
+19)│       infinite: true      │
+20)│        limit: None        │
+21)└───────────────────────────┘
+
+
+# constant ticker, CAST(time AS DATE) = date, order by time
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "time"
+----
+logical_plan
+01)Sort: data.time ASC NULLS LAST
+02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+03)----TableScan: data projection=[date, ticker, time]
+physical_plan
+01)┌───────────────────────────┐
+02)│  SortPreservingMergeExec  │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│    CoalesceBatchesExec    │
+06)└─────────────┬─────────────┘
+07)┌─────────────┴─────────────┐
+08)│         FilterExec        │
+09)│    --------------------   │
+10)│         predicate:        │
+11)│ ticker@1 = A AND CAST(time│
+12)│   @2 AS Date32) = date@0  │
+13)└─────────────┬─────────────┘
+14)┌─────────────┴─────────────┐
+15)│      RepartitionExec      │
+16)└─────────────┬─────────────┘
+17)┌─────────────┴─────────────┐
+18)│     StreamingTableExec    │
+19)│    --------------------   │
+20)│       infinite: true      │
+21)│        limit: None        │
+22)└───────────────────────────┘
+
+# same thing but order by date
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "date"
+----
+logical_plan
+01)Sort: data.date ASC NULLS LAST
+02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+03)----TableScan: data projection=[date, ticker, time]
+physical_plan
+01)┌───────────────────────────┐
+02)│  SortPreservingMergeExec  │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│    CoalesceBatchesExec    │
+06)└─────────────┬─────────────┘
+07)┌─────────────┴─────────────┐
+08)│         FilterExec        │
+09)│    --------------------   │
+10)│         predicate:        │
+11)│ ticker@1 = A AND CAST(time│
+12)│   @2 AS Date32) = date@0  │
+13)└─────────────┬─────────────┘
+14)┌─────────────┴─────────────┐
+15)│      RepartitionExec      │
+16)└─────────────┬─────────────┘
+17)┌─────────────┴─────────────┐
+18)│     StreamingTableExec    │
+19)│    --------------------   │
+20)│       infinite: true      │
+21)│        limit: None        │
+22)└───────────────────────────┘
+
+# same thing but order by ticker
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "ticker"
+----
+logical_plan
+01)Sort: data.ticker ASC NULLS LAST
+02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+03)----TableScan: data projection=[date, ticker, time]
+physical_plan
+01)┌───────────────────────────┐
+02)│   CoalescePartitionsExec  │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│    CoalesceBatchesExec    │
+06)└─────────────┬─────────────┘
+07)┌─────────────┴─────────────┐
+08)│         FilterExec        │
+09)│    --------------------   │
+10)│         predicate:        │
+11)│ ticker@1 = A AND CAST(time│
+12)│   @2 AS Date32) = date@0  │
+13)└─────────────┬─────────────┘
+14)┌─────────────┴─────────────┐
+15)│      RepartitionExec      │
+16)└─────────────┬─────────────┘
+17)┌─────────────┴─────────────┐
+18)│     StreamingTableExec    │
+19)│    --------------------   │
+20)│       infinite: true      │
+21)│        limit: None        │
+22)└───────────────────────────┘
+
+
+# same thing but order by time, date
+query TT
+explain SELECT * FROM data 
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "time", "date";
+----
+logical_plan
+01)Sort: data.time ASC NULLS LAST, data.date ASC NULLS LAST
+02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+03)----TableScan: data projection=[date, ticker, time]
+physical_plan
+01)┌───────────────────────────┐
+02)│  SortPreservingMergeExec  │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│    CoalesceBatchesExec    │
+06)└─────────────┬─────────────┘
+07)┌─────────────┴─────────────┐
+08)│         FilterExec        │
+09)│    --------------------   │
+10)│         predicate:        │
+11)│ ticker@1 = A AND CAST(time│
+12)│   @2 AS Date32) = date@0  │
+13)└─────────────┬─────────────┘
+14)┌─────────────┴─────────────┐
+15)│      RepartitionExec      │
+16)└─────────────┬─────────────┘
+17)┌─────────────┴─────────────┐
+18)│     StreamingTableExec    │
+19)│    --------------------   │
+20)│       infinite: true      │
+21)│        limit: None        │
+22)└───────────────────────────┘
+
+
+
+
+# query
+query TT
+explain SELECT * FROM data 
+WHERE date = '2006-01-02' 
+ORDER BY "ticker", "time";
+----
+logical_plan
+01)Sort: data.ticker ASC NULLS LAST, data.time ASC NULLS LAST
+02)--Filter: data.date = Date32("2006-01-02")
+03)----TableScan: data projection=[date, ticker, time]
+physical_plan
+01)┌───────────────────────────┐
+02)│  SortPreservingMergeExec  │
+03)└─────────────┬─────────────┘
+04)┌─────────────┴─────────────┐
+05)│    CoalesceBatchesExec    │
+06)└─────────────┬─────────────┘
+07)┌─────────────┴─────────────┐
+08)│         FilterExec        │
+09)│    --------------------   │
+10)│         predicate:        │
+11)│    date@0 = 2006-01-02    │
+12)└─────────────┬─────────────┘
+13)┌─────────────┴─────────────┐
+14)│      RepartitionExec      │
+15)└─────────────┬─────────────┘
+16)┌─────────────┴─────────────┐
+17)│     StreamingTableExec    │
+18)│    --------------------   │
+19)│       infinite: true      │
+20)│        limit: None        │
+21)└───────────────────────────┘


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to