Hi Biao,

Thanks for the explanation. The current API makes more sense to me now. It
basically means:
1. Readers should all be non-blocking
2. The offset advancing and the record fetching are two steps.
3. After each advance() call, the currentRecord, currentTimestamp and
watermark will all be updated at the same time. And those values can be
accessed multiple times.

That being said, with the poll()/take() method, we don't have to return
tuple3. poll()/take() will just return the record. It means:
1. Readers could be blocking (take()) or non-blocking(poll())
2. The offset advancing and the record fetching are combined into one step,
i.e. poll()/take()
3. After each poll()/take(), the currentTimestamp and watermark are
updated. That means after poll()/take(), users can call
getCurrentTimestamp() or getWatermark() to get the information at the point
after the previous record was returned.

One concern I have for the completely non-blocking reader is that it would
be difficult to implement a blocking behavior on top of the thread-less
non-blocking reader. If advance() returns false, since the reader is
thread-less, unless the caller thread call something on the reader, no
progress will be made. Hence the caller has to call advance() again to
check, either with a backoff (introducing some latency) or a tight loop.
But neither of them is ideal. From this perspective, I think it is useful
to have a blocking() API in the reader, so the blocking behavior could be
done efficiently, e.g. by using a NIO selector which relies on the OS
signals.

WRT to the SplitEnumerator, I still feel that it would be better for the
SplitEnumerator to not only return new splits but all the splits to cover
the splits shrink case. Also, it took me a while to understand why
*createInitialEnumeratorCheckpoint()
*is needed. I am wondering whether it would be better to replace it with a
parameter-less *createSplitEnumerator(). *

Thanks,

Jiangjie (Becket) Qin



On Tue, Nov 6, 2018 at 11:40 PM Biao Liu <mmyy1...@gmail.com> wrote:

> Regarding the naming style.
>
> The advantage of `poll()` style is that basically the name of `poll` means
> it should be a non-blocking operator, same with `Queue` in Java API. It's
> easy to understand. We don't need to write too much in docs to imply the
> implementation should not do something heavy.
> However `poll` also means it should return the thing we want. In our
> scenario, there are 3 types currently, record, timestamp and watermark. So
> the return type of `poll` should be tuple3 or something like that. It looks
> a little hacky IMO.
>
> The `advance()` style is more like RecordReader
> <
> https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/RecordReader.html
> >
> of
> MapReduce, or ISpout
> <
> https://storm.apache.org/releases/1.1.2/javadocs/org/apache/storm/spout/ISpout.html
> >
> of
> Storm. It means moving the offset forward indeed. It makes sense to me.
> To be honest I like `advance()` style more.
>
> And there is also another small point I can't get.
>
> Why use `start()` and `close()` in `SplitReader`? `start()` makes me think
> of "starting a thread" or something like that. We should not assume there
> would be some thread. I prefer `open()`, it also matches the `close()`
> better.
>
>
> Becket Qin <becket....@gmail.com> 于2018年11月6日周二 上午11:04写道:
>
> > Thanks for updating the wiki, Aljoscha.
> >
> > The isDone()/advance()/getCurrent() API looks more similar to
> > hasNext()/isNextReady()/getNext(), but implying some different behaviors.
> >
> > If users call getCurrent() twice without calling advance() in between,
> will
> > they get the same record back? From the API itself, users might think
> > advance() is the API that moves the offset forward, and getCurrent() just
> > return the record at the current offset.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Nov 5, 2018 at 10:41 PM Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > I updated the FLIP [1] with some Javadoc for the SplitReader to outline
> > > what I had in mind with the interface. Sorry for not doing that
> earlier,
> > > it's not quite clear how the methods should work from the name alone.
> > >
> > > The gist of it is that advance() should be non-blocking, so
> > > isDone/advance()/getCurrent() are very similar to
> isDone()/poll()/take()
> > > that I have seen mentioned.
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
> > > >
> > >
> > > > On 5. Nov 2018, at 11:05, Biao Liu <mmyy1...@gmail.com> wrote:
> > > >
> > > > Thanks Aljoscha for bringing us this discussion!
> > > >
> > > > 1. I think one of the reason about separating `advance()` and
> > > > `getCurrent()` is that we have several different types returned by
> > > source.
> > > > Not just the `record`, but also the timestamp of record and the
> > > watermark.
> > > > If we don't separate these into different methods, the source has to
> > > return
> > > > a tuple3 which is not so user friendly. The prototype of Aljoscha is
> > > > acceptable to me. Regarding the specific method name, I'm not sure
> > which
> > > > one is better. Both of them are reasonable for me.
> > > >
> > > > 2. As Thomas and Becket mentioned before, I think a non-blocking API
> is
> > > > necessary. Moreover, IMO we should not offer a blocking API. It
> doesn't
> > > > help but makes things more complicated.
> > > >
> > > > 3. About the thread model.
> > > > I agree with Thomas about the thread-less IO model. A standard
> workflow
> > > > should look like below.
> > > >  - If there is available data, Flink would read it.
> > > >  - If there is no data available temporary, Flink would check again a
> > > > moment later. Maybe waiting on a semaphore until a timer wake it up.
> > > > Furthermore, we can offer an optional optimization for source which
> has
> > > > external thread. Like Guowei mentioned, there can be a listener which
> > the
> > > > reader can wake the framework up as soon as new data comes. This can
> > > solve
> > > > Piotr's concern about efficiency.
> > > >
> > > > 4. One more thing. After taking a look at the prototype codes. Off
> the
> > > top
> > > > of my head, the implementation is more fit for batch job not
> streaming
> > > job.
> > > > There are two types of tasks in prototype. First is a source task
> that
> > > > discovers the splits. The source passes the splits to the second task
> > > which
> > > > process the splits one by one. And then the source keeps watch to
> > > discover
> > > > more splits.
> > > >
> > > > However, I think the more common scenario of streaming job is:
> > > > there are fixed splits, each of the subtasks takes several splits.
> The
> > > > subtasks just keep processing the fixed splits. There would be
> > continuous
> > > > datum in each split. We don't need a source task to discover more
> > splits.
> > > > It can not be finished in streaming job since we don't want the
> > > processing
> > > > task finished even there are no more splits.
> > > >
> > > > So IMO we should offer another source operator for the new interface.
> > It
> > > > would discover all splits when it is opening. Then picks the splits
> > > belong
> > > > to this subtask. Keep processing these splits until all of them are
> > > > finished.
> > > >
> > > >
> > > > Becket Qin <becket....@gmail.com> 于2018年11月5日周一 上午11:00写道:
> > > >
> > > >> Hi Thomas,
> > > >>
> > > >> The iterator-like API was also the first thing that came to me. But
> it
> > > >> seems a little confusing that hasNext() does not mean "the stream
> has
> > > not
> > > >> ended", but means "the next record is ready", which is repurposing
> the
> > > well
> > > >> known meaning of hasNext(). If we follow the hasNext()/next()
> pattern,
> > > an
> > > >> additional isNextReady() method to indicate whether the next record
> is
> > > >> ready seems more intuitive to me.
> > > >>
> > > >> Similarly, in poll()/take() pattern, another method of isDone() is
> > > needed
> > > >> to indicate whether the stream has ended or not.
> > > >>
> > > >> Compared with hasNext()/next()/isNextReady() pattern,
> > > >> isDone()/poll()/take() seems more flexible for the reader
> > > implementation.
> > > >> When I am implementing a reader, I could have a couple of choices:
> > > >>
> > > >>   - A thread-less reader that does not have any internal thread.
> > > >>   - When poll() is called, the same calling thread will perform a
> > bunch
> > > of
> > > >>      IO asynchronously.
> > > >>      - When take() is called, the same calling thread will perform a
> > > bunch
> > > >>      of IO and wait until the record is ready.
> > > >>   - A reader with internal threads performing network IO and put
> > records
> > > >>   into a buffer.
> > > >>      - When poll() is called, the calling thread simply reads from
> the
> > > >>      buffer and return empty result immediately if there is no
> record.
> > > >>      - When take() is called, the calling thread reads from the
> buffer
> > > and
> > > >>      block waiting if the buffer is empty.
> > > >>
> > > >> On the other hand, with the hasNext()/next()/isNextReady() API, it
> is
> > > less
> > > >> intuitive for the reader developers to write the thread-less
> pattern.
> > > >> Although technically speaking one can still do the asynchronous IO
> to
> > > >> prepare the record in isNextReady(). But it is inexplicit and seems
> > > >> somewhat hacky.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jiangjie (Becket) Qin
> > > >>
> > > >>
> > > >>
> > > >> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <t...@apache.org> wrote:
> > > >>
> > > >>> Couple more points regarding discovery:
> > > >>>
> > > >>> The proposal mentions that discovery could be outside the execution
> > > >> graph.
> > > >>> Today, discovered partitions/shards are checkpointed. I believe
> that
> > > will
> > > >>> also need to be the case in the future, even when discovery and
> > reading
> > > >> are
> > > >>> split between different tasks.
> > > >>>
> > > >>> For cases such as resharding of a Kinesis stream, the relationship
> > > >> between
> > > >>> splits needs to be considered. Splits cannot be randomly
> distributed
> > > over
> > > >>> readers in certain situations. An example was mentioned here:
> > > >>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
> > > >>>
> > > >>> Thomas
> > > >>>
> > > >>>
> > > >>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <t...@apache.org>
> wrote:
> > > >>>
> > > >>>> Thanks for getting the ball rolling on this!
> > > >>>>
> > > >>>> Can the number of splits decrease? Yes, splits can be closed and
> go
> > > >> away.
> > > >>>> An example would be a shard merge in Kinesis (2 existing shards
> will
> > > be
> > > >>>> closed and replaced with a new shard).
> > > >>>>
> > > >>>> Regarding advance/poll/take: IMO the least restrictive approach
> > would
> > > >> be
> > > >>>> the thread-less IO model (pull based, non-blocking, caller
> retrieves
> > > >> new
> > > >>>> records when available). The current Kinesis API requires the use
> of
> > > >>>> threads. But that can be internal to the split reader and does not
> > > need
> > > >>> to
> > > >>>> be a source API concern. In fact, that's what we are working on
> > right
> > > >> now
> > > >>>> as improvement to the existing consumer: Each shard consumer
> thread
> > > >> will
> > > >>>> push to a queue, the consumer main thread will poll the queue(s).
> It
> > > is
> > > >>>> essentially a mapping from threaded IO to non-blocking.
> > > >>>>
> > > >>>> The proposed SplitReader interface would fit the thread-less IO
> > model.
> > > >>>> Similar to an iterator, we find out if there is a new element
> > > (hasNext)
> > > >>> and
> > > >>>> if so, move to it (next()). Separate calls deliver the meta
> > > information
> > > >>>> (timestamp, watermark). Perhaps advance call could offer a timeout
> > > >>> option,
> > > >>>> so that the caller does not end up in a busy wait. On the other
> > hand,
> > > a
> > > >>>> caller processing multiple splits may want to cycle through fast,
> to
> > > >>>> process elements of other splits as soon as they become available.
> > The
> > > >>> nice
> > > >>>> thing is that this "split merge" logic can now live in Flink and
> be
> > > >>>> optimized and shared between different sources.
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Thomas
> > > >>>>
> > > >>>>
> > > >>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com>
> > > wrote:
> > > >>>>
> > > >>>>> Hi,
> > > >>>>> Thanks Aljoscha for this FLIP.
> > > >>>>>
> > > >>>>> 1. I agree with Piotr and Becket that the non-blocking source is
> > very
> > > >>>>> important. But in addition to `Future/poll`, there may be another
> > way
> > > >> to
> > > >>>>> achieve this. I think it may be not very memory friendly if every
> > > >>> advance
> > > >>>>> call return a Future.
> > > >>>>>
> > > >>>>> public interface Listener {
> > > >>>>>     public void notify();
> > > >>>>> }
> > > >>>>>
> > > >>>>> public interface SplitReader() {
> > > >>>>>     /**
> > > >>>>>      * When there is no element temporarily, this will return
> > false.
> > > >>>>>      * When elements is available again splitReader can call
> > > >>>>> listener.notify()
> > > >>>>>      * In addition the frame would check `advance` periodically .
> > > >>>>>      * Of course advance can always return true and ignore the
> > > >> listener
> > > >>>>> argument for simplicity.
> > > >>>>>      */
> > > >>>>>     public boolean advance(Listener listener);
> > > >>>>> }
> > > >>>>>
> > > >>>>> 2.  The FLIP tells us very clearly that how to create all Splits
> > and
> > > >> how
> > > >>>>> to create a SplitReader from a Split. But there is no strategy
> for
> > > the
> > > >>> user
> > > >>>>> to choose how to assign the splits to the tasks. I think we could
> > add
> > > >> a
> > > >>>>> Enum to let user to choose.
> > > >>>>> /**
> > > >>>>>  public Enum SplitsAssignmentPolicy {
> > > >>>>>    Location,
> > > >>>>>    Workload,
> > > >>>>>    Random,
> > > >>>>>    Average
> > > >>>>>  }
> > > >>>>> */
> > > >>>>>
> > > >>>>> 3. If merge the `advance` and `getCurrent`  to one method like
> > > >> `getNext`
> > > >>>>> the `getNext` would need return a `ElementWithTimestamp` because
> > some
> > > >>>>> sources want to add timestamp to every element. IMO, this is not
> so
> > > >>> memory
> > > >>>>> friendly so I prefer this design.
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> Thanks
> > > >>>>>
> > > >>>>> Piotr Nowojski <pi...@data-artisans.com> 于2018年11月1日周四 下午6:08写道:
> > > >>>>>
> > > >>>>>> Hi,
> > > >>>>>>
> > > >>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of
> > > other
> > > >>>>>> possible improvements. I have one proposal. Instead of having a
> > > >> method:
> > > >>>>>>
> > > >>>>>> boolean advance() throws IOException;
> > > >>>>>>
> > > >>>>>> I would replace it with
> > > >>>>>>
> > > >>>>>> /*
> > > >>>>>> * Return a future, which when completed means that source has
> more
> > > >>> data
> > > >>>>>> and getNext() will not block.
> > > >>>>>> * If you wish to use benefits of non blocking connectors, please
> > > >>>>>> implement this method appropriately.
> > > >>>>>> */
> > > >>>>>> default CompletableFuture<?> isBlocked() {
> > > >>>>>>        return CompletableFuture.completedFuture(null);
> > > >>>>>> }
> > > >>>>>>
> > > >>>>>> And rename `getCurrent()` to `getNext()`.
> > > >>>>>>
> > > >>>>>> Couple of arguments:
> > > >>>>>> 1. I don’t understand the division of work between `advance()`
> and
> > > >>>>>> `getCurrent()`. What should be done in which, especially for
> > > >> connectors
> > > >>>>>> that handle records in batches (like Kafka) and when should you
> > call
> > > >>>>>> `advance` and when `getCurrent()`.
> > > >>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us
> > in
> > > >> the
> > > >>>>>> future to have asynchronous/non blocking connectors and more
> > > >>> efficiently
> > > >>>>>> handle large number of blocked threads, without busy waiting.
> > While
> > > >> at
> > > >>> the
> > > >>>>>> same time it doesn’t add much complexity, since naive connector
> > > >>>>>> implementations can be always blocking.
> > > >>>>>> 3. This also would allow us to use a fixed size thread pool of
> > task
> > > >>>>>> executors, instead of one thread per task.
> > > >>>>>>
> > > >>>>>> Piotrek
> > > >>>>>>
> > > >>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <
> aljos...@apache.org>
> > > >>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi All,
> > > >>>>>>>
> > > >>>>>>> In order to finally get the ball rolling on the new source
> > > >> interface
> > > >>>>>> that we have discussed for so long I finally created a FLIP:
> > > >>>>>>
> > > >>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > > >>>>>>>
> > > >>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion
> > > >> about
> > > >>>>>> adding per-partition watermark support to the Kinesis source and
> > > >>> because
> > > >>>>>> this would enable generic implementation of event-time alignment
> > for
> > > >>> all
> > > >>>>>> sources. Maybe we need another FLIP for the event-time alignment
> > > >> part,
> > > >>>>>> especially the part about information sharing between operations
> > > (I'm
> > > >>> not
> > > >>>>>> calling it state sharing because state has a special meaning in
> > > >> Flink).
> > > >>>>>>>
> > > >>>>>>> Please discuss away!
> > > >>>>>>>
> > > >>>>>>> Aljoscha
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>
> > > >>
> > >
> > >
> >
>

Reply via email to