Couple more points regarding discovery:

The proposal mentions that discovery could be outside the execution graph.
Today, discovered partitions/shards are checkpointed. I believe that will
also need to be the case in the future, even when discovery and reading are
split between different tasks.

For cases such as resharding of a Kinesis stream, the relationship between
splits needs to be considered. Splits cannot be randomly distributed over
readers in certain situations. An example was mentioned here:
https://github.com/apache/flink/pull/6980#issuecomment-435202809

Thomas


On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <t...@apache.org> wrote:

> Thanks for getting the ball rolling on this!
>
> Can the number of splits decrease? Yes, splits can be closed and go away.
> An example would be a shard merge in Kinesis (2 existing shards will be
> closed and replaced with a new shard).
>
> Regarding advance/poll/take: IMO the least restrictive approach would be
> the thread-less IO model (pull based, non-blocking, caller retrieves new
> records when available). The current Kinesis API requires the use of
> threads. But that can be internal to the split reader and does not need to
> be a source API concern. In fact, that's what we are working on right now
> as improvement to the existing consumer: Each shard consumer thread will
> push to a queue, the consumer main thread will poll the queue(s). It is
> essentially a mapping from threaded IO to non-blocking.
>
> The proposed SplitReader interface would fit the thread-less IO model.
> Similar to an iterator, we find out if there is a new element (hasNext) and
> if so, move to it (next()). Separate calls deliver the meta information
> (timestamp, watermark). Perhaps advance call could offer a timeout option,
> so that the caller does not end up in a busy wait. On the other hand, a
> caller processing multiple splits may want to cycle through fast, to
> process elements of other splits as soon as they become available. The nice
> thing is that this "split merge" logic can now live in Flink and be
> optimized and shared between different sources.
>
> Thanks,
> Thomas
>
>
> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi,
>> Thanks Aljoscha for this FLIP.
>>
>> 1. I agree with Piotr and Becket that the non-blocking source is very
>> important. But in addition to `Future/poll`, there may be another way to
>> achieve this. I think it may be not very memory friendly if every advance
>> call return a Future.
>>
>> public interface Listener {
>>      public void notify();
>> }
>>
>> public interface SplitReader() {
>>      /**
>>       * When there is no element temporarily, this will return false.
>>       * When elements is available again splitReader can call
>> listener.notify()
>>       * In addition the frame would check `advance` periodically .
>>       * Of course advance can always return true and ignore the listener
>> argument for simplicity.
>>       */
>>      public boolean advance(Listener listener);
>> }
>>
>> 2.  The FLIP tells us very clearly that how to create all Splits and how
>> to create a SplitReader from a Split. But there is no strategy for the user
>> to choose how to assign the splits to the tasks. I think we could add a
>> Enum to let user to choose.
>> /**
>>   public Enum SplitsAssignmentPolicy {
>>     Location,
>>     Workload,
>>     Random,
>>     Average
>>   }
>> */
>>
>> 3. If merge the `advance` and `getCurrent`  to one method like `getNext`
>> the `getNext` would need return a `ElementWithTimestamp` because some
>> sources want to add timestamp to every element. IMO, this is not so memory
>> friendly so I prefer this design.
>>
>>
>>
>> Thanks
>>
>> Piotr Nowojski <pi...@data-artisans.com> 于2018年11月1日周四 下午6:08写道:
>>
>>> Hi,
>>>
>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>>> possible improvements. I have one proposal. Instead of having a method:
>>>
>>> boolean advance() throws IOException;
>>>
>>> I would replace it with
>>>
>>> /*
>>>  * Return a future, which when completed means that source has more data
>>> and getNext() will not block.
>>>  * If you wish to use benefits of non blocking connectors, please
>>> implement this method appropriately.
>>>  */
>>> default CompletableFuture<?> isBlocked() {
>>>         return CompletableFuture.completedFuture(null);
>>> }
>>>
>>> And rename `getCurrent()` to `getNext()`.
>>>
>>> Couple of arguments:
>>> 1. I don’t understand the division of work between `advance()` and
>>> `getCurrent()`. What should be done in which, especially for connectors
>>> that handle records in batches (like Kafka) and when should you call
>>> `advance` and when `getCurrent()`.
>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>>> future to have asynchronous/non blocking connectors and more efficiently
>>> handle large number of blocked threads, without busy waiting. While at the
>>> same time it doesn’t add much complexity, since naive connector
>>> implementations can be always blocking.
>>> 3. This also would allow us to use a fixed size thread pool of task
>>> executors, instead of one thread per task.
>>>
>>> Piotrek
>>>
>>> > On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>> >
>>> > Hi All,
>>> >
>>> > In order to finally get the ball rolling on the new source interface
>>> that we have discussed for so long I finally created a FLIP:
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>> >
>>> > I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>>> adding per-partition watermark support to the Kinesis source and because
>>> this would enable generic implementation of event-time alignment for all
>>> sources. Maybe we need another FLIP for the event-time alignment part,
>>> especially the part about information sharing between operations (I'm not
>>> calling it state sharing because state has a special meaning in Flink).
>>> >
>>> > Please discuss away!
>>> >
>>> > Aljoscha
>>> >
>>> >
>>>
>>>

Reply via email to