alamb commented on issue #7198:
URL: 
https://github.com/apache/arrow-datafusion/issues/7198#issuecomment-1666547553

   The more I think about this, the more I like where @avantgardnerio is going 
with https://github.com/apache/arrow-datafusion/pull/7192, and I think we could 
use the same operator from that PR for this ticket as well as for 
https://github.com/apache/arrow-datafusion/issues/6899 and 
https://github.com/apache/arrow-datafusion/issues/7196.
   
   I hope we can use the same operator for all these queries because:
   1. It will allow us to pool resources (to make it very fast and efficient)
   2. It keeps the boundaries clearly defined (and thus keeps the long-term 
maintenance cost down)
   
   # "Observation" -- No Aggregates
   
   One key observation that @avantgardnerio made (perhaps implicitly) in 
https://github.com/apache/arrow-datafusion/pull/7192  is that even though the 
query in this ticket has aggregates (`max(time)`) there is no **actual** 
aggregation -- what is needed is to 'keep the top K items per group' where 
'top' is defined by some particular sort order. 
   
   # Proposal
   
   Thus, I think we could make the code in 
https://github.com/apache/arrow-datafusion/pull/7192 into a `TopKPerPartition` 
`ExecutionPlan`[^1] that has the following semantics:
   
   [^1]: I think this is what @ozankabak  and @comphead were hinting at in 
https://github.com/apache/arrow-datafusion/issues/6899#issuecomment-1630479576
   
   > Keeps the top K values, as defined by `order_exprs`, for each distinct 
value of `gby_exprs`
   
   ```
   ┌───────────────────────────────┐
   │       TopKPerPartition        │
   │           gby_exprs           │
   │          order_exprs          │
   │               K               │
   └───────────────────────────────┘
   ```
   
   # Use for min/max queries with limit (this ticket)
   So for the query
   ```sql
   SELECT tag, max(time)
   FROM t
   GROUP BY tag
   ORDER BY max(time) DESC
   LIMIT 10
   ```
   
   We would use
   ```
   TopKPerPartition
     gby_exprs: [tag]
     order_exprs: [time DESC]
     k: 10
   ```
   
   # General purpose ORDER BY limit query 
https://github.com/apache/arrow-datafusion/issues/7196
   
   ```sql
   SELECT c1, c2 
   FROM t
   ORDER BY c3
   LIMIT 10
   ```
   
   We could use the same operator (though maybe it has a more optimized 
implementation when there are no groups, like we [have for no group aggregate 
streams](https://github.com/apache/arrow-datafusion/blob/2d91917bb66542a44c31b7a306512bb4e09b5298/datafusion/core/src/physical_plan/aggregates/no_grouping.rs#L67)):
   ```
   TopKPerPartition
     gby_exprs: []
     order_exprs: [c3]
     k: 10
   ```
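   
   To illustrate why a no-group specialization might be simpler, here is a 
sketch where a single bounded heap replaces the per-group map, so there is no 
hash lookup per row. The name `top_k` is hypothetical, and only the sort key 
`c3` is tracked; a real operator would also carry `c1` and `c2` along:
   
   ```rust
   use std::collections::BinaryHeap;

   /// Hypothetical helper: returns the `k` smallest `c3` values in ascending
   /// order, i.e. the sort keys selected by `ORDER BY c3 LIMIT k`.
   fn top_k(c3: impl IntoIterator<Item = i64>, k: usize) -> Vec<i64> {
       // Max-heap holding at most `k` values; the root is the worst value kept.
       let mut heap = BinaryHeap::with_capacity(k);
       for v in c3 {
           if heap.len() < k {
               heap.push(v);
           } else if let Some(mut worst) = heap.peek_mut() {
               if v < *worst {
                   // Overwrite the root; the heap is re-ordered when `worst` drops.
                   *worst = v;
               }
           }
       }
       heap.into_sorted_vec()
   }
   ```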
   
   # Queries that have a predicate on `row_number()` in 
https://github.com/apache/arrow-datafusion/issues/6899
   
   ```sql
   SELECT ...
     ROW_NUMBER() OVER (PARTITION BY value1 ORDER BY value2) as rn
   WHERE
     rn <= 10
   ```
   
   we could use
   
   ```
   TopKPerPartition
     gby_exprs: [value1]
     order_exprs: [value2]
     k: 10
   ```
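   
   To illustrate why the window query maps onto the operator, the sketch below 
evaluates it the straightforward way: sort every partition fully, number the 
rows from 1, then keep rows whose number is at most `k`. The survivors are 
exactly the first `k` rows of each partition in sort order, which is what a 
top-K-per-partition operator would produce without the full sort. The name 
`row_number_at_most` and the `(value1, value2)` row shape are assumptions:
   
   ```rust
   use std::collections::BTreeMap;

   /// Hypothetical reference implementation: rows are `(value1, value2)` pairs.
   /// Numbers the rows of each `value1` partition by ascending `value2`
   /// (starting at 1) and keeps those whose row number is at most `k`.
   fn row_number_at_most(rows: &[(u32, i64)], k: usize) -> BTreeMap<u32, Vec<i64>> {
       // PARTITION BY value1
       let mut partitions: BTreeMap<u32, Vec<i64>> = BTreeMap::new();
       for (value1, value2) in rows {
           partitions.entry(*value1).or_default().push(*value2);
       }
       for values in partitions.values_mut() {
           // ORDER BY value2
           values.sort();
           // Filter on the row number, where rn = index + 1.
           let kept: Vec<i64> = values
               .iter()
               .enumerate()
               .filter(|&(i, _)| i + 1 <= k)
               .map(|(_, v)| *v)
               .collect();
           *values = kept;
       }
       partitions
   }
   ```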
   
   
   P.S. I also tried, and failed, to think of clever rewrites at the SQL 
level. 

