Hi Dong, 

Thanks for making this FLIP. I share the same concern with Martijn. This looks 
like a feature that could be shared across all sources so I think it’ll be 
great to make it a general one. 

Instead of passing the RecordEvaluator to SourceReaderBase, what about 
embedding the evaluator into SourceOperator? We can create a wrapper 
SourceOutput in SourceOperator and intercept records emitted by SourceReader. 
This could make this feature individual from implementation   of SourceReader 
so it's applicable for all sources. The API to users looks like:

env.fromSource(source, watermarkStrategy, name, recordEvaluator)

or

env.fromSource(…).withRecordEvaluator(evaluator)

What do you think?

Best regards, 

Qingsheng

> On Jan 4, 2022, at 3:31 PM, Martijn Visser <mart...@ververica.com> wrote:
> 
> Hi Dong,
> 
> Thanks for writing the FLIP. It focusses only on the KafkaSource, but I
> would expect that if such a functionality is desired, it should be made
> available for all unbounded sources (for example, Pulsar and Kinesis). If
> it's only available for Kafka, I see it as if we're increasing feature
> sparsity while we actually want to decrease that. What do you think?
> 
> Best regards,
> 
> Martijn
> 
> On Tue, 4 Jan 2022 at 08:04, Dong Lin <lindon...@gmail.com> wrote:
> 
>> Hi all,
>> 
>> We created FLIP-208: Update KafkaSource to detect EOF based on
>> de-serialized records. Please find the KIP wiki in the link
>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records
>> .
>> 
>> This FLIP aims to address the use-case where users need to stop a Flink job
>> gracefully based on the content of de-serialized records observed in the
>> KafkaSource. This feature is needed by users who currently depend on
>> KafkaDeserializationSchema::isEndOfStream() to migrate their Flink job from
>> FlinkKafkaConsumer to KafkaSource.
>> 
>> Could you help review this FLIP when you get time? Your comments are
>> appreciated!
>> 
>> Cheers,
>> Dong
>> 

Reply via email to