Hi vino,

One thing to add: for a), I think one or two examples, such as how to do local aggregation on a sliding window and how to do local aggregation on an unbounded aggregate, would help a lot.

Best,
Kurt

On Mon, Jun 24, 2019 at 6:06 PM Kurt Young <ykt...@gmail.com> wrote:

> Hi vino,
>
> I think there are several things that still need discussion.
>
> a) We all agree that we should first go with a unified abstraction, but the abstraction is not reflected in the FLIP. If your answer is the "localKeyBy" API, then I would ask how we combine it with `AggregateFunction`, and how we do proper local aggregation for functions that have a different intermediate result type, like AVG. Could you add these to the document?
>
> b) From the implementation side, reusing the window operator is one of the possible solutions, but that does not mean both implementations are based on the window operator. My understanding is that one of the possible implementations should not touch the window operator.
>
> c) 80% of your FLIP content actually describes how we support local keyed state. I don't know if this is necessary to introduce in the first step, and we should also involve the committers who work on the state backend to share their thoughts.
>
> Best,
> Kurt
>
> On Mon, Jun 24, 2019 at 5:17 PM vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Kurt,
>>
>> You did not give any further differing opinions, so I thought you had agreed with the design after we promised to support two kinds of implementation.
>>
>> At the API level, we have answered your question about passing an AggregateFunction to do the aggregation. Whether or not we introduce the localKeyBy API, we can support AggregateFunction.
>>
>> So what is your differing opinion now? Can you share it with us?
>>
>> Best,
>> Vino
>>
>> Kurt Young <ykt...@gmail.com> wrote on Mon, Jun 24, 2019 at 4:24 PM:
>>
>> > Hi vino,
>> >
>> > Sorry, I don't see a consensus about reusing the window operator and keeping the API design of localKeyBy. But I think we should definitely give more thought to this topic.
>> >
>> > I will also try to loop in Stephan for this discussion.
>> >
>> > Best,
>> > Kurt
>> >
>> > On Mon, Jun 24, 2019 at 3:26 PM vino yang <yanghua1...@gmail.com> wrote:
>> >
>> > > Hi all,
>> > >
>> > > I am happy we have had a wonderful discussion and received many valuable opinions in the last few days.
>> > >
>> > > Now, let me try to summarize the changes to the design on which we have reached consensus:
>> > >
>> > > - provide a unified abstraction to support two kinds of implementation;
>> > > - reuse WindowOperator and try to enhance it so that the intermediate result of the local aggregation can be buffered and flushed, to support two kinds of implementation;
>> > > - keep the API design of localKeyBy, but declare as disabled some APIs that we cannot currently support, and provide a configurable API for users to choose how to handle the intermediate result.
>> > >
>> > > The above three points have been updated in the design doc. If you have any questions, please let me know.
>> > >
>> > > @Aljoscha Krettek <aljos...@apache.org> What do you think? Any further comments?
>> > >
>> > > Best,
>> > > Vino
>> > >
>> > > vino yang <yanghua1...@gmail.com> wrote on Thu, Jun 20, 2019 at 2:02 PM:
>> > >
>> > > > Hi Kurt,
>> > > >
>> > > > Thanks for your comments.
>> > > >
>> > > > It seems we have come to a consensus that we should use local aggregation to alleviate the performance degradation caused by data skew. In this FLIP, our key solution is to introduce local keyed partitioning to achieve this goal.
>> > > >
>> > > > I also agree that we can benefit a lot from the usage of AggregateFunction.
>> > > > In combination with localKeyBy, we can easily use it to achieve local aggregation:
>> > > >
>> > > > - input.localKeyBy(0).aggregate()
>> > > > - input.localKeyBy(0).window().aggregate()
>> > > >
>> > > > I think the only problem here is the choice between
>> > > >
>> > > > - (1) introducing a new primitive called localKeyBy and implementing local aggregation with existing operators, or
>> > > > - (2) introducing an operator called localAggregation which is composed of a key selector, a window-like operator, and an aggregate function.
>> > > >
>> > > > There may exist some optimization opportunities in providing a composite interface for local aggregation. But at the same time, in my opinion, we lose flexibility (or we need a certain effort to achieve the same flexibility).
>> > > >
>> > > > As said in the previous mails, we have many use cases where the aggregation is very complicated and cannot be performed with AggregateFunction. For example, users may perform windowed aggregations according to time, data values, or even external storage. Typically, they now use KeyedProcessFunction or customized triggers to implement these aggregations. It's not easy to address data skew in such cases with a composite interface for local aggregation.
>> > > >
>> > > > Given that the DataStream API is targeted exactly at these cases, where the application logic is very complicated and optimization does not matter, I think it's a better choice to provide a relatively low-level and canonical interface.
>> > > >
>> > > > The composite interface, on the other hand, may be a good choice in declarative interfaces, including SQL and the Table API, as it allows more optimization opportunities.
>> > > >
>> > > > Best,
>> > > > Vino
>> > > >
>> > > > Kurt Young <ykt...@gmail.com> wrote on Thu, Jun 20, 2019 at 10:15 AM:
>> > > >
>> > > >> Hi all,
>> > > >>
>> > > >> As vino said in previous emails, I think we should first discuss and decide what kind of use cases this FLIP wants to resolve, and what the API should look like. From my side, I think this is probably the root cause of the current divergence.
>> > > >>
>> > > >> My understanding (from the FLIP title and the motivation section of the document) is that we want to have proper support for local aggregation, or pre-aggregation. This is not a very new idea; most SQL engines have already made this improvement. And the core concept is that there should be an AggregateFunction, no matter whether it's the Flink runtime's AggregateFunction or SQL's UserDefinedAggregateFunction. Both aggregations have the concept of an intermediate data type, which we sometimes call ACC. I quickly went through the POC Piotr did before [1]; it also directly uses AggregateFunction.
>> > > >>
>> > > >> But the thing is, after reading the design of this FLIP, I can't help feeling that this FLIP is not targeting proper local aggregation support. It actually wants to introduce another concept: LocalKeyBy, how to split and merge local key groups, and how to properly support state on a local key. Local aggregation just happens to be one possible use case of LocalKeyBy. But it lacks support for the essential concept of local aggregation, which is the intermediate data type. Without this, I really don't think it is a good fit for local aggregation.
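
To make the point about the intermediate data type concrete, here is a minimal sketch of AVG with a (sum, count) accumulator. It is only an illustration: `Aggregator` is a local stand-in that mirrors the four methods of Flink's `AggregateFunction<IN, ACC, OUT>` so the snippet compiles without Flink, and `AvgAcc`, `Avg`, and `LocalAvgSketch` are hypothetical names, not part of the FLIP.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical driver: two "local" partial aggregations, then one merge.
final class LocalAvgSketch {
    // Local phase: fold one subtask's records into a partial accumulator.
    static AvgAcc localAggregate(Avg f, List<Long> values) {
        AvgAcc acc = f.createAccumulator();
        for (Long v : values) {
            acc = f.add(v, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        Avg f = new Avg();
        AvgAcc a = localAggregate(f, Arrays.asList(1L, 2L, 3L)); // partial on subtask A
        AvgAcc b = localAggregate(f, Arrays.asList(10L));        // partial on subtask B
        // Global phase: merge accumulators, not averages.
        System.out.println(f.getResult(f.merge(a, b)));
    }
}

// Stand-in mirroring the shape of Flink's AggregateFunction<IN, ACC, OUT>
// (assumption: declared locally so the sketch has no Flink dependency).
interface Aggregator<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
    ACC merge(ACC a, ACC b);
}

// Intermediate type for AVG: differs from both the input (Long) and the output (Double).
final class AvgAcc {
    final long sum;
    final long count;

    AvgAcc(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }
}

final class Avg implements Aggregator<Long, AvgAcc, Double> {
    @Override public AvgAcc createAccumulator() { return new AvgAcc(0L, 0L); }
    @Override public AvgAcc add(Long v, AvgAcc acc) { return new AvgAcc(acc.sum + v, acc.count + 1); }
    @Override public Double getResult(AvgAcc acc) { return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count; }
    @Override public AvgAcc merge(AvgAcc a, AvgAcc b) { return new AvgAcc(a.sum + b.sum, a.count + b.count); }
}
```

The local side has to ship the accumulator downstream rather than a finished average: averaging the two local averages (2 and 10) would give 6, while merging (6, 3) with (10, 1) gives the correct 4.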
>> > > >>
>> > > >> Here I want to make sure of the scope or the goal of this FLIP: do we want to have a proper local aggregation engine, or do we just want to introduce a new concept called LocalKeyBy?
>> > > >>
>> > > >> [1]: https://github.com/apache/flink/pull/4626
>> > > >>
>> > > >> Best,
>> > > >> Kurt
>> > > >>
>> > > >> On Wed, Jun 19, 2019 at 5:13 PM vino yang <yanghua1...@gmail.com> wrote:
>> > > >>
>> > > >> > Hi Hequn,
>> > > >> >
>> > > >> > Thanks for your comments!
>> > > >> >
>> > > >> > I agree that allowing local aggregation to reuse the window API and refining the window operator so that it matches both requirements (ours and Kurt's) is a good decision!
>> > > >> >
>> > > >> > Concerning your questions:
>> > > >> >
>> > > >> > 1. The result of input.localKeyBy(0).sum(1).keyBy(0).sum(1) may be meaningless.
>> > > >> >
>> > > >> > Yes, it does not make sense in most cases. However, I also want to note that users should know the right semantics of localKeyBy and use it correctly. This issue also exists for the global keyBy. Consider this example: input.keyBy(0).sum(1).keyBy(0).sum(1). The result is also meaningless.
>> > > >> >
>> > > >> > 2. About the semantics of input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)).
>> > > >> >
>> > > >> > Good catch! I agree with you that it's not good to enable all functionalities of KeyedStream for localKeyBy. Currently, we do not support some APIs such as connect/join/intervalJoin/coGroup. This is because we force the operators on LocalKeyedStreams to be chained with the inputs.
>> > > >> >
>> > > >> > Best,
>> > > >> > Vino
>> > > >> >
>> > > >> > Hequn Cheng <chenghe...@gmail.com> wrote on Wed, Jun 19, 2019 at 3:42 PM:
>> > > >> >
>> > > >> > > Hi,
>> > > >> > >
>> > > >> > > Thanks a lot for your great discussion, and great to see that some agreement has been reached on the "local aggregate engine"!
>> > > >> > >
>> > > >> > > ===> Considering the abstract engine,
>> > > >> > > I'm wondering whether it is valuable for us to extend the current window to meet both demands raised by Kurt and Vino. There are some benefits we can get:
>> > > >> > >
>> > > >> > > 1. The interfaces of the window are complete and clear. With windows, we can define a lot of ways to split the data and perform different computations.
>> > > >> > > 2. We can also leverage the window to do miniBatch for the global aggregation, i.e., we can use the window to bundle data belonging to the same key, so that for every bundle we only need to read and write state once. This can greatly reduce state IO and improve performance.
>> > > >> > > 3. A lot of other use cases can also benefit from a window that is memory-based or stateless.
>> > > >> > >
>> > > >> > > ===> As for the API,
>> > > >> > > I think it is good to make our API more flexible. However, we may need to make our API meaningful.
>> > > >> > >
>> > > >> > > Take my previous reply as an example: input.localKeyBy(0).sum(1).keyBy(0).sum(1). The result may be meaningless. Another example I find is the intervalJoin, e.g., input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)). In this case, it will bring problems if input1 and input2 have different parallelism.
>> > > >> > > We don't know which input the join should be chained with. Even if they share the same parallelism, it's hard to tell what the join is doing. There may be some other problems.
>> > > >> > >
>> > > >> > > From this point of view, it's at least not good to enable all functionalities of KeyedStream for localKeyBy?
>> > > >> > >
>> > > >> > > Great to also have your opinions.
>> > > >> > >
>> > > >> > > Best, Hequn
>> > > >> > >
>> > > >> > > On Wed, Jun 19, 2019 at 10:24 AM vino yang <yanghua1...@gmail.com> wrote:
>> > > >> > >
>> > > >> > > > Hi Kurt and Piotrek,
>> > > >> > > >
>> > > >> > > > Thanks for your comments.
>> > > >> > > >
>> > > >> > > > I agree that we can provide a better abstraction to be compatible with two different implementations.
>> > > >> > > >
>> > > >> > > > First of all, I think we should consider what kind of scenarios we need to support at the *API* level.
>> > > >> > > >
>> > > >> > > > We have some use cases which need a customized aggregation through KeyedProcessFunction (when using our localKeyBy.window, they can use ProcessWindowFunction).
>> > > >> > > >
>> > > >> > > > Shall we support these flexible use scenarios?
>> > > >> > > >
>> > > >> > > > Best,
>> > > >> > > > Vino
>> > > >> > > >
>> > > >> > > > Kurt Young <ykt...@gmail.com> wrote on Tue, Jun 18, 2019 at 8:37 PM:
>> > > >> > > >
>> > > >> > > > > Hi Piotr,
>> > > >> > > > >
>> > > >> > > > > Thanks for joining the discussion.
>> > > >> > > > > Making "local aggregation" abstract enough sounds good to me; we could implement and verify alternative solutions for the use cases of local aggregation. Maybe we will find that both solutions are appropriate for different scenarios.
>> > > >> > > > >
>> > > >> > > > > Starting from a simple one sounds like a practical way to go. What do you think, vino?
>> > > >> > > > >
>> > > >> > > > > Best,
>> > > >> > > > > Kurt
>> > > >> > > > >
>> > > >> > > > > On Tue, Jun 18, 2019 at 8:10 PM Piotr Nowojski <pi...@ververica.com> wrote:
>> > > >> > > > >
>> > > >> > > > > > Hi Kurt and Vino,
>> > > >> > > > > >
>> > > >> > > > > > I think there is a trade-off that we need to consider for the local aggregation.
>> > > >> > > > > >
>> > > >> > > > > > Generally speaking, I would agree with Kurt about local aggregation/pre-aggregation not using Flink's state and instead flushing the operator on a checkpoint. Network IO is usually cheaper than disk IO. This has, however, a couple of issues:
>> > > >> > > > > > 1. It can explode the number of in-flight records during checkpoint barrier alignment, making checkpointing slower and decreasing the actual throughput.
>> > > >> > > > > > 2. This trades disk IO on the local aggregation machine for CPU (and disk IO in the case of RocksDB) on the final aggregation machine. This is fine, as long as there is no huge data skew.
>> > > >> > > > > > If there is only a handful of hot keys (or even one single hot key), it might be better to keep the persistent state in the LocalAggregationOperator to offload the final aggregation as much as possible.
>> > > >> > > > > > 3. With frequent checkpointing, local aggregation effectiveness would degrade.
>> > > >> > > > > >
>> > > >> > > > > > I assume Kurt is correct that in your use cases the stateless operator was behaving better, but I could easily see other use cases as well. For example, someone is already using RocksDB, and his job is bottlenecked on a single window operator instance because of the data skew. In that case, stateful local aggregation would probably be a better choice.
>> > > >> > > > > >
>> > > >> > > > > > Because of that, I think we should eventually provide both versions, and in the initial version we should at least make the "local aggregation engine" abstract enough that one could easily provide a different implementation strategy.
>> > > >> > > > > >
>> > > >> > > > > > Piotrek
>> > > >> > > > > >
>> > > >> > > > > > > On 18 Jun 2019, at 11:46, Kurt Young <ykt...@gmail.com> wrote:
>> > > >> > > > > > >
>> > > >> > > > > > > Hi,
>> > > >> > > > > > >
>> > > >> > > > > > > For the trigger, it depends on what operator we want to use under the API. If we choose to use the window operator, we should also use the window's trigger.
>> > > >> > > > > > > However, I also think reusing the window operator for this scenario may not be the best choice. The reasons are the following:
>> > > >> > > > > > >
>> > > >> > > > > > > 1. As a lot of people have already pointed out, the window relies heavily on state, and that will definitely affect performance. You can argue that one can use a heap-based state backend, but this will introduce extra coupling, especially when we have a chance to design a pure stateless operator.
>> > > >> > > > > > > 2. The window operator is *the most* complicated operator Flink currently has. Maybe we only need to pick a subset of the window operator to achieve the goal, but once the user wants to have a deep look at the localAggregation operator, it's still hard to find out what's going on under the window operator. For simplicity, I would also recommend that we introduce a dedicated lightweight operator, which is also much easier for a user to learn and use.
>> > > >> > > > > > >
>> > > >> > > > > > > For your question about increasing the burden in `StreamOperator::prepareSnapshotPreBarrier()`, the only thing this function needs to do is output all the partial results. It's purely CPU workload, not introducing any IO.
>> > > >> > > > > > > I want to point out that even if we have this cost, we remove another barrier alignment cost of the operator, which is the sync flush stage of the state, if you introduce state. This flush actually introduces disk IO, and I think it's worth exchanging this cost for a purely CPU workload. And we do have some observations about these two behaviors (as I said before, we actually implemented both solutions): the stateless one actually performs better both in performance and in barrier alignment time.
>> > > >> > > > > > >
>> > > >> > > > > > > Best,
>> > > >> > > > > > > Kurt
>> > > >> > > > > > >
>> > > >> > > > > > > On Tue, Jun 18, 2019 at 3:40 PM vino yang <yanghua1...@gmail.com> wrote:
>> > > >> > > > > > >
>> > > >> > > > > > >> Hi Kurt,
>> > > >> > > > > > >>
>> > > >> > > > > > >> Thanks for your example. Now it looks clearer to me.
>> > > >> > > > > > >>
>> > > >> > > > > > >> From your example code snippet, I see the localAggregate API has three parameters:
>> > > >> > > > > > >>
>> > > >> > > > > > >> 1. key field
>> > > >> > > > > > >> 2. PartitionAvg
>> > > >> > > > > > >> 3. CountTrigger: Does this trigger come from the window package?
>> > > >> > > > > > >>
>> > > >> > > > > > >> I will compare our design and yours at the API and operator levels:
>> > > >> > > > > > >>
>> > > >> > > > > > >> *From the API level:*
>> > > >> > > > > > >>
>> > > >> > > > > > >> As I replied to @dianfu in the old email thread [1], the Window API can provide the second and the third parameter right now.
>> > > >> > > > > > >>
>> > > >> > > > > > >> If you reuse a specific interface or class, such as *Trigger* or *CountTrigger* provided by the window package, but do not use the window API, it's not reasonable. And if you do not reuse these interfaces or classes, you would need to introduce more things, even though they look similar to the things provided by the window package.
>> > > >> > > > > > >>
>> > > >> > > > > > >> The window package provides several types of window and many triggers, and lets users customize them. What's more, users are more familiar with the Window API.
>> > > >> > > > > > >>
>> > > >> > > > > > >> This is the reason why we just provide the localKeyBy API and reuse the window API. It avoids unnecessary components such as triggers and the buffering mechanism (based on count or time). And it has clear and easy-to-understand semantics.
>> > > >> > > > > > >>
>> > > >> > > > > > >> *From the operator level:*
>> > > >> > > > > > >>
>> > > >> > > > > > >> We reused the window operator, so we can get all the benefits of state and checkpointing.
>> > > >> > > > > > >>
>> > > >> > > > > > >> In your design, you call the operator under the localAggregate API a *stateless* operator. IMO, it still has state; it is just not Flink managed state. About the memory buffer (I think it's still not very clear; if you have time, can you give more detailed information or answer my questions), I have some questions:
>> > > >> > > > > > >>
>> > > >> > > > > > >> - If it is just a raw JVM heap memory buffer, how do you support fault tolerance if the job is configured with the EXACTLY-ONCE semantic guarantee?
>> > > >> > > > > > >> - If you think the memory buffer (non-Flink state) has better performance: in our design, users can also configure the HEAP state backend to get performance close to your mechanism.
>> > > >> > > > > > >> - `StreamOperator::prepareSnapshotPreBarrier()` is related to the timing of the snapshot. IMO, the flush action should be a synchronized action? (If not, please point out my mistake.) I still think we should not depend on the timing of the checkpoint.
>> > > >> > > > > > >>   Checkpoint-related operations are inherently performance sensitive; we should not increase their burden any more. Our implementation is based on the mechanism of Flink's checkpoint, which can benefit from the async snapshot and incremental checkpoint. IMO, the performance is not a problem, and we also have not found a performance issue in our production.
>> > > >> > > > > > >>
>> > > >> > > > > > >> [1]: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
>> > > >> > > > > > >>
>> > > >> > > > > > >> Best,
>> > > >> > > > > > >> Vino
>> > > >> > > > > > >>
>> > > >> > > > > > >> Kurt Young <ykt...@gmail.com> wrote on Tue, Jun 18, 2019 at 2:27 PM:
>> > > >> > > > > > >>
>> > > >> > > > > > >>> Yeah, sorry for not expressing myself clearly. I will try to provide more details to make sure we are on the same page.
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> For the DataStream API, it shouldn't be optimized automatically. You have to explicitly call the API to do local aggregation, as well as specify the trigger policy of the local aggregation.
>> > > >> > > > > > >>> Take average for example; the user program may look like this (just a draft):
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> Assuming the input type is DataStream<Tuple2<String, Int>>:
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> ds.localAggregate(
>> > > >> > > > > > >>>         0,                      // The local key, which is the String from Tuple2
>> > > >> > > > > > >>>         PartitionAvg(1),        // The partial aggregation function, produces Tuple2<Long, Int>, indicating sum and count
>> > > >> > > > > > >>>         CountTrigger.of(1000L)  // Trigger policy; note this should be best effort, and can also be combined with a time-based or memory-size-based trigger
>> > > >> > > > > > >>>     )                           // The return type is the local aggregate Tuple2<String, Tuple2<Long, Int>>
>> > > >> > > > > > >>>     .keyBy(0)                   // Further keyBy it with the required key
>> > > >> > > > > > >>>     .aggregate(1)               // This will merge all the partial results and get the final average
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> (This is only a draft, only trying to explain what it looks like.)
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> The local aggregate operator can be stateless; we can keep a memory buffer or another efficient data structure to improve the aggregation performance.
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> Let me know if you have any other questions.
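
As a rough illustration of the "memory buffer" idea in the draft above, the following sketch keeps partial (sum, count) pairs per key in a plain heap map and emits them when a count threshold is reached. Everything here is an assumption for illustration: `LocalAvgBuffer`, `Partial`, and `maxBufferedRecords` are hypothetical names, the list returned by `emitted()` stands in for the operator's downstream output, and no Flink classes are involved. In an actual operator, `flush()` would presumably also be invoked from `StreamOperator::prepareSnapshotPreBarrier()`, as discussed elsewhere in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical "stateless" local pre-aggregation buffer: partial results live
// in ordinary heap memory, not in Flink managed state.
final class LocalAvgBuffer {
    private final Map<String, long[]> partials = new HashMap<>(); // key -> {sum, count}
    private final List<Partial> emitted = new ArrayList<>();      // stands in for downstream output
    private final int maxBufferedRecords;                         // count-based trigger threshold
    private int buffered = 0;

    LocalAvgBuffer(int maxBufferedRecords) {
        this.maxBufferedRecords = maxBufferedRecords;
    }

    // Folds one record into the partial result of its key; flushes on the count trigger.
    void processElement(String key, long value) {
        long[] p = partials.computeIfAbsent(key, k -> new long[2]);
        p[0] += value;
        p[1] += 1;
        buffered++;
        if (buffered >= maxBufferedRecords) {
            flush();
        }
    }

    // Emits every buffered partial result and clears the buffer, so nothing
    // needs to be stored in a snapshot afterwards.
    void flush() {
        for (Map.Entry<String, long[]> e : partials.entrySet()) {
            emitted.add(new Partial(e.getKey(), e.getValue()[0], e.getValue()[1]));
        }
        partials.clear();
        buffered = 0;
    }

    List<Partial> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        LocalAvgBuffer buffer = new LocalAvgBuffer(3);
        buffer.processElement("a", 1);
        buffer.processElement("a", 2);
        buffer.processElement("b", 5); // third record reaches the threshold and triggers a flush
        for (Partial p : buffer.emitted()) {
            System.out.println(p.key + " sum=" + p.sum + " count=" + p.count);
        }
    }
}

// Partial result shipped to the global aggregation: key plus (sum, count).
final class Partial {
    final String key;
    final long sum;
    final long count;

    Partial(String key, long sum, long count) {
        this.key = key;
        this.sum = sum;
        this.count = count;
    }
}
```

The downstream keyed aggregation would then add up the `sum` and `count` fields per key and divide once at the end.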
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> Best,
>> > > >> > > > > > >>> Kurt
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> On Tue, Jun 18, 2019 at 1:29 PM vino yang <yanghua1...@gmail.com> wrote:
>> > > >> > > > > > >>>
>> > > >> > > > > > >>>> Hi Kurt,
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> Thanks for your reply.
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> Actually, I am not against you raising your design.
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> From your description before, I can only imagine that your high-level implementation is about SQL and the optimization is internal to the API. Is it automatic? How do you give the configuration option for triggering pre-aggregation?
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> Maybe after I get more information, it will sound more reasonable.
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> IMO, first of all, it would be better to make your user interface concrete; it's the basis of the discussion.
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> For example, can you give an example code snippet to show how to help users handle data skew caused by jobs which are built with the DataStream API?
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> If you give more details, we can discuss further. I think if one design introduces an exact interface and another does not.
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> The implementation has an obvious difference. For example, we introduce an exact API in DataStream named localKeyBy; for the pre-aggregation we need to define the trigger mechanism of local aggregation, so we find that reusing the window API and operator is a good choice. This is a chain of reasoning from design to implementation.
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> What do you think?
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> Best,
>> > > >> > > > > > >>>> Vino
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> Kurt Young <ykt...@gmail.com> wrote on Tue, Jun 18, 2019 at 11:58 AM:
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>>> Hi Vino,
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>> Now I feel that we may have different understandings about what kind of problems or improvements you want to resolve. Currently, most of the feedback is focusing on *how to do a proper local aggregation to improve performance and maybe solve the data skew issue*. And my gut feeling is that this is exactly what users want in the first place, especially those +1s. (Sorry for trying to summarize here; please correct me if I'm wrong.)
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>> But I still think the design has somehow diverged from the goal. If we want to have an efficient and powerful way to do local aggregation, supporting an intermediate result type is essential IMO. Both the runtime's `AggregateFunction` and SQL's `UserDefinedAggregateFunction` have proper support for an intermediate result type and can do a `merge` operation on them.
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>> Now, we have a lightweight alternative which performs well and is a nice fit for the local aggregation requirements. Most importantly, it's much less complex because it's stateless. And it can also handle a similar multiple-aggregation scenario.
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>> I am still not convinced why we shouldn't consider it as a first step.
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>> Best,
>> > > >> > > > > > >>>>> Kurt
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>> On Tue, Jun 18, 2019 at 11:35 AM vino yang <yanghua1...@gmail.com> wrote:
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>>> Hi Kurt,
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>> Thanks for your comments.
It seems we both implemented a local aggregation feature to mitigate the issue of data skew. However, IMHO, the API level at which the optimization applies is different.

*Your optimization benefits Flink SQL and is not user-facing. (If I understand it incorrectly, please correct me.)*
*Our implementation exposes it as an optimization tool API for DataStream; it is just like a local version of the keyBy API.*

Based on this, I want to say that supporting it as a DataStream API can provide these advantages:

- The localKeyBy API has clear semantics and is flexible, not only for handling data skew but also for implementing some use cases. For example, if we want to calculate a multiple-level aggregation, we can do it within the local aggregation:
    input.localKeyBy("a").sum(1).localKeyBy("b").window(); // here "a" is a sub-category, while "b" is a category
  Here we do not need to shuffle data over the network.
- The users of the DataStream API will benefit from this. Actually, we have a lot of scenarios that need the DataStream API. Currently, the DataStream API is the cornerstone of the physical plan of Flink SQL. With a localKeyBy API, the SQL optimization may at least use this optimized API; this is a further topic.
- Being based on the window operator, our state would benefit from Flink state and checkpointing, so we do not need to worry about OOM and job failure.

Now, about your questions:

1. About our design not being able to change the data type, and about the implementation of average:

Just like my reply to Hequn, localKeyBy is an API provided to the users who use the DataStream API to build their jobs.
Users should know its semantics and how it differs from the keyBy API, so if they want an average aggregation, they should carry the local sum result and the local count result. I admit that it would be more convenient to use keyBy directly. But we need to pay a small price when we get some benefits. I think this price is reasonable, considering that the DataStream API itself is a low-level API (at least for now).

2. About the stateless operator and `StreamOperator::prepareSnapshotPreBarrier()`:

Actually, I have discussed this opinion with @dianfu in the old mail thread. I will copy my opinion from there:

- for your design, you still need somewhere to let the users configure the trigger threshold (maybe memory availability?). This design cannot guarantee deterministic semantics (it will bring trouble for testing and debugging).
- if the implementation depends on the timing of the checkpoint, it would affect the checkpoint's progress, and the buffered data may cause an OOM issue. In addition, if the operator is stateless, it cannot provide fault tolerance.

Best,
Vino

On Tue, Jun 18, 2019 at 9:22 AM Kurt Young <ykt...@gmail.com> wrote:

Hi Vino,

Thanks for the proposal. I like the general idea and IMO it's a very useful feature. But after reading through the document, I feel that we may be over-designing the operator required for proper local aggregation. The main reason is that we want to have a clear definition and behavior for the "local keyed state", which in my opinion is not necessary for local aggregation, at least to start with.
Another issue I noticed is that the local key-by operator cannot change the element type. This will also rule out a lot of use cases which could benefit from local aggregation, like "average".

We also implemented similar logic in SQL, and the only thing that needs to be done is to introduce a stateless lightweight operator which is *chained* before `keyBy()`. The operator flushes all buffered elements during `StreamOperator::prepareSnapshotPreBarrier()` and thereby keeps itself stateless. By the way, in an earlier version we took a similar approach by introducing a stateful local aggregation operator, but it did not perform as well as the later one and also affected the barrier alignment time. The later one is fairly simple and more efficient.

I would highly suggest you consider a stateless approach as the first step.
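To illustrate the two points raised in this message (an intermediate type for average, and a buffer that is flushed so that nothing remains to be checkpointed), here is a minimal sketch in plain Java. It does not use any Flink API: `LocalAvgBuffer`, `Acc` and the `maxKeys` threshold are hypothetical names chosen for illustration, and `flush()` only stands in for what an operator might do from a hook such as `prepareSnapshotPreBarrier()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a local pre-aggregating buffer for AVG; not Flink code. */
final class LocalAvgBuffer {
    /** Intermediate result for AVG: neither the input type (long) nor the output type (double). */
    static final class Acc {
        long sum;
        long count;
        Acc(long sum, long count) { this.sum = sum; this.count = count; }
        void merge(Acc other) { sum += other.sum; count += other.count; }
        double result() { return (double) sum / count; }
    }

    private final Map<String, Acc> buffer = new HashMap<>();
    // Stands in for the records sent to the downstream keyBy().
    private final List<Map<String, Acc>> emitted = new ArrayList<>();
    private final int maxKeys; // assumed flush threshold, purely illustrative

    LocalAvgBuffer(int maxKeys) { this.maxKeys = maxKeys; }

    /** Folds one input record into the per-key (sum, count) accumulator. */
    void add(String key, long value) {
        buffer.computeIfAbsent(key, k -> new Acc(0, 0)).merge(new Acc(value, 1));
        if (buffer.size() >= maxKeys) { flush(); }
    }

    /** Emits all partial accumulators; afterwards the buffer holds no state. */
    void flush() {
        if (buffer.isEmpty()) { return; }
        emitted.add(new HashMap<>(buffer));
        buffer.clear();
    }

    int pendingKeys() { return buffer.size(); }

    List<Map<String, Acc>> emitted() { return emitted; }

    /** Global phase: merge the partial accumulators per key, then compute the average. */
    static Map<String, Double> globalAverage(List<Map<String, Acc>> batches) {
        Map<String, Acc> merged = new HashMap<>();
        for (Map<String, Acc> batch : batches) {
            batch.forEach((k, acc) -> merged.computeIfAbsent(k, x -> new Acc(0, 0)).merge(acc));
        }
        Map<String, Double> averages = new HashMap<>();
        merged.forEach((k, acc) -> averages.put(k, acc.result()));
        return averages;
    }

    public static void main(String[] args) {
        LocalAvgBuffer local = new LocalAvgBuffer(2);
        local.add("a", 1);
        local.add("a", 3);
        local.add("b", 10); // second distinct key reaches the threshold and triggers a flush
        local.add("a", 5);
        local.flush();      // e.g. right before a checkpoint barrier
        Map<String, Double> avg = globalAverage(local.emitted());
        System.out.println(avg.get("a")); // 3.0
        System.out.println(avg.get("b")); // 10.0
    }
}
```

Averaging the local averages would give a different answer for key "a" (the mean of 2.0 and 5.0 is 3.5, not 3.0), which is why the partial result has to carry both sum and count.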
Best,
Kurt

On Mon, Jun 17, 2019 at 7:32 PM Jark Wu <imj...@gmail.com> wrote:

Hi Vino,

Thanks for the proposal.

Regarding "input.keyBy(0).sum(1)" vs "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have you done any benchmarks? I'm curious about how much performance improvement we can get by using a count window as the local operator.

Best,
Jark

On Mon, 17 Jun 2019 at 17:48, vino yang <yanghua1...@gmail.com> wrote:

Hi Hequn,

Thanks for your reply.

The purpose of the localKeyBy API is to provide a tool that lets users do pre-aggregation locally. The behavior of the pre-aggregation is similar to the keyBy API.
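To make the semantics of the three pipelines compared in this exchange concrete, the following plain-Java sketch simulates them for a single key whose records are spread over two local tasks. It is only a simulation under stated assumptions: no Flink API is used, the class and method names are hypothetical, and the count window is modeled as emitting one sum per full window of `size` records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical simulation of the three pipelines for one key; not Flink code. */
final class LocalKeyBySemantics {
    /** Local sum(1) without a window: emits the running sum after every record. */
    static List<Long> runningSums(List<Long> partition) {
        List<Long> out = new ArrayList<>();
        long sum = 0;
        for (long v : partition) {
            sum += v;
            out.add(sum);
        }
        return out;
    }

    /** Local countWindow(size).sum(1): emits one sum per full window, then starts over. */
    static List<Long> windowSums(List<Long> partition, int size) {
        List<Long> out = new ArrayList<>();
        long sum = 0;
        int n = 0;
        for (long v : partition) {
            sum += v;
            if (++n == size) {
                out.add(sum);
                sum = 0;
                n = 0;
            }
        }
        return out; // an incomplete trailing window is not emitted in this model
    }

    static long total(List<Long> values) {
        long t = 0;
        for (long v : values) { t += v; }
        return t;
    }

    /** Case 1: keyBy(0).sum(1), final value of the global sum. */
    static long keyBySum(List<List<Long>> partitions) {
        long t = 0;
        for (List<Long> p : partitions) { t += total(p); }
        return t;
    }

    /** Case 2: localKeyBy(0).sum(1).keyBy(0).sum(1), global sum over running partial sums. */
    static long localSumThenKeyBySum(List<List<Long>> partitions) {
        long t = 0;
        for (List<Long> p : partitions) { t += total(runningSums(p)); }
        return t;
    }

    /** Case 3: localKeyBy(0).countWindow(size).sum(1).keyBy(0).sum(1). */
    static long localWindowThenKeyBySum(List<List<Long>> partitions, int size) {
        long t = 0;
        for (List<Long> p : partitions) { t += total(windowSums(p, size)); }
        return t;
    }

    public static void main(String[] args) {
        List<List<Long>> partitions = Arrays.asList(
                Arrays.asList(1L, 2L, 3L, 4L),
                Arrays.asList(10L, 20L));
        System.out.println(keyBySum(partitions));                   // 40
        System.out.println(localSumThenKeyBySum(partitions));       // 60 (partials counted repeatedly)
        System.out.println(localWindowThenKeyBySum(partitions, 2)); // 40
    }
}
```

In this model the first and third pipelines agree only because every local task sees a multiple of the window size; records in a window that never fills are not yet reflected in the global result.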
So the three cases are different. I will describe them one by one:

1. input.keyBy(0).sum(1)

*In this case, the result is event-driven: each event produces one sum aggregation result, and it is the latest one since the source started.*

2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)

*In this case, the semantics may have a problem: it would do the local sum aggregation and produce, for every event, the latest partial result since the source started.*
*These latest partial results for the same key are hashed to one node to do the global sum aggregation.*
*In the global aggregation, receiving multiple partial results (all of them calculated from the source start) and summing them will give the wrong result.*

3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)

*In this case, it would just produce a partial aggregation result for the 5 records in the count window. The partial aggregation results for the same key will be aggregated globally.*

So the first case and the third case can get the *same* result; the difference is the output style and the latency.

Generally speaking, the local key API is just an optimization API. We do not limit the user's usage, but the user has to understand its semantics and use it correctly.

Best,
Vino

On Mon, Jun 17, 2019 at 4:18 PM Hequn Cheng <chenghe...@gmail.com> wrote:

Hi Vino,

Thanks for the proposal, I think it is a very good feature!

One thing I want to make sure of is the semantics of `localKeyBy`.
From the document, the `localKeyBy` API returns an instance of `KeyedStream` which can also perform sum(), so in this case, what are the semantics of `localKeyBy()`? For example, will the following code produce the same result, and what are the differences between them?

1. input.keyBy(0).sum(1)
2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)

It would also be great if we could add this to the document. Thank you very much.

Best, Hequn

On Fri, Jun 14, 2019 at 11:34 AM vino yang <yanghua1...@gmail.com> wrote:

Hi Aljoscha,

I have looked at the "*Process*" section of the FLIP wiki page. [1] This mail thread indicates that it has proceeded to the third step.
When I looked at the fourth step (the vote step), I didn't find the prerequisites for starting the voting process.

The discussion of this feature has already taken place in the old thread. [2] So can you tell me when I should start voting? Can I start now?

Best,
Vino

[1]: https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
[2]: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308

On Thu, Jun 13, 2019 at 9:19 AM leesf <leesf0...@gmail.com> wrote:

+1 for the FLIP, thanks vino for your efforts.

Best,
Leesf

On Wed, Jun 12, 2019 at 5:46 PM vino yang <yanghua1...@gmail.com> wrote:

Hi folks,

I would like to start the FLIP discussion thread about supporting local aggregation in Flink.

In short, this feature can effectively alleviate data skew.
This is the FLIP:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink

*Motivation* (copied from FLIP)

Currently, keyed streams are widely used to perform aggregating operations (e.g., reduce, sum and window) on the elements that have the same key. When executed at runtime, the elements with the same key will be sent to and aggregated by the same task.
The performance of these aggregating operations is very sensitive to the distribution of keys. In cases where the distribution of keys follows a power law, the performance will be significantly degraded. Worse still, increasing the degree of parallelism does not help when a task is overloaded by a single key.

Local aggregation is a widely adopted method to reduce the performance degradation caused by data skew. We can decompose the aggregating operations into two phases. In the first phase, we aggregate the elements of the same key at the sender side to obtain partial results.
>> > Then >> > > at >> > > >> > > > > > >> the >> > > >> > > > > > >>>>> second >> > > >> > > > > > >>>>>>>>> phase, >> > > >> > > > > > >>>>>>>>>>>> these >> > > >> > > > > > >>>>>>>>>>>>> partial results are sent to receivers >> > according >> > > to >> > > >> > > > > > >>> their >> > > >> > > > > > >>>>> keys >> > > >> > > > > > >>>>>>> and >> > > >> > > > > > >>>>>>>>> are >> > > >> > > > > > >>>>>>>>>>>>> combined to obtain the final result. Since >> the >> > > >> number >> > > >> > > > > > >>> of >> > > >> > > > > > >>>>>>> partial >> > > >> > > > > > >>>>>>>>>>> results >> > > >> > > > > > >>>>>>>>>>>>> received by each receiver is limited by the >> > > >> number of >> > > >> > > > > > >>>>>> senders, >> > > >> > > > > > >>>>>>>> the >> > > >> > > > > > >>>>>>>>>>>>> imbalance among receivers can be reduced. >> > > >> Besides, by >> > > >> > > > > > >>>>>> reducing >> > > >> > > > > > >>>>>>>> the >> > > >> > > > > > >>>>>>>>>>> amount >> > > >> > > > > > >>>>>>>>>>>>> of transferred data the performance can be >> > > further >> > > >> > > > > > >>>>> improved. 
*More details*:

Design documentation:
https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing

Old discussion thread:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308

JIRA: FLINK-12786 <https://issues.apache.org/jira/browse/FLINK-12786>

We are looking forward to your feedback!
Best,
Vino