Repository: kafka Updated Branches: refs/heads/trunk a7ab9cb83 -> 2a660f13d
KAFKA-2311; Make KafkaConsumer's ensureNotClosed method thread-safe Here is the patch on github ijuma. Acquiring the consumer lock (the single thread access controls) requires that the consumer be open. I changed the closed variable to be volatile so that another thread's writes will visible to the reading thread. Additionally, there was an additional check if the consumer was closed after the lock was acquired. This check is no longer necessary. This is my original work and I license it to the project under the project's open source license. Author: Tim Brooks <t...@uncontended.net> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #1637 from tbrooks8/KAFKA-2311 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a660f13 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a660f13 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a660f13 Branch: refs/heads/trunk Commit: 2a660f13d2a805f0e27351996e904b1ea2365eba Parents: a7ab9cb Author: Tim Brooks <t...@uncontended.net> Authored: Mon Sep 12 20:28:01 2016 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Mon Sep 12 20:28:01 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2a660f13/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index ade4243..cfa046f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -508,7 +508,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { private final Metadata metadata; private final long retryBackoffMs; private final long requestTimeoutMs; - private boolean closed = false; + private volatile boolean closed = false; // currentThread holds the threadId of the current thread accessing KafkaConsumer // and is used to prevent multi-threaded access @@ -1397,7 +1397,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { public void close() { acquire(); try { - if (closed) return; close(false); } finally { release();