matthewgapp commented on code in PR #8840:
URL: https://github.com/apache/arrow-datafusion/pull/8840#discussion_r1451077021


##########
datafusion/sqllogictest/test_files/cte.slt:
##########
@@ -19,3 +19,221 @@ query II
 select * from (WITH source AS (select 1 as e) SELECT * FROM source) t1,   
(WITH source AS (select 1 as e) SELECT * FROM source) t2
 ----
 1 1
+
+# trivial recursive CTE works
+query I rowsort
+WITH RECURSIVE nodes AS ( 
+    SELECT 1 as id
+    UNION ALL 
+    SELECT id + 1 as id 
+    FROM nodes
+    WHERE id < 10
+)
+SELECT * FROM nodes
+----
+1
+10
+2
+3
+4
+5
+6
+7
+8
+9
+
+# setup
+statement ok
+CREATE EXTERNAL TABLE beg_account_balance STORED as CSV WITH HEADER ROW 
LOCATION '../../testing/data/csv/recursive_query_account_beg_2.csv'
+
+# setup
+statement ok
+CREATE EXTERNAL TABLE account_balance_growth STORED as CSV WITH HEADER ROW 
LOCATION '../../testing/data/csv/recursive_query_account_growth_3.csv'
+
+# recursive CTE with static term derived from table works
+query ITI rowsort
+WITH RECURSIVE balances AS (
+    SELECT * from beg_account_balance
+    UNION ALL 
+    SELECT time + 1 as time, name, account_balance + 10 as account_balance
+    FROM balances
+    WHERE time < 10
+)
+SELECT * FROM balances
+----
+1 John 100
+1 Tim 200
+10 John 190
+10 Tim 290
+2 John 110
+2 Tim 210
+3 John 120
+3 Tim 220
+4 John 130
+4 Tim 230
+5 John 140
+5 Tim 240
+6 John 150
+6 Tim 250
+7 John 160
+7 Tim 260
+8 John 170
+8 Tim 270
+9 John 180
+9 Tim 280
+
+
+# recursive CTE with recursive join works
+query ITI
+WITH RECURSIVE balances AS (
+    SELECT time as time, name as name, account_balance as account_balance
+    FROM beg_account_balance
+    UNION ALL 
+    SELECT time + 1 as time, balances.name, account_balance + 
account_balance_growth.account_growth as account_balance
+    FROM balances
+    JOIN account_balance_growth
+    ON balances.name = account_balance_growth.name
+    WHERE time < 10
+)
+SELECT * FROM balances
+ORDER BY time, name
+----
+1 John 100
+1 Tim 200
+2 John 103
+2 Tim 220
+3 John 106
+3 Tim 240
+4 John 109
+4 Tim 260
+5 John 112
+5 Tim 280
+6 John 115
+6 Tim 300
+7 John 118
+7 Tim 320
+8 John 121
+8 Tim 340
+9 John 124
+9 Tim 360
+10 John 127
+10 Tim 380
+
+# recursive CTE with aggregations works
+query I rowsort
+WITH RECURSIVE nodes AS ( 
+    SELECT 1 as id
+    UNION ALL 
+    SELECT id + 1 as id 
+    FROM nodes
+    WHERE id < 10
+)
+SELECT sum(id) FROM nodes
+----
+55
+
+# setup
+statement ok
+CREATE TABLE t(a BIGINT) AS VALUES(1),(2),(3);
+
+# referencing CTE multiple times does not error
+query II rowsort
+WITH RECURSIVE my_cte AS (
+    SELECT a from t 
+    UNION ALL 
+    SELECT a+2 as a
+    FROM my_cte 
+    WHERE a<5
+)
+SELECT * FROM my_cte t1, my_cte
+----
+1 1
+1 2
+1 3
+1 3
+1 4
+1 5
+1 5
+1 6
+2 1
+2 2
+2 3
+2 3
+2 4
+2 5
+2 5
+2 6
+3 1
+3 1
+3 2
+3 2
+3 3
+3 3
+3 3
+3 3
+3 4
+3 4
+3 5
+3 5
+3 5
+3 5
+3 6
+3 6
+4 1
+4 2
+4 3
+4 3
+4 4
+4 5
+4 5
+4 6
+5 1
+5 1
+5 2
+5 2
+5 3
+5 3
+5 3
+5 3
+5 4
+5 4
+5 5
+5 5
+5 5
+5 5
+5 6
+5 6
+6 1
+6 2
+6 3
+6 3
+6 4
+6 5
+6 5
+6 6
+
+# CTE within recursive CTE works and does not result in 'index out of bounds: 
the len is 0 but the index is 0'
+query
+WITH RECURSIVE "recursive_cte" AS (
+    SELECT 1 as "val"
+  UNION ALL (
+    WITH "sub_cte" AS (
+      SELECT
+        time,
+        1 as "val"
+      FROM
+        (SELECT DISTINCT "time" FROM "beg_account_balance")

Review Comment:
   This issue seems to arise from (or at least be correlated with) the 
MemoryExec. I think this because of the difference in MemoryExec partition 
sizes (more on that below) and because when I remove `try_swapping_with_memory` 
https://github.com/matthewgapp/arrow-datafusion/blob/ea4cb6e9c6b8a0bc892ef76578fd7853ddc692ab/datafusion/core/src/physical_optimizer/projection_pushdown.rs#L110,
 our project no longer blows up. 
   
   The difference between the plans that fail (in our project) and succeed 
(outside of our project) is that the offending MemoryExec has a partition size 
of 2 ( `MemoryExec: partitions=1, partition_sizes=[2]` ) and the successful 
plan has a partition size of 1 ( `MemoryExec: partitions=1, 
partition_sizes=[1]` ). 
   
   I'm not sure how to force DataFusion to use a larger partition size so 
that I can reproduce this outside of our project 🤔 
   
   **Offending physical plan from logs:**
   ```text
   Input physical plan:
   AggregateExec: mode=FinalPartitioned, gby=[emd@0 as emd, beg@1 as beg, 
prices_row_num@2 as prices_row_num, prices_row_num_advancement@3 as 
prices_row_num_advancement], aggr=[]
     AggregateExec: mode=Partial, gby=[emd@0 as emd, beg@1 as beg, 
prices_row_num@2 as prices_row_num, prices_row_num_advancement@3 as 
prices_row_num_advancement], aggr=[]
       ProjectionExec: expr=[emd@0 as emd, beg@1 as beg, prices_row_num@2 as 
prices_row_num, prices_row_num_advancement@3 as prices_row_num_advancement]
         RecursiveQueryExec: is_distinct=false
           ProjectionExec: expr=[40 as emd, 0 as beg, prices_row_num@5 as 
prices_row_num, prices_row_num_advancement@6 as prices_row_num_advancement]
             NestedLoopJoinExec: join_type=Left
               NestedLoopJoinExec: join_type=Left
                 HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(prices_row_num@3, prices_row_num@0)]
                   MemoryExec: partitions=1, partition_sizes=[2]
                   AggregateExec: mode=FinalPartitioned, gby=[prices_row_num@0 
as prices_row_num], aggr=[]
                     AggregateExec: mode=Partial, gby=[prices_row_num@0 as 
prices_row_num], aggr=[]
                       ProjectionExec: expr=[prices_row_num@0 as prices_row_num]
                         ProjectionExec: 
expr=[MIN(prices_with_row_num_2.prices_row_num)@0 as prices_row_num]
                           AggregateExec: mode=Final, gby=[], 
aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                             AggregateExec: mode=Partial, gby=[], 
aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                               MemoryExec: partitions=1, partition_sizes=[2]
                 ProjectionExec: expr=[prices_row_num@0 as prices_row_num]
                   ProjectionExec: 
expr=[MIN(prices_with_row_num_2.prices_row_num)@0 as prices_row_num]
                     AggregateExec: mode=Final, gby=[], 
aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                       AggregateExec: mode=Partial, gby=[], 
aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                         MemoryExec: partitions=1, partition_sizes=[2]
               ProjectionExec: expr=[prices_row_num_advancement@0 as 
prices_row_num_advancement]
                 ProjectionExec: 
expr=[MIN(prices_with_row_num_2.prices_row_num)@0 as prices_row_num_advancement]
                   AggregateExec: mode=Final, gby=[], 
aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                     AggregateExec: mode=Partial, gby=[], 
aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                       NestedLoopJoinExec: join_type=Inner, 
filter=prices_row_num@0 > prices_row_num@1
                         MemoryExec: partitions=1, partition_sizes=[2]
                         ProjectionExec: expr=[prices_row_num@0 as 
prices_row_num]
                           ProjectionExec: 
expr=[MIN(prices_with_row_num_2.prices_row_num)@0 as prices_row_num]
                             AggregateExec: mode=Final, gby=[], 
aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                               AggregateExec: mode=Partial, gby=[], 
aggr=[MIN(prices_with_row_num_2.prices_row_num)]
                                 MemoryExec: partitions=1, partition_sizes=[2]
           ProjectionExec: expr=[emd@0 + 40 as emd, emd@0 as beg, 
prices_row_num@8 as prices_row_num, prices_row_num_advancement@9 as 
prices_row_num_advancement]
             ProjectionExec: expr=[emd@0 as emd, beg@1 as beg, prices_row_num@2 
as prices_row_num, prices_row_num_advancement@3 as prices_row_num_advancement, 
Index@4 as Index, product@5 as product, price@6 as price, prices_row_num@7 as 
prices_row_num, prices_row_num@9 as prices_row_num, 
prices_row_num_advancement@10 as prices_row_num_advancement]
               HashJoinExec: mode=Partitioned, join_type=Left, 
on=[(coalesce(prices_with_row_num_2.prices_row_num,recursive_cte.prices_row_num_advancement)@8,
 prices_row_num@0)]
                 ProjectionExec: expr=[emd@0 as emd, beg@1 as beg, 
prices_row_num@2 as prices_row_num, prices_row_num_advancement@3 as 
prices_row_num_advancement, Index@4 as Index, product@5 as product, price@6 as 
price, prices_row_num@7 as prices_row_num, coalesce(prices_row_num@7, 
prices_row_num_advancement@3) as 
coalesce(prices_with_row_num_2.prices_row_num,recursive_cte.prices_row_num_advancement)]
                   HashJoinExec: mode=Partitioned, join_type=Left, 
on=[(prices_row_num_advancement@3, prices_row_num@3)]
                     FilterExec: prices_row_num_advancement@3 IS NOT NULL
                       WorkTableExec: name=recursive_cte
                     MemoryExec: partitions=1, partition_sizes=[2]
                 ProjectionExec: expr=[prices_row_num@0 as prices_row_num, 
LEAD(prices_with_row_num_2.prices_row_num,Int64(1)) ORDER BY 
[prices_with_row_num_2.prices_row_num ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW@1 as prices_row_num_advancement]
                   BoundedWindowAggExec: 
wdw=[LEAD(prices_with_row_num_2.prices_row_num,Int64(1)) ORDER BY 
[prices_with_row_num_2.prices_row_num ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW: Ok(Field { name: 
"LEAD(prices_with_row_num_2.prices_row_num,Int64(1)) ORDER BY 
[prices_with_row_num_2.prices_row_num ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
                     AggregateExec: mode=FinalPartitioned, 
gby=[prices_row_num@0 as prices_row_num], aggr=[]
                       AggregateExec: mode=Partial, gby=[prices_row_num@0 as 
prices_row_num], aggr=[]
                         ProjectionExec: expr=[prices_row_num@3 as 
prices_row_num]
                           MemoryExec: partitions=1, partition_sizes=[2]
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to