matthewgapp commented on code in PR #8840:
URL: https://github.com/apache/arrow-datafusion/pull/8840#discussion_r1451075785
##########
datafusion/sqllogictest/test_files/cte.slt:
##########
@@ -19,3 +19,221 @@ query II
select * from (WITH source AS (select 1 as e) SELECT * FROM source) t1,
(WITH source AS (select 1 as e) SELECT * FROM source) t2
----
1 1
+
+# trivial recursive CTE works
+query I rowsort
+WITH RECURSIVE nodes AS (
+ SELECT 1 as id
+ UNION ALL
+ SELECT id + 1 as id
+ FROM nodes
+ WHERE id < 10
+)
+SELECT * FROM nodes
+----
+1
+10
+2
+3
+4
+5
+6
+7
+8
+9
+
+# setup
+statement ok
+CREATE EXTERNAL TABLE beg_account_balance STORED as CSV WITH HEADER ROW LOCATION '../../testing/data/csv/recursive_query_account_beg_2.csv'
+
+# setup
+statement ok
+CREATE EXTERNAL TABLE account_balance_growth STORED as CSV WITH HEADER ROW LOCATION '../../testing/data/csv/recursive_query_account_growth_3.csv'
+
+# recursive CTE with static term derived from table works
+query ITI rowsort
+WITH RECURSIVE balances AS (
+ SELECT * from beg_account_balance
+ UNION ALL
+ SELECT time + 1 as time, name, account_balance + 10 as account_balance
+ FROM balances
+ WHERE time < 10
+)
+SELECT * FROM balances
+----
+1 John 100
+1 Tim 200
+10 John 190
+10 Tim 290
+2 John 110
+2 Tim 210
+3 John 120
+3 Tim 220
+4 John 130
+4 Tim 230
+5 John 140
+5 Tim 240
+6 John 150
+6 Tim 250
+7 John 160
+7 Tim 260
+8 John 170
+8 Tim 270
+9 John 180
+9 Tim 280
+
+
+# recursive CTE with recursive join works
+query ITI
+WITH RECURSIVE balances AS (
+ SELECT time as time, name as name, account_balance as account_balance
+ FROM beg_account_balance
+ UNION ALL
+ SELECT time + 1 as time, balances.name, account_balance + account_balance_growth.account_growth as account_balance
+ FROM balances
+ JOIN account_balance_growth
+ ON balances.name = account_balance_growth.name
+ WHERE time < 10
+)
+SELECT * FROM balances
+ORDER BY time, name
+----
+1 John 100
+1 Tim 200
+2 John 103
+2 Tim 220
+3 John 106
+3 Tim 240
+4 John 109
+4 Tim 260
+5 John 112
+5 Tim 280
+6 John 115
+6 Tim 300
+7 John 118
+7 Tim 320
+8 John 121
+8 Tim 340
+9 John 124
+9 Tim 360
+10 John 127
+10 Tim 380
+
+# recursive CTE with aggregations works
+query I rowsort
+WITH RECURSIVE nodes AS (
+ SELECT 1 as id
+ UNION ALL
+ SELECT id + 1 as id
+ FROM nodes
+ WHERE id < 10
+)
+SELECT sum(id) FROM nodes
+----
+55
+
+# setup
+statement ok
+CREATE TABLE t(a BIGINT) AS VALUES(1),(2),(3);
+
+# referencing CTE multiple times does not error
+query II rowsort
+WITH RECURSIVE my_cte AS (
+ SELECT a from t
+ UNION ALL
+ SELECT a+2 as a
+ FROM my_cte
+ WHERE a<5
+)
+SELECT * FROM my_cte t1, my_cte
+----
+1 1
+1 2
+1 3
+1 3
+1 4
+1 5
+1 5
+1 6
+2 1
+2 2
+2 3
+2 3
+2 4
+2 5
+2 5
+2 6
+3 1
+3 1
+3 2
+3 2
+3 3
+3 3
+3 3
+3 3
+3 4
+3 4
+3 5
+3 5
+3 5
+3 5
+3 6
+3 6
+4 1
+4 2
+4 3
+4 3
+4 4
+4 5
+4 5
+4 6
+5 1
+5 1
+5 2
+5 2
+5 3
+5 3
+5 3
+5 3
+5 4
+5 4
+5 5
+5 5
+5 5
+5 5
+5 6
+5 6
+6 1
+6 2
+6 3
+6 3
+6 4
+6 5
+6 5
+6 6
+
+# CTE within recursive CTE works and does not result in 'index out of bounds: the len is 0 but the index is 0'
+query
+WITH RECURSIVE "recursive_cte" AS (
+ SELECT 1 as "val"
+ UNION ALL (
+ WITH "sub_cte" AS (
+ SELECT
+ time,
+ 1 as "val"
+ FROM
+ (SELECT DISTINCT "time" FROM "beg_account_balance")
Review Comment:
I can't reproduce this outside of our project yet. Within our project, I get the
following panic:
```text
The application panicked (crashed).
Message: primitive array
Location:
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-array-49.0.0/src/cast.rs:751
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ BACKTRACE
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
⋮ 15 frames hidden ⋮
16: core::panicking::panic_display::hbefe501d317ed1d7
at
/rustc/e51e98dde6a60637b6a71b8105245b629ac3fe77/library/core/src/panicking.rs:196
17: core::panicking::panic_str::h9863c42b2d46e0f3
at
/rustc/e51e98dde6a60637b6a71b8105245b629ac3fe77/library/core/src/panicking.rs:171
18: core::option::expect_failed::h4e9eb510dd8145dd
at
/rustc/e51e98dde6a60637b6a71b8105245b629ac3fe77/library/core/src/option.rs:1980
19: core::option::Option<T>::expect::hec9b720ec03fc4f9
at
/rustc/e51e98dde6a60637b6a71b8105245b629ac3fe77/library/core/src/option.rs:894
20: arrow_array::cast::AsArray::as_primitive::h493f40e8665dba69
at
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-array-49.0.0/src/cast.rs:751
749 │ /// Downcast this to a [`PrimitiveArray`] panicking if not
possible
750 │ fn as_primitive<T: ArrowPrimitiveType>(&self) ->
&PrimitiveArray<T> {
751 > self.as_primitive_opt().expect("primitive array")
752 │ }
753 │
21:
<datafusion_physical_plan::aggregates::group_values::primitive::GroupValuesPrimitive<T>
as
datafusion_physical_plan::aggregates::group_values::GroupValues>::intern::h6edcb0cb053f37df
at
/Users/matthewgapp/code/forked/arrow-datafusion/datafusion/physical-plan/src/aggregates/group_values/primitive.rs:116
114 │ groups.clear();
115 │
116 > for v in cols[0].as_primitive::<T>() {
117 │ let group_id = match v {
118 │ None => *self.null_group.get_or_insert_with(|| {
22:
datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream::group_aggregate_batch::h941a0f4d623d2cc4
at
/Users/matthewgapp/code/forked/arrow-datafusion/datafusion/physical-plan/src/aggregates/row_hash.rs:548
546 │ // calculate the group indices for each input row
547 │ let starting_num_groups = self.group_values.len();
548 > self.group_values
549 │ .intern(group_values, &mut
self.current_group_indices)?;
550 │ let group_indices = &self.current_group_indices;
23:
<datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream as
futures_core::stream::Stream>::poll_next::h778e63c29afa7d81
at
/Users/matthewgapp/code/forked/arrow-datafusion/datafusion/physical-plan/src/aggregates/row_hash.rs:438
436 │
437 │ // Do the grouping
438 >
extract_ok!(self.group_aggregate_batch(batch));
439 │
440 │ // If we can begin emitting rows,
do so,
24: <core::pin::Pin<P> as
futures_core::stream::Stream>::poll_next::h13c49d63ee92c868
at
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.29/src/stream.rs:120
118 │
119 │ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
120 > self.get_mut().as_mut().poll_next(cx)
121 │ }
122 │
25:
futures_util::stream::stream::StreamExt::poll_next_unpin::h5f167c57e87b8a15
at
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.29/src/stream/stream/mod.rs:1638
1636 │ Self: Unpin,
1637 │ {
1638 > Pin::new(self).poll_next(cx)
1639 │ }
1640 │
26: <futures_util::stream::stream::next::Next<St> as
core::future::future::Future>::poll::h558ec691ab4e7eeb
at
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.29/src/stream/stream/next.rs:32
30 │
31 │ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Self::Output> {
32 > self.stream.poll_next_unpin(cx)
33 │ }
34 │ }
27:
datafusion_physical_plan::repartition::RepartitionExec::pull_from_input::{{closure}}::h0d5b828859fa1f18
at
/Users/matthewgapp/code/forked/arrow-datafusion/datafusion/physical-plan/src/repartition/mod.rs:702
700 │ // fetch the next batch
701 │ let timer = metrics.fetch_time.timer();
702 > let result = stream.next().await;
703 │ timer.done();
704 │
28:
tokio::runtime::task::core::Core<T,S>::poll::{{closure}}::h24aa3949553e8510
at
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.0/src/runtime/task/core.rs:328
326 │
327 │ let _guard = TaskIdGuard::enter(self.task_id);
328 > future.poll(&mut cx)
329 │ })
330 │ };
29:
tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut::hed0e8a2b03eb44a5
at
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.0/src/loom/std/unsafe_cell.rs:16
14 │ #[inline(always)]
15 │ pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) ->
R) -> R {
16 > f(self.0.get())
17 │ }
18 │ }
30: tokio::runtime::task::core::Core<T,S>::poll::h6e6c537ecabf42b1
at
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.0/src/runtime/task/core.rs:317
315 │ pub(super) fn poll(&self, mut cx: Context<'_>) ->
Poll<T::Output> {
316 │ let res = {
317 > self.stage.stage.with_mut(|ptr| {
318 │ // Safety: The caller ensures mutual exclusion
to the field.
319 │ let future = match unsafe { &mut *ptr } {
31:
tokio::runtime::task::harness::poll_future::{{closure}}::hdaa68138fe576c41
at
/Users/matthewgapp/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.0/src/runtime/task/harness.rs:485
483 │ }
484 │ let guard = Guard { core };
485 > let res = guard.core.poll(cx);
486 │ mem::forget(guard);
487 │ res
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]