Hi,
a)
> BTW, regarding the isBlocked() method, I have a few more questions. 21. Is a
> method isReady() with boolean as a return value
> equivalent? Personally I find it a little confusing what is
> supposed to be returned when the future is completed. 22. If
> the implementation of isBlocked() is optional, how do the callers know
> whether the method is properly implemented or not?
> Does "not implemented" mean it always returns a completed future?
`CompletableFuture<?> isBlocked()` is more or less equivalent to `boolean
hasNext()` that, in case of “false”, also provides a listener/callback which
notifies about the presence of the next element. There are some minor details,
like `CompletableFuture<?>` having a minimal two-state logic:
1. Future is completed - we have more data
2. Future not yet completed - we don’t have data now, but we might/will have
some in the future
`boolean hasNext()` combined with a `notify()` callback, on the other hand, is
a bit more complicated/dispersed and can encourage `notify()` spam.
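A minimal sketch of the two-state contract, assuming a hypothetical buffer-backed reader (`QueueBackedReader`, `add` and `getNext` are illustrative names, not part of the FLIP). The future completes only on the empty-to-non-empty transition, so a waiting caller is woken once instead of receiving one notification per record.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical reader illustrating the two states of isBlocked():
 * completed future = data is available, pending future = no data yet.
 */
class QueueBackedReader<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    // Pending while the buffer is empty; completed once data arrives.
    private CompletableFuture<Void> available = new CompletableFuture<>();

    synchronized CompletableFuture<?> isBlocked() {
        return available;
    }

    /** Called by an IO thread. Completes the future only on the empty -> non-empty transition. */
    synchronized void add(T element) {
        buffer.add(element);
        if (!available.isDone()) {
            // Callbacks registered via thenRun() run here, on the calling thread.
            available.complete(null);
        }
    }

    /** Should be called only when isBlocked() is completed; returns null if nothing is buffered. */
    synchronized T getNext() {
        T next = buffer.poll();
        if (buffer.isEmpty() && available.isDone()) {
            available = new CompletableFuture<>(); // back to the "blocked" state
        }
        return next;
    }
}

class IsBlockedDemo {
    public static void main(String[] args) {
        QueueBackedReader<String> reader = new QueueBackedReader<>();
        // Instead of polling hasNext() in a loop, register a callback once.
        reader.isBlocked().thenRun(() -> System.out.println("data available"));
        reader.add("a"); // prints "data available"
        reader.add("b"); // no second notification: the future is already completed
        System.out.println(reader.getNext()); // a
        System.out.println(reader.getNext()); // b
        System.out.println(reader.isBlocked().isDone()); // false: blocked again
    }
}
```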
b)
> 3. If we merge `advance` and `getCurrent` into one method like `getNext`,
> then `getNext` would need to return an `ElementWithTimestamp`, because some
> sources want to add a timestamp to every element. IMO, this is not so
> memory friendly, so I prefer this design.
Guowei, I don’t quite understand this. Could you elaborate on why having a
separate `advance()` helps?
c)
Regarding advance/poll/take: what’s the value of having two separate methods,
poll and take? Which one of them should be called and which implemented? What’s
the benefit of having those methods compared to having one single method
`getNextElement()` (or `pollElement()`, or whatever we name it) with the
following contract:
CompletableFuture<?> isBlocked();
/**
 * Returns the next element. Will be called only after the future returned by
 * {@code isBlocked()} has completed. Try to implement it in a non-blocking
 * fashion, but if that’s impossible or not worth the effort, you can block in
 * this method.
 */
T getNextElement();
I mean, if the connector is implemented in a non-blocking way, Flink should use
it that way. If it’s not, then `poll()` will `throw new
NotImplementedException()`. Implementing both of them and providing both of
them to Flink wouldn’t make sense, so why not merge them into a single method
call that should preferably (but not necessarily) be non-blocking? It’s not
like we are implementing a general-purpose `Queue`, where users might want to
call either `poll` or `take`. We would always prefer to call `poll`, but if
it’s blocking, then we still have no choice but to call it and block on it.
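A minimal sketch of the single-method contract, assuming hypothetical names (`SplitReader`, `BlockingQueueReader`, `Driver.read`). A naive connector keeps the default, always-completed `isBlocked()` and simply blocks inside `getNextElement()`, while the caller's loop is identical for blocking and non-blocking readers. For brevity the driver waits with `join()`; a real runtime would more likely register a callback on the future instead of parking a thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical merged contract: one method to fetch the next element. */
interface SplitReader<T> {
    /** Completed when getNextElement() can make progress; naive readers keep this default. */
    default CompletableFuture<?> isBlocked() {
        return CompletableFuture.completedFuture(null);
    }

    /** Called only after isBlocked() has completed; preferably non-blocking, but may block. */
    T getNextElement() throws InterruptedException;
}

/** Naive connector: never reports being blocked and simply blocks in getNextElement(). */
final class BlockingQueueReader<T> implements SplitReader<T> {
    private final BlockingQueue<T> queue;

    BlockingQueueReader(BlockingQueue<T> queue) {
        this.queue = queue;
    }

    @Override
    public T getNextElement() throws InterruptedException {
        return queue.take(); // blocks until an element is available
    }
}

/** Caller side: the same loop works for blocking and non-blocking readers. */
final class Driver {
    static <T> List<T> read(SplitReader<T> reader, int count) throws InterruptedException {
        List<T> out = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            reader.isBlocked().join(); // waits only if the reader says it is blocked
            out.add(reader.getNextElement());
        }
        return out;
    }
}
```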
d)
> 1. I agree with Piotr and Becket that the non-blocking source is very
> important. But in addition to `Future/poll`, there may be another way to
> achieve this. I think it may not be very memory friendly if every advance
> call returns a Future.
I didn’t want to mention this, so as not to clutter my initial proposal, but
there is a simple solution to the problem:
public interface SplitReader {
    (…)
    CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);
    /**
     * Returns a future that will be completed when the split reader becomes
     * unblocked. If the split reader is not blocked, this method should return
     * {@code NOT_BLOCKED}.
     */
    default CompletableFuture<?> isBlocked() {
        return NOT_BLOCKED;
    }
}
If we are blocked and waiting for IO, then creating a new Future is a
non-issue. Under full throughput, sources that are not blocked return the
static `NOT_BLOCKED` constant, so no Future is allocated per call, which
should also solve the problem.
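A minimal sketch of that allocation pattern, assuming a hypothetical `BufferingReader` with an `add` method called by an IO thread. While records are buffered, `isBlocked()` returns the shared constant; a new future is allocated only when the reader is actually waiting for IO. Since `NOT_BLOCKED` is a shared instance, callers are assumed to treat it as read-only.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

interface SplitReader<T> {
    /** Shared, already-completed future: returning it allocates nothing. */
    CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    default CompletableFuture<?> isBlocked() {
        return NOT_BLOCKED;
    }

    T getNext();
}

/** Hypothetical buffering reader: allocates a future only while it is actually blocked. */
final class BufferingReader<T> implements SplitReader<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> dataArrived; // non-null only while someone may be waiting

    @Override
    public synchronized CompletableFuture<?> isBlocked() {
        if (!buffer.isEmpty()) {
            return NOT_BLOCKED; // hot path under full throughput: no allocation
        }
        if (dataArrived == null) {
            dataArrived = new CompletableFuture<>(); // cold path: we are waiting for IO anyway
        }
        return dataArrived;
    }

    /** Called by an IO thread when a record arrives. */
    synchronized void add(T element) {
        buffer.add(element);
        if (dataArrived != null) {
            dataArrived.complete(null); // wake up whoever waits on the pending future
            dataArrived = null;
        }
    }

    @Override
    public synchronized T getNext() {
        return buffer.poll();
    }
}
```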
One more remark: non-blocking sources might be a necessity in a single-threaded
model without a checkpointing lock. (Currently, when sources are blocked, they
can release the checkpointing lock and re-acquire it later.) A non-blocking
`poll`/`getNext()` would allow checkpoints to happen while the source is idle.
In that case, either `notify()` or my proposed `isBlocked()` would allow us to
avoid busy-looping.
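A minimal sketch of such a single-threaded loop, assuming hypothetical names throughout (`NonBlockingReader`, `SingleThreadedSourceTask`, `requestCheckpoint`, `InMemoryReader`). The task thread parks on "data available OR checkpoint requested", so it neither spins nor needs a lock to take a checkpoint while the reader is idle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical non-blocking reader: poll() never blocks and returns null when nothing is buffered. */
interface NonBlockingReader {
    CompletableFuture<?> isBlocked();

    String poll();
}

/** Hypothetical single-threaded source task without a checkpoint lock. */
final class SingleThreadedSourceTask {
    private final NonBlockingReader reader;
    private final AtomicReference<CompletableFuture<Void>> checkpointRequested =
            new AtomicReference<>(new CompletableFuture<>());
    final List<String> log = new ArrayList<>(); // touched only by the task thread

    SingleThreadedSourceTask(NonBlockingReader reader) {
        this.reader = reader;
    }

    /** May be called from any thread. */
    void requestCheckpoint() {
        while (true) {
            CompletableFuture<Void> current = checkpointRequested.get();
            // Done if we completed it, or if an earlier request on it has not been handled yet.
            if (current.complete(null) || checkpointRequested.get() == current) {
                return;
            }
        }
    }

    /** Handles {@code steps} events (a record or a checkpoint), parking between them. */
    void run(int steps) {
        for (int i = 0; i < steps; i++) {
            CompletableFuture<Void> checkpoint = checkpointRequested.get();
            // Wake up on new data OR a checkpoint request; no busy-looping.
            CompletableFuture.anyOf(reader.isBlocked(), checkpoint).join();
            if (checkpoint.isDone()) {
                checkpointRequested.set(new CompletableFuture<>());
                log.add("checkpoint"); // no record is in flight on this thread
            } else {
                String record = reader.poll();
                if (record != null) {
                    log.add(record);
                }
            }
        }
    }
}

/** Trivial in-memory reader used only to exercise the loop. */
final class InMemoryReader implements NonBlockingReader {
    private final ArrayDeque<String> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    @Override
    public synchronized CompletableFuture<?> isBlocked() {
        return buffer.isEmpty() ? available : CompletableFuture.completedFuture(null);
    }

    @Override
    public synchronized String poll() {
        return buffer.poll();
    }

    synchronized void add(String record) {
        buffer.add(record);
        available.complete(null);              // wake up anyone waiting
        available = new CompletableFuture<>(); // used the next time the buffer runs dry
    }
}
```

In a usage run, calling `requestCheckpoint()` while the reader is empty lets `run(1)` take the checkpoint immediately, even though `isBlocked()` is still pending.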
Piotrek
> On 5 Nov 2018, at 03:59, Becket Qin wrote:
>
> Hi Thomas,
>
> The iterator-like API was also the first thing that came to me. But it
> seems a little confusing that hasNext() does not mean "the stream has not
> ended", but means "the next record is ready", which repurposes the
> well-known meaning of hasNext(). If we follow the hasNext()/next() pattern, an
> additional isNextReady() method to indicate whether the next record is
> ready seems more intuitive to me.
>
> Similarly, in the poll()/take() pattern, another method, isDone(), is needed
> to indicate whether the stream has ended or not.
>
> Compared with the hasNext()/next()/isNextReady() pattern,
> isDone()/poll()/take() seems more flexible for the reader implementation.
> When I am implementing a reader, I could have a couple of choices:
>
> - A thread-less reader that does not have any internal threads.
>   - When poll() is called, the same calling thread will perform a bunch of
>     IO asynchronously.
>   - When take() is called, the same calling thread will perform a bunch
>     of IO and wait until the record is ready.
> - A reader with internal threads performing network IO and putting records
>   into a buffer.
>   - When poll() is called, the calling thread simply reads from the
>     buffer and returns an empty result immediately if there is no record.
>   - When take() is called, the calling thread reads from the buffer and
>     blocks waiting if the buffer is empty.
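A minimal sketch of the second variant (a reader with an internal IO thread), assuming hypothetical names (`PollTakeReader`, `ThreadedReader`) and an in-memory list standing in for network IO. Here `take()` waits in bounded slices so that it can also notice the end of the stream and return `null`; that end-of-stream behavior is an assumption of this sketch.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical isDone()/poll()/take() contract. */
interface PollTakeReader<T> {
    /** True once the stream has ended and every record has been consumed. */
    boolean isDone();

    /** Returns a record, or null immediately if none is ready. */
    T poll();

    /** Waits for a record; returns null only if the stream ended while waiting. */
    T take() throws InterruptedException;
}

/** Second variant: an internal thread performs the IO and fills a buffer. */
final class ThreadedReader<T> implements PollTakeReader<T> {
    private final BlockingQueue<T> buffer = new LinkedBlockingQueue<>();
    private volatile boolean ioFinished = false;

    ThreadedReader(List<T> remoteRecords) {
        Thread io = new Thread(() -> {
            for (T record : remoteRecords) {
                buffer.add(record); // stands in for network IO
            }
            ioFinished = true; // written after the last add, so isDone() cannot miss a record
        }, "split-io");
        io.setDaemon(true);
        io.start();
    }

    @Override
    public boolean isDone() {
        return ioFinished && buffer.isEmpty();
    }

    @Override
    public T poll() {
        return buffer.poll(); // never blocks
    }

    @Override
    public T take() throws InterruptedException {
        // Wait in bounded slices so that the end of the stream is noticed.
        while (!isDone()) {
            T record = buffer.poll(50, TimeUnit.MILLISECONDS);
            if (record != null) {
                return record;
            }
        }
        return null;
    }
}
```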
>
> On the other hand, with the hasNext()/next()/isNextReady() API, it is less
> intuitive for reader developers to write the thread-less pattern.
> Technically speaking, one can still do the asynchronous IO to prepare the
> record in isNextReady(), but that is implicit and seems somewhat hacky.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise wrote:
>
>> Couple more points regarding discovery:
>>
>> The proposal mentions that discovery could be outside the execution graph.
>> Today, discovered partitions/shards are checkpointed. I believe that will
>> also need to be the case in the future, even when discovery and reading are
>> split between different tasks.
>>
>> For cases such as resharding of a Kinesis stream, the relationship between
>> splits needs to be considered. Splits cannot be randomly distributed over
>> readers in certain situations. An example was mentioned here:
>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
>>
>> Thomas
>>
>>
>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise wrote:
>>
>>> Thanks for getting the ball rolling on this!
>>>
>>> Can the number of splits decrease? Yes, splits can be closed and go away.
>>> An example would be a shard merge in Kinesis (2 existing shards will be
>>> closed and replaced with a new shard).
>>>
>>> Regarding advance/poll/take: IMO the least restrictive approach would be
>>> the thread-less IO model (pull based, non-blocking, caller retrieves new
>>> records when available). The current Kinesis API requires the use of
>>> threads. But that can be internal to the split reader and does not need to
>>> be a source API concern. In fact, that's what we are working on right now
>>> as an improvement to the existing consumer: each shard consumer thread will
>>> push to a queue, and the consumer main thread will poll the queue(s). It is
>>> essentially a mapping from threaded IO to non-blocking.
>>>
>>> The proposed SplitReader interface would fit the thread-less IO model.
>>> Similar to an iterator, we find out if there is a new element (hasNext) and
>>> if so, move to it (next()). Separate calls deliver the meta information
>>> (timestamp, watermark). Perhaps the advance call could offer a timeout
>>> option, so that the caller does not end up in a busy wait. On the other
>>> hand, a caller processing multiple splits may want to cycle through them
>>> quickly, to process elements of other splits as soon as they become
>>> available. The nice thing is that this "split merge" logic can now live in
>>> Flink and be optimized and shared between different sources.
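A minimal sketch of framework-side "split merge" logic, assuming hypothetical names (`SplitReader` with a non-blocking `poll()`, `RoundRobinMerger`, `ListSplit`). The merger cycles through splits round-robin so that one idle split does not stall the others, and parks on the splits' `isBlocked()` futures instead of busy-waiting when all of them are empty. End-of-input handling is deliberately omitted.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical non-blocking split reader. */
interface SplitReader<T> {
    CompletableFuture<?> isBlocked(); // completed when poll() may return a record

    T poll(); // never blocks; null if nothing is ready
}

/** Hypothetical round-robin merge over several splits. */
final class RoundRobinMerger<T> {
    private final List<? extends SplitReader<T>> splits;
    private int next = 0;

    RoundRobinMerger(List<? extends SplitReader<T>> splits) {
        this.splits = splits;
    }

    /** Returns the next record from any split; parks without spinning while all splits are empty. */
    T nextRecord() {
        while (true) {
            for (int i = 0; i < splits.size(); i++) {
                int idx = (next + i) % splits.size();
                T record = splits.get(idx).poll();
                if (record != null) {
                    next = (idx + 1) % splits.size(); // resume after this split next time
                    return record;
                }
            }
            CompletableFuture<?>[] waits = new CompletableFuture<?>[splits.size()];
            for (int i = 0; i < waits.length; i++) {
                waits[i] = splits.get(i).isBlocked();
            }
            CompletableFuture.anyOf(waits).join(); // waits forever once every split is exhausted
        }
    }
}

/** Trivial in-memory split used for the example. */
final class ListSplit<T> implements SplitReader<T> {
    private final ArrayDeque<T> records;

    ListSplit(List<T> records) {
        this.records = new ArrayDeque<>(records);
    }

    @Override
    public CompletableFuture<?> isBlocked() {
        if (records.isEmpty()) {
            return new CompletableFuture<Void>(); // never completes: no more data in memory
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public T poll() {
        return records.poll();
    }
}
```

With splits `["a1", "a2"]` and `["b1"]`, three calls to `nextRecord()` interleave them as `a1`, `b1`, `a2`.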
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma wrote:
>>>
>>>> Hi,
>>>> Thanks Aljoscha for this FLIP.
>>>>
>>>> 1. I agree with Piotr and Becket that the non-blocking source is very
>>>> important. But in addition to `Future/poll`, there may be another way to
>>>> achieve this. I think it may not be very memory friendly if every advance
>>>> call returns a Future.
>>>>
>>>> public interface Listener {
>>>>     // Note: Object.notify() is final, so a Java interface cannot actually
>>>>     // redeclare notify(); a real API would need a different name.
>>>>     public void notify();
>>>> }
>>>>
>>>> public interface SplitReader {
>>>>     /**
>>>>      * When there is temporarily no element, this will return false.
>>>>      * When elements are available again, the SplitReader can call
>>>>      * listener.notify().
>>>>      * In addition, the framework would check `advance` periodically.
>>>>      * Of course, advance can always return true and ignore the listener
>>>>      * argument for simplicity.
>>>>      */
>>>>     public boolean advance(Listener listener);
>>>> }
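A minimal sketch of this callback variant, assuming the callback is renamed to a hypothetical `onAvailable()` (because `Object.notify()` is final, a Java interface cannot redeclare it) and a hypothetical `ListenerSplitReader` with an `add` method for the IO thread. The reader remembers at most one listener and clears it when notifying, which is one way to avoid the `notify()` spam concern raised at the top of this thread.

```java
import java.util.ArrayDeque;

/** Callback from the proposal; hypothetically renamed because Object.notify() is final. */
interface Listener {
    void onAvailable();
}

/** Hypothetical reader implementing the advance(Listener)/getCurrent() contract. */
final class ListenerSplitReader<T> {
    private final ArrayDeque<T> buffer = new ArrayDeque<>();
    private Listener pending; // registered while the reader has no data
    private T current;

    /** Moves to the next element and returns true, or remembers the listener and returns false. */
    synchronized boolean advance(Listener listener) {
        current = buffer.poll();
        if (current != null) {
            return true;
        }
        pending = listener;
        return false;
    }

    synchronized T getCurrent() {
        return current;
    }

    /** Called by an IO thread; notifies at most once per registration to avoid callback spam. */
    void add(T element) {
        Listener toNotify;
        synchronized (this) {
            buffer.add(element);
            toNotify = pending;
            pending = null;
        }
        if (toNotify != null) {
            toNotify.onAvailable(); // invoked outside the lock so the callback may call advance()
        }
    }
}
```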
>>>>
>>>> 2. The FLIP tells us very clearly how to create all Splits and how to
>>>> create a SplitReader from a Split. But there is no strategy for the user
>>>> to choose how to assign the splits to the tasks. I think we could add an
>>>> enum to let the user choose.
>>>> public enum SplitsAssignmentPolicy {
>>>>     Location,
>>>>     Workload,
>>>>     Random,
>>>>     Average
>>>> }
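A minimal sketch of how such a policy might drive assignment, assuming a hypothetical `SplitAssigner.assign` helper. The meaning of `Average` is not defined in the proposal; this sketch assumes it means spreading splits evenly (round-robin). `Location` and `Workload` need host or load information that is not modeled here, so they are rejected.

```java
import java.util.ArrayList;
import java.util.List;

enum SplitsAssignmentPolicy {
    Location,
    Workload,
    Random,
    Average
}

/** Hypothetical assigner: distributes splits over {@code parallelism} reader tasks. */
final class SplitAssigner {
    static <S> List<List<S>> assign(List<S> splits, int parallelism,
                                    SplitsAssignmentPolicy policy, long seed) {
        List<List<S>> perTask = new ArrayList<>();
        for (int t = 0; t < parallelism; t++) {
            perTask.add(new ArrayList<>());
        }
        java.util.Random random = new java.util.Random(seed);
        for (int i = 0; i < splits.size(); i++) {
            int task;
            switch (policy) {
                case Average: // assumed meaning: spread splits evenly, round-robin
                    task = i % parallelism;
                    break;
                case Random:
                    task = random.nextInt(parallelism);
                    break;
                default: // Location/Workload need host or load information not modeled here
                    throw new UnsupportedOperationException(policy.name());
            }
            perTask.get(task).add(splits.get(i));
        }
        return perTask;
    }
}
```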
>>>>
>>>> 3. If we merge `advance` and `getCurrent` into one method like `getNext`,
>>>> then `getNext` would need to return an `ElementWithTimestamp`, because some
>>>> sources want to add a timestamp to every element. IMO, this is not so
>>>> memory friendly, so I prefer this design.
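A minimal sketch contrasting the two shapes, assuming hypothetical names (`CursorReader`, `getCurrentTimestamp`, `ListCursorReader`). In the cursor style, the timestamp comes from a separate call, so advancing only moves an index; a merged `getNext()` has to allocate an `ElementWithTimestamp` wrapper per record.

```java
import java.util.List;

/** Hypothetical cursor-style reader: the timestamp comes from a separate call. */
interface CursorReader<T> {
    boolean advance(); // moves to the next record; false if none is available

    T getCurrent();

    long getCurrentTimestamp();
}

/** What a merged getNext() would have to return: one extra object per record. */
final class ElementWithTimestamp<T> {
    final T element;
    final long timestamp;

    ElementWithTimestamp(T element, long timestamp) {
        this.element = element;
        this.timestamp = timestamp;
    }
}

/** Cursor over in-memory records; advancing only moves an index. */
final class ListCursorReader<T> implements CursorReader<T> {
    private final List<T> records;
    private final long[] timestamps;
    private int pos = -1;

    ListCursorReader(List<T> records, long[] timestamps) {
        this.records = records;
        this.timestamps = timestamps;
    }

    @Override
    public boolean advance() {
        if (pos + 1 >= records.size()) {
            return false;
        }
        pos++;
        return true;
    }

    @Override
    public T getCurrent() {
        return records.get(pos);
    }

    @Override
    public long getCurrentTimestamp() {
        return timestamps[pos];
    }

    /** Merged-style equivalent for comparison: allocates a wrapper on every successful call. */
    ElementWithTimestamp<T> getNext() {
        return advance() ? new ElementWithTimestamp<>(getCurrent(), getCurrentTimestamp()) : null;
    }
}
```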
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>> On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>>>>> possible improvements. I have one proposal. Instead of having a method:
>>>>>
>>>>> boolean advance() throws IOException;
>>>>>
>>>>> I would replace it with
>>>>>
>>>>> /**
>>>>>  * Return a future, which when completed means that source has more data
>>>>>  * and getNext() will not block.
>>>>>  * If you wish to use benefits of non blocking connectors, please
>>>>>  * implement this method appropriately.
>>>>>  */
>>>>> default CompletableFuture<?> isBlocked() {
>>>>>     return CompletableFuture.completedFuture(null);
>>>>> }
>>>>>
>>>>> And rename `getCurrent()` to `getNext()`.
>>>>>
>>>>> Couple of arguments:
>>>>> 1. I don’t understand the division of work between `advance()` and
>>>>> `getCurrent()`. What should be done in which, especially for connectors
>>>>> that handle records in batches (like Kafka), and when should you call
>>>>> `advance()` and when `getCurrent()`?
>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>>>>> future to have asynchronous/non-blocking connectors and to more
>>>>> efficiently handle a large number of blocked threads, without busy
>>>>> waiting. At the same time, it doesn’t add much complexity, since naive
>>>>> connector implementations can always be blocking.
>>>>> 3. This would also allow us to use a fixed-size thread pool of task
>>>>> executors, instead of one thread per task.
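A minimal sketch of point 3, assuming hypothetical names (`PooledSourceRunner`, `IteratorReader`) and assuming `getNext()` returns `null` at end of input. Many readers share a fixed pool: each step is scheduled with `thenRunAsync` once the reader's `isBlocked()` future completes, so a blocked reader holds no thread. Error propagation is omitted for brevity, and the sink must be thread-safe because steps run on different pool threads.

```java
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

/** Hypothetical reader contract from the proposal above. */
interface SplitReader<T> {
    default CompletableFuture<?> isBlocked() {
        return CompletableFuture.completedFuture(null);
    }

    T getNext(); // assumption for this sketch: returns null at end of input
}

/** Hypothetical runner: many readers share a fixed pool; a blocked reader holds no thread. */
final class PooledSourceRunner {
    private final ExecutorService pool;

    PooledSourceRunner(ExecutorService pool) {
        this.pool = pool;
    }

    /** Drains a reader; the returned future completes when the reader reaches end of input. */
    <T> CompletableFuture<Void> run(SplitReader<T> reader, Consumer<T> sink) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        scheduleNext(reader, sink, done);
        return done;
    }

    private <T> void scheduleNext(SplitReader<T> reader, Consumer<T> sink, CompletableFuture<Void> done) {
        // Continue on a pool thread once the reader unblocks; no thread waits in between.
        reader.isBlocked().thenRunAsync(() -> {
            T record = reader.getNext();
            if (record == null) {
                done.complete(null);
                return;
            }
            sink.accept(record);
            scheduleNext(reader, sink, done);
        }, pool);
    }
}

/** Trivial reader over an iterator that is never blocked. */
final class IteratorReader<T> implements SplitReader<T> {
    private final Iterator<T> it;

    IteratorReader(Iterator<T> it) {
        this.it = it;
    }

    @Override
    public T getNext() {
        return it.hasNext() ? it.next() : null;
    }
}
```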
>>>>>
>>>>> Piotrek
>>>>>
>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek wrote:
>>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> In order to finally get the ball rolling on the new source interface
>>>>>> that we have discussed for so long, I created a FLIP:
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>
>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>>>>>> adding per-partition watermark support to the Kinesis source and because
>>>>>> this would enable a generic implementation of event-time alignment for
>>>>>> all sources. Maybe we need another FLIP for the event-time alignment
>>>>>> part, especially the part about information sharing between operations
>>>>>> (I'm not calling it state sharing because state has a special meaning in
>>>>>> Flink).
>>>>>>
>>>>>> Please discuss away!
>>>>>>
>>>>>> Aljoscha
>>>>>>
>>>>>>
>>>>>
>>>>>
>>