Which classes need to be changed or use new classes?


I'm working on a design that Flink runtime support TwoInputSelectable,
and I'll give a initial proposal document next Monday. In the proposal,
the core classes that need to be changed include UnionInputGate and
StremTwoInputProcessor. I think the discussion that follows can be based
on this initial proposal.


Do we need other refactoring?


From the codes we have implemented this functionality, no other refactors
need to be done before it.


Best,
Haibo
At 2019-02-14 18:54:57, "Stephan Ewen" <se...@apache.org> wrote:
>To move this forward, would suggest the following:
>
>  - Let's quickly check which other classes need to change. I assume the
>TwoInputStreamTask and StreamTwoInputProcessor ?
>  - Can those changes be new classes that are used when the new operator is
>used? The current TwoInputStreamTask and StreamTwoInputProcessor remain
>until they are fully subsumed and are then removed.
>
>  - Do we need and other refactorings before, like some cleanup of the
>Operator Config or the Operator Chain?
>
>Best,
>Stephan
>
>
>On Sun, Feb 10, 2019 at 7:25 AM Guowei Ma <guowei....@gmail.com> wrote:
>
>> 2019.2.10
>>
>>
>> Hi,Stephan
>>
>>
>> Thank you very much for such detailed and constructive comments.
>>
>>
>> *binary vs. n-ary* and *enum vs. integer*
>>
>>
>> Considering the N-ary, as you mentioned, using integers may be a better
>> choice.
>>
>>
>> *generic selectable interface*
>>
>>
>> You are right. This interface can be removed.
>>
>>
>> *end-input*
>>
>> It is true that the Operator does not need to store the end-input state,
>> which can be inferred by the system and notify the Operator at the right
>> time. We can consider using this mechanism when the system can checkpoint
>> the topology with the Finish Tasks.
>>
>>
>> *early-out*
>>
>> It is reasonable for me not to consider this situation at present.
>>
>>
>> *distributed stream deadlocks*
>>
>>
>> At present, there is no deadlock for the streaming, but I think it might
>> be  still necessary to do some validation(Warning or Reject) in JobGraph.
>> Because once Flink introduces this TwoInputSelectable interface, the user
>> of the streaming would also construct a diamond-style topology that may be
>> deadlocked.
>>
>>
>> *empty input / selection timeout*
>>
>> It is reasonable for me not to consider this situation at present.
>>
>>
>> *timers*
>>
>> When all the inputs are finished, TimeService will wait until all timers
>> are triggered. So there should be no problem. I and others guys are
>> confirming the details to see if there are other considerations
>>
>>
>> Best
>>
>> GuoWei
>>
>> Stephan Ewen <se...@apache.org> 于2019年2月8日周五 下午7:56写道:
>>
>> > Nice design proposal, and +1 to the general idea.
>> >
>> > A few thoughts / suggestions:
>> >
>> > *binary vs. n-ary*
>> >
>> > I would plan ahead for N-ary operators. Not because we necessarily need
>> > n-ary inputs (one can probably build that purely in the API) but because
>> of
>> > future side inputs. The proposal should be able to handle that as well.
>> >
>> > *enum vs. integer*
>> >
>> > The above might be easier is to realize when going directly with integer
>> > and having ANY, FIRST, SECOND, etc. as pre-defined constants.
>> > Performance wise, it is probably not difference whether to use int or
>> enum.
>> >
>> > *generic selectable interface*
>> >
>> > From the proposal, I don't understand quite what that interface is for.
>> My
>> > understanding is that the input processor or task that calls the
>> > operators's functions would anyways work on the TwoInputStreamOperator
>> > interface, for efficiency.
>> >
>> > *end-input*
>> >
>> > I think we should not make storing the end-input the operator's
>> > responsibility
>> > There is a simple way to handle this, which is also consistent with other
>> > aspects of handling finished tasks:
>> >
>> >   - If a task is finished, that should be stored in the checkpoint.
>> >  - Upon restoring a finished task, if it has still running successors, we
>> > deploy a "finished input channel", which immediately send the "end of
>> > input" when task is started.
>> >  - the operator will hence set the end of input immediately again upon
>> >
>> > *early-out*
>> >
>> > Letting nextSelection() return “NONE” or “FINISHED" may be relevant for
>> > early-out cases, but I would remove this from the scope of this proposal.
>> > There are most likely other big changes involved, like communicating this
>> > to the upstream operators.
>> >
>> > *distributed stream deadlocks*
>> >
>> > We had this issue in the DataSet API. Earlier versions of the DataSet API
>> > made an analysis of the flow detecting dams and whether the pipeline
>> > breaking behavior in the flow would cause deadlocks, and introduce
>> > artificial pipeline breakers in response.
>> >
>> > The logic was really complicated and it took a while to become stable. We
>> > had several issues that certain user functions (like mapPartition) could
>> > either be pipelined or have a full dam (not possible to know for the
>> > system), so we had to insert artificial pipeline breakers in all paths.
>> >
>> > In the end we simply decided that in the case of a diamond-style flow, we
>> > make the point where the flow first forks as blocking shuffle. That was
>> > super simple, solved all issues, and has the additional nice property
>> that
>> > it great point to materialize data for recovery, because it helps both
>> > paths of the diamond upon failure.
>> >
>> > My suggestion:
>> > ==> For streaming, no problem so far, nothing to do
>> > ==> For batch, would suggest to go with the simple solution described
>> above
>> > first, and improve when we see cases where this impacts performance
>> > significantly
>> >
>> > *empty input / selection timeout*
>> >
>> > I can see that being relevant in future streaming cases, for example with
>> > side inputs. You want to wait for the side input data, but with a
>> timeout,
>> > so the program can still proceed with non-perfect context data in case
>> that
>> > context data is very late.
>> >
>> > Because we do not support side inputs at the moment, we may want to defer
>> > this for now. Let's not over-design for problems that are not well
>> > understood at this point.
>> >
>> > *timers*
>> >
>> > I don't understand the problem with timers. Timers are bound to the
>> > operator, not the input, so they should still work if an input ends.
>> > There are cases where some state in the operator that is only relevant as
>> > long as an input still has data (like in symmetric joins) and the timers
>> > are relevant to that state.
>> > When the state is dropped, the timers should also be dropped, but that is
>> > the operator's logic on "endInput()". So there is no inherent issue
>> between
>> > input and timers.
>> >
>> > Best,
>> > Stephan
>> >
>> >
>> > On Sat, Feb 2, 2019 at 3:55 AM Guowei Ma <guowei....@gmail.com> wrote:
>> >
>> > > Hi, guys:
>> > > I propose a design to enhance Stream Operator API for Batch’s
>> > requirements.
>> > > This is also the Flink’s goal that Batch is a special case of
>> Streaming.
>> > > This
>> > > proposal mainly contains two changes to operator api:
>> > >
>> > > 1. Allow "StreamOperator" can choose which input to read;
>> > > 2. Notify "StreamOperator" that an input has ended.
>> > >
>> > >
>> > > This proposal was discussed with Piotr Nowojski, Kostas Kloudas, Haibo
>> > Sun
>> > > offlline.
>> > > It will be great to hear the feed backs and suggestions from the
>> > community.
>> > > Please kindly share your comments and suggestions.
>> > >
>> > > Best
>> > > GuoWei Ma.
>> > >
>> > >  Enhance Operator API to Support Dynamically Sel...
>> > > <
>> > >
>> >
>> https://docs.google.com/document/d/10k5pQm3SkMiK5Zn1iFDqhQnzjQTLF0Vtcbc8poB4_c8/edit?usp=drive_web
>> > > >
>> > >
>> >
>>

Reply via email to