One more thing. In the current proposal, with the
supportsBoundedness(Boundedness) method and the boundedness coming from
either continuousSource or boundedSource I could not find how this
information is fed back to the SplitEnumerator.

Best,

Dawid

On 09/12/2019 13:52, Becket Qin wrote:
> Hi Dawid,
>
> Thanks for the comments. This actually brings another relevant question
> about what does a "bounded source" imply. I actually had the same
> impression when I look at the Source API. Here is what I understand after
> some discussion with Stephan. The bounded source has the following impacts.
>
> 1. API validity.
> - A bounded source generates a bounded stream so some operations that only
> works for bounded records would be performed, e.g. sort.
> - To expose these bounded stream only APIs, there are two options:
>      a. Add them to the DataStream API and throw exception if a method is
> called on an unbounded stream.
>      b. Create a BoundedDataStream class which is returned from
> env.boundedSource(), while DataStream is returned from env.continousSource().
> Note that this cannot be done by having single env.source(theSource) even
> the Source has a getBoundedness() method.
>
> 2. Scheduling
> - A bounded source could be computed stage by stage without bringing up all
> the tasks at the same time.
>
> 3. Operator behaviors
> - A bounded source indicates the records are finite so some operators can
> wait until it receives all the records before it starts the processing.
>
> In the above impact, only 1 is relevant to the API design. And the current
> proposal in FLIP-27 is following 1.b.
>
> // boundedness depends of source property, imo this should always be
>> preferred
>>
>
> DataStream<MyType> stream = env.source(theSource);
>
>
> In your proposal, does DataStream have bounded stream only methods? It
> looks it should have, otherwise passing a bounded Source to env.source()
> would be confusing. In that case, we will essentially do 1.a if an
> unbounded Source is created from env.source(unboundedSource).
>
> If we have the methods only supported for bounded streams in DataStream, it
> seems a little weird to have a separate BoundedDataStream interface.
>
> Am I understand it correctly?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi all,
>>
>> Really well written proposal and very important one. I must admit I have
>> not understood all the intricacies of it yet.
>>
>> One question I have though is about where does the information about
>> boundedness come from. I think in most cases it is a property of the
>> source. As you described it might be e.g. end offset, a flag should it
>> monitor new splits etc. I think it would be a really nice use case to be
>> able to say:
>>
>> new KafkaSource().readUntil(long timestamp),
>>
>> which could work as an "end offset". Moreover I think all Bounded sources
>> support continuous mode, but no intrinsically continuous source support the
>> Bounded mode. If I understood the proposal correctly it suggest the
>> boundedness sort of "comes" from the outside of the source, from the
>> invokation of either boundedStream or continousSource.
>>
>> I am wondering if it would make sense to actually change the method
>>
>> boolean Source#supportsBoundedness(Boundedness)
>>
>> to
>>
>> Boundedness Source#getBoundedness().
>>
>> As for the methods #boundedSource, #continousSource, assuming the
>> boundedness is property of the source they do not affect how the enumerator
>> works, but mostly how the dag is scheduled, right? I am not against those
>> methods, but I think it is a very specific use case to actually override
>> the property of the source. In general I would expect users to only call
>> env.source(theSource), where the source tells if it is bounded or not. I
>> would suggest considering following set of methods:
>>
>> // boundedness depends of source property, imo this should always be 
>> preferred
>>
>> DataStream<MyType> stream = env.source(theSource);
>>
>>
>> // always continous execution, whether bounded or unbounded source
>>
>> DataStream<MyType> boundedStream = env.continousSource(theSource);
>>
>> // imo this would make sense if the BoundedDataStream provides additional 
>> features unavailable for continous mode
>> BoundedDataStream<MyType> batch = env.boundedSource(theSource);
>>
>>
>> Best,
>>
>> Dawid
>>
>>
>> On 04/12/2019 11:25, Stephan Ewen wrote:
>>
>> Thanks, Becket, for updating this.
>>
>> I agree with moving the aspects you mentioned into separate FLIPs - this
>> one way becoming unwieldy in size.
>>
>> +1 to the FLIP in its current state. Its a very detailed write-up, nicely
>> done!
>>
>> On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> 
>> <becket....@gmail.com> wrote:
>>
>>
>> Hi all,
>>
>> Sorry for the long belated update. I have updated FLIP-27 wiki page with
>> the latest proposals. Some noticeable changes include:
>> 1. A new generic communication mechanism between SplitEnumerator and
>> SourceReader.
>> 2. Some detail API method signature changes.
>>
>> We left a few things out of this FLIP and will address them in separate
>> FLIPs. Including:
>> 1. Per split event time.
>> 2. Event time alignment.
>> 3. Fine grained failover for SplitEnumerator failure.
>>
>> Please let us know if you have any question.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> 
>> <se...@apache.org> wrote:
>>
>>
>> Hi  Łukasz!
>>
>> Becket and me are working hard on figuring out the last details and
>> implementing the first PoC. We would update the FLIP hopefully next week.
>>
>> There is a fair chance that a first version of this will be in 1.10, but
>>
>> I
>>
>> think it will take another release to battle test it and migrate the
>> connectors.
>>
>> Best,
>> Stephan
>>
>>
>>
>>
>> On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl> 
>> <l...@touk.pl>
>>
>> wrote:
>>
>> Hi,
>>
>> This proposal looks very promising for us. Do you have any plans in
>>
>> which
>>
>> Flink release it is going to be released? We are thinking on using a
>>
>> Data
>>
>> Set API for our future use cases but on the other hand Data Set API is
>> going to be deprecated so using proposed bounded data streams solution
>> could be more viable in the long term.
>>
>> Thanks,
>> Łukasz
>>
>> On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> 
>> <thomas.we...@gmail.com> wrote:
>>
>> Thanks for putting together this proposal!
>>
>> I see that the "Per Split Event Time" and "Event Time Alignment"
>>
>> sections
>>
>> are still TBD.
>>
>> It would probably be good to flesh those out a bit before proceeding
>>
>> too
>>
>> far
>>
>> as the event time alignment will probably influence the interaction
>>
>> with
>>
>> the split reader, specifically ReaderStatus emitNext(SourceOutput<E>
>> output).
>>
>> We currently have only one implementation for event time alignment in
>>
>> the
>>
>> Kinesis consumer. The synchronization in that case takes place as the
>>
>> last
>>
>> step before records are emitted downstream (RecordEmitter). With the
>> currently proposed interfaces, the equivalent can be implemented in
>>
>> the
>>
>> reader loop, although note that in the Kinesis consumer the per shard
>> threads push records.
>>
>> Synchronization has not been implemented for the Kafka consumer yet.
>> https://issues.apache.org/jira/browse/FLINK-12675
>>
>> When I looked at it, I realized that the implementation will look
>>
>> quite
>>
>> different
>> from Kinesis because it needs to take place in the pull part, where
>>
>> records
>>
>> are taken from the Kafka client. Due to the multiplexing it cannot be
>>
>> done
>>
>> by blocking the split thread like it currently works for Kinesis.
>>
>> Reading
>>
>> from individual Kafka partitions needs to be controlled via
>>
>> pause/resume
>>
>> on the Kafka client.
>>
>> To take on that responsibility the split thread would need to be
>>
>> aware
>>
>> of
>>
>> the
>> watermarks or at least whether it should or should not continue to
>>
>> consume
>>
>> a given split and this may require a different SourceReader or
>>
>> SourceOutput
>>
>> interface.
>>
>> Thanks,
>> Thomas
>>
>>
>> On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> 
>> <mmyy1...@gmail.com> wrote:
>>
>>
>> Hi Stephan,
>>
>> Thank you for feedback!
>> Will take a look at your branch before public discussing.
>>
>>
>> On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> 
>> <se...@apache.org>
>>
>> wrote:
>>
>> Hi Biao!
>>
>> Thanks for reviving this. I would like to join this discussion,
>>
>> but
>>
>> am
>>
>> quite occupied with the 1.9 release, so can we maybe pause this
>>
>> discussion
>>
>> for a week or so?
>>
>> In the meantime I can share some suggestion based on prior
>>
>> experiments:
>>
>> How to do watermarks / timestamp extractors in a simpler and more
>>
>> flexible
>>
>> way. I think that part is quite promising should be part of the
>>
>> new
>>
>> source
>>
>> interface.
>>
>>
>>
>> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
>>
>> https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
>>
>> Some experiments on how to build the source reader and its
>>
>> library
>>
>> for
>>
>> common threading/split patterns:
>>
>>
>>
>> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
>>
>> Best,
>> Stephan
>>
>>
>> On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> 
>> <mmyy1...@gmail.com>
>>
>> wrote:
>>
>> Hi devs,
>>
>> Since 1.9 is nearly released, I think we could get back to
>>
>> FLIP-27.
>>
>> I
>>
>> believe it should be included in 1.10.
>>
>> There are so many things mentioned in document of FLIP-27. [1] I
>>
>> think
>>
>> we'd better discuss them separately. However the wiki is not a
>>
>> good
>>
>> place
>>
>> to discuss. I wrote google doc about SplitReader API which
>>
>> misses
>>
>> some
>>
>> details in the document. [2]
>>
>> 1.
>>
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
>>
>> 2.
>>
>>
>> https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
>>
>> CC Stephan, Aljoscha, Piotrek, Becket
>>
>>
>> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> 
>> <mmyy1...@gmail.com>
>>
>> wrote:
>>
>> Hi Steven,
>> Thank you for the feedback. Please take a look at the document
>>
>> FLIP-27
>>
>> <
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>
>> which
>>
>> is updated recently. A lot of details of enumerator were added
>>
>> in
>>
>> this
>>
>> document. I think it would help.
>>
>> Steven Wu <stevenz...@gmail.com> <stevenz...@gmail.com> 于2019年3月28日周四 
>> 下午12:52写道:
>>
>>
>> This proposal mentioned that SplitEnumerator might run on the
>> JobManager or
>> in a single task on a TaskManager.
>>
>> if enumerator is a single task on a taskmanager, then the job
>>
>> DAG
>>
>> can
>>
>> never
>> been embarrassingly parallel anymore. That will nullify the
>>
>> leverage
>>
>> of
>>
>> fine-grained recovery for embarrassingly parallel jobs.
>>
>> It's not clear to me what's the implication of running
>>
>> enumerator
>>
>> on
>>
>> the
>>
>> jobmanager. So I will leave that out for now.
>>
>> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> 
>> <mmyy1...@gmail.com>
>>
>> wrote:
>>
>> Hi Stephan & Piotrek,
>>
>> Thank you for feedback.
>>
>> It seems that there are a lot of things to do in community.
>>
>> I
>>
>> am
>>
>> just
>>
>> afraid that this discussion may be forgotten since there so
>>
>> many
>>
>> proposals
>>
>> recently.
>> Anyway, wish to see the split topics soon :)
>>
>> Piotr Nowojski <pi...@da-platform.com> <pi...@da-platform.com> 于2019年1月24日周四
>>
>> 下午8:21写道:
>>
>> Hi Biao!
>>
>> This discussion was stalled because of preparations for
>>
>> the
>>
>> open
>>
>> sourcing
>>
>> & merging Blink. I think before creating the tickets we
>>
>> should
>>
>> split this
>>
>> discussion into topics/areas outlined by Stephan and
>>
>> create
>>
>> Flips
>>
>> for
>>
>> that.
>>
>> I think there is no chance for this to be completed in
>>
>> couple
>>
>> of
>>
>> remaining
>>
>> weeks/1 month before 1.8 feature freeze, however it would
>>
>> be
>>
>> good
>>
>> to aim
>>
>> with those changes for 1.9.
>>
>> Piotrek
>>
>>
>> On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> <mmyy1...@gmail.com>
>>
>> wrote:
>>
>> Hi community,
>> The summary of Stephan makes a lot sense to me. It is
>>
>> much
>>
>> clearer
>>
>> indeed
>>
>> after splitting the complex topic into small ones.
>> I was wondering is there any detail plan for next step?
>>
>> If
>>
>> not,
>>
>> I
>>
>> would
>>
>> like to push this thing forward by creating some JIRA
>>
>> issues.
>>
>> Another question is that should version 1.8 include
>>
>> these
>>
>> features?
>>
>> Stephan Ewen <se...@apache.org> <se...@apache.org> 于2018年12月1日周六 上午4:20写道:
>>
>>
>> Thanks everyone for the lively discussion. Let me try
>>
>> to
>>
>> summarize
>>
>> where I
>>
>> see convergence in the discussion and open issues.
>> I'll try to group this by design aspect of the source.
>>
>> Please
>>
>> let me
>>
>> know
>>
>> if I got things wrong or missed something crucial here.
>>
>> For issues 1-3, if the below reflects the state of the
>>
>> discussion, I
>>
>> would
>>
>> try and update the FLIP in the next days.
>> For the remaining ones we need more discussion.
>>
>> I would suggest to fork each of these aspects into a
>>
>> separate
>>
>> mail
>>
>> thread,
>>
>> or will loose sight of the individual aspects.
>>
>> *(1) Separation of Split Enumerator and Split Reader*
>>
>>  - All seem to agree this is a good thing
>>  - Split Enumerator could in the end live on JobManager
>>
>> (and
>>
>> assign
>>
>> splits
>>
>> via RPC) or in a task (and assign splits via data
>>
>> streams)
>>
>>  - this discussion is orthogonal and should come later,
>>
>> when
>>
>> the
>>
>> interface
>>
>> is agreed upon.
>>
>> *(2) Split Readers for one or more splits*
>>
>>  - Discussion seems to agree that we need to support
>>
>> one
>>
>> reader
>>
>> that
>>
>> possibly handles multiple splits concurrently.
>>  - The requirement comes from sources where one
>>
>> poll()-style
>>
>> call
>>
>> fetches
>>
>> data from different splits / partitions
>>    --> example sources that require that would be for
>>
>> example
>>
>> Kafka,
>>
>> Pravega, Pulsar
>>
>>  - Could have one split reader per source, or multiple
>>
>> split
>>
>> readers
>>
>> that
>>
>> share the "poll()" function
>>  - To not make it too complicated, we can start with
>>
>> thinking
>>
>> about
>>
>> one
>>
>> split reader for all splits initially and see if that
>>
>> covers
>>
>> all
>>
>> requirements
>>
>> *(3) Threading model of the Split Reader*
>>
>>  - Most active part of the discussion ;-)
>>
>>  - A non-blocking way for Flink's task code to interact
>>
>> with
>>
>> the
>>
>> source
>>
>> is
>>
>> needed in order to a task runtime code based on a
>> single-threaded/actor-style task design
>>    --> I personally am a big proponent of that, it will
>>
>> help
>>
>> with
>>
>> well-behaved checkpoints, efficiency, and simpler yet
>>
>> more
>>
>> robust
>>
>> runtime
>>
>> code
>>
>>  - Users care about simple abstraction, so as a
>>
>> subclass
>>
>> of
>>
>> SplitReader
>>
>> (non-blocking / async) we need to have a
>>
>> BlockingSplitReader
>>
>> which
>>
>> will
>>
>> form the basis of most source implementations.
>>
>> BlockingSplitReader
>>
>> lets
>>
>> users do blocking simple poll() calls.
>>  - The BlockingSplitReader would spawn a thread (or
>>
>> more)
>>
>> and
>>
>> the
>>
>> thread(s) can make blocking calls and hand over data
>>
>> buffers
>>
>> via
>>
>> a
>>
>> blocking
>>
>> queue
>>  - This should allow us to cover both, a fully async
>>
>> runtime,
>>
>> and a
>>
>> simple
>>
>> blocking interface for users.
>>  - This is actually very similar to how the Kafka
>>
>> connectors
>>
>> work.
>>
>> Kafka
>>
>> 9+ with one thread, Kafka 8 with multiple threads
>>
>>  - On the base SplitReader (the async one), the
>>
>> non-blocking
>>
>> method
>>
>> that
>>
>> gets the next chunk of data would signal data
>>
>> availability
>>
>> via
>>
>> a
>>
>> CompletableFuture, because that gives the best
>>
>> flexibility
>>
>> (can
>>
>> await
>>
>> completion or register notification handlers).
>>  - The source task would register a "thenHandle()" (or
>>
>> similar)
>>
>> on the
>>
>> future to put a "take next data" task into the
>>
>> actor-style
>>
>> mailbox
>>
>> *(4) Split Enumeration and Assignment*
>>
>>  - Splits may be generated lazily, both in cases where
>>
>> there
>>
>> is a
>>
>> limited
>>
>> number of splits (but very many), or splits are
>>
>> discovered
>>
>> over
>>
>> time
>>
>>  - Assignment should also be lazy, to get better load
>>
>> balancing
>>
>>  - Assignment needs support locality preferences
>>
>>  - Possible design based on discussion so far:
>>
>>    --> SplitReader has a method "addSplits(SplitT...)"
>>
>> to
>>
>> add
>>
>> one or
>>
>> more
>>
>> splits. Some split readers might assume they have only
>>
>> one
>>
>> split
>>
>> ever,
>>
>> concurrently, others assume multiple splits. (Note:
>>
>> idea
>>
>> behind
>>
>> being
>>
>> able
>>
>> to add multiple splits at the same time is to ease
>>
>> startup
>>
>> where
>>
>> multiple
>>
>> splits may be assigned instantly.)
>>    --> SplitReader has a context object on which it can
>>
>> call
>>
>> indicate
>>
>> when
>>
>> splits are completed. The enumerator gets that
>>
>> notification and
>>
>> can
>>
>> use
>>
>> to
>>
>> decide when to assign new splits. This should help both
>>
>> in
>>
>> cases
>>
>> of
>>
>> sources
>>
>> that take splits lazily (file readers) and in case the
>>
>> source
>>
>> needs to
>>
>> preserve a partial order between splits (Kinesis,
>>
>> Pravega,
>>
>> Pulsar may
>>
>> need
>>
>> that).
>>    --> SplitEnumerator gets notification when
>>
>> SplitReaders
>>
>> start
>>
>> and
>>
>> when
>>
>> they finish splits. They can decide at that moment to
>>
>> push
>>
>> more
>>
>> splits
>>
>> to
>>
>> that reader
>>    --> The SplitEnumerator should probably be aware of
>>
>> the
>>
>> source
>>
>> parallelism, to build its initial distribution.
>>
>>  - Open question: Should the source expose something
>>
>> like
>>
>> "host
>>
>> preferences", so that yarn/mesos/k8s can take this into
>>
>> account
>>
>> when
>>
>> selecting a node to start a TM on?
>>
>> *(5) Watermarks and event time alignment*
>>
>>  - Watermark generation, as well as idleness, needs to
>>
>> be
>>
>> per
>>
>> split
>>
>> (like
>>
>> currently in the Kafka Source, per partition)
>>  - It is desirable to support optional
>>
>> event-time-alignment,
>>
>> meaning
>>
>> that
>>
>> splits that are ahead are back-pressured or temporarily
>>
>> unsubscribed
>>
>>  - I think i would be desirable to encapsulate
>>
>> watermark
>>
>> generation
>>
>> logic
>>
>> in watermark generators, for a separation of concerns.
>>
>> The
>>
>> watermark
>>
>> generators should run per split.
>>  - Using watermark generators would also help with
>>
>> another
>>
>> problem of
>>
>> the
>>
>> suggested interface, namely supporting non-periodic
>>
>> watermarks
>>
>> efficiently.
>>
>>  - Need a way to "dispatch" next record to different
>>
>> watermark
>>
>> generators
>>
>>  - Need a way to tell SplitReader to "suspend" a split
>>
>> until a
>>
>> certain
>>
>> watermark is reached (event time backpressure)
>>  - This would in fact be not needed (and thus simpler)
>>
>> if
>>
>> we
>>
>> had
>>
>> a
>>
>> SplitReader per split and may be a reason to re-open
>>
>> that
>>
>> discussion
>>
>> *(6) Watermarks across splits and in the Split
>>
>> Enumerator*
>>
>>  - The split enumerator may need some watermark
>>
>> awareness,
>>
>> which
>>
>> should
>>
>> be
>>
>> purely based on split metadata (like create timestamp
>>
>> of
>>
>> file
>>
>> splits)
>>
>>  - If there are still more splits with overlapping
>>
>> event
>>
>> time
>>
>> range
>>
>> for
>>
>> a
>>
>> split reader, then that split reader should not advance
>>
>> the
>>
>> watermark
>>
>> within the split beyond the overlap boundary. Otherwise
>>
>> future
>>
>> splits
>>
>> will
>>
>> produce late data.
>>
>>  - One way to approach this could be that the split
>>
>> enumerator
>>
>> may
>>
>> send
>>
>> watermarks to the readers, and the readers cannot emit
>>
>> watermarks
>>
>> beyond
>>
>> that received watermark.
>>  - Many split enumerators would simply immediately send
>>
>> Long.MAX
>>
>> out
>>
>> and
>>
>> leave the progress purely to the split readers.
>>
>>  - For event-time alignment / split back pressure, this
>>
>> begs
>>
>> the
>>
>> question
>>
>> how we can avoid deadlocks that may arise when splits
>>
>> are
>>
>> suspended
>>
>> for
>>
>> event time back pressure,
>>
>> *(7) Batch and streaming Unification*
>>
>>  - Functionality wise, the above design should support
>>
>> both
>>
>>  - Batch often (mostly) does not care about reading "in
>>
>> order"
>>
>> and
>>
>> generating watermarks
>>    --> Might use different enumerator logic that is
>>
>> more
>>
>> locality
>>
>> aware
>>
>> and ignores event time order
>>    --> Does not generate watermarks
>>  - Would be great if bounded sources could be
>>
>> identified
>>
>> at
>>
>> compile
>>
>> time,
>>
>> so that "env.addBoundedSource(...)" is type safe and
>>
>> can
>>
>> return a
>>
>> "BoundedDataStream".
>>  - Possible to defer this discussion until later
>>
>> *Miscellaneous Comments*
>>
>>  - Should the source have a TypeInformation for the
>>
>> produced
>>
>> type,
>>
>> instead
>>
>> of a serializer? We need a type information in the
>>
>> stream
>>
>> anyways, and
>>
>> can
>>
>> derive the serializer from that. Plus, creating the
>>
>> serializer
>>
>> should
>>
>> respect the ExecutionConfig.
>>
>>  - The TypeSerializer interface is very powerful but
>>
>> also
>>
>> not
>>
>> easy to
>>
>> implement. Its purpose is to handle data super
>>
>> efficiently,
>>
>> support
>>
>> flexible ways of evolution, etc.
>>  For metadata I would suggest to look at the
>>
>> SimpleVersionedSerializer
>>
>> instead, which is used for example for checkpoint
>>
>> master
>>
>> hooks,
>>
>> or for
>>
>> the
>>
>> streaming file sink. I think that is is a good match
>>
>> for
>>
>> cases
>>
>> where
>>
>> we
>>
>> do
>>
>> not need more than ser/deser (no copy, etc.) and don't
>>
>> need to
>>
>> push
>>
>> versioning out of the serialization paths for best
>>
>> performance
>>
>> (as in
>>
>> the
>>
>> TypeSerializer)
>>
>>
>> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <k.klou...@data-artisans.com>
>> wrote:
>>
>>
>> Hi Biao,
>>
>> Thanks for the answer!
>>
>> So given the multi-threaded readers, now we have as
>>
>> open
>>
>> questions:
>>
>> 1) How do we let the checkpoints pass through our
>>
>> multi-threaded
>>
>> reader
>>
>> operator?
>>
>> 2) Do we have separate reader and source operators or
>>
>> not? In
>>
>> the
>>
>> strategy
>>
>> that has a separate source, the source operator has a
>>
>> parallelism of
>>
>> 1
>>
>> and
>>
>> is responsible for split recovery only.
>>
>> For the first one, given also the constraints
>>
>> (blocking,
>>
>> finite
>>
>> queues,
>>
>> etc), I do not have an answer yet.
>>
>> For the 2nd, I think that we should go with separate
>>
>> operators
>>
>> for
>>
>> the
>>
>> source and the readers, for the following reasons:
>>
>> 1) This is more aligned with a potential future
>>
>> improvement
>>
>> where the
>>
>> split
>>
>> discovery becomes a responsibility of the JobManager
>>
>> and
>>
>> readers are
>>
>> pooling more work from the JM.
>>
>> 2) The source is going to be the "single point of
>>
>> truth".
>>
>> It
>>
>> will
>>
>> know
>>
>> what
>>
>> has been processed and what not. If the source and the
>>
>> readers
>>
>> are a
>>
>> single
>>
>> operator with parallelism > 1, or in general, if the
>>
>> split
>>
>> discovery
>>
>> is
>>
>> done by each task individually, then:
>>   i) we have to have a deterministic scheme for each
>>
>> reader to
>>
>> assign
>>
>> splits to itself (e.g. mod subtaskId). This is not
>>
>> necessarily
>>
>> trivial
>>
>> for
>>
>> all sources.
>>   ii) each reader would have to keep a copy of all its
>>
>> processed
>>
>> slpits
>>
>>   iii) the state has to be a union state with a
>>
>> non-trivial
>>
>> merging
>>
>> logic
>>
>> in order to support rescaling.
>>
>> Two additional points that you raised above:
>>
>> i) The point that you raised that we need to keep all
>>
>> splits
>>
>> (processed
>>
>> and
>>
>> not-processed) I think is a bit of a strong
>>
>> requirement.
>>
>> This
>>
>> would
>>
>> imply
>>
>> that for infinite sources the state will grow
>>
>> indefinitely.
>>
>> This is
>>
>> problem
>>
>> is even more pronounced if we do not have a single
>>
>> source
>>
>> that
>>
>> assigns
>>
>> splits to readers, as each reader will have its own
>>
>> copy
>>
>> of
>>
>> the
>>
>> state.
>>
>> ii) it is true that for finite sources we need to
>>
>> somehow
>>
>> not
>>
>> close
>>
>> the
>>
>> readers when the source/split discoverer finishes. The
>> ContinuousFileReaderOperator has a work-around for
>>
>> that.
>>
>> It is
>>
>> not
>>
>> elegant,
>>
>> and checkpoints are not emitted after closing the
>>
>> source,
>>
>> but
>>
>> this, I
>>
>> believe, is a bigger problem which requires more
>>
>> changes
>>
>> than
>>
>> just
>>
>> refactoring the source interface.
>>
>> Cheers,
>> Kostas
>>
>>
>>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to