> On Mar 16, 2016, at 10:17 AM, Anoop Johnson <[email protected]> wrote:
>
> Is there a standardized mechanism for doing the scatter/gather in Calcite?
Calcite does not have a parallel/distributed execution engine. So I wouldn’t
say it has a scatter/gather mechanism. What Calcite does provide is algebraic
rewrites to enable you to combine partial results. Those rules tend to be
similar for a variety of execution engines.
There are various scenarios for combining partial results. If you know the keys
don’t overlap you can combine using UNION ALL. If the keys overlap but are
sorted you can combine using merge, and the resulting collection will also be
sorted. If the keys overlap then you can combine partial aggregates. The
SqlSplittableAggFunction interface is our best attempt to describe whether/how
partial aggregates can be combined.
Suppose you have the query
SELECT deptno, COUNT(*) AS c FROM emp GROUP BY deptno
and emp has two partitions, and you know these partitions have overlapping
deptno values, i.e. emp is equivalent to the view
CREATE VIEW emp AS SELECT * FROM emp1 UNION ALL SELECT * FROM emp2
Then Calcite will rewrite the query to something equivalent to
SELECT deptno, SUM(partialCount) AS c
FROM (
SELECT deptno, COUNT(*) AS partialCount FROM emp1 GROUP BY deptno
UNION ALL
SELECT deptno, COUNT(*) AS partialCount FROM emp2 GROUP BY deptno)
GROUP BY deptno
The logic is all there, so you don’t need to write any custom code.
If you know deptno values are sorted, you could write a SortedMergeUnion
operator (probably a sub-class of Union), and the Calcite rule would generate
the ‘input0.partialCount + input1.partialCount’ logic.
Julian
> Even in the absence of it, the scatter part is fairly straightforward. We
> could do something like:
>
> 1. Query co-ordinator receives the query that needs to run on a bunch of
> table partitions.
> 2. The query co-ordinator inspects the query and rewrites it. It mainly
> splits the non-partitionable aggregate operations (like AVG) into
> partitionable aggregate functions.
> 3. It then fans out the query to all the remote partition servers.
>
> I have not figured out what's the best way to aggregate the partial results
> coming back from the remote partitions. It mainly needs to do two things:
>
> 1. Combine the partial aggregates for the same group by keys coming from
> multiple partitions.
> 2. Compute the final result - if the original query had split aggregates,
> that needs to be combined.
>
> I could write custom code to do this, but if there is a way (or example) to
> do this within Calcite, I would appreciate any pointers.
>
> Thanks,
> Anoop
>
> On Mon, Mar 7, 2016 at 1:32 PM, Julian Hyde <[email protected]> wrote:
>
>> Some projects that use Calcite (e.g. Drill) model partitioned tables &
>> data using the Distribution trait. A trait represents a physical property
>> of the data; for example, Collation is another trait, and represents how
>> the data is sorted. The Exchange operator can change the distribution of
>> data, analogous to how the Sort operator changes the Collation of the data.
>>
>> As you correctly say, it is not valid to compute aggregate functions on
>> partitioned data. You need to combine into a single partition first. (Or
>> you can re-partition by the GROUP BY keys, and then safely combine using
>> Union-all.)
>>
>> You can compute partial aggregates on the partitions, then roll them up.
>> For example if you want to compute AVG(x) you can expand to SUM(x) /
>> COUNT(x), and compute partition SUM(x) and COUNT(X) on each partition then
>> sum them using SUM. AggregateUnionTransposeRule and
>> AggregateJoinTransposeRule deal in that kind of logic, and
>> SqlSplittableAggFunction is an SPI that could describe how an arbitrary
>> aggregate function could be split.
>>
>> Julian
>>
>>
>>
>>> On Mar 7, 2016, at 10:58 AM, Anoop Johnson <[email protected]>
>> wrote:
>>>
>>> Hello Everyone -
>>>
>>> We have an in-house distributed database with our own custom query
>> engine.
>>> We're considering using Calcite to replace our current query engine.
>>>
>>> I looked through the examples and one of the adapters. One thing I
>> haven't
>>> quite figured out is using Calcite when the data is partitioned and you
>>> need to fan out the query into all the partitions and combine the partial
>>> results.
>>>
>>> There are several challenges here - for instance, some operations like
>> AVG
>>> are not partitionable, so they have to be rewritten as COUNT and SUM and
>>> only at the final step.
>>>
>>> We could always write custom code to rewrite the query and reassemble the
>>> results. For instance, I saw a planner rule[1] in the Calcite codebase
>> that
>>> does something similar.
>>>
>>> Since this is a common problem, I was wondering if there was a standard
>> way
>>> of handling this use case or if there were any example plugin I could
>> look
>>> at.
>>>
>>> Thanks,
>>> Anoop
>>>
>>>
>>> [1]
>>>
>> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
>>
>>