Hi Till,

>However, this will stall the whole reading process if there is a partition 
>which has no more data. Hence, you will probably also need a mechanism to 
>advance the watermark if the partition becomes idle.
This is why I need to find out whether a partition is idle. It looks like the 
Flink Kafka connector does have this information: the derived class 
KafkaTopicPartitionStateWithWatermarkGenerator has immediateOutput and 
deferredOutput members, and each has a state field that carries an idle flag.

Thank you for the information about the new KafkaConnector. I assume you are 
referring to [1], but it also seems to be stalled. Or are you talking about a 
different task?

[1] https://issues.apache.org/jira/browse/FLINK-18450
    (FLINK-18450: Add watermark alignment logic to SourceReaderBase)

Thanks,
Alexey
________________________________
From: Till Rohrmann <trohrm...@apache.org>
Sent: Tuesday, June 1, 2021 6:24 AM
To: Alexey Trenikhun <yen...@msn.com>
Cc: Flink User Mail List <user@flink.apache.org>
Subject: Re: How to check is Kafka partition "idle" in emitRecordsWithTimestamps

Hi Alexey,

Looking at KafkaTopicPartitionStatus, it looks like it does not contain this 
information. In a nutshell, what you probably have to do is aggregate the 
watermarks across all partitions and then pause consumption of a partition 
if its watermark advances too far ahead of the minimum watermark. However, this 
will stall the whole reading process if there is a partition which has no more 
data. Hence, you will probably also need a mechanism to advance the watermark 
if the partition becomes idle.
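
A minimal sketch of the aggregation described above, in plain Java with no 
Flink dependencies. The class name WatermarkAligner, its methods, and the 
maxDriftMs and idleTimeoutMs parameters are all hypothetical and are not part 
of Flink's API. Idleness is assumed here to mean "no record seen for 
idleTimeoutMs", which is only one possible definition.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not a Flink class): per-partition watermark bookkeeping. */
final class WatermarkAligner {
    private final long maxDriftMs;    // how far a partition may run ahead of the minimum
    private final long idleTimeoutMs; // no records for this long => treated as idle
    private final Map<String, Long> watermarks = new HashMap<>();
    private final Map<String, Long> lastSeen = new HashMap<>();

    WatermarkAligner(long maxDriftMs, long idleTimeoutMs) {
        this.maxDriftMs = maxDriftMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Record the latest watermark of a partition and the time it was observed. */
    void onRecord(String partition, long watermark, long nowMs) {
        watermarks.merge(partition, watermark, Math::max);
        lastSeen.put(partition, nowMs);
    }

    /** Assumption: a partition is idle if it produced nothing for idleTimeoutMs. */
    boolean isIdle(String partition, long nowMs) {
        Long seen = lastSeen.get(partition);
        return seen == null || nowMs - seen >= idleTimeoutMs;
    }

    /** Minimum watermark over non-idle partitions; Long.MAX_VALUE if all are idle. */
    long minActiveWatermark(long nowMs) {
        long min = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : watermarks.entrySet()) {
            if (!isIdle(e.getKey(), nowMs)) {
                min = Math.min(min, e.getValue());
            }
        }
        return min;
    }

    /** True if the partition is more than maxDriftMs ahead of the active minimum. */
    boolean shouldPause(String partition, long nowMs) {
        Long wm = watermarks.get(partition);
        long min = minActiveWatermark(nowMs);
        if (wm == null || min == Long.MAX_VALUE) {
            return false; // unknown partition, or nothing active to align against
        }
        return wm - min > maxDriftMs;
    }
}
```

Because idle partitions are excluded from the minimum, a partition that has 
run out of data stops holding the others back once the timeout expires.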

Note that the community is currently working on a new KafkaConnector based on 
Flink's new source API (FLIP-27). If I am not mistaken, then these new 
interfaces should eventually also support event time alignment.

Cheers,
Till

On Fri, May 28, 2021 at 7:17 PM Alexey Trenikhun <yen...@msn.com> wrote:
Hello,
I'm thinking about implementing a custom Kafka connector that provides 
event-time alignment (similar to FLINK-10921, which seems abandoned). How can I 
determine whether a partition is idle from an override of 
AbstractFetcher.emitRecordsWithTimestamps()? Does KafkaTopicPartitionState have 
this information?

Thanks,
Alexey
