[ https://issues.apache.org/jira/browse/FLINK-8001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241891#comment-16241891 ]

ASF GitHub Bot commented on FLINK-8001:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/4967

    [FLINK-8001] [kafka] Prevent PeriodicWatermarkEmitter from violating …

    ## What is the purpose of the change
    
    Prior to this PR, a bug existed whereby, if a Kafka consumer subtask initially marked itself as idle because it had no partitions to subscribe to, that idle status was violated as soon as the `PeriodicWatermarkEmitter` fired.
    
    The problem is that the `PeriodicWatermarkEmitter` incorrectly yields a `Long.MAX_VALUE` watermark even when there are no partitions to subscribe to. This commit fixes this by additionally ensuring that the aggregated watermark in the `PeriodicWatermarkEmitter` is an effective one, i.e., that it was actually aggregated from at least one partition.
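A minimal, dependency-free sketch of the idea behind the fix, assuming partition watermarks can be modelled as a plain `List<Long>`. The class `WatermarkAggregation` and its `aggregate` method are hypothetical and are not the actual `AbstractFetcher` code. The minimum starts at `Long.MAX_VALUE`, so an empty partition list would leave it there; tracking whether any partition contributed lets the caller skip emitting in that case.

```java
import java.util.List;
import java.util.OptionalLong;

/**
 * Hypothetical sketch of min-aggregating per-partition watermarks.
 * Not Flink's AbstractFetcher; partitions are modelled as a list of longs.
 */
public class WatermarkAggregation {

    /**
     * Returns the minimum watermark across all partitions, or an empty
     * result if no partition contributed (i.e. the aggregation is not effective).
     */
    static OptionalLong aggregate(List<Long> partitionWatermarks) {
        long min = Long.MAX_VALUE;  // identity value for a min-aggregation
        boolean effective = false;  // becomes true once any partition contributes
        for (long wm : partitionWatermarks) {
            min = Math.min(min, wm);
            effective = true;
        }
        // Without this check, an empty partition list would yield Long.MAX_VALUE.
        return effective ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        System.out.println(aggregate(List.of(100L, 42L, 77L))); // OptionalLong[42]
        System.out.println(aggregate(List.of()));              // OptionalLong.empty
    }
}
```

Returning an `OptionalLong` is only one way to express "no effective watermark" in this sketch; a boolean flag checked before emitting would work equally well.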
    
    ## Brief change log
    
    This PR contains a single commit, which addresses the issue described above.
    
    ## Verifying this change
    
    A new test is added to `AbstractFetcherTest` that verifies that no watermark is emitted when there are no subscribed partitions and the periodic watermark emitter fires.
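The scenario the new test covers can be mirrored without any Flink dependencies. In this hypothetical sketch, `SketchEmitter` stands in for the periodic watermark emitter and records watermarks in a list instead of forwarding them; the `requireEffective` flag toggles between the pre-fix behaviour and the behaviour this PR describes. It is an illustration only, not the code of the actual `AbstractFetcherTest`.

```java
import java.util.ArrayList;
import java.util.List;

public class NoPartitionsWatermarkCheck {

    /** Hypothetical stand-in for the periodic emitter; records what it would emit. */
    static final class SketchEmitter {
        private final List<Long> partitionWatermarks;
        private final boolean requireEffective; // false models the pre-fix behaviour
        final List<Long> emitted = new ArrayList<>();

        SketchEmitter(List<Long> partitionWatermarks, boolean requireEffective) {
            this.partitionWatermarks = partitionWatermarks;
            this.requireEffective = requireEffective;
        }

        /** Simulates one firing of the periodic timer. */
        void fire() {
            long min = Long.MAX_VALUE;
            boolean effective = false;
            for (long wm : partitionWatermarks) {
                min = Math.min(min, wm);
                effective = true;
            }
            // Pre-fix: always emit. Post-fix: emit only an effective aggregate.
            if (effective || !requireEffective) {
                emitted.add(min);
            }
        }
    }

    public static void main(String[] args) {
        SketchEmitter before = new SketchEmitter(List.of(), false);
        before.fire();
        System.out.println(before.emitted); // [9223372036854775807]

        SketchEmitter after = new SketchEmitter(List.of(), true);
        after.fire();
        System.out.println(after.emitted);  // []
    }
}
```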
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? n/a


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-8001

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4967.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4967
    
----
commit d81d685d57625f9aac44f721f4dc993432ef0399
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2017-11-07T11:35:33Z

    [FLINK-8001] [kafka] Prevent PeriodicWatermarkEmitter from violating IDLE status
    
    Prior to this commit, a bug existed whereby, if a Kafka consumer
    subtask initially marked itself as idle because it had no partitions
    to subscribe to, that idle status was violated when the
    PeriodicWatermarkEmitter fired.
    
    The problem is that the PeriodicWatermarkEmitter incorrectly yields a
    Long.MAX_VALUE watermark even when there are no partitions to subscribe
    to. This commit fixes this by additionally ensuring that the aggregated
    watermark in the PeriodicWatermarkEmitter is an effective one (i.e., is
    really aggregated from some partition).

----


> Mark Kafka Consumer as idle if it doesn't have partitions
> ---------------------------------------------------------
>
>                 Key: FLINK-8001
>                 URL: https://issues.apache.org/jira/browse/FLINK-8001
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Aljoscha Krettek
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> In Flink 1.3.x the Kafka Consumer will emit a {{Long.MAX_VALUE}} watermark if 
> it has zero partitions assigned. If this happens and other parallel instances 
> of the Kafka Consumer are marked as idle (which currently never happens by 
> default but does happen in custom forks of our Kafka code), the watermark 
> jumps to {{Long.MAX_VALUE}} downstream.
> In Flink 1.4.x this happens implicitly in the {{PeriodicWatermarkEmitter}} in 
> {{AbstractFetcher}}, where the watermark is {{Long.MAX_VALUE}} if we don't 
> have any partitions. Instead, the source should be marked as idle if it 
> doesn't have any partitions.
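A simplified, hypothetical model of the downstream effect described in the issue, assuming a downstream operator takes the minimum watermark over its non-idle inputs. The class `DownstreamWatermarkModel` and its array-based inputs are illustrative assumptions, not Flink's runtime implementation, and the sketch does not model how idleness itself is propagated further downstream.

```java
import java.util.OptionalLong;

/**
 * Hypothetical model: combine the watermarks of parallel upstream source
 * instances, ignoring inputs that are marked idle.
 */
public class DownstreamWatermarkModel {

    /** Minimum watermark over non-idle inputs; empty if every input is idle. */
    static OptionalLong combine(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs do not hold back the combined watermark
            }
            min = Math.min(min, watermarks[i]);
            anyActive = true;
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Input 0 has no partitions; inputs 1 and 2 are marked idle.
        long[] watermarks = {Long.MAX_VALUE, 1_000L, 2_000L};

        // Bug: input 0 stays active and reports Long.MAX_VALUE, so it wins.
        System.out.println(combine(watermarks, new boolean[] {false, true, true}));
        // -> OptionalLong[9223372036854775807]

        // Proposed behaviour: input 0 marks itself idle instead.
        System.out.println(combine(watermarks, new boolean[] {true, true, true}));
        // -> OptionalLong.empty
    }
}
```

Once any other input becomes active again, the idle input no longer matters: with input 0 idle and input 1 active at `1_000L`, the model yields `1_000L` rather than jumping to `Long.MAX_VALUE`.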



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
