Hey Becket,
Re 2.
With the `isBlocked()` approach, a purely single-threaded, blocking source could
be implemented in the following way:
/*
 * Return a future which, when completed, means that the source has more data
 * and getNext() will not block.
 * If you wish to benefit from non-blocking connectors, please implement this
 * method appropriately.
 */
CompletableFuture<?> isBlocked() {
    // This would be the default behaviour, so the user wouldn't need to
    // override this method at all.
    return CompletableFuture.completedFuture(null);
}

T getNext() {
    // Do some blocking read operation (readRecord() is only a placeholder).
    T result = readRecord();
    return result;
}
Implementing `isBlocked` isn't mandatory. It's more of an optional
optimisation that some connectors might provide.
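To illustrate that a purely blocking connector gets the default for free, here is a minimal, self-contained sketch. `SimpleReader` and `ListReader` are hypothetical names used only for this example, and returning `null` from `getNext()` at the end of input is an assumption of the sketch, not part of the proposal.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical reader interface modelled on the isBlocked()/getNext() proposal.
interface SimpleReader<T> {
    // Default: never reports itself as blocked, so blocking sources need not override it.
    default CompletableFuture<?> isBlocked() {
        return CompletableFuture.completedFuture(null);
    }

    // May block until the next record is available; null means "no more records"
    // (an assumption made for this sketch only).
    T getNext();
}

// A naive, single-threaded reader that only implements getNext().
class ListReader<T> implements SimpleReader<T> {
    private final Iterator<T> records;

    ListReader(List<T> records) {
        this.records = records.iterator();
    }

    @Override
    public T getNext() {
        // Stand-in for a blocking read, e.g. from a socket or a file.
        return records.hasNext() ? records.next() : null;
    }
}
```

A caller that always waits on `isBlocked()` before calling `getNext()` works unchanged with such a reader, because the returned future is already complete.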
Providing a non-blocking `poll` method doesn't solve the problem of actually
limiting the number of active threads: with `poll()` alone, a worker cannot be
notified when data arrives, so it would have to keep re-polling every task. One
of the potential benefits of `CompletableFuture<?> isBlocked()` is that we could
have a fixed-size pool of worker threads. A worker thread could pick a
non-blocked task that's waiting to be executed, and the `CompletableFuture<?>`
would be needed to juggle tasks between the blocked and active states. Another
potential side benefit could be reporting in the UI/metrics which tasks are
blocked (similar to the current back pressure monitoring).
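A rough sketch of the fixed-size worker pool idea, under stated assumptions: `AsyncReader`, `DelayedReader` and `FixedPoolScheduler` are hypothetical names, and `isFinished()` is an extra method assumed here only so the example can terminate. A reader whose `isBlocked()` future is not yet complete gives its worker thread back and is resubmitted via `thenRun()` once the future completes, so no thread sits idle waiting on I/O. `DelayedReader` fakes I/O latency with `CompletableFuture.delayedExecutor` (Java 9+).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical non-blocking reader contract.
interface AsyncReader<T> {
    CompletableFuture<?> isBlocked(); // completes when getNext() will not block
    T getNext();                      // only called after isBlocked() has completed
    boolean isFinished();             // assumption of this sketch
}

// Toy reader: each record becomes available 5 ms after the previous one was read.
class DelayedReader implements AsyncReader<Integer> {
    private final List<Integer> records;
    // Used by at most one worker at a time; volatile for the hand-off between workers.
    private volatile int position = 0;
    private volatile CompletableFuture<?> available = CompletableFuture.completedFuture(null);

    DelayedReader(List<Integer> records) {
        this.records = records;
    }

    @Override
    public CompletableFuture<?> isBlocked() {
        return available;
    }

    @Override
    public Integer getNext() {
        Integer record = records.get(position);
        position = position + 1;
        // Simulate waiting for I/O before the next record "arrives".
        available = CompletableFuture.runAsync(
                () -> { }, CompletableFuture.delayedExecutor(5, TimeUnit.MILLISECONDS));
        return record;
    }

    @Override
    public boolean isFinished() {
        return position >= records.size();
    }
}

// A fixed number of worker threads serve any number of readers.
class FixedPoolScheduler<T> {
    private final ExecutorService workers;
    private final Queue<T> output = new ConcurrentLinkedQueue<>();
    private final CountDownLatch finished;

    FixedPoolScheduler(int threads, int readerCount) {
        this.workers = Executors.newFixedThreadPool(threads);
        this.finished = new CountDownLatch(readerCount);
    }

    // Run the reader on a worker only once it is no longer blocked.
    void schedule(AsyncReader<T> reader) {
        reader.isBlocked().thenRun(() -> workers.execute(() -> runUntilBlocked(reader)));
    }

    private void runUntilBlocked(AsyncReader<T> reader) {
        while (!reader.isFinished()) {
            if (!reader.isBlocked().isDone()) {
                schedule(reader); // release this thread; resume when data is available
                return;
            }
            output.add(reader.getNext());
        }
        finished.countDown();
    }

    List<T> awaitAll() throws InterruptedException {
        finished.await();
        workers.shutdown();
        return new ArrayList<>(output);
    }
}
```

With two worker threads and three readers, the pool never needs more than two threads, however many readers are waiting on I/O; the set of blocked readers at any moment is also exactly what a UI/metrics view could report.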
Maybe such an extension could use a PoC that would show whether there are any
real benefits.
Piotrek
> On 1 Nov 2018, at 19:29, Becket Qin wrote:
>
> Thanks for the FLIP, Aljoscha.
>
> The proposal makes sense to me. Separating the split discovery and
> consumption is very useful as it enables Flink to better manage the sources.
>
> Looking at the interface, I have a few questions:
> 1. *SplitEnumerator*.*discoverNewSplits()* seems to assume that the number
> of splits can only increase. In your example the source was Kafka, so the
> assumption holds. But I am wondering: are there cases where the number of
> splits can decrease?
> 2. I agree with Piotr that we need to be careful about potentially blocking
> implementations. However, it is not clear to me how the completable
> future works if the underlying reader does not have its own thread (e.g. a
> Kafka consumer). In that case, the future will never be completed unless
> the caller thread touches the reader again. I am wondering if the following
> interfaces for the reader make sense:
> boolean isDone(); // Whether the source has no more records.
> T poll(); // Non-blocking read. We can add a timeout if needed.
> T take(); // Blocking read.
> This seems more intuitive to people who are familiar with the existing
> convention of poll() and take(). And with the non-blocking poll() we could
> have an NIO Selector-like API when there are multiple splits.
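A minimal sketch of the poll()/take() reader interface above, plus a Selector-like round-robin loop over several splits. `PollingSplit`, `InMemorySplit` and `RoundRobinDrainer` are hypothetical names. Without a real selector to wait on, the loop keeps spinning whenever no split has data, which is the busy waiting that the `isBlocked()` future is meant to avoid.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical split reader following the isDone()/poll()/take() proposal.
interface PollingSplit<T> {
    boolean isDone();                     // true once the split has no more records
    T poll();                             // non-blocking: null if nothing is available now
    T take() throws InterruptedException; // blocking read
}

// Toy in-memory split; data is always "available", so poll() never waits.
class InMemorySplit<T> implements PollingSplit<T> {
    private final Queue<T> records;

    InMemorySplit(List<T> records) {
        this.records = new ArrayDeque<>(records);
    }

    @Override
    public boolean isDone() {
        return records.isEmpty();
    }

    @Override
    public T poll() {
        return records.poll();
    }

    @Override
    public T take() {
        // Nothing to wait for in memory; remove() throws NoSuchElementException when done.
        return records.remove();
    }
}

final class RoundRobinDrainer {
    private RoundRobinDrainer() { }

    // Visit every active split once per round using only non-blocking poll().
    static <T> List<T> drain(List<? extends PollingSplit<T>> splits) {
        List<T> out = new ArrayList<>();
        List<PollingSplit<T>> active = new ArrayList<>(splits);
        while (!active.isEmpty()) {
            active.removeIf(PollingSplit::isDone);
            for (PollingSplit<T> split : active) {
                T record = split.poll();
                if (record != null) {
                    out.add(record);
                }
            }
            // A real implementation would wait here (e.g. on a selector) instead of spinning.
        }
        return out;
    }
}
```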
>
> BTW, it would be really helpful if there were some Javadoc describing the
> behavior of the interfaces in the FLIP.
>
> Thanks again for the great proposal.
>
> Jiangjie (Becket) Qin
>
> On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski wrote:
>
>> Hi,
>>
>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>> possible improvements. I have one proposal. Instead of having a method:
>>
>> boolean advance() throws IOException;
>>
>> I would replace it with
>>
>> /*
>> * Return a future which, when completed, means that the source has more
>> * data and getNext() will not block.
>> * If you wish to benefit from non-blocking connectors, please
>> * implement this method appropriately.
>> */
>> default CompletableFuture<?> isBlocked() {
>>     return CompletableFuture.completedFuture(null);
>> }
>>
>> And rename `getCurrent()` to `getNext()`.
>>
>> Couple of arguments:
>> 1. I don’t understand the division of work between `advance()` and
>> `getCurrent()`: what should be done in which, especially for connectors
>> that handle records in batches (like Kafka), and when should you call
>> `advance()` and when `getCurrent()`?
>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>> future to have asynchronous/non-blocking connectors and to handle a large
>> number of blocked threads more efficiently, without busy waiting. At the
>> same time it doesn’t add much complexity, since naive connector
>> implementations can always be blocking.
>> 3. This also would allow us to use a fixed size thread pool of task
>> executors, instead of one thread per task.
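To make arguments 1 and 2 more concrete, here is a hedged sketch of how a batching connector (records arriving in batches, as with Kafka) might look with `isBlocked()`/`getNext()` instead of `advance()`/`getCurrent()`: the future answers "is there data?", and `getNext()` simply hands out one buffered record. `BatchingReader` and its `onBatchFetched()` hook are hypothetical; the sketch assumes some I/O thread calls the hook, which is exactly the open question for readers without their own thread.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class BatchingReader<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> dataAvailable = new CompletableFuture<>();

    // Hypothetical hook, assumed to be called by an I/O thread when a batch arrives.
    synchronized void onBatchFetched(List<T> batch) {
        buffer.addAll(batch);
        if (!buffer.isEmpty()) {
            // Completed inside the lock for simplicity; dependent callbacks run on this thread.
            dataAvailable.complete(null);
        }
    }

    // A completed future means getNext() will not block (a record is buffered).
    synchronized CompletableFuture<?> isBlocked() {
        if (!buffer.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        if (dataAvailable.isDone()) {
            dataAvailable = new CompletableFuture<>(); // wait for the next batch
        }
        return dataAvailable;
    }

    // Non-blocking once isBlocked() has completed: returns one buffered record.
    synchronized T getNext() {
        return buffer.poll();
    }
}
```

A naive blocking connector could ignore the future entirely, while a non-blocking runtime could register a callback on it instead of busy waiting.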
>>
>> Piotrek
>>
>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek wrote:
>>>
>>> Hi All,
>>>
>>> In order to get the ball rolling on the new source interface that we
>>> have discussed for so long, I finally created a FLIP:
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>
>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>>> adding per-partition watermark support to the Kinesis source and because
>>> this would enable a generic implementation of event-time alignment for all
>>> sources. Maybe we need another FLIP for the event-time alignment part,
>>> especially the part about information sharing between operations (I'm not
>>> calling it state sharing because state has a special meaning in Flink).
>>>
>>> Please discuss away!
>>>
>>> Aljoscha