Hi folks,

Thanks for the discussion, great feedback. Also thanks Dawid for the
explanation, it is much clearer now.

One thing that is indeed missing from the FLIP is how the boundedness is
passed to the Source implementation. So the API should be:

Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext context)

And we can probably remove the
Source#supportsBoundedness(Boundedness boundedness) method.
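Sketched as Java, the change could look roughly like this (a hypothetical sketch only; names and generics are simplified and may differ from the final FLIP-27 interfaces):

```java
// Hypothetical sketch only: names and generics are simplified and may
// differ from the final FLIP-27 interfaces.
enum Boundedness {
    BOUNDED,
    CONTINUOUS_UNBOUNDED
}

interface SplitEnumeratorContext<SplitT> { /* elided */ }

interface SplitEnumerator<SplitT> { /* elided */ }

interface Source<T, SplitT> {

    // The boundedness is passed in explicitly, so the enumerator knows
    // whether split discovery should eventually stop.
    SplitEnumerator<SplitT> createEnumerator(
            Boundedness boundedness,
            SplitEnumeratorContext<SplitT> context);

    // With boundedness passed to createEnumerator, the capability check
    // below becomes redundant and can likely be removed:
    // boolean supportsBoundedness(Boundedness boundedness);
}
```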

Assuming we have that, we are essentially choosing from one of the
following two options:

Option 1:
// The source is a continuous source, and only unbounded operations can be performed.
DataStream<Type> dataStream = env.continuousSource(someSource);

// The source is a bounded source; both bounded and unbounded operations can be performed.
BoundedDataStream<Type> boundedDataStream = env.boundedSource(someSource);

  - Pros:
       a) Explicit boundary between bounded / unbounded streams. It is
quite simple and clear to the users.
  - Cons:
       a) For applications that do not involve bounded operations, they
still have to call a different API to distinguish bounded / unbounded streams.
       b) No support for a bounded stream to run in a streaming runtime
setting, i.e. scheduling and operator behaviors.


Option 2:
// The source is either bounded or unbounded, but only unbounded operations
can be performed on the returned DataStream.
DataStream<Type> dataStream = env.source(someSource);

// The source must be a bounded source, otherwise an exception is thrown.
BoundedDataStream<Type> boundedDataStream = env.boundedSource(boundedSource);

The pros and cons are exactly the opposite of option 1.
  - Pros:
       a) For applications that do not involve bounded operations, there is
no need to call a different API to distinguish bounded / unbounded streams.
       b) Support for a bounded stream to run in a streaming runtime setting,
i.e. scheduling and operator behaviors.
  - Cons:
       a) Bounded / unbounded streams are kind of mixed, i.e. given a
DataStream, it is not clear whether it is bounded or not, unless you have
access to its source.


If we only think from the Source API perspective, option 2 seems a better
choice because functionality-wise it is a superset of option 1, at the cost
of some seemingly acceptable ambiguity in the DataStream API.
But if we look at the DataStream API as a whole, option 1 seems a clearer
choice. For example, sometimes a library may have to know whether a
certain task will finish or not. And it would be difficult to tell if the
input is a DataStream, unless additional information is provided all the
way from the Source. One possible solution is a *modified option 2*
which adds a method to the DataStream API to indicate boundedness, such as
getBoundedness(). It would solve the problem, with a potential confusion of
what the difference is between a DataStream whose getBoundedness() returns
BOUNDED and a BoundedDataStream. But that seems not super difficult to
explain.
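To make the trade-off concrete, here is a toy sketch of the modified option 2. These are stand-in classes for illustration, not Flink's real DataStream API, and getBoundedness() is the hypothetical addition under discussion:

```java
// Toy stand-ins for illustration only; not Flink's actual DataStream API.
enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

class DataStream<T> {
    private final Boundedness boundedness;

    DataStream(Boundedness boundedness) {
        this.boundedness = boundedness;
    }

    // The hypothetical accessor: lets a library ask whether this stream
    // will eventually finish, without having access to its source.
    Boundedness getBoundedness() {
        return boundedness;
    }
}

// A BoundedDataStream would additionally expose bounded-only operations
// (e.g. sort); here it only marks the type.
class BoundedDataStream<T> extends DataStream<T> {
    BoundedDataStream() {
        super(Boundedness.BOUNDED);
    }
}
```

A library receiving a plain DataStream could then branch on `stream.getBoundedness() == Boundedness.BOUNDED` to decide whether end-of-input logic is safe. That is exactly the confusion point: such a stream is bounded but still not a BoundedDataStream.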

So from the API's perspective, I don't have a strong opinion between *option
1* and *modified option 2*. I like the cleanness of option 1, but modified
option 2 would be more attractive if we have a concrete use case for the
"bounded stream with unbounded streaming runtime settings".

Re: Till

> Maybe this has already been asked before but I was wondering why the
> SourceReader interface has the method pollNext which hands the
> responsibility of outputting elements to the SourceReader implementation?
> Has this been done for backwards compatibility reasons with the old source
> interface? If not, then one could define a Collection<E> getNextRecords()
> method which returns the currently retrieved records and then the caller
> emits them outside of the SourceReader. That way the interface would not
> allow to implement an outputting loop where we never hand back control to
> the caller. At the moment, this contract can be easily broken and is only
> mentioned loosely in the JavaDocs.
>

The primary reason we hand over the SourceOutput to the SourceReader is
that sometimes it is difficult for a SourceReader to emit one record at
a time. One example is some batched messaging systems which only have an
offset for the entire batch instead of individual messages in the batch. In
that case, returning one record at a time would leave the SourceReader in
an uncheckpointable state, because it can only checkpoint at the batch
boundaries.
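A toy sketch of that argument (none of these types are the proposed interfaces; they only illustrate the control flow): because the reader receives the output, it can emit a whole batch inside one pollNext() call, so checkpoints between calls always fall on batch boundaries.

```java
import java.util.List;

// Illustration only: toy stand-ins for the FLIP-27 interfaces.
interface SourceOutput<E> {
    void emit(E record);
}

class BatchedSourceReader {
    // The external system only has offsets at batch granularity, so this
    // is the only state the reader can meaningfully checkpoint.
    private int batchOffset = 0;
    private final List<List<String>> batches;

    BatchedSourceReader(List<List<String>> batches) {
        this.batches = batches;
    }

    // Emits every record of the current batch before returning, so the
    // reader is never left mid-batch between pollNext() calls.
    boolean pollNext(SourceOutput<String> output) {
        if (batchOffset >= batches.size()) {
            return false; // no more data
        }
        for (String record : batches.get(batchOffset)) {
            output.emit(record);
        }
        batchOffset++; // advance only at the batch boundary
        return batchOffset < batches.size();
    }

    int snapshotState() {
        return batchOffset; // always a batch boundary
    }
}
```

If the contract instead forced the reader to return one record at a time, a checkpoint could be requested while the reader is mid-batch, where the external system has no offset to record.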

Thanks,

Jiangjie (Becket) Qin

On Tue, Dec 10, 2019 at 5:33 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi everyone,
>
> thanks for drafting this FLIP. It reads very well.
>
> Concerning Dawid's proposal, I tend to agree. The boundedness could come
> from the source and tell the system how to treat the operator (scheduling
> wise). From a user's perspective it should be fine to get back a DataStream
> when calling env.source(boundedSource) if he does not need special
> operations defined on a BoundedDataStream. If he needs this, then one could
> use the method BoundedDataStream env.boundedSource(boundedSource).
>
> If possible, we could enforce the proper usage of env.boundedSource() by
> introducing a BoundedSource type so that one cannot pass an
> unbounded source to it. That way users would not be able to shoot
> themselves in the foot.
>
> Maybe this has already been asked before but I was wondering why the
> SourceReader interface has the method pollNext which hands the
> responsibility of outputting elements to the SourceReader implementation?
> Has this been done for backwards compatibility reasons with the old source
> interface? If not, then one could define a Collection<E> getNextRecords()
> method which returns the currently retrieved records and then the caller
> emits them outside of the SourceReader. That way the interface would not
> allow to implement an outputting loop where we never hand back control to
> the caller. At the moment, this contract can be easily broken and is only
> mentioned loosely in the JavaDocs.
>
> Cheers,
> Till
>
> On Tue, Dec 10, 2019 at 7:49 AM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I think the current design is good.
> >
> > My understanding is:
> >
> > For execution mode: bounded mode and continuous mode are totally
> > different. I don't think we have the ability to integrate the two models
> > at present. It's about scheduling, memory, algorithms, states, etc. We
> > shouldn't confuse them.
> >
> > For source capabilities: only bounded, only continuous, both bounded and
> > continuous.
> > I think Kafka is a source that can be run in both the bounded
> > and the continuous execution mode.
> > And Kafka with an end offset should be run in both the bounded
> > and the continuous execution mode. Using Apache Beam with the Flink
> > runner, I used to run a "bounded" Kafka in streaming mode. For our
> > previous DataStream, it is not necessarily required that the source
> > cannot be bounded.
> >
> > So here is my thought on Dawid's question:
> > 1. pass a bounded source to continuousSource() +1
> > 2. pass a continuous source to boundedSource() -1, should throw an
> > exception.
> >
> > In StreamExecutionEnvironment, continuousSource and boundedSource define
> > the execution mode. It defines a clear boundary of execution mode.
> >
> > Best,
> > Jingsong Lee
> >
> > On Tue, Dec 10, 2019 at 10:37 AM Jark Wu <imj...@gmail.com> wrote:
> >
> > > I agree with Dawid's point that the boundedness information should come
> > > from the source itself (e.g. the end timestamp), not through
> > > env.boundedSource()/continuousSource().
> > > I think if we want to support something like `env.source()` that
> > > derives the execution mode from the source, the
> > > `supportsBoundedness(Boundedness)` method is not enough, because we
> > > don't know whether it is bounded or not.
> > >
> > > Best,
> > > Jark
> > >
> > >
> > > On Mon, 9 Dec 2019 at 22:21, Dawid Wysakowicz <dwysakow...@apache.org>
> > > wrote:
> > >
> > > > One more thing. In the current proposal, with the
> > > > supportsBoundedness(Boundedness) method and the boundedness coming
> > > > from either continuousSource or boundedSource, I could not find how
> > > > this information is fed back to the SplitEnumerator.
> > > >
> > > > Best,
> > > >
> > > > Dawid
> > > >
> > > > On 09/12/2019 13:52, Becket Qin wrote:
> > > > > Hi Dawid,
> > > > >
> > > > > Thanks for the comments. This actually brings up another relevant
> > > > > question about what a "bounded source" implies. I actually had the
> > > > > same impression when I looked at the Source API. Here is what I
> > > > > understand after some discussion with Stephan. The bounded source
> > > > > has the following impacts.
> > > > >
> > > > > 1. API validity.
> > > > > - A bounded source generates a bounded stream, so some operations
> > > > > that only work on bounded records could be performed, e.g. sort.
> > > > > - To expose these bounded-stream-only APIs, there are two options:
> > > > >      a. Add them to the DataStream API and throw an exception if a
> > > > > method is called on an unbounded stream.
> > > > >      b. Create a BoundedDataStream class which is returned from
> > > > > env.boundedSource(), while DataStream is returned from
> > > > > env.continuousSource().
> > > > > Note that this cannot be done by having a single
> > > > > env.source(theSource), even if the Source has a getBoundedness()
> > > > > method.
> > > > >
> > > > > 2. Scheduling
> > > > > - A bounded source could be computed stage by stage without
> > > > > bringing up all the tasks at the same time.
> > > > >
> > > > > 3. Operator behaviors
> > > > > - A bounded source indicates the records are finite, so some
> > > > > operators can wait until they receive all the records before
> > > > > starting the processing.
> > > > >
> > > > > Of the above impacts, only 1 is relevant to the API design. And
> > > > > the current proposal in FLIP-27 follows 1.b.
> > > > >
> > > > >> // boundedness depends on source property, imo this should always
> > > > >> be preferred
> > > > >>
> > > > >> DataStream<MyType> stream = env.source(theSource);
> > > > >
> > > > > In your proposal, does DataStream have bounded-stream-only
> > > > > methods? It looks like it should, otherwise passing a bounded
> > > > > Source to env.source() would be confusing. In that case, we will
> > > > > essentially do 1.a if an unbounded Source is created from
> > > > > env.source(unboundedSource).
> > > > >
> > > > > If we have the methods only supported for bounded streams in
> > > > > DataStream, it seems a little weird to have a separate
> > > > > BoundedDataStream interface.
> > > > >
> > > > > Am I understanding it correctly?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz
> > > > > <dwysakow...@apache.org> wrote:
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> Really well written proposal, and a very important one. I must
> > > > >> admit I have not understood all the intricacies of it yet.
> > > > >>
> > > > >> One question I have though is about where the information about
> > > > >> boundedness comes from. I think in most cases it is a property of
> > > > >> the source. As you described, it might be e.g. an end offset, a
> > > > >> flag whether it should monitor new splits, etc. I think it would
> > > > >> be a really nice use case to be able to say:
> > > > >>
> > > > >> new KafkaSource().readUntil(long timestamp),
> > > > >>
> > > > >> which could work as an "end offset". Moreover I think all bounded
> > > > >> sources support continuous mode, but no intrinsically continuous
> > > > >> source supports the bounded mode. If I understood the proposal
> > > > >> correctly, it suggests the boundedness sort of "comes" from the
> > > > >> outside of the source, from the invocation of either
> > > > >> boundedStream or continuousSource.
> > > > >>
> > > > >> I am wondering if it would make sense to actually change the
> > > > >> method
> > > > >>
> > > > >> boolean Source#supportsBoundedness(Boundedness)
> > > > >>
> > > > >> to
> > > > >>
> > > > >> Boundedness Source#getBoundedness().
> > > > >>
> > > > >> As for the methods #boundedSource, #continuousSource: assuming
> > > > >> the boundedness is a property of the source, they do not affect
> > > > >> how the enumerator works, but mostly how the dag is scheduled,
> > > > >> right? I am not against those methods, but I think it is a very
> > > > >> specific use case to actually override the property of the
> > > > >> source. In general I would expect users to only call
> > > > >> env.source(theSource), where the source tells if it is bounded or
> > > > >> not. I would suggest considering the following set of methods:
> > > > >>
> > > > >> // boundedness depends on source property, imo this should always
> > > > >> be preferred
> > > > >>
> > > > >> DataStream<MyType> stream = env.source(theSource);
> > > > >>
> > > > >>
> > > > >> // always continuous execution, whether bounded or unbounded
> > > > >> source
> > > > >>
> > > > >> DataStream<MyType> boundedStream = env.continuousSource(theSource);
> > > > >>
> > > > >> // imo this would make sense if the BoundedDataStream provides
> > > > >> additional features unavailable for continuous mode
> > > > >> BoundedDataStream<MyType> batch = env.boundedSource(theSource);
> > > > >>
> > > > >>
> > > > >> Best,
> > > > >>
> > > > >> Dawid
> > > > >>
> > > > >>
> > > > >> On 04/12/2019 11:25, Stephan Ewen wrote:
> > > > >>
> > > > >> Thanks, Becket, for updating this.
> > > > >>
> > > > >> I agree with moving the aspects you mentioned into separate FLIPs -
> > > > >> this one was becoming unwieldy in size.
> > > > >>
> > > > >> +1 to the FLIP in its current state. It's a very detailed write-up,
> > > > >> nicely done!
> > > > >>
> > > > >> On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >>
> > > > >> Hi all,
> > > > >>
> > > > >> Sorry for the long belated update. I have updated the FLIP-27 wiki
> > > > >> page with the latest proposals. Some noticeable changes include:
> > > > >> 1. A new generic communication mechanism between SplitEnumerator
> > > > >> and SourceReader.
> > > > >> 2. Some detailed API method signature changes.
> > > > >>
> > > > >> We left a few things out of this FLIP and will address them in
> > > > >> separate FLIPs, including:
> > > > >> 1. Per split event time.
> > > > >> 2. Event time alignment.
> > > > >> 3. Fine grained failover for SplitEnumerator failure.
> > > > >>
> > > > >> Please let us know if you have any questions.
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Jiangjie (Becket) Qin
> > > > >>
> > > > >> On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org>
> > > > >> wrote:
> > > > >>
> > > > >>
> > > > >> Hi Łukasz!
> > > > >>
> > > > >> Becket and me are working hard on figuring out the last details
> > > > >> and implementing the first PoC. We would update the FLIP hopefully
> > > > >> next week.
> > > > >>
> > > > >> There is a fair chance that a first version of this will be in
> > > > >> 1.10, but I think it will take another release to battle test it
> > > > >> and migrate the connectors.
> > > > >>
> > > > >> Best,
> > > > >> Stephan
> > > > >>
> > > > >> On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl>
> > > > >> wrote:
> > > > >>
> > > > >> Hi,
> > > > >>
> > > > >> This proposal looks very promising for us. Do you have any plans
> > > > >> for which Flink release it is going to be released in? We are
> > > > >> thinking of using the DataSet API for our future use cases, but on
> > > > >> the other hand the DataSet API is going to be deprecated, so using
> > > > >> the proposed bounded data streams solution could be more viable in
> > > > >> the long term.
> > > > >>
> > > > >> Thanks,
> > > > >> Łukasz
> > > > >>
> > > > >> On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> Thanks for putting together this proposal!
> > > > >>
> > > > >> I see that the "Per Split Event Time" and "Event Time Alignment"
> > > > >> sections are still TBD.
> > > > >>
> > > > >> It would probably be good to flesh those out a bit before
> > > > >> proceeding too far, as the event time alignment will probably
> > > > >> influence the interaction with the split reader, specifically
> > > > >> ReaderStatus emitNext(SourceOutput<E> output).
> > > > >>
> > > > >> We currently have only one implementation for event time alignment
> > > > >> in the Kinesis consumer. The synchronization in that case takes
> > > > >> place as the last step before records are emitted downstream
> > > > >> (RecordEmitter). With the currently proposed interfaces, the
> > > > >> equivalent can be implemented in the reader loop, although note
> > > > >> that in the Kinesis consumer the per-shard threads push records.
> > > > >>
> > > > >> Synchronization has not been implemented for the Kafka consumer
> > > > >> yet: https://issues.apache.org/jira/browse/FLINK-12675
> > > > >>
> > > > >> When I looked at it, I realized that the implementation will look
> > > > >> quite different from Kinesis because it needs to take place in the
> > > > >> pull part, where records are taken from the Kafka client. Due to
> > > > >> the multiplexing it cannot be done by blocking the split thread
> > > > >> like it currently works for Kinesis. Reading from individual Kafka
> > > > >> partitions needs to be controlled via pause/resume on the Kafka
> > > > >> client.
> > > > >>
> > > > >> To take on that responsibility, the split thread would need to be
> > > > >> aware of the watermarks, or at least whether it should or should
> > > > >> not continue to consume a given split, and this may require a
> > > > >> different SourceReader or SourceOutput interface.
> > > > >>
> > > > >> Thanks,
> > > > >> Thomas
> > > > >>
> > > > >>
> > > > >> On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >>
> > > > >> Hi Stephan,
> > > > >>
> > > > >> Thank you for the feedback!
> > > > >> Will take a look at your branch before the public discussion.
> > > > >>
> > > > >>
> > > > >> On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org>
> > > > >> wrote:
> > > > >>
> > > > >> Hi Biao!
> > > > >>
> > > > >> Thanks for reviving this. I would like to join this discussion,
> > > > >> but am quite occupied with the 1.9 release, so can we maybe pause
> > > > >> this discussion for a week or so?
> > > > >>
> > > > >> In the meantime I can share some suggestions based on prior
> > > > >> experiments:
> > > > >>
> > > > >> How to do watermarks / timestamp extractors in a simpler and more
> > > > >> flexible way. I think that part is quite promising and should be
> > > > >> part of the new source interface.
> > > > >>
> > > > >> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
> > > > >>
> > > > >> https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
> > > > >>
> > > > >> Some experiments on how to build the source reader and its library
> > > > >> for common threading/split patterns:
> > > > >>
> > > > >> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
> > > > >>
> > > > >> Best,
> > > > >> Stephan
> > > > >>
> > > > >>
> > > > >> On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> Hi devs,
> > > > >>
> > > > >> Since 1.9 is nearly released, I think we could get back to
> > > > >> FLIP-27. I believe it should be included in 1.10.
> > > > >>
> > > > >> There are so many things mentioned in the document of FLIP-27. [1]
> > > > >> I think we'd better discuss them separately. However, the wiki is
> > > > >> not a good place to discuss. I wrote a google doc about the
> > > > >> SplitReader API which misses some details in the document. [2]
> > > > >>
> > > > >> 1.
> > > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
> > > > >>
> > > > >> 2.
> > > > >> https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
> > > > >>
> > > > >> CC Stephan, Aljoscha, Piotrek, Becket
> > > > >>
> > > > >>
> > > > >> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> Hi Steven,
> > > > >> Thank you for the feedback. Please take a look at the FLIP-27
> > > > >> document
> > > > >> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>
> > > > >> which was updated recently. A lot of details of the enumerator
> > > > >> were added in this document. I think it will help.
> > > > >>
> > > > >> Steven Wu <stevenz...@gmail.com> wrote on Thu, Mar 28, 2019 at
> > > > >> 12:52 PM:
> > > > >>
> > > > >>
> > > > >> This proposal mentioned that SplitEnumerator might run on the
> > > > >> JobManager or in a single task on a TaskManager.
> > > > >>
> > > > >> If the enumerator is a single task on a taskmanager, then the job
> > > > >> DAG can never be embarrassingly parallel anymore. That will
> > > > >> nullify the leverage of fine-grained recovery for embarrassingly
> > > > >> parallel jobs.
> > > > >>
> > > > >> It's not clear to me what the implication of running the
> > > > >> enumerator on the jobmanager is. So I will leave that out for now.
> > > > >>
> > > > >> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> Hi Stephan & Piotrek,
> > > > >>
> > > > >> Thank you for the feedback.
> > > > >>
> > > > >> It seems that there are a lot of things to do in the community. I
> > > > >> am just afraid that this discussion may be forgotten since there
> > > > >> are so many proposals recently.
> > > > >> Anyway, I wish to see the split topics soon :)
> > > > >>
> > > > >> Piotr Nowojski <pi...@da-platform.com> wrote on Thu, Jan 24, 2019
> > > > >> at 8:21 PM:
> > > > >>
> > > > >> Hi Biao!
> > > > >>
> > > > >> This discussion was stalled because of preparations for the open
> > > > >> sourcing & merging of Blink. I think before creating the tickets
> > > > >> we should split this discussion into the topics/areas outlined by
> > > > >> Stephan and create FLIPs for those.
> > > > >>
> > > > >> I think there is no chance for this to be completed in the couple
> > > > >> of remaining weeks / 1 month before the 1.8 feature freeze,
> > > > >> however it would be good to aim with those changes for 1.9.
> > > > >>
> > > > >> Piotrek
> > > > >>
> > > > >> On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
> > > > >>
> > > > >> Hi community,
> > > > >> The summary by Stephan makes a lot of sense to me. It is indeed
> > > > >> much clearer after splitting the complex topic into small ones.
> > > > >> I was wondering whether there is any detailed plan for the next
> > > > >> step? If not, I would like to push this thing forward by creating
> > > > >> some JIRA issues.
> > > > >> Another question: should version 1.8 include these features?
> > > > >>
> > > > >> Stephan Ewen <se...@apache.org> wrote on Sat, Dec 1, 2018 at
> > > > >> 4:20 AM:
> > > > >>
> > > > >>
> > > > >> Thanks everyone for the lively discussion. Let me try to
> > > > >> summarize where I see convergence in the discussion and open
> > > > >> issues.
> > > > >> I'll try to group this by design aspect of the source. Please let
> > > > >> me know if I got things wrong or missed something crucial here.
> > > > >>
> > > > >> For issues 1-3, if the below reflects the state of the
> > > > >> discussion, I would try and update the FLIP in the next days.
> > > > >> For the remaining ones we need more discussion.
> > > > >>
> > > > >> I would suggest to fork each of these aspects into a separate
> > > > >> mail thread, or we will lose sight of the individual aspects.
> > > > >>
> > > > >> *(1) Separation of Split Enumerator and Split Reader*
> > > > >>
> > > > >>  - All seem to agree this is a good thing
> > > > >>  - Split Enumerator could in the end live on the JobManager (and
> > > > >> assign splits via RPC) or in a task (and assign splits via data
> > > > >> streams)
> > > > >>  - this discussion is orthogonal and should come later, when the
> > > > >> interface is agreed upon.
> > > > >>
> > > > >> *(2) Split Readers for one or more splits*
> > > > >>
> > > > >>  - Discussion seems to agree that we need to support one reader
> > > > >> that possibly handles multiple splits concurrently.
> > > > >>  - The requirement comes from sources where one poll()-style call
> > > > >> fetches data from different splits / partitions
> > > > >>    --> example sources that require that would be for example
> > > > >> Kafka, Pravega, Pulsar
> > > > >>  - Could have one split reader per source, or multiple split
> > > > >> readers that share the "poll()" function
> > > > >>  - To not make it too complicated, we can start with thinking
> > > > >> about one split reader for all splits initially and see if that
> > > > >> covers all requirements
> > > > >>
> > > > >> *(3) Threading model of the Split Reader*
> > > > >>
> > > > >>  - Most active part of the discussion ;-)
> > > > >>  - A non-blocking way for Flink's task code to interact with the
> > > > >> source is needed in order to have a task runtime code based on a
> > > > >> single-threaded/actor-style task design
> > > > >>    --> I personally am a big proponent of that, it will help with
> > > > >> well-behaved checkpoints, efficiency, and simpler yet more robust
> > > > >> runtime code
> > > > >>  - Users care about a simple abstraction, so as a subclass of
> > > > >> SplitReader (non-blocking / async) we need to have a
> > > > >> BlockingSplitReader which will form the basis of most source
> > > > >> implementations. BlockingSplitReader lets users do simple
> > > > >> blocking poll() calls.
> > > > >>  - The BlockingSplitReader would spawn a thread (or more) and the
> > > > >> thread(s) can make blocking calls and hand over data buffers via
> > > > >> a blocking queue
> > > > >>  - This should allow us to cover both a fully async runtime and a
> > > > >> simple blocking interface for users.
> > > > >>  - This is actually very similar to how the Kafka connectors
> > > > >> work. Kafka 9+ with one thread, Kafka 8 with multiple threads
> > > > >>  - On the base SplitReader (the async one), the non-blocking
> > > > >> method that gets the next chunk of data would signal data
> > > > >> availability via a CompletableFuture, because that gives the best
> > > > >> flexibility (can await completion or register notification
> > > > >> handlers).
> > > > >>  - The source task would register a "thenHandle()" (or similar)
> > > > >> on the future to put a "take next data" task into the
> > > > >> actor-style mailbox
> > > > >>
> > > > >> *(4) Split Enumeration and Assignment*
> > > > >>
> > > > >>  - Splits may be generated lazily, both in cases where there is a
> > > > >> limited number of splits (but very many), and where splits are
> > > > >> discovered over time.
> > > > >>  - Assignment should also be lazy, to get better load balancing.
> > > > >>  - Assignment needs to support locality preferences.
> > > > >>  - Possible design based on discussion so far:
> > > > >>
> > > > >>    --> SplitReader has a method "addSplits(SplitT...)" to add one or
> > > > >> more splits. Some split readers might assume they have only one split
> > > > >> ever, concurrently, others assume multiple splits. (Note: the idea
> > > > >> behind being able to add multiple splits at the same time is to ease
> > > > >> startup where multiple splits may be assigned instantly.)
> > > > >>    --> SplitReader has a context object on which it can indicate when
> > > > >> splits are completed. The enumerator gets that notification and can
> > > > >> use it to decide when to assign new splits. This should help both in
> > > > >> cases of sources that take splits lazily (file readers) and in cases
> > > > >> where the source needs to preserve a partial order between splits
> > > > >> (Kinesis, Pravega, Pulsar may need that).
> > > > >>    --> SplitEnumerator gets a notification when SplitReaders start
> > > > >> and when they finish splits. It can decide at that moment to push
> > > > >> more splits to that reader.
> > > > >>    --> The SplitEnumerator should probably be aware of the source
> > > > >> parallelism, to build its initial distribution.
> > > > >>
> > > > >>  - Open question: Should the source expose something like "host
> > > > >> preferences", so that yarn/mesos/k8s can take this into account when
> > > > >> selecting a node to start a TM on?
> > > > >>
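The lazy-assignment loop described above (hand out a split, wait for the finished notification, hand out the next) can be sketched as below. The class and method names are illustrative assumptions, not the proposed interface:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Sketch: an enumerator that assigns splits lazily, one per reader
// subtask at a time, and assigns the next split only when a reader
// reports the previous one finished -- giving better load balancing
// than assigning everything up front.
public class LazyEnumerator {
    private final Queue<String> unassigned = new ArrayDeque<>();
    private final Map<Integer, String> assigned = new HashMap<>();

    public LazyEnumerator(String... splits) {
        for (String s : splits) {
            unassigned.add(s);
        }
    }

    // Called when a reader subtask starts, or after it finished a split.
    public String nextSplit(int subtask) {
        String next = unassigned.poll();
        if (next != null) {
            assigned.put(subtask, next);
        }
        return next; // null means "no split available right now"
    }

    // Notification from the reader's context that its split completed.
    public void splitFinished(int subtask) {
        assigned.remove(subtask);
    }

    public int pendingSplits() {
        return unassigned.size();
    }
}
```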
> > > > >> *(5) Watermarks and event time alignment*
> > > > >>
> > > > >>  - Watermark generation, as well as idleness, needs to be per split
> > > > >> (like currently in the Kafka Source, per partition).
> > > > >>  - It is desirable to support optional event-time alignment, meaning
> > > > >> that splits that are ahead are back-pressured or temporarily
> > > > >> unsubscribed.
> > > > >>  - I think it would be desirable to encapsulate watermark generation
> > > > >> logic in watermark generators, for a separation of concerns. The
> > > > >> watermark generators should run per split.
> > > > >>  - Using watermark generators would also help with another problem of
> > > > >> the suggested interface, namely supporting non-periodic watermarks
> > > > >> efficiently.
> > > > >>  - Need a way to "dispatch" the next record to different watermark
> > > > >> generators.
> > > > >>  - Need a way to tell the SplitReader to "suspend" a split until a
> > > > >> certain watermark is reached (event time backpressure).
> > > > >>  - This would in fact not be needed (and thus simpler) if we had a
> > > > >> SplitReader per split, and may be a reason to re-open that discussion.
> > > > >>
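A minimal sketch of the per-split watermark idea above: one generator per split, with the reader's overall watermark taken as the minimum across splits, so a lagging split holds the watermark back. Names and the max-timestamp generator logic are simplifying assumptions:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: per-split watermark tracking. Each split has its own
// "generator" (here just the max event timestamp seen); the reader's
// emitted watermark is the minimum across all active splits.
public class PerSplitWatermarks {
    private final Map<String, Long> perSplit = new HashMap<>();

    // Dispatch the next record to the generator for its split.
    public void onEvent(String splitId, long eventTimestamp) {
        perSplit.merge(splitId, eventTimestamp, Math::max);
    }

    // The combined watermark: the minimum over all per-split watermarks.
    public long currentWatermark() {
        return perSplit.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```

Event-time alignment would then amount to suspending any split whose per-split watermark is too far ahead of this minimum.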
> > > > >> *(6) Watermarks across splits and in the Split Enumerator*
> > > > >>
> > > > >>  - The split enumerator may need some watermark awareness, which
> > > > >> should be purely based on split metadata (like the create timestamp
> > > > >> of file splits).
> > > > >>  - If there are still more splits with an overlapping event time
> > > > >> range for a split reader, then that split reader should not advance
> > > > >> the watermark within the split beyond the overlap boundary. Otherwise
> > > > >> future splits will produce late data.
> > > > >>  - One way to approach this could be that the split enumerator may
> > > > >> send watermarks to the readers, and the readers cannot emit
> > > > >> watermarks beyond that received watermark.
> > > > >>  - Many split enumerators would simply send Long.MAX out immediately
> > > > >> and leave the progress purely to the split readers.
> > > > >>  - For event-time alignment / split back pressure, this begs the
> > > > >> question how we can avoid deadlocks that may arise when splits are
> > > > >> suspended for event time back pressure.
> > > > >>
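The enumerator-capped watermark scheme above can be sketched as follows (class and method names are assumptions for illustration):

```java
// Sketch: the reader tracks the watermark cap received from the
// enumerator and never emits beyond it. An enumerator with no
// overlapping splits can send Long.MAX_VALUE once and delegate all
// progress to the readers, as suggested above.
public class CappedWatermarkEmitter {
    private long enumeratorCap = Long.MIN_VALUE;
    private long localWatermark = Long.MIN_VALUE;

    // Watermark message from the split enumerator.
    public void onEnumeratorWatermark(long cap) {
        enumeratorCap = Math.max(enumeratorCap, cap);
    }

    // Watermark produced by the reader's own per-split generators.
    public void onLocalWatermark(long wm) {
        localWatermark = Math.max(localWatermark, wm);
    }

    // The watermark actually emitted downstream: local progress,
    // clamped by the enumerator's cap.
    public long emittedWatermark() {
        return Math.min(localWatermark, enumeratorCap);
    }
}
```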
> > > > >> *(7) Batch and streaming Unification*
> > > > >>
> > > > >>  - Functionality wise, the above design should support both.
> > > > >>  - Batch often (mostly) does not care about reading "in order" and
> > > > >> generating watermarks.
> > > > >>    --> Might use different enumerator logic that is more locality
> > > > >> aware and ignores event time order.
> > > > >>    --> Does not generate watermarks.
> > > > >>  - Would be great if bounded sources could be identified at compile
> > > > >> time, so that "env.addBoundedSource(...)" is type safe and can return
> > > > >> a "BoundedDataStream".
> > > > >>  - Possible to defer this discussion until later.
> > > > >>
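The compile-time boundedness idea could be sketched with the type system alone, roughly in the spirit of "env.addBoundedSource(...)". All types here are stand-ins, not Flink's real classes:

```java
// Sketch: boundedness encoded in the type hierarchy, so that only a
// BoundedSource can produce a BoundedDataStream, checked at compile time.
public class BoundednessSketch {
    interface Source<T> {}
    interface BoundedSource<T> extends Source<T> {}

    static class DataStream<T> {}
    static class BoundedDataStream<T> extends DataStream<T> {}

    // Any source (bounded or not) yields a plain DataStream.
    static <T> DataStream<T> source(Source<T> src) {
        return new DataStream<>();
    }

    // Only a BoundedSource compiles here; passing an unbounded source is
    // a compile error, so no runtime boundedness check is needed.
    static <T> BoundedDataStream<T> addBoundedSource(BoundedSource<T> src) {
        return new BoundedDataStream<>();
    }
}
```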
> > > > >> *Miscellaneous Comments*
> > > > >>
> > > > >>  - Should the source have a TypeInformation for the produced type,
> > > > >> instead of a serializer? We need a type information in the stream
> > > > >> anyways, and can derive the serializer from that. Plus, creating the
> > > > >> serializer should respect the ExecutionConfig.
> > > > >>  - The TypeSerializer interface is very powerful but also not easy
> > > > >> to implement. Its purpose is to handle data super efficiently,
> > > > >> support flexible ways of evolution, etc.
> > > > >>  For metadata I would suggest looking at the SimpleVersionedSerializer
> > > > >> instead, which is used for example for checkpoint master hooks, or
> > > > >> for the streaming file sink. I think that it is a good match for
> > > > >> cases where we do not need more than ser/deser (no copy, etc.) and
> > > > >> don't need to push versioning out of the serialization paths for best
> > > > >> performance (as in the TypeSerializer).
> > > > >>
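A minimal sketch of the SimpleVersionedSerializer idea for split metadata: the version travels with the bytes, so deserialization can branch on it without the hot serialization path being evolution-aware. The class is a hypothetical illustration, not the Flink interface itself:

```java
import java.nio.ByteBuffer;

// Sketch: versioned serialization of simple split metadata (here just
// an offset). The version is written in front of the payload, so future
// versions can add fields and old bytes remain readable by branching.
public class VersionedSplitSerializer {
    static final int VERSION = 1;

    static byte[] serialize(long offset) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + Long.BYTES);
        buf.putInt(VERSION);  // version prefix
        buf.putLong(offset);  // the actual split metadata payload
        return buf.array();
    }

    static long deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int version = buf.getInt();
        if (version != VERSION) {
            throw new IllegalStateException("Unsupported version: " + version);
        }
        return buf.getLong();
    }
}
```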
> > > > >>
> > > > >> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <
> > > > k.klou...@data-artisans.com>
> > > > >> wrote:
> > > > >>
> > > > >>
> > > > >> Hi Biao,
> > > > >>
> > > > >> Thanks for the answer!
> > > > >>
> > > > >> So given the multi-threaded readers, we now have these open
> > > > >> questions:
> > > > >>
> > > > >> 1) How do we let the checkpoints pass through our multi-threaded
> > > > >> reader operator?
> > > > >>
> > > > >> 2) Do we have separate reader and source operators or not? In the
> > > > >> strategy that has a separate source, the source operator has a
> > > > >> parallelism of 1 and is responsible for split recovery only.
> > > > >>
> > > > >> For the first one, given also the constraints (blocking, finite
> > > > >> queues, etc.), I do not have an answer yet.
> > > > >>
> > > > >> For the 2nd, I think that we should go with separate operators for
> > > > >> the source and the readers, for the following reasons:
> > > > >>
> > > > >> 1) This is more aligned with a potential future improvement where
> > > > >> split discovery becomes a responsibility of the JobManager and
> > > > >> readers are pulling more work from the JM.
> > > > >>
> > > > >> 2) The source is going to be the "single point of truth". It will
> > > > >> know what has been processed and what not. If the source and the
> > > > >> readers are a single operator with parallelism > 1, or in general,
> > > > >> if the split discovery is done by each task individually, then:
> > > > >>   i) we have to have a deterministic scheme for each reader to
> > > > >> assign splits to itself (e.g. mod subtaskId). This is not
> > > > >> necessarily trivial for all sources.
> > > > >>   ii) each reader would have to keep a copy of all its processed
> > > > >> splits.
> > > > >>   iii) the state has to be a union state with non-trivial merging
> > > > >> logic in order to support rescaling.
> > > > >>
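The "mod subtaskId" scheme mentioned in point i) can be sketched as follows (a simplifying illustration; real sources may not have ids that hash this cleanly):

```java
// Sketch: deterministic split-to-reader assignment without coordination.
// Each reader claims exactly the splits whose hash maps to its subtask,
// at the cost of inflexible assignment and no load balancing.
public class ModAssignment {
    static boolean ownsSplit(String splitId, int subtaskId, int parallelism) {
        // Math.floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(splitId.hashCode(), parallelism) == subtaskId;
    }
}
```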
> > > > >> Two additional points that you raised above:
> > > > >>
> > > > >> i) The point that you raised that we need to keep all splits
> > > > >> (processed and not-processed) I think is a bit of a strong
> > > > >> requirement. This would imply that for infinite sources the state
> > > > >> will grow indefinitely. This problem is even more pronounced if we
> > > > >> do not have a single source that assigns splits to readers, as each
> > > > >> reader will have its own copy of the state.
> > > > >>
> > > > >> ii) it is true that for finite sources we need to somehow not close
> > > > >> the readers when the source/split discoverer finishes. The
> > > > >> ContinuousFileReaderOperator has a work-around for that. It is not
> > > > >> elegant, and checkpoints are not emitted after closing the source,
> > > > >> but this, I believe, is a bigger problem which requires more changes
> > > > >> than just refactoring the source interface.
> > > > >>
> > > > >> Cheers,
> > > > >> Kostas
> > > > >>
> > > > >>
> > > > >>
> > > >
> > > >
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>