Thank you, Julian, for the response. The SqlSplittableAggFunction SPI looks useful; I could use it to rewrite queries so that they are partitionable.
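
For illustration, here is a minimal sketch in plain Java of the split-and-merge idea for AVG. It does not use any Calcite API: the names ScatterGatherAvg, Partial, combine and finish are hypothetical, and it assumes each partition returns one SUM(x)/COUNT(x) pair per GROUP BY key, with keys simplified to strings.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; none of these names are Calcite API. */
final class ScatterGatherAvg {

    /** Partial aggregate for one group: the SUM(x) and COUNT(x) that AVG(x) is split into. */
    static final class Partial {
        final double sum;
        final long count;

        Partial(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        /** Rolls up two partials for the same group by adding sums and counts. */
        Partial merge(Partial other) {
            return new Partial(sum + other.sum, count + other.count);
        }

        /** Final AVG; yields NaN when count is 0 (SQL would return NULL there). */
        double avg() {
            return sum / count;
        }
    }

    /** Gather, part 1: combine partials that share a GROUP BY key across partitions. */
    static Map<String, Partial> combine(List<Map<String, Partial>> perPartition) {
        Map<String, Partial> merged = new HashMap<>();
        for (Map<String, Partial> partition : perPartition) {
            partition.forEach((key, p) -> merged.merge(key, p, Partial::merge));
        }
        return merged;
    }

    /** Gather, part 2: turn the merged SUM/COUNT pairs back into the requested AVG. */
    static Map<String, Double> finish(Map<String, Partial> merged) {
        Map<String, Double> result = new HashMap<>();
        merged.forEach((key, p) -> result.put(key, p.avg()));
        return result;
    }

    public static void main(String[] args) {
        // Made-up partial results from two partitions, keyed by group.
        Map<String, Partial> partition1 = Map.of("a", new Partial(10.0, 2), "b", new Partial(3.0, 1));
        Map<String, Partial> partition2 = Map.of("a", new Partial(5.0, 3));
        System.out.println(finish(combine(List.of(partition1, partition2))));
    }
}
```

Merging sums and counts separately, and dividing only at the end, is what keeps the result correct; averaging the per-partition averages would weight partitions equally regardless of how many rows each one holds.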

I have not looked closely at how Drill does distributed query execution. Our table partitions are distributed over several nodes in a cluster. Essentially, the query co-ordinator will have to scatter the query among all the partitions and gather the partial results. For now we're not planning to repartition the data based on the GROUP BY keys, because it is expensive to do so and most of the fields are low cardinality and can be safely combined by a simple scatter/gather.

Is there a standardized mechanism for doing the scatter/gather in Calcite? Even in the absence of it, the scatter part is fairly straightforward. We could do something like:

1. The query co-ordinator receives the query that needs to run on a bunch of table partitions.
2. The query co-ordinator inspects the query and rewrites it. It mainly splits the non-partitionable aggregate operations (like AVG) into partitionable aggregate functions.
3. It then fans out the query to all the remote partition servers.

I have not figured out the best way to aggregate the partial results coming back from the remote partitions. It mainly needs to do two things:

1. Combine the partial aggregates for the same GROUP BY keys coming from multiple partitions.
2. Compute the final result: if the original query had split aggregates, they need to be combined.

I could write custom code to do this, but if there is a way (or example) to do this within Calcite, I would appreciate any pointers.

Thanks,
Anoop

On Mon, Mar 7, 2016 at 1:32 PM, Julian Hyde <[email protected]> wrote:

> Some projects that use Calcite (e.g. Drill) model partitioned tables &
> data using the Distribution trait. A trait represents a physical property
> of the data; for example, Collation is another trait, and represents how
> the data is sorted. The Exchange operator can change the distribution of
> data, analogous to how the Sort operator changes the Collation of the data.
>
> As you correctly say, it is not valid to compute aggregate functions on
> partitioned data. You need to combine into a single partition first. (Or
> you can re-partition by the GROUP BY keys, and then safely combine using
> Union-all.)
>
> You can compute partial aggregates on the partitions, then roll them up.
> For example if you want to compute AVG(x) you can expand to SUM(x) /
> COUNT(x), and compute partition SUM(x) and COUNT(X) on each partition then
> sum them using SUM. AggregateUnionTransposeRule and
> AggregateJoinTransposeRule deal in that kind of logic, and
> SqlSplittableAggFunction is an SPI that could describe how an arbitrary
> aggregate function could be split.
>
> Julian
>
>
> > On Mar 7, 2016, at 10:58 AM, Anoop Johnson <[email protected]> wrote:
> >
> > Hello Everyone -
> >
> > We have an in-house distributed database with our own custom query engine.
> > We're considering using Calcite to replace our current query engine.
> >
> > I looked through the examples and one of the adapters. One thing I haven't
> > quite figured out is using Calcite when the data is partitioned and you
> > need to fan out the query into all the partitions and combine the partial
> > results.
> >
> > There are several challenges here - for instance, some operations like AVG
> > are not partitionable, so they have to be rewritten as COUNT and SUM and
> > only at the final step.
> >
> > We could always write custom code to rewrite the query and reassemble the
> > results. For instance, I saw a planner rule[1] in the Calcite codebase that
> > does something similar.
> >
> > Since this is a common problem, I was wondering if there was a standard way
> > of handling this use case or if there were any example plugin I could look
> > at.
> >
> > Thanks,
> > Anoop
> >
> >
> > [1] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
