[ https://issues.apache.org/jira/browse/FLINK-8409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16333922#comment-16333922 ]
ASF GitHub Bot commented on FLINK-8409: --------------------------------------- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5329 [FLINK-8409] [kafka] Fix potential NPE in KafkaConsumerThread ## What is the purpose of the change This PR fixes a race condition that may lead to a NPE in the async callbacks for Kafka offset committing. The following lines in the KafkaConsumerThread::setOffsetsToCommit(...) suggests a race condition with the asynchronous callback from committing offsets to Kafka: ``` // record the work to be committed by the main consumer thread and make sure the consumer notices that if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) { log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " + "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " + "This does not compromise Flink's checkpoint integrity."); } this.offsetCommitCallback = commitCallback; ``` In the main consumer thread's main loop, `nextOffsetsToCommit` will be checked if there are any offsets to commit. If so, an asynchronous offset commit operation will be performed. The NPE happens in the case when the commit completes, but `this.offsetCommitCallback = commitCallback;` is not yet reached. This PR fixes this by making setting the `commitCallback` and `nextOffsetsToCommit` an atomic operation. ## Verifying this change No new tests were added. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-8409 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5329.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5329 ---- commit c8081acaf51b3407af7d425063572f03a68021ac Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-01-13T12:13:20Z [FLINK-8409] [kafka] Fix offset committing race condition in KafkaConsumerThread commit a1bf7af3a80bbbb6871b1177cb58c127c94bd75c Author: Tzu-Li (Gordon) Tai <tzulitai@...> Date: 2018-01-13T12:35:03Z [hotfix] [kafka] Make AbortedReassignmentException a static class ---- > Race condition in KafkaConsumerThread leads to potential NPE > ------------------------------------------------------------ > > Key: FLINK-8409 > URL: https://issues.apache.org/jira/browse/FLINK-8409 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Affects Versions: 1.4.0, 1.3.2, 1.5.0 > Reporter: Tzu-Li (Gordon) Tai > Assignee: Tzu-Li (Gordon) Tai > Priority: Blocker > Fix For: 1.3.3, 1.5.0, 1.4.1 > > > The following lines in the {{KafkaConsumerThread::setOffsetsToCommit(...)}} > suggests a race condition with the asynchronous callback from committing > offsets to Kafka: > {code} > // record the work to be committed by the main consumer thread and make sure > the consumer notices that > if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) { > log.warn("Committing offsets to Kafka takes longer than the checkpoint > interval. " + > "Skipping commit of previous offsets because newer complete > checkpoint offsets are available. " + > "This does not compromise Flink's checkpoint integrity."); > } > this.offsetCommitCallback = commitCallback; > {code} > In the main consumer thread's main loop, {{nextOffsetsToCommit}} will be > checked if there are any offsets to commit. If so, an asynchronous offset > commit operation will be performed. The NPE happens in the case when the > commit completes, but {{this.offsetCommitCallback = commitCallback;}} is not > yet reached. > A possible fix is to make setting the next offsets to commit along with the > callback instance a single atomic operation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)