Hi Jingsong,

Thanks for the feedback :)

Could you clarify a little what you mean by your desired use cases?

> There are a large number of jobs (in production environments) whose 
> TwoInputOperators could be chained. We used to only watch the last 
> ten tasks transmit data through disk and network, which could have been
> done in one task.
> For performance, if we can chain them, the average gain is 30%+, and there
> is an order of magnitude in extreme cases.

As I mentioned at the end, I would like to avoid/postpone chaining of 
multiple/two input operators one after another because of the complexity of 
input selection. For the first version I would like to aim only to allow 
chaining single input operators with something (a 2 or N input operator must 
always be the head of the chain). For example, chains:

a) two input operator X -> one input operator Y -> one input operator Z (ALLOWED)
b) n input operator X -> one input operator Y -> one input operator Z (ALLOWED)
c) two input operator X -> one input operator Y -> two input operator Z (NOT ALLOWED as a single chain)

Your example above sounds to me like c).
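
A minimal sketch of the first-version rule, assuming a chain is described only by the number of inputs of each operator, head first. The class and method names are hypothetical and not part of any Flink API:

```java
public class ChainRuleSketch {

    /**
     * First-version rule (illustrative only): the head of the chain may have
     * any number of inputs, but every operator after the head must have
     * exactly one input.
     *
     * @param inputCounts number of inputs of each operator, head first
     */
    public static boolean isChainable(int... inputCounts) {
        if (inputCounts.length == 0) {
            return false; // an empty chain is not a chain
        }
        for (int i = 1; i < inputCounts.length; i++) {
            if (inputCounts[i] != 1) {
                return false; // a multi-input operator that is not the head
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("a) " + isChainable(2, 1, 1)); // two-input head, then single-input operators
        System.out.println("b) " + isChainable(3, 1, 1)); // n-input head (n = 3 here)
        System.out.println("c) " + isChainable(2, 1, 2)); // two-input operator at the tail
    }
}
```

With these inputs the check accepts a) and b) and rejects c), matching the list above.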

I think as a follow up, we could allow c), by extending chaining with a simple 
rule: there can only be a single input selectable operator in the chain (again, 
it’s the chaining of multiple input selectable operators that’s causing some 
problems).
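
A rough sketch of that follow-up rule, assuming each operator in the chain is reduced to a single flag saying whether it is input selectable. The class and method names are hypothetical, and this is only one possible reading of the rule:

```java
public class SingleSelectableRuleSketch {

    /**
     * Follow-up rule (illustrative only): a chain is accepted if at most one
     * of its operators is input selectable.
     *
     * @param inputSelectable one flag per operator in the chain, head first
     */
    public static boolean isChainable(boolean... inputSelectable) {
        int selectableCount = 0;
        for (boolean selectable : inputSelectable) {
            if (selectable) {
                selectableCount++;
            }
        }
        return inputSelectable.length > 0 && selectableCount <= 1;
    }

    public static void main(String[] args) {
        // Chain c) where only the head X is input selectable: accepted.
        System.out.println(isChainable(true, false, false));
        // Chain c) where both X and Z are input selectable: rejected.
        System.out.println(isChainable(true, false, true));
    }
}
```

Under this reading, chain c) would become chainable as long as X and Z are not both input selectable.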

> The table layer has many special features, which give us the chance to 
> optimize it, but also make it hard for the underlying layer to provide an 
> abstract mechanism to implement it. For example:
> - HashJoin must read all the data on one side (build side) and then read the 
> other side (probe side).
> - HashJoin only emits data when reading the probe side.
> - SortMergeJoin reads randomly, but if we have a SortMergeJoin chained with 
> another MergeJoin (sort attribute re-use), that makes things complicated.
> - HashAggregate/Sort only emit data in endInput.
> 
> Provide an N-Ary stream operator to make everything possible. The upper
> layer can do anything. These things can be specific optimizations, which is 
> much more natural than doing them in the lower layer.

Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin) could 
be easily handled by a single N-Ary Stream Operator, so this would be covered 
by steps 1. and 2. of the plan in my previous e-mail? That would be really 
nice (it would avoid the input selection chaining).

Piotrek

> On 4 Dec 2019, at 14:29, Jingsong Li <jingsongl...@gmail.com> wrote:
> 
> Hi Piotr,
> 
> Huge +1 for N-Ary Stream Operator.
> And I love this Golden Shovel award very much!
> 
> There are a large number of jobs (in production environments) whose 
> TwoInputOperators could be chained. We used to only watch the last 
> ten tasks transmit data through disk and network, which could have been
> done in one task.
> For performance, if we can chain them, the average gain is 30%+, and there
> is an order of magnitude in extreme cases.
> 
> The table layer has many special features, which give us the chance to 
> optimize it, but also make it hard for the underlying layer to provide an 
> abstract mechanism to implement it. For example:
> - HashJoin must read all the data on one side (build side) and then read the 
> other side (probe side).
> - HashJoin only emits data when reading the probe side.
> - SortMergeJoin reads randomly, but if we have a SortMergeJoin chained with 
> another MergeJoin (sort attribute re-use), that makes things complicated.
> - HashAggregate/Sort only emit data in endInput.
> 
> Provide an N-Ary stream operator to make everything possible. The upper
> layer can do anything. These things can be specific optimizations, which is 
> much more natural than doing them in the lower layer.
> 
> In addition to the two optimizations you mentioned, it also gives more space 
> to eliminate virtual function calls,
> because this way the table layer has to consider the operator chain.
> And in the future, we can optimize a whole N-Ary stream operator into a
> JIT-friendly operator. Without virtual function calls, the JIT can play to its 
> real strength.
> 
> Best,
> Jingsong Lee
> 
> On Wed, Dec 4, 2019 at 5:24 PM Piotr Nowojski <pi...@ververica.com 
> <mailto:pi...@ververica.com>> wrote:
> Hi,
> 
> First and foremost I would like to nominate myself for the Golden Shovel award 
> for digging out this topic:
> 
> 
> 
> Secondly, I would like to discuss coming back to this particular idea of 
> implementing an N-Ary Stream Operator. This time the motivation doesn’t come 
> from Side Inputs, but from the need to efficiently support multi joins in SQL 
> without extra network exchanges. I’ve reviewed the design doc proposed by 
> Aljoscha, I quite like it, and I think we could start from that.
> 
> Specifically, the end goal is to allow, for example, Blink to:
> 
> I. Implement A* multi broadcast join - to have a single operator chain, where 
> the probe table (source) is read locally (inside the task that is actually 
> doing the join), then joined with multiple other broadcasted tables. 
> II. Another example might be when we have 2 or more sources, pre-partitioned 
> on the same key. In that case we should also be able to perform all of the 
> table reading and the join inside a single Task.
> 
> In order to achieve that, I would propose the following plan:
> 
> 1. Implement N-Ary Stream Operator as proposed in the design doc below, 
> however with added support for the input selection [1].
>   - initially it can be just exposed via the `StreamTransformation`, without 
> direct access from the `DataStream API`
> 
> 2. Allow it to be chained with sources (implemented using the FLIP-27 
> SourceReader [2])
> 
> 3. Think about whether we need to support more complex chaining. Without this 
> point, the motivating examples (I and II) could be implemented if all of the 
> joins/filtering/mappings are compiled/composed into a single N-Ary Stream 
> Operator (which could be chained with some other single input operators at 
> the tail). We could also think about supporting chaining of a tree of, for 
> example, TwoInputStreamOperators inside a single Task. However, I’m leaving 
> this as a follow up, since in that case it’s not so easy to handle the 
> `InputSelection` of multiple operators inside the tree.
> 
> Piotrek
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>> On 21. Apr 2016, at 17:09, Aljoscha Krettek <aljos...@apache.org 
>>> <mailto:aljos...@apache.org>> wrote:
>>> 
>>> Hi,
>>> yes, I see operators of this style as very much an internal thing. You are 
>>> probably talking about use cases for OneInputOperator and TwoInputOperator 
>>> where users have a very specific need and require access to the low-level 
>>> details such as watermarks, state and timers to get stuff done. Maybe we 
>>> could have a wrapper for these so that users can still use them but 
>>> internally we wrap them in an N-Ary Operator.
>>> 
>>> Cheers,
>>> Aljoscha
>>> 
>>> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gyf...@apache.org 
>>> <mailto:gyf...@apache.org>> wrote:
>>> Hey,
>>> 
>>> Some initial feedback from my side:
>>> 
>>> I think this is a very important problem to deal with as a lot of applications
>>> depend on it. I like the proposed runtime model and that is probably a
>>> good way to handle this task, it is very clear what is happening.
>>> 
>>> My main concern is how to handle this from the API and UDFs. What you
>>> proposed seems like a very internal thing from the API perspective and I
>>> would be against exposing it in the way you wrote in your example. We
>>> should make all effort to streamline this with the functional style
>>> operators in some way. (so in that sense the way broadcastsets are handled
>>> is pretty nice) Maybe we could extend ds.connect() to many streams
>>> 
>>> But in any case this is an awesome initiative :)
>>> 
>>> Cheers,
>>> Gyula
>>> 
>>> 
>>> On Thu, 21 Apr 2016 at 15:56, Aljoscha Krettek <aljos...@apache.org 
>>> <mailto:aljos...@apache.org>> wrote:
>>> 
>>>> Hi Team,
>>>> I'm currently thinking about how we can bring the broadcast set/broadcast
>>>> input feature from the DataSet API to the DataStream API. I think this
>>>> would be a valuable addition since it would enable use cases that join
>>>> streams with constant (or slowly changing) side information.
>>>> 
>>>> For this purpose, I think that we need to change the way we handle stream
>>>> operators. The new model would have one unified operator that handles all
>>>> cases and allows adding inputs after the operator was constructed, thus
>>>> allowing the specification of broadcast inputs.
>>>> 
>>>> I wrote up this preliminary document detailing the reason why we need such
>>>> a new operator for broadcast inputs and also what the API of such an
>>>> operator would be. It also quickly touches on the required changes of
>>>> existing per-operation stream operations such as StreamMap:
>>>> 
>>>> 
>>>> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
>>>> 
>>>> Please have a look if you're interested. Feedback/insights are very
>>>> welcome. :-)
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>> 
>> 
> 
> 
> 
> -- 
> Best, Jingsong Lee
