Hi Jingsong,

Thanks for the feedback :)

Could you clarify a little bit what you mean by your wished use cases?

> There are a large number jobs (in production environment) that their
> TwoInputOperators that can be chained. We used to only watch the last
> ten tasks transmit data through disk and network, which could have been
> done in one task.
> For performance, if we can chain them, the average is 30%+, and there
> is an order of magnitude in extreme cases.

As I mentioned at the end, I would like to avoid/postpone chaining of
multiple/two input operators one after another because of the complexity
of input selection. For the first version I would like to aim only to
allow chaining the single input operators with something (a 2 or N input
operator must always be the head of the chain). For example chains:

a) two input operator X -> one input operator Y -> one input operator Z (ALLOWED)
b) n input operator X -> one input operator Y -> one input operator Z (ALLOWED)
c) two input operator X -> one input operator Y -> two input operator Z (NOT ALLOWED as a single chain)

The example above sounds to me like c).

I think as a follow up, we could allow c) by extending chaining with a
simple rule: there can only be a single input selectable operator in the
chain (again, it’s the chaining of multiple input selectable operators
that’s causing some problems).

> The table layer has many special features. which give us the chance to
> optimize it, but also results that it is hard to let underlying layer to
> provide an abstract mechanism to implement it. For example:
> - HashJoin must read all the data on one side(build side) and then read the
> other side (probe side).
> - HashJoin only emit data when read probe side.
> - SortMergeJoin read random, but if we have SortMergeJoin chain another
> MergeJoin(Sort attribute re-use), that make things complicated.
> - HashAggregate/Sort only emit data in endInput.
>
> Provide an N-Ary stream operator to make everything possible. The upper
> layer can do anything.
> These things can be specific optimization, which is much
> more natural than the lower layer.

Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
could be easily handled by a single N-Ary Stream Operator, so this would
be covered by steps 1. and 2. of the plan from my previous e-mail? That
would be really nice (avoiding the input selection chaining).

Piotrek

> On 4 Dec 2019, at 14:29, Jingsong Li <jingsongl...@gmail.com> wrote:
>
> Hi Piotr,
>
> Huge +1 for N-Ary Stream Operator.
> And I love this Golden Shovel award very much!
>
> There are a large number jobs (in production environment) that their
> TwoInputOperators that can be chained. We used to only watch the last
> ten tasks transmit data through disk and network, which could have been
> done in one task.
> For performance, if we can chain them, the average is 30%+, and there
> is an order of magnitude in extreme cases.
>
> The table layer has many special features. which give us the chance to
> optimize it, but also results that it is hard to let underlying layer to
> provide an abstract mechanism to implement it. For example:
> - HashJoin must read all the data on one side(build side) and then read the
> other side (probe side).
> - HashJoin only emit data when read probe side.
> - SortMergeJoin read random, but if we have SortMergeJoin chain another
> MergeJoin(Sort attribute re-use), that make things complicated.
> - HashAggregate/Sort only emit data in endInput.
>
> Provide an N-Ary stream operator to make everything possible. The upper
> layer can do anything. These things can be specific optimization, which is
> much more natural than the lower layer.
>
> In addition to the two optimizations you mentioned, it also gives more space
> to eliminate virtual function calls:
> Because following this way, the table layer has to consider the operator
> chain.
> And in the future, we can optimize a whole N-Ary stream operator to a
> JIT-friendly operator.
> Without virtual function calls, JIT can play its real strength.
>
> Best,
> Jingsong Lee
>
> On Wed, Dec 4, 2019 at 5:24 PM Piotr Nowojski <pi...@ververica.com> wrote:
> Hi,
>
> First and foremost I would like to nominate myself to the Golden Shovel award
> for digging out this topic:
>
>
>
> Secondly, I would like to discuss coming back to this particular idea of
> implementing N-Ary Stream Operator. This time motivation doesn’t come from
> the Side Inputs, but to efficiently support multi joins in SQL, without extra
> network exchanges. I’ve reviewed the design doc proposed by Aljoscha, I quite
> like it and I think we could start from that.
>
> Specifically the end-goal is to allow for example Blink, to:
>
> I. Implement A* multi broadcast join - to have a single operator chain, where
> probe table (source) is read locally (inside the task that is actually
> doing the join), then joined with multiple other broadcasted tables.
> II. Another example might be when we have 2 or more sources, pre-partitioned
> on the same key. In that case we should also be able to perform all of the
> table reading and the join inside a single Task.
>
> In order to achieve that, I would propose the following plan:
>
> 1. Implement N-Ary Stream Operator as proposed in the design doc below,
> however with added support for the input selection [1].
> - initially it can be just exposed via the `StreamTransformation`, without
> direct access from the `DataStream API`
>
> 2. Allow it to be chained with sources (implemented using the FLIP-27
> SourceReader [2])
>
> 3. Think about whether we need to support more complex chaining. Without this
> point, motivating examples (I and II) could be implemented if all of the
> joins/filtering/mappings are compiled/composed into a single N-Ary Stream
> Operator (which could be chained with some other single input operators at
> the tail).
> We could also think about supporting chaining of a tree of, for
> example, TwoInputStreamOperators inside a single Task. However I’m leaving
> this as a follow up, since in that case, it’s not so easy to handle the
> `InputSelection` of multiple operators inside the tree.
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
>>> On 21. Apr 2016, at 17:09, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> Hi,
>>> yes, I see operators of this style as very much an internal thing. You are
>>> probably talking about use cases for OneInputOperator and TwoInputOperator
>>> where users have a very specific need and require access to the low-level
>>> details such as watermarks, state and timers to get stuff done. Maybe we
>>> could have a wrapper for these so that users can still use them but
>>> internally we wrap them in an N-Ary Operator.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gyf...@apache.org> wrote:
>>> Hey,
>>>
>>> Some initial feedback from my side:
>>>
>>> I think this is a very important problem to deal with as a lot of applications
>>> depend on it. I like the proposed runtime model and that is probably the
>>> good way to handle this task, it is very clean what is happening.
>>>
>>> My main concern is how to handle this from the API and UDFs. What you
>>> proposed seems like a very internal thing from the API perspective and I
>>> would be against exposing it in the way you wrote in your example.
>>> We should make all effort to streamline this with the functional style
>>> operators in some way. (so in that sense the way broadcastsets are handled
>>> is pretty nice) Maybe we could extend ds.connect() to many streams
>>>
>>> But in any case this is awesome initiative :)
>>>
>>> Cheers,
>>> Gyula
>>>
>>>
>>> On Thu, 21 Apr 2016 at 15:56, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> Hi Team,
>>>> I'm currently thinking about how we can bring the broadcast set/broadcast
>>>> input feature from the DataSet API to the DataStream API. I think this
>>>> would be a valuable addition since it would enable use cases that join
>>>> streams with constant (or slowly changing) side information.
>>>>
>>>> For this purpose, I think that we need to change the way we handle stream
>>>> operators. The new model would have one unified operator that handles all
>>>> cases and allows to add inputs after the operator was constructed, thus
>>>> allowing the specification of broadcast inputs.
>>>>
>>>> I wrote up this preliminary document detailing the reason why we need such
>>>> a new operator for broadcast inputs and also what the API of such an
>>>> operator would be. It also quickly touches on the required changes of
>>>> existing per-operation stream operations such as StreamMap:
>>>>
>>>>
>>>> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
>>>>
>>>> Please have a look if you're interested. Feedback/insights are very
>>>> welcome. :-)
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>
>
>
>
> --
> Best, Jingsong Lee
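
P.S. To make the chaining rules a) to c) from the top of this mail a bit
more concrete, here is a rough sketch in plain Java. It has no Flink
dependencies, and `ChainingRules`, `Op`, `allowedInFirstVersion` and
`allowedInFollowUp` are made-up names for illustration only, not existing
or proposed Flink APIs. It assumes a chain is just a list of operators
ordered from head to tail, and that each operator knows its number of
inputs and whether it is input selectable.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of Flink.
final class ChainingRules {

    // Minimal stand-in for an operator: its input count and whether it
    // implements input selection. Both fields are assumptions of this sketch.
    static final class Op {
        final int inputs;
        final boolean inputSelectable;

        Op(int inputs, boolean inputSelectable) {
            this.inputs = inputs;
            this.inputSelectable = inputSelectable;
        }
    }

    private ChainingRules() {}

    // First version: a 2 or N input operator may only be the head of the
    // chain; every operator after the head must have exactly one input.
    static boolean allowedInFirstVersion(List<Op> chain) {
        if (chain.isEmpty()) {
            return false;
        }
        for (int i = 1; i < chain.size(); i++) {
            if (chain.get(i).inputs > 1) {
                return false;
            }
        }
        return true;
    }

    // Possible follow up: multi input operators may appear anywhere, but at
    // most one operator in the whole chain may be input selectable.
    static boolean allowedInFollowUp(List<Op> chain) {
        if (chain.isEmpty()) {
            return false;
        }
        int selectable = 0;
        for (Op op : chain) {
            if (op.inputSelectable) {
                selectable++;
            }
        }
        return selectable <= 1;
    }

    public static void main(String[] args) {
        // a) two input X -> one input Y -> one input Z
        List<Op> a = Arrays.asList(new Op(2, true), new Op(1, false), new Op(1, false));
        // c) two input X -> one input Y -> two input Z, only X input selectable
        List<Op> c = Arrays.asList(new Op(2, true), new Op(1, false), new Op(2, false));

        System.out.println("a) first version: " + allowedInFirstVersion(a));
        System.out.println("c) first version: " + allowedInFirstVersion(c));
        System.out.println("c) follow up:     " + allowedInFollowUp(c));
    }
}
```

Under these assumptions, chain c) is rejected by the first-version rule
but passes the follow-up rule as long as only one of X and Z is input
selectable.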