[DISCUSS] Enhance Operator API to Support Dynamically Selective Reading and EndOfInput Event

2019-02-01 Thread Guowei Ma
Hi, guys: I propose a design to enhance the Stream Operator API for batch requirements. This is in line with Flink's goal that batch is a special case of streaming. This proposal mainly contains two changes to the operator API: 1. Allow a "StreamOperator" to choose which input to read; 2. Notify "StreamOpera
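
To make the two proposed changes concrete, here is a minimal sketch in Java. It assumes a simplified, self-contained model rather than Flink's actual classes: `SelectableTwoInputOperator`, `nextSelection()`, `endInput(int)`, `HashJoinOperator` and `SelectiveDriver` are all hypothetical names. The operator tells the driver which input to read next, and the end-of-input notification lets a build-then-probe hash join switch from the build side (input 1) to the probe side (input 2), a typical batch requirement.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical operator contract (not Flink's real API).
interface SelectableTwoInputOperator<IN1, IN2, OUT> {
    /** Returns 1 or 2: the input the runtime should read next. */
    int nextSelection();
    void processElement1(IN1 value, List<OUT> out);
    void processElement2(IN2 value, List<OUT> out);
    /** Called exactly once when input `inputId` has no more records. */
    void endInput(int inputId);
}

// Build-then-probe hash join: consumes input 1 completely before reading input 2.
// Assumes unique keys on the build side to keep the sketch short.
class HashJoinOperator implements SelectableTwoInputOperator<String[], String[], String> {
    private final Map<String, String> buildTable = new HashMap<>();
    private boolean buildFinished = false;

    public int nextSelection() { return buildFinished ? 2 : 1; }

    public void processElement1(String[] kv, List<String> out) { buildTable.put(kv[0], kv[1]); }

    public void processElement2(String[] kv, List<String> out) {
        String match = buildTable.get(kv[0]);
        if (match != null) out.add(kv[0] + ":" + match + "," + kv[1]);
    }

    public void endInput(int inputId) { if (inputId == 1) buildFinished = true; }
}

// Hypothetical runtime loop: reads only the selected input and emits end-of-input events.
class SelectiveDriver {
    static <A, B, O> List<O> run(SelectableTwoInputOperator<A, B, O> op, Deque<A> in1, Deque<B> in2) {
        List<O> out = new ArrayList<>();
        boolean ended1 = false, ended2 = false;
        while (!(ended1 && ended2)) {
            int sel = op.nextSelection();
            // If the selected input is already exhausted, fall back to the other one.
            if (sel == 1 && ended1) sel = 2;
            if (sel == 2 && ended2) sel = 1;
            if (sel == 1) {
                if (in1.isEmpty()) { ended1 = true; op.endInput(1); }
                else op.processElement1(in1.poll(), out);
            } else {
                if (in2.isEmpty()) { ended2 = true; op.endInput(2); }
                else op.processElement2(in2.poll(), out);
            }
        }
        return out;
    }
}
```

Without selective reading, the probe side would have to be buffered until the build side finishes; here input 2 is simply not consumed until `endInput(1)` has been called.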

2019-02-08 Thread Stephan Ewen
Nice design proposal, and +1 to the general idea. A few thoughts / suggestions: *binary vs. n-ary* I would plan ahead for N-ary operators. Not because we necessarily need n-ary inputs (one can probably build that purely in the API) but because of future side inputs. The proposal should be able t

2019-02-09 Thread Guowei Ma
2019.2.10 Hi, Stephan, thank you very much for such detailed and constructive comments. *binary vs. n-ary* and *enum vs. integer*: considering N-ary operators, as you mentioned, using integers may be the better choice. *generic selectable interface*: you are right, this interface can be removed.
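
As an illustration of why integers generalize better than an enum such as FIRST/SECOND/ANY, here is a hedged sketch that encodes the set of selected inputs as bits of a `long`, so the same type works for two inputs or for N inputs (up to 64 in this sketch). `InputSelectionMask` and `pickNext` are hypothetical names, not an existing Flink API.

```java
// Hypothetical: a set of selected input ids (1-based, 1..64) stored as bits of a long.
final class InputSelectionMask {
    static final InputSelectionMask ALL = new InputSelectionMask(-1L); // every bit set

    private final long bits;

    private InputSelectionMask(long bits) { this.bits = bits; }

    static InputSelectionMask of(int... inputIds) {
        long b = 0L;
        for (int id : inputIds) {
            if (id < 1 || id > 64) throw new IllegalArgumentException("input id out of range: " + id);
            b |= 1L << (id - 1);
        }
        return new InputSelectionMask(b);
    }

    boolean isSelected(int inputId) {
        return (bits & (1L << (inputId - 1))) != 0;
    }

    /**
     * Round-robin over inputs 1..n, starting after `lastRead`: returns the first input that is
     * both selected and has data available (hasData[i] describes input i + 1), or -1 if none.
     */
    int pickNext(int lastRead, boolean[] hasData) {
        int n = hasData.length;
        for (int step = 1; step <= n; step++) {
            int candidate = (lastRead - 1 + step) % n + 1;
            if (isSelected(candidate) && hasData[candidate - 1]) return candidate;
        }
        return -1;
    }
}
```

The round-robin in `pickNext` is only one possible fairness policy among the selected inputs that currently have data; a runtime could reasonably choose another.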

2019-02-14 Thread Stephan Ewen
To move this forward, I would suggest the following: - Let's quickly check which other classes need to change. I assume the TwoInputStreamTask and StreamTwoInputProcessor? - Can those changes be new classes that are used when the new operator is used? The current TwoInputStreamTask and StreamTw

2019-02-14 Thread Aljoscha Krettek
While we’re on operators and tasks, I think it would also make sense in the long run to move the logic that is now in AbstractStreamOperator.setup()/initializeState()/snapshot()/snapshotState() (and the other snapshotState()…)/dispose() outside of the operator itself. This logic is the same for
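
A rough sketch of what moving the shared lifecycle logic out of the operator could look like: composition instead of inheritance from an abstract base class, so restore and snapshot handling is written once in a framework-owned wrapper. `UserOperator`, `OperatorRunner` and `WordCounter` are hypothetical names, and the checkpoint store is an in-memory map standing in for durable state storage.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical slim contract a user operator would implement.
interface UserOperator {
    void open(Map<String, Long> restoredState);
    void processElement(String value);
    Map<String, Long> currentState();
}

// Hypothetical framework-owned wrapper: restore/snapshot logic lives here once.
final class OperatorRunner {
    private final UserOperator operator;
    private final Map<Long, Map<String, Long>> checkpointStore; // stands in for durable storage

    OperatorRunner(UserOperator operator, Map<Long, Map<String, Long>> checkpointStore) {
        this.operator = operator;
        this.checkpointStore = checkpointStore;
    }

    /** Opens the operator with a copy of checkpoint `restoreFrom`, or empty state if null. */
    void initialize(Long restoreFrom) {
        Map<String, Long> state = new HashMap<>();
        if (restoreFrom != null) {
            Map<String, Long> snap = checkpointStore.get(restoreFrom);
            if (snap == null) throw new IllegalStateException("no checkpoint " + restoreFrom);
            state.putAll(snap);
        }
        operator.open(state);
    }

    void process(String value) { operator.processElement(value); }

    /** Stores a copy so later processing does not mutate the snapshot. */
    void snapshot(long checkpointId) {
        checkpointStore.put(checkpointId, new HashMap<>(operator.currentState()));
    }
}

// Example user operator: contains no checkpointing code at all.
class WordCounter implements UserOperator {
    private Map<String, Long> counts = new HashMap<>();

    public void open(Map<String, Long> restoredState) { counts = restoredState; }

    public void processElement(String word) { counts.merge(word, 1L, Long::sum); }

    public Map<String, Long> currentState() { return counts; }
}
```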

2019-02-14 Thread Haibo Sun
Which classes need to be changed, or should new classes be used? I'm working on a design for Flink runtime support of TwoInputSelectable, and I'll share an initial proposal document next Monday. In the proposal, the core classes that need to be changed include UnionInputGate and StreamTwoInputProcessor. I think

2019-02-15 Thread Guowei Ma
I agree with you. In the long run, we should clearly define which parts should be exposed to users. At present, AbstractStreamOperator exposes many implementation details, such as the asynchronous checkpoint's "OperatorSnapshotFutures", and how to release some resource that is ne