Hi Kurt,

Sorry for the late reply. Yes, your summary basically reflects the
conclusions of our discussion.

Yesterday I was busy with another proposal and some other things.

Some parts of the FLIP documentation have not been fully updated yet.

Let's hear further opinions once more developers have been freed up from
Flink 1.9.

Best,
Vino


On Thu, Jul 4, 2019 at 9:52 AM Kurt Young <ykt...@gmail.com> wrote:

> Hi all,
>
> I talked to vino offline to get a better understanding of the motivation
> and functionality he wants from this FLIP. It seems there were indeed some
> misunderstandings in the previous discussions. I will try to summarize the
> original scenario and requirements from vino and share my thoughts here.
>
> From my understanding, the user wants 2 things:
> 1. Proper support for *local calculation*. I would not call it *local
> aggregation*, since aggregation is just a special case of local
> calculation. A local calculation may include aggregation, windowing, or
> even general process functions. Why should it be local? Because the user
> doesn't want to be forced to call "keyBy()" before such calculations;
> handling data skew is a big motivation behind this.
>
> 2. Proper keyed state support for local calculation. Flink actually
> already supports this kind of local calculation: you can call something
> like `DataStream.process` to do the trick. But the user argued that
> *KeyedState* cannot be used in this case, where Flink can only offer
> OperatorState. They want to use KeyedState during local calculation, or
> maybe directly use all the operators that can only be used after a normal
> *keyBy()*, like keyed windows and keyed aggregations.
>
> (@Vino, please correct me if I misunderstood anything. )
>
> These two requirements finally lead to exactly what vino has proposed. The
> core parts of the FLIP are *localKeyBy()* and "LocalKeyedState".
> localKeyBy can avoid any expensive real shuffle, so the user can do the
> local calculation. And LocalKeyedState is a special kind of KeyedState,
> maybe even with the same user interface, but scoped only to LOCAL.
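
To make the two requirements above concrete, here is a minimal plain-Java sketch (no Flink dependency) of what locally scoped keyed state could buy. Everything in it is hypothetical: `LocalSubtask`, `flush()` and `globalSum()` are illustrative names, not Flink or FLIP-44 APIs. Each subtask keeps per-key state that only it can see and emits one partial result per key, so a hot key crosses the shuffle once per subtask instead of once per record.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration only; none of these names are Flink APIs. */
final class LocalKeyedStateSketch {

    /** One parallel subtask whose per-key state is visible only to itself. */
    static final class LocalSubtask {
        private final Map<String, Long> localKeyedState = new HashMap<>();

        /** Updates the state for the record's key without any shuffle. */
        void process(String key, long value) {
            localKeyedState.merge(key, value, Long::sum);
        }

        /** Emits one partial sum per locally seen key and clears the state. */
        List<Map.Entry<String, Long>> flush() {
            List<Map.Entry<String, Long>> out = new ArrayList<>();
            for (Map.Entry<String, Long> e : localKeyedState.entrySet()) {
                out.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
            }
            localKeyedState.clear();
            return out;
        }
    }

    /** Stands in for the real keyBy() plus a keyed aggregation downstream. */
    static Map<String, Long> globalSum(List<Map.Entry<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> p : partials) {
            result.merge(p.getKey(), p.getValue(), Long::sum);
        }
        return result;
    }
}
```

With two subtasks that together see five records for two keys, only three partial results reach the global stage, and the global sums are the same as they would be without the local step.
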
>
> From my point of view, these two things touch very core assumptions of
> Flink's current design, and we should first focus the discussion on this.
> Previously, we actually had the wrong target, which focused on how to do
> proper *local aggregation*.
>
> If the community is OK with these changes, we can further discuss how to
> make the changes step by step. If the community thinks it's too much, we
> can also try to find another way to fulfill the user's requirements.
>
> Best,
> Kurt
>
>
> On Mon, Jul 1, 2019 at 11:45 AM vino yang <yanghua1...@gmail.com> wrote:
>
> > Hi Jark,
> >
> > > I will call them "local keyed state" because they have different
> > semantics from keyed state, even if "ListState" and "MapState" are keyed
> > state primitives currently. From my point of view, this exposes local
> > keyed state to users.
> >
> > Actually, it depends on how you look at it. From a certain point of view,
> > you are right. However, when users use stateful APIs based on localKeyBy,
> > the semantics of the keyed state are still correct. The difference is
> > that the state is not for the global key. So I think users may not care
> > or worry about this. From the point of view of people who know the inner
> > implementation, though, it does expose local keyed state to users.
> >
> > > What I want to help with here is keeping the discussion/FLIP focused on
> > the minimal problem (data-skew aggregation) to move the discussion
> > forward. Currently, the FLIP introduces many things (local keyed state,
> > localKeyBy#process) that may not be needed when solving the "common"
> > data-skew aggregation problem (we can introduce a stateless operator or
> > use operator state to support local aggregation, as Kurt and others said
> > before). It will be easier to push a thin FLIP forward.
> >
> > You have already seen our implementation documentation, and I think you
> > can see that the changes to the "state" are natural (of course, we will
> > also support a stateless implementation). In fact, the point of creating
> > a FLIP is the changes it brings. The reason I don't agree with cutting
> > some "features" is that if the feature set we support gets smaller and
> > smaller, creating the FLIP becomes meaningless. And if we add those
> > functions later, the changes will not converge properly: they will be
> > scattered everywhere and even lose the theme. For example, if local
> > keyed state is not included in the topic of local aggregation, then more
> > people will feel that introducing it is trivial.
> >
> > What's more, from the standpoint of DataStream, flexibility is
> > acceptable. From our standpoint, we have a lot of scenarios where users
> > use the DataStream API or a hybrid of DataStream + Table. There may not
> > be as many people who use the Table/SQL API exclusively as you think.
> >
> > So my suggestion is that the relevant feature set should be kept under
> > the same theme, but we can split it into subtasks and prioritize them,
> > such as near-term goals and long-term goals.
> >
> > > Lastly, I think "localKeyBy()" returning "LocalKeyedStream" may be a
> > good way to start. However, we still need to figure out how to apply
> > local aggregate and global aggregate on that.
> >
> > Now I agree to introduce a "LocalKeyedStream", although it would
> > duplicate a lot of code from "KeyedStream". Its advantage is that we can
> > split the API and support it gradually. For example, we might be able to
> > lower the API for the process, and then we can touch state-related
> > changes later. Regarding returning LocalKeyedStream, I will update the
> > FLIP document later.
> >
> > Best,
> > Vino
> >
> > On Sun, Jun 30, 2019 at 9:11 PM Jark Wu <imj...@gmail.com> wrote:
> >
> > > Hi Vino,
> > >
> > > > The local keyed state we introduced is not exposed to the outside!
> > > I have read your design and know how you implement local aggregation
> > > via local keyed state, and how the local keyed state works.
> > > Currently, Flink exposes two basic kinds of state: operator state and
> > > keyed state. Keyed state can only be used after "keyBy()", and each
> > > keyed state “belongs” to exactly one parallel instance. Operator state
> > > isn't related to keys and can be used in non-keyed operators. IMO, local
> > > keyed state is like a combination of operator state and keyed state,
> > > which makes "RuntimeContext#getState()", "#getListState", and
> > > "#getMapState" accessible in non-keyed operators. I will call them
> > > "local keyed state" because they have different semantics from keyed
> > > state, even if "ListState" and "MapState" are keyed state primitives
> > > currently. From my point of view, this exposes local keyed state to
> > > users.
> > >
> > > I agree with you that it would be great if we could have local keyed
> > > state ("ListState", "MapState", "Timers") in non-keyed operators.
> > > However, I'm not familiar with the state part and will leave it to
> > > others to judge.
> > >
> > > What I want to help with here is keeping the discussion/FLIP focused on
> > > the minimal problem (data-skew aggregation) to move the discussion
> > > forward. Currently, the FLIP introduces many things (local keyed state,
> > > localKeyBy#process) that may not be needed when solving the "common"
> > > data-skew aggregation problem (we can introduce a stateless operator or
> > > use operator state to support local aggregation, as Kurt and others said
> > > before). It will be easier to push a thin FLIP forward.
> > >
> > > Lastly, I think "localKeyBy()" returning "LocalKeyedStream" may be a
> > > good way to start. However, we still need to figure out how to apply
> > > local aggregate and global aggregate on that.
> > >
> > > Best,
> > > Jark
> > >
> > >
> > > On Fri, 28 Jun 2019 at 10:57, vino yang <yanghua1...@gmail.com> wrote:
> > >
> > >> Hi Jark,
> > >>
> > >> *About local keyed state:*
> > >>
> > >> I object to moving it out of this FLIP. It's one of the ways we support
> > >> local aggregation at the operator implementation level, though not the
> > >> only one.
> > >>
> > >> I guess you have misunderstood my last reply. I was just explaining the
> > >> difference between `DataStream#process` and `KeyedStream#process`. Users
> > >> who use the `localKeyBy#process` API are completely unaware of the
> > >> differences when using the stateful API. The local keyed state we
> > >> introduced is not exposed to the outside! It exists only internally.
> > >> When localKeyBy is called and returns an instance of `KeyedStream`, we
> > >> introduce a `KeyScope` in `KeyedStream` to distinguish them. I suggest
> > >> you take a look at our design documentation.
> > >>
> > >> *About your concerns:*
> > >>
> > >> 1) I agree that not all exposed APIs are meaningful if localKeyBy
> > >> returns `KeyedStream`. I did not find a timeWindow(long, long)
> > >> signature in the API. IMO, all the window-related APIs are useful and
> > >> meaningful. Windowing is one of the main means of our local
> > >> aggregation, and we should not limit its flexibility. I am not against
> > >> localKeyBy returning `LocalKeyedStream` if you agree that `localKeyBy`
> > >> is reasonable.
> > >>
> > >> 2) I have replied more than once that we are trying to support a more
> > >> general local aggregation. The meaning of aggregation here is not
> > >> limited to implementations of AggregateFunction, and that's exactly
> > >> what we get from `KeyedStream#process`. Why do you need a "local
> > >> process" concept? I don't think it is necessary at all. I don't want to
> > >> say that your idea of aggregation is narrow, but we want this API to
> > >> provide enough flexibility. This is the primary focus of DataStream, as
> > >> @Piotr Nowojski <pi...@ververica.com> also agrees. I also agree that
> > >> "local process" is more than "local aggregate"; it's the users' choice
> > >> whether they want to use it. Again, it should not be removed from this
> > >> FLIP because it is the added value of localKeyBy.
> > >>
> > >> Best,
> > >> Vino
> > >>
> > >>
> > >> On Thu, Jun 27, 2019 at 8:47 PM Jark Wu <imj...@gmail.com> wrote:
> > >>
> > >>> Hi Vino,
> > >>>
> > >>> So the difference between `DataStream.localKeyBy().process()` and
> > >>> `DataStream.process()` is that the former can access keyed state and
> > >>> the latter can only access operator state.
> > >>> I think that's out of the scope of designing a local aggregation API.
> > >>> It might be an extension of the state API, i.e. local keyed state.
> > >>> Is the difference between local keyed state and operator state (if I
> > >>> understand correctly) that local keyed state can be backed by RocksDB?
> > >>> Or that it makes "keyed state" local?
> > >>> IMO, it's a larger topic than local aggregation and should be
> > >>> discussed separately. I cc-ed people who work on state, @Tzu-Li
> > >>> (Gordon) Tai <tzuli...@apache.org>, @Seth, and @Yu Li, to give some
> > >>> feedback from the perspective of state.
> > >>>
> > >>> Regarding the API design updated in your FLIP, I have some concerns:
> > >>>
> > >>> 1) The "localKeyBy()" method returns a "KeyedStream", which exposes
> > >>> all of its methods.
> > >>> However, not every method makes sense or has a clear definition on a
> > >>> local stream.
> > >>> For example, "countWindow(long, long)", "timeWindow(long, long)",
> > >>> "window(WindowAssigner)", and the "intervalJoin" Hequn mentioned
> > >>> before.
> > >>> I would suggest we expose only the APIs we need for local aggregation
> > >>> and leave the others for later.
> > >>> We can return a "LocalKeyedStream" and expose only some dedicated
> > >>> methods, for example "aggregate()" and "trigger()".
> > >>> These APIs do not need to expose local keyed state to support local
> > >>> aggregation.
> > >>>
> > >>> 2) I think `localKeyBy().process()` is something called "local
> > >>> process", not just "local aggregate".
> > >>> It needs more discussion about local keyed state, and I would like to
> > >>> put it out of this FLIP.
> > >>>
> > >>>
> > >>> Regards,
> > >>> Jark
> > >>>
> > >>>
> > >>> On Thu, 27 Jun 2019 at 13:03, vino yang <yanghua1...@gmail.com> wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> I also think it's a good idea to agree on the API level first.
> > >>>>
> > >>>> I am sorry that we did not give usage examples of the API in the FLIP
> > >>>> documentation before. This may have caused some misunderstandings in
> > >>>> the discussion on this mail thread.
> > >>>>
> > >>>> So I have now added some usage examples to the "Public Interfaces"
> > >>>> section of the FLIP-44 documentation.
> > >>>>
> > >>>> Let us first get to know the API through its usage examples.
> > >>>>
> > >>>> Please let me know if you have any feedback or questions.
> > >>>>
> > >>>> Best,
> > >>>> Vino
> > >>>>
> > >>>> On Thu, Jun 27, 2019 at 12:51 PM vino yang <yanghua1...@gmail.com> wrote:
> > >>>>
> > >>>>> Hi Jark,
> > >>>>>
> > >>>>> `DataStream.localKeyBy().process()` has some key differences from
> > >>>>> `DataStream.process()`. The former API receives a
> > >>>>> `KeyedProcessFunction` (sorry, my previous reply may have misled
> > >>>>> you), while the latter receives a `ProcessFunction`. When you read
> > >>>>> the Javadoc of ProcessFunction, you can find a "*Note*" statement:
> > >>>>>
> > >>>>>> Access to keyed state and timers (which are also scoped to a key) is
> > >>>>>> only available if the ProcessFunction is applied on a KeyedStream.
> > >>>>>
> > >>>>>
> > >>>>> In addition, you can also compare their two implementations
> > >>>>> (`ProcessOperator` and `KeyedProcessOperator`) to see the difference.
> > >>>>>
> > >>>>> IMO, the "Note" statement means a lot for many use scenarios.
> > >>>>> For example, if we cannot access keyed state, we can only use heap
> > >>>>> memory to buffer data, which does not guarantee correct semantics!
> > >>>>> And the timer is also very important in some scenarios.
> > >>>>>
> > >>>>> That's why we say our API is flexible: it can get most of the
> > >>>>> benefits (even potential future benefits) of KeyedStream.
> > >>>>>
> > >>>>> I have added some instructions on the use of localKeyBy to the
> > >>>>> FLIP-44 documentation.
> > >>>>>
> > >>>>> Best,
> > >>>>> Vino
> > >>>>>
> > >>>>>
> > >>>>> On Thu, Jun 27, 2019 at 10:44 AM Jark Wu <imj...@gmail.com> wrote:
> > >>>>>
> > >>>>>> Hi Piotr,
> > >>>>>>
> > >>>>>> I think the state migration you raised is a good point. Having
> > >>>>>> "stream.enableLocalAggregation(Trigger)" might add some implicit
> > >>>>>> operators for which users can't set a uid, causing state
> > >>>>>> compatibility/evolution problems.
> > >>>>>> So let's put this in rejected alternatives.
> > >>>>>>
> > >>>>>> Hi Vino,
> > >>>>>>
> > >>>>>> You mentioned several times that "DataStream.localKeyBy().process()"
> > >>>>>> can solve the data skew problem of "DataStream.keyBy().process()".
> > >>>>>> I'm curious what the differences are between
> > >>>>>> "DataStream.process()" and "DataStream.localKeyBy().process()".
> > >>>>>> Can't "DataStream.process()" solve the data skew problem?
> > >>>>>>
> > >>>>>> Best,
> > >>>>>> Jark
> > >>>>>>
> > >>>>>>
> > >>>>>> On Wed, 26 Jun 2019 at 18:20, Piotr Nowojski <pi...@ververica.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi Jark and Vino,
> > >>>>>>>
> > >>>>>>> I fully agree with Jark that, in order to keep the discussion
> > >>>>>>> focused and to limit the number of parallel topics, we should first
> > >>>>>>> focus on one topic. We can first decide on the API and later
> > >>>>>>> discuss the runtime details, at least as long as we keep the
> > >>>>>>> potential requirements of the runtime part in mind while designing
> > >>>>>>> the API.
> > >>>>>>>
> > >>>>>>> Regarding the automatic optimisation proposed by Jark:
> > >>>>>>>
> > >>>>>>> "stream.enableLocalAggregation(Trigger)"
> > >>>>>>>
> > >>>>>>> I would be against that in the DataStream API for the reasons that
> > >>>>>>> Vino presented. There was a discussion thread about the future
> > >>>>>>> directions of the Table API vs the DataStream API, and the
> > >>>>>>> consensus was that automatic optimisations are one of the dividing
> > >>>>>>> lines between the two, for at least a couple of reasons.
> > >>>>>>> Flexibility and full control over the program was one of them.
> > >>>>>>> Another is state migration. Having
> > >>>>>>> "stream.enableLocalAggregation(Trigger)" that might add some
> > >>>>>>> implicit operators to the job graph can cause problems with
> > >>>>>>> savepoint/checkpoint compatibility.
> > >>>>>>>
> > >>>>>>> However, I haven't thought about or looked into the details of
> > >>>>>>> Vino's API proposal, so I cannot fully judge it.
> > >>>>>>>
> > >>>>>>> Piotrek
> > >>>>>>>
> > >>>>>>> > On 26 Jun 2019, at 09:17, vino yang <yanghua1...@gmail.com> wrote:
> > >>>>>>> >
> > >>>>>>> > Hi Jark,
> > >>>>>>> >
> > >>>>>>> > Similar questions and responses have been repeated many times.
> > >>>>>>> >
> > >>>>>>> > Why didn't we spend more sections discussing the API?
> > >>>>>>> >
> > >>>>>>> > Because we try to reuse the abilities of KeyedStream. The
> > >>>>>>> > localKeyBy API just returns a KeyedStream. That's our design: we
> > >>>>>>> > get all the benefits of KeyedStream and further benefits from
> > >>>>>>> > WindowedStream. The APIs that come from KeyedStream and
> > >>>>>>> > WindowedStream are long-tested and flexible. Yes, we spent a lot
> > >>>>>>> > of space discussing local keyed state. That's not the goal or the
> > >>>>>>> > motivation; it's the way to implement local aggregation. It is
> > >>>>>>> > much more complicated than the API we introduced, so we spent more
> > >>>>>>> > sections on it. Of course, this is at the implementation level of
> > >>>>>>> > the operator. We also agreed to support the buffer+flush
> > >>>>>>> > implementation and added related instructions to the
> > >>>>>>> > documentation. This needs to wait for the community's approval,
> > >>>>>>> > and if the community agrees, we will give more instructions.
> > >>>>>>> > What's more, I have indicated before that we welcome state-related
> > >>>>>>> > commenters to participate in the discussion, but it is not wise to
> > >>>>>>> > change the FLIP title.
> > >>>>>>> >
> > >>>>>>> > About the API of local aggregation:
> > >>>>>>> >
> > >>>>>>> > I don't dispute that ease of use is very important. But IMHO
> > >>>>>>> > flexibility is the most important thing at the DataStream API
> > >>>>>>> > level. Otherwise, what does DataStream mean? The significance of
> > >>>>>>> > the DataStream API is that it is more flexible than Table/SQL. If
> > >>>>>>> > it cannot provide that, then everyone would just use Table/SQL.
> > >>>>>>> >
> > >>>>>>> > The DataStream API should focus more on flexibility than on
> > >>>>>>> > automatic optimization, which gives users more possibilities to
> > >>>>>>> > implement complex programs and meet specific scenarios. There are
> > >>>>>>> > a lot of programs written with the DataStream API that are far
> > >>>>>>> > more complex than we think. It is very difficult to optimize at
> > >>>>>>> > the API level, and the benefit is very low.
> > >>>>>>> >
> > >>>>>>> > I want to say that we support a more generalized local
> > >>>>>>> > aggregation. I mentioned in the previous reply that aggregation
> > >>>>>>> > is not only a UDF that implements AggregateFunction. In some
> > >>>>>>> > complex scenarios, we have to support local aggregation through
> > >>>>>>> > ProcessFunction and ProcessWindowFunction to solve the data skew
> > >>>>>>> > problem. How do you support them in the API implementation and
> > >>>>>>> > optimization you mentioned?
> > >>>>>>> >
> > >>>>>>> > Flexible APIs can be combined arbitrarily in ways that result in
> > >>>>>>> > erroneous semantics, but that does not prove that flexibility is
> > >>>>>>> > meaningless, because the user is the decision maker. I have given
> > >>>>>>> > examples many times: many APIs in DataStream, if combined
> > >>>>>>> > arbitrarily, also have little practical significance. So users
> > >>>>>>> > who use flexible APIs need to understand what they are doing and
> > >>>>>>> > what the right choice is.
> > >>>>>>> >
> > >>>>>>> > I think that if we keep discussing this, there will be no result.
> > >>>>>>> >
> > >>>>>>> > @Stephan Ewen <mailto:se...@apache.org>, @Aljoscha Krettek
> > >>>>>>> > <mailto:aljos...@apache.org> and @Piotr Nowojski
> > >>>>>>> > <mailto:pi...@ververica.com>, do you have further comments?
> > >>>>>>> >
> > >>>>>>> >
> > >>>>>>> > On Wed, Jun 26, 2019 at 11:46 AM Jark Wu <imj...@gmail.com
> > >>>>>>> > <mailto:imj...@gmail.com>> wrote:
> > >>>>>>> > Thanks for the long discussion Vino, Kurt, Hequn, Piotr and
> > >>>>>>> > others,
> > >>>>>>> >
> > >>>>>>> > It seems that we still have some different ideas about the API
> > >>>>>>> > (localKeyBy()?) and implementation details (reuse window operator?
> > >>>>>>> > local keyed state?).
> > >>>>>>> > And the discussion has stalled and mixes motivation, API, and
> > >>>>>>> > implementation.
> > >>>>>>> >
> > >>>>>>> > In order to make some progress on this topic, I want to summarize
> > >>>>>>> > the points (pls correct me if I'm wrong or missing sth) and would
> > >>>>>>> > suggest splitting the topic into the following aspects and
> > >>>>>>> > discussing them one by one.
> > >>>>>>> >
> > >>>>>>> > 1) What's the main purpose of this FLIP?
> > >>>>>>> >  - From the title of this FLIP, it is to support local aggregate.
> > >>>>>>> >    However, 80% of the FLIP's content introduces a new state
> > >>>>>>> >    called local keyed state.
> > >>>>>>> >  - If we mainly want to introduce local keyed state, then we
> > >>>>>>> >    should re-title the FLIP and involve more people who work on
> > >>>>>>> >    state.
> > >>>>>>> >  - If we mainly want to support local aggregate, then we can jump
> > >>>>>>> >    to step 2 to discuss the API design.
> > >>>>>>> >
> > >>>>>>> > 2) What does the API look like?
> > >>>>>>> >  - Vino proposed to use "localKeyBy()" to do local processing;
> > >>>>>>> >    the output of the local process is the result type of the
> > >>>>>>> >    aggregate function.
> > >>>>>>> >   a) For non-windowed aggregate (**NOT SUPPORTED**):
> > >>>>>>> >      input.localKeyBy(..).aggregate(agg1).keyBy(..).aggregate(agg2)
> > >>>>>>> >   b) For windowed aggregate:
> > >>>>>>> >      input.localKeyBy(..).window(w1).aggregate(agg1).keyBy(..).window(w2).aggregate(agg2)
> > >>>>>>> >
> > >>>>>>> > 3) What's the implementation detail?
> > >>>>>>> >  - may reuse the window operator or not.
> > >>>>>>> >  - may introduce a new state concept or not.
> > >>>>>>> >  - may not have state in the local operator, by flushing buffers
> > >>>>>>> >    in prepareSnapshotPreBarrier.
> > >>>>>>> >  - and so on...
> > >>>>>>> >  - we can discuss these later, once we reach a consensus on the
> > >>>>>>> >    API.
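
The stateless option in point 3 (flushing buffers before the checkpoint barrier) can be pictured with a small plain-Java sketch. It is only an illustration under stated assumptions: `BufferFlushSketch`, `onCheckpointBarrier()` and `hasStateToSnapshot()` are hypothetical names, with `onCheckpointBarrier()` playing the role the list above assigns to prepareSnapshotPreBarrier. The idea is that the local operator emits everything it has buffered before the barrier moves on, so nothing is left to snapshot.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a local pre-aggregation operator; not a Flink class. */
final class BufferFlushSketch {
    private final Map<String, Long> buffer = new HashMap<>();
    private final List<String> downstream = new ArrayList<>();
    private final int maxBufferedKeys;

    BufferFlushSketch(int maxBufferedKeys) {
        this.maxBufferedKeys = maxBufferedKeys;
    }

    /** Pre-aggregates in memory and flushes once enough distinct keys are buffered. */
    void processElement(String key, long value) {
        buffer.merge(key, value, Long::sum);
        if (buffer.size() >= maxBufferedKeys) {
            flush();
        }
    }

    /** Called before the barrier is forwarded: emit everything, keep no state. */
    void onCheckpointBarrier() {
        flush();
    }

    /** True only while partial results are still sitting in the buffer. */
    boolean hasStateToSnapshot() {
        return !buffer.isEmpty();
    }

    /** Records sent downstream so far, formatted as "key=partialSum". */
    List<String> emitted() {
        return downstream;
    }

    private void flush() {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            downstream.add(e.getKey() + "=" + e.getValue());
        }
        buffer.clear();
    }
}
```

The trade-off in this sketch is that every checkpoint forces a flush, so the amount of pre-aggregation is bounded by the checkpoint interval as well as by the buffer size.
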
> > >>>>>>> >
> > >>>>>>> > --------------------
> > >>>>>>> >
> > >>>>>>> > Here are my thoughts:
> > >>>>>>> >
> > >>>>>>> > 1) Purpose of this FLIP
> > >>>>>>> >  - From the motivation section in the FLIP, I think the purpose
> > >>>>>>> >    is to support local aggregation to solve the data skew issue.
> > >>>>>>> >    Then I think we should focus on how to provide an easy-to-use
> > >>>>>>> >    and clear API to support **local aggregation**.
> > >>>>>>> >  - Vino's point is centered around the local keyed state API (or
> > >>>>>>> >    localKeyBy()), and how to leverage the local keyed state API
> > >>>>>>> >    to support local aggregation.
> > >>>>>>> >    But I'm afraid it's not a good way to design an API for local
> > >>>>>>> >    aggregation.
> > >>>>>>> >
> > >>>>>>> > 2) local aggregation API
> > >>>>>>> >  - IMO, the method call chain
> > >>>>>>> >    "input.localKeyBy(..).window(w1).aggregate(agg1).keyBy(..).window(w2).aggregate(agg2)"
> > >>>>>>> >    is not that easy to use.
> > >>>>>>> >    Because we have to provide two implementations for an
> > >>>>>>> >    aggregation (one for the partial agg, another for the final
> > >>>>>>> >    agg). And we have to take care of the first window call: an
> > >>>>>>> >    inappropriate window call will break the semantics.
> > >>>>>>> >  - From my point of view, local aggregation is a mature concept
> > >>>>>>> >    which should output the intermediate accumulator (ACC) for the
> > >>>>>>> >    past period of time (a trigger).
> > >>>>>>> >    And the downstream final aggregation will merge the ACCs
> > >>>>>>> >    received from the local side and output the current final
> > >>>>>>> >    result.
> > >>>>>>> >  - The current "AggregateFunction" API in DataStream already has
> > >>>>>>> >    the accumulator type and the "merge" method. So the only thing
> > >>>>>>> >    the user needs to do is enable the local aggregation
> > >>>>>>> >    optimization and set a trigger.
> > >>>>>>> >  - One idea that comes to mind: assume we have a windowed
> > >>>>>>> >    aggregation stream, "val stream =
> > >>>>>>> >    input.keyBy().window(w).aggregate(agg)". We can provide an API
> > >>>>>>> >    on the stream.
> > >>>>>>> >    For example, "stream.enableLocalAggregation(Trigger)", where
> > >>>>>>> >    the trigger can be
> > >>>>>>> >    "ContinuousEventTimeTrigger.of(Time.minutes(1))".
> > >>>>>>> >    Then it will be optimized into a local operator + a final
> > >>>>>>> >    operator, and the local operator will combine records every
> > >>>>>>> >    minute on event time.
> > >>>>>>> >  - In this way, there is only one line added, and the output is
> > >>>>>>> >    the same as before, because it is just an optimization.
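
The accumulator-and-merge idea described above can be sketched in plain Java. The `Agg` interface below is declared locally and only mirrors the shape of an aggregate function with an accumulator type and a `merge` method; `localSide`, `finalSide` and the count-based `flushEvery` parameter are assumptions of this sketch (a simple stand-in for a trigger), not part of any proposal in this thread. The local side periodically emits its accumulator, and the final side only merges accumulators, so the result matches what a single aggregation over all records would give.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration; Agg is a local interface, not the Flink one. */
final class LocalMergeSketch {

    /** Aggregate function with an accumulator type and a merge method. */
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        ACC merge(ACC a, ACC b);
        OUT getResult(ACC acc);
    }

    /** Average whose accumulator is {sum, count}, so partials can be merged. */
    static final class Avg implements Agg<Long, long[], Double> {
        public long[] createAccumulator() { return new long[] {0L, 0L}; }
        public long[] add(Long value, long[] acc) { acc[0] += value; acc[1]++; return acc; }
        public long[] merge(long[] a, long[] b) { return new long[] {a[0] + b[0], a[1] + b[1]}; }
        public Double getResult(long[] acc) { return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]; }
    }

    /** Local side: emits an accumulator every flushEvery records. */
    static <IN, ACC, OUT> List<ACC> localSide(Agg<IN, ACC, OUT> agg, List<IN> input, int flushEvery) {
        List<ACC> emitted = new ArrayList<>();
        ACC acc = agg.createAccumulator();
        int pending = 0;
        for (IN value : input) {
            acc = agg.add(value, acc);
            if (++pending == flushEvery) {
                emitted.add(acc);
                acc = agg.createAccumulator();
                pending = 0;
            }
        }
        if (pending > 0) {
            emitted.add(acc); // flush the remainder
        }
        return emitted;
    }

    /** Final side: merges the received accumulators and produces the result. */
    static <IN, ACC, OUT> OUT finalSide(Agg<IN, ACC, OUT> agg, List<ACC> accs) {
        ACC total = agg.createAccumulator();
        for (ACC a : accs) {
            total = agg.merge(total, a);
        }
        return agg.getResult(total);
    }
}
```

Note that the average only survives the split because the accumulator carries both sum and count; merging already-computed averages would not give the same answer.
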
> > >>>>>>> >
> > >>>>>>> >
> > >>>>>>> > Regards,
> > >>>>>>> > Jark
> > >>>>>>> >
> > >>>>>>> >
> > >>>>>>> >
> > >>>>>>> > On Tue, 25 Jun 2019 at 14:34, vino yang <yanghua1...@gmail.com
> > >>>>>>> > <mailto:yanghua1...@gmail.com>> wrote:
> > >>>>>>> >
> > >>>>>>> > > Hi Kurt,
> > >>>>>>> > >
> > >>>>>>> > > To answer your questions:
> > >>>>>>> > >
> > >>>>>>> > > a) Sorry, I have only updated the Google doc and have not had
> > >>>>>>> > > time to update the FLIP yet. I will update the FLIP as soon as
> > >>>>>>> > > possible.
> > >>>>>>> > > About your description of this point, I have a question: what
> > >>>>>>> > > do you mean by "how do we combine with `AggregateFunction`"?
> > >>>>>>> > >
> > >>>>>>> > > I have shown you the examples which Flink has supported:
> > >>>>>>> > >
> > >>>>>>> > >    - input.localKeyBy(0).aggregate()
> > >>>>>>> > >    - input.localKeyBy(0).window().aggregate()
> > >>>>>>> > >
> > >>>>>>> > > You can show me an example of how we combine with
> > >>>>>>> > > `AggregateFunction` through your localAggregate API.
> > >>>>>>> > >
> > >>>>>>> > > About the example of how to do the local aggregation for AVG,
> > >>>>>>> > > consider this code:
> > >>>>>>> > >
> > >>>>>>> > > DataStream<Tuple2<String, Long>> source = null;
> > >>>>>>> > > source
> > >>>>>>> > >     // local pre-aggregation per window
> > >>>>>>> > >     .localKeyBy(0)
> > >>>>>>> > >     .timeWindow(Time.seconds(60))
> > >>>>>>> > >     .aggregate(agg1,
> > >>>>>>> > >         new WindowFunction<Tuple2<Long, Long>,
> > >>>>>>> > >             Tuple3<String, Long, Long>, String, TimeWindow>() {})
> > >>>>>>> > >     // global aggregation after keyBy
> > >>>>>>> > >     .keyBy(0)
> > >>>>>>> > >     .timeWindow(Time.seconds(60))
> > >>>>>>> > >     .aggregate(agg2,
> > >>>>>>> > >         new WindowFunction<Tuple2<Long, Long>,
> > >>>>>>> > >             Tuple2<String, Long>, String, TimeWindow>() {});
> > >>>>>>> > >
> > >>>>>>> > > *agg1:*
> > >>>>>>> > >   signature: new AggregateFunction<Tuple2<String, Long>,
> > >>>>>>> > >     Tuple2<Long, Long>, Tuple2<Long, Long>>() {}
> > >>>>>>> > >   input param type: Tuple2<String, Long>, f0: key, f1: value
> > >>>>>>> > >   intermediate result type: Tuple2<Long, Long>, f0: local
> > >>>>>>> > >     aggregated sum; f1: local aggregated count
> > >>>>>>> > >   output param type: Tuple2<Long, Long>, f0: local aggregated
> > >>>>>>> > >     sum; f1: local aggregated count
> > >>>>>>> > >
> > >>>>>>> > > *agg2:*
> > >>>>>>> > >   signature: new AggregateFunction<Tuple3<String, Long, Long>,
> > >>>>>>> > >     Long, Tuple2<String, Long>>() {}
> > >>>>>>> > >   input param type: Tuple3<String, Long, Long>, f0: key, f1:
> > >>>>>>> > >     local aggregated sum; f2: local aggregated count
> > >>>>>>> > >   intermediate result type: Long, avg result
> > >>>>>>> > >   output param type: Tuple2<String, Long>, f0: key, f1: avg
> > >>>>>>> > >     result
> > >>>>>>> > >
> > >>>>>>> > > For a sliding window, users just need to change the window type
> > >>>>>>> > > if they want one.
> > >>>>>>> > > Again, we try to give the design and implementation at the
> > >>>>>>> > > DataStream level. So I believe we can meet all the requirements
> > >>>>>>> > > that come from the SQL level (it's just that the implementation
> > >>>>>>> > > may be different).
> > >>>>>>> > >
> > >>>>>>> > > b) Yes, theoretically, your thought is right. But in reality, it
> > >>>>>>> > > cannot bring many benefits.
> > >>>>>>> > > If we want to get the benefits of the window API, why would we
> > >>>>>>> > > not reuse the window operator, and instead copy so much
> > >>>>>>> > > duplicated code into another operator?
> > >>>>>>> > >
> > >>>>>>> > > c) OK, I agree to let the state backend committers join this
> > >>>>>>> > > discussion.
> > >>>>>>> > >
> > >>>>>>> > > Best,
> > >>>>>>> > > Vino
> > >>>>>>> > >
> > >>>>>>> > >
> > >>>>>>> > > On Mon, Jun 24, 2019 at 6:53 PM Kurt Young <ykt...@gmail.com
> > >>>>>>> > > <mailto:ykt...@gmail.com>> wrote:
> > >>>>>>> > >
> > >>>>>>> > > > Hi vino,
> > >>>>>>> > > >
> > >>>>>>> > > > One thing to add: for a), I think one or two examples, like
> > >>>>>>> > > > how to do local aggregation on a sliding window and how to do
> > >>>>>>> > > > local aggregation on an unbounded aggregate, would help a lot.
> > >>>>>>> > > >
> > >>>>>>> > > > Best,
> > >>>>>>> > > > Kurt
> > >>>>>>> > > >
> > >>>>>>> > > >
> > >>>>>>> > > > On Mon, Jun 24, 2019 at 6:06 PM Kurt Young <ykt...@gmail.com
> > >>>>>>> > > > <mailto:ykt...@gmail.com>> wrote:
> > >>>>>>> > > >
> > >>>>>>> > > > > Hi vino,
> > >>>>>>> > > > >
> > >>>>>>> > > > > I think there are several things that still need discussion.
> > >>>>>>> > > > >
> > >>>>>>> > > > > a) We all agree that we should first go with a unified
> > >>>>>>> > > > > abstraction, but the abstraction is not reflected in the
> > >>>>>>> > > > > FLIP. If your answer is the "localKeyBy" API, then I would
> > >>>>>>> > > > > ask how we combine it with `AggregateFunction`, and how we
> > >>>>>>> > > > > do proper local aggregation for aggregates that have a
> > >>>>>>> > > > > different intermediate result type, like AVG. Could you add
> > >>>>>>> > > > > these to the document?
> > >>>>>>> > > > >
> > >>>>>>> > > > > b) From the implementation side, reusing the window operator
> > >>>>>>> > > > > is one of the possible solutions, but that does not mean
> > >>>>>>> > > > > both implementations should be based on the window operator.
> > >>>>>>> > > > > My understanding is that one of the possible
> > >>>>>>> > > > > implementations should not touch the window operator.
> > >>>>>>> > > > >
> > >>>>>>> > > > > c) 80% of your FLIP content actually describes how we
> > >>>>>>> > > > > support local keyed state. I don't know if this is necessary
> > >>>>>>> > > > > to introduce in the first step, and we should also involve
> > >>>>>>> > > > > committers who work on the state backend to share their
> > >>>>>>> > > > > thoughts.
> > >>>>>>> > > > >
> > >>>>>>> > > > > Best,
> > >>>>>>> > > > > Kurt
> > >>>>>>> > > > >
> > >>>>>>> > > > >
> > >>>>>>> > > > > On Mon, Jun 24, 2019 at 5:17 PM vino yang <yanghua1...@gmail.com> wrote:
> > >>>>>>> > > > >
> > >>>>>>> > > > >> Hi Kurt,
> > >>>>>>> > > > >>
> > >>>>>>> > > > >> You did not raise any further objections, so I thought you had
> > >>>>>>> > > > >> agreed with the design after we promised to support two kinds of
> > >>>>>>> > > > >> implementation.
> > >>>>>>> > > > >>
> > >>>>>>> > > > >> At the API level, we have answered your question about passing an
> > >>>>>>> > > > >> AggregateFunction to do the aggregation. Whether or not we introduce
> > >>>>>>> > > > >> the localKeyBy API, we can support AggregateFunction.
> > >>>>>>> > > > >>
> > >>>>>>> > > > >> So where do you still disagree? Can you share it with us?
> > >>>>>>> > > > >>
> > >>>>>>> > > > >> Best,
> > >>>>>>> > > > >> Vino
> > >>>>>>> > > > >>
> > >>>>>>> > > > >> On Mon, Jun 24, 2019 at 4:24 PM Kurt Young <ykt...@gmail.com> wrote:
> > >>>>>>> > > > >>
> > >>>>>>> > > > >> > Hi vino,
> > >>>>>>> > > > >> >
> > >>>>>>> > > > >> > Sorry, I don't see a consensus about reusing the window operator
> > >>>>>>> > > > >> > and keeping the API design of localKeyBy. But I think we should
> > >>>>>>> > > > >> > definitely give this topic more thought.
> > >>>>>>> > > > >> >
> > >>>>>>> > > > >> > I also try to loop in Stephan for this discussion.
> > >>>>>>> > > > >> >
> > >>>>>>> > > > >> > Best,
> > >>>>>>> > > > >> > Kurt
> > >>>>>>> > > > >> >
> > >>>>>>> > > > >> >
> > >>>>>>> > > > >> > On Mon, Jun 24, 2019 at 3:26 PM vino yang <yanghua1...@gmail.com> wrote:
> > >>>>>>> > > > >> >
> > >>>>>>> > > > >> > > Hi all,
> > >>>>>>> > > > >> > >
> > >>>>>>> > > > >> > > I am happy that we have had a wonderful discussion and received
> > >>>>>>> > > > >> > > many valuable opinions in the last few days.
> > >>>>>>> > > > >> > >
> > >>>>>>> > > > >> > > Now, let me try to summarize the changes to the design on which
> > >>>>>>> > > > >> > > we have reached consensus.
> > >>>>>>> > > > >> > >
> > >>>>>>> > > > >> > >    - provide a unified abstraction to support two kinds of
> > >>>>>>> > > > >> > >    implementation;
> > >>>>>>> > > > >> > >    - reuse WindowOperator and try to enhance it so that the
> > >>>>>>> > > > >> > >    intermediate result of the local aggregation can be buffered
> > >>>>>>> > > > >> > >    and flushed, to support two kinds of implementation;
> > >>>>>>> > > > >> > >    - keep the API design of localKeyBy, but disable the APIs we
> > >>>>>>> > > > >> > >    cannot support currently and declare them as such, and provide
> > >>>>>>> > > > >> > >    a configurable API for users to choose how to handle the
> > >>>>>>> > > > >> > >    intermediate result;
> > >>>>>>> > > > >> > >
> > >>>>>>> > > > >> > > The above three points have been updated in the design doc. If
> > >>>>>>> > > > >> > > you have any questions, please let me know.
> > >>>>>>> > > > >> > >
> > >>>>>>> > > > >> > > @Aljoscha Krettek <aljos...@apache.org> What do you think? Any
> > >>>>>>> > > > >> > > further comments?
> > >>>>>>> > > > >> > >
> > >>>>>>> > > > >> > > Best,
> > >>>>>>> > > > >> > > Vino
> > >>>>>>> > > > >> > >
> > >>>>>>> > > > >> > > On Thu, Jun 20, 2019 at 2:02 PM vino yang <yanghua1...@gmail.com> wrote:
> > >>>>>>> > > > >> > >
> > >>>>>>> > > > >> > > > Hi Kurt,
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > > Thanks for your comments.
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > > It seems we have come to a consensus that we should use local
> > >>>>>>> > > > >> > > > aggregation to alleviate the performance degradation caused by
> > >>>>>>> > > > >> > > > data skew. In this FLIP, our key solution is to introduce local
> > >>>>>>> > > > >> > > > keyed partitioning to achieve this goal.
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > > I also agree that we can benefit a lot from the usage of
> > >>>>>>> > > > >> > > > AggregateFunction. In combination with localKeyBy, we can
> > >>>>>>> > > > >> > > > easily use it to achieve local aggregation:
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > >    - input.localKeyBy(0).aggregate()
> > >>>>>>> > > > >> > > >    - input.localKeyBy(0).window().aggregate()
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > > I think the only open question here is the choice between
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > >    - (1) introducing a new primitive called localKeyBy and
> > >>>>>>> > > > >> > > >    implementing local aggregation with existing operators, or
> > >>>>>>> > > > >> > > >    - (2) introducing an operator called localAggregation, which
> > >>>>>>> > > > >> > > >    is composed of a key selector, a window-like operator, and an
> > >>>>>>> > > > >> > > >    aggregate function.
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > > There may be some optimization opportunities in providing a
> > >>>>>>> > > > >> > > > composite interface for local aggregation. But at the same
> > >>>>>>> > > > >> > > > time, in my opinion, we lose flexibility (or we need a certain
> > >>>>>>> > > > >> > > > effort to achieve the same flexibility).
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > > As said in the previous mails, we have many use cases where
> > >>>>>>> > > > >> > > > the aggregation is very complicated and cannot be performed
> > >>>>>>> > > > >> > > > with AggregateFunction. For example, users may perform windowed
> > >>>>>>> > > > >> > > > aggregations according to time, data values, or even external
> > >>>>>>> > > > >> > > > storage. Typically, they now use KeyedProcessFunction or
> > >>>>>>> > > > >> > > > customized triggers to implement these aggregations. It's not
> > >>>>>>> > > > >> > > > easy to address data skew in such cases with a composite
> > >>>>>>> > > > >> > > > interface for local aggregation.
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > > Given that the DataStream API is targeted exactly at these
> > >>>>>>> > > > >> > > > cases, where the application logic is very complicated and
> > >>>>>>> > > > >> > > > optimization does not matter, I think it's a better choice to
> > >>>>>>> > > > >> > > > provide a relatively low-level and canonical interface.
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > > The composite interface, on the other hand, may be a good
> > >>>>>>> > > > >> > > > choice in declarative interfaces, including SQL and the Table
> > >>>>>>> > > > >> > > > API, as it allows more optimization opportunities.
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > > Best,
> > >>>>>>> > > > >> > > > Vino
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > > On Thu, Jun 20, 2019 at 10:15 AM Kurt Young <ykt...@gmail.com> wrote:
> > >>>>>>> > > > >> > > >
> > >>>>>>> > > > >> > > >> Hi all,
> > >>>>>>> > > > >> > > >>
> > >>>>>>> > > > >> > > >> As vino said in previous emails, I think we should first
> > >>>>>>> > > > >> > > >> discuss and decide what kind of use cases this FLIP wants to
> > >>>>>>> > > > >> > > >> resolve, and what the API should look like. From my side, I
> > >>>>>>> > > > >> > > >> think this is probably the root cause of the current
> > >>>>>>> > > > >> > > >> divergence.
> > >>>>>>> > > > >> > > >>
> > >>>>>>> > > > >> > > >> My understanding (from the FLIP title and the motivation
> > >>>>>>> > > > >> > > >> section of the document) is that we want to have proper
> > >>>>>>> > > > >> > > >> support for local aggregation, or pre-aggregation. This is not
> > >>>>>>> > > > >> > > >> a very new idea; most SQL engines have already made this
> > >>>>>>> > > > >> > > >> improvement. And the core concept is that there should be an
> > >>>>>>> > > > >> > > >> AggregateFunction, no matter whether it's a Flink runtime
> > >>>>>>> > > > >> > > >> AggregateFunction or SQL's UserDefinedAggregateFunction. Both
> > >>>>>>> > > > >> > > >> kinds of aggregation have the concept of an intermediate data
> > >>>>>>> > > > >> > > >> type, which we sometimes call ACC. I quickly went through the
> > >>>>>>> > > > >> > > >> POC Piotr did before [1]; it also directly uses
> > >>>>>>> > > > >> > > >> AggregateFunction.
> > >>>>>>> > > > >> > > >>
> > >>>>>>> > > > >> > > >> But the thing is, after reading the design of this FLIP, I
> > >>>>>>> > > > >> > > >> can't help feeling that this FLIP is not targeting proper
> > >>>>>>> > > > >> > > >> local aggregation support. It actually wants to introduce
> > >>>>>>> > > > >> > > >> another concept: LocalKeyBy, along with how to split and merge
> > >>>>>>> > > > >> > > >> local key groups and how to properly support state on local
> > >>>>>>> > > > >> > > >> keys. Local aggregation just happens to be one possible use
> > >>>>>>> > > > >> > > >> case of LocalKeyBy. But it lacks support for the essential
> > >>>>>>> > > > >> > > >> concept of local aggregation, which is the intermediate data
> > >>>>>>> > > > >> > > >> type. Without this, I really don't think it is a good fit for
> > >>>>>>> > > > >> > > >> local aggregation.
> > >>>>>>> > > > >> > > >>
> > >>>>>>> > > > >> > > >> Here I want to make sure of the scope or the goal of this
> > >>>>>>> > > > >> > > >> FLIP: do we want to have a proper local aggregation engine, or
> > >>>>>>> > > > >> > > >> do we just want to introduce a new concept called LocalKeyBy?
> > >>>>>>> > > > >> > > >>
> > >>>>>>> > > > >> > > >> [1]: https://github.com/apache/flink/pull/4626
> > >>>>>>> > > > >> > > >>
> > >>>>>>> > > > >> > > >> Best,
> > >>>>>>> > > > >> > > >> Kurt
> > >>>>>>> > > > >> > > >>
> > >>>>>>> > > > >> > > >>
> > >>>>>>> > > > >> > > >> On Wed, Jun 19, 2019 at 5:13 PM vino yang <yanghua1...@gmail.com> wrote:
> > >>>>>>> > > > >> > > >>
> > >>>>>>> > > > >> > > >> > Hi Hequn,
> > >>>>>>> > > > >> > > >> >
> > >>>>>>> > > > >> > > >> > Thanks for your comments!
> > >>>>>>> > > > >> > > >> >
> > >>>>>>> > > > >> > > >> > I agree that allowing local aggregation to reuse the window
> > >>>>>>> > > > >> > > >> > API, and refining the window operator so that it meets both
> > >>>>>>> > > > >> > > >> > requirements (ours and Kurt's), is a good decision!
> > >>>>>>> > > > >> > > >> >
> > >>>>>>> > > > >> > > >> > Concerning your questions:
> > >>>>>>> > > > >> > > >> >
> > >>>>>>> > > > >> > > >> > 1. The result of input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > >>>>>>> > > > >> > > >> > may be meaningless.
> > >>>>>>> > > > >> > > >> >
> > >>>>>>> > > > >> > > >> > Yes, it does not make sense in most cases. However, I also
> > >>>>>>> > > > >> > > >> > want to note that users should know the right semantics of
> > >>>>>>> > > > >> > > >> > localKeyBy and use it correctly. This issue also exists for
> > >>>>>>> > > > >> > > >> > the global keyBy. Consider this example:
> > >>>>>>> > > > >> > > >> > input.keyBy(0).sum(1).keyBy(0).sum(1). The result is also
> > >>>>>>> > > > >> > > >> > meaningless.
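
A minimal plain-Python sketch of why chaining two sums gives a meaningless result (not Flink API; `rolling_sum` is a hypothetical helper). It assumes that `sum()` on a keyed stream is a rolling aggregate that emits the updated running total for every incoming record, so the second `sum()` ends up adding running totals together rather than original values:

```python
def rolling_sum(records):
    """Emit the running total per key after every (key, value) record."""
    totals = {}
    out = []
    for key, value in records:
        totals[key] = totals.get(key, 0) + value
        out.append((key, totals[key]))
    return out


records = [("a", 1), ("a", 2), ("a", 3)]

once = rolling_sum(records)  # running totals of the raw values
twice = rolling_sum(once)    # running totals of running totals

true_total = sum(value for _, value in records)
last_of_once = once[-1][1]
last_of_twice = twice[-1][1]
```

Here the last value emitted by the first stage equals the true total, while the last value of the second stage has counted earlier records several times. A pre-aggregation stage only composes with the global stage if it emits each partial result exactly once, for example on a trigger.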
> > >>>>>>> > > > >> > > >> >
> > >>>>>>> > > > >> > > >> > 2. About the semantics of
> > >>>>>>> > > > >> > > >> > input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)).
> > >>>>>>> > > > >> > > >> >
> > >>>>>>> > > > >> > > >> > Good catch! I agree with you that it's not good to enable
> > >>>>>>> > > > >> > > >> > all functionalities of KeyedStream for localKeyBy.
> > >>>>>>> > > > >> > > >> > Currently, we do not support some APIs such as
> > >>>>>>> > > > >> > > >> > connect/join/intervalJoin/coGroup. This is because we force
> > >>>>>>> > > > >> > > >> > the operators on LocalKeyedStreams to be chained with their
> > >>>>>>> > > > >> > > >> > inputs.
> > >>>>>>> > > > >> > > >> >
> > >>>>>>> > > > >> > > >> > Best,
> > >>>>>>> > > > >> > > >> > Vino
> > >>>>>>> > > > >> > > >> >
> > >>>>>>> > > > >> > > >> >
> > >>>>>>> > > > >> > > >> > On Wed, Jun 19, 2019 at 3:42 PM Hequn Cheng <chenghe...@gmail.com> wrote:
> > >>>>>>> > > > >> > > >> >
> > >>>>>>> > > > >> > > >> > > Hi,
> > >>>>>>> > > > >> > > >> > >
> > >>>>>>> > > > >> > > >> > > Thanks a lot for your great discussion, and it's great to
> > >>>>>>> > > > >> > > >> > > see that some agreement has been reached on the "local
> > >>>>>>> > > > >> > > >> > > aggregate engine"!
> > >>>>>>> > > > >> > > >> > >
> > >>>>>>> > > > >> > > >> > > ===> Considering the abstract engine,
> > >>>>>>> > > > >> > > >> > > I'm wondering whether it is valuable for us to extend the
> > >>>>>>> > > > >> > > >> > > current window to meet both demands raised by Kurt and
> > >>>>>>> > > > >> > > >> > > Vino. There are some benefits we can get:
> > >>>>>>> > > > >> > > >> > >
> > >>>>>>> > > > >> > > >> > > 1. The interfaces of the window are complete and clear.
> > >>>>>>> > > > >> > > >> > > With windows, we can define a lot of ways to split the
> > >>>>>>> > > > >> > > >> > > data and perform different computations.
> > >>>>>>> > > > >> > > >> > > 2. We can also leverage the window to do miniBatch for the
> > >>>>>>> > > > >> > > >> > > global aggregation, i.e., we can use the window to bundle
> > >>>>>>> > > > >> > > >> > > data belonging to the same key, so that for every bundle
> > >>>>>>> > > > >> > > >> > > we only need to read and write state once. This can
> > >>>>>>> > > > >> > > >> > > greatly reduce state IO and improve performance.
> > >>>>>>> > > > >> > > >> > > 3. A lot of other use cases can also benefit from a window
> > >>>>>>> > > > >> > > >> > > that is memory-based or stateless.
> > >>>>>>> > > > >> > > >> > >
> > >>>>>>> > > > >> > > >> > > ===> As for the API,
> > >>>>>>> > > > >> > > >> > > I think it is good to make our API more flexible. However,
> > >>>>>>> > > > >> > > >> > > we may need to make our API meaningful.
> > >>>>>>> > > > >> > > >> > >
> > >>>>>>> > > > >> > > >> > > Take my previous reply as an example:
> > >>>>>>> > > > >> > > >> > > input.localKeyBy(0).sum(1).keyBy(0).sum(1). The result may
> > >>>>>>> > > > >> > > >> > > be meaningless. Another example I find is the
> > >>>>>>> > > > >> > > >> > > intervalJoin, e.g.,
> > >>>>>>> > > > >> > > >> > > input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)).
> > >>>>>>> > > > >> > > >> > > In this case, it will bring problems if input1 and input2
> > >>>>>>> > > > >> > > >> > > have different parallelism. We don't know which input the
> > >>>>>>> > > > >> > > >> > > join should be chained with. Even if they share the same
> > >>>>>>> > > > >> > > >> > > parallelism, it's hard to tell what the join is doing.
> > >>>>>>> > > > >> > > >> > > There may be some other problems.
> > >>>>>>> > > > >> > > >> > >
> > >>>>>>> > > > >> > > >> > > From this point of view, isn't it at least not good to
> > >>>>>>> > > > >> > > >> > > enable all functionalities of KeyedStream for localKeyBy?
> > >>>>>>> > > > >> > > >> > >
> > >>>>>>> > > > >> > > >> > > Great to also have your opinions.
> > >>>>>>> > > > >> > > >> > >
> > >>>>>>> > > > >> > > >> > > Best, Hequn
> > >>>>>>> > > > >> > > >> > >
> > >>>>>>> > > > >> > > >> > >
> > >>>>>>> > > > >> > > >> > >
> > >>>>>>> > > > >> > > >> > >
> > >>>>>>> > > > >> > > >> > > On Wed, Jun 19, 2019 at 10:24 AM vino yang <yanghua1...@gmail.com> wrote:
> > >>>>>>> > > > >> > > >> > >
> > >>>>>>> > > > >> > > >> > > > Hi Kurt and Piotrek,
> > >>>>>>> > > > >> > > >> > > >
> > >>>>>>> > > > >> > > >> > > > Thanks for your comments.
> > >>>>>>> > > > >> > > >> > > >
> > >>>>>>> > > > >> > > >> > > > I agree that we can provide a better abstraction that is
> > >>>>>>> > > > >> > > >> > > > compatible with two different implementations.
> > >>>>>>> > > > >> > > >> > > >
> > >>>>>>> > > > >> > > >> > > > First of all, I think we should consider what kind of
> > >>>>>>> > > > >> > > >> > > > scenarios we need to support at the *API* level.
> > >>>>>>> > > > >> > > >> > > >
> > >>>>>>> > > > >> > > >> > > > We have some use cases which need a customized
> > >>>>>>> > > > >> > > >> > > > aggregation through KeyedProcessFunction (in the usage
> > >>>>>>> > > > >> > > >> > > > of our localKeyBy.window they can use
> > >>>>>>> > > > >> > > >> > > > ProcessWindowFunction).
> > >>>>>>> > > > >> > > >> > > >
> > >>>>>>> > > > >> > > >> > > > Shall we support these flexible use scenarios?
> > >>>>>>> > > > >> > > >> > > >
> > >>>>>>> > > > >> > > >> > > > Best,
> > >>>>>>> > > > >> > > >> > > > Vino
> > >>>>>>> > > > >> > > >> > > >
> > >>>>>>> > > > >> > > >> > > > On Tue, Jun 18, 2019 at 8:37 PM Kurt Young <ykt...@gmail.com> wrote:
> > >>>>>>> > > > >> > > >> > > >
> > >>>>>>> > > > >> > > >> > > > > Hi Piotr,
> > >>>>>>> > > > >> > > >> > > > >
> > >>>>>>> > > > >> > > >> > > > > Thanks for joining the discussion. Making "local
> > >>>>>>> > > > >> > > >> > > > > aggregation" abstract enough sounds good to me; we
> > >>>>>>> > > > >> > > >> > > > > could implement and verify alternative solutions for
> > >>>>>>> > > > >> > > >> > > > > use cases of local aggregation. Maybe we will find
> > >>>>>>> > > > >> > > >> > > > > that both solutions are appropriate for different
> > >>>>>>> > > > >> > > >> > > > > scenarios.
> > >>>>>>> > > > >> > > >> > > > >
> > >>>>>>> > > > >> > > >> > > > > Starting from a simple one sounds like a practical way
> > >>>>>>> > > > >> > > >> > > > > to go. What do you think, vino?
> > >>>>>>> > > > >> > > >> > > > >
> > >>>>>>> > > > >> > > >> > > > > Best,
> > >>>>>>> > > > >> > > >> > > > > Kurt
> > >>>>>>> > > > >> > > >> > > > >
> > >>>>>>> > > > >> > > >> > > > >
> > >>>>>>> > > > >> > > >> > > > > On Tue, Jun 18, 2019 at 8:10 PM Piotr Nowojski <pi...@ververica.com> wrote:
> > >>>>>>> > > > >> > > >> > > > >
> > >>>>>>> > > > >> > > >> > > > > > Hi Kurt and Vino,
> > >>>>>>> > > > >> > > >> > > > > >
> > >>>>>>> > > > >> > > >> > > > > > I think there is a trade-off that we need to
> > >>>>>>> > > > >> > > >> > > > > > consider for the local aggregation.
> > >>>>>>> > > > >> > > >> > > > > >
> > >>>>>>> > > > >> > > >> > > > > > Generally speaking, I would agree with Kurt about
> > >>>>>>> > > > >> > > >> > > > > > local aggregation/pre-aggregation not using Flink's
> > >>>>>>> > > > >> > > >> > > > > > state and instead flushing the operator on a
> > >>>>>>> > > > >> > > >> > > > > > checkpoint. Network IO is usually cheaper than disk
> > >>>>>>> > > > >> > > >> > > > > > IO. This has, however, a couple of issues:
> > >>>>>>> > > > >> > > >> > > > > > 1. It can explode the number of in-flight records
> > >>>>>>> > > > >> > > >> > > > > > during checkpoint barrier alignment, making
> > >>>>>>> > > > >> > > >> > > > > > checkpointing slower and decreasing the actual
> > >>>>>>> > > > >> > > >> > > > > > throughput.
> > >>>>>>> > > > >> > > >> > > > > > 2. This trades disk IO on the local aggregation
> > >>>>>>> > > > >> > > >> > > > > > machine for CPU (and disk IO in the case of RocksDB)
> > >>>>>>> > > > >> > > >> > > > > > on the final aggregation machine. This is fine as
> > >>>>>>> > > > >> > > >> > > > > > long as there is no huge data skew. If there is only
> > >>>>>>> > > > >> > > >> > > > > > a handful of hot keys (or even a single one), it
> > >>>>>>> > > > >> > > >> > > > > > might be better to keep the persistent state in the
> > >>>>>>> > > > >> > > >> > > > > > LocalAggregationOperator to offload the final
> > >>>>>>> > > > >> > > >> > > > > > aggregation as much as possible.
> > >>>>>>> > > > >> > > >> > > > > > 3. With frequent checkpointing, the effectiveness of
> > >>>>>>> > > > >> > > >> > > > > > local aggregation would degrade.
> > >>>>>>> > > > >> > > >> > > > > >
> > >>>>>>> > > > >> > > >> > > > > > I assume Kurt is correct that in your use cases the
> > >>>>>>> > > > >> > > >> > > > > > stateless operator was behaving better, but I could
> > >>>>>>> > > > >> > > >> > > > > > easily see other use cases as well. For example,
> > >>>>>>> > > > >> > > >> > > > > > someone is already using RocksDB, and their job is
> > >>>>>>> > > > >> > > >> > > > > > bottlenecked on a single window operator instance
> > >>>>>>> > > > >> > > >> > > > > > because of the data skew. In that case, stateful
> > >>>>>>> > > > >> > > >> > > > > > local aggregation would probably be a better choice.
> > >>>>>>> > > > >> > > >> > > > > >
> > >>>>>>> > > > >> > > >> > > > > > Because of that, I think we should eventually
> > >>>>>>> > > > >> > > >> > > > > > provide both versions, and in the initial version we
> > >>>>>>> > > > >> > > >> > > > > > should at least make the "local aggregation engine"
> > >>>>>>> > > > >> > > >> > > > > > abstract enough that one could easily provide a
> > >>>>>>> > > > >> > > >> > > > > > different implementation strategy.
> > >>>>>>> > > > >> > > >> > > > > >
> > >>>>>>> > > > >> > > >> > > > > > Piotrek
> > >>>>>>> > > > >> > > >> > > > > >
> > >>>>>>> > > > >> > > >> > > > > > > On 18 Jun 2019, at 11:46, Kurt Young <ykt...@gmail.com> wrote:
> > >>>>>>> > > > >> > > >> > > > > > >
> > >>>>>>> > > > >> > > >> > > > > > > Hi,
> > >>>>>>> > > > >> > > >> > > > > > >
> > >>>>>>> > > > >> > > >> > > > > > > For the trigger, it depends on what operator we
> > >>>>>>> > > > >> > > >> > > > > > > want to use under the API. If we choose to use the
> > >>>>>>> > > > >> > > >> > > > > > > window operator, we should also use the window's
> > >>>>>>> > > > >> > > >> > > > > > > trigger. However, I also think reusing the window
> > >>>>>>> > > > >> > > >> > > > > > > operator for this scenario may not be the best
> > >>>>>>> > > > >> > > >> > > > > > > choice. The reasons are the following:
> > >>>>>>> > > > >> > > >> > > > > > >
> > >>>>>>> > > > >> > > >> > > > > > > 1. As a lot of people have already pointed out,
> > >>>>>>> > > > >> > > >> > > > > > > the window operator relies heavily on state, and
> > >>>>>>> > > > >> > > >> > > > > > > that will definitely affect performance. You can
> > >>>>>>> > > > >> > > >> > > > > > > argue that one can use a heap-based state backend,
> > >>>>>>> > > > >> > > >> > > > > > > but this will introduce extra coupling, especially
> > >>>>>>> > > > >> > > >> > > > > > > when we have a chance to design a purely stateless
> > >>>>>>> > > > >> > > >> > > > > > > operator.
> > >>>>>>> > > > >> > > >> > > > > > > 2. The window operator is *the most* complicated
> > >>>>>>> > > > >> > > >> > > > > > > operator Flink currently has. Maybe we only need
> > >>>>>>> > > > >> > > >> > > > > > > to pick a subset of the window operator to achieve
> > >>>>>>> > > > >> > > >> > > > > > > the goal, but once the user wants to have a deep
> > >>>>>>> > > > >> > > >> > > > > > > look at the localAggregation operator, it's still
> > >>>>>>> > > > >> > > >> > > > > > > hard to find out what's going on under the window
> > >>>>>>> > > > >> > > >> > > > > > > operator. For simplicity, I would also recommend
> > >>>>>>> > > > >> > > >> > > > > > > we introduce a dedicated lightweight operator,
> > >>>>>>> > > > >> > > >> > > > > > > which is also much easier for a user to learn and
> > >>>>>>> > > > >> > > >> > > > > > > use.
> > >>>>>>> > > > >> > > >> > > > > > >
> > >>>>>>> > > > >> > > >> > > > > > > For your question about increasing the burden in
> > >>>>>>> > > > >> > > >> > > > > > > `StreamOperator::prepareSnapshotPreBarrier()`, the
> > >>>>>>> > > > >> > > >> > > > > > > only thing this function needs to do is output all
> > >>>>>>> > > > >> > > >> > > > > > > the partial results; it's a purely CPU workload,
> > >>>>>>> > > > >> > > >> > > > > > > not introducing any IO. I want to point out that
> > >>>>>>> > > > >> > > >> > > > > > > even if we have this cost, we remove another
> > >>>>>>> > > > >> > > >> > > > > > > barrier alignment cost of the operator, which is
> > >>>>>>> > > > >> > > >> > > > > > > the sync flush stage of the state, if you
> > >>>>>>> > > > >> > > >> > > > > > > introduced state. This flush actually will
> > >>>>>>> > > > >> > > >> > > > > > > introduce disk IO, and I think it's worth
> > >>>>>>> > > > >> > > >> > > > > > > exchanging this cost for a purely CPU workload.
> > >>>>>>> > > > >> > > >> > > > > > > And we do have some observations about these two
> > >>>>>>> > > > >> > > >> > > > > > > behaviors (as I said before, we actually
> > >>>>>>> > > > >> > > >> > > > > > > implemented both solutions): the stateless one
> > >>>>>>> > > > >> > > >> > > > > > > actually performs better in both performance and
> > >>>>>>> > > > >> > > >> > > > > > > barrier alignment time.
> > >>>>>>> > > > >> > > >> > > > > > >
> > >>>>>>> > > > >> > > >> > > > > > > Best,
> > >>>>>>> > > > >> > > >> > > > > > > Kurt
> > >>>>>>> > > > >> > > >> > > > > > >
> > >>>>>>> > > > >> > > >> > > > > > >
> > >>>>>>> > > > >> > > >> > > > > > > On Tue, Jun 18, 2019 at 3:40 PM vino yang <yanghua1...@gmail.com> wrote:
> > >>>>>>> > > > >> > > >> > > > > > >
> > >>>>>>> > > > >> > > >> > > > > > >> Hi Kurt,
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >> Thanks for your example. Now it looks clearer to me.
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >> From your example code snippet, I see that the localAggregate API has
> > >>>>>>> > > > >> > > >> > > > > > >> three parameters:
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >>   1. key field
> > >>>>>>> > > > >> > > >> > > > > > >>   2. PartitionAvg
> > >>>>>>> > > > >> > > >> > > > > > >>   3. CountTrigger: Does this trigger come from the window package?
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >> I will compare our design and yours at the API level and the operator
> > >>>>>>> > > > >> > > >> > > > > > >> level:
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >> *From the API level:*
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >> As I replied to @dianfu in the old email thread [1], the Window API can
> > >>>>>>> > > > >> > > >> > > > > > >> already provide the second and the third parameters.
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >> If you reuse a specific interface or class provided by the window
> > >>>>>>> > > > >> > > >> > > > > > >> package, such as *Trigger* or *CountTrigger*, but do not use the window
> > >>>>>>> > > > >> > > >> > > > > > >> API, it's not reasonable.
> > >>>>>>> > > > >> > > >> > > > > > >> And if you do not reuse these interfaces or classes, you would need to
> > >>>>>>> > > > >> > > >> > > > > > >> introduce more things, even though they look similar to the things
> > >>>>>>> > > > >> > > >> > > > > > >> provided by the window package.
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >> The window package already provides several types of windows and many
> > >>>>>>> > > > >> > > >> > > > > > >> triggers, and lets users customize them. What's more, users are more
> > >>>>>>> > > > >> > > >> > > > > > >> familiar with the Window API.
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >> This is the reason why we just provide the localKeyBy API and reuse the
> > >>>>>>> > > > >> > > >> > > > > > >> window API. It avoids unnecessary new components such as triggers and
> > >>>>>>> > > > >> > > >> > > > > > >> the buffering mechanism (based on count or time).
> > >>>>>>> > > > >> > > >> > > > > > >> And it has clear and easy-to-understand semantics.
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >> *From the operator level:*
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >> We reused the window operator, so we can get all the benefits from state
> > >>>>>>> > > > >> > > >> > > > > > >> and checkpointing.
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >> In your design, you described the operator under the localAggregate API
> > >>>>>>> > > > >> > > >> > > > > > >> as a *stateless* operator. IMO, it still has state; it is just not Flink
> > >>>>>>> > > > >> > > >> > > > > > >> managed state.
> > >>>>>>> > > > >> > > >> > > > > > >> About the memory buffer (I think it's still not very clear; if you have
> > >>>>>>> > > > >> > > >> > > > > > >> time, can you give more detailed information or answer my questions?),
> > >>>>>>> > > > >> > > >> > > > > > >> I have some questions:
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >>   - If it is just a raw JVM heap memory buffer, how do you support fault
> > >>>>>>> > > > >> > > >> > > > > > >>   tolerance if the job is configured with the EXACTLY-ONCE semantic
> > >>>>>>> > > > >> > > >> > > > > > >>   guarantee?
> > >>>>>>> > > > >> > > >> > > > > > >>   - If you think the memory buffer (non-Flink state) has better
> > >>>>>>> > > > >> > > >> > > > > > >>   performance: in our design, users can also configure the HEAP state
> > >>>>>>> > > > >> > > >> > > > > > >>   backend to get performance close to your mechanism.
> > >>>>>>> > > > >> > > >> > > > > > >>   - `StreamOperator::prepareSnapshotPreBarrier()` is related to the
> > >>>>>>> > > > >> > > >> > > > > > >>   timing of the snapshot. IMO, the flush action should be a synchronous
> > >>>>>>> > > > >> > > >> > > > > > >>   action? (If not, please point out my mistake.) I still think we should
> > >>>>>>> > > > >> > > >> > > > > > >>   not depend on the timing of the checkpoint. Checkpoint-related
> > >>>>>>> > > > >> > > >> > > > > > >>   operations are inherently performance sensitive; we should not
> > >>>>>>> > > > >> > > >> > > > > > >>   increase their burden any more. Our implementation is based on Flink's
> > >>>>>>> > > > >> > > >> > > > > > >>   checkpoint mechanism, which can benefit from the async snapshot and
> > >>>>>>> > > > >> > > >> > > > > > >>   incremental checkpoints. IMO, performance is not a problem, and we
> > >>>>>>> > > > >> > > >> > > > > > >>   have not found any performance issue in our production.
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >> [1]: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >> Best,
> > >>>>>>> > > > >> > > >> > > > > > >> Vino
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >> On Tue, Jun 18, 2019 at 2:27 PM Kurt Young <ykt...@gmail.com> wrote:
> > >>>>>>> > > > >> > > >> > > > > > >>
> > >>>>>>> > > > >> > > >> > > > > > >>> Yeah, sorry for not expressing myself clearly. I will try to provide
> > >>>>>>> > > > >> > > >> > > > > > >>> more details to make sure we are on the same page.
> > >>>>>>> > > > >> > > >> > > > > > >>>
> > >>>>>>> > > > >> > > >> > > > > > >>> For the DataStream API, it shouldn't be optimized automatically. You
> > >>>>>>> > > > >> > > >> > > > > > >>> have to explicitly call an API to do local aggregation and to specify
> > >>>>>>> > > > >> > > >> > > > > > >>> the trigger policy of the local aggregation. Take average as an
> > >>>>>>> > > > >> > > >> > > > > > >>> example; the user program may look like this (just a draft):
> > >>>>>>> > > > >> > > >> > > > > > >>>
> > >>>>>>> > > > >> > > >> > > > > > >>> assuming the input type is DataStream<Tuple2<String, Int>>
> > >>>>>>> > > > >> > > >> > > > > > >>>
> > >>>>>>> > > > >> > > >> > > > > > >>> ds.localAggregate(
> > >>>>>>> > > > >> > > >> > > > > > >>>        0,                        // The local key, which is the String from Tuple2
> > >>>>>>> > > > >> > > >> > > > > > >>>        PartitionAvg(1),          // The partial aggregation function, produces Tuple2<Long, Int>, indicating sum and count
> > >>>>>>> > > > >> > > >> > > > > > >>>        CountTrigger.of(1000L)    // Trigger policy, note this should be best effort, and also be composited with time based or memory size based trigger
> > >>>>>>> > > > >> > > >> > > > > > >>>    )                             // The return type is local aggregate Tuple2<String, Tuple2<Long, Int>>
> > >>>>>>> > > > >> > > >> > > > > > >>>    .keyBy(0)                     // Further keyby it with required key
> > >>>>>>> > > > >> > > >> > > > > > >>>    .aggregate(1)                 // This will merge all the partial results and get the final average.
> > >>>>>>> > > > >> > > >> > > > > > >>>
> > >>>>>>> > > > >> > > >> > > > > > >>> (This is only a draft, only trying to explain what it looks like.)
> > >>>>>>> > > > >> > > >> > > > > > >>>
> > >>>>>>> > > > >> > > >> > > > > > >>> The local aggregate operator can be stateless; we can keep a memory
> > >>>>>>> > > > >> > > >> > > > > > >>> buffer or another efficient data structure to improve the aggregation
> > >>>>>>> > > > >> > > >> > > > > > >>> performance.
> > >>>>>>> > > > >> > > >> > > > > > >>>
> > >>>>>>> > > > >> > > >> > > > > > >>> Let me know if you have any other questions.
> > >>>>>>> > > > >> > > >> > > > > > >>>
> > >>>>>>> > > > >> > > >> > > > > > >>> Best,
> > >>>>>>> > > > >> > > >> > > > > > >>> Kurt
> > >>>>>>> > > > >> > > >> > > > > > >>>
> > >>>>>>> > > > >> > > >> > > > > > >>>
> > >>>>>>> > > > >> > > >> > > > > > >>> On Tue, Jun 18, 2019 at 1:29 PM vino yang <yanghua1...@gmail.com> wrote:
> > >>>>>>> > > > >> > > >> > > > > > >>>
> > >>>>>>> > > > >> > > >> > > > > > >>>> Hi Kurt,
> > >>>>>>> > > > >> > > >> > > > > > >>>>
> > >>>>>>> > > > >> > > >> > > > > > >>>> Thanks for your reply.
> > >>>>>>> > > > >> > > >> > > > > > >>>>
> > >>>>>>> > > > >> > > >> > > > > > >>>> Actually, I am not against you raising your design.
> > >>>>>>> > > > >> > > >> > > > > > >>>>
> > >>>>>>> > > > >> > > >> > > > > > >>>> From your earlier description, I can only imagine that your high-level
> > >>>>>>> > > > >> > > >> > > > > > >>>> implementation is about SQL and that the optimization is internal to
> > >>>>>>> > > > >> > > >> > > > > > >>>> the API. Is it automatic? How would users configure the option that
> > >>>>>>> > > > >> > > >> > > > > > >>>> triggers pre-aggregation?
> > >>>>>>> > > > >> > > >> > > > > > >>>>
> > >>>>>>> > > > >> > > >> > > > > > >>>> Maybe after I get more information, it will sound more reasonable.
> > >>>>>>> > > > >> > > >> > > > > > >>>>
> > >>>>>>> > > > >> > > >> > > > > > >>>> IMO, first of all, it would be better to make your user interface
> > >>>>>>> > > > >> > > >> > > > > > >>>> concrete; it's the basis of the discussion.
> > >>>>>>> > > > >> > > >> > > > > > >>>>
> > >>>>>>> > > > >> > > >> > > > > > >>>> For example, can you give an example code snippet to show how to help
> > >>>>>>> > > > >> > > >> > > > > > >>>> users handle data skew in jobs built with the DataStream API?
> > >>>>>>> > > > >> > > >> > > > > > >>>>
> > >>>>>>> > > > >> > > >> > > > > > >>>> If you give more details, we can discuss further. I think that if one
> > >>>>>>> > > > >> > > >> > > > > > >>>> design introduces an exact interface and another does not, the
> > >>>>>>> > > > >> > > >> > > > > > >>>> implementations will differ obviously. For example, we introduce an
> > >>>>>>> > > > >> > > >> > > > > > >>>> exact API in DataStream named localKeyBy. For the pre-aggregation, we
> > >>>>>>> > > > >> > > >> > > > > > >>>> need to define the trigger mechanism of local aggregation, so we find
> > >>>>>>> > > > >> > > >> > > > > > >>>> that reusing the window API and operator is a good choice. This is a
> > >>>>>>> > > > >> > > >> > > > > > >>>> chain of reasoning from design to implementation.
> > >>>>>>> > > > >> > > >> > > > > > >>>>
> > >>>>>>> > > > >> > > >> > > > > > >>>> What do you think?
> > >>>>>>> > > > >> > > >> > > > > > >>>>
> > >>>>>>> > > > >> > > >> > > > > > >>>> Best,
> > >>>>>>> > > > >> > > >> > > > > > >>>> Vino
> > >>>>>>> > > > >> > > >> > > > > > >>>>
> > >>>>>>> > > > >> > > >> > > > > > >>>>
> > >>>>>>> > > > >> > > >> > > > > > >>>> On Tue, Jun 18, 2019 at 11:58 AM Kurt Young <ykt...@gmail.com> wrote:
> > >>>>>>> > > > >> > > >> > > > > > >>>>
> > >>>>>>> > > > >> > > >> > > > > > >>>>> Hi Vino,
> > >>>>>>> > > > >> > > >> > > > > > >>>>>
> > >>>>>>> > > > >> > > >> > > > > > >>>>> Now I feel that we may have different understandings about what kind
> > >>>>>>> > > > >> > > >> > > > > > >>>>> of problems or improvements you want to resolve. Currently, most of
> > >>>>>>> > > > >> > > >> > > > > > >>>>> the feedback is focusing on *how to do a
> > >>>>>>> > > > >> > > >
> > >>>>>>
> > >>>>>>
> >
>
