Hi,

Good question. I think not, at least not in the first version, unless someone can convince us that it is better to do this immediately.

Piotrek

On 14 Jan 2020, at 04:49, Yun Tang <myas...@live.com> wrote:
> Hi
>
> I noticed that the previous design doc [1] also discussed introducing a new KeyedStreamOperatorNG. Is that a must-do for introducing the N-ary stream operator?
>
> [1] https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI
>
> Best
> Yun Tang
>
> From: Piotr Nowojski <pi...@ververica.com>
> Sent: Thursday, January 9, 2020 23:27
> To: dev <dev@flink.apache.org>
> Subject: Re: [DISCUSS] Add N-Ary Stream Operator
>
> Hi,
>
> I have started a vote on this topic [1], please cast your +1 or -1 there :)
>
> Also, I assigned the FLIP-92 number to this design doc.
>
> Piotrek
>
> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-92-Add-N-Ary-Stream-Operator-in-Flink-td36539.html
>
> On 10 Dec 2019, at 07:10, Jingsong Li <jingsongl...@gmail.com> wrote:
>> Hi Piotr,
>>
>> Sorry for the misunderstanding: chaining does work with multiple outputs right now. I mean that it is also a very important feature, and it should work with N-ary selectable input operators.
>> We all think that providing an N-ary selectable input operator is very important: it makes TwoInputOperator chaining possible in the upper layer, and it makes things simpler.
>>
>> Looking forward to it very much.
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Dec 5, 2019 at 6:01 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>> Hi,
>>>
>>> Thanks for the clarifications, Jingsong. Indeed, if chaining doesn’t work with multiple outputs right now (doesn’t it?), that’s also a good future story.
>>>
>>> Re Kurt:
>>> I think this pattern could be easily handled if those two joins are implemented as a single 3-input operator that is internally composed of those three operators:
>>> 1. You can set the initial InputSelection to Build1 and Build2.
>>> 2. When Build1 receives `endOfInput`, the InputSelection switches to Probe1 and Build2.
>>> 3. When Probe1 receives `endOfInput`, you do not forward the `endOfInput` to the internal `HashAgg` operator.
>>> 4. When Build2 finally receives `endOfInput`, you can finally forward the `endOfInput` to the internal `HashAgg`.
>>>
>>> Exactly for reasons like that, I wanted to at least postpone handling tree-like operator chains in Flink. Logic like that is difficult to express generically, since it requires knowledge about the operators’ behaviour. Whereas when it is hardcoded for the specific project (Blink in this case) and encapsulated behind an N-ary selectable input operator, it’s very easy for the runtime to handle. Sure, this comes at the expense of a bit more complexity, since the user is forced to compose the operators; that’s why I’m not saying that we do not want to handle this at some point in the future, but at least not in the first version.
>>>
>>> Piotrek
>>>
>>> On 5 Dec 2019, at 10:11, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>> Kurt mentioned a very interesting thing.
>>>>
>>>> If we want better performance by reading the inputs simultaneously in this pattern, we need to control not only the read order of the inputs, but also the outputs of endInput.
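To make Piotr's four steps concrete, here is a minimal, self-contained Java sketch of the composed 3-input operator for Kurt's Build1/Probe1/Build2 pattern. It models only the control flow: which inputs are selected and when `HashAgg`'s end-of-input is released. The class, enum and method names are hypothetical stand-ins that loosely mirror the roles of Flink's `InputSelectable#nextSelection()` and `BoundedMultiInput#endInput(int)`; they are not those interfaces' actual signatures. As an assumption beyond the email, the sketch releases `HashAgg` once both Probe1 and Build2 have ended, which matches step 4 when Build2 finishes last and also stays correct if Build2 happens to finish first.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

/**
 * Hypothetical sketch: HashJoin1 -> HashAgg -> HashJoin2 fused into one 3-input operator,
 * so the operator itself decides which inputs are read and when HashAgg sees end-of-input.
 * Only the control flow is modelled; record processing is omitted.
 */
class ComposedJoinOperator {

    enum Input { BUILD1, PROBE1, BUILD2 }

    // Step 1: start by reading both build sides.
    private final EnumSet<Input> selected = EnumSet.of(Input.BUILD1, Input.BUILD2);
    private final EnumSet<Input> finished = EnumSet.noneOf(Input.class);

    /** Records the internal end-of-input decisions, for illustration and testing. */
    final List<String> log = new ArrayList<>();

    /** Inputs the runtime may read from next (the role of a "next selection" call). */
    EnumSet<Input> nextSelection() {
        return EnumSet.copyOf(selected);
    }

    /** Called by the (hypothetical) runtime once per input when that input is exhausted. */
    void endInput(Input input) {
        finished.add(input);
        selected.remove(input);
        if (input == Input.BUILD1) {
            // Step 2: HashJoin1's hash table is complete, so start reading its probe side.
            selected.add(Input.PROBE1);
        } else if (finished.contains(Input.PROBE1) && finished.contains(Input.BUILD2)) {
            // Step 4: HashJoin2's build side is complete, so HashAgg may now emit its
            // aggregates into HashJoin2's probe side.
            log.add("HashAgg.endInput");
        } else if (input == Input.PROBE1) {
            // Step 3: HashJoin1 is done, but HashAgg's end-of-input is held back.
            log.add("HashAgg.endInput deferred");
        }
    }
}
```

The point of the composition is that the rule "HashAgg ends only after Build2 ends" lives inside one operator, which is exactly the operator-specific knowledge Piotr argues the generic runtime should not need.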
>>>> In this case, HashAggregate can only call its real endInput after the build2 input has finished, so the endInput of an operator is not necessarily determined by its own input alone, but also by other associated inputs.
>>>> I think we have the ability to do this in the n-input operator.
>>>>
>>>> Note that these behaviors should be determined at compile time.
>>>>
>>>> Best,
>>>> Jingsong Lee
>>>>
>>>> On Thu, Dec 5, 2019 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>> When implementing the n-ary input operator in the table layer, please keep this pattern in mind:
>>>>>
>>>>> Build1 ---+
>>>>>           |
>>>>>           +---> HashJoin1 ---> HashAgg ---+
>>>>>           |                               |
>>>>> Probe1 ---+                               +---> HashJoin2
>>>>>                                           |
>>>>>                                 Build2 ---+
>>>>>
>>>>> It's quite interesting that `Build1`, `Build2` and `Probe1` can all be read simultaneously. But we need to hold back `HashAgg`'s output until `Build2` has finished. I don't have a clear solution for now, but it's a common pattern we will face.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>> On Thu, Dec 5, 2019 at 4:37 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>> Hi Piotr,
>>>>>>
>>>>>>> a) two input operator X -> one input operator Y -> one input operator Z (ALLOWED)
>>>>>>> b) n input operator X -> one input operator Y -> one input operator Z (ALLOWED)
>>>>>>> c) two input operator X -> one input operator Y -> two input operator Z (NOT ALLOWED as a single chain)
>>>>>>
>>>>>> Not allowing c) sounds good to me. I understand that it is very difficult to provide general, high-performance support for chains of arbitrary input-selectable two-input operators.
>>>>>> And it is not necessary for the table layer either. b) alone already excites us.
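The chaining rules a) to c), together with the follow-up rule Piotr suggests (at most one input-selectable operator per chain), are simple enough to state as predicates. The Java sketch below is only an illustration under that reading of the thread; `Op` is a hypothetical descriptor rather than a Flink class, and real chaining decisions depend on further conditions (such as parallelism and partitioning) that are ignored here.

```java
import java.util.List;

/** Hypothetical sketch of the chain-validity rules discussed in this thread. */
class ChainRules {

    /** Minimal operator descriptor (not a Flink class). */
    static final class Op {
        final int inputs;
        final boolean inputSelectable;

        Op(int inputs, boolean inputSelectable) {
            this.inputs = inputs;
            this.inputSelectable = inputSelectable;
        }
    }

    /** First version: a 2- or N-input operator may only appear at the head of a chain. */
    static boolean allowedInFirstVersion(List<Op> chain) {
        for (int i = 1; i < chain.size(); i++) {
            if (chain.get(i).inputs > 1) {
                return false;
            }
        }
        return true;
    }

    /**
     * Possible follow-up: multi-input operators may appear anywhere in the chain,
     * as long as at most one operator in the chain is input-selectable.
     */
    static boolean allowedWithFollowUpRule(List<Op> chain) {
        long selectable = chain.stream().filter(op -> op.inputSelectable).count();
        return selectable <= 1;
    }
}
```

Under the follow-up rule, chain c) becomes legal when only one of X and Z selects its inputs, which is the case Piotr singles out as the source of the complexity.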
>>>>>>
>>>>>> Actually, we support n-output chains too:
>>>>>> d) one/two/n op X -> one op Y -> one op A1 -> one op B1 -> one op C1
>>>>>>                               -> one op A2 -> one op B2 -> one op C2
>>>>>> d) is a very useful feature too.
>>>>>>
>>>>>>> Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin) could be easily handled by a single N-Ary Stream Operator, so this would be covered by steps 1. and 2. from my plan from my previous e-mail? That would be really nice (avoiding the input selection chaining).
>>>>>>
>>>>>> Yes, because in the table layer, the typical scenarios currently only have a static order. (We don't consider MergeJoin here, because it is too complex to optimize and not worth optimizing at present.)
>>>>>> For example, the current TwoInputOperators, HashJoin and NestedLoopJoin, both have a static reading order: we must read the build input before we can read the probe input.
>>>>>> So after analyzing the chain, we put all the operators that can be chained into an N-input operator. We can then analyze the static order required by this operator and divide the reading order into several levels:
>>>>>> - first level: input4, input5, input1
>>>>>> - second level: input2, input6
>>>>>> - third level: input1, input7
>>>>>> Note that this analysis happens at compile time on the client.
>>>>>> At runtime, we just need to read in a fixed order.
>>>>>>
>>>>>> Best,
>>>>>> Jingsong Lee
>>>>>>
>>>>>> On Wed, Dec 4, 2019 at 10:15 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>>> Hi Jingsong,
>>>>>>>
>>>>>>> Thanks for the feedback :)
>>>>>>>
>>>>>>> Could you clarify a little bit what you mean by your desired use cases?
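Jingsong's compile-time analysis, which groups the inputs of a fused N-input operator into read levels, resembles a level-by-level topological sort (Kahn's algorithm applied in rounds) over "must be fully read before" constraints such as build-before-probe. The Java sketch below assumes that interpretation. The method name and the constraint map are hypothetical, and the thread does not say how Blink actually computes the levels.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: compute static read levels for the inputs of a fused N-input operator. */
class ReadOrderLevels {

    /**
     * @param inputs        all input ids, in a stable order
     * @param mustReadFirst for each input, the inputs that must end before it may be read
     *                      (e.g. a HashJoin's probe input maps to its build input)
     * @return levels; inputs in one level may be read together once all earlier levels ended
     */
    static List<List<String>> levels(List<String> inputs, Map<String, Set<String>> mustReadFirst) {
        List<List<String>> result = new ArrayList<>();
        Set<String> done = new HashSet<>();
        while (done.size() < inputs.size()) {
            List<String> level = new ArrayList<>();
            for (String input : inputs) {
                Set<String> deps = mustReadFirst.getOrDefault(input, Collections.emptySet());
                if (!done.contains(input) && done.containsAll(deps)) {
                    level.add(input);
                }
            }
            if (level.isEmpty()) {
                throw new IllegalArgumentException("cyclic or unknown read-order constraints");
            }
            done.addAll(level); // added only now, so inputs in one level never unlock each other
            result.add(level);
        }
        return result;
    }
}
```

For Kurt's pattern, with Probe1 depending on Build1, this yields `[[build1, build2], [probe1]]`, which matches the initial selection in Piotr's step 1. Note that the levels only describe input reading; delaying `HashAgg`'s end-of-input is a separate, operator-internal decision.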
>>>>>>>
>>>>>>>> There are a large number of jobs (in production environments) whose TwoInputOperators could be chained. We could only watch the last ten tasks transmit data through disk and network, which could have been done in one task.
>>>>>>>> Performance-wise, if we can chain them, the average improvement is 30%+, and up to an order of magnitude in extreme cases.
>>>>>>>
>>>>>>> As I mentioned at the end, I would like to avoid/postpone chaining multiple/two-input operators one after another because of the complexity of input selection. For the first version, I would like to aim only at allowing single-input operators to be chained with something (a 2- or N-input operator must always be the head of the chain). For example, these chains:
>>>>>>>
>>>>>>> a) two input operator X -> one input operator Y -> one input operator Z (ALLOWED)
>>>>>>> b) n input operator X -> one input operator Y -> one input operator Z (ALLOWED)
>>>>>>> c) two input operator X -> one input operator Y -> two input operator Z (NOT ALLOWED as a single chain)
>>>>>>>
>>>>>>> The example above sounds to me like c).
>>>>>>>
>>>>>>> I think as a follow-up, we could allow c) by extending chaining with a simple rule: there can be only a single input-selectable operator in the chain (again, it’s the chaining of multiple input-selectable operators that is causing the problems).
>>>>>>>
>>>>>>>> The table layer has many special features, which give us the chance to optimize it, but which also make it hard for the underlying layer to provide an abstract mechanism to implement them. For example:
>>>>>>>> - HashJoin must read all the data on one side (build side) and then read the other side (probe side).
>>>>>>>> - HashJoin only emits data when reading the probe side.
>>>>>>>> - SortMergeJoin reads its inputs in no fixed order, and if a SortMergeJoin is chained with another MergeJoin (reusing the sort attribute), things get complicated.
>>>>>>>> - HashAggregate/Sort only emit data in endInput.
>>>>>>>>
>>>>>>>> Providing an N-Ary stream operator makes everything possible: the upper layer can do anything. These things can be specific optimizations, which is much more natural than doing them in the lower layer.
>>>>>>>
>>>>>>> Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin) could be easily handled by a single N-Ary Stream Operator, so this would be covered by steps 1. and 2. from my plan from my previous e-mail? That would be really nice (avoiding the input selection chaining).
>>>>>>>
>>>>>>> Piotrek
>>>>>>>
>>>>>>> On 4 Dec 2019, at 14:29, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>> Hi Piotr,
>>>>>>>>
>>>>>>>> Huge +1 for the N-Ary Stream Operator.
>>>>>>>> And I love this Golden Shovel award very much!
>>>>>>>>
>>>>>>>> There are a large number of jobs (in production environments) whose TwoInputOperators could be chained. We could only watch the last ten tasks transmit data through disk and network, which could have been done in one task.
>>>>>>>> Performance-wise, if we can chain them, the average improvement is 30%+, and up to an order of magnitude in extreme cases.
>>>>>>>>
>>>>>>>> The table layer has many special features, which give us the chance to optimize it, but which also make it hard for the underlying layer to provide an abstract mechanism to implement them.
>>>>>>>> For example:
>>>>>>>> - HashJoin must read all the data on one side (build side) and then read the other side (probe side).
>>>>>>>> - HashJoin only emits data when reading the probe side.
>>>>>>>> - SortMergeJoin reads its inputs in no fixed order, and if a SortMergeJoin is chained with another MergeJoin (reusing the sort attribute), things get complicated.
>>>>>>>> - HashAggregate/Sort only emit data in endInput.
>>>>>>>>
>>>>>>>> Providing an N-Ary stream operator makes everything possible: the upper layer can do anything. These things can be specific optimizations, which is much more natural than doing them in the lower layer.
>>>>>>>>
>>>>>>>> In addition to the two optimizations you mentioned, it also gives more room to eliminate virtual function calls, because this way the table layer has to consider the operator chain itself. In the future, we could optimize a whole N-Ary stream operator into a JIT-friendly operator. Without virtual function calls, the JIT can show its real strength.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong Lee
>>>>>>>>
>>>>>>>> On Wed, Dec 4, 2019 at 5:24 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> First and foremost, I would like to nominate myself for the Golden Shovel award for digging out this topic.
>>>>>>>>>
>>>>>>>>> Secondly, I would like to discuss coming back to this particular idea of implementing an N-Ary Stream Operator. This time the motivation doesn’t come from Side Inputs, but from efficiently supporting multi-joins in SQL without extra network exchanges.
>>>>>>>>> I’ve reviewed the design doc proposed by Aljoscha, I quite like it, and I think we could start from that.
>>>>>>>>>
>>>>>>>>> Specifically, the end goal is to allow, for example, Blink to:
>>>>>>>>>
>>>>>>>>> I. Implement A* multi broadcast join: have a single operator chain, where the probe table (source) is read locally (inside the task that is actually doing the join) and then joined with multiple other broadcasted tables.
>>>>>>>>> II. Another example might be when we have 2 or more sources pre-partitioned on the same key. In that case we should also be able to perform all of the table reading and the join inside a single Task.
>>>>>>>>>
>>>>>>>>> In order to achieve that, I would propose the following plan:
>>>>>>>>>
>>>>>>>>> 1. Implement the N-Ary Stream Operator as proposed in the design doc below, however with added support for input selection [1].
>>>>>>>>>    - initially it can be exposed just via the `StreamTransformation`, without direct access from the `DataStream API`
>>>>>>>>>
>>>>>>>>> 2. Allow it to be chained with sources (implemented using the FLIP-27 SourceReader [2]).
>>>>>>>>>
>>>>>>>>> 3. Think about whether we need to support more complex chaining. Even without this point, the motivating examples (I and II) could be implemented if all of the joins/filtering/mappings are compiled/composed into a single N-Ary Stream Operator (which could be chained with some other single-input operators at the tail). We could also think about supporting chaining a tree of, for example, TwoInputStreamOperators inside a single Task. However, I’m leaving this as a follow-up, since in that case it’s not so easy to handle the `InputSelection` of multiple operators inside the tree.
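Motivating example I (a locally read probe table joined with several broadcast tables inside one operator) can be sketched as a single N-input operator that fully builds one hash table per broadcast input before it accepts probe records, so that it emits only on the probe side, as described for HashJoin earlier in the thread. This is a hypothetical illustration, not Blink's implementation: the class and method names are invented, rows are simplified to string key/value pairs, every table is joined on the same key, and the join is an inner join.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of one N-input operator doing a multi broadcast (hash) join. */
class MultiBroadcastJoin {

    private final List<Map<String, String>> buildTables = new ArrayList<>();
    private final boolean[] buildEnded;
    private int openBuildInputs;
    final List<String> output = new ArrayList<>();

    MultiBroadcastJoin(int broadcastInputs) {
        for (int i = 0; i < broadcastInputs; i++) {
            buildTables.add(new HashMap<>());
        }
        buildEnded = new boolean[broadcastInputs];
        openBuildInputs = broadcastInputs;
    }

    /** Input selection: the probe input is only selected after every build input ended. */
    boolean probeSelected() {
        return openBuildInputs == 0;
    }

    void processBuild(int buildInput, String key, String value) {
        buildTables.get(buildInput).put(key, value);
    }

    void endBuildInput(int buildInput) {
        if (!buildEnded[buildInput]) {
            buildEnded[buildInput] = true;
            openBuildInputs--;
        }
    }

    /** Joins one probe row with every broadcast table; output is produced only here. */
    void processProbe(String key, String value) {
        if (!probeSelected()) {
            throw new IllegalStateException("probe input read before all build inputs ended");
        }
        StringBuilder row = new StringBuilder(key).append(',').append(value);
        for (Map<String, String> table : buildTables) {
            String match = table.get(key);
            if (match == null) {
                return; // inner join: no match in this table, so the row is dropped
            }
            row.append(',').append(match);
        }
        output.add(row.toString());
    }
}
```

Because all joins live in one operator, there is no chaining of input-selectable operators at all, which is why Piotr notes that examples I and II do not depend on step 3.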
>>>>>>>>>
>>>>>>>>> Piotrek
>>>>>>>>>
>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>>>>
>>>>>>>>> On 21. Apr 2016, at 17:09, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>> Hi,
>>>>>>>>>> yes, I see operators of this style as very much an internal thing. You are probably talking about use cases for OneInputOperator and TwoInputOperator where users have a very specific need and require access to the low-level details such as watermarks, state and timers to get stuff done. Maybe we could have a wrapper for these so that users can still use them, but internally we wrap them in an N-Ary Operator.
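Aljoscha's idea of keeping one- and two-input operators for users while running them internally as N-ary operators amounts to an adapter that dispatches by input. The Java sketch below shows such an adapter for the two-input case. `InputHandler`, `NAryOperator` and `TwoInputOp` are hypothetical interfaces invented for this illustration and are deliberately much simpler than Flink's `TwoInputStreamOperator` (no watermarks, state or timers).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** One input of an N-ary operator (hypothetical interface). */
interface InputHandler<IN> {
    void processElement(IN element);
}

/** An operator with any number of inputs, each handled separately (hypothetical). */
interface NAryOperator {
    List<InputHandler<?>> getInputs();
}

/** A user-facing two-input operator (hypothetical, much simpler than Flink's). */
interface TwoInputOp<IN1, IN2, OUT> {
    void processElement1(IN1 value, Consumer<OUT> out);

    void processElement2(IN2 value, Consumer<OUT> out);
}

/** Adapter: runs a two-input operator as an N-ary operator with exactly two inputs. */
final class TwoInputAsNAry<IN1, IN2, OUT> implements NAryOperator {
    private final List<InputHandler<?>> inputs;

    TwoInputAsNAry(TwoInputOp<IN1, IN2, OUT> wrapped, Consumer<OUT> out) {
        // Each input of the N-ary operator forwards to the matching method of the wrapped operator.
        InputHandler<IN1> first = value -> wrapped.processElement1(value, out);
        InputHandler<IN2> second = value -> wrapped.processElement2(value, out);
        this.inputs = Collections.unmodifiableList(Arrays.<InputHandler<?>>asList(first, second));
    }

    @Override
    public List<InputHandler<?>> getInputs() {
        return inputs;
    }
}
```

Giving each input its own typed handler avoids casting elements by input index inside the operator; a one-input wrapper would follow the same pattern with a single handler.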
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>>> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gyf...@apache.org> wrote:
>>>>>>>>>>> Hey,
>>>>>>>>>>>
>>>>>>>>>>> Some initial feedback from my side:
>>>>>>>>>>>
>>>>>>>>>>> I think this is a very important problem to deal with, as a lot of applications depend on it. I like the proposed runtime model, and it is probably a good way to handle this task; it is very clear what is happening.
>>>>>>>>>>>
>>>>>>>>>>> My main concern is how to handle this from the API and UDFs. What you proposed seems like a very internal thing from the API perspective, and I would be against exposing it in the way you wrote in your example. We should make every effort to streamline this with the functional-style operators in some way (in that sense, the way broadcast sets are handled is pretty nice). Maybe we could extend ds.connect() to many streams.
>>>>>>>>>>>
>>>>>>>>>>> But in any case this is an awesome initiative :)
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Gyula
>>>>>>>>>>>
>>>>>>>>>>> On Thu, 21 Apr 2016 at 15:56, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>> I'm currently thinking about how we can bring the broadcast set/broadcast input feature from the DataSet API to the DataStream API. I think this would be a valuable addition, since it would enable use cases that join streams with constant (or slowly changing) side information.
>>>>>>>>>>>>
>>>>>>>>>>>> For this purpose, I think that we need to change the way we handle stream operators. The new model would have one unified operator that handles all cases and allows adding inputs after the operator has been constructed, thus allowing the specification of broadcast inputs.
>>>>>>>>>>>>
>>>>>>>>>>>> I wrote up this preliminary document detailing why we need such a new operator for broadcast inputs and what the API of such an operator would be. It also briefly touches on the required changes to existing per-operation stream operators such as StreamMap:
>>>>>>>>>>>>
>>>>>>>>>>>> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
>>>>>>>>>>>>
>>>>>>>>>>>> Please have a look if you're interested. Feedback/insights are very welcome. :-)
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Aljoscha
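Aljoscha's 2016 proposal of one unified operator whose broadcast inputs can be added after construction can be illustrated with a small Java sketch. It assumes constant side information whose inputs eventually end, and it buffers main-input records until every broadcast input has finished. That buffering policy is an assumption made for this example, not something the proposal specifies, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: an operator whose broadcast (side) inputs are added after construction. */
class SideInputOperator {

    private final List<Map<String, String>> sideTables = new ArrayList<>();
    private final List<Boolean> sideEnded = new ArrayList<>();
    private final List<String> pendingMain = new ArrayList<>();
    final List<String> output = new ArrayList<>();

    /** Registers a new broadcast input and returns its index. */
    int addBroadcastInput() {
        sideTables.add(new HashMap<>());
        sideEnded.add(false);
        return sideTables.size() - 1;
    }

    void processBroadcast(int input, String key, String value) {
        sideTables.get(input).put(key, value);
    }

    void endBroadcast(int input) {
        sideEnded.set(input, true);
        if (sidesReady()) {
            // All side information is available: release the buffered main records.
            for (String key : pendingMain) {
                emit(key);
            }
            pendingMain.clear();
        }
    }

    void processMain(String key) {
        if (sidesReady()) {
            emit(key);
        } else {
            pendingMain.add(key); // side information still incomplete, so buffer
        }
    }

    private boolean sidesReady() {
        return !sideEnded.contains(false);
    }

    /** Enriches a main record with the value from each side table ("?" when absent). */
    private void emit(String key) {
        StringBuilder row = new StringBuilder(key);
        for (Map<String, String> side : sideTables) {
            row.append('|').append(side.getOrDefault(key, "?"));
        }
        output.add(row.toString());
    }
}
```

In this sketch, adding another broadcast input after the first one has ended would start buffering main records again until the new input ends. A design supporting slowly changing side information would need a different policy, such as emitting with the latest known values.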