Repository: kafka
Updated Branches:
  refs/heads/trunk ecff8544d -> f72203ee9


KAFKA-4426; Add close with timeout for KafkaConsumer (KIP-102)

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Apurva Mehta <apurva.1...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #2285 from rajinisivaram/KAFKA-4426


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f72203ee
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f72203ee
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f72203ee

Branch: refs/heads/trunk
Commit: f72203ee9223d3b724ee67bdad9912612dd72f63
Parents: ecff854
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Wed Jan 11 15:56:44 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Jan 11 15:56:56 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/Consumer.java |   6 +
 .../kafka/clients/consumer/KafkaConsumer.java   |  45 +++-
 .../kafka/clients/consumer/MockConsumer.java    |   6 +
 .../consumer/internals/AbstractCoordinator.java |  46 +++-
 .../consumer/internals/ConsumerCoordinator.java |  57 ++---
 .../clients/consumer/KafkaConsumerTest.java     |  82 ++++++-
 .../internals/ConsumerCoordinatorTest.java      | 203 ++++++++++++++++-
 .../main/scala/kafka/admin/AdminClient.scala    |   2 +-
 .../kafka/api/ConsumerBounceTest.scala          | 218 ++++++++++++++++++-
 .../integration/KafkaServerTestHarness.scala    |   6 +-
 10 files changed, 603 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 18cb560..cdcab5d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 /**
@@ -171,6 +172,11 @@ public interface Consumer<K, V> extends Closeable {
     public void close();
 
     /**
+     * @see KafkaConsumer#close(long, TimeUnit)
+     */
+    public void close(long timeout, TimeUnit unit);
+
+    /**
      * @see KafkaConsumer#wakeup()
      */
     public void wakeup();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 04fe789..9c1bc89 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -64,7 +65,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
-import org.apache.kafka.common.errors.InterruptException;
 
 /**
  * A client that consumes records from a Kafka cluster.
@@ -518,6 +518,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             ApiKeys.OFFSET_COMMIT,
             ApiKeys.OFFSET_FETCH,
             ApiKeys.SYNC_GROUP);
+    static final long DEFAULT_CLOSE_TIMEOUT_MS = 30 * 1000;
 
     private final String clientId;
     private final ConsumerCoordinator coordinator;
@@ -719,7 +720,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         } catch (Throwable t) {
             // call close methods if internal objects are already constructed
             // this is to prevent resource leak. see KAFKA-2121
-            close(true);
+            close(0, true);
             // now propagate the exception
             throw new KafkaException("Failed to construct kafka consumer", t);
         }
@@ -1129,7 +1130,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> 
offsets) {
         acquire();
         try {
-            coordinator.commitOffsetsSync(offsets);
+            coordinator.commitOffsetsSync(offsets, Long.MAX_VALUE);
         } finally {
             release();
         }
@@ -1496,17 +1497,40 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Close the consumer, waiting indefinitely for any needed cleanup. If 
auto-commit is enabled, this
-     * will commit the current offsets. Note that {@link #wakeup()} cannot be 
use to interrupt close.
+     * Close the consumer, waiting for up to the default timeout of 30 seconds 
for any needed cleanup.
+     * If auto-commit is enabled, this will commit the current offsets if 
possible within the default
+     * timeout. See {@link #close(long, TimeUnit)} for details. Note that 
{@link #wakeup()}
+     * cannot be used to interrupt close.
      * 
      * @throws org.apache.kafka.common.errors.InterruptException if the 
calling thread is interrupted
      * before or while this function is called
      */
     @Override
     public void close() {
+        close(DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Tries to close the consumer cleanly within the specified timeout. This 
method waits up to
+     * <code>timeout</code> for the consumer to complete pending commits and 
leave the group.
+     * If auto-commit is enabled, this will commit the current offsets if 
possible within the
+     * timeout. If rebalance is in progress, auto-commits and pending 
asynchronous commits may
+     * be aborted if the coordinator is not known within the timeout. If the 
consumer is unable
+     * to complete commit and leave group requests before the timeout expires, 
the consumer is
+     * force closed. Note that {@link #wakeup()} cannot be used to interrupt 
close.
+     *
+     * @param timeout The maximum time to wait for consumer to close 
gracefully. The value should be
+     *                non-negative. Specifying a timeout of zero means do not 
wait for pending requests to complete.
+     * @param timeUnit The time unit for the <code>timeout</code>
+     * @throws InterruptException If the thread is interrupted before or while 
this function is called
+     * @throws IllegalArgumentException If the <code>timeout</code> is 
negative.
+     */
+    public void close(long timeout, TimeUnit timeUnit) {
+        if (timeout < 0)
+            throw new IllegalArgumentException("The timeout cannot be 
negative.");
         acquire();
         try {
-            close(false);
+            close(timeUnit.toMillis(timeout), false);
         } finally {
             release();
         }
@@ -1532,11 +1556,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         return clusterResourceListeners;
     }
 
-    private void close(boolean swallowException) {
+    private void close(long timeoutMs, boolean swallowException) {
         log.trace("Closing the Kafka consumer.");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
         this.closed = true;
-        ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
+        try {
+            coordinator.close(Math.min(timeoutMs, requestTimeoutMs));
+        } catch (Throwable t) {
+            firstException.compareAndSet(null, t);
+            log.error("Failed to close coordinator", t);
+        }
         ClientUtils.closeQuietly(interceptors, "consumer interceptors", 
firstException);
         ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
         ClientUtils.closeQuietly(client, "consumer network client", 
firstException);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index a2ba480..9d57f83 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
@@ -332,6 +333,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
     @Override
     public void close() {
+        close(KafkaConsumer.DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close(long timeout, TimeUnit unit) {
         ensureNotClosed();
         this.closed = true;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 1af324e..0bd93cb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -95,7 +95,7 @@ public abstract class AbstractCoordinator implements Closeable {
         STABLE,      // the client has joined and is sending heartbeats
     }
 
-    private final int rebalanceTimeoutMs;
+    protected final int rebalanceTimeoutMs;
     private final int sessionTimeoutMs;
     private final GroupCoordinatorMetrics sensors;
     private final Heartbeat heartbeat;
@@ -188,14 +188,22 @@ public abstract class AbstractCoordinator implements Closeable {
      * Block until the coordinator for this group is known and is ready to 
receive requests.
      */
     public synchronized void ensureCoordinatorReady() {
+        // Using zero as current time since timeout is effectively infinite
+        ensureCoordinatorReady(0, Long.MAX_VALUE);
+    }
+
+    protected synchronized long ensureCoordinatorReady(long now, long 
timeoutMs) {
+        long remainingMs = timeoutMs;
+        long startTimeMs = now;
         while (coordinatorUnknown()) {
             RequestFuture<Void> future = lookupCoordinator();
-            client.poll(future);
+            client.poll(future, remainingMs);
 
             if (future.failed()) {
-                if (future.isRetriable())
-                    client.awaitMetadataUpdate();
-                else
+                if (future.isRetriable()) {
+                    remainingMs = timeoutMs - (time.milliseconds() - 
startTimeMs);
+                    client.awaitMetadataUpdate(remainingMs);
+                } else
                     throw future.exception();
             } else if (coordinator != null && 
client.connectionFailed(coordinator)) {
                 // we found the coordinator, but the connection has failed, so 
mark
@@ -203,7 +211,11 @@ public abstract class AbstractCoordinator implements Closeable {
                 coordinatorDead();
                 time.sleep(retryBackoffMs);
             }
+            remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
+            if (remainingMs <= 0)
+                break;
         }
+        return remainingMs;
     }
 
     protected synchronized RequestFuture<Void> lookupCoordinator() {
@@ -623,9 +635,33 @@ public abstract class AbstractCoordinator implements Closeable {
      */
     @Override
     public synchronized void close() {
+        close(0);
+    }
+
+    protected synchronized void close(long timeoutMs) {
         if (heartbeatThread != null)
             heartbeatThread.close();
         maybeLeaveGroup();
+
+        // At this point, there may be pending commits (async commits or sync 
commits that were
+        // interrupted using wakeup) and the leave group request which have 
been queued, but not
+        // yet sent to the broker. Wait up to close timeout for these pending 
requests to be processed.
+        // If coordinator is not known, requests are aborted.
+        long now = time.milliseconds();
+        long endTimeMs = now + timeoutMs;
+        Node coordinator = coordinator();
+        while (coordinator != null && client.pendingRequestCount(coordinator) 
> 0) {
+            if (Thread.currentThread().isInterrupted())
+                throw new InterruptException("Consumer close was interrupted");
+            long remainingTimeMs = endTimeMs - now;
+            client.poll(remainingTimeMs > 0 ? remainingTimeMs : 0);
+            now = time.milliseconds();
+            if (client.pendingRequestCount(coordinator) > 0 && now >= 
endTimeMs) {
+                log.warn("Close timed out with {} pending requests to 
coordinator, terminating client connections for group {}.",
+                        client.pendingRequestCount(coordinator), groupId);
+                break;
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9006d70..c963cca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
@@ -54,7 +55,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.kafka.common.errors.InterruptException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * This class manages the coordination process with the consumer coordinator.
@@ -63,8 +64,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerCoordinator.class);
 
-    private static final long CLOSE_TIMEOUT_MS = 5000;
-
     private final List<PartitionAssignor> assignors;
     private final Metadata metadata;
     private final ConsumerCoordinatorMetrics sensors;
@@ -74,6 +73,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final int autoCommitIntervalMs;
     private final ConsumerInterceptors<?, ?> interceptors;
     private final boolean excludeInternalTopics;
+    private final AtomicInteger pendingAsyncCommits;
 
     // this collection must be thread-safe because it is modified from the 
response handler
     // of offset commit requests, which may be invoked from the heartbeat 
thread
@@ -125,6 +125,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         this.sensors = new ConsumerCoordinatorMetrics(metrics, 
metricGrpPrefix);
         this.interceptors = interceptors;
         this.excludeInternalTopics = excludeInternalTopics;
+        this.pendingAsyncCommits = new AtomicInteger();
 
         if (autoCommitEnabled)
             this.nextAutoCommitDeadline = time.milliseconds() + 
autoCommitIntervalMs;
@@ -327,7 +328,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     @Override
     protected void onJoinPrepare(int generation, String memberId) {
         // commit offsets prior to rebalance if auto-commit enabled
-        maybeAutoCommitOffsetsSync();
+        maybeAutoCommitOffsetsSync(rebalanceTimeoutMs);
 
         // execute the user's callback before rebalance
         ConsumerRebalanceListener listener = subscriptions.listener();
@@ -401,26 +402,21 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
-    @Override
-    public void close() {
+    public void close(long timeoutMs) {
         // we do not need to re-enable wakeups since we are closing already
         client.disableWakeups();
+
+        long now = time.milliseconds();
+        long endTimeMs = now + timeoutMs;
         try {
-            maybeAutoCommitOffsetsSync();
-
-            Node coordinator;
-            long endTimeMs = time.milliseconds() + CLOSE_TIMEOUT_MS;
-            while ((coordinator = coordinator()) != null && 
client.pendingRequestCount(coordinator) > 0) {
-                long remainingTimeMs = endTimeMs - time.milliseconds();
-                if (remainingTimeMs > 0)
-                    client.poll(remainingTimeMs);
-                else {
-                    log.warn("Close timed out with {} pending requests to 
coordinator, terminating client connections for group {}.", 
client.pendingRequestCount(coordinator), groupId);
-                    break;
-                }
+            maybeAutoCommitOffsetsSync(timeoutMs);
+            now = time.milliseconds();
+            if (pendingAsyncCommits.get() > 0) {
+                ensureCoordinatorReady(now, endTimeMs - now);
+                now = time.milliseconds();
             }
         } finally {
-            super.close();
+            super.close(endTimeMs - now);
         }
     }
 
@@ -447,14 +443,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             // coordinator lookup request. This is fine because the listeners 
will be invoked in
             // the same order that they were added. Note also that 
AbstractCoordinator prevents
             // multiple concurrent coordinator lookup requests.
+            pendingAsyncCommits.incrementAndGet();
             lookupCoordinator().addListener(new RequestFutureListener<Void>() {
                 @Override
                 public void onSuccess(Void value) {
+                    pendingAsyncCommits.decrementAndGet();
                     doCommitOffsetsAsync(offsets, callback);
                 }
 
                 @Override
                 public void onFailure(RuntimeException e) {
+                    pendingAsyncCommits.decrementAndGet();
                     completedOffsetCommits.add(new 
OffsetCommitCompletion(callback, offsets, new 
RetriableCommitFailedException(e)));
                 }
             });
@@ -499,17 +498,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      *             or to any of the specified partitions
      * @throws CommitFailedException if an unrecoverable error occurs before 
the commit can be completed
      */
-    public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> 
offsets) {
+    public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> 
offsets, long timeoutMs) {
         invokeCompletedOffsetCommitCallbacks();
 
         if (offsets.isEmpty())
             return;
 
-        while (true) {
-            ensureCoordinatorReady();
+        long now = time.milliseconds();
+        long startMs = now;
+        long remainingMs = timeoutMs;
+        do {
+            remainingMs = ensureCoordinatorReady(now, remainingMs);
 
             RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
-            client.poll(future);
+            client.poll(future, remainingMs);
 
             if (future.succeeded()) {
                 if (interceptors != null)
@@ -521,7 +523,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 throw future.exception();
 
             time.sleep(retryBackoffMs);
-        }
+
+            now = time.milliseconds();
+            remainingMs = timeoutMs - (now - startMs);
+        } while (remainingMs > 0);
     }
 
     private void maybeAutoCommitOffsetsAsync(long now) {
@@ -555,10 +560,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         });
     }
 
-    private void maybeAutoCommitOffsetsSync() {
+    private void maybeAutoCommitOffsetsSync(long timeoutMs) {
         if (autoCommitEnabled) {
             try {
-                commitOffsetsSync(subscriptions.allConsumed());
+                commitOffsetsSync(subscriptions.allConsumed(), timeoutMs);
             } catch (WakeupException | InterruptException e) {
                 // rethrow wakeups since they are triggered by the user
                 throw e;

http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 88f55d9..28febc1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.network.Selectable;
@@ -42,12 +43,14 @@ import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.FetchResponse.PartitionData;
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.LeaveGroupResponse;
 import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
@@ -81,16 +84,17 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.kafka.common.errors.InterruptException;
 import org.junit.Rule;
 import org.junit.rules.ExpectedException;
 
@@ -1124,15 +1128,40 @@ public class KafkaConsumerTest {
 
     @Test
     public void testGracefulClose() throws Exception {
-        consumerCloseTest(true);
+        Map<TopicPartition, Short> response = new HashMap<>();
+        response.put(tp0, Errors.NONE.code());
+        OffsetCommitResponse commitResponse = offsetCommitResponse(response);
+        LeaveGroupResponse leaveGroupResponse = new 
LeaveGroupResponse(Errors.NONE.code());
+        consumerCloseTest(5000, Arrays.asList(commitResponse, 
leaveGroupResponse), 0, false);
     }
 
     @Test
     public void testCloseTimeout() throws Exception {
-        consumerCloseTest(false);
+        consumerCloseTest(5000, Collections.<AbstractResponse>emptyList(), 
5000, false);
+    }
+
+    @Test
+    public void testLeaveGroupTimeout() throws Exception {
+        Map<TopicPartition, Short> response = new HashMap<>();
+        response.put(tp0, Errors.NONE.code());
+        OffsetCommitResponse commitResponse = offsetCommitResponse(response);
+        consumerCloseTest(5000, Arrays.asList(commitResponse), 5000, false);
+    }
+
+    @Test
+    public void testCloseNoWait() throws Exception {
+        consumerCloseTest(0, Collections.<AbstractResponse>emptyList(), 0, 
false);
     }
 
-    private void consumerCloseTest(boolean graceful) throws Exception {
+    @Test
+    public void testCloseInterrupt() throws Exception {
+        consumerCloseTest(Long.MAX_VALUE, 
Collections.<AbstractResponse>emptyList(), 0, true);
+    }
+
+    private void consumerCloseTest(final long closeTimeoutMs,
+            List<? extends AbstractResponse> responses,
+            long waitMs,
+            boolean interrupt) throws Exception {
         int rebalanceTimeoutMs = 60000;
         int sessionTimeoutMs = 30000;
         int heartbeatIntervalMs = 5000;
@@ -1163,32 +1192,61 @@ public class KafkaConsumerTest {
         // Kafka consumer is single-threaded, but the implementation allows 
calls on a
         // different thread as long as the calls are not executed 
concurrently. So this is safe.
         ExecutorService executor = Executors.newSingleThreadExecutor();
+        final AtomicReference<Exception> closeException = new 
AtomicReference<Exception>();
         try {
             Future<?> future = executor.submit(new Runnable() {
                 @Override
                 public void run() {
                     consumer.commitAsync();
-                    consumer.close();
+                    try {
+                        consumer.close(closeTimeoutMs, TimeUnit.MILLISECONDS);
+                    } catch (Exception e) {
+                        closeException.set(e);
+                    }
                 }
             });
 
             // Close task should not complete until commit succeeds or close 
times out
+            // if close timeout is not zero,
             try {
                 future.get(100, TimeUnit.MILLISECONDS);
-                fail("Close completed without waiting for commit response");
+                if (closeTimeoutMs != 0)
+                    fail("Close completed without waiting for commit or leave 
response");
             } catch (TimeoutException e) {
                 // Expected exception
             }
 
             // In graceful mode, commit response results in close() completing 
immediately without a timeout
             // In non-graceful mode, close() times out without an exception 
even though commit response is pending
-            if (graceful) {
-                Map<TopicPartition, Short> response = new HashMap<>();
-                response.put(tp0, Errors.NONE.code());
-                client.respondFrom(offsetCommitResponse(response), 
coordinator);
+            for (int i = 0; i < responses.size(); i++) {
+                client.respondFrom(responses.get(i), coordinator);
+                if (i != responses.size() - 1) {
+                    try {
+                        future.get(100, TimeUnit.MILLISECONDS);
+                        fail("Close completed without waiting for response");
+                    } catch (TimeoutException e) {
+                        // Expected exception
+                    }
+                }
+            }
+
+            if (waitMs > 0)
+                time.sleep(waitMs);
+            if (interrupt)
+                assertTrue("Close terminated prematurely", 
future.cancel(true));
+
+            // Make sure that close task completes and another task can be run 
on the single threaded executor
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                }
+            }).get(500, TimeUnit.MILLISECONDS);
+
+            if (!interrupt) {
+                future.get(500, TimeUnit.MILLISECONDS); // Should succeed 
without TimeoutException or ExecutionException
+                assertNull("Unexpected exception during close", 
closeException.get());
             } else
-                time.sleep(5000);
-            future.get(500, TimeUnit.MILLISECONDS); // Should succeed without 
TimeoutException or ExecutionException
+                assertTrue("Expected exception not thrown " + closeException, 
closeException.get() instanceof InterruptException);
         } finally {
             executor.shutdownNow();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index e7ba401..0637ea4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -66,6 +66,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
@@ -76,6 +81,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class ConsumerCoordinatorTest {
 
@@ -482,7 +488,7 @@ public class ConsumerCoordinatorTest {
                         leaveRequest.groupId().equals(groupId);
             }
         }, new LeaveGroupResponse(Errors.NONE.code()));
-        coordinator.close();
+        coordinator.close(0);
         assertTrue(received.get());
     }
 
@@ -1028,7 +1034,7 @@ public class ConsumerCoordinatorTest {
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NOT_COORDINATOR_FOR_GROUP.code())));
         client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)), Long.MAX_VALUE);
     }
 
     @Test
@@ -1040,7 +1046,7 @@ public class ConsumerCoordinatorTest {
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
         client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)), Long.MAX_VALUE);
     }
 
     @Test
@@ -1052,7 +1058,7 @@ public class ConsumerCoordinatorTest {
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())), true);
         client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)), Long.MAX_VALUE);
     }
 
     @Test(expected = KafkaException.class)
@@ -1061,7 +1067,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.UNKNOWN_TOPIC_OR_PARTITION.code())));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L, "metadata")));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
     }
 
     @Test(expected = OffsetMetadataTooLarge.class)
@@ -1071,7 +1077,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.OFFSET_METADATA_TOO_LARGE.code())));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L, "metadata")));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
     }
 
     @Test(expected = CommitFailedException.class)
@@ -1081,7 +1087,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.ILLEGAL_GENERATION.code())));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L, "metadata")));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
     }
 
     @Test(expected = CommitFailedException.class)
@@ -1091,7 +1097,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.UNKNOWN_MEMBER_ID.code())));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L, "metadata")));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
     }
 
     @Test(expected = CommitFailedException.class)
@@ -1101,7 +1107,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.REBALANCE_IN_PROGRESS.code())));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L, "metadata")));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
     }
 
     @Test(expected = KafkaException.class)
@@ -1111,13 +1117,13 @@ public class ConsumerCoordinatorTest {
 
         // sync commit with invalid partitions should throw if we have no 
callback
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.UNKNOWN.code())), false);
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)), Long.MAX_VALUE);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testCommitSyncNegativeOffset() {
         client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(-1L)));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(-1L)), Long.MAX_VALUE);
     }
 
     @Test
@@ -1220,6 +1226,181 @@ public class ConsumerCoordinatorTest {
         }
     }
 
+    /**
+     * Group-managed consumer with auto-commit enabled: close should send both an
+     * offset commit and a LeaveGroup request, with no expected wait on the mock clock.
+     */
+    @Test
+    public void testCloseDynamicAssignment() throws Exception {
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, 
true);
+        gracefulCloseTest(coordinator, true);
+    }
+
+    /**
+     * Manually assigned consumer with auto-commit enabled: close should still send an
+     * offset commit; a LeaveGroup request is only asserted for group-managed consumers.
+     */
+    @Test
+    public void testCloseManualAssignment() throws Exception {
+        ConsumerCoordinator coordinator = 
prepareCoordinatorForCloseTest(false, true);
+        gracefulCloseTest(coordinator, false);
+    }
+
+    /**
+     * Manual assignment, coordinator unknown, auto-commit enabled: close is expected to
+     * block for the full 1000 ms close timeout on the mock clock.
+     */
+    @Test
+    public void testCloseCoordinatorNotKnownManualAssignment() throws 
Exception {
+        ConsumerCoordinator coordinator = 
prepareCoordinatorForCloseTest(false, true);
+        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP);
+        // Advance by the auto-commit interval (presumably makes an auto-commit due at close)
+        time.sleep(autoCommitIntervalMs);
+        closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
+    }
+
+    /**
+     * Group management with auto-commit disabled and the coordinator unknown: close is
+     * expected to return without waiting on the mock clock.
+     */
+    @Test
+    public void testCloseCoordinatorNotKnownNoCommits() throws Exception {
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, 
false);
+        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP);
+        closeVerifyTimeout(coordinator, 1000, 60000, 0, 0);
+    }
+
+    /**
+     * Group management with auto-commit enabled and the coordinator unknown: close is
+     * expected to block for the full 1000 ms close timeout on the mock clock.
+     */
+    @Test
+    public void testCloseCoordinatorNotKnownWithCommits() throws Exception {
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, 
true);
+        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP);
+        // Advance by the auto-commit interval (presumably makes an auto-commit due at close)
+        time.sleep(autoCommitIntervalMs);
+        closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
+    }
+
+    /**
+     * Coordinator made unknown via GROUP_COORDINATOR_NOT_AVAILABLE with auto-commit disabled:
+     * close is expected to return without waiting on the mock clock.
+     */
+    @Test
+    public void testCloseCoordinatorUnavailableNoCommits() throws Exception {
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, 
false);
+        makeCoordinatorUnknown(coordinator, 
Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
+        closeVerifyTimeout(coordinator, 1000, 60000, 0, 0);
+    }
+
+    /**
+     * Coordinator unavailable with auto-commit enabled: close is expected to block for the
+     * full 1000 ms close timeout on the mock clock.
+     */
+    @Test
+    public void testCloseTimeoutCoordinatorUnavailableForCommit() throws 
Exception {
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, 
true);
+        makeCoordinatorUnknown(coordinator, 
Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
+        time.sleep(autoCommitIntervalMs);
+        closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
+    }
+
+    /**
+     * Same setup with an unbounded close timeout: closeVerifyTimeout passes
+     * min(closeTimeoutMs, requestTimeoutMs) to close(), so close is expected to take the
+     * 60000 ms request timeout on the mock clock.
+     */
+    @Test
+    public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws 
Exception {
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, 
true);
+        makeCoordinatorUnknown(coordinator, 
Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
+        time.sleep(autoCommitIntervalMs);
+        closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
+    }
+
+    /**
+     * Coordinator known but no response prepared for the commit: close is expected to wait
+     * for the 60000 ms request timeout.
+     */
+    @Test
+    public void testCloseNoResponseForCommit() throws Exception {
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, 
true);
+        time.sleep(autoCommitIntervalMs);
+        closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
+    }
+
+    /**
+     * Group-managed consumer without auto-commit and no prepared LeaveGroup response: close
+     * is expected to wait for the 60000 ms request timeout.
+     */
+    @Test
+    public void testCloseNoResponseForLeaveGroup() throws Exception {
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, 
false);
+        closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
+    }
+
+    /**
+     * A zero close timeout: close is expected to return without waiting on the mock clock,
+     * even with auto-commit enabled.
+     */
+    @Test
+    public void testCloseNoWait() throws Exception {
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, 
true);
+        time.sleep(autoCommitIntervalMs);
+        closeVerifyTimeout(coordinator, 0, 60000, 0, 0);
+    }
+
+    /**
+     * After an active group has been joined and the heartbeat interval has elapsed, close
+     * should leave no live thread whose name contains the group id.
+     * NOTE(review): relies on the heartbeat thread's name including the group id — confirm
+     * against AbstractCoordinator. A unique group id is used so the name search is specific.
+     */
+    @Test
+    public void testHeartbeatThreadClose() throws Exception {
+        groupId = "testCloseTimeoutWithHeartbeatThread";
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, 
true);
+        coordinator.ensureActiveGroup();
+        time.sleep(heartbeatIntervalMs + 100);
+        Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat
+        closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
+        // Scan the threads of the current thread group for a surviving heartbeat thread
+        Thread[] threads = new Thread[Thread.activeCount()];
+        int threadCount = Thread.enumerate(threads);
+        for (int i = 0; i < threadCount; i++)
+            assertFalse("Heartbeat thread active after close", 
threads[i].getName().contains(groupId));
+    }
+
+    /**
+     * Builds a coordinator whose coordinator node is known and whose consumer has partition
+     * {@code tp} assigned and positioned at offset 100, ready for close tests.
+     *
+     * @param useGroupManagement if true, subscribe to {@code topicName} and join the group as a
+     *                           follower; otherwise assign {@code tp} manually
+     * @param autoCommit         passed to buildCoordinator as the auto-commit setting
+     * @return the prepared coordinator
+     */
+    private ConsumerCoordinator prepareCoordinatorForCloseTest(boolean 
useGroupManagement, boolean autoCommit) {
+        final String consumerId = "consumer";
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), 
assignors,
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommit);
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
+        coordinator.ensureCoordinatorReady();
+        if (useGroupManagement) {
+            subscriptions.subscribe(singleton(topicName), rebalanceListener);
+            // Join as a follower and receive an assignment containing tp
+            client.prepareResponse(joinGroupFollowerResponse(1, consumerId, 
"leader", Errors.NONE.code()));
+            client.prepareResponse(syncGroupResponse(singletonList(tp), 
Errors.NONE.code()));
+            coordinator.joinGroupIfNeeded();
+        } else
+            subscriptions.assignFromUser(singleton(tp));
+
+        // Give tp a position so that close has an offset it can commit
+        subscriptions.seek(tp, 100);
+        coordinator.poll(time.milliseconds());
+
+        return coordinator;
+    }
+
+    /**
+     * Drives the coordinator into the "coordinator unknown" state: a heartbeat is sent and
+     * answered with {@code errorCode}, the client is polled to process the response, and the
+     * resulting state is asserted.
+     */
+    private void makeCoordinatorUnknown(ConsumerCoordinator coordinator, 
Errors errorCode) {
+        time.sleep(sessionTimeoutMs);
+        coordinator.sendHeartbeatRequest();
+        client.prepareResponse(heartbeatResponse(errorCode.code()));
+        time.sleep(sessionTimeoutMs);
+        consumerClient.poll(0);
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    /**
+     * Calls {@code coordinator.close(min(closeTimeoutMs, requestTimeoutMs))} on a separate thread
+     * and drives the mock clock to check how long close blocks:
+     * <ul>
+     *   <li>if {@code expectedMinTimeMs > 0}, close must still be running after the clock has
+     *       advanced by {@code expectedMinTimeMs - 1};</li>
+     *   <li>close must then complete (within 2 s of real time) once the clock has advanced past
+     *       {@code expectedMaxTimeMs}.</li>
+     * </ul>
+     */
+    private void closeVerifyTimeout(final ConsumerCoordinator coordinator,
+            final long closeTimeoutMs, final long requestTimeoutMs,
+            long expectedMinTimeMs, long expectedMaxTimeMs) throws Exception {
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            Future<?> future = executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    coordinator.close(Math.min(closeTimeoutMs, 
requestTimeoutMs));
+                }
+            });
+            // Real-time pause so that close can start before the mock clock is advanced
+            // (timing-based; NOTE(review): may be flaky on a slow machine)
+            Thread.sleep(100);
+            if (expectedMinTimeMs > 0) {
+                // Just short of the minimum: close must not have completed yet
+                time.sleep(expectedMinTimeMs - 1);
+                try {
+                    future.get(500, TimeUnit.MILLISECONDS);
+                    fail("Close completed ungracefully without waiting for 
timeout");
+                } catch (TimeoutException e) {
+                    // Expected timeout
+                }
+            }
+            // NOTE(review): every caller visible here passes a non-negative value, so this guard is always true
+            if (expectedMaxTimeMs >= 0)
+                time.sleep(expectedMaxTimeMs - expectedMinTimeMs + 2);
+            future.get(2000, TimeUnit.MILLISECONDS);
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Prepares successful responses for an OffsetCommit request followed by a LeaveGroup request,
+     * closes the coordinator expecting it to finish without waiting on the mock clock, and asserts
+     * that a commit was sent and, for dynamic (group-managed) assignment, that the group was left.
+     *
+     * @param coordinator       coordinator expected to have offsets to commit on close
+     * @param dynamicAssignment true if the coordinator uses group management, so a LeaveGroup
+     *                          request is expected; otherwise leaveGroupRequested is not asserted
+     */
+    private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean dynamicAssignment) throws Exception {
+        final AtomicBoolean commitRequested = new AtomicBoolean();
+        final AtomicBoolean leaveGroupRequested = new AtomicBoolean();
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                // Record the commit only once the request is confirmed to be an OffsetCommit for
+                // this group: an unexpected request must neither be counted as a commit nor fail
+                // with a ClassCastException inside the mock client.
+                if (!(body instanceof OffsetCommitRequest))
+                    return false;
+                OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
+                boolean matched = commitRequest.groupId().equals(groupId);
+                if (matched)
+                    commitRequested.set(true);
+                return matched;
+            }
+        }, new OffsetCommitResponse(new HashMap<TopicPartition, Short>()));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                // Same reasoning as above: only a LeaveGroup for this group counts
+                if (!(body instanceof LeaveGroupRequest))
+                    return false;
+                LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
+                boolean matched = leaveRequest.groupId().equals(groupId);
+                if (matched)
+                    leaveGroupRequested.set(true);
+                return matched;
+            }
+        }, new LeaveGroupResponse(Errors.NONE.code()));
+
+        closeVerifyTimeout(coordinator, 1000, 60000, 0, 0);
+        assertTrue("Commit not requested", commitRequested.get());
+        if (dynamicAssignment)
+            assertTrue("Leave group not requested", leaveGroupRequested.get());
+    }
+
     private ConsumerCoordinator buildCoordinator(Metrics metrics,
                                                  List<PartitionAssignor> 
assignors,
                                                  boolean excludeInternalTopics,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala 
b/core/src/main/scala/kafka/admin/AdminClient.scala
index ab29c88..e43de5d 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -63,7 +63,7 @@ class AdminClient(val time: Time,
     throw new RuntimeException(s"Request $api failed on brokers 
$bootstrapBrokers")
   }
 
-  private def findCoordinator(groupId: String): Node = {
+  def findCoordinator(groupId: String): Node = {
     val requestBuilder = new GroupCoordinatorRequest.Builder(groupId)
     val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, 
requestBuilder).asInstanceOf[GroupCoordinatorResponse]
     Errors.forCode(response.errorCode()).maybeThrow()

http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 
b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 10d49f5..f98716a 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -13,20 +13,21 @@
 
 package kafka.api
 
-import java.util.Collections
+import java.util.{Collection, Collections}
+import java.util.concurrent.{Callable, Executors, ExecutorService, Future, 
Semaphore, TimeUnit}
 
+import kafka.admin.AdminClient
 import kafka.server.KafkaConfig
-import kafka.utils.{Logging, ShutdownableThread, TestUtils}
+import kafka.utils.{CoreUtils, Logging, ShutdownableThread, TestUtils}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
-import org.junit.{Before, Test}
+import org.junit.{Before, After, Test}
 
 import scala.collection.JavaConverters._
 
 
-
 /**
  * Integration tests for the new consumer that cover basic usage as well as 
server failures
  */
@@ -40,6 +41,10 @@ class ConsumerBounceTest extends IntegrationTestHarness with 
Logging {
   val part = 0
   val tp = new TopicPartition(topic, part)
 
+  // Upper bound (ms) on the time expected to process commit and leave group requests
+  // during close() when brokers are available
+  val gracefulCloseTimeMs = 1000
+  // Pool used to run consumer polls and close() calls asynchronously; shut down in tearDown()
+  val executor = Executors.newFixedThreadPool(2)
+  val executor = Executors.newFixedThreadPool(2)
+
   // configure the servers and clients
   this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, 
"false") // speed up shutdown
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, 
"3") // don't want to lose offset
@@ -65,6 +70,15 @@ class ConsumerBounceTest extends IntegrationTestHarness with 
Logging {
     TestUtils.createTopic(this.zkUtils, topic, 1, serverCount, this.servers)
   }
 
+  /**
+   * Stops the shared executor (shutdownNow cancels queued tasks and interrupts running ones)
+   * before the parent harness teardown; `super.tearDown()` runs even if the shutdown throws.
+   */
+  @After
+  override def tearDown() {
+    try {
+      executor.shutdownNow()
+    } finally {
+      super.tearDown()
+    }
+  }
+
   @Test
   def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(10)
 
@@ -147,6 +161,202 @@ class ConsumerBounceTest extends IntegrationTestHarness 
with Logging {
     }
   }
 
+
+  /**
+   * Exercises close() in three phases on the same cluster, each with fresh group ids: a healthy
+   * cluster, failed coordinators (dead brokers are restarted afterwards), and finally the whole
+   * cluster down. The cluster-failure phase kills every broker and does not restart them, so it
+   * must run last.
+   */
+  @Test
+  def testClose() {
+    val numRecords = 10
+    sendRecords(numRecords)
+
+    checkCloseGoodPath(numRecords, "group1")
+    checkCloseWithCoordinatorFailure(numRecords, "group2", "group3")
+    checkCloseWithClusterFailure(numRecords, "group4", "group5")
+  }
+
+  /**
+   * Consumer is closed while the cluster is healthy. The consumer should complete pending offset
+   * commits and leave the group. A new consumer instance should then be able to join the group and
+   * start consuming from the last committed offset.
+   */
+  private def checkCloseGoodPath(numRecords: Int, groupId: String) {
+    val consumer = createConsumerAndReceive(groupId, false, numRecords)
+    // Unbounded close timeout; close is still expected to finish within gracefulCloseTimeMs (plus grace)
+    val future = submitCloseAndValidate(consumer, Long.MaxValue, None, 
Some(gracefulCloseTimeMs))
+    // Waiting on the future surfaces any assertion failure raised inside the close task
+    future.get
+    checkClosedState(groupId, numRecords)
+  }
+
+  /**
+   * Consumer closed while its coordinator is unavailable. Close of a consumer using group
+   * management should complete after a commit attempt, even though commits fail due to rebalance.
+   * Close of a consumer using manual assignment should complete with successful commits, since a
+   * broker is available.
+   */
+  private def checkCloseWithCoordinatorFailure(numRecords: Int, dynamicGroup: String, manualGroup: String) {
+    val consumer1 = createConsumerAndReceive(dynamicGroup, false, numRecords)
+    val consumer2 = createConsumerAndReceive(manualGroup, true, numRecords)
+
+    // Look up and kill the coordinator of each group. The AdminClient is only needed for the lookup,
+    // so close it even if a lookup or broker shutdown fails, to avoid leaking its resources.
+    // NOTE(review): killBroker takes an index into `servers`; passing the broker id relies on broker
+    // ids matching server indices in this harness.
+    val adminClient = AdminClient.createSimplePlaintext(this.brokerList)
+    try {
+      killBroker(adminClient.findCoordinator(dynamicGroup).id)
+      killBroker(adminClient.findCoordinator(manualGroup).id)
+    } finally {
+      adminClient.close()
+    }
+
+    val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
+    val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(gracefulCloseTimeMs))
+    future1.get
+    future2.get
+
+    restartDeadBrokers()
+    checkClosedState(dynamicGroup, 0)
+    checkClosedState(manualGroup, numRecords)
+  }
+
+  /**
+   * Consumer is closed while all brokers are unavailable. It cannot rebalance or commit offsets
+   * because there is no coordinator, but close should time out and return. If close is invoked
+   * with a very large timeout, it should time out after the request timeout.
+   */
+  private def checkCloseWithClusterFailure(numRecords: Int, group1: String, 
group2: String) {
+    val consumer1 = createConsumerAndReceive(group1, false, numRecords)
+
+    // Shorter timeouts for consumer2, so that an unbounded close is bounded by requestTimeout.
+    // NOTE(review): these settings stay in the shared consumerConfig for consumers created later.
+    val requestTimeout = 6000
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
"5000")
+    
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"1000")
+    this.consumerConfig.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
requestTimeout.toString)
+    val consumer2 = createConsumerAndReceive(group2, true, numRecords)
+
+    // Kill every broker so that neither consumer can reach a coordinator
+    servers.foreach(server => killBroker(server.config.brokerId))
+    // consumer1: bounded close timeout, close should take about closeTimeout
+    // consumer2: unbounded close timeout, close should take about requestTimeout
+    val closeTimeout = 2000
+    val future1 = submitCloseAndValidate(consumer1, closeTimeout, 
Some(closeTimeout), Some(closeTimeout))
+    val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, 
Some(requestTimeout), Some(requestTimeout))
+    future1.get
+    future2.get
+  }
+
+  /**
+   * Consumer is closed during a rebalance. Close should leave the group and return promptly even
+   * though a rebalance is in progress. If brokers are not available, close should return promptly
+   * without sending a LeaveGroup request.
+   */
+  @Test
+  def testCloseDuringRebalance() {
+    val topic = "closetest"
+    TestUtils.createTopic(this.zkUtils, topic, 10, serverCount, this.servers)
+    // Long max poll interval: presumably keeps members that stop polling from being evicted mid-test
+    
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
"60000")
+    
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"1000")
+    // No auto-commit, so close has no auto-commit to perform
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false")
+    checkCloseDuringRebalance("group1", topic, executor, true)
+  }
+
+  /**
+   * Starts a rebalance that is blocked because existing members are not polling, then closes
+   * members while it is in progress:
+   *  - consumer1 is closed with brokers available; it should leave the group promptly, so the
+   *    pending rebalance completes without waiting for consumer1 to time out.
+   *  - after another rebalance is triggered and all brokers are killed, consumer2 is closed; it
+   *    should return promptly without sending a LeaveGroup request.
+   *
+   * @param groupId   consumer group used by all consumers in this check
+   * @param topic     topic the consumers subscribe to
+   * @param executor  pool used to run the blocking subscribe/poll calls of rebalancing consumers
+   * @param brokersAvailableDuringClose not read by this method (kept for callers)
+   */
+  private def checkCloseDuringRebalance(groupId: String, topic: String, executor: ExecutorService, brokersAvailableDuringClose: Boolean) {
+
+    def subscribeAndPoll(consumer: KafkaConsumer[Array[Byte], Array[Byte]], revokeSemaphore: Option[Semaphore] = None): Future[Any] = {
+      executor.submit(CoreUtils.runnable {
+        consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener {
+          def onPartitionsAssigned(partitions: Collection[TopicPartition]) {
+          }
+          def onPartitionsRevoked(partitions: Collection[TopicPartition]) {
+            revokeSemaphore.foreach(s => s.release())
+          }
+        })
+        consumer.poll(0)
+      }, 0)
+    }
+
+    def waitForRebalance(timeoutMs: Long, future: Future[Any], otherConsumers: KafkaConsumer[Array[Byte], Array[Byte]]*) {
+      val startMs = System.currentTimeMillis
+      // Keep the other members polling; the rebalance is blocked until they rejoin
+      while (System.currentTimeMillis < startMs + timeoutMs && !future.isDone)
+        otherConsumers.foreach(consumer => consumer.poll(100))
+      assertTrue("Rebalance did not complete in time", future.isDone)
+    }
+
+    def createConsumerToRebalance(): Future[Any] = {
+      val consumer = createConsumer(groupId)
+      val rebalanceSemaphore = new Semaphore(0)
+      val future = subscribeAndPoll(consumer, Some(rebalanceSemaphore))
+      // Wait for consumer to poll and trigger rebalance
+      assertTrue("Rebalance not triggered", rebalanceSemaphore.tryAcquire(2000, TimeUnit.MILLISECONDS))
+      // Rebalance is blocked by other consumers not polling
+      assertFalse("Rebalance completed too early", future.isDone)
+      future
+    }
+
+    // Upper bound for waiting on each close task; the real timing limits are asserted inside
+    // submitCloseAndValidate, so this only needs to exceed them.
+    val closeWaitMs = 5000L
+
+    val consumer1 = createConsumer(groupId)
+    waitForRebalance(2000, subscribeAndPoll(consumer1))
+    val consumer2 = createConsumer(groupId)
+    waitForRebalance(2000, subscribeAndPoll(consumer2), consumer1)
+    val rebalanceFuture = createConsumerToRebalance()
+
+    // consumer1 should leave group and close immediately even though rebalance is in progress
+    val closeFuture1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
+
+    // Rebalance should complete without waiting for consumer1 to timeout since consumer1 has left the group
+    waitForRebalance(2000, rebalanceFuture, consumer2)
+
+    // Trigger another rebalance and shutdown all brokers. This consumer's poll is presumably
+    // never completed once brokers are gone; its task is left for tearDown's executor shutdown.
+    createConsumerToRebalance()
+    servers.foreach(server => killBroker(server.config.brokerId))
+
+    // consumer2 should close immediately without LeaveGroup request since there are no brokers available
+    val closeFuture2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(0))
+
+    // submitCloseAndValidate asserts inside the executor task, so failures only surface through the
+    // returned futures. Wait for both, so assertion failures fail this test and the closes are not
+    // still running when tearDown shuts down the executor.
+    closeFuture1.get(closeWaitMs, TimeUnit.MILLISECONDS)
+    closeFuture2.get(closeWaitMs, TimeUnit.MILLISECONDS)
+  }
+
+  /**
+   * Sets group.id on the shared `consumerConfig` (so it also applies to consumers created later)
+   * and creates a new consumer via `createNewConsumer` (presumably from that config).
+   */
+  private def createConsumer(groupId: String) : KafkaConsumer[Array[Byte], 
Array[Byte]] = {
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+    createNewConsumer
+  }
+
+  /**
+   * Creates a consumer in `groupId`, either manually assigned `tp` or subscribed to `topic`,
+   * and polls until it has received at least `numRecords` records.
+   */
+  private def createConsumerAndReceive(groupId: String, manualAssign: Boolean, 
numRecords: Int) : KafkaConsumer[Array[Byte], Array[Byte]] = {
+    val consumer = createConsumer(groupId)
+    if (manualAssign)
+      consumer.assign(Collections.singleton(tp))
+    else
+      consumer.subscribe(Collections.singleton(topic))
+    receiveRecords(consumer, numRecords)
+    consumer
+  }
+
+  /**
+   * Polls (1000 ms per poll) until at least `numRecords` records have been returned.
+   * NOTE(review): there is no overall deadline, so this loops forever if records never arrive.
+   */
+  private def receiveRecords(consumer: KafkaConsumer[Array[Byte], 
Array[Byte]], numRecords: Int) {
+    var received = 0
+    while (received < numRecords)
+      received += consumer.poll(1000).count()
+  }
+
+  /**
+   * Submits `consumer.close(closeTimeoutMs, MILLISECONDS)` to the shared executor. That task checks
+   * the elapsed wall-clock time: it must be below `maxCloseTimeMs` plus a 2000 ms grace period, and
+   * at least `minCloseTimeMs`. Each check is skipped when its bound is None.
+   *
+   * The assertions run on the executor thread, so they only fail the test when the caller waits on
+   * the returned future.
+   */
+  private def submitCloseAndValidate(consumer: KafkaConsumer[Array[Byte], 
Array[Byte]],
+      closeTimeoutMs: Long, minCloseTimeMs: Option[Long], maxCloseTimeMs: 
Option[Long]): Future[Any] = {
+    executor.submit(CoreUtils.runnable {
+      // Slack added to maxCloseTimeMs to absorb scheduling and test-harness overhead
+      val closeGraceTimeMs = 2000
+      val startNanos = System.nanoTime
+      info("Closing consumer with timeout " + closeTimeoutMs + " ms.")
+      consumer.close(closeTimeoutMs, TimeUnit.MILLISECONDS)
+      val timeTakenMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - 
startNanos)
+      maxCloseTimeMs match {
+        case Some(ms) => assertTrue("Close took too long " + timeTakenMs, 
timeTakenMs < ms + closeGraceTimeMs)
+        case None =>
+      }
+      minCloseTimeMs match {
+        case Some(ms) => assertTrue("Close finished too quickly " + 
timeTakenMs, timeTakenMs >= ms)
+        case None =>
+      }
+      info("consumer.close() completed in " + timeTakenMs + " ms.")
+    }, 0)
+  }
+
+  /**
+   * Checks that a previous close was graceful, with offsets committed and the group left. A new
+   * consumer in the same group should be assigned partitions promptly. If `committedRecords > 0`,
+   * it should also see that committed offset for `tp`. The new consumer is always closed, even
+   * when an assertion fails.
+   */
+  private def checkClosedState(groupId: String, committedRecords: Int) {
+    val assignSemaphore = new Semaphore(0)
+    val consumer = createConsumer(groupId)
+    try {
+      consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener {
+        def onPartitionsAssigned(partitions: Collection[TopicPartition]) {
+          assignSemaphore.release()
+        }
+        def onPartitionsRevoked(partitions: Collection[TopicPartition]) {
+        }
+      })
+      consumer.poll(3000)
+      assertTrue("Assignment did not complete on time", assignSemaphore.tryAcquire(1, TimeUnit.SECONDS))
+      if (committedRecords > 0)
+        assertEquals(committedRecords, consumer.committed(tp).offset)
+    } finally {
+      consumer.close()
+    }
+  }
+
   private class BounceBrokerScheduler(val numIters: Int) extends 
ShutdownableThread("daemon-bounce-broker", false)
   {
     var iter: Int = 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 13b37e1..4bbdedb 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -101,12 +101,16 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness 
{
    */
   def killRandomBroker(): Int = {
     val index = TestUtils.random.nextInt(servers.length)
+    killBroker(index)
+    index
+  }
+
+  /**
+   * Shuts down the broker at position `index` in `servers` and waits for the shutdown to finish.
+   * Does nothing if that broker is not marked alive.
+   */
+  def killBroker(index: Int) {
     if(alive(index)) {
       servers(index).shutdown()
       servers(index).awaitShutdown()
       alive(index) = false
     }
-    index
   }
   
   /**

Reply via email to