crepererum opened a new issue, #16888:
URL: https://github.com/apache/datafusion/issues/16888

   ### Describe the bug
   
   It seems that under certain circumstances, the output of a window aggregation query is not sorted. In my test, reproducing this requires `set datafusion.execution.batch_size = 1;`, but there may be other triggers.
   
   ### To Reproduce
   
   Use this sqllogictest:
   
   **The output is NOT deterministic!**
   
   ```
   statement ok
   CREATE TABLE t (
       k VARCHAR,
       v Int,
       time TIMESTAMP WITH TIME ZONE
   );
   
   statement ok
   INSERT INTO t (k, v, time) VALUES
       ('a', 1, '1970-01-01T00:01:00.00Z'),
       ('a', 1, '1970-01-01T00:02:00.00Z'),
       ('a', 1, '1970-01-01T00:03:00.00Z'),
       ('a', 2, '1970-01-01T00:03:00.00Z'),
       ('a', 1, '1970-01-01T00:04:00.00Z'),
       ('b', 3, '1970-01-01T00:01:00.00Z'),
       ('b', 3, '1970-01-01T00:02:00.00Z'),
       ('b', 4, '1970-01-01T00:03:00.00Z'),
       ('b', 4, '1970-01-01T00:03:00.00Z');
   
   query TPI
   SELECT
       k,
       time,
       COUNT(v) OVER (
           PARTITION BY k
           ORDER BY time
           RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
       ) AS normal_count
   FROM t
   ODER BY k, time;
   ----
   a 1970-01-01T00:01:00Z 1
   a 1970-01-01T00:02:00Z 2
   a 1970-01-01T00:03:00Z 4
   a 1970-01-01T00:03:00Z 4
   a 1970-01-01T00:04:00Z 4
   b 1970-01-01T00:01:00Z 1
   b 1970-01-01T00:02:00Z 2
   b 1970-01-01T00:03:00Z 4
   b 1970-01-01T00:03:00Z 4
   
   query TT
   EXPLAIN SELECT
       k,
       time,
       COUNT(v) OVER (
           PARTITION BY k
           ORDER BY time
           RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
       ) AS normal_count
   FROM t
   ODER BY k, time;
   ----
   logical_plan
   01)Projection: oder.k, oder.time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS normal_count
   02)--WindowAggr: windowExpr=[[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW]]
   03)----SubqueryAlias: oder
   04)------TableScan: t projection=[k, v, time]
   physical_plan
   01)ProjectionExec: expr=[k@0 as k, time@2 as time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@3 as normal_count]
   02)--BoundedWindowAggExec: wdw=[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW], mode=[Sorted]
   03)----SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST], preserve_partitioning=[false]
   04)------DataSourceExec: partitions=1, partition_sizes=[1]
   
   
   statement ok
   set datafusion.execution.batch_size = 1;
   
   query TPI
   SELECT
       k,
       time,
       COUNT(v) OVER (
           PARTITION BY k
           ORDER BY time
           RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
       ) AS normal_count
   FROM t
   ODER BY k, time;
   ----
   b 1970-01-01T00:01:00Z 1
   b 1970-01-01T00:02:00Z 2
   a 1970-01-01T00:01:00Z 1
   a 1970-01-01T00:02:00Z 2
   a 1970-01-01T00:03:00Z 4
   a 1970-01-01T00:03:00Z 4
   b 1970-01-01T00:03:00Z 4
   b 1970-01-01T00:03:00Z 4
   a 1970-01-01T00:04:00Z 4
   
   query TT
   EXPLAIN SELECT
       k,
       time,
       COUNT(v) OVER (
           PARTITION BY k
           ORDER BY time
           RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
       ) AS normal_count
   FROM t
   ODER BY k, time;
   ----
   logical_plan
   01)Projection: oder.k, oder.time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS normal_count
   02)--WindowAggr: windowExpr=[[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW]]
   03)----SubqueryAlias: oder
   04)------TableScan: t projection=[k, v, time]
   physical_plan
   01)ProjectionExec: expr=[k@0 as k, time@2 as time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@3 as normal_count]
   02)--BoundedWindowAggExec: wdw=[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW], mode=[Sorted]
   03)----SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST], preserve_partitioning=[true]
   04)------CoalesceBatchesExec: target_batch_size=1
   05)--------RepartitionExec: partitioning=Hash([k@0], 4), input_partitions=4
   06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
   07)------------DataSourceExec: partitions=1, partition_sizes=[1]
   ```
   
   ### Expected behavior
   
   Both queries in the test file above should always produce output sorted by `k`, then `time`.
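
To make the expected order concrete, here is a minimal Python sketch that checks whether a result set is sorted by `k`, then `time`. The helper name `is_sorted_by_key_and_time` is hypothetical and not part of DataFusion or sqllogictest. The rows are copied from the two query results above, and because the second result is not deterministic, it is only one observed ordering.

```python
from datetime import datetime


def is_sorted_by_key_and_time(rows):
    """Return True if rows are in non-decreasing (k, time) order.

    Hypothetical helper for illustration only; not a DataFusion API.
    """
    keys = [
        (k, datetime.fromisoformat(ts.replace("Z", "+00:00")))
        for k, ts, _count in rows
    ]
    return all(a <= b for a, b in zip(keys, keys[1:]))


# Result of the first query (default batch size), copied from the report.
default_batch_size = [
    ("a", "1970-01-01T00:01:00Z", 1),
    ("a", "1970-01-01T00:02:00Z", 2),
    ("a", "1970-01-01T00:03:00Z", 4),
    ("a", "1970-01-01T00:03:00Z", 4),
    ("a", "1970-01-01T00:04:00Z", 4),
    ("b", "1970-01-01T00:01:00Z", 1),
    ("b", "1970-01-01T00:02:00Z", 2),
    ("b", "1970-01-01T00:03:00Z", 4),
    ("b", "1970-01-01T00:03:00Z", 4),
]

# Result of the second query (batch_size = 1), copied from the report.
batch_size_one = [
    ("b", "1970-01-01T00:01:00Z", 1),
    ("b", "1970-01-01T00:02:00Z", 2),
    ("a", "1970-01-01T00:01:00Z", 1),
    ("a", "1970-01-01T00:02:00Z", 2),
    ("a", "1970-01-01T00:03:00Z", 4),
    ("a", "1970-01-01T00:03:00Z", 4),
    ("b", "1970-01-01T00:03:00Z", 4),
    ("b", "1970-01-01T00:03:00Z", 4),
    ("a", "1970-01-01T00:04:00Z", 4),
]

print(is_sorted_by_key_and_time(default_batch_size))  # True
print(is_sorted_by_key_and_time(batch_size_one))      # False
```

Both result sets contain the same rows with the same counts; only the order differs.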
   
   ### Additional context
   
   This could be related to #15833.

