matthewgapp commented on code in PR #8840:
URL: https://github.com/apache/arrow-datafusion/pull/8840#discussion_r1451075785


##########
datafusion/sqllogictest/test_files/cte.slt:
##########
@@ -19,3 +19,221 @@ query II
 select * from (WITH source AS (select 1 as e) SELECT * FROM source) t1,   
(WITH source AS (select 1 as e) SELECT * FROM source) t2
 ----
 1 1
+
+# trivial recursive CTE works
+query I rowsort
+WITH RECURSIVE nodes AS ( 
+    SELECT 1 as id
+    UNION ALL 
+    SELECT id + 1 as id 
+    FROM nodes
+    WHERE id < 10
+)
+SELECT * FROM nodes
+----
+1
+10
+2
+3
+4
+5
+6
+7
+8
+9
+
+# setup
+statement ok
+CREATE EXTERNAL TABLE beg_account_balance STORED as CSV WITH HEADER ROW 
LOCATION '../../testing/data/csv/recursive_query_account_beg_2.csv'
+
+# setup
+statement ok
+CREATE EXTERNAL TABLE account_balance_growth STORED as CSV WITH HEADER ROW 
LOCATION '../../testing/data/csv/recursive_query_account_growth_3.csv'
+
+# recursive CTE with static term derived from table works
+query ITI rowsort
+WITH RECURSIVE balances AS (
+    SELECT * from beg_account_balance
+    UNION ALL 
+    SELECT time + 1 as time, name, account_balance + 10 as account_balance
+    FROM balances
+    WHERE time < 10
+)
+SELECT * FROM balances
+----
+1 John 100
+1 Tim 200
+10 John 190
+10 Tim 290
+2 John 110
+2 Tim 210
+3 John 120
+3 Tim 220
+4 John 130
+4 Tim 230
+5 John 140
+5 Tim 240
+6 John 150
+6 Tim 250
+7 John 160
+7 Tim 260
+8 John 170
+8 Tim 270
+9 John 180
+9 Tim 280
+
+
+# recursive CTE with recursive join works
+query ITI
+WITH RECURSIVE balances AS (
+    SELECT time as time, name as name, account_balance as account_balance
+    FROM beg_account_balance
+    UNION ALL 
+    SELECT time + 1 as time, balances.name, account_balance + 
account_balance_growth.account_growth as account_balance
+    FROM balances
+    JOIN account_balance_growth
+    ON balances.name = account_balance_growth.name
+    WHERE time < 10
+)
+SELECT * FROM balances
+ORDER BY time, name
+----
+1 John 100
+1 Tim 200
+2 John 103
+2 Tim 220
+3 John 106
+3 Tim 240
+4 John 109
+4 Tim 260
+5 John 112
+5 Tim 280
+6 John 115
+6 Tim 300
+7 John 118
+7 Tim 320
+8 John 121
+8 Tim 340
+9 John 124
+9 Tim 360
+10 John 127
+10 Tim 380
+
+# recursive CTE with aggregations works
+query I rowsort
+WITH RECURSIVE nodes AS ( 
+    SELECT 1 as id
+    UNION ALL 
+    SELECT id + 1 as id 
+    FROM nodes
+    WHERE id < 10
+)
+SELECT sum(id) FROM nodes
+----
+55
+
+# setup
+statement ok
+CREATE TABLE t(a BIGINT) AS VALUES(1),(2),(3);
+
+# referencing CTE multiple times does not error
+query II rowsort
+WITH RECURSIVE my_cte AS (
+    SELECT a from t 
+    UNION ALL 
+    SELECT a+2 as a
+    FROM my_cte 
+    WHERE a<5
+)
+SELECT * FROM my_cte t1, my_cte
+----
+1 1
+1 2
+1 3
+1 3
+1 4
+1 5
+1 5
+1 6
+2 1
+2 2
+2 3
+2 3
+2 4
+2 5
+2 5
+2 6
+3 1
+3 1
+3 2
+3 2
+3 3
+3 3
+3 3
+3 3
+3 4
+3 4
+3 5
+3 5
+3 5
+3 5
+3 6
+3 6
+4 1
+4 2
+4 3
+4 3
+4 4
+4 5
+4 5
+4 6
+5 1
+5 1
+5 2
+5 2
+5 3
+5 3
+5 3
+5 3
+5 4
+5 4
+5 5
+5 5
+5 5
+5 5
+5 6
+5 6
+6 1
+6 2
+6 3
+6 3
+6 4
+6 5
+6 5
+6 6
+
+# CTE within recursive CTE works and does not result in 'index out of bounds: the len is 0 but the index is 0'
+query
+WITH RECURSIVE "recursive_cte" AS (
+    SELECT 1 as "val"
+  UNION ALL (
+    WITH "sub_cte" AS (
+      SELECT
+        time,
+        1 as "val"
+      FROM
+        (SELECT DISTINCT "time" FROM "beg_account_balance")

Review Comment:
   I can't seem to reproduce this outside of our project just yet. I'm getting the
   following panic:
   
   
   ```text
   The application panicked (crashed).
   Message:  primitive array
   Location: 
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-array-49.0.0/src/cast.rs:751
   
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ BACKTRACE 
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
                                   ⋮ 15 frames hidden ⋮                         
     
     16: core::panicking::panic_display::hbefe501d317ed1d7
         at 
/rustc/e51e98dde6a60637b6a71b8105245b629ac3fe77/library/core/src/panicking.rs:196
     17: core::panicking::panic_str::h9863c42b2d46e0f3
         at 
/rustc/e51e98dde6a60637b6a71b8105245b629ac3fe77/library/core/src/panicking.rs:171
     18: core::option::expect_failed::h4e9eb510dd8145dd
         at 
/rustc/e51e98dde6a60637b6a71b8105245b629ac3fe77/library/core/src/option.rs:1980
     19: core::option::Option<T>::expect::hec9b720ec03fc4f9
         at 
/rustc/e51e98dde6a60637b6a71b8105245b629ac3fe77/library/core/src/option.rs:894
     20: arrow_array::cast::AsArray::as_primitive::h493f40e8665dba69
         at 
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-array-49.0.0/src/cast.rs:751
          749 │     /// Downcast this to a [`PrimitiveArray`] panicking if not 
possible
          750 │     fn as_primitive<T: ArrowPrimitiveType>(&self) -> 
&PrimitiveArray<T> {
          751 >         self.as_primitive_opt().expect("primitive array")
          752 │     }
          753 │ 
     21: 
<datafusion_physical_plan::aggregates::group_values::primitive::GroupValuesPrimitive<T>
 as 
datafusion_physical_plan::aggregates::group_values::GroupValues>::intern::h6edcb0cb053f37df
         at 
/Users/matthewgapp/code/forked/arrow-datafusion/datafusion/physical-plan/src/aggregates/group_values/primitive.rs:116
          114 │         groups.clear();
          115 │ 
          116 >         for v in cols[0].as_primitive::<T>() {
          117 │             let group_id = match v {
          118 │                 None => *self.null_group.get_or_insert_with(|| {
     22: 
datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream::group_aggregate_batch::h941a0f4d623d2cc4
         at 
/Users/matthewgapp/code/forked/arrow-datafusion/datafusion/physical-plan/src/aggregates/row_hash.rs:548
          546 │             // calculate the group indices for each input row
          547 │             let starting_num_groups = self.group_values.len();
          548 >             self.group_values
          549 │                 .intern(group_values, &mut 
self.current_group_indices)?;
          550 │             let group_indices = &self.current_group_indices;
     23: 
<datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream as 
futures_core::stream::Stream>::poll_next::h778e63c29afa7d81
         at 
/Users/matthewgapp/code/forked/arrow-datafusion/datafusion/physical-plan/src/aggregates/row_hash.rs:438
          436 │ 
          437 │                             // Do the grouping
          438 >                             
extract_ok!(self.group_aggregate_batch(batch));
          439 │ 
          440 │                             // If we can begin emitting rows, 
do so,
     24: <core::pin::Pin<P> as 
futures_core::stream::Stream>::poll_next::h13c49d63ee92c868
         at 
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.29/src/stream.rs:120
          118 │ 
          119 │     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
          120 >         self.get_mut().as_mut().poll_next(cx)
          121 │     }
          122 │ 
     25: 
futures_util::stream::stream::StreamExt::poll_next_unpin::h5f167c57e87b8a15
         at 
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.29/src/stream/stream/mod.rs:1638
         1636 │         Self: Unpin,
         1637 │     {
         1638 >         Pin::new(self).poll_next(cx)
         1639 │     }
         1640 │ 
     26: <futures_util::stream::stream::next::Next<St> as 
core::future::future::Future>::poll::h558ec691ab4e7eeb
         at 
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.29/src/stream/stream/next.rs:32
           30 │ 
           31 │     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Self::Output> {
           32 >         self.stream.poll_next_unpin(cx)
           33 │     }
           34 │ }
     27: 
datafusion_physical_plan::repartition::RepartitionExec::pull_from_input::{{closure}}::h0d5b828859fa1f18
         at 
/Users/matthewgapp/code/forked/arrow-datafusion/datafusion/physical-plan/src/repartition/mod.rs:702
          700 │             // fetch the next batch
          701 │             let timer = metrics.fetch_time.timer();
          702 >             let result = stream.next().await;
          703 │             timer.done();
          704 │ 
     28: 
tokio::runtime::task::core::Core<T,S>::poll::{{closure}}::h24aa3949553e8510
         at 
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.0/src/runtime/task/core.rs:328
          326 │ 
          327 │                 let _guard = TaskIdGuard::enter(self.task_id);
          328 >                 future.poll(&mut cx)
          329 │             })
          330 │         };
     29: 
tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut::hed0e8a2b03eb44a5
         at 
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.0/src/loom/std/unsafe_cell.rs:16
           14 │     #[inline(always)]
           15 │     pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> 
R) -> R {
           16 >         f(self.0.get())
           17 │     }
           18 │ }
     30: tokio::runtime::task::core::Core<T,S>::poll::h6e6c537ecabf42b1
         at 
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.0/src/runtime/task/core.rs:317
          315 │     pub(super) fn poll(&self, mut cx: Context<'_>) -> 
Poll<T::Output> {
          316 │         let res = {
          317 >             self.stage.stage.with_mut(|ptr| {
          318 │                 // Safety: The caller ensures mutual exclusion 
to the field.
          319 │                 let future = match unsafe { &mut *ptr } {
     31: 
tokio::runtime::task::harness::poll_future::{{closure}}::hdaa68138fe576c41
         at 
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.0/src/runtime/task/harness.rs:485
          483 │         }
          484 │         let guard = Guard { core };
          485 >         let res = guard.core.poll(cx);
          486 │         mem::forget(guard);
          487 │         res
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to