Hi Martijn,

Thank you for the comments. Please find my reply inline.

On Wed, Jan 12, 2022 at 3:07 AM Martijn Visser <mart...@ververica.com>
wrote:

> Hi Dong,
>
> Thanks for updating the FLIP and including Pulsar. I was indeed suggesting
> that we should have a generic interface that allows connector maintainers
> to implement this capability if they think it should be supported.
>

We are on the same page :)


>
> Could you see a feature like this also be useful for a connector like
> FileSystem?
>

Regarding the use-case for eofRecordEvaluator for a connector like
FileSystem, here is one fabricated use-case: users want to stop processing
data from a FileSystem source when the data schema is found to have changed
or abnormal data appears, so that abnormal data is not emitted to the
downstream sink.
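
As a rough illustration of this fabricated use-case (a sketch only, not
part of the FLIP): the RecordEvaluator interface below is a local stand-in,
and its method name isEndOfStream, the FieldCountEvaluator class and the
readUntilEof helper are all assumptions made up for this example. It treats
a CSV line with an unexpected number of fields as a schema change and stops
emitting from that record onwards; whether the triggering record itself
should be dropped is also an assumption here.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the evaluator interface discussed in this thread;
// the method name isEndOfStream is an assumption for this sketch.
interface RecordEvaluator<T> extends Serializable {
    boolean isEndOfStream(T record);
}

// Treats a CSV line whose field count differs from the expected one
// as a schema change, i.e. as the end of the stream.
final class FieldCountEvaluator implements RecordEvaluator<String> {
    private final int expectedFields;

    FieldCountEvaluator(int expectedFields) {
        this.expectedFields = expectedFields;
    }

    @Override
    public boolean isEndOfStream(String line) {
        // A limit of -1 keeps trailing empty fields, so "a,b," counts as 3 fields.
        return line.split(",", -1).length != expectedFields;
    }
}

public class EofEvaluatorSketch {
    // Plain-Java simulation of a source loop (no Flink involved): emits records
    // until the evaluator signals end of stream. The signalling record is dropped.
    static <T> List<T> readUntilEof(Iterable<T> records, RecordEvaluator<T> evaluator) {
        List<T> emitted = new ArrayList<>();
        for (T record : records) {
            if (evaluator.isEndOfStream(record)) {
                break;
            }
            emitted.add(record);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("1,alice,30", "2,bob,25", "3,carol", "4,dave,41");
        // The third line has only two fields, so nothing from it onwards is emitted.
        System.out.println(readUntilEof(lines, new FieldCountEvaluator(3)));
    }
}
```

In a real connector the loop would live in the source reader rather than in
user code; the sketch only shows the shape of the evaluator.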

Since this is just a fabricated use-case and we agree the development of
particular connectors should be left to their maintainers, we won't support
eofRecordEvaluator with the FileSystem connector in this FLIP.


>
> Best regards,
>
> Martijn
>
> On Tue, 11 Jan 2022 at 16:47, Fabian Paul <fp...@apache.org> wrote:
>
> > Hi Dong,
> >
> > I wouldn't change the org.apache.flink.api.connector.source.Source
> > interface because it either breaks existing sources or we introduce it
> > as some kind of optional. I deem both options as not great. My idea is
> > to introduce a new interface that extends the Source. This way users
> > who want to develop a source that stops with the record evaluator can
> > implement the new interface. It also has the nice benefit that we can
> > give this new type of source a lower stability guarantee than Public
> > to allow some changes.
> > In the SourceOperatorFactory we can then access the record evaluator
> > from the respective sources and pass it to the source operator.
> >
> > Hopefully, this makes sense. So far I did not find information about
> > the actual stopping logic in the FLIP; maybe you had something
> > different in mind.
> >
> > Best,
> > Fabian
> >
> > On Tue, Jan 11, 2022 at 1:40 AM Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > Hi Fabian,
> > >
> > > Thanks for the comments!
> > >
> > > > By "add a source mixin interface", are you suggesting to update
> > > > the org.apache.flink.api.connector.source.Source interface to add the API
> > > > "RecordEvaluator<T> getRecordEvaluator()"? If so, it seems to add more
> > > > public API and thus more complexity than the solution in the FLIP. Could
> > > > you help explain more about the benefits of doing this?
> > >
> > > > Regarding the 2nd question, I think this FLIP does not change whether
> > > > sources are treated as bounded or unbounded. For example, the KafkaSource's
> > > > boundedness will continue to be determined with the API
> > > > KafkaSourceBuilder::setBounded(..) and
> > > > KafkaSourceBuilder::setUnbounded(..). Does this answer your question?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Jan 10, 2022 at 8:01 PM Fabian Paul <fp...@apache.org> wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thank you for updating the FLIP and making it applicable for all
> > > > sources. I am a bit unsure about the implementation part. I would
> > > > propose to add a source mixin interface that implements
> > > > `getRecordEvaluator` and sources that want to allow dynamically
> > > > stopping implement that interface.
> > > >
> > > > Another question I had was how do we treat sources using the record
> > > > evaluator as bounded or unbounded?
> > > >
> > > > Best,
> > > > Fabian
> > > >
> > > > > On Sat, Jan 8, 2022 at 11:52 AM Dong Lin <lindon...@gmail.com> wrote:
> > > > >
> > > > > Hi Martijn and Qingsheng,
> > > > >
> > > > > > The FLIP has been updated to extend the dynamic EOF support for the
> > > > > > PulsarSource. I have not extended this feature to other sources yet since I
> > > > > > am not sure it is a requirement to ensure feature consistency across
> > > > > > different sources. Could you take another look?
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > > > On Fri, Jan 7, 2022 at 11:49 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > >
> > > > > > Hi Martijn,
> > > > > >
> > > > > > > Thanks for the comments! In general I agree we should avoid feature
> > > > > > > sparsity.
> > > > > >
> > > > > > > In this particular case, connectors are a bit different than most other
> > > > > > > features in Flink. AFAIK, we plan to move connectors (including Kafka and
> > > > > > > Pulsar) out of the Flink project in the future, which means that the
> > > > > > > development of these connectors will be mostly de-centralized (outside of
> > > > > > > Flink) and be up to their respective maintainers. While I agree that we
> > > > > > > should provide API/infrastructure in Flink (as this FLIP does) to support
> > > > > > > feature consistency across connectors, I am not sure we should own the
> > > > > > > responsibility to actually update all connectors to achieve feature
> > > > > > > consistency, given that we don't plan to do it in Flink anyway due to its
> > > > > > > heavy burden.
> > > > > >
> > > > > > > With that being said, I am happy to follow the community guideline if we
> > > > > > > decide that connector-related FLIP should update every connector's API to
> > > > > > > ensure feature consistency (to a reasonable extent). For example, in this
> > > > > > > particular case, it looks like the EOF-detection feature can be applied to
> > > > > > > every connector (including bounded sources). Is it still sufficient to just
> > > > > > > update Kafka, Pulsar and Kinesis?
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > > > On Tue, Jan 4, 2022 at 3:31 PM Martijn Visser <mart...@ververica.com> wrote:
> > > > > >
> > > > > >> Hi Dong,
> > > > > >>
> > > > > > >> Thanks for writing the FLIP. It focusses only on the KafkaSource, but I
> > > > > > >> would expect that if such a functionality is desired, it should be made
> > > > > > >> available for all unbounded sources (for example, Pulsar and Kinesis). If
> > > > > > >> it's only available for Kafka, I see it as if we're increasing feature
> > > > > > >> sparsity while we actually want to decrease that. What do you think?
> > > > > >>
> > > > > >> Best regards,
> > > > > >>
> > > > > >> Martijn
> > > > > >>
> > > > > > >> On Tue, 4 Jan 2022 at 08:04, Dong Lin <lindon...@gmail.com> wrote:
> > > > > >>
> > > > > >> > Hi all,
> > > > > >> >
> > > > > > >> > We created FLIP-208: Update KafkaSource to detect EOF based on
> > > > > > >> > de-serialized records. Please find the FLIP wiki page at the link below:
> > > > > > >> >
> > > > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records
> > > > > >> >
> > > > > > >> > This FLIP aims to address the use-case where users need to stop a Flink job
> > > > > > >> > gracefully based on the content of de-serialized records observed in the
> > > > > > >> > KafkaSource. This feature is needed by users who currently depend on
> > > > > > >> > KafkaDeserializationSchema::isEndOfStream() to migrate their Flink job from
> > > > > > >> > FlinkKafkaConsumer to KafkaSource.
> > > > > >> >
> > > > > > >> > Could you help review this FLIP when you get time? Your comments are
> > > > > > >> > appreciated!
> > > > > >> >
> > > > > >> > Cheers,
> > > > > >> > Dong
> > > > > >> >
> > > > > >>
> > > > > >
> > > >
> >
>
