To move this forward, I would suggest the following:

  - Let's quickly check which other classes need to change. I assume
TwoInputStreamTask and StreamTwoInputProcessor?
  - Can those changes be new classes that are used when the new operator is
used? The current TwoInputStreamTask and StreamTwoInputProcessor remain
until they are fully subsumed and are then removed.

  - Do we need any other refactorings first, like some cleanup of the
Operator Config or the Operator Chain?
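A minimal sketch of the second point, assuming the choice is made once, when the task sets up its input processor: operators that opt into the new interface get a new processor class, and everything else keeps the existing code path. `SelectableTwoInputOperator`, `SelectingTwoInputProcessor`, `LegacyTwoInputProcessor` and `ProcessorFactory.createProcessor` are hypothetical stand-ins, not Flink classes.

```java
final class ProcessorFactory {
    // Pick the new code path only for operators that opt into the new interface.
    static InputProcessor createProcessor(TwoInputOperator operator) {
        if (operator instanceof SelectableTwoInputOperator) {
            return new SelectingTwoInputProcessor((SelectableTwoInputOperator) operator);
        }
        return new LegacyTwoInputProcessor();
    }

    public static void main(String[] args) {
        TwoInputOperator plain = () -> "plain";
        SelectableTwoInputOperator selectable = new SelectableTwoInputOperator() {
            public String name() { return "selectable"; }
            public int nextSelection() { return 1; }
        };
        System.out.println(createProcessor(plain).kind());      // legacy
        System.out.println(createProcessor(selectable).kind()); // selecting
    }
}

// Hypothetical stand-in for today's two-input operator contract.
interface TwoInputOperator {
    String name();
}

// Hypothetical marker for operators that use the proposed input-selection API.
interface SelectableTwoInputOperator extends TwoInputOperator {
    int nextSelection();
}

// Common contract shared by both processor variants.
interface InputProcessor {
    String kind();
}

// Stand-in for the existing processor, kept unchanged until it is fully subsumed.
final class LegacyTwoInputProcessor implements InputProcessor {
    public String kind() { return "legacy"; }
}

// Stand-in for the new processor, which would consult nextSelection() before reading.
final class SelectingTwoInputProcessor implements InputProcessor {
    private final SelectableTwoInputOperator operator;

    SelectingTwoInputProcessor(SelectableTwoInputOperator operator) { this.operator = operator; }

    public String kind() { return "selecting"; }

    int currentSelection() { return operator.nextSelection(); }
}
```

Keeping the dispatch in one place would make it simple to delete the legacy branch once the old classes are no longer needed.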

Best,
Stephan


On Sun, Feb 10, 2019 at 7:25 AM Guowei Ma <guowei....@gmail.com> wrote:

> 2019.2.10
>
>
> Hi, Stephan
>
>
> Thank you very much for such detailed and constructive comments.
>
>
> *binary vs. n-ary* and *enum vs. integer*
>
>
> Considering the N-ary, as you mentioned, using integers may be a better
> choice.
>
>
> *generic selectable interface*
>
>
> You are right. This interface can be removed.
>
>
> *end-input*
>
> It is true that the Operator does not need to store the end-input state,
> since the system can infer it and notify the Operator at the right
> time. We can consider using this mechanism once the system can checkpoint
> a topology with finished tasks.
>
>
> *early-out*
>
> It seems reasonable to me not to consider this situation at present.
>
>
> *distributed stream deadlocks*
>
>
> At present, there is no deadlock in streaming, but I think it may still
> be necessary to do some validation (warning or rejection) in the JobGraph,
> because once Flink introduces this TwoInputSelectable interface, streaming
> users could also construct a diamond-style topology that may
> deadlock.
>
>
> *empty input / selection timeout*
>
> It seems reasonable to me not to consider this situation at present.
>
>
> *timers*
>
> When all the inputs are finished, the TimeService will wait until all timers
> are triggered, so there should be no problem. Other colleagues and I are
> confirming the details to see if there are other considerations.
>
>
> Best
>
> GuoWei
>
> On Fri, Feb 8, 2019 at 7:56 PM Stephan Ewen <se...@apache.org> wrote:
>
> > Nice design proposal, and +1 to the general idea.
> >
> > A few thoughts / suggestions:
> >
> > *binary vs. n-ary*
> >
> > I would plan ahead for N-ary operators. Not because we necessarily need
> > n-ary inputs (one can probably build that purely in the API) but because of
> > future side inputs. The proposal should be able to handle that as well.
> >
> > *enum vs. integer*
> >
> > The above might be easier to realize by going directly with integers
> > and having ANY, FIRST, SECOND, etc. as predefined constants.
> > Performance-wise, it probably makes no difference whether we use int or enum.
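To illustrate the integer option, here is a minimal sketch assuming a bitmask encoding (one bit per input, 1-based), which is only one possible choice: ANY, FIRST and SECOND are predefined constants, and further inputs, such as future side inputs, simply use higher bits. `InputSelection`, `ofInput` and `isInputSelected` are hypothetical names, not an existing Flink API.

```java
final class InputSelectionDemo {
    public static void main(String[] args) {
        // Select input 1 and input 3 (e.g. a side input) at the same time.
        int selection = InputSelection.FIRST | InputSelection.ofInput(3);
        System.out.println(InputSelection.isInputSelected(selection, 1));          // true
        System.out.println(InputSelection.isInputSelected(selection, 2));          // false
        System.out.println(InputSelection.isInputSelected(selection, 3));          // true
        System.out.println(InputSelection.isInputSelected(InputSelection.ANY, 2)); // true
    }
}

// Hypothetical int-based input selection: bit (i - 1) stands for input i.
final class InputSelection {
    static final int ANY = -1;            // all bits set: every input is eligible
    static final int FIRST = ofInput(1);  // 0b01
    static final int SECOND = ofInput(2); // 0b10

    private InputSelection() {}

    // Returns the mask for a single 1-based input index; an int holds up to 32 inputs.
    static int ofInput(int inputIndex) {
        if (inputIndex < 1 || inputIndex > 32) {
            throw new IllegalArgumentException("input index must be in [1, 32]: " + inputIndex);
        }
        return 1 << (inputIndex - 1);
    }

    // True if the given input is allowed to be read under this selection.
    static boolean isInputSelected(int selection, int inputIndex) {
        return (selection & ofInput(inputIndex)) != 0;
    }
}
```

Combining selections is a plain bitwise OR, which is one reason an int can be as cheap as an enum here.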
> >
> > *generic selectable interface*
> >
> > From the proposal, I don't quite understand what that interface is for. My
> > understanding is that the input processor or task that calls the
> > operator's functions would work on the TwoInputStreamOperator
> > interface anyway, for efficiency.
> >
> > *end-input*
> >
> > I think we should not make storing the end-input the operator's
> > responsibility.
> > There is a simple way to handle this, which is also consistent with other
> > aspects of handling finished tasks:
> >
> >   - If a task is finished, that should be stored in the checkpoint.
> >   - Upon restoring a finished task, if it still has running successors, we
> > deploy a "finished input channel", which immediately sends the "end of
> > input" when the task is started.
> >   - The operator will hence set the end of input immediately again upon
> >
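A minimal sketch of the restore path in the list above, assuming a hypothetical `InputChannel` abstraction: when the checkpoint records an upstream task as finished, the successor gets a `FinishedInputChannel` that yields the end-of-input marker right away, so the operator's `endInput(int)` is called again without the operator keeping that flag in its own state. All class and method names here are illustrative, not Flink's.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class FinishedChannelDemo {
    public static void main(String[] args) {
        RecordingOperator op = new RecordingOperator();
        List<InputChannel> channels = List.of(
                new FinishedInputChannel(1),                 // upstream finished before the checkpoint
                new LiveInputChannel(2, List.of("a", "b"))); // upstream still running
        for (InputChannel channel : channels) {
            Tasks.drain(channel, op);
        }
        System.out.println(op.log); // [endInput(1), record(2,a), record(2,b), endInput(2)]
    }
}

interface InputChannel {
    Object END_OF_INPUT = new Object(); // sentinel marker, compared by identity

    int inputId();

    Object poll(); // next record, or END_OF_INPUT once the input is exhausted
}

// Deployed on restore when the checkpoint says the upstream task already finished.
final class FinishedInputChannel implements InputChannel {
    private final int inputId;
    FinishedInputChannel(int inputId) { this.inputId = inputId; }
    public int inputId() { return inputId; }
    public Object poll() { return END_OF_INPUT; } // signals end of input immediately
}

// Simulates a channel whose upstream still produces records.
final class LiveInputChannel implements InputChannel {
    private final int inputId;
    private final Deque<String> records;
    LiveInputChannel(int inputId, List<String> records) {
        this.inputId = inputId;
        this.records = new ArrayDeque<>(records);
    }
    public int inputId() { return inputId; }
    public Object poll() { return records.isEmpty() ? END_OF_INPUT : records.poll(); }
}

interface EndAwareOperator {
    void processElement(int inputId, String record);
    void endInput(int inputId);
}

// Stateless w.r.t. end-of-input: it only logs what the system tells it.
final class RecordingOperator implements EndAwareOperator {
    final List<String> log = new ArrayList<>();
    public void processElement(int inputId, String record) { log.add("record(" + inputId + "," + record + ")"); }
    public void endInput(int inputId) { log.add("endInput(" + inputId + ")"); }
}

final class Tasks {
    // Forwards records until the channel reports END_OF_INPUT, then notifies the operator.
    static void drain(InputChannel channel, EndAwareOperator operator) {
        while (true) {
            Object next = channel.poll();
            if (next == InputChannel.END_OF_INPUT) {
                operator.endInput(channel.inputId());
                return;
            }
            operator.processElement(channel.inputId(), (String) next);
        }
    }
}
```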
> > *early-out*
> >
> > Letting nextSelection() return "NONE" or "FINISHED" may be relevant for
> > early-out cases, but I would remove this from the scope of this proposal.
> > There are most likely other big changes involved, like communicating this
> > to the upstream operators.
> >
> > *distributed stream deadlocks*
> >
> > We had this issue in the DataSet API. Earlier versions of the DataSet API
> > analyzed the flow to detect dams and whether the pipeline-breaking
> > behavior in the flow would cause deadlocks, and introduced
> > artificial pipeline breakers in response.
> >
> > The logic was really complicated and it took a while to become stable. We
> > had several issues where certain user functions (like mapPartition) could
> > either be pipelined or have a full dam (which the system cannot know), so
> > we had to insert artificial pipeline breakers in all paths.
> >
> > In the end we simply decided that in the case of a diamond-style flow, we
> > make the point where the flow first forks a blocking shuffle. That was
> > super simple, solved all issues, and has the additional nice property that
> > it is a great point to materialize data for recovery, because it helps both
> > paths of the diamond upon failure.
> >
> > My suggestion:
> > ==> For streaming, no problem so far, nothing to do
> > ==> For batch, I would suggest going with the simple solution described above
> > first, and improving it when we see cases where this impacts performance
> > significantly
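A rough sketch of the "block at the fork" rule, assuming the job graph is given as a plain adjacency map of vertex names: it reports every vertex whose outgoing branches reconverge downstream, i.e. whose output edges would become blocking shuffles. It marks every such fork rather than only the outermost one, which is a simplification; `BlockingShuffles.diamondForks` is a hypothetical helper, not part of Flink's JobGraph API.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class DiamondDemo {
    public static void main(String[] args) {
        // source -> a -> join and source -> b -> join: a diamond that forks at "source".
        Map<String, List<String>> graph = new LinkedHashMap<>();
        graph.put("source", List.of("a", "b"));
        graph.put("a", List.of("join"));
        graph.put("b", List.of("join"));
        graph.put("join", List.of());
        System.out.println(BlockingShuffles.diamondForks(graph)); // [source]
    }
}

final class BlockingShuffles {
    // Vertices whose branches meet again downstream; their output edges would be made blocking.
    static Set<String> diamondForks(Map<String, List<String>> graph) {
        Set<String> forks = new LinkedHashSet<>();
        for (Map.Entry<String, List<String>> entry : graph.entrySet()) {
            List<String> successors = entry.getValue();
            if (successors.size() < 2) {
                continue; // not a fork
            }
            Set<String> seen = new HashSet<>();
            for (String successor : successors) {
                Set<String> reach = reachable(graph, successor);
                if (!Collections.disjoint(seen, reach)) {
                    forks.add(entry.getKey()); // two branches share a downstream vertex
                    break;
                }
                seen.addAll(reach);
            }
        }
        return forks;
    }

    // Iterative DFS collecting every vertex reachable from start (inclusive).
    private static Set<String> reachable(Map<String, List<String>> graph, String start) {
        Set<String> visited = new HashSet<>();
        Deque<String> stack = new ArrayDeque<>();
        stack.push(start);
        while (!stack.isEmpty()) {
            String vertex = stack.pop();
            if (visited.add(vertex)) {
                for (String next : graph.getOrDefault(vertex, List.of())) {
                    stack.push(next);
                }
            }
        }
        return visited;
    }
}
```

A plain fork whose branches never meet again is left pipelined, which matches the idea of paying for materialization only where a diamond could deadlock.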
> >
> > *empty input / selection timeout*
> >
> > I can see that being relevant in future streaming cases, for example with
> > side inputs. You want to wait for the side input data, but with a timeout,
> > so the program can still proceed with non-perfect context data in case that
> > context data is very late.
> >
> > Because we do not support side inputs at the moment, we may want to defer
> > this for now. Let's not over-design for problems that are not well
> > understood at this point.
> >
> > *timers*
> >
> > I don't understand the problem with timers. Timers are bound to the
> > operator, not the input, so they should still work if an input ends.
> > There are cases where some state in the operator is only relevant as
> > long as an input still has data (like in symmetric joins), and the timers
> > are relevant to that state.
> > When the state is dropped, the timers should also be dropped, but that is
> > the operator's logic in "endInput()". So there is no inherent issue between
> > input and timers.
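A minimal sketch of the symmetric-join case, assuming a simplified operator that buffers only join keys per input and keeps a hypothetical list of per-record cleanup timers: once one input ends, the other side's buffer can never be matched again, so `endInput` drops that buffer together with the timers that guarded it, while timers tied to the remaining state stay untouched. `SymmetricJoin` and its methods are illustrative, not a Flink operator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SymmetricJoinDemo {
    public static void main(String[] args) {
        SymmetricJoin join = new SymmetricJoin();
        join.processElement(1, "k1", 10);
        join.processElement(2, "k1", 20); // matches input 1's "k1"
        join.endInput(1);                 // input 2's buffer and its timers can go
        System.out.println(join.results);         // [k1]
        System.out.println(join.bufferedKeys(2)); // []
        System.out.println(join.pendingTimers()); // [10]
    }
}

final class SymmetricJoin {
    // Hypothetical cleanup timer: owned by the operator, but tied to one input's buffered state.
    static final class CleanupTimer {
        final int input;
        final String key;
        final long timestamp;
        CleanupTimer(int input, String key, long timestamp) {
            this.input = input;
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    final List<String> results = new ArrayList<>();
    private final Map<Integer, Set<String>> buffers = new HashMap<>();
    private final List<CleanupTimer> timers = new ArrayList<>();
    private final boolean[] ended = new boolean[3]; // indices 1 and 2 are used

    SymmetricJoin() {
        buffers.put(1, new HashSet<>());
        buffers.put(2, new HashSet<>());
    }

    void processElement(int input, String key, long timestamp) {
        int other = 3 - input;
        if (buffers.get(other).contains(key)) {
            results.add(key); // emit the joined key
        }
        // Only buffer this record if the other input can still send matches for it.
        if (!ended[other]) {
            buffers.get(input).add(key);
            timers.add(new CleanupTimer(input, key, timestamp));
        }
    }

    // Once `input` ends, the other side's buffered keys can never be matched again.
    void endInput(int input) {
        ended[input] = true;
        int other = 3 - input;
        buffers.get(other).clear();
        timers.removeIf(t -> t.input == other); // drop only the timers guarding that state
    }

    List<String> bufferedKeys(int input) { return new ArrayList<>(buffers.get(input)); }

    List<Long> pendingTimers() {
        List<Long> out = new ArrayList<>();
        for (CleanupTimer t : timers) {
            out.add(t.timestamp);
        }
        return out;
    }
}
```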
> >
> > Best,
> > Stephan
> >
> >
> > On Sat, Feb 2, 2019 at 3:55 AM Guowei Ma <guowei....@gmail.com> wrote:
> >
> > > Hi, guys:
> > > I propose a design to enhance the Stream Operator API for Batch’s requirements.
> > > This is also in line with Flink’s goal that Batch is a special case of Streaming.
> > > This proposal mainly contains two changes to the operator API:
> > >
> > > 1. Allow "StreamOperator" to choose which input to read;
> > > 2. Notify "StreamOperator" that an input has ended.
> > >
> > >
> > > This proposal was discussed offline with Piotr Nowojski, Kostas Kloudas,
> > > and Haibo Sun.
> > > It would be great to hear feedback and suggestions from the community.
> > > Please kindly share your comments and suggestions.
> > >
> > > Best
> > > GuoWei Ma.
> > >
> > > Enhance Operator API to Support Dynamically Sel...
> > > <https://docs.google.com/document/d/10k5pQm3SkMiK5Zn1iFDqhQnzjQTLF0Vtcbc8poB4_c8/edit?usp=drive_web>
