[ https://issues.apache.org/jira/browse/FLINK-8001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241455#comment-16241455 ]
Tzu-Li (Gordon) Tai edited comment on FLINK-8001 at 11/7/17 4:36 AM: --------------------------------------------------------------------- Seems like we have to check that all the per-partition watermark emitters within the Kafka consumer implements the idleness logic correctly, such that we emit the {{IDLE}} marker instead of the {{Long.MAX_VALUE}} watermark. I'll try to see if I can reuse the {{StatusWatermarkValve}} in the consumer. was (Author: tzulitai): Seems like we have to check that all the per-partition watermark logic within the Kafka consumer implements the idleness logic correctly. I'll try to see if I can reuse the {{StatusWatermarkValve}} in the consumer. > Mark Kafka Consumer as idle if it doesn't have partitions > --------------------------------------------------------- > > Key: FLINK-8001 > URL: https://issues.apache.org/jira/browse/FLINK-8001 > Project: Flink > Issue Type: Bug > Reporter: Aljoscha Krettek > Assignee: Tzu-Li (Gordon) Tai > Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > In Flink 1.3.x the Kafka Consumer will emit a {{Long.MAX_VALUE}} watermark if > it has zero partitions assigned. If this happens and other parallel instances > of the Kafka Consumer are marked as idle (which currently never happens by > default but does happen in custom forks of our Kafka code) this means that > the watermark jumps to {{Long.MAX_VALUE}} downstream. > In Flink 1.4.x this happens implicitly in the {{PeriodicWatermarkEmitter}} in > {{AbstractFetcher}} where the watermark is {{Long.MAX_VALUE}} if we don't > have any partitions. This should be changed to mark the source as idle > instead, if we don't have any partitions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)