Hi all,

>From the discussion, I understand that we are leaning towards a design
where the user writes a single-threaded SplitReader, which Flink executes
on another thread (not the main task thread). This way the task can have
multiple readers running concurrently, each one reading a different split.

Each of these threads writes in its own queue. These queues are then polled
by the main thread (based on a potentially user-defined prioritization),
which is responsible for emitting data downstream. There were also
proposals for a single shared queue, but I believe that 1) the contention
for the lock in such a queue can be a limitation and 2) it is not easy to
prioritise which elements to consume first (assuming that we want to
support different prioritisation strategies).

Assuming the above model, I have the following question:

We have the split/shard/partition discovery logic outside the "reader"
operator. For now it can be a plain old source function with parallelism of
1 that periodically checks for new splits (for an example see the existing
ContinuousFileMonitoringFunction).[1]

This source sends the split to be read downstream to the multi-threaded
readers. In these settings, there must be a "throttling" or
"rate-limitting" mechanism that guaranttees that we do not surpass the
capabilities of the machines. The first thing that comes to mind is some
kind of a fixed size (blocking) queue or a fixed size thread pool. The main
thread adds splits to the queue and the readers consume them. When the
queue or the pool is full, then we block (backpressure).

In the case above, how do we make sure that the checkpoints still go
through?

Cheers,
Kostas

PS: I am assuming the current task implementation and not an "actor" based
one.

*[1] The ContinuousFileReaderOperator has a single thread (different from
the main task thread) consuming the splits one by one. Unfortunately, there
is no rate-limiting mechanism.


On Sun, Nov 25, 2018 at 6:40 PM Biao Liu <mmyy1...@gmail.com> wrote:

> Hi community,
> Glad to see this topic is still so active.
>
> Thanks for replying @Piotrek and @Becket.
>
> Last time, I expressed some rough ideas about the thread model. However I
> found that it's hard to describe clearly in mailing list. So I wrote it
> down with some graphs, exampled some kinds of models, see Thread Model of
> Source
> <
> https://docs.google.com/document/d/1XpYkkJo97CUw-UMVrKU6b0ZZuJJ2V7mBb__L6UzdWTw/edit?usp=sharing
> >.
> I wish that can be helpful.
>
> IMO thread model is an important part. Without thinking of implementation
> clearly, it's difficult to decide what the up level interface should look
> like.
> It would be better if we draw the whole picture first and then fill the
> detail parts one by one.
>
> @Piotrek About adding new splits to existing split reader. It's an
> interesting idea. Not only for solving too many threads problem, but also
> for supporting some more complicated system. I know in some storage
> systems, there is some scenario which the partition is dynamic(dynamically
> splitting or merging). Though I have not think of it very clearly now. I
> would give you more detailed reply asap :)
>
>
> Guowei Ma <guowei....@gmail.com> 于2018年11月23日周五 下午6:37写道:
>
> > Hi,Piotr
> > Sorry  for so late to response.
> >
> >
> > First of all I think Flink runtime can assigned a thread for a
> StreamTask,
> > which likes  'Actor' model. The number of threads for a StreamTask should
> > not be proportional to the operator or other things. This will give Flink
> > the ability to scale horizontally. So I think it's not just the
> > network(flush),checkpoint and  source, but some operators' threads can
> also
> > be removed in the future, like AsyncWaitOperator.
> >
> >
> >
> > for b)
> > When using event time, some sources want to assign a timestamp to each
> > element. In current Flink interface, user will write like this
> > public class EventTimeSource<Element> implements SourceFunction {
> >   public void run() {
> >      while(...){
> >          Element record = // get from file or some queue;
> >          long timestamp = parseTimestampFromElement(record);
> >          sourceContext.collectWithTimestamp(record, timestamp);
> >      }
> >   }
> > }
> > Using the interfaces from this FLIP, user can write like this
> >
> > public EventTimeSplitReader<Element,X> implements SplitReader {
> >     Element currentRecord = null;
> >
> >
> >     // Please ignoring the handling of boundary conditions
> >     public boolean advace(){
> >        currentRecord = //move a pointer forward
> >        return true;
> >      }
> >
> >     public Element getCurrent(){
> >        return currentRecord;
> >     }
> >     public long getCurrentTimestamp() {
> >       return parseTimestampFromElement(currentRecord);
> >     }
> > }
> >
> > if merging the advance/getNext to a method like getNext() , the
> SplitReader
> > interface may need to change a little like this
> >
> > public interface SplitReader2<T,X> {
> > public class ElementWithTimestamp {
> >     T element;
> >     long timestamp;
> > }
> >
> > public ElementWithTimestamp getNext() ;
> >
> > }
> > Now user may need implement the source like this
> > public EventTimeSplitReader<Element,X> implements SplitReader2 {
> >     Element currentRecord = null;
> >
> >     // Please ignoring the handling of boundary conditions
> >     public ElementWithTimestamp getCurrent(){
> >        return new ElementWithTimestamp(currentRecord,
> > parseTimestampFromElement(currentRecord))
> >     }
> > }
> > The user can use a constant ElementWithTimestamp but I think this need
> the
> > every connector developers to know this trick. The current Flip will not
> > have this burden.
> > Maybe there has other way like '' void getCurrent(ElementWithTimestamp)"
> > to avoid creating a new object.  But my personal preference is
> > ‘advance/getCurrent’.
> >
> >
> >
> > Piotr Nowojski <pi...@data-artisans.com> 于2018年11月7日周三 下午4:31写道:
> >
> > > Hi,
> > >
> > > a)
> > >
> > > > BTW, regarding the isBlock() method, I have a few more questions. 21,
> > Is
> > > a method isReady() with boolean as a return value
> > > > equivalent? Personally I found it is a little bit confusing in what
> is
> > > supposed to be returned when the future is completed. 22. if
> > > > the implementation of isBlocked() is optional, how do the callers
> know
> > > whether the method is properly implemented or not?
> > > > Does not implemented mean it always return a completed future?
> > >
> > > `CompletableFuture<?> isBlocked()` is more or less an equivalent to
> > > `boolean hasNext()` which in case of “false” provides some kind of a
> > > listener/callback that notifies about presence of next element. There
> are
> > > some minor details, like `CompletableFuture<?>` has a minimal two state
> > > logic:
> > >
> > > 1. Future is completed - we have more data
> > > 2. Future not yet completed - we don’t have data now, but we might/we
> > will
> > > have in the future
> > >
> > > While `boolean hasNext()` and `notify()` callback are a bit more
> > > complicated/dispersed and can lead/encourage `notify()` spam.
> > >
> > > b)
> > >
> > > > 3. If merge the `advance` and `getCurrent`  to one method like
> > `getNext`
> > > the `getNext` would need return a
> > > >`ElementWithTimestamp` because some sources want to add timestamp to
> > > every element. IMO, this is not so memory friendly
> > > > so I prefer this design.
> > >
> > > Guowei I don’t quite understand this. Could you elaborate why having a
> > > separate `advance()` help?
> > >
> > > c)
> > >
> > > Regarding advance/poll/take. What’s the value of having two separate
> > > methods: poll and take? Which one of them should be called and which
> > > implemented? What’s the benefit of having those methods compared to
> > having
> > > a one single method `getNextElement()` (or `pollElement() or whatever
> we
> > > name it) with following contract:
> > >
> > > CompletableFuture<?> isBlocked();
> > >
> > > /**
> > > Return next element - will be called only if `isBlocked()` is
> completed.
> > > Try to implement it in non blocking fashion, but if that’s impossible
> or
> > > you just don’t need the effort, you can block in this method.
> > > */
> > > T getNextElement();
> > >
> > > I mean, if the connector is implemented non-blockingly, Flink should
> use
> > > it that way. If it’s not, then `poll()` will `throw new
> > > NotImplementedException()`. Implementing both of them and providing
> both
> > of
> > > them to Flink wouldn’t make a sense, thus why not merge them into a
> > single
> > > method call that should preferably (but not necessarily need to) be
> > > non-blocking? It’s not like we are implementing general purpose
> `Queue`,
> > > which users might want to call either of `poll` or `take`. We would
> > always
> > > prefer to call `poll`, but if it’s blocking, then still we have no
> > choice,
> > > but to call it and block on it.
> > >
> > > d)
> > >
> > > > 1. I agree with Piotr and Becket that the non-blocking source is very
> > > > important. But in addition to `Future/poll`, there may be another way
> > to
> > > > achieve this. I think it may be not very memory friendly if every
> > advance
> > > > call return a Future.
> > >
> > > I didn’t want to mention this, to not clog my initial proposal, but
> there
> > > is a simple solution for the problem:
> > >
> > > public interface SplitReader {
> > >
> > >     (…)
> > >
> > >     CompletableFuture<?> NOT_BLOCKED =
> > > CompletableFuture.completedFuture(null);
> > >
> > >     /**
> > >      * Returns a future that will be completed when the page source
> > becomes
> > >      * unblocked.  If the page source is not blocked, this method
> should
> > > return
> > >      * {@code NOT_BLOCKED}.
> > >      */
> > >     default CompletableFuture<?> isBlocked()
> > >     {
> > >         return NOT_BLOCKED;
> > >     }
> > >
> > > If we are blocked and we are waiting for the IO, then creating a new
> > > Future is non-issue. Under full throttle/throughput and not blocked
> > sources
> > > returning a static `NOT_BLOCKED` constant  should also solve the
> problem.
> > >
> > > One more remark, non-blocking sources might be a necessity in a single
> > > threaded model without a checkpointing lock. (Currently when sources
> are
> > > blocked, they can release checkpointing lock and re-acquire it again
> > > later). Non-blocking `poll`/`getNext()` would allow for checkpoints to
> > > happen when source is idling. In that case either `notify()` or my
> > proposed
> > > `isBlocked()` would allow to avoid busy-looping.
> > >
> > > Piotrek
> > >
> > > > On 5 Nov 2018, at 03:59, Becket Qin <becket....@gmail.com> wrote:
> > > >
> > > > Hi Thomas,
> > > >
> > > > The iterator-like API was also the first thing that came to me. But
> it
> > > > seems a little confusing that hasNext() does not mean "the stream has
> > not
> > > > ended", but means "the next record is ready", which is repurposing
> the
> > > well
> > > > known meaning of hasNext(). If we follow the hasNext()/next()
> pattern,
> > an
> > > > additional isNextReady() method to indicate whether the next record
> is
> > > > ready seems more intuitive to me.
> > > >
> > > > Similarly, in poll()/take() pattern, another method of isDone() is
> > needed
> > > > to indicate whether the stream has ended or not.
> > > >
> > > > Compared with hasNext()/next()/isNextReady() pattern,
> > > > isDone()/poll()/take() seems more flexible for the reader
> > implementation.
> > > > When I am implementing a reader, I could have a couple of choices:
> > > >
> > > >   - A thread-less reader that does not have any internal thread.
> > > >   - When poll() is called, the same calling thread will perform a
> bunch
> > > of
> > > >      IO asynchronously.
> > > >      - When take() is called, the same calling thread will perform a
> > > bunch
> > > >      of IO and wait until the record is ready.
> > > >   - A reader with internal threads performing network IO and put
> > records
> > > >   into a buffer.
> > > >      - When poll() is called, the calling thread simply reads from
> the
> > > >      buffer and return empty result immediately if there is no
> record.
> > > >      - When take() is called, the calling thread reads from the
> buffer
> > > and
> > > >      block waiting if the buffer is empty.
> > > >
> > > > On the other hand, with the hasNext()/next()/isNextReady() API, it is
> > > less
> > > > intuitive for the reader developers to write the thread-less pattern.
> > > > Although technically speaking one can still do the asynchronous IO to
> > > > prepare the record in isNextReady(). But it is inexplicit and seems
> > > > somewhat hacky.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > >
> > > >
> > > > On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <t...@apache.org> wrote:
> > > >
> > > >> Couple more points regarding discovery:
> > > >>
> > > >> The proposal mentions that discovery could be outside the execution
> > > graph.
> > > >> Today, discovered partitions/shards are checkpointed. I believe that
> > > will
> > > >> also need to be the case in the future, even when discovery and
> > reading
> > > are
> > > >> split between different tasks.
> > > >>
> > > >> For cases such as resharding of a Kinesis stream, the relationship
> > > between
> > > >> splits needs to be considered. Splits cannot be randomly distributed
> > > over
> > > >> readers in certain situations. An example was mentioned here:
> > > >> https://github.com/apache/flink/pull/6980#issuecomment-435202809
> > > >>
> > > >> Thomas
> > > >>
> > > >>
> > > >> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
> > > >>
> > > >>> Thanks for getting the ball rolling on this!
> > > >>>
> > > >>> Can the number of splits decrease? Yes, splits can be closed and go
> > > away.
> > > >>> An example would be a shard merge in Kinesis (2 existing shards
> will
> > be
> > > >>> closed and replaced with a new shard).
> > > >>>
> > > >>> Regarding advance/poll/take: IMO the least restrictive approach
> would
> > > be
> > > >>> the thread-less IO model (pull based, non-blocking, caller
> retrieves
> > > new
> > > >>> records when available). The current Kinesis API requires the use
> of
> > > >>> threads. But that can be internal to the split reader and does not
> > need
> > > >> to
> > > >>> be a source API concern. In fact, that's what we are working on
> right
> > > now
> > > >>> as improvement to the existing consumer: Each shard consumer thread
> > > will
> > > >>> push to a queue, the consumer main thread will poll the queue(s).
> It
> > is
> > > >>> essentially a mapping from threaded IO to non-blocking.
> > > >>>
> > > >>> The proposed SplitReader interface would fit the thread-less IO
> > model.
> > > >>> Similar to an iterator, we find out if there is a new element
> > (hasNext)
> > > >> and
> > > >>> if so, move to it (next()). Separate calls deliver the meta
> > information
> > > >>> (timestamp, watermark). Perhaps advance call could offer a timeout
> > > >> option,
> > > >>> so that the caller does not end up in a busy wait. On the other
> > hand, a
> > > >>> caller processing multiple splits may want to cycle through fast,
> to
> > > >>> process elements of other splits as soon as they become available.
> > The
> > > >> nice
> > > >>> thing is that this "split merge" logic can now live in Flink and be
> > > >>> optimized and shared between different sources.
> > > >>>
> > > >>> Thanks,
> > > >>> Thomas
> > > >>>
> > > >>>
> > > >>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com>
> > wrote:
> > > >>>
> > > >>>> Hi,
> > > >>>> Thanks Aljoscha for this FLIP.
> > > >>>>
> > > >>>> 1. I agree with Piotr and Becket that the non-blocking source is
> > very
> > > >>>> important. But in addition to `Future/poll`, there may be another
> > way
> > > to
> > > >>>> achieve this. I think it may be not very memory friendly if every
> > > >> advance
> > > >>>> call return a Future.
> > > >>>>
> > > >>>> public interface Listener {
> > > >>>>     public void notify();
> > > >>>> }
> > > >>>>
> > > >>>> public interface SplitReader() {
> > > >>>>     /**
> > > >>>>      * When there is no element temporarily, this will return
> false.
> > > >>>>      * When elements is available again splitReader can call
> > > >>>> listener.notify()
> > > >>>>      * In addition the frame would check `advance` periodically .
> > > >>>>      * Of course advance can always return true and ignore the
> > > listener
> > > >>>> argument for simplicity.
> > > >>>>      */
> > > >>>>     public boolean advance(Listener listener);
> > > >>>> }
> > > >>>>
> > > >>>> 2.  The FLIP tells us very clearly that how to create all Splits
> and
> > > how
> > > >>>> to create a SplitReader from a Split. But there is no strategy for
> > the
> > > >> user
> > > >>>> to choose how to assign the splits to the tasks. I think we could
> > add
> > > a
> > > >>>> Enum to let user to choose.
> > > >>>> /**
> > > >>>>  public Enum SplitsAssignmentPolicy {
> > > >>>>    Location,
> > > >>>>    Workload,
> > > >>>>    Random,
> > > >>>>    Average
> > > >>>>  }
> > > >>>> */
> > > >>>>
> > > >>>> 3. If merge the `advance` and `getCurrent`  to one method like
> > > `getNext`
> > > >>>> the `getNext` would need return a `ElementWithTimestamp` because
> > some
> > > >>>> sources want to add timestamp to every element. IMO, this is not
> so
> > > >> memory
> > > >>>> friendly so I prefer this design.
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> Thanks
> > > >>>>
> > > >>>> Piotr Nowojski <pi...@data-artisans.com> 于2018年11月1日周四 下午6:08写道:
> > > >>>>
> > > >>>>> Hi,
> > > >>>>>
> > > >>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of
> > other
> > > >>>>> possible improvements. I have one proposal. Instead of having a
> > > method:
> > > >>>>>
> > > >>>>> boolean advance() throws IOException;
> > > >>>>>
> > > >>>>> I would replace it with
> > > >>>>>
> > > >>>>> /*
> > > >>>>> * Return a future, which when completed means that source has
> more
> > > >> data
> > > >>>>> and getNext() will not block.
> > > >>>>> * If you wish to use benefits of non blocking connectors, please
> > > >>>>> implement this method appropriately.
> > > >>>>> */
> > > >>>>> default CompletableFuture<?> isBlocked() {
> > > >>>>>        return CompletableFuture.completedFuture(null);
> > > >>>>> }
> > > >>>>>
> > > >>>>> And rename `getCurrent()` to `getNext()`.
> > > >>>>>
> > > >>>>> Couple of arguments:
> > > >>>>> 1. I don’t understand the division of work between `advance()`
> and
> > > >>>>> `getCurrent()`. What should be done in which, especially for
> > > connectors
> > > >>>>> that handle records in batches (like Kafka) and when should you
> > call
> > > >>>>> `advance` and when `getCurrent()`.
> > > >>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us
> in
> > > the
> > > >>>>> future to have asynchronous/non blocking connectors and more
> > > >> efficiently
> > > >>>>> handle large number of blocked threads, without busy waiting.
> While
> > > at
> > > >> the
> > > >>>>> same time it doesn’t add much complexity, since naive connector
> > > >>>>> implementations can be always blocking.
> > > >>>>> 3. This also would allow us to use a fixed size thread pool of
> task
> > > >>>>> executors, instead of one thread per task.
> > > >>>>>
> > > >>>>> Piotrek
> > > >>>>>
> > > >>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org
> >
> > > >>>>> wrote:
> > > >>>>>>
> > > >>>>>> Hi All,
> > > >>>>>>
> > > >>>>>> In order to finally get the ball rolling on the new source
> > interface
> > > >>>>> that we have discussed for so long I finally created a FLIP:
> > > >>>>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > > >>>>>>
> > > >>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion
> > > about
> > > >>>>> adding per-partition watermark support to the Kinesis source and
> > > >> because
> > > >>>>> this would enable generic implementation of event-time alignment
> > for
> > > >> all
> > > >>>>> sources. Maybe we need another FLIP for the event-time alignment
> > > part,
> > > >>>>> especially the part about information sharing between operations
> > (I'm
> > > >> not
> > > >>>>> calling it state sharing because state has a special meaning in
> > > Flink).
> > > >>>>>>
> > > >>>>>> Please discuss away!
> > > >>>>>>
> > > >>>>>> Aljoscha
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>
> > >
> > >
> >
>

Reply via email to