To clarify what I said "So two shuffle approach will lead to two different implementation for tables with and without FLOAT/DOUBLE column.":
Basically I wanted to say that the two-shuffle approach will be an implementation for some cases, and it will co-exist with the CombineFn approach. In the future, when we start cost-based optimization (CBO) in BeamSQL, the CBO is supposed to compare the different plans.

-Rui

On Fri, May 3, 2019 at 10:40 AM Rui Wang <ruw...@google.com> wrote:

>> As to the distinct aggregations: At the least, these queries should be
>> rejected, not evaluated incorrectly.
>
> Yes. The least we can do is not support it and throw a clear message that
> says no. (The current implementation ignores DISTINCT and executes all
> aggregations as ALL.)
>
>> The term "stateful CombineFn" is not one I would use, as the nature of
>> state is linearity and the nature of CombineFn is parallelism. So I don't
>> totally understand this proposal. If I replace stateful CombineFn with
>> stateful DoFn with one combining state per column, then I think I
>> understand. FWIW on a runner with scalable SetState or MapState it will not
>> be any risk at all.
>
> I see. "Stateful" is indeed misleading. In this thread, it was all about
> using a simple CombineFn to achieve DISTINCT aggregation with massive
> parallelism.
>
>> But if you go the two shuffle route, you don't have to separate the
>> aggregations and re-join them. You just have to incur the cost of the GBK +
>> DISTINCT for all columns, and just drop the secondary key for the second
>> shuffle, no?
>
> The two-shuffle approach cannot be the unified approach because it requires
> building a key of group_by_key + table_row to deduplicate, but table_row
> might contain floating point numbers, which cannot be used as a key in GBK.
> So the two-shuffle approach will lead to two different implementations for
> tables with and without a FLOAT/DOUBLE column.
>
> CombineFn is the unified approach for distinct and non-distinct
> aggregation: each aggregation call will be a CombineFn.
>
> -Rui
>
>> Kenn
>>
>> On Thu, May 2, 2019 at 5:11 PM Ahmet Altay <al...@google.com> wrote:
>>
>>> On Thu, May 2, 2019 at 2:18 PM Rui Wang <ruw...@google.com> wrote:
>>>
>>>> Brian's first proposal is challenging partly because in BeamSQL
>>>> there is no good practice for dealing with complex SQL plans. Ideally we
>>>> need enough rules and SQL plan nodes in Beam to construct
>>>> easy-to-transform plans for different cases. I had a similar situation
>>>> before when I needed to separate the logical plans of "JOIN ON a OR b"
>>>> and "JOIN ON a AND b", because their implementations are too different
>>>> to fit into the same JoinRelNode. This seems to be a similar situation:
>>>> when distinct aggregations and non-distinct aggregations are mixed, one
>>>> single AggregationRelNode can hardly encapsulate the complex logic.
>>>>
>>>> We will need a detailed plan to rethink RelNodes and Rules in
>>>> BeamSQL, which is out of scope for supporting DISTINCT.
>>>>
>>>> I would favor the second proposal because BeamSQL uses Beam schema and
>>>> row. Schema itself uses Java primitives for most of its types (int, long,
>>>> float, etc.), which limits the size of each element. Considering that the
>>>> combine is per key and per window, there is a good chance that a stateful
>>>> combine works for some (if not most) cases.
>>>>
>>>> Could we use @Experimental to tag the stateful combine for supporting
>>>> DISTINCT in aggregation, so that we have a chance to get it tested by
>>>> users?
>>>>
>>>> -Rui
>>>>
>>>> On Thu, May 2, 2019 at 1:16 PM Brian Hulette <bhule...@google.com>
>>>> wrote:
>>>>
>>>>> Ahmet -
>>>>> I think it would only require observing each key's partition of the
>>>>> input independently, and the size of the state would only be proportional
>>>>> to the number of distinct elements, not the entire input.
>>>>> Note the pipeline would be a GBK with a key based on the GROUP BY,
>>>>> followed by a Combine.GroupedValues with a (possibly very stateful)
>>>>> CombineFn.
>>>>
>>> Got it. Distinct elements could be proportional to the entire input;
>>> however, if this is a reasonable requirement from a product perspective,
>>> that is fine. Rui's suggestion of using the experimental tag is also a
>>> good idea. I suppose that will give us the ability to change the
>>> implementation if it becomes necessary.
>>>
>>>>> Luke -
>>>>> Here's a little background on why I think (1) is harder (it may also
>>>>> just be that it looks daunting to me as someone who's not that familiar
>>>>> with the code).
>>>>>
>>>>> An aggregation node can have multiple aggregations. So, for example,
>>>>> the query `SELECT k, SUM(x), COUNT(DISTINCT y), AVG(DISTINCT z) FROM ...`
>>>>> would yield a logical plan that has a single aggregation node with three
>>>>> different aggregations. We then take that node and build up a CombineFn
>>>>> that is a composite of all of the aggregations we need to make a combining
>>>>> PTransform [1]. To implement (1) we would need to distinguish between all
>>>>> the DISTINCT and non-DISTINCT aggregations, and come up with a way to
>>>>> unify the 2-GBK DISTINCT pipeline and the 1-GBK non-DISTINCT pipeline.
>>>>>
>>>>> That's certainly not unsolvable, but approach (2) is much simpler - it
>>>>> just requires implementing some variations on the CombineFns that already
>>>>> exist [2] and re-using the existing logic for converting an aggregation
>>>>> node to a combining PTransform.
>>>>>
>>>>> Hopefully that makes sense, let me know if I need to clarify further :)
>>>>> Brian
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L178
>>>>> [2]
>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>
>>>>> On Thu, May 2, 2019 at 12:18 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Can you also go into more detail why you think 1) is more challenging
>>>>>> to implement?
>>>>>>
>>>>>> On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com> wrote:
>>>>>>
>>>>>>> From my limited understanding, would not the stateful CombineFn
>>>>>>> option require observing the whole input before being able to combine,
>>>>>>> and is the risk of blowing up memory not actually very high except for
>>>>>>> trivial inputs?
>>>>>>>
>>>>>>> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <bhule...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>> Currently BeamSQL does not support DISTINCT aggregations. These are
>>>>>>>> queries like:
>>>>>>>>
>>>>>>>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>>>>>>>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>>>>>>>
>>>>>>>> These are represented in Calcite's logical plan with a distinct
>>>>>>>> flag on aggregation calls, but we ignore the flag when converting to a
>>>>>>>> pipeline. I see two different ways that we could support this:
>>>>>>>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on the
>>>>>>>> <GROUP BY key> + <DISTINCT expr> to de-dupe values of expr, followed
>>>>>>>> by a second GBK on just <GROUP BY key> to perform the aggregation.
>>>>>>>> 2. Stateful CombineFn - We could implement a version of the
>>>>>>>> combiners used for SUM, COUNT, etc [1] that maintains some state that
>>>>>>>> tracks previously seen elements and uses it to de-dupe new elements.
>>>>>>>>
>>>>>>>> Of course, something like (1) is much more scalable, but it is also
>>>>>>>> much more challenging to implement, while (2) is trivial to implement
>>>>>>>> but runs the risk of blowing up a worker's memory usage.
>>>>>>>>
>>>>>>>> Personally, I think it could be worthwhile to provide support for
>>>>>>>> DISTINCT quickly with approach (2), and implement (1) as an
>>>>>>>> optimization later. This combiner's state would be partitioned by key
>>>>>>>> and by window, so I think we would be pretty safe from OOM'ing a
>>>>>>>> worker except in some extreme cases (long windows, hot keys, very
>>>>>>>> large batch pipelines, ...).
>>>>>>>>
>>>>>>>> But I understand this could be controversial, so I wanted to open
>>>>>>>> it up for discussion first: Would it be worthwhile to provide support
>>>>>>>> for DISTINCT aggregations quickly with approach #2, or is it better to
>>>>>>>> just reject these queries until we can implement them safely with
>>>>>>>> approach #1?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Brian
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
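
For readers comparing the two proposals, here is a minimal plain-Java sketch of `SUM(DISTINCT v) ... GROUP BY k` computed both ways. It is only an illustration under stated assumptions: it uses no Beam APIs, the class and method names (`DistinctSumSketch`, `viaCombiner`, `viaTwoGroupings`, `row`) are hypothetical, and the four accumulator methods merely mirror the create/add/merge/extract shape of a CombineFn rather than the code in BeamBuiltinAggregations. In-memory collections stand in for the shuffles.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Plain-Java illustration only; none of these names exist in Beam. */
class DistinctSumSketch {

  /** Hypothetical helper: builds one (k, v) input row. */
  static Map.Entry<String, Long> row(String k, long v) {
    return new SimpleEntry<>(k, v);
  }

  // --- Approach (2): an accumulator that remembers every value seen so far ---

  static Set<Long> createAccumulator() {
    return new HashSet<>();
  }

  static Set<Long> addInput(Set<Long> acc, long v) {
    acc.add(v); // a repeated value is absorbed by the set
    return acc;
  }

  static Set<Long> mergeAccumulators(List<Set<Long>> accs) {
    Set<Long> merged = new HashSet<>();
    for (Set<Long> acc : accs) {
      merged.addAll(acc); // set union keeps the merge order-independent
    }
    return merged;
  }

  static long extractOutput(Set<Long> acc) {
    long sum = 0;
    for (long v : acc) {
      sum += v;
    }
    return sum;
  }

  /** One grouping by k, then the de-duplicating accumulator per key. */
  static Map<String, Long> viaCombiner(List<Map.Entry<String, Long>> rows) {
    Map<String, Set<Long>> accs = new TreeMap<>();
    for (Map.Entry<String, Long> r : rows) {
      addInput(accs.computeIfAbsent(r.getKey(), k -> createAccumulator()), r.getValue());
    }
    Map<String, Long> out = new TreeMap<>();
    accs.forEach((k, acc) -> out.put(k, extractOutput(acc)));
    return out;
  }

  // --- Approach (1): two groupings ---

  /** First group on the composite (k, v) key to de-dupe, then on k alone to sum. */
  static Map<String, Long> viaTwoGroupings(List<Map.Entry<String, Long>> rows) {
    // Stand-in for the first shuffle: equal (k, v) pairs collapse to one.
    Set<Map.Entry<String, Long>> distinctPairs = new HashSet<>(rows);
    // Stand-in for the second shuffle: drop v from the key and aggregate.
    Map<String, Long> out = new TreeMap<>();
    for (Map.Entry<String, Long> pair : distinctPairs) {
      out.merge(pair.getKey(), pair.getValue(), Long::sum);
    }
    return out;
  }
}
```

In this sketch the accumulator of approach (2) holds one entry per distinct value of a key, which is the memory concern raised in the thread, while approach (1) relies on the value being usable as part of a grouping key, which is the limitation Rui mentions for FLOAT/DOUBLE columns.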