alamb commented on issue #7198: URL: https://github.com/apache/arrow-datafusion/issues/7198#issuecomment-1666547553
The more I think about this, the more I like where @avantgardnerio is going with https://github.com/apache/arrow-datafusion/pull/7192. I think we could use the operator from https://github.com/apache/arrow-datafusion/pull/7192 for this ticket as well as for https://github.com/apache/arrow-datafusion/issues/6899 and https://github.com/apache/arrow-datafusion/issues/7196.

I hope we can use the same operator for all these queries because:

1. It will allow us to pool resources (to make it very fast and efficient).
2. It keeps the boundaries clearly defined (and thus keeps the long-term maintenance cost down).

# "Observation" -- No Aggregates

One key observation that @avantgardnerio made (perhaps implicitly) in https://github.com/apache/arrow-datafusion/pull/7192 is that the query in this ticket has aggregates (`max(time)`), but there is no **actual** aggregation. What is needed is to "keep the top K items per group", where "top" is defined by some particular sort order.

# Proposal

Thus, I think we could turn the code in https://github.com/apache/arrow-datafusion/pull/7192 into a `TopKPerPartition` `ExecutionPlan`[^1] with the following semantics:

> Keeps the top K values, as defined by the `order_exprs`, for each distinct value of the `gby_exprs`

```
┌───────────────────────────────┐
│       TopKPerPartition        │
│           gby_exprs           │
│          order_exprs          │
│               K               │
└───────────────────────────────┘
```

# Use for min/max queries with limit (this ticket)

So for the query

```sql
SELECT tag, max(time)
FROM t
GROUP BY tag
ORDER BY max(time) DESC
LIMIT 10
```

we would use

```
TopKPerPartition
  gby_exprs: [tag]
  order_exprs: [time DESC]
  k: 10
```

# General purpose ORDER BY limit query https://github.com/apache/arrow-datafusion/issues/7196

```sql
SELECT c1, c2
FROM t
ORDER BY c3
LIMIT 10
```

We could use the same operator here. It may need a more optimized implementation when there are no groups, as we [have for no-group aggregate streams](https://github.com/apache/arrow-datafusion/blob/2d91917bb66542a44c31b7a306512bb4e09b5298/datafusion/core/src/physical_plan/aggregates/no_grouping.rs#L67):

```
TopKPerPartition
  gby_exprs: []
  order_exprs: [c3]
  k: 10
```

# Queries that have a predicate on `row_number()` in https://github.com/apache/arrow-datafusion/issues/6899

For

```sql
SELECT ... ROW_NUMBER() OVER (PARTITION BY value1 ORDER BY value2) AS rn WHERE rn <= 10
```

we could use

```
TopKPerPartition
  gby_exprs: [value1]
  order_exprs: [value2]
  k: 10
```

P.s. I also tried, and failed, to think of clever rewrites at the SQL level.

[^1]: I think this is what @ozankabak and @comphead were hinting at in https://github.com/apache/arrow-datafusion/issues/6899#issuecomment-1630479576
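The following sketch makes the proposed `TopKPerPartition` semantics concrete. It is Python, not DataFusion code. The function name `top_k_per_partition`, its parameters, and the representation of rows as dicts are all hypothetical. The sketch assumes `gby_exprs` are plain column names and that the sort key is numeric, so `DESC` can be expressed by negating the key. Each group keeps a heap bounded at K entries, so memory stays at O(K) per group instead of buffering every input row.

```python
import heapq
import itertools
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Sequence, Tuple

Row = Dict[str, Any]


def top_k_per_partition(
    rows: Iterable[Row],
    gby_exprs: Sequence[str],
    order_key: Callable[[Row], float],
    k: int,
) -> Dict[Tuple[Any, ...], List[Row]]:
    """Hypothetical sketch: for each distinct value of gby_exprs, keep the k
    rows with the smallest order_key (ascending; negate the key for DESC)."""
    tie = itertools.count()  # tie-breaker so the row dicts are never compared
    # Per group, a min-heap of (-key, seq, row). Its root holds the *largest*
    # key kept so far, i.e. the row that is evicted first.
    heaps: Dict[Tuple[Any, ...], list] = defaultdict(list)
    for row in rows:
        group = tuple(row[c] for c in gby_exprs)  # () when gby_exprs is empty
        entry = (-order_key(row), next(tie), row)
        heap = heaps[group]
        if len(heap) < k:
            heapq.heappush(heap, entry)
        elif k > 0:
            # Push the new row and evict the worst one in a single step
            # (the new row itself is dropped if it is the worst).
            heapq.heappushpop(heap, entry)
    # Emit each group's survivors in ascending order_key order.
    return {
        group: [row for _, _, row in sorted(heap, key=lambda e: (-e[0], e[1]))]
        for group, heap in heaps.items()
    }
```

For the `row_number()` case, `top_k_per_partition(rows, ["value1"], lambda r: r["value2"], 10)` keeps the first 10 rows per `value1` in `value2` order, which are the rows a `row_number()` filter would retain (ties broken arbitrarily). Passing `gby_exprs=[]` puts every row in the single group `()`, which covers the plain `ORDER BY ... LIMIT` case. A real operator would work on Arrow record batches and arbitrary sort keys rather than Python dicts.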
