All good points. My version of the two shuffle approach does not work at
all.

On Fri, May 3, 2019 at 11:38 AM Brian Hulette <bhule...@google.com> wrote:

> Rui's point about FLOAT/DOUBLE columns is interesting as well. We couldn't
> support distinct aggregations on floating point columns with the
> two-shuffle approach, but we could with the CombineFn approach. I'm not
> sure if that's a good thing or not, it seems like an anti-pattern to do a
> distinct aggregation on floating point numbers but I suppose the spec
> allows it.
>

I can't find the Jira, but grouping on doubles has been discussed at some
length before. Many DBMSs do not provide this, so it is not generally
expected by SQL users. That is good, because mathematically it is
questionable - floating point is usually used as a stand-in for real
numbers, where computing equality is not generally possible. So any code
that actually depends on equality of floating points is likely susceptible
to rounding errors, other quirks of floating point, and also is probably
misguided because the underlying thing that floats are approximating
already cannot be checked for equality.
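
To make those quirks concrete, here is a minimal plain-Java sketch. The
class and method names are made up for illustration and nothing here comes
from Beam. It only shows that rounding breaks an "obvious" equality, that
the order of additions changes a sum, and that comparing by value and
comparing by bits disagree for 0.0 and -0.0.

```java
class FloatEqualityDemo {
    // Rounding: 0.1 + 0.2 evaluates to 0.30000000000000004, not 0.3.
    static boolean sumMatchesLiteral() {
        return 0.1 + 0.2 == 0.3;
    }

    // Addition is not associative, so a sum depends on combination order.
    static boolean additionIsAssociative() {
        double a = 1e16, b = -1e16, c = 1.0;
        return (a + b) + c == a + (b + c);
    }

    // 0.0 and -0.0 are equal under ==.
    static boolean zerosEqualByValue() {
        double pos = 0.0, neg = -0.0;
        return pos == neg;
    }

    // The same two values have different bit patterns, so anything that
    // compares encoded bytes would treat them as different keys.
    static boolean zerosEqualByBits() {
        return Double.doubleToLongBits(0.0) == Double.doubleToLongBits(-0.0);
    }

    // NaN is never equal to itself under ==.
    static boolean nanEqualsItself() {
        double nan = Double.NaN;
        return nan == nan;
    }

    public static void main(String[] args) {
        System.out.println("0.1 + 0.2 == 0.3       : " + sumMatchesLiteral());
        System.out.println("(a+b)+c == a+(b+c)     : " + additionIsAssociative());
        System.out.println("0.0 == -0.0 (by value) : " + zerosEqualByValue());
        System.out.println("0.0 == -0.0 (by bits)  : " + zerosEqualByBits());
        System.out.println("NaN == NaN             : " + nanEqualsItself());
    }
}
```

Only the by-value comparison of the two zeros comes out true; every other
check is false.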

Kenn


>
> Brian
>
>
> On Fri, May 3, 2019 at 10:52 AM Rui Wang <ruw...@google.com> wrote:
>
>> To clarify what I said "So two shuffle approach will lead to two
>> different implementation for tables with and without FLOAT/DOUBLE column.":
>>
>> Basically I wanted to say that the two-shuffle approach will be an
>> implementation for some cases, and it will co-exist with the CombineFn
>> approach. In the future, when we start cost-based optimization in
>> BeamSQL, the CBO is supposed to compare different plans.
>>
>> -Rui
>>
>> On Fri, May 3, 2019 at 10:40 AM Rui Wang <ruw...@google.com> wrote:
>>
>>>
>>>> As to the distinct aggregations: At the least, these queries should be
>>>> rejected, not evaluated incorrectly.
>>>>
>>>
>>> Yes. At the least we should not support it, and should throw a clear
>>> error message saying so. (The current implementation ignores DISTINCT and
>>> executes all aggregations as ALL.)
>>>
>>>
>>>> The term "stateful CombineFn" is not one I would use, as the nature of
>>>> state is linearity and the nature of CombineFn is parallelism. So I don't
>>>> totally understand this proposal. If I replace stateful CombineFn with
>>>> stateful DoFn with one combining state per column, then I think I
>>>> understand. FWIW on a runner with scalable SetState or MapState it will not
>>>> be any risk at all.
>>>>
>>> I see. "Stateful" is indeed misleading. In this thread, it was all
>>> about using simple CombineFn to achieve DISTINCT aggregation with massive
>>> parallelism.
>>>
>>>> But if you go the two shuffle route, you don't have to separate the
>>>> aggregations and re-join them. You just have to incur the cost of the GBK +
>>>> DISTINCT for all columns, and just drop the secondary key for the second
>>>> shuffle, no?
>>>>
>>> The two-shuffle approach cannot be the unified approach because it requires
>>> building a key of group_by_key + table_row to deduplicate, but table_row
>>> might contain floating point numbers, which cannot be used as a key in GBK.
>>> So two shuffle approach will lead to two different implementation for
>>> tables with and without FLOAT/DOUBLE column.
>>>
>>> CombineFn is the unified approach for distinct and non distinct
>>> aggregation: each aggregation call will be a CombineFn.
>>>
>>>
>>> -Rui
>>>
>>>
>>>
>>>> Kenn
>>>>
>>>> On Thu, May 2, 2019 at 5:11 PM Ahmet Altay <al...@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Thu, May 2, 2019 at 2:18 PM Rui Wang <ruw...@google.com> wrote:
>>>>>
>>>>>> Brian's first proposal is challenging also partially because in
>>>>>> BeamSQL there is no good practice to deal with complex SQL plans. Ideally
>>>>>> we need enough rules and SQL plan nodes in Beam to construct
>>>>>> easy-to-transform plans for different cases. I had a similar situation
>>>>>> before when I needed to separate the logical plans of "JOIN ON a OR b" and
>>>>>> "JOIN ON a AND b", because their implementations are too different to
>>>>>> fit into the same JoinRelNode. The situation seems similar here: when
>>>>>> distinct and non-distinct aggregations are mixed, it is hard for one
>>>>>> single AggregationRelNode to encapsulate the complex logic.
>>>>>>
>>>>>> We will need a detailed plan to re-think about RelNodes and Rules in
>>>>>> BeamSQL, which is out of scope for supporting DISTINCT.
>>>>>>
>>>>>> I would favor the second proposal because BeamSQL uses Beam schema and
>>>>>> row. Schema itself uses Java primitives for most of its types (int, long,
>>>>>> float, etc.), which limits the size of each element. Considering per-key
>>>>>> and per-window combining, there is a good chance that stateful combine
>>>>>> works for some (if not most) cases.
>>>>>>
>>>>>> Could we use @Experimental to tag stateful combine for supporting
>>>>>> DISTINCT in aggregation so that users get a chance to test it?
>>>>>>
>>>>>>
>>>>>> -Rui
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, May 2, 2019 at 1:16 PM Brian Hulette <bhule...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Ahmet -
>>>>>>> I think it would only require observing each key's partition of the
>>>>>>> input independently, and the size of the state would only be
>>>>>>> proportional to the number of distinct elements, not the entire input.
>>>>>>> Note the pipeline would be a GBK with a key based on the GROUP BY,
>>>>>>> followed by a Combine.GroupedValues with a (possibly very stateful)
>>>>>>> CombineFn.
>>>>>>>
>>>>>>
>>>>> Got it. Distinct elements could be proportional to the entire input;
>>>>> however, if this is a reasonable requirement from a product perspective,
>>>>> that is fine. Rui's suggestion of using the experimental tag is also a
>>>>> good idea. I suppose that will give us the ability to change the
>>>>> implementation if it becomes necessary.
>>>>>
>>>>>
>>>>>>
>>>>>>> Luke -
>>>>>>> Here's a little background on why I think (1) is harder (it may also
>>>>>>> just be that it looks daunting to me as someone who's not that familiar
>>>>>>> with the code).
>>>>>>>
>>>>>>> An aggregation node can have multiple aggregations. So, for example,
>>>>>>> the query `SELECT k, SUM(x), COUNT(DISTINCT y), AVG(DISTINCT z) FROM ...`
>>>>>>> would yield a logical plan that has a single aggregation node with three
>>>>>>> different aggregations. We then take that node and build up a CombineFn
>>>>>>> that is a composite of all of the aggregations we need to make a
>>>>>>> combining PTransform [1]. To implement (1) we would need to distinguish
>>>>>>> between all the DISTINCT and non-DISTINCT aggregations, and come up with
>>>>>>> a way to unify the 2-GBK DISTINCT pipeline and the 1-GBK non-DISTINCT
>>>>>>> pipeline.
>>>>>>>
>>>>>>> That's certainly not unsolvable, but approach (2) is much simpler -
>>>>>>> it just requires implementing some variations on the CombineFns that
>>>>>>> already exist [2] and re-using the existing logic for converting an
>>>>>>> aggregation node to a combining PTransform.
>>>>>>>
>>>>>>>
>>>>>>> Hopefully that makes sense, let me know if I need to clarify further
>>>>>>> :)
>>>>>>> Brian
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L178
>>>>>>> [2]
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 2, 2019 at 12:18 PM Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Can you also go into more detail why you think 1) is more
>>>>>>>> challenging to implement?
>>>>>>>>
>>>>>>>> On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> From my limited understanding, would not the stateful CombineFn
>>>>>>>>> option require observing the whole input before being able to combine,
>>>>>>>>> and is the risk of blowing memory not actually very high except for
>>>>>>>>> trivial inputs?
>>>>>>>>>
>>>>>>>>> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <bhule...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>> Currently BeamSQL does not support DISTINCT aggregations. These
>>>>>>>>>> are queries like:
>>>>>>>>>>
>>>>>>>>>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>>>>>>>>>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>>>>>>>>>
>>>>>>>>>> These are represented in Calcite's logical plan with a distinct
>>>>>>>>>> flag on aggregation calls, but we ignore the flag when converting to
>>>>>>>>>> a pipeline. I see two different ways that we could support this:
>>>>>>>>>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on the
>>>>>>>>>> <GROUP BY key> + <DISTINCT expr> to de-dupe values of expr, followed
>>>>>>>>>> by a second GBK on just <GROUP BY key> to perform the aggregation.
>>>>>>>>>> 2. Stateful CombineFn - We could implement a version of the
>>>>>>>>>> combiners used for SUM, COUNT, etc. [1] that maintains some state that
>>>>>>>>>> tracks previously seen elements and uses it to de-dupe new elements.
>>>>>>>>>>
>>>>>>>>>> Of course, something like (1) is much more scalable, but it is
>>>>>>>>>> also much more challenging to implement, while (2) is trivial to
>>>>>>>>>> implement but runs the risk of blowing up a worker's memory usage.
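
The two options in the list above can be sketched in plain Java for
SUM(DISTINCT v) ... GROUP BY k. This is only an illustration under stated
assumptions: it does not use the Beam SDK, in-memory maps stand in for GBK,
the static methods merely mirror the general shape of a CombineFn
(create / add / merge / extract), and every name (DistinctSumSketch, Row,
twoShuffle, combineFnStyle) is hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class DistinctSumSketch {
    /** Hypothetical input row: GROUP BY key k and aggregated value v. */
    static final class Row {
        final String k;
        final long v;
        Row(String k, long v) { this.k = k; this.v = v; }
    }

    // Approach (1): two grouping steps, with maps standing in for GBK.
    static Map<String, Long> twoShuffle(List<Row> rows) {
        // First grouping on (k, v): one survivor per group de-dupes v within k.
        Map<List<Object>, Row> deduped = new HashMap<>();
        for (Row r : rows) {
            deduped.putIfAbsent(Arrays.<Object>asList(r.k, r.v), r);
        }
        // Second grouping on k alone performs the actual aggregation.
        Map<String, Long> out = new TreeMap<>();
        for (Row r : deduped.values()) {
            out.merge(r.k, r.v, Long::sum);
        }
        return out;
    }

    // Approach (2): the accumulator is the set of values seen so far.
    static Set<Long> createAccumulator() { return new HashSet<>(); }

    static Set<Long> addInput(Set<Long> acc, long v) {
        acc.add(v); // duplicates are absorbed by the set
        return acc;
    }

    static Set<Long> mergeAccumulators(Iterable<Set<Long>> accs) {
        Set<Long> merged = new HashSet<>();
        for (Set<Long> acc : accs) {
            merged.addAll(acc);
        }
        return merged;
    }

    static long extractOutput(Set<Long> acc) {
        long sum = 0;
        for (long v : acc) {
            sum += v;
        }
        return sum;
    }

    // One accumulator per key, mimicking a per-key combine after a single GBK.
    static Map<String, Long> combineFnStyle(List<Row> rows) {
        Map<String, Set<Long>> accs = new TreeMap<>();
        for (Row r : rows) {
            addInput(accs.computeIfAbsent(r.k, key -> createAccumulator()), r.v);
        }
        Map<String, Long> out = new TreeMap<>();
        for (Map.Entry<String, Set<Long>> e : accs.entrySet()) {
            out.put(e.getKey(), extractOutput(e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
            new Row("a", 1), new Row("a", 1), new Row("a", 2),
            new Row("b", 5), new Row("b", 5));
        System.out.println(twoShuffle(rows));     // prints {a=3, b=5}
        System.out.println(combineFnStyle(rows)); // prints {a=3, b=5}
    }
}
```

In the second variant the per-key set grows with the number of distinct
values for that key, which is where the memory risk comes from.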
>>>>>>>>>>
>>>>>>>>>> Personally, I think it could be worthwhile to provide support for
>>>>>>>>>> DISTINCT quickly with approach (2), and implement (1) as an
>>>>>>>>>> optimization later. This combiner's state would be partitioned by key
>>>>>>>>>> and by window, so I think we would be pretty safe from OOM'ing a worker
>>>>>>>>>> except in some extreme cases (long windows, hot keys, very large batch
>>>>>>>>>> pipelines, ...).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> But I understand this could be controversial, so I wanted to open
>>>>>>>>>> it up for discussion first: Would it be worthwhile to provide support
>>>>>>>>>> for DISTINCT aggregations quickly with approach #2, or is it better to
>>>>>>>>>> just reject these queries until we can implement them safely with
>>>>>>>>>> approach #1?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Brian
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>>>>>
>>>>>>>>>
