Hi all,

I also think it's a good idea to agree on the API first.
I'm sorry that we did not include any usage examples of the API in the FLIP document earlier. This may have caused some misunderstandings in this mail thread. I have now added some usage examples to the "Public Interfaces" section of the FLIP-44 document. Let us first get to know the API through these examples. If you have any feedback or questions, please let me know.

Best,
Vino

On Thu, 27 Jun 2019 at 12:51 PM, vino yang <yanghua1...@gmail.com> wrote:
> Hi Jark,
>
> `DataStream.localKeyBy().process()` has a key difference from `DataStream.process()`: the former receives a `KeyedProcessFunction` (sorry, my previous reply may have misled you), while the latter receives a `ProcessFunction`. If you read the Javadoc of ProcessFunction, you will find this "Note":
>
>> Access to keyed state and timers (which are also scoped to a key) is only available if the ProcessFunction is applied on a KeyedStream.
>
> You can also compare the two implementations (`ProcessOperator` and `KeyedProcessOperator`) to see the difference.
>
> IMO, this "Note" matters a lot in many use cases. For example, if we cannot access keyed state, we can only buffer data in heap memory, which does not guarantee correct semantics! Timers are also very important in some scenarios.
>
> That's why we say our API is flexible: it gets most of the benefits of KeyedStream (including potential future benefits).
>
> I have added some instructions on the use of localKeyBy to the FLIP-44 document.
>
> Best,
> Vino
>
>
> On Thu, 27 Jun 2019 at 10:44 AM, Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Piotr,
>>
>> I think the state migration issue you raised is a good point. Having "stream.enableLocalAggregation(Trigger)" might add implicit operators for which users can't set a uid, which causes state compatibility/evolution problems. So let's put this in the rejected alternatives.
>>
>> Hi Vino,
>>
>> You mentioned several times that "DataStream.localKeyBy().process()" can solve the data skew problem of "DataStream.keyBy().process()". I'm curious: what is the difference between "DataStream.process()" and "DataStream.localKeyBy().process()"? Can't "DataStream.process()" solve the data skew problem?
>>
>> Best,
>> Jark
>>
>>
>> On Wed, 26 Jun 2019 at 18:20, Piotr Nowojski <pi...@ververica.com> wrote:
>>
>>> Hi Jark and Vino,
>>>
>>> I fully agree with Jark that, to keep the discussion focused and to limit the number of parallel topics, we should focus on one topic at a time. We can decide on the API first and discuss the runtime details later, at least as long as we keep the potential requirements of the runtime part in mind while designing the API.
>>>
>>> Regarding the automatic optimisation proposed by Jark:
>>>
>>> "stream.enableLocalAggregation(Trigger)"
>>>
>>> I would be against that in the DataStream API for the reasons Vino presented. There was a discussion thread about the future directions of the Table API vs the DataStream API, and the consensus was that automatic optimisations are one of the dividing lines between the two, for at least a couple of reasons. Flexibility and full control over the program was one of them. Another is state migration. Having "stream.enableLocalAggregation(Trigger)" add implicit operators to the job graph can cause problems with savepoint/checkpoint compatibility.
>>>
>>> However, I haven't thought about or looked into the details of Vino's API proposal, so I cannot fully judge it.
>>>
>>> Piotrek
>>>
>>> > On 26 Jun 2019, at 09:17, vino yang <yanghua1...@gmail.com> wrote:
>>> >
>>> > Hi Jark,
>>> >
>>> > Similar questions and responses have been repeated many times.
>>> >
>>> > Why didn't we spend more sections discussing the API?
>>> >
>>> > Because we try to reuse the capabilities of KeyedStream.
>>> > The localKeyBy API simply returns a KeyedStream. That is our design: we get all the benefits of KeyedStream, plus further benefits from WindowedStream. The APIs of KeyedStream and WindowedStream are long-tested and flexible. Yes, we spend a lot of space on the local keyed state, but that is neither the goal nor the motivation. It is the way we implement local aggregation. It is much more complicated than the API we introduced, so it takes up more sections. Of course, this concerns the implementation level of the operator. We also agreed to support the buffer+flush implementation and added related instructions to the document. This needs to wait for the community's approval; if the community agrees, we will give more instructions. What's more, I have said before that we welcome state-related commenters to participate in the discussion, but it is not wise to modify the FLIP title.
>>> >
>>> > About the API of local aggregation:
>>> >
>>> > I don't dispute that ease of use is very important. But IMHO flexibility is the most important thing at the DataStream API level. Otherwise, what is the point of DataStream? The significance of the DataStream API is that it is more flexible than Table/SQL; if it cannot provide this, everyone would just use Table/SQL.
>>> >
>>> > The DataStream API should focus more on flexibility than on automatic optimization, which gives users more possibilities to implement complex programs and meet specific scenarios. Many programs written with the DataStream API are far more complex than we think. Optimizing at the API level is very difficult, and the benefit is very low.
>>> >
>>> > I want to say that we support a more generalized local aggregation. I mentioned in my previous reply that aggregation is not limited to UDFs that implement AggregateFunction.
>>> > In some complex scenarios, we have to support local aggregation through ProcessFunction and ProcessWindowFunction to solve the data skew problem. How would you support them with the API and optimization you mentioned?
>>> >
>>> > The fact that flexible APIs can be combined arbitrarily into erroneous semantics does not prove that flexibility is meaningless, because the user is the decision maker. I have given examples many times: many APIs in DataStream also have little practical significance when they are combined arbitrarily. So users of flexible APIs need to understand what they are doing and what the right choice is.
>>> >
>>> > I think that if we keep discussing this, there will be no result.
>>> >
>>> > @Stephan Ewen <se...@apache.org>, @Aljoscha Krettek <aljos...@apache.org> and @Piotr Nowojski <pi...@ververica.com>, do you have further comments?
>>> >
>>> >
>>> > On Wed, 26 Jun 2019 at 11:46 AM, Jark Wu <imj...@gmail.com> wrote:
>>> > Thanks for the long discussion, Vino, Kurt, Hequn, Piotr and others.
>>> >
>>> > It seems that we still have different ideas about the API (localKeyBy()?) and the implementation details (reuse the window operator? local keyed state?). The discussion has stalled and mixes motivation, API and implementation.
>>> >
>>> > In order to make some progress on this topic, I want to summarize the points (please correct me if I'm wrong or missing something) and suggest splitting the topic into the following aspects and discussing them one by one.
>>> >
>>> > 1) What's the main purpose of this FLIP?
>>> > - According to its title, this FLIP is meant to support local aggregation. However, 80% of the FLIP's content introduces a new kind of state called local keyed state.
>>> > - If we mainly want to introduce local keyed state, then we should re-title the FLIP and involve more people who work on state.
>>> > - If we mainly want to support local aggregation, then we can jump to step 2 to discuss the API design.
>>> >
>>> > 2) What does the API look like?
>>> > - Vino proposed using "localKeyBy()" for local processing; the output of the local processing is the result type of the aggregate function.
>>> >   a) For non-windowed aggregation:
>>> >      input.localKeyBy(..).aggregate(agg1).keyBy(..).aggregate(agg2)  **NOT SUPPORTED**
>>> >   b) For windowed aggregation:
>>> >      input.localKeyBy(..).window(w1).aggregate(agg1).keyBy(..).window(w2).aggregate(agg2)
>>> >
>>> > 3) What are the implementation details?
>>> > - We may or may not reuse the window operator.
>>> > - We may or may not introduce a new state concept.
>>> > - The local operator may hold no state at all, by flushing its buffers in prepareSnapshotPreBarrier.
>>> > - and so on...
>>> > - We can discuss these later, once we reach consensus on the API.
>>> >
>>> > --------------------
>>> >
>>> > Here are my thoughts:
>>> >
>>> > 1) Purpose of this FLIP
>>> > - Based on the motivation section of the FLIP, I think the purpose is to support local aggregation to solve the data skew issue. So I think we should focus on how to provide an easy-to-use and clear API to support **local aggregation**.
>>> > - Vino's point is centered on the local keyed state API (or localKeyBy()) and on how to leverage it to support local aggregation. But I'm afraid that's not a good way to design an API for local aggregation.
>>> >
>>> > 2) Local aggregation API
>>> > - IMO, the method call chain "input.localKeyBy(..).window(w1).aggregate(agg1).keyBy(..).window(w2).aggregate(agg2)" is not that easy to use, because we have to provide two implementations of an aggregation (one for the partial agg, another for the final agg).
>>> > And we have to be careful with the first window call, because an inappropriate window call will break the semantics.
>>> > - From my point of view, local aggregation is a mature concept: the local side should output the intermediate accumulator (ACC) for the past period of time (defined by a trigger), and the downstream final aggregation merges the ACCs received from the local side and outputs the current final result.
>>> > - The current "AggregateFunction" API in DataStream already has the accumulator type and the "merge" method. So all users need to do is enable the local aggregation optimization and set a trigger.
>>> > - One idea that comes to mind: assume we have a windowed aggregation stream "val stream = input.keyBy().window(w).aggregate(agg)". We could provide an API on the stream, for example "stream.enableLocalAggregation(Trigger)", where the trigger can be "ContinuousEventTimeTrigger.of(Time.minutes(1))". The job would then be optimized into a local operator + a final operator, and the local operator would combine records every minute of event time.
>>> > - This way, only one line is added, and the output is the same as before, because it is just an optimization.
>>> >
>>> >
>>> > Regards,
>>> > Jark
>>> >
>>> >
>>> > On Tue, 25 Jun 2019 at 14:34, vino yang <yanghua1...@gmail.com> wrote:
>>> >
>>> > > Hi Kurt,
>>> > >
>>> > > Here are my answers to your questions:
>>> > >
>>> > > a) Sorry, I have only updated the Google doc so far and haven't had time to update the FLIP yet; I will update it as soon as possible. About your description of this point, I have a question: what do you mean by "how do we combine with `AggregateFunction`"?
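Jark describes a two-phase scheme: the local side emits one accumulator (ACC) per key whenever a trigger fires, and the final side merges those ACCs with `merge` before calling `getResult`. The plain-Java sketch below illustrates that scheme. It is built on several assumptions and is neither Flink nor FLIP-44 code:

- `AggFn` is a hypothetical interface that only mirrors the four methods of Flink's `AggregateFunction`.
- A record-count threshold stands in for a `Trigger`.
- The explicit `flush()` call stands in for flushing buffers before a checkpoint barrier (the buffer+flush variant), so the local combiner never has to snapshot its buffer.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoPhaseAggregationSketch {

    // Hypothetical stand-in that mirrors the four methods of Flink's AggregateFunction<IN, ACC, OUT>.
    interface AggFn<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        OUT getResult(ACC acc);
        ACC merge(ACC a, ACC b);
    }

    // SUM over longs: the accumulator and the result are both Long.
    static final AggFn<Long, Long, Long> SUM = new AggFn<Long, Long, Long>() {
        public Long createAccumulator() { return 0L; }
        public Long add(Long value, Long acc) { return acc + value; }
        public Long getResult(Long acc) { return acc; }
        public Long merge(Long a, Long b) { return a + b; }
    };

    // Local (pre-shuffle) combiner: one in-memory ACC per key, emitted as (key, ACC) on flush.
    static final class LocalCombiner<IN, ACC> {
        private final AggFn<IN, ACC, ?> fn;
        private final int flushEvery; // record-count stand-in for a Trigger
        private final List<Map.Entry<String, ACC>> downstream;
        private final Map<String, ACC> buffer = new HashMap<>();
        private int buffered = 0;

        LocalCombiner(AggFn<IN, ACC, ?> fn, int flushEvery, List<Map.Entry<String, ACC>> downstream) {
            this.fn = fn;
            this.flushEvery = flushEvery;
            this.downstream = downstream;
        }

        void processElement(String key, IN value) {
            ACC acc = buffer.get(key);
            buffer.put(key, fn.add(value, acc == null ? fn.createAccumulator() : acc));
            if (++buffered >= flushEvery) {
                flush();
            }
        }

        // Also called before a checkpoint, so the buffer never needs to be snapshotted.
        void flush() {
            for (Map.Entry<String, ACC> e : buffer.entrySet()) {
                downstream.add(new SimpleEntry<>(e.getKey(), e.getValue()));
            }
            buffer.clear();
            buffered = 0;
        }
    }

    // Final phase: merge all partial ACCs per key, then compute each result once.
    static <IN, ACC, OUT> Map<String, OUT> finalAggregate(AggFn<IN, ACC, OUT> fn, List<Map.Entry<String, ACC>> partials) {
        Map<String, ACC> merged = new HashMap<>();
        for (Map.Entry<String, ACC> p : partials) {
            ACC prev = merged.get(p.getKey());
            merged.put(p.getKey(), prev == null ? p.getValue() : fn.merge(prev, p.getValue()));
        }
        Map<String, OUT> results = new HashMap<>();
        for (Map.Entry<String, ACC> e : merged.entrySet()) {
            results.put(e.getKey(), fn.getResult(e.getValue()));
        }
        return results;
    }

    // Two parallel local subtasks; "hot" is the skewed key.
    static List<Map.Entry<String, Long>> runLocalPhase() {
        List<Map.Entry<String, Long>> shuffled = new ArrayList<>();
        LocalCombiner<Long, Long> subtask0 = new LocalCombiner<>(SUM, 3, shuffled);
        LocalCombiner<Long, Long> subtask1 = new LocalCombiner<>(SUM, 3, shuffled);
        for (long v = 1; v <= 5; v++) {
            subtask0.processElement("hot", v);
        }
        subtask1.processElement("hot", 10L);
        subtask1.processElement("cold", 7L);
        subtask0.flush(); // e.g. before a checkpoint barrier
        subtask1.flush();
        return shuffled;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> shuffled = runLocalPhase();
        System.out.println("records shuffled: " + shuffled.size()); // 4 instead of 7
        Map<String, Long> results = finalAggregate(SUM, shuffled);
        System.out.println("hot=" + results.get("hot") + ", cold=" + results.get("cold")); // hot=25, cold=7
    }
}
```

With the skewed key `hot`, the seven input records reach the final phase as four partial accumulators. The merged results (hot = 25, cold = 7) equal a plain per-key sum.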
>>> > >
>>> > > I have already shown you the supported examples:
>>> > >
>>> > > - input.localKeyBy(0).aggregate()
>>> > > - input.localKeyBy(0).window().aggregate()
>>> > >
>>> > > Could you show me an example of how to combine with `AggregateFunction` through your localAggregate API?
>>> > >
>>> > > As for how to do local aggregation for AVG, consider this code:
>>> > >
>>> > >     DataStream<Tuple2<String, Long>> source = null;
>>> > >     source
>>> > >         .localKeyBy(0)
>>> > >         .timeWindow(Time.seconds(60))
>>> > >         .aggregate(agg1, new WindowFunction<Tuple2<Long, Long>, Tuple3<String, Long, Long>, String, TimeWindow>() {})
>>> > >         .keyBy(0)
>>> > >         .timeWindow(Time.seconds(60))
>>> > >         .aggregate(agg2, new WindowFunction<Tuple2<Long, Long>, Tuple2<String, Long>, String, TimeWindow>() {});
>>> > >
>>> > > agg1:
>>> > > signature: new AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {}
>>> > > input param type: Tuple2<String, Long>, f0: key; f1: value
>>> > > intermediate result type: Tuple2<Long, Long>, f0: local aggregated sum; f1: local aggregated count
>>> > > output param type: Tuple2<Long, Long>, f0: local aggregated sum; f1: local aggregated count
>>> > >
>>> > > agg2:
>>> > > signature: new AggregateFunction<Tuple3<String, Long, Long>, Long, Tuple2<String, Long>>() {}
>>> > > input param type: Tuple3<String, Long, Long>, f0: key; f1: local aggregated sum; f2: local aggregated count
>>> > > intermediate result type: Long, avg result
>>> > > output param type: Tuple2<String, Long>, f0: key; f1: avg result
>>> > >
>>> > > For a sliding window, users just need to change the window type.
>>> > > Again, we are trying to provide the design and implementation at the DataStream level.
>>> > > So I believe we can meet all the requirements that come from the SQL level (only the implementation may differ).
>>> > >
>>> > > b) Yes, theoretically your idea is right, but in practice it would not bring many benefits. If we want the benefits of the window API, why not reuse the window operator instead of copying so much duplicated code into another operator?
>>> > >
>>> > > c) OK, I agree to let the state backend committers join this discussion.
>>> > >
>>> > > Best,
>>> > > Vino
>>> > >
>>> > >
>>> > > On Mon, 24 Jun 2019 at 6:53 PM, Kurt Young <ykt...@gmail.com> wrote:
>>> > >
>>> > > > Hi vino,
>>> > > >
>>> > > > One thing to add for a): I think one or two examples, such as how to do local aggregation on a sliding window and on an unbounded aggregate, would help a lot.
>>> > > >
>>> > > > Best,
>>> > > > Kurt
>>> > > >
>>> > > >
>>> > > > On Mon, Jun 24, 2019 at 6:06 PM Kurt Young <ykt...@gmail.com> wrote:
>>> > > >
>>> > > > > Hi vino,
>>> > > > >
>>> > > > > I think several things still need discussion.
>>> > > > >
>>> > > > > a) We all agree that we should first go with a unified abstraction, but the abstraction is not reflected in the FLIP. If your answer is the "localKeyBy" API, then I would ask how we combine it with `AggregateFunction`, and how we do proper local aggregation for functions that have a different intermediate result type, such as AVG. Could you add these to the document?
>>> > > > >
>>> > > > > b) On the implementation side, reusing the window operator is one of the possible solutions, but we should not base both implementations on the window operator.
>>> > > > > As I understand it, one of the possible implementations should not touch the window operator at all.
>>> > > > >
>>> > > > > c) 80% of your FLIP's content actually describes how we support local keyed state. I don't know whether this needs to be introduced in the first step, and we should also ask committers who work on the state backend to share their thoughts.
>>> > > > >
>>> > > > > Best,
>>> > > > > Kurt
>>> > > > >
>>> > > > >
>>> > > > > On Mon, Jun 24, 2019 at 5:17 PM vino yang <yanghua1...@gmail.com> wrote:
>>> > > > >
>>> > > > >> Hi Kurt,
>>> > > > >>
>>> > > > >> You did not raise any further objections, so I thought you had agreed with the design after we promised to support two kinds of implementation.
>>> > > > >>
>>> > > > >> At the API level, we have answered your question about passing an AggregateFunction to do the aggregation. We can support AggregateFunction whether or not we introduce the localKeyBy API.
>>> > > > >>
>>> > > > >> So what is your objection now? Can you share it with us?
>>> > > > >>
>>> > > > >> Best,
>>> > > > >> Vino
>>> > > > >>
>>> > > > >> On Mon, 24 Jun 2019 at 4:24 PM, Kurt Young <ykt...@gmail.com> wrote:
>>> > > > >>
>>> > > > >> > Hi vino,
>>> > > > >> >
>>> > > > >> > Sorry, I don't see a consensus on reusing the window operator and keeping the API design of localKeyBy. I think we definitely need to put more thought into this topic.
>>> > > > >> >
>>> > > > >> > I am also looping in Stephan for this discussion.
>>> > > > >> >
>>> > > > >> > Best,
>>> > > > >> > Kurt
>>> > > > >> >
>>> > > > >> >
>>> > > > >> > On Mon, Jun 24, 2019 at 3:26 PM vino yang <yanghua1...@gmail.com> wrote:
>>> > > > >> >
>>> > > > >> > > Hi all,
>>> > > > >> > >
>>> > > > >> > > I am happy that we have had a wonderful discussion and received many valuable opinions in the last few days.
>>> > > > >> > >
>>> > > > >> > > Now let me try to summarize the design changes on which we have reached consensus:
>>> > > > >> > >
>>> > > > >> > >  - provide a unified abstraction that supports two kinds of implementation;
>>> > > > >> > >  - reuse WindowOperator and enhance it so that the intermediate result of the local aggregation can be buffered and flushed, to support both kinds of implementation;
>>> > > > >> > >  - keep the API design of localKeyBy, but explicitly disable some APIs we cannot currently support, and provide a configurable API that lets users choose how to handle the intermediate result.
>>> > > > >> > >
>>> > > > >> > > The above three points have been updated in the design doc. If you have any questions, please let me know.
>>> > > > >> > >
>>> > > > >> > > @Aljoscha Krettek <aljos...@apache.org>, what do you think? Any further comments?
>>> > > > >> > >
>>> > > > >> > > Best,
>>> > > > >> > > Vino
>>> > > > >> > >
>>> > > > >> > > On Thu, 20 Jun 2019 at 2:02 PM, vino yang <yanghua1...@gmail.com> wrote:
>>> > > > >> > >
>>> > > > >> > > > Hi Kurt,
>>> > > > >> > > >
>>> > > > >> > > > Thanks for your comments.
>>> > > > >> > > >
>>> > > > >> > > > It seems we have reached consensus that we should use local aggregation to alleviate the performance degradation caused by data skew. In this FLIP, our key solution is to introduce local keyed partitioning to achieve this goal.
>>> > > > >> > > >
>>> > > > >> > > > I also agree that we can benefit a lot from using AggregateFunction. Combined with localKeyBy, it can easily be used to achieve local aggregation:
>>> > > > >> > > >
>>> > > > >> > > >  - input.localKeyBy(0).aggregate()
>>> > > > >> > > >  - input.localKeyBy(0).window().aggregate()
>>> > > > >> > > >
>>> > > > >> > > > I think the only question here is the choice between
>>> > > > >> > > >
>>> > > > >> > > >  - (1) introducing a new primitive called localKeyBy and implementing local aggregation with existing operators, or
>>> > > > >> > > >  - (2) introducing an operator called localAggregation, which is composed of a key selector, a window-like operator, and an aggregate function.
>>> > > > >> > > >
>>> > > > >> > > > A composite interface for local aggregation may offer some optimization opportunities. But at the same time, in my opinion, we lose flexibility (or need some effort to achieve the same flexibility).
>>> > > > >> > > >
>>> > > > >> > > > As I said in previous mails, we have many use cases where the aggregation is very complicated and cannot be performed with AggregateFunction.
>>> > > > >> > > > For example, users may perform windowed aggregations based on time, on data values, or even on external storage. Today, they typically use KeyedProcessFunction or customized triggers to implement these aggregations. It's not easy to address data skew in such cases with a composite interface for local aggregation.
>>> > > > >> > > >
>>> > > > >> > > > The DataStream API targets exactly those cases where the application logic is very complicated and optimization does not matter. So I think it's a better choice to provide a relatively low-level and canonical interface.
>>> > > > >> > > >
>>> > > > >> > > > The composite interface, on the other hand, may be a good choice for declarative interfaces, including SQL and the Table API, because it allows more optimization opportunities.
>>> > > > >> > > >
>>> > > > >> > > > Best,
>>> > > > >> > > > Vino
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > > On Thu, 20 Jun 2019 at 10:15 AM, Kurt Young <ykt...@gmail.com> wrote:
>>> > > > >> > > >
>>> > > > >> > > >> Hi all,
>>> > > > >> > > >>
>>> > > > >> > > >> As vino said in previous emails, I think we should first discuss and decide which use cases this FLIP wants to address and what the API should look like. From my side, I think this is probably the root cause of the current divergence.
>>> > > > >> > > >>
>>> > > > >> > > >> My understanding (from the FLIP title and the motivation section of the document) is that we want proper support for local aggregation, or pre-aggregation. This is not a very new idea; most SQL engines already have this improvement. The core concept is that there should be an AggregateFunction, whether it is the Flink runtime's AggregateFunction or SQL's UserDefinedAggregateFunction. Both kinds of aggregation have the concept of an intermediate data type, sometimes called ACC. I quickly went through the POC Piotr did before [1]; it also uses AggregateFunction directly.
>>> > > > >> > > >>
>>> > > > >> > > >> But after reading the design of this FLIP, I can't help feeling that it does not target proper local aggregation support. It actually wants to introduce another concept, LocalKeyBy, along with how to split and merge local key groups and how to properly support state on local keys. Local aggregation just happens to be one possible use case of LocalKeyBy. But the FLIP lacks support for the essential concept of local aggregation, which is the intermediate data type. Without it, I really don't think it is a good fit for local aggregation.
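AVG makes Kurt's point about the intermediate data type concrete; it is the example Kurt asked about and Vino sketched earlier in the thread. The plain-Java sketch below uses hypothetical class and method names and has no Flink dependency. It keeps a (sum, count) pair as the accumulator on the local side and merges those pairs on the final side. For contrast, it also averages per-subtask averages. That gives a different, wrong answer when subtasks see different numbers of records for the same key.

```java
public class LocalAvgSketch {

    // Intermediate type (ACC) for AVG: a running sum and count.
    static final class SumCount {
        final long sum;
        final long count;

        SumCount(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Local phase: fold the records one subtask sees for a key into a partial ACC.
    static SumCount localAggregate(long[] values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return new SumCount(sum, values.length);
    }

    // Final phase: merge partial ACCs, then compute the average exactly once.
    static double finalAverage(SumCount... partials) {
        long sum = 0;
        long count = 0;
        for (SumCount p : partials) {
            sum += p.sum;
            count += p.count;
        }
        return (double) sum / count;
    }

    // Anti-pattern for comparison: each subtask emits its own average, and the averages are averaged.
    static double averageOfLocalAverages(long[]... perSubtask) {
        double total = 0;
        for (long[] values : perSubtask) {
            SumCount acc = localAggregate(values);
            total += (double) acc.sum / acc.count;
        }
        return total / perSubtask.length;
    }

    public static void main(String[] args) {
        long[] subtask0 = {1, 2, 3}; // three records for the key on one subtask
        long[] subtask1 = {10};      // one record for the same key on another subtask
        System.out.println(finalAverage(localAggregate(subtask0), localAggregate(subtask1))); // 4.0
        System.out.println(averageOfLocalAverages(subtask0, subtask1));                       // 6.0
    }
}
```

The true average of 1, 2, 3 and 10 is 16 / 4 = 4.0, while averaging the per-subtask averages (2.0 and 10.0) yields 6.0. In `AggregateFunction` terms, (sum, count) plays the role of the ACC, merging adds the pairs, and the final division corresponds to `getResult`.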
>>> > > > >> > > >>
>>> > > > >> > > >> So I want to clarify the scope, or goal, of this FLIP: do we want a proper local aggregation engine, or do we just want to introduce a new concept called LocalKeyBy?
>>> > > > >> > > >>
>>> > > > >> > > >> [1]: https://github.com/apache/flink/pull/4626
>>> > > > >> > > >>
>>> > > > >> > > >> Best,
>>> > > > >> > > >> Kurt
>>> > > > >> > > >>
>>> > > > >> > > >>
>>> > > > >> > > >> On Wed, Jun 19, 2019 at 5:13 PM vino yang <yanghua1...@gmail.com> wrote:
>>> > > > >> > > >>
>>> > > > >> > > >> > Hi Hequn,
>>> > > > >> > > >> >
>>> > > > >> > > >> > Thanks for your comments!
>>> > > > >> > > >> >
>>> > > > >> > > >> > I agree that letting local aggregation reuse the window API, and refining the window operator so that it meets both sets of requirements (ours and Kurt's), is a good decision!
>>> > > > >> > > >> >
>>> > > > >> > > >> > Concerning your questions:
>>> > > > >> > > >> >
>>> > > > >> > > >> > 1. The result of input.localKeyBy(0).sum(1).keyBy(0).sum(1) may be meaningless.
>>> > > > >> > > >> >
>>> > > > >> > > >> > Yes, it does not make sense in most cases. However, I also want to point out that users should understand the semantics of localKeyBy and use it correctly, because this issue also exists for the global keyBy. Consider this example: input.keyBy(0).sum(1).keyBy(0).sum(1); its result is also meaningless.
>>> > > > >> > > >> >
>>> > > > >> > > >> > 2.
>>> > > > >> > > >> > About the semantics of input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)):
>>> > > > >> > > >> >
>>> > > > >> > > >> > Good catch! I agree with you that it's not good to enable all of KeyedStream's functionality for localKeyBy. Currently, we do not support some APIs, such as connect/join/intervalJoin/coGroup. This is because we force the operators on LocalKeyedStreams to be chained with their inputs.
>>> > > > >> > > >> >
>>> > > > >> > > >> > Best,
>>> > > > >> > > >> > Vino
>>> > > > >> > > >> >
>>> > > > >> > > >> >
>>> > > > >> > > >> > On Wed, 19 Jun 2019 at 3:42 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>> > > > >> > > >> >
>>> > > > >> > > >> > > Hi,
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > Thanks a lot for the great discussion. It's great to see that some agreement has been reached on the "local aggregate engine"!
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > ===> Considering the abstract engine,
>>> > > > >> > > >> > > I wonder whether it is worth extending the current window to meet both demands raised by Kurt and Vino. There are some benefits we can get:
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > 1. The interfaces of the window are complete and clear. With windows, we can define many ways to split the data and perform different computations.
>>> > > > >> > > >> > > 2.
>>> > > > >> > > >> > > We can also leverage the window to do mini-batching for the global aggregation, i.e., use the window to bundle the data belonging to the same key. For every bundle, we then only need to read and write the state once. This can greatly reduce state IO and improve performance.
>>> > > > >> > > >> > > 3. Many other use cases can also benefit from a memory-based or stateless window.
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > ===> As for the API,
>>> > > > >> > > >> > > I think it is good to make our API more flexible. However, we may also need to make it meaningful.
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > Take my previous reply as an example: the result of input.localKeyBy(0).sum(1).keyBy(0).sum(1) may be meaningless. Another example I found is intervalJoin, e.g., input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)). In this case, problems arise if input1 and input2 have different parallelism: we don't know which input the join should be chained with. Even if they have the same parallelism, it's hard to tell what the join is doing. There may also be other problems.
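A few lines of plain Java show why `input.localKeyBy(0).sum(1).keyBy(0).sum(1)` (and likewise `input.keyBy(0).sum(1).keyBy(0).sum(1)`) is meaningless. A rolling sum emits its running total for every record, so a second rolling sum adds up overlapping partial totals. The sketch assumes a single key on a single local subtask and models only this emission behaviour, not Flink's operators. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RollingSumSketch {

    // A rolling (non-windowed) sum for a single key: every input record produces the
    // updated total, which is how rolling aggregations such as sum(...) emit results.
    static List<Long> rollingSum(List<Long> input) {
        List<Long> out = new ArrayList<>();
        long total = 0;
        for (long v : input) {
            total += v;
            out.add(total);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> input = Arrays.asList(1L, 2L, 3L);
        List<Long> local = rollingSum(input);  // [1, 3, 6]: each partial total is emitted downstream
        List<Long> global = rollingSum(local); // [1, 4, 10]: partial totals get summed again
        System.out.println(local + " -> " + global); // [1, 3, 6] -> [1, 4, 10]
    }
}
```

The correct total is 6, but the final stage ends at 10. The windowed variants discussed in this thread emit each partial result only once per window, which avoids this double counting.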
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > From this point of view, wouldn't it be better not to enable all of KeyedStream's functionality for localKeyBy?
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > It would be great to hear your opinions as well.
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > Best, Hequn
>>> > > > >> > > >> > >
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > On Wed, Jun 19, 2019 at 10:24 AM vino yang <yanghua1...@gmail.com> wrote:
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > > Hi Kurt and Piotrek,
>>> > > > >> > > >> > > >
>>> > > > >> > > >> > > > Thanks for your comments.
>>> > > > >> > > >> > > >
>>> > > > >> > > >> > > > I agree that we can provide a better abstraction that is compatible with two different implementations.
>>> > > > >> > > >> > > >
>>> > > > >> > > >> > > > First of all, I think we should consider which scenarios we need to support at the *API* level.
>>> > > > >> > > >> > > >
>>> > > > >> > > >> > > > We have some use cases that need a customized aggregation through KeyedProcessFunction (with our localKeyBy.window, they can use ProcessWindowFunction).
>>> > > > >> > > >> > > >
>>> > > > >> > > >> > > > Shall we support these flexible use cases?
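Hequn's second benefit can be sketched in plain Java: a window bundles the records of the same key, so state is read and written once per bundle instead of once per record. `CountingState` is a hypothetical stand-in for a keyed state backend that only counts accesses; it is not a Flink API. The sketch ignores how bundles are triggered, as well as timers and checkpointing.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MiniBatchSketch {

    // Hypothetical stand-in for keyed state that counts how often it is read and written.
    static final class CountingState {
        final Map<String, Long> values = new HashMap<>();
        int reads = 0;
        int writes = 0;

        long get(String key) {
            reads++;
            return values.getOrDefault(key, 0L);
        }

        void put(String key, long value) {
            writes++;
            values.put(key, value);
        }
    }

    // Per-record counting: one state read and one state write for every record.
    static CountingState perRecord(List<String> keys) {
        CountingState state = new CountingState();
        for (String key : keys) {
            state.put(key, state.get(key) + 1);
        }
        return state;
    }

    // Mini-batch counting: pre-aggregate the bundle in memory, then touch state once per distinct key.
    static CountingState miniBatch(List<String> keys) {
        Map<String, Long> bundle = new LinkedHashMap<>();
        for (String key : keys) {
            bundle.merge(key, 1L, Long::sum);
        }
        CountingState state = new CountingState();
        for (Map.Entry<String, Long> e : bundle.entrySet()) {
            state.put(e.getKey(), state.get(e.getKey()) + e.getValue());
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "a", "a", "b", "a", "b");
        CountingState single = perRecord(keys);
        CountingState batched = miniBatch(keys);
        System.out.println("per record: " + single.reads + " reads, " + single.writes + " writes");   // 6 reads, 6 writes
        System.out.println("mini-batch: " + batched.reads + " reads, " + batched.writes + " writes"); // 2 reads, 2 writes
        System.out.println("same counts: " + single.values.equals(batched.values));                  // true
    }
}
```

For these six records, per-record processing performs six state reads and six writes, while the mini-batch performs two of each and produces identical counts. The savings grow with the number of records per key in a bundle, which is the skewed case this thread is about.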
Best,
Vino

On Tue, Jun 18, 2019 at 8:37 PM Kurt Young <ykt...@gmail.com> wrote:

Hi Piotr,

Thanks for joining the discussion. Making "local aggregation" abstract enough sounds good to me; we could implement and verify alternative solutions for the local aggregation use cases. Maybe we will find that both solutions are appropriate for different scenarios.

Starting from a simple one sounds like a practical way to go. What do you think, Vino?

Best,
Kurt

On Tue, Jun 18, 2019 at 8:10 PM Piotr Nowojski <pi...@ververica.com> wrote:

Hi Kurt and Vino,

I think there is a trade-off that we need to consider for the local aggregation.

Generally speaking, I would agree with Kurt about local aggregation/pre-aggregation not using Flink's state and instead flushing the operator on a checkpoint.
Network IO is usually cheaper than disk IO. This does, however, have a couple of issues:
1. It can explode the number of in-flight records during checkpoint barrier alignment, making checkpointing slower and decreasing the actual throughput.
2. It trades disk IO on the local aggregation machine for CPU (and disk IO in the case of RocksDB) on the final aggregation machine. This is fine as long as there is no huge data skew. If there are only a handful of hot keys (or even a single one), it might be better to keep the persistent state in the LocalAggregationOperator to offload the final aggregation as much as possible.
3. With frequent checkpointing, the effectiveness of local aggregation would degrade.

I assume Kurt is correct that in your use cases the stateless operator behaved better, but I could easily see other use cases as well.
For example, someone is already using RocksDB, and his job is bottlenecked on a single window operator instance because of data skew. In that case, stateful local aggregation would probably be a better choice.

Because of that, I think we should eventually provide both versions, and in the initial version we should at least make the "local aggregation engine" abstract enough that one could easily provide a different implementation strategy.

Piotrek

On 18 Jun 2019, at 11:46, Kurt Young <ykt...@gmail.com> wrote:

Hi,

For the trigger, it depends on which operator we want to use under the API. If we choose to use the window operator, we should also use the window's trigger. However, I also think reusing the window operator for this scenario may not be the best choice.
The reasons are the following:

1. As a lot of people have already pointed out, the window relies heavily on state, and it will definitely affect performance. You can argue that one can use a heap-based state backend, but this will introduce extra coupling, especially since we have a chance to design a purely stateless operator.
2. The window operator is *the most* complicated operator Flink currently has. Maybe we only need to pick a subset of the window operator to achieve the goal, but once users want to take a deep look at the localAggregation operator, it's still hard to find out what's going on under the window operator. For simplicity, I would also recommend introducing a dedicated lightweight operator, which is also much easier for users to learn and use.
As for your question about increasing the burden in `StreamOperator::prepareSnapshotPreBarrier()`, the only thing this function needs to do is output all the partial results. It's a purely CPU workload and doesn't introduce any IO. I want to point out that even though we have this cost, we save another barrier alignment cost of the operator, namely the synchronous flush stage of the state (if the operator had state). That flush actually introduces disk IO, and I think it's worth exchanging this cost for a purely CPU workload. We do have some observations about these two behaviors (as I said before, we actually implemented both solutions): the stateless one actually performs better, both in overall performance and in barrier alignment time.
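A minimal sketch of the stateless pre-aggregator described above, in plain Java: partial sums live in an ordinary in-memory map, a record-count threshold triggers a flush, and `prepareSnapshotPreBarrier` emits everything before the barrier, so nothing has to be written to a state backend. The class name, the count threshold and the `BiConsumer` output are assumptions for illustration; the method only borrows the name of Flink's `StreamOperator` hook and does not implement that interface.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stateless local pre-aggregator (plain Java, not a Flink operator).
// Partial sums live in an ordinary in-memory map instead of Flink-managed state.
class StatelessLocalSum {
    private final Map<String, Long> buffer = new LinkedHashMap<>();
    private final int maxBufferedRecords;          // assumed count-based flush threshold
    private final BiConsumer<String, Long> output; // stands in for the downstream collector
    private int bufferedRecords = 0;

    StatelessLocalSum(int maxBufferedRecords, BiConsumer<String, Long> output) {
        this.maxBufferedRecords = maxBufferedRecords;
        this.output = output;
    }

    void processElement(String key, long value) {
        buffer.merge(key, value, Long::sum);
        if (++bufferedRecords >= maxBufferedRecords) {
            flush();
        }
    }

    // Mirrors the name of Flink's StreamOperator#prepareSnapshotPreBarrier(long):
    // emit every partial result before the barrier, so there is nothing to snapshot.
    // This is pure CPU work; no state backend is touched.
    void prepareSnapshotPreBarrier(long checkpointId) {
        flush();
    }

    private void flush() {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            output.accept(e.getKey(), e.getValue());
        }
        buffer.clear();
        bufferedRecords = 0;
    }
}
```

Because every barrier forces a flush, more frequent checkpoints mean more, smaller partial results sent downstream, which is the effectiveness concern Piotr raised.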
Best,
Kurt

On Tue, Jun 18, 2019 at 3:40 PM vino yang <yanghua1...@gmail.com> wrote:

Hi Kurt,

Thanks for your example. Now it looks much clearer to me.

From your example code snippet, I see that the localAggregate API has three parameters:

1. key field
2. PartitionAvg
3. CountTrigger: Does this trigger come from the window package?

I will compare our design and yours at the API level and the operator level:

*From the API level:*

As I replied to @dianfu in the old email thread [1], the Window API can already provide the second and the third parameters.
If you reuse a specific interface or class, such as *Trigger* or *CountTrigger* provided by the window package, but do not use the window API, that's not reasonable. And if you do not reuse these interfaces or classes, you would need to introduce more things, even though they look similar to the things provided by the window package.

The window package already provides several types of windows and many triggers, and lets users customize them. What's more, users are more familiar with the Window API.

This is why we only provide the localKeyBy API and reuse the window API. It avoids unnecessary components such as triggers and the buffering mechanism (based on count or time). And it has clear, easy-to-understand semantics.
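To make the idea of a buffer flushed "based on count or time" concrete, here is a hypothetical flush policy that fires when either a record count or a maximum delay since the first buffered record is reached. It is not Flink's `Trigger` class; `CountOrTimeFlushPolicy` is an illustrative name, and timestamps are passed in explicitly so the behaviour is deterministic and testable.

```java
// Hypothetical count-or-time flush policy for a local pre-aggregation buffer.
// Not Flink's Trigger API: the caller passes the current time in explicitly.
class CountOrTimeFlushPolicy {
    private final long maxCount;
    private final long maxDelayMillis;
    private long count = 0;
    private long firstElementTime = -1;

    CountOrTimeFlushPolicy(long maxCount, long maxDelayMillis) {
        this.maxCount = maxCount;
        this.maxDelayMillis = maxDelayMillis;
    }

    // Registers one buffered element at nowMillis and reports whether to flush.
    boolean onElement(long nowMillis) {
        if (count == 0) {
            firstElementTime = nowMillis;
        }
        count++;
        return count >= maxCount || nowMillis - firstElementTime >= maxDelayMillis;
    }

    // Called after the buffer has been flushed.
    void reset() {
        count = 0;
        firstElementTime = -1;
    }
}
```

The time condition is only checked when a record arrives; a real operator would also need a timer to flush an idle buffer.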
*From the operator level:*

We reused the window operator, so we get all the benefits of state and checkpointing.

In your design, you call the operator under the localAggregate API a *stateless* operator. IMO, it still holds state; it is just not Flink-managed state. About the memory buffer (I think it's still not very clear; if you have time, could you give more detail or answer my questions), I have some questions:

- If it is just a raw JVM heap memory buffer, how does it support fault tolerance if the job is configured with an EXACTLY-ONCE semantic guarantee?
- If you think the memory buffer (non-Flink state) has better performance:
  In our design, users can also configure the heap state backend to get performance close to your mechanism.
- `StreamOperator::prepareSnapshotPreBarrier()` is related to the timing of the snapshot. IMO, the flush should be a synchronous action, shouldn't it? (If not, please point out my mistake.) I still think we should not depend on the timing of checkpoints. Checkpoint-related operations are inherently performance-sensitive, so we should not increase their burden any further. Our implementation is based on Flink's checkpoint mechanism, which can benefit from asynchronous snapshots and incremental checkpoints. IMO, performance is not a problem, and we also have not found performance issues in our production environment.
[1]: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308

Best,
Vino

On Tue, Jun 18, 2019 at 2:27 PM Kurt Young <ykt...@gmail.com> wrote:

Yeah, sorry for not expressing myself clearly. I will try to provide more details to make sure we are on the same page.

For the DataStream API, it shouldn't be optimized automatically. You have to explicitly call an API to do local aggregation, as well as to set the trigger policy of the local aggregation.
Take average as an example; the user program may look like this (just a draft), assuming the input type is DataStream<Tuple2<String, Int>>:

    ds.localAggregate(
            0,                      // The local key, which is the String from Tuple2
            PartitionAvg(1),        // The partial aggregation function; produces Tuple2<Long, Int>, indicating sum and count
            CountTrigger.of(1000L)  // Trigger policy; note this should be best effort, and could also be combined
                                    // with a time-based or memory-size-based trigger
        )                           // The return type is the local aggregate Tuple2<String, Tuple2<Long, Int>>
        .keyBy(0)                   // Further keyBy it with the required key
        .aggregate(1)               // This will merge all the partial results and get the final average.

(This is only a draft, just trying to explain what it looks like.)

The local aggregate operator can be stateless: we can keep a memory buffer or another efficient data structure to improve aggregation performance.

Let me know if you have any other questions.

Best,
Kurt

On Tue, Jun 18, 2019 at 1:29 PM vino yang <yanghua1...@gmail.com> wrote:

Hi Kurt,

Thanks for your reply.

Actually, I am not against you proposing your design.

From your earlier description, I can only imagine that your high-level implementation is about SQL and that the optimization happens inside the API.
Is it automatic? How would users configure the trigger for pre-aggregation?

Maybe after I get more information, it will sound more reasonable.

IMO, first of all, it would be better to make your user interface concrete; it's the basis of the discussion.

For example, could you give an example code snippet showing how to help users handle data skew in jobs built with the DataStream API?

If you give more details, we can discuss further. I think if one design introduces an explicit interface and the other does not, the implementations will obviously differ.
For example, we introduce an explicit API in DataStream named localKeyBy; for the pre-aggregation we need to define the trigger mechanism of local aggregation, so we found that reusing the window API and operator is a good choice. This is the line of reasoning from design to implementation.

What do you think?

Best,
Vino

On Tue, Jun 18, 2019 at 11:58 AM Kurt Young <ykt...@gmail.com> wrote:

Hi Vino,

Now I feel that we may have different understandings about what kind of problems or improvements you want to resolve.
Currently, most of the feedback focuses on *how to do proper local aggregation to improve performance and maybe solve the data skew issue*. And my gut feeling is that this is exactly what users want in the first place, especially those +1s. (Sorry for trying to summarize here; please correct me if I'm wrong.)

But I still think the design has somehow diverged from the goal. If we want an efficient and powerful way to do local aggregation, supporting an intermediate result type is essential IMO. Both the runtime's `AggregateFunction` and SQL's `UserDefinedAggregateFunction` properly support an intermediate result type and can do a `merge` operation on it.
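The point about intermediate result types can be illustrated with an average. The interface below is a local stand-in that mirrors the four methods of Flink's `AggregateFunction<IN, ACC, OUT>` (`createAccumulator`, `add`, `getResult`, `merge`) so the sketch compiles without Flink; `AggregateFunctionSketch`, `AverageSketch` and its `aggregateLocally` helper are hypothetical names. The accumulator is a `{sum, count}` pair, which can be merged across local partitions, while the output is a `Double`.

```java
import java.util.List;

// Local stand-in mirroring the four methods of Flink's
// org.apache.flink.api.common.functions.AggregateFunction<IN, ACC, OUT>,
// declared here only so the sketch compiles without Flink on the classpath.
interface AggregateFunctionSketch<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Average with intermediate type ACC = {sum, count} and result type OUT = Double.
// Partial averages cannot be merged correctly, but {sum, count} pairs can.
class AverageSketch implements AggregateFunctionSketch<Long, long[], Double> {
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    public long[] add(Long value, long[] acc) {
        return new long[] {acc[0] + value, acc[1] + 1};
    }

    public Double getResult(long[] acc) {
        return acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1]; // NaN for an empty input
    }

    public long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    // Hypothetical helper: the local phase for one upstream partition.
    long[] aggregateLocally(List<Long> partition) {
        long[] acc = createAccumulator();
        for (Long v : partition) {
            acc = add(v, acc);
        }
        return acc;
    }
}
```

With partitions {10, 20} and {30}, merging the accumulators gives the correct average of 20.0, whereas averaging the two partial averages (15.0 and 30.0) would give 22.5.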
Now we have a lightweight alternative that performs well and fits the local aggregation requirements nicely. Most importantly, it's much less complex because it's stateless. And it can also cover the similar multiple-aggregation scenario.

I'm still not convinced that we shouldn't consider it as a first step.

Best,
Kurt

On Tue, Jun 18, 2019 at 11:35 AM vino yang <yanghua1...@gmail.com> wrote:

Hi Kurt,

Thanks for your comments.

It seems we both implemented a local aggregation feature to address the issue of data skew. However, IMHO, the API level at which the optimization pays off is different.
*Your optimization benefits from Flink SQL and is not user-facing. (If I understand it incorrectly, please correct me.)*
*Our implementation exposes it as an optimization API for DataStream; it is just like a local version of the keyBy API.*

Based on this, I want to say that supporting it as a DataStream API can provide these advantages:

- The localKeyBy API has clear semantics and is flexible, not only for handling data skew but also for implementing some user cases. For example, if we want to calculate a multiple-level aggregation, we can do it within the local aggregation:
  input.localKeyBy("a").sum(1).localKeyBy("b").window(); // here "a" is a sub-category, while "b" is a category; we do not need to shuffle data over the network.
- Users of the DataStream API will benefit from this. Actually, we have a lot of scenarios that need the DataStream API. Currently, the DataStream API is the cornerstone of the physical plan of Flink SQL. With a localKeyBy API, SQL optimization could at least use this optimized API; this is a further topic.
- Based on the window operator, our state benefits from Flink state and checkpointing, so we do not need to worry about OOM and job failures.
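The multiple-level example can be modelled in plain Java for a single subtask: sum per sub-category first, then roll those sums up to their category, without any record leaving the partition. `TwoLevelLocalSum`, the `Sale` record layout and the assumption that each sub-category belongs to exactly one category are illustrative, not part of the proposed API.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of two chained local aggregation levels inside ONE subtask:
// first by sub-category, then by category. No record crosses partitions.
class TwoLevelLocalSum {
    static final class Sale {
        final String category;
        final String subCategory;
        final long amount;

        Sale(String category, String subCategory, long amount) {
            this.category = category;
            this.subCategory = subCategory;
            this.amount = amount;
        }
    }

    // Level 1: sum per sub-category (each sub-category is assumed to belong to one category).
    static Map<String, Sale> bySubCategory(List<Sale> partition) {
        Map<String, Sale> sums = new TreeMap<>();
        for (Sale s : partition) {
            sums.merge(s.subCategory, s,
                (a, b) -> new Sale(a.category, a.subCategory, a.amount + b.amount));
        }
        return sums;
    }

    // Level 2: roll the sub-category sums up to their category, still locally.
    static Map<String, Long> byCategory(Map<String, Sale> subCategorySums) {
        Map<String, Long> sums = new TreeMap<>();
        for (Sale s : subCategorySums.values()) {
            sums.merge(s.category, s.amount, Long::sum);
        }
        return sums;
    }
}
```

These are still per-subtask partial results; assuming the goal is a global total per category, a keyed stage would be needed afterwards.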
Now, about your questions:

1. About our design not being able to change the data type, and about the implementation of average:

As in my reply to Hequn, localKeyBy is an API provided to users who build their jobs with the DataStream API. Users should know its semantics and how it differs from the keyBy API, so if they want an average aggregation, they should carry both the local sum result and the local count result. I admit that it would be more convenient to use keyBy directly, but we need to pay a small price to get some benefits. I think this price is reasonable, considering that the DataStream API itself is a low-level API (at least for now).

2. About the stateless operator and `StreamOperator::prepareSnapshotPreBarrier()`:

Actually, I have discussed this with @dianfu in the old mail thread. I will copy my opinion from there:

- For your design, you still need somewhere to let users configure the trigger threshold (maybe memory availability?). This design cannot guarantee deterministic semantics (which will bring trouble for testing and debugging).
- If the implementation depends on the timing of checkpoints, it would affect checkpoint progress, and the buffered data may cause OOM issues. In addition, if the operator is stateless, it cannot provide fault tolerance.

Best,
Vino

On Tue, Jun 18, 2019 at 9:22 AM Kurt Young <ykt...@gmail.com> wrote:

Hi Vino,

Thanks for the proposal. I like the general idea, and IMO it's a very useful feature. But after reading through the document, I feel that we may be over-designing the operator required for proper local aggregation. The main reason is that we want a clear definition and behavior for the "local keyed state", which in my opinion is not necessary for local aggregation, at least to start with.
Another issue I noticed is that the local keyBy operator cannot change the element type, which will also restrict a lot of use cases that could benefit from local aggregation, like "average".

We did similar logic in SQL, and the only thing that needs to be done is to introduce a stateless, lightweight operator that is *chained* before `keyBy()`. The operator flushes all buffered elements during `StreamOperator::prepareSnapshotPreBarrier()` and thereby keeps itself stateless.
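For comparison, here is a minimal plain-Java sketch of the kind of operator described above. It is not Flink code: `BufferingPreAggregator`, `SumCount` and `flushBeforeBarrier()` are hypothetical names, and `flushBeforeBarrier()` only stands in for `StreamOperator::prepareSnapshotPreBarrier()`. The operator consumes single values but emits (sum, count) partials, which is the element-type change that "average" needs, and it empties its buffer before each barrier so nothing is left to snapshot. A real operator would also need a size- or memory-based flush trigger (the configuration concern raised in the reply above); that part is deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a stateless, chained pre-aggregator; not Flink code.
class BufferingPreAggregator {

    // Partial result type: differs from the input element type (a single long).
    static final class SumCount {
        final String key;
        final long sum;
        final long count;

        SumCount(String key, long sum, long count) {
            this.key = key;
            this.sum = sum;
            this.count = count;
        }
    }

    private final Map<String, long[]> buffer = new HashMap<>(); // key -> {sum, count}
    private final List<SumCount> output = new ArrayList<>();    // stands in for the downstream keyBy()

    void processElement(String key, long value) {
        long[] acc = buffer.computeIfAbsent(key, k -> new long[2]);
        acc[0] += value;
        acc[1] += 1;
    }

    // Stand-in for prepareSnapshotPreBarrier(): emit every partial and keep nothing,
    // so the operator has no state to include in the checkpoint.
    void flushBeforeBarrier() {
        for (Map.Entry<String, long[]> e : buffer.entrySet()) {
            output.add(new SumCount(e.getKey(), e.getValue()[0], e.getValue()[1]));
        }
        buffer.clear();
    }

    int bufferedKeys() {
        return buffer.size();
    }

    List<SumCount> emitted() {
        return output;
    }

    // Global phase after keyBy(): merge sums and counts per key, and only then divide.
    static Map<String, Double> globalAverage(List<SumCount> partials) {
        Map<String, long[]> merged = new HashMap<>();
        for (SumCount p : partials) {
            long[] acc = merged.computeIfAbsent(p.key, k -> new long[2]);
            acc[0] += p.sum;
            acc[1] += p.count;
        }
        Map<String, Double> averages = new HashMap<>();
        merged.forEach((key, acc) -> averages.put(key, (double) acc[0] / acc[1]));
        return averages;
    }

    public static void main(String[] args) {
        BufferingPreAggregator op = new BufferingPreAggregator();
        op.processElement("a", 1);
        op.processElement("a", 2);
        op.processElement("a", 3);
        op.flushBeforeBarrier(); // first checkpoint: emits a=(6, 3)
        op.processElement("a", 10);
        op.flushBeforeBarrier(); // second checkpoint: emits a=(10, 1)
        // (6 + 10) / (3 + 1) = 4.0; averaging the two local averages would give 6.0
        System.out.println(globalAverage(op.emitted()).get("a")); // 4.0
    }
}
```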
By the way, in an earlier version we also took a similar approach by introducing a stateful local aggregation operator, but it did not perform as well as the latter one, and it also affected the barrier alignment time. The latter one is fairly simple and more efficient.

I would highly suggest that you consider a stateless approach as the first step.

Best,
Kurt

On Mon, Jun 17, 2019 at 7:32 PM Jark Wu <imj...@gmail.com> wrote:

Hi Vino,

Thanks for the proposal.
Regarding "input.keyBy(0).sum(1)" vs "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have you done any benchmarks? I'm curious how much performance improvement we can get by using a count window as the local operator.

Best,
Jark

On Mon, 17 Jun 2019 at 17:48, vino yang <yanghua1...@gmail.com> wrote:

Hi Hequn,

Thanks for your reply.

The purpose of the localKeyBy API is to provide a tool that lets users do pre-aggregation locally.
The behavior of the pre-aggregation is similar to the keyBy API.

So the three cases are different; I will describe them one by one:

1. input.keyBy(0).sum(1)

*In this case, the result is event-driven: each event produces one sum aggregation result, which is the latest one since the source started.*

2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)

*In this case, the semantics may have a problem. It would do the local sum aggregation and produce, for every event, the latest partial result since the source started.*
*These latest partial results for the same key are hashed to one node to do the global sum aggregation.*
*The global aggregation receives multiple partial results (all of them calculated since the source started), and summing them gives the wrong result.*

3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)

*In this case, the local stage just produces a partial aggregation result for the 5 records in each count window. The partial aggregation results for the same key are then aggregated globally.*

So the first case and the third case can get the *same* result; the difference is the output style and the latency.

Generally speaking, the localKeyBy API is just an optimization API. We do not limit the user's usage, but the user has to understand its semantics and use it correctly.

Best,
Vino

On Mon, Jun 17, 2019 at 4:18 PM Hequn Cheng <chenghe...@gmail.com> wrote:

Hi Vino,

Thanks for the proposal, I think it is a very good feature!
One thing I want to make sure of is the semantics of `localKeyBy`. According to the document, the `localKeyBy` API returns an instance of `KeyedStream`, which can also perform sum(). In this case, what are the semantics of `localKeyBy()`? For example, will the following pipelines produce the same result, and what are the differences between them?

1. input.keyBy(0).sum(1)
2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)

It would also be great if we could add this to the document. Thank you very much.
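The difference between these three pipelines can be reproduced with a small plain-Java simulation. It does not use Flink (the proposed `localKeyBy` is not involved); `LocalKeyBySemantics` and its methods are hypothetical names. It follows a single key through a single upstream instance and assumes `countWindow(5)` behaves as a tumbling count window that fires and is cleared every 5 records. Case 2 re-sums running partials that each already cover everything since the start, so it overshoots; cases 1 and 3 agree whenever a count window has just fired.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the semantics only (one key, one upstream instance).
// Input values are field 1 of each record; the key (field 0) is implicit.
class LocalKeyBySemantics {

    // Case 1: keyBy(0).sum(1): one running sum per input record.
    static List<Long> keyBySum(long[] values) {
        List<Long> out = new ArrayList<>();
        long acc = 0;
        for (long v : values) {
            acc += v;
            out.add(acc);
        }
        return out;
    }

    // Case 2: localKeyBy(0).sum(1).keyBy(0).sum(1).
    // The local stage emits a running sum per record; the global stage sums those running sums.
    static List<Long> localRunningSumThenGlobalSum(long[] values) {
        List<Long> out = new ArrayList<>();
        long global = 0;
        for (long partial : keyBySum(values)) {
            global += partial; // double counting: each partial repeats all earlier records
            out.add(global);
        }
        return out;
    }

    // Case 3: localKeyBy(0).countWindow(size).sum(1).keyBy(0).sum(1).
    // The local stage emits one sum per full window; the global stage keeps a running sum.
    static List<Long> localCountWindowThenGlobalSum(long[] values, int size) {
        List<Long> out = new ArrayList<>();
        long global = 0;
        long windowSum = 0;
        int inWindow = 0;
        for (long v : values) {
            windowSum += v;
            inWindow++;
            if (inWindow == size) { // window fires, then its contents are discarded
                global += windowSum;
                out.add(global);
                windowSum = 0;
                inWindow = 0;
            }
        }
        return out; // a partially filled window at the end has not fired
    }

    public static void main(String[] args) {
        long[] values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        System.out.println(keyBySum(values));                         // [1, 3, 6, 10, 15, 21, 28, 36, 45, 55]
        System.out.println(localRunningSumThenGlobalSum(values));     // [1, 4, 10, 20, 35, 56, 84, 120, 165, 220]
        System.out.println(localCountWindowThenGlobalSum(values, 5)); // [15, 55]
    }
}
```

Case 3 emits fewer, later results than case 1 but reaches the same totals, which matches the "output style and latency" difference described in the reply above.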
Best, Hequn

On Fri, Jun 14, 2019 at 11:34 AM vino yang <yanghua1...@gmail.com> wrote:

Hi Aljoscha,

I have looked at the "*Process*" section of the FLIP wiki page [1]. This mail thread indicates that it has proceeded to the third step.

When I looked at the fourth step (the vote step), I didn't find the prerequisites for starting the voting process.

The discussion of this feature has already been done in the old thread [2], so can you tell me when I should start voting?
Can I start now?

Best,
Vino

[1]: https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
[2]: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308

On Thu, Jun 13, 2019 at 9:19 AM leesf <leesf0...@gmail.com> wrote:

+1 for the FLIP, thanks vino for your efforts.

Best,
Leesf

On Wed, Jun 12, 2019 at 5:46 PM vino yang <yanghua1...@gmail.com> wrote:

Hi folks,

I would like to start the FLIP discussion thread about supporting local aggregation in Flink.

In short, this feature can effectively alleviate data skew.
This is the FLIP:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink

*Motivation* (copied from FLIP)

Currently, keyed streams are widely used to perform aggregating operations (e.g., reduce, sum and window) on the elements that have the same key.
When executed at runtime, the elements with the same key are sent to and aggregated by the same task.

The performance of these aggregating operations is very sensitive to the distribution of keys. In cases where the distribution of keys follows a power law, the performance is significantly degraded. Worse, increasing the degree of parallelism does not help when a task is overloaded by a single key.
Local aggregation is a widely adopted method to reduce the performance degradation caused by data skew. We can decompose the aggregating operations into two phases. In the first phase, we aggregate the elements of the same key at the sender side to obtain partial results. Then, in the second phase, these partial results are sent to receivers according to their keys and are combined to obtain the final result.
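The two phases can be sketched in plain Java as follows. This is not Flink code: `TwoPhaseAggregation` and its methods are hypothetical names, and the aggregate is assumed to be a per-key count. Each sender collapses its records to one partial per key, and the receiver merges those partials, so for a hot key the receiver gets at most one partial per sender instead of one record per occurrence.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-phase (local + global) count; models the data flow only, not Flink's runtime.
class TwoPhaseAggregation {

    // Phase 1, on each sender: one partial count per key.
    static Map<String, Long> localPhase(List<String> keysAtSender) {
        Map<String, Long> partials = new HashMap<>();
        for (String key : keysAtSender) {
            partials.merge(key, 1L, Long::sum);
        }
        return partials;
    }

    // Phase 2, on the receiver: combine the partials from all senders per key.
    static Map<String, Long> globalPhase(List<Map<String, Long>> partialsPerSender) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partials : partialsPerSender) {
            partials.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    // Records shipped over the network after phase 1: one per (sender, key) pair.
    static int shippedRecords(List<Map<String, Long>> partialsPerSender) {
        int n = 0;
        for (Map<String, Long> partials : partialsPerSender) {
            n += partials.size();
        }
        return n;
    }

    public static void main(String[] args) {
        List<List<String>> senders = Arrays.asList(
                Arrays.asList("hot", "hot", "hot", "cold"),
                Arrays.asList("hot", "hot", "cold"));
        List<Map<String, Long>> partials = new ArrayList<>();
        for (List<String> sender : senders) {
            partials.add(localPhase(sender));
        }
        Map<String, Long> result = globalPhase(partials);
        System.out.println(result.get("hot") + " " + result.get("cold")); // 5 2
        System.out.println(shippedRecords(partials)); // 4 instead of 7 raw records
    }
}
```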
Since the number of partial results received by each receiver is limited by the number of senders, the imbalance among receivers can be reduced. Besides, by reducing the amount of transferred data, the performance can be further improved.

*More details*:

Design documentation:
https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
Old discussion thread:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308

JIRA: FLINK-12786 <https://issues.apache.org/jira/browse/FLINK-12786>

We are looking forward to your feedback!
Best,
Vino