[ https://issues.apache.org/jira/browse/FLINK-8409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16333922#comment-16333922 ]

ASF GitHub Bot commented on FLINK-8409:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/5329

    [FLINK-8409] [kafka] Fix potential NPE in KafkaConsumerThread

    ## What is the purpose of the change
    
    This PR fixes a race condition that may lead to an NPE in the asynchronous callbacks for Kafka offset committing.
    
    The following lines in `KafkaConsumerThread::setOffsetsToCommit(...)` suggest a race condition with the asynchronous callback from committing offsets to Kafka:
    
    ```
    // record the work to be committed by the main consumer thread and make sure the consumer notices that
    if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
        log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
            "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
            "This does not compromise Flink's checkpoint integrity.");
    }
    this.offsetCommitCallback = commitCallback;
    ```
    
    In the main loop of the consumer thread, `nextOffsetsToCommit` is checked for pending offsets. If there are any, an asynchronous offset commit is issued. The NPE occurs when that commit completes before `this.offsetCommitCallback = commitCallback;` has been reached.
    
    This PR fixes the race by setting `nextOffsetsToCommit` and `commitCallback` together in a single atomic operation.
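A minimal sketch of one way to make the two writes atomic, under the same hypothetical simplifications: the offsets and the callback are bundled into one immutable holder and published through a single `AtomicReference`, so the consumer side can never observe offsets without their callback. The names `AtomicCommitState` and `PendingCommit` are illustrative assumptions; the actual patch may structure this differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class AtomicCommitState {
    interface CommitCallback {
        void onSuccess();
    }

    // Hypothetical immutable pair: offsets plus the callback to run once they are committed.
    static final class PendingCommit {
        final Map<Integer, Long> offsets;
        final CommitCallback callback;

        PendingCommit(Map<Integer, Long> offsets, CommitCallback callback) {
            this.offsets = offsets;
            this.callback = callback;
        }
    }

    private final AtomicReference<PendingCommit> nextOffsetsToCommit = new AtomicReference<>();

    // Checkpoint side: publish offsets and callback in one atomic step.
    // Returns true if an uncommitted previous request was replaced.
    boolean setOffsetsToCommit(Map<Integer, Long> offsets, CommitCallback callback) {
        return nextOffsetsToCommit.getAndSet(new PendingCommit(offsets, callback)) != null;
    }

    // Consumer side: take the pending request; its callback arrives with the offsets.
    // Returns the committed offsets, or null if nothing was pending.
    Map<Integer, Long> commitPendingAndComplete() {
        PendingCommit pending = nextOffsetsToCommit.getAndSet(null);
        if (pending == null) {
            return null;
        }
        // Stand-in for the asynchronous commit completing; the callback was
        // published together with these offsets, never after them.
        pending.callback.onSuccess();
        return pending.offsets;
    }

    public static void main(String[] args) {
        AtomicCommitState state = new AtomicCommitState();
        Map<Integer, Long> offsets = new HashMap<>();
        offsets.put(0, 42L);
        state.setOffsetsToCommit(offsets, () -> System.out.println("commit callback invoked"));
        System.out.println("committed: " + state.commitPendingAndComplete());
    }
}
```

Because `getAndSet` swaps the whole pair at once, a newer request still replaces an uncommitted older one (matching the "Skipping commit of previous offsets" behavior), and the callback that runs is always the one paired with the committed offsets.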
    
    ## Verifying this change
    
    No new tests were added.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-8409

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5329.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5329
    
----
commit c8081acaf51b3407af7d425063572f03a68021ac
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-13T12:13:20Z

    [FLINK-8409] [kafka] Fix offset committing race condition in KafkaConsumerThread

commit a1bf7af3a80bbbb6871b1177cb58c127c94bd75c
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-13T12:35:03Z

    [hotfix] [kafka] Make AbortedReassignmentException a static class

----


> Race condition in KafkaConsumerThread leads to potential NPE
> ------------------------------------------------------------
>
>                 Key: FLINK-8409
>                 URL: https://issues.apache.org/jira/browse/FLINK-8409
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0, 1.3.2, 1.5.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The following lines in {{KafkaConsumerThread::setOffsetsToCommit(...)}} 
> suggest a race condition with the asynchronous callback from committing 
> offsets to Kafka:
> {code}
> // record the work to be committed by the main consumer thread and make sure the consumer notices that
> if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
>     log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
>         "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
>         "This does not compromise Flink's checkpoint integrity.");
> }
> this.offsetCommitCallback = commitCallback;
> {code}
> In the main loop of the consumer thread, {{nextOffsetsToCommit}} is checked 
> for pending offsets. If there are any, an asynchronous offset commit is 
> issued. The NPE occurs when that commit completes before 
> {{this.offsetCommitCallback = commitCallback;}} has been reached.
> A possible fix is to set the next offsets to commit together with the 
> callback instance in a single atomic operation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
