> As to the distinct aggregations: At the least, these queries should be
rejected, not evaluated incorrectly.
Absolutely agree. If we don't support DISTINCT aggregations with one of
these approaches soon, we should reject these queries rather than silently
treating them as non-distinct queries.

> The term "stateful CombineFn" is not one I would use, as the nature of
state is linearity and the nature of CombineFn is parallelism. So I don't
totally understand this proposal. If I replace stateful CombineFn with
stateful DoFn with one combining state per column, then I think I
understand. FWIW on a runner with scalable SetState or MapState it will not
be any risk at all.

Yes, the term "stateful CombineFn" is misleading. What I meant is a
CombineFn with an accumulator type that's much heavier weight than the ones
currently implemented, since it stores a set of distinct elements rather
than just the combined result itself. I think your re-wording is basically
describing what I'm proposing. To be more explicit: all of the aggregation
node's CombineFns (including these hypothetical de-duplicating ones) are
merged into a single CombineFn and used in a Combine.groupedValues
transform, which operates on a PCollection<KV<K, Iterable<V>>>. So the
CombineFn is applied directly to the Iterable<V>. There's no combining
state spec that I'm aware of, though maybe that's what happens internally;
I'm not sure.
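
To make the accumulator idea concrete, here is a rough plain-Java sketch of
the shape such a de-duplicating combiner could take for SUM(DISTINCT). It
deliberately mirrors the createAccumulator/addInput/mergeAccumulators/
extractOutput lifecycle of a Beam CombineFn without depending on Beam; the
class and method names are illustrative only, not the actual implementation.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch (not Beam's actual CombineFn interface): a
// SUM(DISTINCT) combiner whose accumulator carries the full set of
// elements seen so far, rather than just a running sum.
public class DistinctSumFn {
    // Accumulator: the set of distinct values observed so far.
    static Set<Long> createAccumulator() {
        return new HashSet<>();
    }

    static Set<Long> addInput(Set<Long> acc, long input) {
        acc.add(input);  // the set de-dupes repeated inputs automatically
        return acc;
    }

    static Set<Long> mergeAccumulators(Set<Long> a, Set<Long> b) {
        a.addAll(b);  // union of the two distinct-value sets
        return a;
    }

    static long extractOutput(Set<Long> acc) {
        return acc.stream().mapToLong(Long::longValue).sum();
    }

    // Convenience: apply the combiner to one key's Iterable<V>, roughly
    // as Combine.groupedValues would after the GBK.
    static long distinctSum(Iterable<Long> values) {
        Set<Long> acc = createAccumulator();
        for (long v : values) {
            acc = addInput(acc, v);
        }
        return extractOutput(acc);
    }

    public static void main(String[] args) {
        // SUM(DISTINCT) over [1, 2, 2, 3] is 1 + 2 + 3 = 6
        System.out.println(distinctSum(java.util.Arrays.asList(1L, 2L, 2L, 3L)));
    }
}
```

The memory concern discussed below falls directly out of this shape: the
accumulator grows with the number of distinct values per key and window.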

> But if you go the two shuffle route, you don't have to separate the
aggregations and re-join them. You just have to incur the cost of the GBK +
DISTINCT for all columns, and just drop the secondary key for the second
shuffle, no?
I think this is correct. One gotcha could be if there are n > 1 DISTINCT
aggregations on different expressions; then I don't think two shuffles
would suffice, but I could be missing something.
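
For concreteness, here's a plain-Java simulation of the two-shuffle plan for
a single COUNT(DISTINCT y) grouped by k, with sets and maps standing in for
the two GBKs (the names are illustrative, not Beam APIs):

```java
import java.util.*;

// Simulates: SELECT k, COUNT(DISTINCT y) FROM t GROUP BY k
// Shuffle 1 groups by the composite key (k, y), which de-dupes y per k;
// shuffle 2 drops y and counts the surviving pairs per k.
public class TwoShuffleDistinctCount {
    static Map<String, Long> countDistinct(List<String[]> rows) {
        // Shuffle 1: GBK on (k, y) — each surviving key is one distinct pair.
        Set<List<String>> firstShuffle = new HashSet<>();
        for (String[] row : rows) {
            firstShuffle.add(Arrays.asList(row[0], row[1]));  // (k, y)
        }
        // Shuffle 2: drop the secondary key y, GBK on k alone, and count.
        Map<String, Long> counts = new TreeMap<>();
        for (List<String> pair : firstShuffle) {
            counts.merge(pair.get(0), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[] {"a", "x"}, new String[] {"a", "x"},
            new String[] {"a", "y"}, new String[] {"b", "x"});
        System.out.println(countDistinct(rows));  // prints {a=2, b=1}
    }
}
```

The n > 1 gotcha shows up here: with both COUNT(DISTINCT y) and
SUM(DISTINCT z), a single composite key (k, y, z) de-dupes (y, z) pairs
rather than y and z independently, so one extra de-duping shuffle per
distinct expression appears to be needed.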

Rui's point about FLOAT/DOUBLE columns is interesting as well. We couldn't
support distinct aggregations on floating point columns with the
two-shuffle approach, but we could with the CombineFn approach. I'm not
sure whether that's a good thing; it seems like an anti-pattern to do a
distinct aggregation on floating point numbers, but I suppose the spec
allows it.
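
As a side note on why floating point values are awkward to de-dupe, here's
a small plain-Java demonstration of how primitive comparison and
Double.equals disagree on NaN and signed zero, so any definition of
"distinct" for a FLOAT/DOUBLE column has to pick a side:

```java
// Why floats make poor grouping keys: IEEE 754 comparison (==) and
// Double.equals (bit-pattern based) give different answers for NaN and
// for -0.0 vs 0.0.
public class FloatKeyPitfalls {
    public static void main(String[] args) {
        double nan = Double.NaN;
        System.out.println(nan == nan);                        // false: IEEE 754 says NaN != NaN
        System.out.println(Double.valueOf(nan).equals(nan));   // true: equals() compares bit patterns
        System.out.println(-0.0 == 0.0);                       // true: IEEE 754 treats them as equal
        System.out.println(Double.valueOf(-0.0).equals(0.0));  // false: different bit patterns
    }
}
```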

Brian


On Fri, May 3, 2019 at 10:52 AM Rui Wang <ruw...@google.com> wrote:

> To clarify what I said "So two shuffle approach will lead to two different
> implementation for tables with and without FLOAT/DOUBLE column.":
>
> Basically I wanted to say that the two-shuffles approach will be an
> implementation for some cases, and it will co-exist with the CombineFn
> approach. In the future, when we start cost-based optimization (CBO) in
> BeamSQL, CBO is supposed to compare the different plans.
>
> -Rui
>
> On Fri, May 3, 2019 at 10:40 AM Rui Wang <ruw...@google.com> wrote:
>
>>
>>> As to the distinct aggregations: At the least, these queries should be
>>> rejected, not evaluated incorrectly.
>>>
>>
>> Yes. The least we can do is not support it and throw a clear error
>> message (the current implementation ignores DISTINCT and executes all
>> aggregations as ALL).
>>
>>
>>> The term "stateful CombineFn" is not one I would use, as the nature of
>>> state is linearity and the nature of CombineFn is parallelism. So I don't
>>> totally understand this proposal. If I replace stateful CombineFn with
>>> stateful DoFn with one combining state per column, then I think I
>>> understand. FWIW on a runner with scalable SetState or MapState it will not
>>> be any risk at all.
>>>
>> I see. "Stateful" is indeed misleading. In this thread, it was all about
>> using a simple CombineFn to achieve DISTINCT aggregation with massive
>> parallelism.
>>
>> But if you go the two shuffle route, you don't have to separate the
>>> aggregations and re-join them. You just have to incur the cost of the GBK +
>>> DISTINCT for all columns, and just drop the secondary key for the second
>>> shuffle, no?
>>>
>> The two-shuffle approach cannot be the unified approach because it
>> requires building a key of group_by_key + table_row to deduplicate, but
>> table_row might contain floating point numbers, which cannot be used as a
>> key in GBK. So the two-shuffle approach will lead to two different
>> implementations for tables with and without FLOAT/DOUBLE columns.
>>
>> CombineFn is the unified approach for distinct and non-distinct
>> aggregations: each aggregation call will be a CombineFn.
>>
>>
>> -Rui
>>
>>
>>
>>> Kenn
>>>
>>> On Thu, May 2, 2019 at 5:11 PM Ahmet Altay <al...@google.com> wrote:
>>>
>>>>
>>>>
>>>> On Thu, May 2, 2019 at 2:18 PM Rui Wang <ruw...@google.com> wrote:
>>>>
>>>>> Brian's first proposal is also challenging partially because in
>>>>> BeamSQL there is no good practice for dealing with complex SQL plans.
>>>>> Ideally we need enough rules and SQL plan nodes in Beam to construct
>>>>> easy-to-transform plans for different cases. I was in a similar
>>>>> situation before when I needed to separate the logical plans of "JOIN
>>>>> ON a OR b" and "JOIN ON a AND b", because their implementations were
>>>>> too different to fit into the same JoinRelNode. It seems we are in a
>>>>> similar situation when distinct and non-distinct aggregations are
>>>>> mixed: a single AggregationRelNode is hard pressed to encapsulate such
>>>>> complex logic.
>>>>>
>>>>> We will need a detailed plan to re-think RelNodes and Rules in
>>>>> BeamSQL, which is out of scope for supporting DISTINCT.
>>>>>
>>>>> I would favor the second proposal because BeamSQL uses Beam schemas
>>>>> and rows. The schema itself uses Java primitives for most of its types
>>>>> (int, long, float, etc.), which limits the size of each element.
>>>>> Considering per-key and per-window combines, there is a good chance
>>>>> that the stateful combine works for some (if not most) cases.
>>>>>
>>>>> Could we use @Experimental to tag the stateful combine for supporting
>>>>> DISTINCT in aggregation, so that we have a chance to test it with
>>>>> users?
>>>>>
>>>>>
>>>>> -Rui
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 2, 2019 at 1:16 PM Brian Hulette <bhule...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Ahmet -
>>>>>> I think it would only require observing each key's partition of the
>>>>>> input independently, and the size of the state would only be
>>>>>> proportional to the number of distinct elements, not the entire
>>>>>> input. Note the pipeline would be a GBK with a key based on the GROUP
>>>>>> BY, followed by a Combine.GroupedValues with a (possibly very
>>>>>> stateful) CombineFn.
>>>>>>
>>>>>
>>>> Got it. Distinct elements could be proportional to the entire input;
>>>> however, if this is a reasonable requirement from a product
>>>> perspective, that is fine. Rui's suggestion of using an experimental
>>>> tag is also a good idea. I suppose that will give us the ability to
>>>> change the implementation if it becomes necessary.
>>>>
>>>>
>>>>>
>>>>>> Luke -
>>>>>> Here's a little background on why I think (1) is harder (it may also
>>>>>> just be that it looks daunting to me as someone who's not that
>>>>>> familiar with the code).
>>>>>>
>>>>>> An aggregation node can have multiple aggregations. So, for example,
>>>>>> the query `SELECT k, SUM(x), COUNT(DISTINCT y), AVG(DISTINCT z) FROM
>>>>>> ...` would yield a logical plan that has a single aggregation node
>>>>>> with three different aggregations. We then take that node and build
>>>>>> up a CombineFn that is a composite of all of the aggregations we need
>>>>>> to make a combining PTransform [1]. To implement (1) we would need to
>>>>>> distinguish between all the DISTINCT and non-DISTINCT aggregations,
>>>>>> and come up with a way to unify the 2-GBK DISTINCT pipeline and the
>>>>>> 1-GBK non-DISTINCT pipeline.
>>>>>>
>>>>>> That's certainly not unsolvable, but approach (2) is much simpler -
>>>>>> it just requires implementing some variations on the CombineFn's that
>>>>>> already exist [2] and re-using the existing logic for converting an
>>>>>> aggregation node to a combining PTransform.
>>>>>>
>>>>>>
>>>>>> Hopefully that makes sense, let me know if I need to clarify further
>>>>>> :)
>>>>>> Brian
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L178
>>>>>> [2]
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>
>>>>>>
>>>>>> On Thu, May 2, 2019 at 12:18 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> Can you also go into more detail why you think 1) is more
>>>>>>> challenging to implement?
>>>>>>>
>>>>>>> On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> From my limited understanding, would not the stateful CombineFn
>>>>>>>> option require observing the whole input before being able to
>>>>>>>> combine, with the risk of blowing up memory actually being very
>>>>>>>> high except for trivial inputs?
>>>>>>>>
>>>>>>>> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <bhule...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi everyone,
>>>>>>>>> Currently BeamSQL does not support DISTINCT aggregations. These
>>>>>>>>> are queries like:
>>>>>>>>>
>>>>>>>>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>>>>>>>>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>>>>>>>>
>>>>>>>>> These are represented in Calcite's logical plan with a distinct
>>>>>>>>> flag on aggregation calls, but we ignore the flag when converting to a
>>>>>>>>> pipeline. I see two different ways that we could support this:
>>>>>>>>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on the
>>>>>>>>> <GROUP BY key> + <DISTINCT expr> to de-dupe values of expr,
>>>>>>>>> followed by a second GBK on just <GROUP BY key> to perform the
>>>>>>>>> aggregation.
>>>>>>>>> 2. Stateful CombineFn - We could implement a version of the
>>>>>>>>> combiners used for SUM, COUNT, etc. [1] that maintains some state
>>>>>>>>> tracking previously seen elements and uses it to de-dupe new
>>>>>>>>> elements.
>>>>>>>>>
>>>>>>>>> Of course, something like (1) is much more scalable, but it is
>>>>>>>>> also much more challenging to implement. (2) is trivial to
>>>>>>>>> implement, but runs the risk of blowing up a worker's memory
>>>>>>>>> usage.
>>>>>>>>>
>>>>>>>>> Personally, I think it could be worthwhile to provide support for
>>>>>>>>> DISTINCT quickly with approach (2), and implement (1) as an
>>>>>>>>> optimization later. This combiner's state would be partitioned by
>>>>>>>>> key and by window, so I think we would be pretty safe from OOM'ing
>>>>>>>>> a worker except in some extreme cases (long windows, hot keys,
>>>>>>>>> very large batch pipelines, ...).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> But I understand this could be controversial, so I wanted to open
>>>>>>>>> it up for discussion first: Would it be worthwhile to provide
>>>>>>>>> support for DISTINCT aggregations quickly with approach #2, or is
>>>>>>>>> it better to just reject these queries until we can implement them
>>>>>>>>> safely with approach #1?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Brian
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>>>>
>>>>>>>>
