mustafasrepo commented on code in PR #7129:
URL: https://github.com/apache/arrow-datafusion/pull/7129#discussion_r1279277814


##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2689,13 +2703,12 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 --SortExec: expr=[country@0 ASC NULLS LAST]
 ----ProjectionExec: expr=[country@0 as country, 
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as 
fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS 
LAST]@2 as fv2]
 ------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
---------SortExec: expr=[ts@1 ASC NULLS LAST]
-----------CoalesceBatchesExec: target_batch_size=8192
-------------RepartitionExec: partitioning=Hash([country@0], 8), 
input_partitions=8
---------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)], 
ordering_mode=None
-----------------SortExec: expr=[ts@1 ASC NULLS LAST]
-------------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
---------------------MemoryExec: partitions=1, partition_sizes=[1]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([country@0], 8), 
input_partitions=8
+------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]

Review Comment:
   Consider table below 
   
   | amount    | ts |
   | -------- | ------- |
   | 12 | 1|
   | 11| 2|
   | 13| 3|
   | 12|4|
   | 11| 5|
   | 13| 6|
   | 12 |7|
   | 11 | 8|
   | 13| 9|
   
   Also assume we have 3 partitions, receiving following data 
   
   | amount    | ts |
   | -------- | ------- |
   | 11| 2|
   | 11| 5|
   | 11 | 8|
   
   | amount    | ts |
   | -------- | ------- |
   | 12 | 1|
   | 12 |4|
   | 12 |7|
   
   | amount    | ts |
   | -------- | ------- |
   | 13| 3|
   | 13| 6|
   | 13| 9|
   
   AggregatePartial would produce following values
   (11, 2); (12, 1); (13;3) for each partition. First value represents 
first_value for this partitition. Second value represents its corresponding ts 
value. In this case
   AgregateFinal would receive following batch
   
   | amount    | ts of amount partial result |
   | -------- | ------- |
   | 11| 2|
   | 12| 1|
   | 13| 3|. 
   
   During `merge_batch` method of `first_value` first value is calculated by 
considering `ts` values corresponding to `amount` for each partition. In our 
case, since requirement is `ts ASC`, first value should be from the row that 
have smallest `ts` (in our case 1). Hence result will be 12. Please note that 
`ts` at the final input and `ts` at the partial input doesn't correspond to 
same column. `ts` at the final aggregation input, comes from the `state` of 
aggregation partial result.
   In short, we delegated responsibility to sort to `merge_batch` algorithm. 
Because, the column where sorting will be done is no longer valid for the 
aggregation final. 
   
   > Maybe the right solution would be to do a single phase grouping when any 
of the aggregates have an ORDER BY clause
   
   this would certainly work. However, I wanted to use existing parallelization 
as much as possible. Hence, I wanted to make aggregators to work in Partial and 
Final modes.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to