Thanks everyone for the lively discussion. Let me try to summarize where I
see convergence in the discussion and open issues.
I'll try to group this by design aspect of the source. Please let me know
if I got things wrong or missed something crucial here.

For issues 1-3, if the below reflects the state of the discussion, I would
try and update the FLIP in the next days.
For the remaining ones we need more discussion.

I would suggest to fork each of these aspects into a separate mail thread,
or will loose sight of the individual aspects.

*(1) Separation of Split Enumerator and Split Reader*

  - All seem to agree this is a good thing
  - Split Enumerator could in the end live on JobManager (and assign splits
via RPC) or in a task (and assign splits via data streams)
  - this discussion is orthogonal and should come later, when the interface
is agreed upon.

*(2) Split Readers for one or more splits*

  - Discussion seems to agree that we need to support one reader that
possibly handles multiple splits concurrently.
  - The requirement comes from sources where one poll()-style call fetches
data from different splits / partitions
    --> example sources that require that would be for example Kafka,
Pravega, Pulsar

  - Could have one split reader per source, or multiple split readers that
share the "poll()" function
  - To not make it too complicated, we can start with thinking about one
split reader for all splits initially and see if that covers all
requirements

*(3) Threading model of the Split Reader*

  - Most active part of the discussion ;-)

  - A non-blocking way for Flink's task code to interact with the source is
needed in order to a task runtime code based on a
single-threaded/actor-style task design
    --> I personally am a big proponent of that, it will help with
well-behaved checkpoints, efficiency, and simpler yet more robust runtime
code

  - Users care about simple abstraction, so as a subclass of SplitReader
(non-blocking / async) we need to have a BlockingSplitReader which will
form the basis of most source implementations. BlockingSplitReader lets
users do blocking simple poll() calls.
  - The BlockingSplitReader would spawn a thread (or more) and the
thread(s) can make blocking calls and hand over data buffers via a blocking
queue
  - This should allow us to cover both, a fully async runtime, and a simple
blocking interface for users.
  - This is actually very similar to how the Kafka connectors work. Kafka
9+ with one thread, Kafka 8 with multiple threads

  - On the base SplitReader (the async one), the non-blocking method that
gets the next chunk of data would signal data availability via a
CompletableFuture, because that gives the best flexibility (can await
completion or register notification handlers).
  - The source task would register a "thenHandle()" (or similar) on the
future to put a "take next data" task into the actor-style mailbox

*(4) Split Enumeration and Assignment*

  - Splits may be generated lazily, both in cases where there is a limited
number of splits (but very many), or splits are discovered over time
  - Assignment should also be lazy, to get better load balancing
  - Assignment needs support locality preferences

  - Possible design based on discussion so far:

    --> SplitReader has a method "addSplits(SplitT...)" to add one or more
splits. Some split readers might assume they have only one split ever,
concurrently, others assume multiple splits. (Note: idea behind being able
to add multiple splits at the same time is to ease startup where multiple
splits may be assigned instantly.)
    --> SplitReader has a context object on which it can call indicate when
splits are completed. The enumerator gets that notification and can use to
decide when to assign new splits. This should help both in cases of sources
that take splits lazily (file readers) and in case the source needs to
preserve a partial order between splits (Kinesis, Pravega, Pulsar may need
that).
    --> SplitEnumerator gets notification when SplitReaders start and when
they finish splits. They can decide at that moment to push more splits to
that reader
    --> The SplitEnumerator should probably be aware of the source
parallelism, to build its initial distribution.

  - Open question: Should the source expose something like "host
preferences", so that yarn/mesos/k8s can take this into account when
selecting a node to start a TM on?

*(5) Watermarks and event time alignment*

  - Watermark generation, as well as idleness, needs to be per split (like
currently in the Kafka Source, per partition)
  - It is desirable to support optional event-time-alignment, meaning that
splits that are ahead are back-pressured or temporarily unsubscribed

  - I think i would be desirable to encapsulate watermark generation logic
in watermark generators, for a separation of concerns. The watermark
generators should run per split.
  - Using watermark generators would also help with another problem of the
suggested interface, namely supporting non-periodic watermarks efficiently.

  - Need a way to "dispatch" next record to different watermark generators
  - Need a way to tell SplitReader to "suspend" a split until a certain
watermark is reached (event time backpressure)
  - This would in fact be not needed (and thus simpler) if we had a
SplitReader per split and may be a reason to re-open that discussion

*(6) Watermarks across splits and in the Split Enumerator*

  - The split enumerator may need some watermark awareness, which should be
purely based on split metadata (like create timestamp of file splits)
  - If there are still more splits with overlapping event time range for a
split reader, then that split reader should not advance the watermark
within the split beyond the overlap boundary. Otherwise future splits will
produce late data.

  - One way to approach this could be that the split enumerator may send
watermarks to the readers, and the readers cannot emit watermarks beyond
that received watermark.
  - Many split enumerators would simply immediately send Long.MAX out and
leave the progress purely to the split readers.

  - For event-time alignment / split back pressure, this begs the question
how we can avoid deadlocks that may arise when splits are suspended for
event time back pressure,

*(7) Batch and streaming Unification*

  - Functionality wise, the above design should support both
  - Batch often (mostly) does not care about reading "in order" and
generating watermarks
    --> Might use different enumerator logic that is more locality aware
and ignores event time order
    --> Does not generate watermarks
  - Would be great if bounded sources could be identified at compile time,
so that "env.addBoundedSource(...)" is type safe and can return a
"BoundedDataStream".
  - Possible to defer this discussion until later

*Miscellaneous Comments*

  - Should the source have a TypeInformation for the produced type, instead
of a serializer? We need a type information in the stream anyways, and can
derive the serializer from that. Plus, creating the serializer should
respect the ExecutionConfig.

  - The TypeSerializer interface is very powerful but also not easy to
implement. Its purpose is to handle data super efficiently, support
flexible ways of evolution, etc.
  For metadata I would suggest to look at the SimpleVersionedSerializer
instead, which is used for example for checkpoint master hooks, or for the
streaming file sink. I think that is is a good match for cases where we do
not need more than ser/deser (no copy, etc.) and don't need to push
versioning out of the serialization paths for best performance (as in the
TypeSerializer)


On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <k.klou...@data-artisans.com>
wrote:

> Hi Biao,
>
> Thanks for the answer!
>
> So given the multi-threaded readers, now we have as open questions:
>
> 1) How do we let the checkpoints pass through our multi-threaded reader
> operator?
>
> 2) Do we have separate reader and source operators or not? In the strategy
> that has a separate source, the source operator has a parallelism of 1 and
> is responsible for split recovery only.
>
> For the first one, given also the constraints (blocking, finite queues,
> etc), I do not have an answer yet.
>
> For the 2nd, I think that we should go with separate operators for the
> source and the readers, for the following reasons:
>
> 1) This is more aligned with a potential future improvement where the split
> discovery becomes a responsibility of the JobManager and readers are
> pooling more work from the JM.
>
> 2) The source is going to be the "single point of truth". It will know what
> has been processed and what not. If the source and the readers are a single
> operator with parallelism > 1, or in general, if the split discovery is
> done by each task individually, then:
>    i) we have to have a deterministic scheme for each reader to assign
> splits to itself (e.g. mod subtaskId). This is not necessarily trivial for
> all sources.
>    ii) each reader would have to keep a copy of all its processed slpits
>    iii) the state has to be a union state with a non-trivial merging logic
> in order to support rescaling.
>
> Two additional points that you raised above:
>
> i) The point that you raised that we need to keep all splits (processed and
> not-processed) I think is a bit of a strong requirement. This would imply
> that for infinite sources the state will grow indefinitely. This is problem
> is even more pronounced if we do not have a single source that assigns
> splits to readers, as each reader will have its own copy of the state.
>
> ii) it is true that for finite sources we need to somehow not close the
> readers when the source/split discoverer finishes. The
> ContinuousFileReaderOperator has a work-around for that. It is not elegant,
> and checkpoints are not emitted after closing the source, but this, I
> believe, is a bigger problem which requires more changes than just
> refactoring the source interface.
>
> Cheers,
> Kostas
>

Reply via email to