A couple more points regarding discovery:

The proposal mentions that discovery could happen outside the execution graph. Today, discovered partitions/shards are checkpointed. I believe that will also need to be the case in the future, even when discovery and reading are split between different tasks.
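To illustrate why I think the discovery result has to be part of the checkpoint, here is a minimal sketch. `DiscoveredShards` and its methods are hypothetical names made up for this mail, not existing Flink or connector classes, and the shard ids are placeholders. The assumption is that discovery periodically lists all shards and must only hand out the ones it has not seen before.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical holder for discovery results; not an existing Flink class. */
final class DiscoveredShards {
    private final Set<String> known = new LinkedHashSet<>();

    /** Remembers the listed shard ids and returns only those not known before. */
    List<String> recordDiscovery(List<String> listedShardIds) {
        List<String> fresh = new ArrayList<>();
        for (String id : listedShardIds) {
            if (known.add(id)) {
                fresh.add(id);
            }
        }
        return fresh;
    }

    /** Copy of the known shard ids, to be written into a checkpoint. */
    List<String> snapshot() {
        return new ArrayList<>(known);
    }

    /** Rebuilds the discovery state from a checkpointed copy. */
    static DiscoveredShards restore(List<String> snapshot) {
        DiscoveredShards restored = new DiscoveredShards();
        restored.known.addAll(snapshot);
        return restored;
    }
}

final class DiscoveredShardsDemo {
    public static void main(String[] args) {
        DiscoveredShards beforeFailure = new DiscoveredShards();
        beforeFailure.recordDiscovery(Arrays.asList("shard-0", "shard-1"));
        List<String> checkpoint = beforeFailure.snapshot();

        // After recovery, only the shard that appeared since the checkpoint is new.
        DiscoveredShards afterRestore = DiscoveredShards.restore(checkpoint);
        System.out.println(
                afterRestore.recordDiscovery(Arrays.asList("shard-0", "shard-1", "shard-2")));
        // prints [shard-2]
    }
}
```

Without the restored state, all three shards would look new after recovery and could be handed out a second time. Where this state lives (a separate discovery task or the readers) is left open in the sketch.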
For cases such as resharding of a Kinesis stream, the relationship between splits needs to be considered. Splits cannot be randomly distributed over readers in certain situations. An example was mentioned here:

https://github.com/apache/flink/pull/6980#issuecomment-435202809

Thomas

On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <t...@apache.org> wrote:

> Thanks for getting the ball rolling on this!
>
> Can the number of splits decrease? Yes, splits can be closed and go away.
> An example would be a shard merge in Kinesis (2 existing shards will be
> closed and replaced with a new shard).
>
> Regarding advance/poll/take: IMO the least restrictive approach would be
> the thread-less IO model (pull based, non-blocking, caller retrieves new
> records when available). The current Kinesis API requires the use of
> threads. But that can be internal to the split reader and does not need to
> be a source API concern. In fact, that's what we are working on right now
> as improvement to the existing consumer: Each shard consumer thread will
> push to a queue, the consumer main thread will poll the queue(s). It is
> essentially a mapping from threaded IO to non-blocking.
>
> The proposed SplitReader interface would fit the thread-less IO model.
> Similar to an iterator, we find out if there is a new element (hasNext) and
> if so, move to it (next()). Separate calls deliver the meta information
> (timestamp, watermark). Perhaps advance call could offer a timeout option,
> so that the caller does not end up in a busy wait. On the other hand, a
> caller processing multiple splits may want to cycle through fast, to
> process elements of other splits as soon as they become available. The nice
> thing is that this "split merge" logic can now live in Flink and be
> optimized and shared between different sources.
>
> Thanks,
> Thomas
>
>
> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi,
>> Thanks Aljoscha for this FLIP.
>>
>> 1. I agree with Piotr and Becket that the non-blocking source is very
>> important. But in addition to `Future/poll`, there may be another way to
>> achieve this. I think it may be not very memory friendly if every advance
>> call return a Future.
>>
>> public interface Listener {
>>     public void notify();
>> }
>>
>> public interface SplitReader() {
>>     /**
>>      * When there is no element temporarily, this will return false.
>>      * When elements is available again splitReader can call listener.notify()
>>      * In addition the frame would check `advance` periodically .
>>      * Of course advance can always return true and ignore the listener
>>      * argument for simplicity.
>>      */
>>     public boolean advance(Listener listener);
>> }
>>
>> 2. The FLIP tells us very clearly that how to create all Splits and how
>> to create a SplitReader from a Split. But there is no strategy for the user
>> to choose how to assign the splits to the tasks. I think we could add a
>> Enum to let user to choose.
>> /**
>> public Enum SplitsAssignmentPolicy {
>>     Location,
>>     Workload,
>>     Random,
>>     Average
>> }
>> */
>>
>> 3. If merge the `advance` and `getCurrent` to one method like `getNext`
>> the `getNext` would need return a `ElementWithTimestamp` because some
>> sources want to add timestamp to every element. IMO, this is not so memory
>> friendly so I prefer this design.
>>
>>
>> Thanks
>>
>> Piotr Nowojski <pi...@data-artisans.com> wrote on Thu, Nov 1, 2018 at 6:08 PM:
>>
>>> Hi,
>>>
>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>>> possible improvements. I have one proposal. Instead of having a method:
>>>
>>> boolean advance() throws IOException;
>>>
>>> I would replace it with
>>>
>>> /*
>>>  * Return a future, which when completed means that source has more data
>>>  * and getNext() will not block.
>>>  * If you wish to use benefits of non blocking connectors, please
>>>  * implement this method appropriately.
>>>  */
>>> default CompletableFuture<?> isBlocked() {
>>>     return CompletableFuture.completedFuture(null);
>>> }
>>>
>>> And rename `getCurrent()` to `getNext()`.
>>>
>>> Couple of arguments:
>>> 1. I don’t understand the division of work between `advance()` and
>>> `getCurrent()`. What should be done in which, especially for connectors
>>> that handle records in batches (like Kafka) and when should you call
>>> `advance` and when `getCurrent()`.
>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>>> future to have asynchronous/non blocking connectors and more efficiently
>>> handle large number of blocked threads, without busy waiting. While at the
>>> same time it doesn’t add much complexity, since naive connector
>>> implementations can be always blocking.
>>> 3. This also would allow us to use a fixed size thread pool of task
>>> executors, instead of one thread per task.
>>>
>>> Piotrek
>>>
>>> > On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> >
>>> > Hi All,
>>> >
>>> > In order to finally get the ball rolling on the new source interface
>>> > that we have discussed for so long I finally created a FLIP:
>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>> >
>>> > I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>>> > adding per-partition watermark support to the Kinesis source and because
>>> > this would enable generic implementation of event-time alignment for all
>>> > sources. Maybe we need another FLIP for the event-time alignment part,
>>> > especially the part about information sharing between operations (I'm not
>>> > calling it state sharing because state has a special meaning in Flink).
>>> >
>>> > Please discuss away!
>>> >
>>> > Aljoscha
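For reference, the queue-based mapping from threaded IO to a non-blocking reader described in the quoted reply above could look roughly like the sketch below. This is only an illustration under assumptions: `QueueBackedSplitReader`, `push`, and the queue capacity are names and values invented for this mail, not the FLIP-27 interface and not the actual Kinesis consumer code. The `advance(timeout, unit)` overload shows the timeout option mentioned in the thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical reader; names are illustrative and not the FLIP-27 API. */
final class QueueBackedSplitReader<T> {
    // Bounded so that fast consumer threads get back-pressured (capacity is arbitrary).
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>(1024);
    private T current;

    /** Called by internal consumer threads; blocks while the queue is full. */
    void push(T record) throws InterruptedException {
        queue.put(record);
    }

    /** Non-blocking: returns true if a new record became the current one. */
    boolean advance() {
        T next = queue.poll();
        if (next == null) {
            return false;
        }
        current = next;
        return true;
    }

    /** Waits up to the given timeout so the caller does not have to busy-wait. */
    boolean advance(long timeout, TimeUnit unit) throws InterruptedException {
        T next = queue.poll(timeout, unit);
        if (next == null) {
            return false;
        }
        current = next;
        return true;
    }

    /** Returns the record selected by the last successful advance call. */
    T getCurrent() {
        if (current == null) {
            throw new IllegalStateException("advance() has not returned true yet");
        }
        return current;
    }
}

final class QueueBackedSplitReaderDemo {
    public static void main(String[] args) throws InterruptedException {
        QueueBackedSplitReader<String> reader = new QueueBackedSplitReader<>();

        // Stands in for a shard consumer thread that uses a blocking client API.
        Thread shardConsumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    reader.push("record-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        shardConsumer.start();

        // The main thread only polls; it never blocks inside the client API.
        int seen = 0;
        while (seen < 3) {
            if (reader.advance(100, TimeUnit.MILLISECONDS)) {
                System.out.println(reader.getCurrent());
                seen++;
            }
        }
        shardConsumer.join();
    }
}
```

A caller that cycles through many splits would use the no-argument `advance()` and move on immediately when it returns false; the timeout variant is for a caller with nothing else to do.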