Hi Dong,

I think I am beginning to understand your idea. Since SourceReaderBase
is marked as PublicEvolving can you also update the FLIP with the
changes you want to make to it? Ideally, connector developers do not
have to change their SourceReaders to implement this new logic.

My idea was to introduce a second source interface that extends the
existing interface and offers only the method getRecordEvaluator().
The record evaluator is still passed as you have described through the
builder and at the end held by the source object. This way the source
framework can automatically use the evaluator without the need that
connector developers have to implement the complicated stopping logic
or change their SourceReaders.

Best,
Fabian


On Wed, Jan 12, 2022 at 2:22 AM Dong Lin <lindon...@gmail.com> wrote:
>
> Hi Fabian,
>
> Thanks for the comments. Please see my reply inline.
>
> On Tue, Jan 11, 2022 at 11:46 PM Fabian Paul <fp...@apache.org> wrote:
>
> > Hi Dong,
> >
> > I wouldn't change the org.apache.flink.api.connector.source.Source
> > interface because it either breaks existing sinks or we introduce it
> > as some kind of optional. I deem both options as not great. My idea is
> > to introduce a new interface that extends the Source. This way users
> > who want to develop a source that stops with the record evaluator can
> > implement the new interface. It also has the nice benefit that we can
> > give this new type of source a lower stability guarantee than Public
> > to allow some changes.
> >
>
> Currently the eofRecodEvaluator can be passed from
> KafkaSourceBuilder/PulsarSourceBuilder
> to SingleThreadMultiplexSourceReaderBase and SourceReaderBase. This
> approach also allows developers who want to develop a source that stops
> with the record evaluator to implement the new feature. Adding a new
> interface could increase the complexity in our interface and
> infrastructure. I am not sure if it has added benefits compared to the
> existing proposal. Could you explain more?
>
> I am not very sure what "new type of source a lower stability guarantee"
> you are referring to. Could you explain more? It looks like a new feature
> not mentioned in the FLIP. If the changes proposed in this FLIP also
> support the feature you have in mind, could we discuss this in a separate
> FLIP?
>
> In the SourceOperatorFactory we can then access the record evaluator
> > from the respective sources and pass it to the source operator.
> >
> > Hopefully, this makes sense. So far I did not find information about
> > the actual stopping logic in the FLIP maybe you had something
> > different in mind.
> >
>
> By "actual stopping logic", do you mean an example implementation of the
> RecordEvalutor? I think the use-case is described in the motivation
> section, which is about a pipeline processing stock transaction data.
>
> We can support this use-case with this FLIP, by implementing this
> RecordEvaluator that stops reading data from a split when there is a
> message that says "EOF". Users can trigger this feature by sending messages
> with "EOF" in the payload to all partitions of the source Kafka topic.
>
> Does this make sense?
>
>
> >
> > Best,
> > Fabian
> >
> > On Tue, Jan 11, 2022 at 1:40 AM Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > Hi Fabian,
> > >
> > > Thanks for the comments!
> > >
> > > By "add a source mixin interface", are you suggesting to update
> > > the org.apache.flink.api.connector.source.Source interface to add the API
> > > "RecordEvaluator<T> getRecordEvaluator()"? If so, it seems to add more
> > > public API and thus more complexity than the solution in the FLIP. Could
> > > you help explain more about the benefits of doing this?
> > >
> > > Regarding the 2nd question, I think this FLIP does not change whether
> > > sources are treated as bounded or unbounded. For example, the
> > KafkaSource's
> > > boundedness will continue to be determined with the API
> > > KafkaSourceBuilder::setBounded(..) and
> > > KafkaSourceBuilder::setUnbounded(..). Does this answer your question?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Jan 10, 2022 at 8:01 PM Fabian Paul <fp...@apache.org> wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thank you for updating the FLIP and making it applicable for all
> > > > sources. I am a bit unsure about the implementation part. I would
> > > > propose to add a source mixin interface that implements
> > > > `getRecordEvaluator` and sources that want to allow dynamically
> > > > stopping implement that interface.
> > > >
> > > > Another question I had was how do we treat sources using the record
> > > > evaluator as bounded or unbounded?
> > > >
> > > > Best,
> > > > Fabian
> > > >
> > > > On Sat, Jan 8, 2022 at 11:52 AM Dong Lin <lindon...@gmail.com> wrote:
> > > > >
> > > > > Hi Martijn and Qingsheng,
> > > > >
> > > > > The FLIP has been updated to extend the dynamic EOF support for the
> > > > > PulsarSource. I have not extended this feature to other sources yet
> > > > since I
> > > > > am not sure it is a requirement to ensure feature consistency across
> > > > > different sources. Could you take another look?
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > > On Fri, Jan 7, 2022 at 11:49 PM Dong Lin <lindon...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hi Martijn,
> > > > > >
> > > > > > Thanks for the comments! In general I agree we should avoid feature
> > > > > > sparsity.
> > > > > >
> > > > > > In this particular case, connectors are a bit different than most
> > other
> > > > > > features in Flink. AFAIK, we plan to move connectors (including
> > Kafka
> > > > and
> > > > > > Pulsar) out of the Flink project in the future, which means that
> > the
> > > > > > development of these connectors will be mostly de-centralized
> > (outside
> > > > of
> > > > > > Flink) and be up to their respective maintainers. While I agree
> > that we
> > > > > > should provide API/infrastructure in Flink (as this FLIP does) to
> > > > support
> > > > > > feature consistency across connectors, I am not sure we should own
> > the
> > > > > > responsibility to actually update all connectors to achieve feature
> > > > > > consistency, given that we don't plan to do it in Flink anyway due
> > to
> > > > its
> > > > > > heavy burden.
> > > > > >
> > > > > > With that being said, I am happy to follow the community guideline
> > if
> > > > we
> > > > > > decide that connector-related FLIP should update every connector's
> > API
> > > > to
> > > > > > ensure feature consistency (to a reasonable extent). For example,
> > in
> > > > this
> > > > > > particular case, it looks like the EOF-detection feature can be
> > > > applied to
> > > > > > every connector (including bounded sources). Is it still
> > sufficient to
> > > > just
> > > > > > update Kafka, Pulsar and Kinesis?
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > > On Tue, Jan 4, 2022 at 3:31 PM Martijn Visser <
> > mart...@ververica.com>
> > > > > > wrote:
> > > > > >
> > > > > >> Hi Dong,
> > > > > >>
> > > > > >> Thanks for writing the FLIP. It focusses only on the KafkaSource,
> > but
> > > > I
> > > > > >> would expect that if such a functionality is desired, it should be
> > > > made
> > > > > >> available for all unbounded sources (for example, Pulsar and
> > > > Kinesis). If
> > > > > >> it's only available for Kafka, I see it as if we're increasing
> > feature
> > > > > >> sparsity while we actually want to decrease that. What do you
> > think?
> > > > > >>
> > > > > >> Best regards,
> > > > > >>
> > > > > >> Martijn
> > > > > >>
> > > > > >> On Tue, 4 Jan 2022 at 08:04, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > > > >>
> > > > > >> > Hi all,
> > > > > >> >
> > > > > >> > We created FLIP-208: Update KafkaSource to detect EOF based on
> > > > > >> > de-serialized records. Please find the KIP wiki in the link
> > > > > >> >
> > > > > >> >
> > > > > >>
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records
> > > > > >> > .
> > > > > >> >
> > > > > >> > This FLIP aims to address the use-case where users need to stop
> > a
> > > > Flink
> > > > > >> job
> > > > > >> > gracefully based on the content of de-serialized records
> > observed
> > > > in the
> > > > > >> > KafkaSource. This feature is needed by users who currently
> > depend on
> > > > > >> > KafkaDeserializationSchema::isEndOfStream() to migrate their
> > Flink
> > > > job
> > > > > >> from
> > > > > >> > FlinkKafkaConsumer to KafkaSource.
> > > > > >> >
> > > > > >> > Could you help review this FLIP when you get time? Your
> > comments are
> > > > > >> > appreciated!
> > > > > >> >
> > > > > >> > Cheers,
> > > > > >> > Dong
> > > > > >> >
> > > > > >>
> > > > > >
> > > >
> >

Reply via email to