While we’re on operators and tasks, I think it would also make sense in the 
long run to move the logic that is now in 
AbstractStreamOperator.setup()/initializeState()/snapshot()/snapshotState() (and 
the other snapshotState()…)/dispose() outside of the operator itself. This 
logic is the same for every operator but shouldn’t really be in there. We 
currently have a very complicated dance between the StreamTask and 
AbstractStreamOperator for initialising the state backends that doesn’t really 
seem necessary.

> On 14. Feb 2019, at 11:54, Stephan Ewen <se...@apache.org> wrote:
> 
> To move this forward, I would suggest the following:
> 
>  - Let's quickly check which other classes need to change. I assume the
> TwoInputStreamTask and StreamTwoInputProcessor?
>  - Can those changes be new classes that are used when the new operator is
> used? The current TwoInputStreamTask and StreamTwoInputProcessor remain
> until they are fully subsumed and are then removed.
> 
>  - Do we need any other refactorings beforehand, like some cleanup of the
> Operator Config or the Operator Chain?
> 
> Best,
> Stephan
> 
> 
> On Sun, Feb 10, 2019 at 7:25 AM Guowei Ma <guowei....@gmail.com> wrote:
> 
>> 2019.2.10
>> 
>> 
>> Hi Stephan,
>> 
>> 
>> Thank you very much for such detailed and constructive comments.
>> 
>> 
>> *binary vs. n-ary* and *enum vs. integer*
>> 
>> 
>> For the N-ary case, as you mentioned, using integers may be the better
>> choice.
>> 
>> 
>> *generic selectable interface*
>> 
>> 
>> You are right. This interface can be removed.
>> 
>> 
>> *end-input*
>> 
>> It is true that the operator does not need to store the end-input state;
>> the system can infer it and notify the operator at the right time. We can
>> consider using this mechanism once the system can checkpoint a topology
>> that contains finished tasks.
>> 
>> 
>> *early-out*
>> 
>> I agree that it is reasonable not to consider this case for now.
>> 
>> 
>> *distributed stream deadlocks*
>> 
>> 
>> At present, streaming jobs cannot deadlock, but I think it might still be
>> necessary to do some validation (warn or reject) on the JobGraph. Once
>> Flink introduces this TwoInputSelectable interface, streaming users could
>> also construct a diamond-style topology that may deadlock.
>> 
>> 
>> *empty input / selection timeout*
>> 
>> I agree that it is reasonable not to consider this case for now.
>> 
>> 
>> *timers*
>> 
>> When all the inputs are finished, the TimeService will wait until all timers
>> are triggered, so there should be no problem. Some others and I are
>> confirming the details to see if there are other considerations.
>> 
>> 
>> Best
>> 
>> GuoWei
>> 
>> Stephan Ewen <se...@apache.org> wrote on Fri, Feb 8, 2019 at 7:56 PM:
>> 
>>> Nice design proposal, and +1 to the general idea.
>>> 
>>> A few thoughts / suggestions:
>>> 
>>> *binary vs. n-ary*
>>> 
>>> I would plan ahead for N-ary operators. Not because we necessarily need
>>> n-ary inputs (one can probably build that purely in the API) but because
>>> of future side inputs. The proposal should be able to handle that as well.
>>> 
>>> *enum vs. integer*
>>> 
>>> The above might be easier to realize when going directly with integers
>>> and having ANY, FIRST, SECOND, etc. as pre-defined constants.
>>> Performance-wise, it probably makes no difference whether we use int or
>>> enum.
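
A minimal Java sketch of the integer-based selection idea, for illustration only. The constant names ANY, FIRST and SECOND come from the mail above; their numeric values, the class name InputSelectionSketch and the helper isSelected() are assumptions and not part of Flink or of the proposal.

```java
final class InputSelectionSketch {
    // Hypothetical values: ANY is a sentinel, real inputs are numbered from 1.
    static final int ANY = -1;
    static final int FIRST = 1;
    static final int SECOND = 2;

    /** Returns true if the input with the given id may be read under this selection. */
    static boolean isSelected(int selection, int inputId) {
        return selection == ANY || selection == inputId;
    }

    public static void main(String[] args) {
        // A third input (e.g. a future side input) needs no new enum value.
        System.out.println(isSelected(ANY, 3));
        System.out.println(isSelected(FIRST, SECOND));
    }
}
```

With plain integers, an N-ary operator can address input 3, 4, ... without changing the type, which is the point made about planning ahead for side inputs.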
>>> 
>>> *generic selectable interface*
>>> 
>>> From the proposal, I don't quite understand what that interface is for.
>>> My understanding is that the input processor or task that calls the
>>> operator's functions would work on the TwoInputStreamOperator interface
>>> anyway, for efficiency.
>>> 
>>> *end-input*
>>> 
>>> I think we should not make storing the end-input the operator's
>>> responsibility.
>>> There is a simple way to handle this, which is also consistent with other
>>> aspects of handling finished tasks:
>>> 
>>> - If a task is finished, that should be stored in the checkpoint.
>>> - Upon restoring a finished task, if it still has running successors, we
>>> deploy a "finished input channel", which immediately sends the "end of
>>> input" when the task is started.
>>> - the operator will hence set the end of input immediately again upon
>>> 
>>> *early-out*
>>> 
>>> Letting nextSelection() return “NONE” or “FINISHED” may be relevant for
>>> early-out cases, but I would remove this from the scope of this proposal.
>>> There are most likely other big changes involved, like communicating this
>>> to the upstream operators.
>>> 
>>> *distributed stream deadlocks*
>>> 
>>> We had this issue in the DataSet API. Earlier versions of the DataSet API
>>> analyzed the flow to detect dams and to check whether the pipeline-breaking
>>> behavior in the flow would cause deadlocks, and introduced artificial
>>> pipeline breakers in response.
>>> 
>>> The logic was really complicated and it took a while to become stable. We
>>> had several issues where certain user functions (like mapPartition) could
>>> either be pipelined or have a full dam (the system cannot know which), so
>>> we had to insert artificial pipeline breakers in all paths.
>>> 
>>> In the end we simply decided that in the case of a diamond-style flow, we
>>> make the point where the flow first forks a blocking shuffle. That was
>>> super simple, solved all issues, and has the additional nice property
>>> that it is a great point to materialize data for recovery, because it
>>> helps both paths of the diamond upon failure.
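
To make the "point where the flow first forks" concrete, here is a rough Java sketch that finds fork nodes whose branches meet again downstream. It is not the DataSet optimizer's actual logic; the class DiamondForkSketch, the adjacency-map graph representation and the node names are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class DiamondForkSketch {
    /** Returns every node with two or more outgoing branches that rejoin downstream. */
    static Set<String> diamondForks(Map<String, List<String>> edges) {
        Set<String> forks = new TreeSet<>();
        for (Map.Entry<String, List<String>> entry : edges.entrySet()) {
            List<String> successors = entry.getValue();
            if (successors.size() < 2) {
                continue; // not a fork
            }
            Set<String> seen = new HashSet<>();
            for (String successor : successors) {
                Set<String> reach = reachable(successor, edges);
                if (!Collections.disjoint(seen, reach)) {
                    forks.add(entry.getKey()); // two branches share a descendant
                    break;
                }
                seen.addAll(reach);
            }
        }
        return forks;
    }

    /** Depth-first search: all nodes reachable from start, including start itself. */
    static Set<String> reachable(String start, Map<String, List<String>> edges) {
        Set<String> visited = new HashSet<>();
        Deque<String> stack = new ArrayDeque<>();
        stack.push(start);
        while (!stack.isEmpty()) {
            String node = stack.pop();
            if (visited.add(node)) {
                for (String next : edges.getOrDefault(node, Collections.emptyList())) {
                    stack.push(next);
                }
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new HashMap<>();
        graph.put("source", Arrays.asList("mapA", "mapB"));
        graph.put("mapA", Arrays.asList("join"));
        graph.put("mapB", Arrays.asList("join"));
        graph.put("join", Arrays.asList("sink"));
        // The outputs of each reported node would be candidates for a blocking shuffle.
        System.out.println(diamondForks(graph));
    }
}
```

The sketch only identifies the fork; how the shuffle is then made blocking is outside its scope.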
>>> 
>>> My suggestion:
>>> ==> For streaming, no problem so far, nothing to do.
>>> ==> For batch, I would suggest going with the simple solution described
>>> above first, and improving it when we see cases where this impacts
>>> performance significantly.
>>> 
>>> *empty input / selection timeout*
>>> 
>>> I can see that being relevant in future streaming cases, for example with
>>> side inputs. You want to wait for the side input data, but with a
>>> timeout, so the program can still proceed with non-perfect context data
>>> in case that context data is very late.
>>> 
>>> Because we do not support side inputs at the moment, we may want to defer
>>> this for now. Let's not over-design for problems that are not well
>>> understood at this point.
>>> 
>>> *timers*
>>> 
>>> I don't understand the problem with timers. Timers are bound to the
>>> operator, not the input, so they should still work if an input ends.
>>> There are cases where some state in the operator is only relevant as
>>> long as an input still has data (like in symmetric joins) and the timers
>>> are relevant to that state.
>>> When the state is dropped, the timers should also be dropped, but that is
>>> the operator's logic on "endInput()". So there is no inherent issue
>>> between input and timers.
>>> 
>>> Best,
>>> Stephan
>>> 
>>> 
>>> On Sat, Feb 2, 2019 at 3:55 AM Guowei Ma <guowei....@gmail.com> wrote:
>>> 
>>>> Hi guys,
>>>> I propose a design to enhance the Stream Operator API for batch
>>>> requirements. This also serves Flink's goal that batch is a special case
>>>> of streaming. This proposal mainly contains two changes to the operator
>>>> API:
>>>> 
>>>> 1. Allow a "StreamOperator" to choose which input to read;
>>>> 2. Notify a "StreamOperator" that an input has ended.
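
The two changes can be sketched in Java roughly as follows. This is a toy model, not the proposed Flink API: the method names nextSelection() and endInput() appear elsewhere in this thread, but the interface TwoInputOperator, the constants, the BuildProbeJoin example and the run() driver are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

final class SelectableOperatorSketch {
    // Assumed constant values, for illustration only.
    static final int ANY = -1, FIRST = 1, SECOND = 2;

    /** Hypothetical operator contract covering both proposed changes. */
    interface TwoInputOperator<A, B> {
        int nextSelection();          // change 1: operator picks the next input
        void processElement1(A value);
        void processElement2(B value);
        void endInput(int inputId);   // change 2: operator learns an input ended
    }

    /** Build-then-probe join: consumes input 1 fully before reading input 2. */
    static final class BuildProbeJoin implements TwoInputOperator<String, String> {
        private final Set<String> buildSide = new HashSet<>();
        private boolean buildDone = false;
        final List<String> matches = new ArrayList<>();

        @Override public int nextSelection() { return buildDone ? SECOND : FIRST; }
        @Override public void processElement1(String v) { buildSide.add(v); }
        @Override public void processElement2(String v) {
            if (buildSide.contains(v)) matches.add(v);
        }
        @Override public void endInput(int inputId) {
            if (inputId == FIRST) buildDone = true;
        }
    }

    /** Toy driver standing in for the task / input processor. */
    static <A, B> void run(TwoInputOperator<A, B> op, Iterator<A> in1, Iterator<B> in2) {
        boolean end1 = false, end2 = false;
        while (!end1 || !end2) {
            int sel = op.nextSelection();
            // Fall back to the other input if the selected one has already ended.
            boolean readFirst = !end1 && (sel == FIRST || sel == ANY || end2);
            if (readFirst) {
                if (in1.hasNext()) op.processElement1(in1.next());
                else { end1 = true; op.endInput(FIRST); }
            } else {
                if (in2.hasNext()) op.processElement2(in2.next());
                else { end2 = true; op.endInput(SECOND); }
            }
        }
    }

    public static void main(String[] args) {
        BuildProbeJoin join = new BuildProbeJoin();
        run(join, Arrays.asList("a", "b", "c").iterator(),
                  Arrays.asList("b", "x", "c").iterator());
        System.out.println(join.matches);
    }
}
```

The join never buffers probe-side records, because it only selects the second input after it has been told that the first one ended.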
>>>> 
>>>> 
>>>> This proposal was discussed offline with Piotr Nowojski, Kostas Kloudas,
>>>> and Haibo Sun.
>>>> It would be great to hear feedback and suggestions from the community.
>>>> Please kindly share your comments and suggestions.
>>>> 
>>>> Best
>>>> GuoWei Ma.
>>>> 
>>>> Enhance Operator API to Support Dynamically Sel...
>>>> <https://docs.google.com/document/d/10k5pQm3SkMiK5Zn1iFDqhQnzjQTLF0Vtcbc8poB4_c8/edit?usp=drive_web>
