ozankabak commented on code in PR #8281:
URL: https://github.com/apache/arrow-datafusion/pull/8281#discussion_r1402675203


##########
datafusion/sqllogictest/test_files/groupby.slt:
##########
@@ -3842,6 +3842,51 @@ ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as MAX(
 --------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x AS Float64)t1.x, y@1 as y]
 ----------------------MemoryExec: partitions=1, partition_sizes=[1]
 
+# create an unbounded table that contains ordered timestamp.
+statement ok
+CREATE UNBOUNDED EXTERNAL TABLE csv_with_timestamps (
+  name VARCHAR,
+  ts TIMESTAMP
+)
+STORED AS CSV
+WITH ORDER (ts DESC)
+LOCATION '../core/tests/data/timestamps.csv'
+
+# below query should work in streaming mode.
+query TT
+EXPLAIN SELECT date_bin('15 minutes', ts) as time_chunks
+  FROM csv_with_timestamps
+  GROUP BY (date_bin('15 minutes', ts))
+  ORDER BY time_chunks DESC
+  LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: time_chunks DESC NULLS FIRST, fetch=5
+----Projection: date_bin(Utf8("15 minutes"),csv_with_timestamps.ts) AS time_chunks
+------Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("900000000000"), csv_with_timestamps.ts) AS date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)]], aggr=[[]]
+--------TableScan: csv_with_timestamps projection=[ts]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--SortPreservingMergeExec: [time_chunks@0 DESC], fetch=5
+----ProjectionExec: expr=[date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0 as time_chunks]
+------AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0 as date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted
+--------CoalesceBatchesExec: target_batch_size=2
+----------SortPreservingRepartitionExec: partitioning=Hash([date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0], 8), input_partitions=8, sort_exprs=date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0 DESC
+------------AggregateExec: mode=Partial, gby=[date_bin(900000000000, ts@0) as date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted
+--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+----------------StreamingTableExec: partition_sizes=1, projection=[ts], infinite_source=true, output_ordering=[ts@0 DESC]
+
+query P
+SELECT date_bin('15 minutes', ts) as time_chunks
+  FROM csv_with_timestamps
+  GROUP BY (date_bin('15 minutes', ts))
+  ORDER BY time_chunks DESC
+  LIMIT 5
+----
+2018-12-13T12:00:00
+2018-11-13T17:00:00
+

Review Comment:
   Good idea. I added a test showing that, for unbounded sources, the query works when the function is monotonic and does not work otherwise. I also added a test showing that, for bounded sources, the query works regardless of the function's monotonicity.
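
The property the reply relies on is monotonicity. For a fixed stride and origin, `date_bin` never reorders rows, so the `ts DESC` order declared on the unbounded source carries over to the grouping key. This is presumably what lets the aggregates in the plan above report `ordering_mode=Sorted`. The Rust sketch below is a simplified model, not DataFusion's implementation. It assumes `i64` nanosecond timestamps and the Unix epoch as the default origin. The names `MINUTE_NS`, `FIFTEEN_MINUTES_NS`, `date_bin_ns`, `is_sorted_desc`, and the minute-of-hour function are all hypothetical. The sketch also confirms that `'15 minutes'` is the `900000000000` ns stride that appears in the plan.

```rust
/// One minute in nanoseconds.
const MINUTE_NS: i64 = 60 * 1_000_000_000;

/// '15 minutes' in nanoseconds; equals the 900000000000 literal in the plan above.
const FIFTEEN_MINUTES_NS: i64 = 15 * MINUTE_NS;

/// Hypothetical, simplified model of `date_bin(stride, ts)`.
/// Assumptions: origin is the Unix epoch; timestamps are i64 nanoseconds.
/// Rounds `ts_ns` down to the nearest multiple of `stride_ns`.
fn date_bin_ns(stride_ns: i64, ts_ns: i64) -> i64 {
    // rem_euclid is never negative, so pre-1970 (negative) timestamps are floored too.
    ts_ns - ts_ns.rem_euclid(stride_ns)
}

/// Returns true if `xs` is in non-increasing (DESC) order.
fn is_sorted_desc(xs: &[i64]) -> bool {
    xs.windows(2).all(|w| w[0] >= w[1])
}

fn main() {
    // Hypothetical input already ordered by ts DESC, as WITH ORDER (ts DESC) declares.
    let ts: Vec<i64> = [70_i64, 50, 44, 5].iter().map(|m| m * MINUTE_NS).collect();
    assert!(is_sorted_desc(&ts));

    // Monotonic: 15-minute binning keeps the DESC order (buckets at 60, 45, 30, 0 min).
    let binned: Vec<i64> = ts.iter().map(|&t| date_bin_ns(FIFTEEN_MINUTES_NS, t)).collect();
    assert!(is_sorted_desc(&binned));

    // Hypothetical non-monotonic function: minute within the hour (10, 50, 44, 5).
    let minute_of_hour: Vec<i64> = ts.iter().map(|&t| (t / MINUTE_NS).rem_euclid(60)).collect();
    assert!(!is_sorted_desc(&minute_of_hour));

    // prints "binned (min): [60, 45, 30, 0]"
    println!("binned (min): {:?}", binned.iter().map(|b| b / MINUTE_NS).collect::<Vec<_>>());
    // prints "minute_of_hour: [10, 50, 44, 5]"
    println!("minute_of_hour: {:?}", minute_of_hour);
}
```

A grouping key built from the minute-of-hour function loses the input ordering. On an unbounded source, an aggregate keyed on it could not emit groups incrementally. On a bounded source, the engine can still sort or hash the full input, which is consistent with the bounded-source test described above.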



-- 
This is an automated message from the Apache Git Service.