crepererum opened a new issue, #16888:
URL: https://github.com/apache/datafusion/issues/16888
### Describe the bug
It seems that under certain circumstances, window aggregation outputs are
not sorted. In my test, this requires
`set datafusion.execution.batch_size = 1;`, but there might be other cases.
### To Reproduce
Use this sqllogictest:
**The output is NOT deterministic!**
```
statement ok
CREATE TABLE t (
k VARCHAR,
v Int,
time TIMESTAMP WITH TIME ZONE
);
statement ok
INSERT INTO t (k, v, time) VALUES
('a', 1, '1970-01-01T00:01:00.00Z'),
('a', 1, '1970-01-01T00:02:00.00Z'),
('a', 1, '1970-01-01T00:03:00.00Z'),
('a', 2, '1970-01-01T00:03:00.00Z'),
('a', 1, '1970-01-01T00:04:00.00Z'),
('b', 3, '1970-01-01T00:01:00.00Z'),
('b', 3, '1970-01-01T00:02:00.00Z'),
('b', 4, '1970-01-01T00:03:00.00Z'),
('b', 4, '1970-01-01T00:03:00.00Z');
query TPI
SELECT
k,
time,
COUNT(v) OVER (
PARTITION BY k
ORDER BY time
RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
) AS normal_count
FROM t
ODER BY k, time;
----
a 1970-01-01T00:01:00Z 1
a 1970-01-01T00:02:00Z 2
a 1970-01-01T00:03:00Z 4
a 1970-01-01T00:03:00Z 4
a 1970-01-01T00:04:00Z 4
b 1970-01-01T00:01:00Z 1
b 1970-01-01T00:02:00Z 2
b 1970-01-01T00:03:00Z 4
b 1970-01-01T00:03:00Z 4
query TT
EXPLAIN SELECT
k,
time,
COUNT(v) OVER (
PARTITION BY k
ORDER BY time
RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
) AS normal_count
FROM t
ODER BY k, time;
----
logical_plan
01)Projection: oder.k, oder.time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS normal_count
02)--WindowAggr: windowExpr=[[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW]]
03)----SubqueryAlias: oder
04)------TableScan: t projection=[k, v, time]
physical_plan
01)ProjectionExec: expr=[k@0 as k, time@2 as time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@3 as normal_count]
02)--BoundedWindowAggExec: wdw=[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW], mode=[Sorted]
03)----SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST], preserve_partitioning=[false]
04)------DataSourceExec: partitions=1, partition_sizes=[1]
statement ok
set datafusion.execution.batch_size = 1;
query TPI
SELECT
k,
time,
COUNT(v) OVER (
PARTITION BY k
ORDER BY time
RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
) AS normal_count
FROM t
ODER BY k, time;
----
b 1970-01-01T00:01:00Z 1
b 1970-01-01T00:02:00Z 2
a 1970-01-01T00:01:00Z 1
a 1970-01-01T00:02:00Z 2
a 1970-01-01T00:03:00Z 4
a 1970-01-01T00:03:00Z 4
b 1970-01-01T00:03:00Z 4
b 1970-01-01T00:03:00Z 4
a 1970-01-01T00:04:00Z 4
query TT
EXPLAIN SELECT
k,
time,
COUNT(v) OVER (
PARTITION BY k
ORDER BY time
RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
) AS normal_count
FROM t
ODER BY k, time;
----
logical_plan
01)Projection: oder.k, oder.time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS normal_count
02)--WindowAggr: windowExpr=[[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW]]
03)----SubqueryAlias: oder
04)------TableScan: t projection=[k, v, time]
physical_plan
01)ProjectionExec: expr=[k@0 as k, time@2 as time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@3 as normal_count]
02)--BoundedWindowAggExec: wdw=[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW], mode=[Sorted]
03)----SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST], preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=1
05)--------RepartitionExec: partitioning=Hash([k@0], 4), input_partitions=4
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------DataSourceExec: partitions=1, partition_sizes=[1]
```
### Expected behavior
Both queries in the test file above should always result in an ordered
output.
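
To make "ordered" concrete, here is a minimal sketch of the check I have in mind, assuming that "ordered" means non-decreasing by `(k, time)`. The helper name `is_sorted_by_k_time` is hypothetical and not part of DataFusion; the rows are copied from the two result blocks above.

```python
from datetime import datetime


def is_sorted_by_k_time(rows):
    """Return True if (k, time, count) rows are non-decreasing by (k, time)."""
    keys = [(k, datetime.strptime(t, "%Y-%m-%dT%H:%M:%SZ")) for k, t, _ in rows]
    return all(a <= b for a, b in zip(keys, keys[1:]))


# Rows from the first query (default batch size).
default_rows = [
    ("a", "1970-01-01T00:01:00Z", 1),
    ("a", "1970-01-01T00:02:00Z", 2),
    ("a", "1970-01-01T00:03:00Z", 4),
    ("a", "1970-01-01T00:03:00Z", 4),
    ("a", "1970-01-01T00:04:00Z", 4),
    ("b", "1970-01-01T00:01:00Z", 1),
    ("b", "1970-01-01T00:02:00Z", 2),
    ("b", "1970-01-01T00:03:00Z", 4),
    ("b", "1970-01-01T00:03:00Z", 4),
]

# Rows from the second query (batch_size = 1), in the order shown above.
batch_size_1_rows = [
    ("b", "1970-01-01T00:01:00Z", 1),
    ("b", "1970-01-01T00:02:00Z", 2),
    ("a", "1970-01-01T00:01:00Z", 1),
    ("a", "1970-01-01T00:02:00Z", 2),
    ("a", "1970-01-01T00:03:00Z", 4),
    ("a", "1970-01-01T00:03:00Z", 4),
    ("b", "1970-01-01T00:03:00Z", 4),
    ("b", "1970-01-01T00:03:00Z", 4),
    ("a", "1970-01-01T00:04:00Z", 4),
]

print(is_sorted_by_k_time(default_rows))       # True
print(is_sorted_by_k_time(batch_size_1_rows))  # False
```

Since the second output is not deterministic, other runs may interleave the `a` and `b` rows differently; the check only describes the ordering shown here.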
### Additional context
This could be related to #15833.