Thanks for the FLIP, Aljoscha.

The proposal makes sense to me. Separating the split discovery and
consumption is very useful as it enables Flink to better manage the sources.

Looking at the interface, I have a few questions:
1. *SplitEnumerator*.*discoverNewSplits()* seems assuming that the number
of splits can only increase, In your example, the source was Kafka, so the
assumption was true. But I am wondering are there case that the number of
splits can decrease?
2. I agree with Piotr that we need to be careful about potentially blocking
implementations. However, it is not clear to me how does the completable
future work if the underlying reader does not have its own thread (e.g. a
Kafka consumer). In that case, the future will never be completed unless
the caller thread touches the reader again. I am wondering if the following
interfaces for the reader makes sense:
    boolean isDone(); // Whether the source has more records.
    T poll(); // non-blocking read. We can add a timeout if needed.
    T take(); // blocking read;
This seems more intuitive to people who are familiar with existing
convention of poll() and take(). And with the non-blocking poll() we could
have an nio Selector-like API when there are multiple splits.

BTW, it would be really helpful if there is some Java doc describing the
behavior of the the interfaces in the FLIP.

Thanks again for the great proposal.

Jiangjie (Becket) Qin

On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
> possible improvements. I have one proposal. Instead of having a method:
>
> boolean advance() throws IOException;
>
> I would replace it with
>
> /*
>  * Return a future, which when completed means that source has more data
> and getNext() will not block.
>  * If you wish to use benefits of non blocking connectors, please
> implement this method appropriately.
>  */
> default CompletableFuture<?> isBlocked() {
>         return CompletableFuture.completedFuture(null);
> }
>
> And rename `getCurrent()` to `getNext()`.
>
> Couple of arguments:
> 1. I don’t understand the division of work between `advance()` and
> `getCurrent()`. What should be done in which, especially for connectors
> that handle records in batches (like Kafka) and when should you call
> `advance` and when `getCurrent()`.
> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
> future to have asynchronous/non blocking connectors and more efficiently
> handle large number of blocked threads, without busy waiting. While at the
> same time it doesn’t add much complexity, since naive connector
> implementations can be always blocking.
> 3. This also would allow us to use a fixed size thread pool of task
> executors, instead of one thread per task.
>
> Piotrek
>
> > On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > Hi All,
> >
> > In order to finally get the ball rolling on the new source interface
> that we have discussed for so long I finally created a FLIP:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >
> > I cc'ed Thomas and Jamie because of the ongoing work/discussion about
> adding per-partition watermark support to the Kinesis source and because
> this would enable generic implementation of event-time alignment for all
> sources. Maybe we need another FLIP for the event-time alignment part,
> especially the part about information sharing between operations (I'm not
> calling it state sharing because state has a special meaning in Flink).
> >
> > Please discuss away!
> >
> > Aljoscha
> >
> >
>
>

Reply via email to