While we're on operators and tasks, I think it would also make sense in the long run to move the logic that is now in AbstractStreamOperator.setup()/initializeState()/snapshot()/snapshotState() (and the other snapshotState()…)/dispose() out of the operator itself. This logic is the same for every operator and shouldn't really live in there. We currently have a very complicated dance between the StreamTask and the AbstractStreamOperator for initialising the state backends that doesn't really seem necessary.
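A rough sketch of what pulling that logic out of the operator could look like: the task owns a handler that performs the state-backend setup once, and the operator only contributes its operator-specific logic. All names here (OperatorLifecycleHandler, StateStore, CheckpointedOperator) are hypothetical, not existing Flink classes.

```java
// Hypothetical sketch only: the lifecycle/state dance is owned by a
// task-side handler instead of AbstractStreamOperator itself.

interface CheckpointedOperator {
    void initializeState(StateStore store);   // operator-specific logic only
    void snapshotState(StateStore store);
}

/** Stand-in for a state backend, to keep the sketch self-contained. */
final class StateStore {
    private final java.util.Map<String, Object> state = new java.util.HashMap<>();
    void put(String key, Object value) { state.put(key, value); }
    Object get(String key) { return state.get(key); }
}

/** Owned by the task: does the backend setup that today happens inside
 *  AbstractStreamOperator, then delegates to the operator. */
final class OperatorLifecycleHandler {
    private final CheckpointedOperator operator;
    private final StateStore backend = new StateStore();

    OperatorLifecycleHandler(CheckpointedOperator operator) {
        this.operator = operator;
    }

    void initialize() {
        // backend creation / restore would happen here, once, for every operator
        operator.initializeState(backend);
    }

    StateStore snapshot() {
        operator.snapshotState(backend);
        return backend;
    }
}

public class LifecycleSketch {
    public static void main(String[] args) {
        CheckpointedOperator op = new CheckpointedOperator() {
            public void initializeState(StateStore s) { s.put("count", 0); }
            public void snapshotState(StateStore s) { s.put("count", 42); }
        };
        OperatorLifecycleHandler handler = new OperatorLifecycleHandler(op);
        handler.initialize();
        System.out.println(handler.snapshot().get("count")); // prints 42
    }
}
```

The point of the sketch is only the ownership split: the task drives initialize()/snapshot(), and no operator has to re-implement the backend handshake.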
> On 14. Feb 2019, at 11:54, Stephan Ewen <se...@apache.org> wrote:
>
> To move this forward, I would suggest the following:
>
> - Let's quickly check which other classes need to change. I assume the
>   TwoInputStreamTask and StreamTwoInputProcessor?
> - Can those changes be new classes that are used when the new operator is
>   used? The current TwoInputStreamTask and StreamTwoInputProcessor would
>   remain until they are fully subsumed, and are then removed.
> - Do we need any other refactorings before, like some cleanup of the
>   Operator Config or the Operator Chain?
>
> Best,
> Stephan
>
>
> On Sun, Feb 10, 2019 at 7:25 AM Guowei Ma <guowei....@gmail.com> wrote:
>
>> 2019.2.10
>>
>> Hi, Stephan
>>
>> Thank you very much for such detailed and constructive comments.
>>
>> *binary vs. n-ary* and *enum vs. integer*
>>
>> Considering the n-ary case, as you mentioned, using integers may be the
>> better choice.
>>
>> *generic selectable interface*
>>
>> You are right. This interface can be removed.
>>
>> *end-input*
>>
>> It is true that the operator does not need to store the end-input state;
>> it can be inferred by the system, which can notify the operator at the
>> right time. We can consider using this mechanism once the system can
>> checkpoint a topology with finished tasks.
>>
>> *early-out*
>>
>> It is reasonable for me not to consider this situation at present.
>>
>> *distributed stream deadlocks*
>>
>> At present there is no deadlock for streaming, but I think it might still
>> be necessary to do some validation (warn or reject) on the JobGraph.
>> Once Flink introduces this TwoInputSelectable interface, users of the
>> streaming API could also construct a diamond-style topology that may
>> deadlock.
>>
>> *empty input / selection timeout*
>>
>> It is reasonable for me not to consider this situation at present.
>>
>> *timers*
>>
>> When all the inputs are finished, the TimerService will wait until all
>> timers are triggered.
>> So there should be no problem. Some colleagues and I are confirming the
>> details to see if there are other considerations.
>>
>> Best
>> GuoWei
>>
>> Stephan Ewen <se...@apache.org> wrote on Fri, Feb 8, 2019 at 7:56 PM:
>>
>>> Nice design proposal, and +1 to the general idea.
>>>
>>> A few thoughts / suggestions:
>>>
>>> *binary vs. n-ary*
>>>
>>> I would plan ahead for N-ary operators. Not because we necessarily need
>>> n-ary inputs (one can probably build that purely in the API) but because
>>> of future side inputs. The proposal should be able to handle that as well.
>>>
>>> *enum vs. integer*
>>>
>>> The above might be easier to realize when going directly with an integer
>>> and having ANY, FIRST, SECOND, etc. as pre-defined constants.
>>> Performance-wise, it probably makes no difference whether we use int or
>>> enum.
>>>
>>> *generic selectable interface*
>>>
>>> From the proposal, I don't quite understand what that interface is for.
>>> My understanding is that the input processor or task that calls the
>>> operator's functions would anyway work on the TwoInputStreamOperator
>>> interface, for efficiency.
>>>
>>> *end-input*
>>>
>>> I think we should not make storing the end-input the operator's
>>> responsibility. There is a simple way to handle this, which is also
>>> consistent with other aspects of handling finished tasks:
>>>
>>> - If a task is finished, that should be stored in the checkpoint.
>>> - Upon restoring a finished task, if it still has running successors, we
>>>   deploy a "finished input channel", which immediately sends the "end of
>>>   input" when the task is started.
>>> - The operator will hence set the end of input immediately again upon
>>>   startup.
>>>
>>> *early-out*
>>>
>>> Letting nextSelection() return "NONE" or "FINISHED" may be relevant for
>>> early-out cases, but I would remove this from the scope of this proposal.
>>> There are most likely other big changes involved, like communicating this
>>> to the upstream operators.
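The integer-based selection with pre-defined constants discussed above could look roughly like this. This is only a sketch of the idea; the names and the exact shape of the eventual Flink interface are assumptions.

```java
// Sketch of integer-based input selection with pre-defined constants,
// generalizing naturally to n-ary inputs (0, 1, 2, ...).
// Names are illustrative, not the actual Flink API.

interface InputSelectable {
    int ANY = -1;     // no preference: read whichever input has data
    int FIRST = 0;
    int SECOND = 1;

    /** Called by the input processor to decide which input to read next. */
    int nextSelection();
}

/** Example: an operator that strictly alternates between its two inputs. */
class AlternatingSelection implements InputSelectable {
    private int last = SECOND;

    @Override
    public int nextSelection() {
        last = (last == FIRST) ? SECOND : FIRST;
        return last;
    }
}

public class SelectionSketch {
    public static void main(String[] args) {
        InputSelectable s = new AlternatingSelection();
        System.out.println(s.nextSelection()); // 0 (FIRST)
        System.out.println(s.nextSelection()); // 1 (SECOND)
        System.out.println(s.nextSelection()); // 0 (FIRST)
    }
}
```

Plain ints keep the hot path free of enum indirection and allow any input index for future n-ary operators, which is the advantage argued for above.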
>>> *distributed stream deadlocks*
>>>
>>> We had this issue in the DataSet API. Earlier versions of the DataSet API
>>> made an analysis of the flow, detecting dams and whether the
>>> pipeline-breaking behavior in the flow would cause deadlocks, and
>>> introduced artificial pipeline breakers in response.
>>>
>>> The logic was really complicated and it took a while to become stable. We
>>> had several issues where certain user functions (like mapPartition) could
>>> either be pipelined or have a full dam (not possible for the system to
>>> know), so we had to insert artificial pipeline breakers in all paths.
>>>
>>> In the end we simply decided that in the case of a diamond-style flow, we
>>> make the point where the flow first forks a blocking shuffle. That was
>>> super simple, solved all issues, and has the additional nice property
>>> that it is a great point to materialize data for recovery, because it
>>> helps both paths of the diamond upon failure.
>>>
>>> My suggestion:
>>> ==> For streaming: no problem so far, nothing to do.
>>> ==> For batch: go with the simple solution described above first, and
>>>     improve when we see cases where this impacts performance
>>>     significantly.
>>>
>>> *empty input / selection timeout*
>>>
>>> I can see this being relevant in future streaming cases, for example with
>>> side inputs. You want to wait for the side-input data, but with a
>>> timeout, so the program can still proceed with non-perfect context data
>>> in case that context data is very late.
>>>
>>> Because we do not support side inputs at the moment, we may want to defer
>>> this for now. Let's not over-design for problems that are not well
>>> understood at this point.
>>>
>>> *timers*
>>>
>>> I don't understand the problem with timers. Timers are bound to the
>>> operator, not the input, so they should still work if an input ends.
>>> There are cases where some state in the operator is only relevant as
>>> long as an input still has data (like in symmetric joins) and the timers
>>> are relevant to that state. When the state is dropped, the timers should
>>> also be dropped, but that is the operator's logic in "endInput()". So
>>> there is no inherent issue between input and timers.
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Sat, Feb 2, 2019 at 3:55 AM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>>> Hi, guys:
>>>> I propose a design to enhance the Stream Operator API for Batch's
>>>> requirements. This also serves Flink's goal that Batch is a special
>>>> case of Streaming. The proposal mainly contains two changes to the
>>>> operator API:
>>>>
>>>> 1. Allow a "StreamOperator" to choose which input to read;
>>>> 2. Notify a "StreamOperator" that an input has ended.
>>>>
>>>> This proposal was discussed offline with Piotr Nowojski, Kostas
>>>> Kloudas, and Haibo Sun. It would be great to hear feedback and
>>>> suggestions from the community. Please kindly share your comments and
>>>> suggestions.
>>>>
>>>> Best
>>>> GuoWei Ma.
>>>>
>>>> Enhance Operator API to Support Dynamically Sel...
>>>> <https://docs.google.com/document/d/10k5pQm3SkMiK5Zn1iFDqhQnzjQTLF0Vtcbc8poB4_c8/edit?usp=drive_web>
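The two proposed changes work together naturally in a hash-join-style operator: select the build input until it ends, then switch to the probe input, and drop the build state when the probe side ends. The sketch below is illustrative only; all class and method names are assumptions, not the proposed Flink API.

```java
// Sketch combining the two proposed changes: input selection plus
// end-of-input notification. Names are illustrative, not Flink's API.

import java.util.ArrayList;
import java.util.List;

class HashJoinLikeOperator {
    static final int FIRST = 0, SECOND = 1;

    private List<String> buildSide = new ArrayList<>();
    private boolean buildFinished = false;
    final List<String> output = new ArrayList<>();

    /** Proposed change 1: the operator chooses which input to read. */
    int nextSelection() {
        return buildFinished ? SECOND : FIRST;   // build first, then probe
    }

    void processElement1(String record) { buildSide.add(record); }

    void processElement2(String record) {
        for (String b : buildSide) {
            if (b.equals(record)) output.add(b + "," + record);
        }
    }

    /** Proposed change 2: the operator is notified that an input ended. */
    void endInput(int input) {
        if (input == FIRST) {
            buildFinished = true;   // switch nextSelection() to SECOND
        } else {
            buildSide = null;       // probe done: drop build state (and its timers)
        }
    }
}

public class EndInputSketch {
    public static void main(String[] args) {
        HashJoinLikeOperator op = new HashJoinLikeOperator();
        op.processElement1("a");
        op.processElement1("b");
        op.endInput(HashJoinLikeOperator.FIRST);
        System.out.println(op.nextSelection()); // 1 (SECOND)
        op.processElement2("a");
        op.endInput(HashJoinLikeOperator.SECOND);
        System.out.println(op.output); // [a,a]
    }
}
```

Note how endInput() is also the natural place to drop state (and any timers bound to it), which is exactly the point made about timers above.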