Hi Becket,

I think Dawid explained things clearly and his proposal makes a lot of sense.
I'm also in favor of #2, because #1 doesn't work for our future unified
environment.

You can see the vision in this documentation [1]. In the future, we would
like to drop the global streaming/batch mode in SQL (i.e.
EnvironmentSettings#inStreamingMode/inBatchMode).
A source is bounded or unbounded once it is defined, so the framework can
infer from the sources whether a query should run in streaming, batch, or
hybrid mode. However, in #1 we lose this ability, because the framework
doesn't know whether the source is bounded or unbounded.
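
For illustration, a minimal sketch of the inference this enables. The
Boundedness values and Source#getBoundedness() follow the discussion below;
the inferRuntimeMode() helper is hypothetical:

import java.util.List;

// Each source declares its boundedness once, at definition time.
enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

interface Source<T> {
    Boundedness getBoundedness();
}

class ModeInference {
    // The framework can derive the execution mode of a query from its
    // sources alone, without a global streaming/batch switch: a query can
    // run in batch mode only if every source is bounded.
    static Boundedness inferRuntimeMode(List<Source<?>> sources) {
        boolean allBounded = sources.stream()
                .allMatch(s -> s.getBoundedness() == Boundedness.BOUNDED);
        return allBounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
    }
}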

Best,
Jark


[1]:
https://docs.google.com/document/d/1yrKXEIRATfxHJJ0K3t6wUgXAtZq8D-XgvEnvl2uUcr0/edit#heading=h.v4ib17buma1p

On Wed, 11 Dec 2019 at 20:52, Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> Regarding the:
>
> Collection<E> getNextRecords()
>
> I’m pretty sure such a design would unfortunately impact the performance
> (accessing and potentially creating the collection on the hot path).
>
> Also the
>
> InputStatus emitNext(DataOutput<T> output) throws Exception;
> or
> Status pollNext(SourceOutput<T> sourceOutput) throws Exception;
>
> This gives us some opportunities in the future to allow the Source to hot
> loop inside, until it receives some signal “please exit because of some
> reason” (the output collector could return such a hint upon collecting the
> result). But that’s another topic, outside of this FLIP’s scope.
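>
> For illustration, a rough sketch of that contract (the status values and
> method shapes are assumptions based on this discussion, not the final
> FLIP API):
>
> enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }
>
> interface DataOutput<T> {
>     void emitRecord(T record);
> }
>
> interface SourceReader<T> {
>     // May emit one or more records per call; the returned status tells
>     // the runtime whether to call again immediately, wait for data, or
>     // finish the task - and a future variant could let the output signal
>     // "please exit" while the source hot-loops inside.
>     InputStatus emitNext(DataOutput<T> output) throws Exception;
> }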
>
> Piotrek
>
> > On 11 Dec 2019, at 10:41, Till Rohrmann <trohrm...@apache.org> wrote:
> >
> > Hi Becket,
> >
> > quick clarification from my side because I think you misunderstood my
> > question. I did not suggest to let the SourceReader return only a single
> > record at a time when calling getNextRecords. As the return type
> > indicates, the method can return an arbitrary number of records.
> >
> > Cheers,
> > Till
> >
> > On Wed, Dec 11, 2019 at 10:13 AM Dawid Wysakowicz
> > <dwysakow...@apache.org> wrote:
> >
> >> Hi Becket,
> >>
> >> Issue #1 - Design of Source interface
> >>
> >> I mentioned the lack of a method like
> >> Source#createEnumerator(Boundedness boundedness,
> >> SplitEnumeratorContext context) because without it the current
> >> proposal is not complete/does not work.
> >>
> >> If we say that boundedness is an intrinsic property of a source, imo we
> >> don't need the Source#createEnumerator(Boundedness boundedness,
> >> SplitEnumeratorContext context) method.
> >>
> >> Assuming a source from my previous example:
> >>
> >> Source source = KafkaSource.builder()
> >>  ...
> >>  .untilTimestamp(...)
> >>  .build()
> >>
> >> Would the enumerator differ if created like
> >> source.createEnumerator(CONTINUOUS_UNBOUNDED, ...) vs
> >> source.createEnumerator(BOUNDED, ...)? I know I am repeating myself, but
> >> this is the part where my opinion differs the most from the current
> >> proposal. I really think it should always be the source that tells if it
> >> is bounded or not. In the current proposal, the methods
> >> continuousSource/boundedSource somewhat reconfigure the source, which I
> >> think is misleading.
> >>
> >> I think a call like:
> >>
> >> Source source = KafkaSource.builder()
> >>  ...
> >>  .readContinuously() / .readUntilLatestOffset() / .readUntilTimestamp(...)
> >>  / .readUntilOffsets(...) / ...
> >>  .build()
> >>
> >> is way cleaner (and more expressive) than
> >>
> >> Source source = KafkaSource.builder()
> >>  ...
> >>  .build()
> >>
> >>
> >> env.continuousSource(source) // underneath this would call
> >> // createEnumerator(CONTINUOUS, ctx), which would be equivalent to
> >> // source.readContinuously().createEnumerator(ctx)
> >> // or
> >> env.boundedSource(source) // underneath this would call
> >> // createEnumerator(BOUNDED, ctx), which would be equivalent to
> >> // source.readUntilLatestOffset().createEnumerator(ctx)
> >>
> >>
> >> Sorry for the comparison, but to me it seems there is too much magic
> >> happening underneath those two calls.
> >>
> >> I really believe the Source interface should have a getBoundedness()
> >> method instead of supportsBoundedness(...) + createEnumerator(Boundedness, ...)
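> >>
> >> For illustration, a minimal sketch of that shape (hypothetical; it just
> >> mirrors the builder examples above):
> >>
> >> interface Source<T, SplitT> {
> >>     // Intrinsic property, fixed when the source is built, e.g. by
> >>     // .readContinuously() or .readUntilTimestamp(...).
> >>     Boundedness getBoundedness();
> >>
> >>     // No Boundedness parameter is needed here anymore.
> >>     SplitEnumerator<SplitT> createEnumerator(SplitEnumeratorContext ctx);
> >> }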
> >>
> >>
> >> Issue #2 - Design of
> >> ExecutionEnvironment#source()/continuousSource()/boundedSource()
> >>
> >> As you might have guessed, I am slightly in favor of option #2 modified.
> >> Yes, I am aware every step of the DAG would have to be able to say if it
> >> is bounded or not. I have a feeling it would be easier to express cross
> >> bounded/unbounded operations, but I must admit I have not thought it
> >> through thoroughly. In the spirit of "batch is just a special case of
> >> streaming", I thought BoundedStream would extend from DataStream.
> >> Correct me if I am wrong. In such a setup the cross bounded/unbounded
> >> operation could be expressed quite easily, I think:
> >>
> >> DataStream {
> >>  DataStream join(DataStream, ...); // we cannot really tell if the
> >>  // result is bounded or not, but because a bounded stream is a special
> >>  // case of an unbounded one, the API object is correct irrespective of
> >>  // whether the left or right side of the join is bounded
> >> }
> >>
> >> BoundedStream extends DataStream {
> >>  BoundedStream join(BoundedStream, ...); // only if both sides are
> >>  // bounded can the result be bounded as well. However, we do have
> >>  // access to DataStream#join here, so you can still join with a
> >>  // DataStream
> >> }
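> >>
> >> Spelled out as a compilable sketch (simplified, hypothetical
> >> signatures; only the overload resolution matters here):
> >>
> >> class DataStream<T> {
> >>     // Result may or may not be bounded, so the general API object is
> >>     // returned.
> >>     public DataStream<T> join(DataStream<T> other) {
> >>         return new DataStream<>();
> >>     }
> >> }
> >>
> >> class BoundedStream<T> extends DataStream<T> {
> >>     // Both sides bounded => bounded result; DataStream#join is still
> >>     // inherited for joining against a possibly unbounded stream.
> >>     public BoundedStream<T> join(BoundedStream<T> other) {
> >>         return new BoundedStream<>();
> >>     }
> >> }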
> >>
> >>
> >> On the other hand, I also see benefits of two completely disjoint APIs,
> >> as we could prohibit some streaming calls in the bounded API. I can't
> >> think of any unbounded operators that could not be implemented for a
> >> bounded stream.
> >>
> >> Besides I think we both agree we don't like the method:
> >>
> >> DataStream boundedStream(Source)
> >>
> >> suggested in the current state of the FLIP. Do we? :)
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 10/12/2019 18:57, Becket Qin wrote:
> >>
> >> Hi folks,
> >>
> >> Thanks for the discussion, great feedback. Also thanks Dawid for the
> >> explanation, it is much clearer now.
> >>
> >> One thing that is indeed missing from the FLIP is how the boundedness is
> >> passed to the Source implementation. So the API should be
> >> Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext
> >> context),
> >> and we can probably remove the Source#supportBoundedness(Boundedness
> >> boundedness) method.
> >>
> >> Assuming we have that, we are essentially choosing from one of the
> >> following two options:
> >>
> >> Option 1:
> >> // The source is a continuous source, and only unbounded operations can
> >> // be performed.
> >> DataStream<Type> dataStream = env.continuousSource(someSource);
> >>
> >> // The source is a bounded source; both bounded and unbounded operations
> >> // can be performed.
> >> BoundedDataStream<Type> boundedDataStream = env.boundedSource(someSource);
> >>
> >>  - Pros:
> >>       a) Explicit boundary between bounded / unbounded streams; it is
> >> quite simple and clear to the users.
> >>  - Cons:
> >>       a) Applications that do not involve bounded operations still have
> >> to call a different API to distinguish bounded / unbounded streams.
> >>       b) No support for a bounded stream to run in a streaming runtime
> >> setting, i.e. scheduling and operator behaviors.
> >>
> >>
> >> Option 2:
> >> // The source is either bounded or unbounded, but only unbounded
> >> // operations can be performed on the returned DataStream.
> >> DataStream<Type> dataStream = env.source(someSource);
> >>
> >> // The source must be a bounded source, otherwise an exception is thrown.
> >> BoundedDataStream<Type> boundedDataStream =
> >> env.boundedSource(boundedSource);
> >>
> >> The pros and cons are exactly the opposite of option 1.
> >>  - Pros:
> >>       a) Applications that do not involve bounded operations do not have
> >> to call a different API to distinguish bounded / unbounded streams.
> >>       b) Support for a bounded stream to run in a streaming runtime
> >> setting, i.e. scheduling and operator behaviors.
> >>  - Cons:
> >>       a) Bounded / unbounded streams are kind of mixed, i.e. given a
> >> DataStream, it is not clear whether it is bounded or not unless you have
> >> access to its source.
> >>
> >>
> >> If we only think from the Source API perspective, option 2 seems a
> >> better choice, because functionality-wise it is a superset of option 1,
> >> at the cost of some seemingly acceptable ambiguity in the DataStream
> >> API. But if we look at the DataStream API as a whole, option 1 seems a
> >> clearer choice. For example, sometimes a library may have to know
> >> whether a certain task will finish or not, and it would be difficult to
> >> tell if the input is a DataStream, unless additional information is
> >> provided all the way from the Source. One possible solution is a
> >> *modified option 2* which adds a method to the DataStream API to
> >> indicate boundedness, such as getBoundedness(). It would solve the
> >> problem, with a potential confusion of what the difference is between a
> >> DataStream with getBoundedness() = BOUNDED and a BoundedDataStream. But
> >> that seems not super difficult to explain.
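> >>
> >> A minimal sketch of that modified option 2 (hypothetical API, for
> >> illustration only):
> >>
> >> DataStream<Type> stream = env.source(someSource);
> >> if (stream.getBoundedness() == Boundedness.BOUNDED) {
> >>     // a library can now tell that the pipeline will eventually finish,
> >>     // without needing access to the original Source
> >> }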
> >>
> >> So from the API's perspective, I don't have a strong opinion between
> >> *option 1* and *modified option 2*. I like the cleanness of option 1,
> >> but modified option 2 would be more attractive if we have a concrete
> >> use case for "bounded stream with unbounded streaming runtime settings".
> >>
> >> Re: Till
> >>
> >>
> >> Maybe this has already been asked before but I was wondering why the
> >> SourceReader interface has the method pollNext which hands the
> >> responsibility of outputting elements to the SourceReader
> >> implementation? Has this been done for backwards compatibility reasons
> >> with the old source interface? If not, then one could define a
> >> Collection<E> getNextRecords() method which returns the currently
> >> retrieved records and then the caller emits them outside of the
> >> SourceReader. That way the interface would not allow to implement an
> >> outputting loop where we never hand back control to the caller. At the
> >> moment, this contract can be easily broken and is only mentioned loosely
> >> in the JavaDocs.
> >>
> >>
> >> The primary reason we hand over the SourceOutput to the SourceReader is
> >> that sometimes it is difficult for a SourceReader to emit one record at
> >> a time. One example is some batched messaging systems which only have an
> >> offset for the entire batch instead of for individual messages in the
> >> batch. In that case, returning one record at a time would leave the
> >> SourceReader in an uncheckpointable state, because it can only
> >> checkpoint at batch boundaries.
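> >>
> >> For illustration, a minimal sketch of such a reader (MessageBatch, the
> >> client, and the offset bookkeeping are hypothetical):
> >>
> >> public InputStatus pollNext(SourceOutput<T> output) throws Exception {
> >>     MessageBatch<T> batch = client.fetchNextBatch();
> >>     if (batch == null) {
> >>         return InputStatus.NOTHING_AVAILABLE;
> >>     }
> >>     // Emit the whole batch before returning, so a checkpoint can only
> >>     // be taken at a batch boundary, where the offset is well defined.
> >>     for (T record : batch.records()) {
> >>         output.collect(record);
> >>     }
> >>     lastCheckpointableOffset = batch.offset();
> >>     return InputStatus.MORE_AVAILABLE;
> >> }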
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Tue, Dec 10, 2019 at 5:33 PM Till Rohrmann <trohrm...@apache.org>
> >> wrote:
> >>
> >>
> >> Hi everyone,
> >>
> >> thanks for drafting this FLIP. It reads very well.
> >>
> >> Concerning Dawid's proposal, I tend to agree. The boundedness could come
> >> from the source and tell the system how to treat the operator
> >> (scheduling-wise). From a user's perspective it should be fine to get
> >> back a DataStream when calling env.source(boundedSource) if he does not
> >> need special operations defined on a BoundedDataStream. If he needs
> >> this, then one could use the method BoundedDataStream
> >> env.boundedSource(boundedSource).
> >>
> >> If possible, we could enforce the proper usage of env.boundedSource() by
> >> introducing a BoundedSource type so that one cannot pass an
> >> unbounded source to it. That way users would not be able to shoot
> >> themselves in the foot.
> >>
> >> Maybe this has already been asked before but I was wondering why the
> >> SourceReader interface has the method pollNext which hands the
> >> responsibility of outputting elements to the SourceReader
> >> implementation? Has this been done for backwards compatibility reasons
> >> with the old source interface? If not, then one could define a
> >> Collection<E> getNextRecords() method which returns the currently
> >> retrieved records and then the caller emits them outside of the
> >> SourceReader. That way the interface would not allow to implement an
> >> outputting loop where we never hand back control to the caller. At the
> >> moment, this contract can be easily broken and is only mentioned loosely
> >> in the JavaDocs.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Tue, Dec 10, 2019 at 7:49 AM Jingsong Li <jingsongl...@gmail.com>
> >> wrote:
> >>
> >>
> >> Hi all,
> >>
> >> I think the current design is good.
> >>
> >> My understanding is:
> >>
> >> For execution mode: bounded mode and continuous mode are totally
> >> different. I don't think we have the ability to integrate the two models
> >> at present. It's about scheduling, memory, algorithms, state, etc.; we
> >> shouldn't confuse them.
> >>
> >> For source capabilities: only bounded, only continuous, or both bounded
> >> and continuous.
> >> I think Kafka is a source that can be run in both bounded
> >> and continuous execution mode.
> >> And Kafka with an end offset should be runnable in both bounded
> >> and continuous execution mode. Using Apache Beam with the Flink runner,
> >> I used to run a "bounded" Kafka source in streaming mode. For our
> >> previous DataStream, it is not necessarily required that the source
> >> cannot be bounded.
> >>
> >> So here is my thought on Dawid's question:
> >> 1. Pass a bounded source to continuousSource(): +1
> >> 2. Pass a continuous source to boundedSource(): -1, should throw an
> >> exception.
> >>
> >> In StreamExecutionEnvironment, continuousSource and boundedSource define
> >> the execution mode. They define a clear boundary between execution modes.
> >>
> >> Best,
> >> Jingsong Lee
> >>
> >> On Tue, Dec 10, 2019 at 10:37 AM Jark Wu <imj...@gmail.com> wrote:
> >>
> >>
> >> I agree with Dawid's point that the boundedness information should come
> >> from the source itself (e.g. the end timestamp), not through
> >> env.boundedSource()/continuousSource().
> >> I think if we want to support something like `env.source()` that derives
> >> the execution mode from the source, the `supportsBoundedness(Boundedness)`
> >> method is not enough, because we don't know whether it is bounded or not.
> >>
> >> Best,
> >> Jark
> >>
> >>
> >> On Mon, 9 Dec 2019 at 22:21, Dawid Wysakowicz <dwysakow...@apache.org>
> >> wrote:
> >>
> >>
> >> One more thing. In the current proposal, with the
> >> supportsBoundedness(Boundedness) method and the boundedness coming from
> >> either continuousSource or boundedSource, I could not find how this
> >> information is fed back to the SplitEnumerator.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 09/12/2019 13:52, Becket Qin wrote:
> >>
> >> Hi Dawid,
> >>
> >> Thanks for the comments. This actually brings up another relevant
> >> question about what a "bounded source" implies. I actually had the same
> >> impression when I looked at the Source API. Here is what I understand
> >> after some discussion with Stephan. A bounded source has the following
> >> impacts.
> >>
> >> 1. API validity.
> >> - A bounded source generates a bounded stream, so some operations that
> >> only work for bounded records can be performed, e.g. sort.
> >> - To expose these bounded-stream-only APIs, there are two options:
> >>     a. Add them to the DataStream API and throw an exception if a
> >> method is called on an unbounded stream.
> >>     b. Create a BoundedDataStream class which is returned from
> >> env.boundedSource(), while DataStream is returned from
> >> env.continuousSource().
> >> Note that this cannot be done by having a single env.source(theSource),
> >> even if the Source has a getBoundedness() method.
> >>
> >> 2. Scheduling
> >> - A bounded source could be computed stage by stage without bringing up
> >> all the tasks at the same time.
> >>
> >> 3. Operator behaviors
> >> - A bounded source indicates the records are finite, so some operators
> >> can wait until they receive all the records before starting the
> >> processing.
> >>
> >> Among the above impacts, only #1 is relevant to the API design. And the
> >> current proposal in FLIP-27 follows 1.b.
> >>
> >> // boundedness depends on a source property, imo this should always be
> >> // preferred
> >> DataStream<MyType> stream = env.source(theSource);
> >>
> >>
> >> In your proposal, does DataStream have bounded-stream-only methods? It
> >> looks like it should, otherwise passing a bounded Source to env.source()
> >> would be confusing. In that case, we will essentially do 1.a if an
> >> unbounded Source is created from env.source(unboundedSource).
> >>
> >> If we have the methods only supported for bounded streams in DataStream,
> >> it seems a little weird to have a separate BoundedDataStream interface.
> >>
> >> Am I understanding it correctly?
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >>
> >>
> >> On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz <dwysakow...@apache.org>
> >> wrote:
> >>
> >>
> >> Hi all,
> >>
> >> Really well-written proposal, and a very important one. I must admit I
> >> have not understood all the intricacies of it yet.
> >>
> >> One question I have, though, is about where the information about
> >> boundedness comes from. I think in most cases it is a property of the
> >> source. As you described, it might be e.g. an end offset, a flag for
> >> whether it should monitor new splits, etc. I think it would be a really
> >> nice use case to be able to say:
> >>
> >> new KafkaSource().readUntil(long timestamp),
> >>
> >> which could work as an "end offset". Moreover, I think all bounded
> >> sources support continuous mode, but no intrinsically continuous source
> >> supports bounded mode. If I understood the proposal correctly, it
> >> suggests the boundedness sort of "comes" from the outside of the source,
> >> from the invocation of either boundedStream or continuousSource.
> >>
> >> I am wondering if it would make sense to actually change the method
> >>
> >> boolean Source#supportsBoundedness(Boundedness)
> >>
> >> to
> >>
> >> Boundedness Source#getBoundedness().
> >>
> >> As for the methods #boundedSource and #continuousSource, assuming the
> >> boundedness is a property of the source, they do not affect how the
> >> enumerator works, but mostly how the DAG is scheduled, right? I am not
> >> against those methods, but I think it is a very specific use case to
> >> actually override the property of the source. In general I would expect
> >> users to only call env.source(theSource), where the source tells if it
> >> is bounded or not. I would suggest considering the following set of
> >> methods:
> >>
> >> // boundedness depends on a source property, imo this should always be
> >> // preferred
> >> DataStream<MyType> stream = env.source(theSource);
> >>
> >> // always continuous execution, whether bounded or unbounded source
> >> DataStream<MyType> continuousStream = env.continuousSource(theSource);
> >>
> >> // imo this would make sense if the BoundedDataStream provides
> >> // additional features unavailable in continuous mode
> >> BoundedDataStream<MyType> batch = env.boundedSource(theSource);
> >>
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >>
> >> On 04/12/2019 11:25, Stephan Ewen wrote:
> >>
> >> Thanks, Becket, for updating this.
> >>
> >> I agree with moving the aspects you mentioned into separate FLIPs -
> >> this one was becoming unwieldy in size.
> >>
> >> +1 to the FLIP in its current state. It's a very detailed write-up,
> >> nicely done!
> >>
> >> On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> wrote:
> >>
> >> Hi all,
> >>
> >> Sorry for the long-belated update. I have updated the FLIP-27 wiki page
> >> with the latest proposals. Some noticeable changes include:
> >> 1. A new generic communication mechanism between SplitEnumerator and
> >> SourceReader.
> >> 2. Some detailed API method signature changes.
> >>
> >> We left a few things out of this FLIP and will address them in separate
> >> FLIPs, including:
> >> 1. Per-split event time.
> >> 2. Event time alignment.
> >> 3. Fine-grained failover for SplitEnumerator failure.
> >>
> >> Please let us know if you have any questions.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> wrote:
> >>
> >> Hi Łukasz!
> >>
> >> Becket and I are working hard on figuring out the last details and
> >> implementing the first PoC. We will hopefully update the FLIP next week.
> >>
> >> There is a fair chance that a first version of this will be in 1.10, but
> >> I think it will take another release to battle-test it and migrate the
> >> connectors.
> >>
> >> Best,
> >> Stephan
> >>
> >>
> >>
> >>
> >> On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl>
> >> wrote:
> >>
> >> Hi,
> >>
> >> This proposal looks very promising for us. Do you have any plans for
> >> which Flink release it is going to be included in? We are thinking of
> >> using the DataSet API for our future use cases, but on the other hand
> >> the DataSet API is going to be deprecated, so using the proposed bounded
> >> data streams solution could be more viable in the long term.
> >>
> >> Thanks,
> >> Łukasz
> >>
> >> On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote:
> >>
> >> Thanks for putting together this proposal!
> >>
> >> I see that the "Per Split Event Time" and "Event Time Alignment"
> >> sections are still TBD.
> >>
> >> It would probably be good to flesh those out a bit before proceeding too
> >> far, as the event time alignment will probably influence the interaction
> >> with the split reader, specifically ReaderStatus
> >> emitNext(SourceOutput<E> output).
> >>
> >> We currently have only one implementation for event time alignment, in
> >> the Kinesis consumer. The synchronization in that case takes place as
> >> the last step before records are emitted downstream (RecordEmitter).
> >> With the currently proposed interfaces, the equivalent can be
> >> implemented in the reader loop, although note that in the Kinesis
> >> consumer the per-shard threads push records.
> >>
> >> Synchronization has not been implemented for the Kafka consumer yet.
> >>
> >> https://issues.apache.org/jira/browse/FLINK-12675
> >>
> >> When I looked at it, I realized that the implementation will look quite
> >> different from Kinesis, because it needs to take place in the pull part,
> >> where records are taken from the Kafka client. Due to the multiplexing
> >> it cannot be done by blocking the split thread like it currently works
> >> for Kinesis. Reading from individual Kafka partitions needs to be
> >> controlled via pause/resume on the Kafka client.
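> >>
> >> For illustration, roughly what that would look like on the client
> >> (KafkaConsumer#pause/#resume are existing Kafka APIs; the watermark
> >> bookkeeping is hypothetical):
> >>
> >> // Before each poll, pause partitions that are too far ahead in event
> >> // time and resume the ones that are back within the threshold.
> >> for (TopicPartition tp : consumer.assignment()) {
> >>     if (partitionWatermark(tp) > globalWatermark + maxAllowedDrift) {
> >>         consumer.pause(Collections.singleton(tp));
> >>     } else {
> >>         consumer.resume(Collections.singleton(tp));
> >>     }
> >> }
> >> ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);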
> >>
> >> To take on that responsibility, the split thread would need to be aware
> >> of the watermarks, or at least of whether it should or should not
> >> continue to consume a given split, and this may require a different
> >> SourceReader or SourceOutput interface.
> >>
> >> Thanks,
> >> Thomas
> >>
> >>
> >> On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote:
> >>
> >> Hi Stephan,
> >>
> >> Thank you for the feedback!
> >> I will take a look at your branch before discussing publicly.
> >>
> >>
> >> On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:
> >>
> >> Hi Biao!
> >>
> >> Thanks for reviving this. I would like to join this discussion, but am
> >> quite occupied with the 1.9 release, so can we maybe pause this
> >> discussion for a week or so?
> >>
> >> In the meantime I can share some suggestions based on prior experiments:
> >>
> >> How to do watermarks / timestamp extractors in a simpler and more
> >> flexible way. I think that part is quite promising and should be part of
> >> the new source interface.
> >>
> >> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
> >>
> >> https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
> >>
> >> Some experiments on how to build the source reader and its library for
> >> common threading/split patterns:
> >>
> >> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
> >>
> >> Best,
> >> Stephan
> >>
> >>
> >> On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> wrote:
> >>
> >> Hi devs,
> >>
> >> Since 1.9 is nearly released, I think we could get back to FLIP-27. I
> >> believe it should be included in 1.10.
> >>
> >> There are so many things mentioned in the document of FLIP-27 [1] that I
> >> think we'd better discuss them separately. However, the wiki is not a
> >> good place to discuss. I wrote a Google doc about the SplitReader API,
> >> covering some details missing from the document. [2]
> >>
> >> 1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
> >>
> >> 2. https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
> >>
> >> CC Stephan, Aljoscha, Piotrek, Becket
> >>
> >>
> >> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:
> >>
> >> Hi Steven,
> >> Thank you for the feedback. Please take a look at the FLIP-27 document
> >> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>,
> >> which was updated recently. A lot of details of the enumerator were
> >> added in this document. I think it would help.
> >>
> >> On Thu, Mar 28, 2019 at 12:52 PM Steven Wu <stevenz...@gmail.com> wrote:
> >>
> >> This proposal mentioned that the SplitEnumerator might run on the
> >> JobManager or in a single task on a TaskManager.
> >>
> >> If the enumerator is a single task on a TaskManager, then the job DAG
> >> can never be embarrassingly parallel anymore. That will nullify the
> >> leverage of fine-grained recovery for embarrassingly parallel jobs.
> >>
> >> It's not clear to me what the implications of running the enumerator on
> >> the JobManager are, so I will leave that out for now.
> >>
> >> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:
> >>
> >> Hi Stephan & Piotrek,
> >>
> >> Thank you for the feedback.
> >>
> >> It seems that there are a lot of things to do in the community. I am
> >> just afraid that this discussion may be forgotten, since there are so
> >> many proposals recently.
> >> Anyway, I wish to see the split topics soon :)
> >>
> >> On Thu, Jan 24, 2019 at 8:21 PM Piotr Nowojski <pi...@da-platform.com>
> >> wrote:
> >>
> >> Hi Biao!
> >>
> >> This discussion was stalled because of preparations for the open
> >> sourcing & merging of Blink. I think before creating the tickets we
> >> should split this discussion into the topics/areas outlined by Stephan
> >> and create FLIPs for those.
> >>
> >> I think there is no chance for this to be completed in the couple of
> >> remaining weeks/1 month before the 1.8 feature freeze, however it would
> >> be good to aim with those changes for 1.9.
> >>
> >> Piotrek
> >>
> >>
> >> On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
> >>
> >> Hi community,
> >> The summary of Stephan makes a lot of sense to me. It is indeed much
> >> clearer after splitting the complex topic into small ones.
> >> I was wondering, is there any detailed plan for the next step? If not, I
> >> would like to push this thing forward by creating some JIRA issues.
> >> Another question is: should version 1.8 include these features?
> >>
> >> On Sat, Dec 1, 2018 at 4:20 AM Stephan Ewen <se...@apache.org> wrote:
> >>
> >> Thanks everyone for the lively discussion. Let me try to summarize
> >> where I see convergence in the discussion and open issues.
> >> I'll try to group this by design aspect of the source. Please let me
> >> know if I got things wrong or missed something crucial here.
> >>
> >> For issues 1-3, if the below reflects the state of the discussion, I
> >> would try and update the FLIP in the next days.
> >> For the remaining ones we need more discussion.
> >>
> >> I would suggest forking each of these aspects into a separate mail
> >> thread, or we will lose sight of the individual aspects.
> >>
> >> *(1) Separation of Split Enumerator and Split Reader*
> >>
> >> - All seem to agree this is a good thing
> >> - Split Enumerator could in the end live on the JobManager (and assign
> >> splits via RPC) or in a task (and assign splits via data streams)
> >> - This discussion is orthogonal and should come later, when the
> >> interface is agreed upon.
> >>
> >> *(2) Split Readers for one or more splits*
> >>
> >> - Discussion seems to agree that we need to support one reader that
> >> possibly handles multiple splits concurrently.
> >> - The requirement comes from sources where one poll()-style call
> >> fetches data from different splits / partitions
> >>   --> example sources that require that would be for example Kafka,
> >> Pravega, Pulsar
> >> - Could have one split reader per source, or multiple split readers
> >> that share the "poll()" function
> >> - To not make it too complicated, we can start with thinking about one
> >> split reader for all splits initially and see if that covers all
> >> requirements
> >>
> >> *(3) Threading model of the Split Reader*
> >>
> >> - Most active part of the discussion ;-)
> >> - A non-blocking way for Flink's task code to interact with the source
> >> is needed in order to base the task runtime code on a
> >> single-threaded/actor-style task design
> >>   --> I personally am a big proponent of that, it will help with
> >> well-behaved checkpoints, efficiency, and simpler yet more robust
> >> runtime code
> >> - Users care about simple abstraction, so as a subclass of SplitReader
> >> (non-blocking / async) we need to have a BlockingSplitReader which will
> >> form the basis of most source implementations. BlockingSplitReader lets
> >> users do blocking simple poll() calls.
> >> - The BlockingSplitReader would spawn a thread (or more) and the
> >> thread(s) can make blocking calls and hand over data buffers via a
> >> blocking queue
> >> - This should allow us to cover both, a fully async runtime, and a
> >> simple blocking interface for users.
> >> - This is actually very similar to how the Kafka connectors work. Kafka
> >> 9+ with one thread, Kafka 8 with multiple threads
> >> - On the base SplitReader (the async one), the non-blocking method that
> >> gets the next chunk of data would signal data availability via a
> >> CompletableFuture, because that gives the best flexibility (can await
> >> completion or register notification handlers).
> >> - The source task would register a "thenHandle()" (or similar) on the
> >> future to put a "take next data" task into the actor-style mailbox
> >>
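> >> Sketched in code, that availability contract could look roughly like
> >> this (hypothetical shape, not the FLIP API):
> >>
> >> interface SplitReader<E> {
> >>     // Non-blocking; returns null when no record is currently available.
> >>     E pollNext() throws Exception;
> >>
> >>     // Completes when data is available again. The task registers a
> >>     // handler on it to enqueue a "take next data" action into the
> >>     // mailbox instead of blocking a thread.
> >>     CompletableFuture<?> isAvailable();
> >> }
> >>
> >> // task side:
> >> reader.isAvailable().thenRun(() -> mailbox.execute(this::takeNextData));
> >>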
> >> *(4) Split Enumeration and Assignment*
> >>
> >> - Splits may be generated lazily, both in cases where there is a
> >> limited number of splits (but very many), or where splits are
> >> discovered over time
> >> - Assignment should also be lazy, to get better load balancing
> >> - Assignment needs to support locality preferences
> >> - Possible design based on discussion so far:
> >>   --> SplitReader has a method "addSplits(SplitT...)" to add one or
> >> more splits. Some split readers might assume they have only one split
> >> ever, concurrently, others assume multiple splits. (Note: the idea
> >> behind being able to add multiple splits at the same time is to ease
> >> startup where multiple splits may be assigned instantly.)
> >>   --> SplitReader has a context object on which it can call to indicate
> >> when splits are completed. The enumerator gets that notification and
> >> can use it to decide when to assign new splits. This should help both
> >> in cases of sources that take splits lazily (file readers) and in case
> >> the source needs to preserve a partial order between splits (Kinesis,
> >> Pravega, Pulsar may need that).
> >>   --> SplitEnumerator gets notification when SplitReaders start and
> >> when they finish splits. They can decide at that moment to push more
> >> splits to that reader
> >>   --> The SplitEnumerator should probably be aware of the source
> >> parallelism, to build its initial distribution.
> >>
> >> - Open question: Should the source expose something like "host
> >> preferences", so that yarn/mesos/k8s can take this into account when
> >> selecting a node to start a TM on?
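> >>
> >> A compact sketch of the reader side of that protocol (names are
> >> hypothetical, assembled from the bullets above):
> >>
> >> interface SplitReader<SplitT> {
> >>     // One or more splits can be handed over at once, e.g. at startup.
> >>     void addSplits(SplitT... splits);
> >> }
> >>
> >> interface SplitReaderContext {
> >>     // Notifies the enumerator, which may decide to assign more splits.
> >>     void splitFinished(String splitId);
> >> }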
> >>
> >> *(5) Watermarks and event time alignment*
> >>
> >> - Watermark generation, as well as idleness, needs to be per split
> >> (like currently in the Kafka Source, per partition)
> >> - It is desirable to support optional event-time alignment, meaning
> >> that splits that are ahead are back-pressured or temporarily
> >> unsubscribed
> >> - I think it would be desirable to encapsulate watermark generation
> >> logic in watermark generators, for a separation of concerns. The
> >> watermark generators should run per split.
> >> - Using watermark generators would also help with another problem of
> >> the suggested interface, namely supporting non-periodic watermarks
> >> efficiently.
> >> - Need a way to "dispatch" the next record to different watermark
> >> generators
> >> - Need a way to tell the SplitReader to "suspend" a split until a
> >> certain watermark is reached (event time backpressure)
> >> - This would in fact be not needed (and thus simpler) if we had a
> >> SplitReader per split, and may be a reason to re-open that discussion
> >>
> >> *(6) Watermarks across splits and in the Split Enumerator*
> >>
> >> - The split enumerator may need some watermark awareness, which should
> >> be purely based on split metadata (like the create timestamp of file
> >> splits)
> >> - If there are still more splits with an overlapping event time range
> >> for a split reader, then that split reader should not advance the
> >> watermark within the split beyond the overlap boundary. Otherwise
> >> future splits will produce late data.
> >> - One way to approach this could be that the split enumerator may send
> >> watermarks to the readers, and the readers cannot emit watermarks
> >> beyond that received watermark.
> >> - Many split enumerators would simply immediately send Long.MAX out and
> >> leave the progress purely to the split readers.
> >> - For event-time alignment / split back pressure, this begs the
> >> question how we can avoid deadlocks that may arise when splits are
> >> suspended for event time back pressure.
> >>
> >> *(7) Batch and streaming Unification*
> >>
> >> - Functionality-wise, the above design should support both
> >> - Batch often (mostly) does not care about reading "in order" and
> >> generating watermarks
> >>   --> Might use different enumerator logic that is more locality aware
> >> and ignores event time order
> >>   --> Does not generate watermarks
> >> - Would be great if bounded sources could be identified at compile
> >> time, so that "env.addBoundedSource(...)" is type safe and can return a
> >> "BoundedDataStream".
> >> - Possible to defer this discussion until later
> >>
> >> *Miscellaneous Comments*
> >>
> >> - Should the source have a TypeInformation for the produced type,
> >> instead of a serializer? We need a type information in the stream
> >> anyways, and can derive the serializer from that. Plus, creating the
> >> serializer should respect the ExecutionConfig.
> >>
> >> - The TypeSerializer interface is very powerful but also not easy to
> >> implement. Its purpose is to handle data super efficiently, support
> >> flexible ways of evolution, etc.
> >> For metadata I would suggest looking at the SimpleVersionedSerializer
> >> instead, which is used for example for checkpoint master hooks, or for
> >> the streaming file sink. I think that is a good match for cases where
> >> we do not need more than ser/deser (no copy, etc.) and don't need to
> >> push versioning out of the serialization paths for best performance (as
> >> in the TypeSerializer)
> >>
> >>
> >> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas
> >> <k.klou...@data-artisans.com> wrote:
> >>
> >>
> >> Hi Biao,
> >>
> >> Thanks for the answer!
> >>
> >> So given the multi-threaded readers, we now have as open questions:
> >>
> >> 1) How do we let the checkpoints pass through our multi-threaded reader
> >> operator?
> >>
> >> 2) Do we have separate reader and source operators or not? In the
> >> strategy that has a separate source, the source operator has a
> >> parallelism of 1 and is responsible for split recovery only.
> >>
> >> For the first one, given also the constraints (blocking, finite queues,
> >> etc.), I do not have an answer yet.
> >>
> >> For the 2nd, I think that we should go with separate operators for the
> >> source and the readers, for the following reasons:
> >>
> >> 1) This is more aligned with a potential future improvement where the
> >> split discovery becomes a responsibility of the JobManager and readers
> >> are pulling more work from the JM.
> >>
> >> 2) The source is going to be the "single point of truth". It will know
> >> what has been processed and what not. If the source and the readers are
> >> a single operator with parallelism > 1, or in general, if the split
> >> discovery is done by each task individually, then:
> >>  i) we have to have a deterministic scheme for each reader to assign
> >> splits to itself (e.g. mod subtaskId). This is not necessarily trivial
> >> for all sources.
> >>  ii) each reader would have to keep a copy of all its processed splits
> >>  iii) the state has to be a union state with a non-trivial merging
> >> logic in order to support rescaling.
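> >>
> >> For illustration, roughly what each reader would have to do with
> >> Flink's union list state (simplified; getUnionListState is an existing
> >> operator state API, while the re-assignment logic sketched in the
> >> comment is the non-trivial part):
> >>
> >> ListState<SplitT> splitState = context.getOperatorStateStore()
> >>     .getUnionListState(new ListStateDescriptor<>("splits", splitClass));
> >>
> >> // On restore, every subtask sees the union of all subtasks' splits and
> >> // must deterministically re-assign a subset to itself, e.g. by
> >> // splitId hash mod subtask index.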
> >>
> >> Two additional points that you raised above:
> >>
> >> i) The point that you raised that we need to keep all splits (processed
> >> and not-processed) I think is a bit of a strong requirement. This would
> >> imply that for infinite sources the state will grow indefinitely. This
> >> problem is even more pronounced if we do not have a single source that
> >> assigns splits to readers, as each reader will have its own copy of the
> >> state.
> >>
> >> ii) It is true that for finite sources we need to somehow not close the
> >> readers when the source/split discoverer finishes. The
> >> ContinuousFileReaderOperator has a work-around for that. It is not
> >> elegant, and checkpoints are not emitted after closing the source, but
> >> this, I believe, is a bigger problem which requires more changes than
> >> just refactoring the source interface.
> >>
> >> Cheers,
> >> Kostas
> >>
> >>
> >>
> >>
> >> --
> >> Best, Jingsong Lee