Repository: kafka
Updated Branches:
  refs/heads/trunk a7ab9cb83 -> 2a660f13d


KAFKA-2311; Make KafkaConsumer's ensureNotClosed method thread-safe

Here is the patch on github ijuma.

Acquiring the consumer lock (the single thread access controls) requires that 
the consumer be open. I changed the closed variable to be volatile so that 
another thread's writes will visible to the reading thread.

Additionally, there was an additional check if the consumer was closed after 
the lock was acquired. This check is no longer necessary.

This is my original work and I license it to the project under the project's 
open source license.

Author: Tim Brooks <t...@uncontended.net>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #1637 from tbrooks8/KAFKA-2311


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a660f13
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a660f13
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a660f13

Branch: refs/heads/trunk
Commit: 2a660f13d2a805f0e27351996e904b1ea2365eba
Parents: a7ab9cb
Author: Tim Brooks <t...@uncontended.net>
Authored: Mon Sep 12 20:28:01 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Sep 12 20:28:01 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java     | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2a660f13/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ade4243..cfa046f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -508,7 +508,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final Metadata metadata;
     private final long retryBackoffMs;
     private final long requestTimeoutMs;
-    private boolean closed = false;
+    private volatile boolean closed = false;
 
     // currentThread holds the threadId of the current thread accessing 
KafkaConsumer
     // and is used to prevent multi-threaded access
@@ -1397,7 +1397,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
     public void close() {
         acquire();
         try {
-            if (closed) return;
             close(false);
         } finally {
             release();

Reply via email to