Hi Qingsheng,

Thanks for the comment!

After double-checking the solution of passing recordEvaluator via
env.fromSource(...), I realized that this approach does not automatically
bring the feature to all sources. We would still need to update the
implementation of every source in order to pass recordEvaluator from
DataStreamSource to SourceReaderBase. Could you check whether this is the
case?

Now we have two options under discussion. One option is to pass
recordEvaluator via XXXSourceBuilder for each connector. The other option
is to pass recordEvaluator via env.fromSource(...). Given that the
implementation overhead is similar (i.e. we need to change the
implementation of every source either way), the first option seems slightly
better: most existing parameters of a source, such as boundedness and
deserializers, are passed to XXXSourceBuilder directly, so it might be
better to follow that existing paradigm.
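To make the behavior we are discussing concrete, here is a minimal,
self-contained sketch. The class and method names are hypothetical
stand-ins, not Flink's actual API; it only illustrates the semantics both
options would ultimately deliver: an evaluator inspects each de-serialized
record and the source stops emitting gracefully once it signals EOF. I am
also assuming the EOF-marking record itself would not be emitted
downstream:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the proposed RecordEvaluator: returns true
// when a de-serialized record marks the end of the stream.
interface RecordEvaluator<T> {
    boolean isEndOfStream(T record);
}

// Minimal sketch of the interception point: whether it lives in
// SourceReaderBase (option 1) or in a SourceOutput wrapper inside
// SourceOperator (option 2), the emit loop looks roughly like this.
class EvaluatingReader<T> {
    private final RecordEvaluator<T> evaluator;

    EvaluatingReader(RecordEvaluator<T> evaluator) {
        this.evaluator = evaluator;
    }

    // Emits records until the evaluator fires; the EOF record itself
    // is dropped (an assumption for this sketch).
    List<T> readAll(List<T> input) {
        List<T> emitted = new ArrayList<>();
        for (T record : input) {
            if (evaluator.isEndOfStream(record)) {
                break; // stop the source gracefully
            }
            emitted.add(record);
        }
        return emitted;
    }
}
```

Under option 1 the evaluator would be handed to e.g. KafkaSourceBuilder
alongside the deserializer; under option 2 it would be a parameter of
env.fromSource(...). The loop above is the same either way.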

What do you think?

Thanks,
Dong


On Wed, Jan 5, 2022 at 5:33 PM Qingsheng Ren <renqs...@gmail.com> wrote:

> Hi Dong,
>
> Thanks for making this FLIP. I share the same concern with Martijn. This
> looks like a feature that could be shared across all sources so I think
> it’ll be great to make it a general one.
>
> Instead of passing the RecordEvaluator to SourceReaderBase, what about
> embedding the evaluator into SourceOperator? We can create a wrapper
> SourceOutput in SourceOperator and intercept records emitted by
> SourceReader. This could make this feature independent of the implementation
> of SourceReader, so it's applicable to all sources. The API to users looks
> like:
>
> env.fromSource(source, watermarkStrategy, name, recordEvaluator)
>
> or
>
> env.fromSource(…).withRecordEvaluator(evaluator)
>
> What do you think?
>
> Best regards,
>
> Qingsheng
>
> > On Jan 4, 2022, at 3:31 PM, Martijn Visser <mart...@ververica.com>
> wrote:
> >
> > Hi Dong,
> >
> > Thanks for writing the FLIP. It focuses only on the KafkaSource, but I
> > would expect that if such a functionality is desired, it should be made
> > available for all unbounded sources (for example, Pulsar and Kinesis). If
> > it's only available for Kafka, I see it as if we're increasing feature
> > sparsity while we actually want to decrease that. What do you think?
> >
> > Best regards,
> >
> > Martijn
> >
> > On Tue, 4 Jan 2022 at 08:04, Dong Lin <lindon...@gmail.com> wrote:
> >
> >> Hi all,
> >>
> >> We created FLIP-208: Update KafkaSource to detect EOF based on
> >> de-serialized records. Please find the FLIP wiki at the link
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records
> >> .
> >>
> >> This FLIP aims to address the use-case where users need to stop a Flink
> job
> >> gracefully based on the content of de-serialized records observed in the
> >> KafkaSource. This feature is needed by users who currently depend on
> >> KafkaDeserializationSchema::isEndOfStream() to migrate their Flink job
> from
> >> FlinkKafkaConsumer to KafkaSource.
> >>
> >> Could you help review this FLIP when you get time? Your comments are
> >> appreciated!
> >>
> >> Cheers,
> >> Dong
> >>
>
>
