Hi Kurt,

Thanks for your comments.

It seems we come to a consensus that we should alleviate the performance
degraded by data skew with local aggregation. In this FLIP, our key
solution is to introduce local keyed partition to achieve this goal.

I also agree that we can benefit a lot from the usage of AggregateFunction.
In combination with localKeyBy, We can easily use it to achieve local
aggregation:

   - input.localKeyBy(0).aggregate()
   - input.localKeyBy(0).window().aggregate()


I think the only problem here is the choices between

   - (1) Introducing a new primitive called localKeyBy and implement local
   aggregation with existing operators, or
   - (2) Introducing an operator called localAggregation which is composed
   of a key selector, a window-like operator, and an aggregate function.


There may exist some optimization opportunities by providing a composited
interface for local aggregation. But at the same time, in my opinion, we
lose flexibility (Or we need certain efforts to achieve the same
flexibility).

As said in the previous mails, we have many use cases where the aggregation
is very complicated and cannot be performed with AggregateFunction. For
example, users may perform windowed aggregations according to time, data
values, or even external storage. Typically, they now use
KeyedProcessFunction or customized triggers to implement these
aggregations. It's not easy to address data skew in such cases with a
composited interface for local aggregation.

Given that Data Stream API is exactly targeted at these cases where the
application logic is very complicated and optimization does not matter, I
think it's a better choice to provide a relatively low-level and canonical
interface.

The composited interface, on the other side, may be a good choice in
declarative interfaces, including SQL and Table API, as it allows more
optimization opportunities.

Best,
Vino


Kurt Young <ykt...@gmail.com> 于2019年6月20日周四 上午10:15写道:

> Hi all,
>
> As vino said in previous emails, I think we should first discuss and decide
> what kind of use cases this FLIP want to
> resolve, and what the API should look like. From my side, I think this is
> probably the root cause of current divergence.
>
> My understand is (from the FLIP title and motivation section of the
> document), we want to have a proper support of
> local aggregation, or pre aggregation. This is not a very new idea, most
> SQL engine already did this improvement. And
> the core concept about this is, there should be an AggregateFunction, no
> matter it's a Flink runtime's AggregateFunction or
> SQL's UserDefinedAggregateFunction. Both aggregation have concept of
> intermediate data type, sometimes we call it ACC.
> I quickly went through the POC piotr did before [1], it also directly uses
> AggregateFunction.
>
> But the thing is, after reading the design of this FLIP, I can't help
> myself feeling that this FLIP is not targeting to have a proper
> local aggregation support. It actually want to introduce another concept:
> LocalKeyBy, and how to split and merge local key groups,
> and how to properly support state on local key. Local aggregation just
> happened to be one possible use case of LocalKeyBy.
> But it lacks supporting the essential concept of local aggregation, which
> is intermediate data type. Without this, I really don't thing
> it is a good fit of local aggregation.
>
> Here I want to make sure of the scope or the goal about this FLIP, do we
> want to have a proper local aggregation engine, or we
> just want to introduce a new concept called LocalKeyBy?
>
> [1]: https://github.com/apache/flink/pull/4626
>
> Best,
> Kurt
>
>
> On Wed, Jun 19, 2019 at 5:13 PM vino yang <yanghua1...@gmail.com> wrote:
>
> > Hi Hequn,
> >
> > Thanks for your comments!
> >
> > I agree that allowing local aggregation reusing window API and refining
> > window operator to make it match both requirements (come from our and
> Kurt)
> > is a good decision!
> >
> > Concerning your questions:
> >
> > 1. The result of input.localKeyBy(0).sum(1).keyBy(0).sum(1) may be
> > meaningless.
> >
> > Yes, it does not make sense in most cases. However, I also want to note
> > users should know the right semantics of localKeyBy and use it correctly.
> > Because this issue also exists for the global keyBy, consider this
> example:
> > input.keyBy(0).sum(1).keyBy(0).sum(1), the result is also meaningless.
> >
> > 2. About the semantics of
> > input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)).
> >
> > Good catch! I agree with you that it's not good to enable all
> > functionalities for localKeyBy from KeyedStream.
> > Currently, We do not support some APIs such as
> > connect/join/intervalJoin/coGroup. This is due to that we force the
> > operators on LocalKeyedStreams chained with the inputs.
> >
> > Best,
> > Vino
> >
> >
> > Hequn Cheng <chenghe...@gmail.com> 于2019年6月19日周三 下午3:42写道:
> >
> > > Hi,
> > >
> > > Thanks a lot for your great discussion and great to see that some
> > agreement
> > > has been reached on the "local aggregate engine"!
> > >
> > > ===> Considering the abstract engine,
> > > I'm thinking is it valuable for us to extend the current window to meet
> > > both demands raised by Kurt and Vino? There are some benefits we can
> get:
> > >
> > > 1. The interfaces of the window are complete and clear. With windows,
> we
> > > can define a lot of ways to split the data and perform different
> > > computations.
> > > 2. We can also leverage the window to do miniBatch for the global
> > > aggregation, i.e, we can use the window to bundle data belong to the
> same
> > > key, for every bundle we only need to read and write once state. This
> can
> > > greatly reduce state IO and improve performance.
> > > 3. A lot of other use cases can also benefit from the window base on
> > memory
> > > or stateless.
> > >
> > > ===> As for the API,
> > > I think it is good to make our API more flexible. However, we may need
> to
> > > make our API meaningful.
> > >
> > > Take my previous reply as an example,
> > > input.localKeyBy(0).sum(1).keyBy(0).sum(1). The result may be
> > meaningless.
> > > Another example I find is the intervalJoin, e.g.,
> > > input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)). In this case,
> it
> > > will bring problems if input1 and input2 share different parallelism.
> We
> > > don't know which input should the join chained with? Even if they share
> > the
> > > same parallelism, it's hard to tell what the join is doing. There are
> > maybe
> > > some other problems.
> > >
> > > From this point of view, it's at least not good to enable all
> > > functionalities for localKeyBy from KeyedStream?
> > >
> > > Great to also have your opinions.
> > >
> > > Best, Hequn
> > >
> > >
> > >
> > >
> > > On Wed, Jun 19, 2019 at 10:24 AM vino yang <yanghua1...@gmail.com>
> > wrote:
> > >
> > > > Hi Kurt and Piotrek,
> > > >
> > > > Thanks for your comments.
> > > >
> > > > I agree that we can provide a better abstraction to be compatible
> with
> > > two
> > > > different implementations.
> > > >
> > > > First of all, I think we should consider what kind of scenarios we
> need
> > > to
> > > > support in *API* level?
> > > >
> > > > We have some use cases which need to a customized aggregation through
> > > > KeyedProcessFunction, (in the usage of our localKeyBy.window they can
> > use
> > > > ProcessWindowFunction).
> > > >
> > > > Shall we support these flexible use scenarios?
> > > >
> > > > Best,
> > > > Vino
> > > >
> > > > Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 下午8:37写道:
> > > >
> > > > > Hi Piotr,
> > > > >
> > > > > Thanks for joining the discussion. Make “local aggregation"
> abstract
> > > > enough
> > > > > sounds good to me, we could
> > > > > implement and verify alternative solutions for use cases of local
> > > > > aggregation. Maybe we will find both solutions
> > > > > are appropriate for different scenarios.
> > > > >
> > > > > Starting from a simple one sounds a practical way to go. What do
> you
> > > > think,
> > > > > vino?
> > > > >
> > > > > Best,
> > > > > Kurt
> > > > >
> > > > >
> > > > > On Tue, Jun 18, 2019 at 8:10 PM Piotr Nowojski <
> pi...@ververica.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Kurt and Vino,
> > > > > >
> > > > > > I think there is a trade of hat we need to consider for the local
> > > > > > aggregation.
> > > > > >
> > > > > > Generally speaking I would agree with Kurt about local
> > > aggregation/pre
> > > > > > aggregation not using Flink's state flush the operator on a
> > > checkpoint.
> > > > > > Network IO is usually cheaper compared to Disks IO. This has
> > however
> > > > > couple
> > > > > > of issues:
> > > > > > 1. It can explode number of in-flight records during checkpoint
> > > barrier
> > > > > > alignment, making checkpointing slower and decrease the actual
> > > > > throughput.
> > > > > > 2. This trades Disks IO on the local aggregation machine with CPU
> > > (and
> > > > > > Disks IO in case of RocksDB) on the final aggregation machine.
> This
> > > is
> > > > > > fine, as long there is no huge data skew. If there is only a
> > handful
> > > > (or
> > > > > > even one single) hot keys, it might be better to keep the
> > persistent
> > > > > state
> > > > > > in the LocalAggregationOperator to offload final aggregation as
> > much
> > > as
> > > > > > possible.
> > > > > > 3. With frequent checkpointing local aggregation effectiveness
> > would
> > > > > > degrade.
> > > > > >
> > > > > > I assume Kurt is correct, that in your use cases stateless
> operator
> > > was
> > > > > > behaving better, but I could easily see other use cases as well.
> > For
> > > > > > example someone is already using RocksDB, and his job is
> > bottlenecked
> > > > on
> > > > > a
> > > > > > single window operator instance because of the data skew. In that
> > > case
> > > > > > stateful local aggregation would be probably a better choice.
> > > > > >
> > > > > > Because of that, I think we should eventually provide both
> versions
> > > and
> > > > > in
> > > > > > the initial version we should at least make the “local
> aggregation
> > > > > engine”
> > > > > > abstract enough, that one could easily provide different
> > > implementation
> > > > > > strategy.
> > > > > >
> > > > > > Piotrek
> > > > > >
> > > > > > > On 18 Jun 2019, at 11:46, Kurt Young <ykt...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > For the trigger, it depends on what operator we want to use
> under
> > > the
> > > > > > API.
> > > > > > > If we choose to use window operator,
> > > > > > > we should also use window's trigger. However, I also think
> reuse
> > > > window
> > > > > > > operator for this scenario may not be
> > > > > > > the best choice. The reasons are the following:
> > > > > > >
> > > > > > > 1. As a lot of people already pointed out, window relies
> heavily
> > on
> > > > > state
> > > > > > > and it will definitely effect performance. You can
> > > > > > > argue that one can use heap based statebackend, but this will
> > > > introduce
> > > > > > > extra coupling. Especially we have a chance to
> > > > > > > design a pure stateless operator.
> > > > > > > 2. The window operator is *the most* complicated operator Flink
> > > > > currently
> > > > > > > have. Maybe we only need to pick a subset of
> > > > > > > window operator to achieve the goal, but once the user wants to
> > > have
> > > > a
> > > > > > deep
> > > > > > > look at the localAggregation operator, it's still
> > > > > > > hard to find out what's going on under the window operator. For
> > > > > > simplicity,
> > > > > > > I would also recommend we introduce a dedicated
> > > > > > > lightweight operator, which also much easier for a user to
> learn
> > > and
> > > > > use.
> > > > > > >
> > > > > > > For your question about increasing the burden in
> > > > > > > `StreamOperator::prepareSnapshotPreBarrier()`, the only thing
> > this
> > > > > > function
> > > > > > > need
> > > > > > > to do is output all the partial results, it's purely cpu
> > workload,
> > > > not
> > > > > > > introducing any IO. I want to point out that even if we have
> this
> > > > > > > cost, we reduced another barrier align cost of the operator,
> > which
> > > is
> > > > > the
> > > > > > > sync flush stage of the state, if you introduced state. This
> > > > > > > flush actually will introduce disk IO, and I think it's worthy
> to
> > > > > > exchange
> > > > > > > this cost with purely CPU workload. And we do have some
> > > > > > > observations about these two behavior (as i said before, we
> > > actually
> > > > > > > implemented both solutions), the stateless one actually
> performs
> > > > > > > better both in performance and barrier align time.
> > > > > > >
> > > > > > > Best,
> > > > > > > Kurt
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Jun 18, 2019 at 3:40 PM vino yang <
> yanghua1...@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > >> Hi Kurt,
> > > > > > >>
> > > > > > >> Thanks for your example. Now, it looks more clearly for me.
> > > > > > >>
> > > > > > >> From your example code snippet, I saw the localAggregate API
> has
> > > > three
> > > > > > >> parameters:
> > > > > > >>
> > > > > > >>   1. key field
> > > > > > >>   2. PartitionAvg
> > > > > > >>   3. CountTrigger: Does this trigger comes from window
> package?
> > > > > > >>
> > > > > > >> I will compare our and your design from API and operator
> level:
> > > > > > >>
> > > > > > >> *From the API level:*
> > > > > > >>
> > > > > > >> As I replied to @dianfu in the old email thread,[1] the Window
> > API
> > > > can
> > > > > > >> provide the second and the third parameter right now.
> > > > > > >>
> > > > > > >> If you reuse specified interface or class, such as *Trigger*
> or
> > > > > > >> *CounterTrigger* provided by window package, but do not use
> > window
> > > > > API,
> > > > > > >> it's not reasonable.
> > > > > > >> And if you do not reuse these interface or class, you would
> need
> > > to
> > > > > > >> introduce more things however they are looked similar to the
> > > things
> > > > > > >> provided by window package.
> > > > > > >>
> > > > > > >> The window package has provided several types of the window
> and
> > > many
> > > > > > >> triggers and let users customize it. What's more, the user is
> > more
> > > > > > familiar
> > > > > > >> with Window API.
> > > > > > >>
> > > > > > >> This is the reason why we just provide localKeyBy API and
> reuse
> > > the
> > > > > > window
> > > > > > >> API. It reduces unnecessary components such as triggers and
> the
> > > > > > mechanism
> > > > > > >> of buffer (based on count num or time).
> > > > > > >> And it has a clear and easy to understand semantics.
> > > > > > >>
> > > > > > >> *From the operator level:*
> > > > > > >>
> > > > > > >> We reused window operator, so we can get all the benefits from
> > > state
> > > > > and
> > > > > > >> checkpoint.
> > > > > > >>
> > > > > > >> From your design, you named the operator under localAggregate
> > API
> > > > is a
> > > > > > >> *stateless* operator. IMO, it is still a state, it is just not
> > > Flink
> > > > > > >> managed state.
> > > > > > >> About the memory buffer (I think it's still not very clear, if
> > you
> > > > > have
> > > > > > >> time, can you give more detail information or answer my
> > > questions),
> > > > I
> > > > > > have
> > > > > > >> some questions:
> > > > > > >>
> > > > > > >>   - if it just a raw JVM heap memory buffer, how to support
> > fault
> > > > > > >>   tolerance, if the job is configured EXACTLY-ONCE semantic
> > > > guarantee?
> > > > > > >>   - if you thought the memory buffer(non-Flink state), has
> > better
> > > > > > >>   performance. In our design, users can also config HEAP state
> > > > backend
> > > > > > to
> > > > > > >>   provide the performance close to your mechanism.
> > > > > > >>   - `StreamOperator::prepareSnapshotPreBarrier()` related to
> the
> > > > > timing
> > > > > > of
> > > > > > >>   snapshot. IMO, the flush action should be a synchronized
> > action?
> > > > (if
> > > > > > >> not,
> > > > > > >>   please point out my mistake) I still think we should not
> > depend
> > > on
> > > > > the
> > > > > > >>   timing of checkpoint. Checkpoint related operations are
> > inherent
> > > > > > >>   performance sensitive, we should not increase its burden
> > > anymore.
> > > > > Our
> > > > > > >>   implementation based on the mechanism of Flink's checkpoint,
> > > which
> > > > > can
> > > > > > >>   benefit from the asnyc snapshot and incremental checkpoint.
> > IMO,
> > > > the
> > > > > > >>   performance is not a problem, and we also do not find the
> > > > > performance
> > > > > > >> issue
> > > > > > >>   in our production.
> > > > > > >>
> > > > > > >> [1]:
> > > > > > >>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Vino
> > > > > > >>
> > > > > > >> Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 下午2:27写道:
> > > > > > >>
> > > > > > >>> Yeah, sorry for not expressing myself clearly. I will try to
> > > > provide
> > > > > > more
> > > > > > >>> details to make sure we are on the same page.
> > > > > > >>>
> > > > > > >>> For DataStream API, it shouldn't be optimized automatically.
> > You
> > > > have
> > > > > > to
> > > > > > >>> explicitly call API to do local aggregation
> > > > > > >>> as well as the trigger policy of the local aggregation. Take
> > > > average
> > > > > > for
> > > > > > >>> example, the user program may look like this (just a draft):
> > > > > > >>>
> > > > > > >>> assuming the input type is DataStream<Tupl2<String, Int>>
> > > > > > >>>
> > > > > > >>> ds.localAggregate(
> > > > > > >>>        0,                                       // The local
> > key,
> > > > > which
> > > > > > >> is
> > > > > > >>> the String from Tuple2
> > > > > > >>>        PartitionAvg(1),                 // The partial
> > > aggregation
> > > > > > >>> function, produces Tuple2<Long, Int>, indicating sum and
> count
> > > > > > >>>        CountTrigger.of(1000L)    // Trigger policy, note this
> > > > should
> > > > > be
> > > > > > >>> best effort, and also be composited with time based or memory
> > > size
> > > > > > based
> > > > > > >>> trigger
> > > > > > >>>    )                                           // The return
> > type
> > > > is
> > > > > > >> local
> > > > > > >>> aggregate Tuple2<String, Tupl2<Long, Int>>
> > > > > > >>>    .keyBy(0)                             // Further keyby it
> > with
> > > > > > >> required
> > > > > > >>> key
> > > > > > >>>    .aggregate(1)                      // This will merge all
> > the
> > > > > > partial
> > > > > > >>> results and get the final average.
> > > > > > >>>
> > > > > > >>> (This is only a draft, only trying to explain what it looks
> > > like. )
> > > > > > >>>
> > > > > > >>> The local aggregate operator can be stateless, we can keep a
> > > memory
> > > > > > >> buffer
> > > > > > >>> or other efficient data structure to improve the aggregate
> > > > > performance.
> > > > > > >>>
> > > > > > >>> Let me know if you have any other questions.
> > > > > > >>>
> > > > > > >>> Best,
> > > > > > >>> Kurt
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> On Tue, Jun 18, 2019 at 1:29 PM vino yang <
> > yanghua1...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >>>
> > > > > > >>>> Hi Kurt,
> > > > > > >>>>
> > > > > > >>>> Thanks for your reply.
> > > > > > >>>>
> > > > > > >>>> Actually, I am not against you to raise your design.
> > > > > > >>>>
> > > > > > >>>> From your description before, I just can imagine your
> > high-level
> > > > > > >>>> implementation is about SQL and the optimization is inner of
> > the
> > > > > API.
> > > > > > >> Is
> > > > > > >>> it
> > > > > > >>>> automatically? how to give the configuration option about
> > > trigger
> > > > > > >>>> pre-aggregation?
> > > > > > >>>>
> > > > > > >>>> Maybe after I get more information, it sounds more
> reasonable.
> > > > > > >>>>
> > > > > > >>>> IMO, first of all, it would be better to make your user
> > > interface
> > > > > > >>> concrete,
> > > > > > >>>> it's the basis of the discussion.
> > > > > > >>>>
> > > > > > >>>> For example, can you give an example code snippet to
> introduce
> > > how
> > > > > to
> > > > > > >>> help
> > > > > > >>>> users to process data skew caused by the jobs which built
> with
> > > > > > >> DataStream
> > > > > > >>>> API?
> > > > > > >>>>
> > > > > > >>>> If you give more details we can discuss further more. I
> think
> > if
> > > > one
> > > > > > >>> design
> > > > > > >>>> introduces an exact interface and another does not.
> > > > > > >>>>
> > > > > > >>>> The implementation has an obvious difference. For example,
> we
> > > > > > introduce
> > > > > > >>> an
> > > > > > >>>> exact API in DataStream named localKeyBy, about the
> > > > pre-aggregation
> > > > > we
> > > > > > >>> need
> > > > > > >>>> to define the trigger mechanism of local aggregation, so we
> > find
> > > > > > reused
> > > > > > >>>> window API and operator is a good choice. This is a
> reasoning
> > > link
> > > > > > from
> > > > > > >>>> design to implementation.
> > > > > > >>>>
> > > > > > >>>> What do you think?
> > > > > > >>>>
> > > > > > >>>> Best,
> > > > > > >>>> Vino
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>> Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 上午11:58写道:
> > > > > > >>>>
> > > > > > >>>>> Hi Vino,
> > > > > > >>>>>
> > > > > > >>>>> Now I feel that we may have different understandings about
> > what
> > > > > kind
> > > > > > >> of
> > > > > > >>>>> problems or improvements you want to
> > > > > > >>>>> resolve. Currently, most of the feedback are focusing on
> *how
> > > to
> > > > > do a
> > > > > > >>>>> proper local aggregation to improve performance
> > > > > > >>>>> and maybe solving the data skew issue*. And my gut feeling
> is
> > > > this
> > > > > is
> > > > > > >>>>> exactly what users want at the first place,
> > > > > > >>>>> especially those +1s. (Sorry to try to summarize here,
> please
> > > > > correct
> > > > > > >>> me
> > > > > > >>>> if
> > > > > > >>>>> i'm wrong).
> > > > > > >>>>>
> > > > > > >>>>> But I still think the design is somehow diverged from the
> > goal.
> > > > If
> > > > > we
> > > > > > >>>> want
> > > > > > >>>>> to have an efficient and powerful way to
> > > > > > >>>>> have local aggregation, supporting intermedia result type
> is
> > > > > > >> essential
> > > > > > >>>> IMO.
> > > > > > >>>>> Both runtime's `AggregateFunction` and
> > > > > > >>>>> SQL`s `UserDefinedAggregateFunction` have a proper support
> of
> > > > > > >>>> intermediate
> > > > > > >>>>> result type and can do `merge` operation
> > > > > > >>>>> on them.
> > > > > > >>>>>
> > > > > > >>>>> Now, we have a lightweight alternatives which performs
> well,
> > > and
> > > > > > >> have a
> > > > > > >>>>> nice fit with the local aggregate requirements.
> > > > > > >>>>> Mostly importantly,  it's much less complex because it's
> > > > stateless.
> > > > > > >> And
> > > > > > >>>> it
> > > > > > >>>>> can also achieve the similar multiple-aggregation
> > > > > > >>>>> scenario.
> > > > > > >>>>>
> > > > > > >>>>> I still not convinced why we shouldn't consider it as a
> first
> > > > step.
> > > > > > >>>>>
> > > > > > >>>>> Best,
> > > > > > >>>>> Kurt
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>> On Tue, Jun 18, 2019 at 11:35 AM vino yang <
> > > > yanghua1...@gmail.com>
> > > > > > >>>> wrote:
> > > > > > >>>>>
> > > > > > >>>>>> Hi Kurt,
> > > > > > >>>>>>
> > > > > > >>>>>> Thanks for your comments.
> > > > > > >>>>>>
> > > > > > >>>>>> It seems we both implemented local aggregation feature to
> > > > optimize
> > > > > > >>> the
> > > > > > >>>>>> issue of data skew.
> > > > > > >>>>>> However, IMHO, the API level of optimizing revenue is
> > > different.
> > > > > > >>>>>>
> > > > > > >>>>>> *Your optimization benefits from Flink SQL and it's not
> > user's
> > > > > > >>>> faces.(If
> > > > > > >>>>> I
> > > > > > >>>>>> understand it incorrectly, please correct this.)*
> > > > > > >>>>>> *Our implementation employs it as an optimization tool API
> > for
> > > > > > >>>>> DataStream,
> > > > > > >>>>>> it just like a local version of the keyBy API.*
> > > > > > >>>>>>
> > > > > > >>>>>> Based on this, I want to say support it as a DataStream
> API
> > > can
> > > > > > >>> provide
> > > > > > >>>>>> these advantages:
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>   - The localKeyBy API has a clear semantic and it's
> > flexible
> > > > not
> > > > > > >>> only
> > > > > > >>>>> for
> > > > > > >>>>>>   processing data skew but also for implementing some user
> > > > cases,
> > > > > > >>> for
> > > > > > >>>>>>   example, if we want to calculate the multiple-level
> > > > aggregation,
> > > > > > >>> we
> > > > > > >>>>> can
> > > > > > >>>>>> do
> > > > > > >>>>>>   multiple-level aggregation in the local aggregation:
> > > > > > >>>>>>   input.localKeyBy("a").sum(1).localKeyBy("b").window();
> //
> > > here
> > > > > > >> "a"
> > > > > > >>>> is
> > > > > > >>>>> a
> > > > > > >>>>>>   sub-category, while "b" is a category, here we do not
> need
> > > to
> > > > > > >>>> shuffle
> > > > > > >>>>>> data
> > > > > > >>>>>>   in the network.
> > > > > > >>>>>>   - The users of DataStream API will benefit from this.
> > > > Actually,
> > > > > > >> we
> > > > > > >>>>> have
> > > > > > >>>>>>   a lot of scenes need to use DataStream API. Currently,
> > > > > > >> DataStream
> > > > > > >>>> API
> > > > > > >>>>> is
> > > > > > >>>>>>   the cornerstone of the physical plan of Flink SQL. With
> a
> > > > > > >>> localKeyBy
> > > > > > >>>>>> API,
> > > > > > >>>>>>   the optimization of SQL at least may use this optimized
> > API,
> > > > > > >> this
> > > > > > >>>> is a
> > > > > > >>>>>>   further topic.
> > > > > > >>>>>>   - Based on the window operator, our state would benefit
> > from
> > > > > > >> Flink
> > > > > > >>>>> State
> > > > > > >>>>>>   and checkpoint, we do not need to worry about OOM and
> job
> > > > > > >> failed.
> > > > > > >>>>>>
> > > > > > >>>>>> Now, about your questions:
> > > > > > >>>>>>
> > > > > > >>>>>> 1. About our design cannot change the data type and about
> > the
> > > > > > >>>>>> implementation of average:
> > > > > > >>>>>>
> > > > > > >>>>>> Just like my reply to Hequn, the localKeyBy is an API
> > provides
> > > > to
> > > > > > >> the
> > > > > > >>>>> users
> > > > > > >>>>>> who use DataStream API to build their jobs.
> > > > > > >>>>>> Users should know its semantics and the difference with
> > keyBy
> > > > API,
> > > > > > >> so
> > > > > > >>>> if
> > > > > > >>>>>> they want to the average aggregation, they should carry
> > local
> > > > sum
> > > > > > >>>> result
> > > > > > >>>>>> and local count result.
> > > > > > >>>>>> I admit that it will be convenient to use keyBy directly.
> > But
> > > we
> > > > > > >> need
> > > > > > >>>> to
> > > > > > >>>>>> pay a little price when we get some benefits. I think this
> > > price
> > > > > is
> > > > > > >>>>>> reasonable. Considering that the DataStream API itself is
> a
> > > > > > >> low-level
> > > > > > >>>> API
> > > > > > >>>>>> (at least for now).
> > > > > > >>>>>>
> > > > > > >>>>>> 2. About stateless operator and
> > > > > > >>>>>> `StreamOperator::prepareSnapshotPreBarrier()`:
> > > > > > >>>>>>
> > > > > > >>>>>> Actually, I have discussed this opinion with @dianfu in
> the
> > > old
> > > > > > >> mail
> > > > > > >>>>>> thread. I will copy my opinion from there:
> > > > > > >>>>>>
> > > > > > >>>>>>   - for your design, you still need somewhere to give the
> > > users
> > > > > > >>>>> configure
> > > > > > >>>>>>   the trigger threshold (maybe memory availability?), this
> > > > design
> > > > > > >>>> cannot
> > > > > > >>>>>>   guarantee a deterministic semantics (it will bring
> trouble
> > > for
> > > > > > >>>> testing
> > > > > > >>>>>> and
> > > > > > >>>>>>   debugging).
> > > > > > >>>>>>   - if the implementation depends on the timing of
> > checkpoint,
> > > > it
> > > > > > >>>> would
> > > > > > >>>>>>   affect the checkpoint's progress, and the buffered data
> > may
> > > > > > >> cause
> > > > > > >>>> OOM
> > > > > > >>>>>>   issue. In addition, if the operator is stateless, it can
> > not
> > > > > > >>> provide
> > > > > > >>>>>> fault
> > > > > > >>>>>>   tolerance.
> > > > > > >>>>>>
> > > > > > >>>>>> Best,
> > > > > > >>>>>> Vino
> > > > > > >>>>>>
> > > > > > >>>>>> Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 上午9:22写道:
> > > > > > >>>>>>
> > > > > > >>>>>>> Hi Vino,
> > > > > > >>>>>>>
> > > > > > >>>>>>> Thanks for the proposal, I like the general idea and IMO
> > it's
> > > > > > >> very
> > > > > > >>>>> useful
> > > > > > >>>>>>> feature.
> > > > > > >>>>>>> But after reading through the document, I feel that we
> may
> > > over
> > > > > > >>>> design
> > > > > > >>>>>> the
> > > > > > >>>>>>> required
> > > > > > >>>>>>> operator for proper local aggregation. The main reason is
> > we
> > > > want
> > > > > > >>> to
> > > > > > >>>>>> have a
> > > > > > >>>>>>> clear definition and behavior about the "local keyed
> state"
> > > > which
> > > > > > >>> in
> > > > > > >>>> my
> > > > > > >>>>>>> opinion is not
> > > > > > >>>>>>> necessary for local aggregation, at least for start.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Another issue I noticed is the local key by operator
> cannot
> > > > > > >> change
> > > > > > >>>>>> element
> > > > > > >>>>>>> type, it will
> > > > > > >>>>>>> also restrict a lot of use cases which can be benefit
> from
> > > > local
> > > > > > >>>>>>> aggregation, like "average".
> > > > > > >>>>>>>
> > > > > > >>>>>>> We also did similar logic in SQL and the only thing need
> to
> > > be
> > > > > > >> done
> > > > > > >>>> is
> > > > > > >>>>>>> introduce
> > > > > > >>>>>>> a stateless lightweight operator which is *chained*
> before
> > > > > > >>> `keyby()`.
> > > > > > >>>>> The
> > > > > > >>>>>>> operator will flush all buffered
> > > > > > >>>>>>> elements during
> > `StreamOperator::prepareSnapshotPreBarrier()`
> > > > and
> > > > > > >>>> make
> > > > > > >>>>>>> himself stateless.
> > > > > > >>>>>>> By the way, in the earlier version we also did the
> similar
> > > > > > >> approach
> > > > > > >>>> by
> > > > > > >>>>>>> introducing a stateful
> > > > > > >>>>>>> local aggregation operator but it's not performed as well
> > as
> > > > the
> > > > > > >>>> later
> > > > > > >>>>>> one,
> > > > > > >>>>>>> and also effect the barrie
> > > > > > >>>>>>> alignment time. The later one is fairly simple and more
> > > > > > >> efficient.
> > > > > > >>>>>>>
> > > > > > >>>>>>> I would highly suggest you to consider to have a
> stateless
> > > > > > >> approach
> > > > > > >>>> at
> > > > > > >>>>>> the
> > > > > > >>>>>>> first step.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Best,
> > > > > > >>>>>>> Kurt
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> On Mon, Jun 17, 2019 at 7:32 PM Jark Wu <
> imj...@gmail.com>
> > > > > > >> wrote:
> > > > > > >>>>>>>
> > > > > > >>>>>>>> Hi Vino,
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Thanks for the proposal.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Regarding to the "input.keyBy(0).sum(1)" vs
> > > > > > >>>>>>>>
> > "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)",
> > > > > > >> have
> > > > > > >>>> you
> > > > > > >>>>>>> done
> > > > > > >>>>>>>> some benchmark?
> > > > > > >>>>>>>> Because I'm curious about how much performance
> improvement
> > > can
> > > > > > >> we
> > > > > > >>>> get
> > > > > > >>>>>> by
> > > > > > >>>>>>>> using count window as the local operator.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Best,
> > > > > > >>>>>>>> Jark
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> On Mon, 17 Jun 2019 at 17:48, vino yang <
> > > > yanghua1...@gmail.com
> > > > > > >>>
> > > > > > >>>>> wrote:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> Hi Hequn,
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Thanks for your reply.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> The purpose of localKeyBy API is to provide a tool
> which
> > > can
> > > > > > >>> let
> > > > > > >>>>>> users
> > > > > > >>>>>>> do
> > > > > > >>>>>>>>> pre-aggregation in the local. The behavior of the
> > > > > > >>> pre-aggregation
> > > > > > >>>>> is
> > > > > > >>>>>>>>> similar to keyBy API.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> So the three cases are different, I will describe them
> > one
> > > by
> > > > > > >>>> one:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> 1. input.keyBy(0).sum(1)
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> *In this case, the result is event-driven, each event
> can
> > > > > > >>> produce
> > > > > > >>>>> one
> > > > > > >>>>>>> sum
> > > > > > >>>>>>>>> aggregation result and it is the latest one from the
> > source
> > > > > > >>>> start.*
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> *In this case, the semantic may have a problem, it
> would
> > do
> > > > > > >> the
> > > > > > >>>>> local
> > > > > > >>>>>>> sum
> > > > > > >>>>>>>>> aggregation and will produce the latest partial result
> > from
> > > > > > >> the
> > > > > > >>>>>> source
> > > > > > >>>>>>>>> start for every event. *
> > > > > > >>>>>>>>> *These latest partial results from the same key are
> > hashed
> > > to
> > > > > > >>> one
> > > > > > >>>>>> node
> > > > > > >>>>>>> to
> > > > > > >>>>>>>>> do the global sum aggregation.*
> > > > > > >>>>>>>>> *In the global aggregation, when it received multiple
> > > partial
> > > > > > >>>>> results
> > > > > > >>>>>>>> (they
> > > > > > >>>>>>>>> are all calculated from the source start) and sum them
> > will
> > > > > > >> get
> > > > > > >>>> the
> > > > > > >>>>>>> wrong
> > > > > > >>>>>>>>> result.*
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> 3.
> > > input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> *In this case, it would just get a partial aggregation
> > > result
> > > > > > >>> for
> > > > > > >>>>>> the 5
> > > > > > >>>>>>>>> records in the count window. The partial aggregation
> > > results
> > > > > > >>> from
> > > > > > >>>>> the
> > > > > > >>>>>>>> same
> > > > > > >>>>>>>>> key will be aggregated globally.*
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> So the first case and the third case can get the *same*
> > > > > > >> result,
> > > > > > >>>> the
> > > > > > >>>>>>>>> difference is the output-style and the latency.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Generally speaking, the local key API is just an
> > > optimization
> > > > > > >>>> API.
> > > > > > >>>>> We
> > > > > > >>>>>>> do
> > > > > > >>>>>>>>> not limit the user's usage, but the user has to
> > understand
> > > > > > >> its
> > > > > > >>>>>>> semantics
> > > > > > >>>>>>>>> and use it correctly.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Best,
> > > > > > >>>>>>>>> Vino
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Hequn Cheng <chenghe...@gmail.com> 于2019年6月17日周一
> > 下午4:18写道:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>> Hi Vino,
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Thanks for the proposal, I think it is a very good
> > > feature!
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> One thing I want to make sure is the semantics for the
> > > > > > >>>>>> `localKeyBy`.
> > > > > > >>>>>>>> From
> > > > > > >>>>>>>>>> the document, the `localKeyBy` API returns an instance
> > of
> > > > > > >>>>>>> `KeyedStream`
> > > > > > >>>>>>>>>> which can also perform sum(), so in this case, what's
> > the
> > > > > > >>>>> semantics
> > > > > > >>>>>>> for
> > > > > > >>>>>>>>>> `localKeyBy()`. For example, will the following code
> > share
> > > > > > >>> the
> > > > > > >>>>> same
> > > > > > >>>>>>>>> result?
> > > > > > >>>>>>>>>> and what're the differences between them?
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> 1. input.keyBy(0).sum(1)
> > > > > > >>>>>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > > > > > >>>>>>>>>> 3.
> > > > > > >> input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Would also be great if we can add this into the
> > document.
> > > > > > >>> Thank
> > > > > > >>>>> you
> > > > > > >>>>>>>> very
> > > > > > >>>>>>>>>> much.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Best, Hequn
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> On Fri, Jun 14, 2019 at 11:34 AM vino yang <
> > > > > > >>>>> yanghua1...@gmail.com>
> > > > > > >>>>>>>>> wrote:
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>> Hi Aljoscha,
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> I have looked at the "*Process*" section of FLIP wiki
> > > > > > >>>> page.[1]
> > > > > > >>>>>> This
> > > > > > >>>>>>>>> mail
> > > > > > >>>>>>>>>>> thread indicates that it has proceeded to the third
> > step.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> When I looked at the fourth step(vote step), I didn't
> > > > > > >> find
> > > > > > >>>> the
> > > > > > >>>>>>>>>>> prerequisites for starting the voting process.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Considering that the discussion of this feature has
> > been
> > > > > > >>> done
> > > > > > >>>>> in
> > > > > > >>>>>>> the
> > > > > > >>>>>>>>> old
> > > > > > >>>>>>>>>>> thread. [2] So can you tell me when should I start
> > > > > > >> voting?
> > > > > > >>>> Can
> > > > > > >>>>> I
> > > > > > >>>>>>>> start
> > > > > > >>>>>>>>>> now?
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>> Vino
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> [1]:
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
> > > > > > >>>>>>>>>>> [2]:
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> leesf <leesf0...@gmail.com> 于2019年6月13日周四 上午9:19写道:
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>> +1 for the FLIP, thank vino for your efforts.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>> Leesf
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> vino yang <yanghua1...@gmail.com> 于2019年6月12日周三
> > > > > > >>> 下午5:46写道:
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Hi folks,
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> I would like to start the FLIP discussion thread
> > > > > > >> about
> > > > > > >>>>>>> supporting
> > > > > > >>>>>>>>>> local
> > > > > > >>>>>>>>>>>>> aggregation in Flink.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> In short, this feature can effectively alleviate
> data
> > > > > > >>>> skew.
> > > > > > >>>>>>> This
> > > > > > >>>>>>>> is
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>> FLIP:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> *Motivation* (copied from FLIP)
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Currently, keyed streams are widely used to perform
> > > > > > >>>>>> aggregating
> > > > > > >>>>>>>>>>>> operations
> > > > > > >>>>>>>>>>>>> (e.g., reduce, sum and window) on the elements that
> > > > > > >>> have
> > > > > > >>>>> the
> > > > > > >>>>>>> same
> > > > > > >>>>>>>>>> key.
> > > > > > >>>>>>>>>>>> When
> > > > > > >>>>>>>>>>>>> executed at runtime, the elements with the same key
> > > > > > >>> will
> > > > > > >>>> be
> > > > > > >>>>>>> sent
> > > > > > >>>>>>>> to
> > > > > > >>>>>>>>>> and
> > > > > > >>>>>>>>>>>>> aggregated by the same task.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> The performance of these aggregating operations is
> > > > > > >> very
> > > > > > >>>>>>> sensitive
> > > > > > >>>>>>>>> to
> > > > > > >>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>> distribution of keys. In the cases where the
> > > > > > >>> distribution
> > > > > > >>>>> of
> > > > > > >>>>>>> keys
> > > > > > >>>>>>>>>>>> follows a
> > > > > > >>>>>>>>>>>>> powerful law, the performance will be significantly
> > > > > > >>>>>> downgraded.
> > > > > > >>>>>>>>> More
> > > > > > >>>>>>>>>>>>> unluckily, increasing the degree of parallelism
> does
> > > > > > >>> not
> > > > > > >>>>> help
> > > > > > >>>>>>>> when
> > > > > > >>>>>>>>> a
> > > > > > >>>>>>>>>>> task
> > > > > > >>>>>>>>>>>>> is overloaded by a single key.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Local aggregation is a widely-adopted method to
> > > > > > >> reduce
> > > > > > >>>> the
> > > > > > >>>>>>>>>> performance
> > > > > > >>>>>>>>>>>>> degraded by data skew. We can decompose the
> > > > > > >> aggregating
> > > > > > >>>>>>>> operations
> > > > > > >>>>>>>>>> into
> > > > > > >>>>>>>>>>>> two
> > > > > > >>>>>>>>>>>>> phases. In the first phase, we aggregate the
> elements
> > > > > > >>> of
> > > > > > >>>>> the
> > > > > > >>>>>>> same
> > > > > > >>>>>>>>> key
> > > > > > >>>>>>>>>>> at
> > > > > > >>>>>>>>>>>>> the sender side to obtain partial results. Then at
> > > > > > >> the
> > > > > > >>>>> second
> > > > > > >>>>>>>>> phase,
> > > > > > >>>>>>>>>>>> these
> > > > > > >>>>>>>>>>>>> partial results are sent to receivers according to
> > > > > > >>> their
> > > > > > >>>>> keys
> > > > > > >>>>>>> and
> > > > > > >>>>>>>>> are
> > > > > > >>>>>>>>>>>>> combined to obtain the final result. Since the
> number
> > > > > > >>> of
> > > > > > >>>>>>> partial
> > > > > > >>>>>>>>>>> results
> > > > > > >>>>>>>>>>>>> received by each receiver is limited by the number
> of
> > > > > > >>>>>> senders,
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>>>> imbalance among receivers can be reduced. Besides,
> by
> > > > > > >>>>>> reducing
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>> amount
> > > > > > >>>>>>>>>>>>> of transferred data the performance can be further
> > > > > > >>>>> improved.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> *More details*:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Design documentation:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Old discussion thread:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> JIRA: FLINK-12786 <
> > > > > > >>>>>>>>> https://issues.apache.org/jira/browse/FLINK-12786
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> We are looking forwards to your feedback!
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>> Vino
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to