Repository: kafka Updated Branches: refs/heads/trunk ecff8544d -> f72203ee9
KAFKA-4426; Add close with timeout for KafkaConsumer (KIP-102) Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Ismael Juma <ism...@juma.me.uk>, Apurva Mehta <apurva.1...@gmail.com>, Jason Gustafson <ja...@confluent.io> Closes #2285 from rajinisivaram/KAFKA-4426 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f72203ee Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f72203ee Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f72203ee Branch: refs/heads/trunk Commit: f72203ee9223d3b724ee67bdad9912612dd72f63 Parents: ecff854 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Wed Jan 11 15:56:44 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Wed Jan 11 15:56:56 2017 -0800 ---------------------------------------------------------------------- .../apache/kafka/clients/consumer/Consumer.java | 6 + .../kafka/clients/consumer/KafkaConsumer.java | 45 +++- .../kafka/clients/consumer/MockConsumer.java | 6 + .../consumer/internals/AbstractCoordinator.java | 46 +++- .../consumer/internals/ConsumerCoordinator.java | 57 ++--- .../clients/consumer/KafkaConsumerTest.java | 82 ++++++- .../internals/ConsumerCoordinatorTest.java | 203 ++++++++++++++++- .../main/scala/kafka/admin/AdminClient.scala | 2 +- .../kafka/api/ConsumerBounceTest.scala | 218 ++++++++++++++++++- .../integration/KafkaServerTestHarness.scala | 6 +- 10 files changed, 603 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 18cb560..cdcab5d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; /** @@ -171,6 +172,11 @@ public interface Consumer<K, V> extends Closeable { public void close(); /** + * @see KafkaConsumer#close(long, TimeUnit) + */ + public void close(long timeout, TimeUnit unit); + + /** * @see KafkaConsumer#wakeup() */ public void wakeup(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 04fe789..9c1bc89 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -64,7 +65,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; -import org.apache.kafka.common.errors.InterruptException; /** * A client that consumes records from a Kafka cluster. @@ -518,6 +518,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { ApiKeys.OFFSET_COMMIT, ApiKeys.OFFSET_FETCH, ApiKeys.SYNC_GROUP); + static final long DEFAULT_CLOSE_TIMEOUT_MS = 30 * 1000; private final String clientId; private final ConsumerCoordinator coordinator; @@ -719,7 +720,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { } catch (Throwable t) { // call close methods if internal objects are already constructed // this is to prevent resource leak. see KAFKA-2121 - close(true); + close(0, true); // now propagate the exception throw new KafkaException("Failed to construct kafka consumer", t); } @@ -1129,7 +1130,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) { acquire(); try { - coordinator.commitOffsetsSync(offsets); + coordinator.commitOffsetsSync(offsets, Long.MAX_VALUE); } finally { release(); } @@ -1496,17 +1497,40 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { } /** - * Close the consumer, waiting indefinitely for any needed cleanup. If auto-commit is enabled, this - * will commit the current offsets. Note that {@link #wakeup()} cannot be use to interrupt close. + * Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. + * If auto-commit is enabled, this will commit the current offsets if possible within the default + * timeout. See {@link #close(long, TimeUnit)} for details. Note that {@link #wakeup()} + * cannot be used to interrupt close. * * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted * before or while this function is called */ @Override public void close() { + close(DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } + + /** + * Tries to close the consumer cleanly within the specified timeout. This method waits up to + * <code>timeout</code> for the consumer to complete pending commits and leave the group. + * If auto-commit is enabled, this will commit the current offsets if possible within the + * timeout. If rebalance is in progress, auto-commits and pending asynchronous commits may + * be aborted if the coordinator is not known within the timeout. If the consumer is unable + * to complete commit and leave group requests before the timeout expires, the consumer is + * force closed. Note that {@link #wakeup()} cannot be used to interrupt close. + * + * @param timeout The maximum time to wait for consumer to close gracefully. The value should be + * non-negative. Specifying a timeout of zero means do not wait for pending requests to complete. + * @param timeUnit The time unit for the <code>timeout</code> + * @throws InterruptException If the thread is interrupted before or while this function is called + * @throws IllegalArgumentException If the <code>timeout</code> is negative. + */ + public void close(long timeout, TimeUnit timeUnit) { + if (timeout < 0) + throw new IllegalArgumentException("The timeout cannot be negative."); acquire(); try { - close(false); + close(timeUnit.toMillis(timeout), false); } finally { release(); } @@ -1532,11 +1556,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { return clusterResourceListeners; } - private void close(boolean swallowException) { + private void close(long timeoutMs, boolean swallowException) { log.trace("Closing the Kafka consumer."); AtomicReference<Throwable> firstException = new AtomicReference<>(); this.closed = true; - ClientUtils.closeQuietly(coordinator, "coordinator", firstException); + try { + coordinator.close(Math.min(timeoutMs, requestTimeoutMs)); + } catch (Throwable t) { + firstException.compareAndSet(null, t); + log.error("Failed to close coordinator", t); + } ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException); ClientUtils.closeQuietly(metrics, "consumer metrics", firstException); ClientUtils.closeQuietly(client, "consumer network client", firstException); http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index a2ba480..9d57f83 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; @@ -332,6 +333,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> { @Override public void close() { + close(KafkaConsumer.DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } + + @Override + public void close(long timeout, TimeUnit unit) { ensureNotClosed(); this.closed = true; } http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 1af324e..0bd93cb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -95,7 +95,7 @@ public abstract class AbstractCoordinator implements Closeable { STABLE, // the client has joined and is sending heartbeats } - private final int rebalanceTimeoutMs; + protected final int rebalanceTimeoutMs; private final int sessionTimeoutMs; private final GroupCoordinatorMetrics sensors; private final Heartbeat heartbeat; @@ -188,14 +188,22 @@ public abstract class AbstractCoordinator implements Closeable { * Block until the coordinator for this group is known and is ready to receive requests. */ public synchronized void ensureCoordinatorReady() { + // Using zero as current time since timeout is effectively infinite + ensureCoordinatorReady(0, Long.MAX_VALUE); + } + + protected synchronized long ensureCoordinatorReady(long now, long timeoutMs) { + long remainingMs = timeoutMs; + long startTimeMs = now; while (coordinatorUnknown()) { RequestFuture<Void> future = lookupCoordinator(); - client.poll(future); + client.poll(future, remainingMs); if (future.failed()) { - if (future.isRetriable()) - client.awaitMetadataUpdate(); - else + if (future.isRetriable()) { + remainingMs = timeoutMs - (time.milliseconds() - startTimeMs); + client.awaitMetadataUpdate(remainingMs); + } else throw future.exception(); } else if (coordinator != null && client.connectionFailed(coordinator)) { // we found the coordinator, but the connection has failed, so mark @@ -203,7 +211,11 @@ public abstract class AbstractCoordinator implements Closeable { coordinatorDead(); time.sleep(retryBackoffMs); } + remainingMs = timeoutMs - (time.milliseconds() - startTimeMs); + if (remainingMs <= 0) + break; } + return remainingMs; } protected synchronized RequestFuture<Void> lookupCoordinator() { @@ -623,9 +635,33 @@ public abstract class AbstractCoordinator implements Closeable { */ @Override public synchronized void close() { + close(0); + } + + protected synchronized void close(long timeoutMs) { if (heartbeatThread != null) heartbeatThread.close(); maybeLeaveGroup(); + + // At this point, there may be pending commits (async commits or sync commits that were + // interrupted using wakeup) and the leave group request which have been queued, but not + // yet sent to the broker. Wait up to close timeout for these pending requests to be processed. + // If coordinator is not known, requests are aborted. + long now = time.milliseconds(); + long endTimeMs = now + timeoutMs; + Node coordinator = coordinator(); + while (coordinator != null && client.pendingRequestCount(coordinator) > 0) { + if (Thread.currentThread().isInterrupted()) + throw new InterruptException("Consumer close was interrupted"); + long remainingTimeMs = endTimeMs - now; + client.poll(remainingTimeMs > 0 ? remainingTimeMs : 0); + now = time.milliseconds(); + if (client.pendingRequestCount(coordinator) > 0 && now >= endTimeMs) { + log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.", + client.pendingRequestCount(coordinator), groupId); + break; + } + } } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 9006d70..c963cca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.WakeupException; @@ -54,7 +55,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; -import org.apache.kafka.common.errors.InterruptException; +import java.util.concurrent.atomic.AtomicInteger; /** * This class manages the coordination process with the consumer coordinator. @@ -63,8 +64,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class); - private static final long CLOSE_TIMEOUT_MS = 5000; - private final List<PartitionAssignor> assignors; private final Metadata metadata; private final ConsumerCoordinatorMetrics sensors; @@ -74,6 +73,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private final int autoCommitIntervalMs; private final ConsumerInterceptors<?, ?> interceptors; private final boolean excludeInternalTopics; + private final AtomicInteger pendingAsyncCommits; // this collection must be thread-safe because it is modified from the response handler // of offset commit requests, which may be invoked from the heartbeat thread @@ -125,6 +125,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix); this.interceptors = interceptors; this.excludeInternalTopics = excludeInternalTopics; + this.pendingAsyncCommits = new AtomicInteger(); if (autoCommitEnabled) this.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs; @@ -327,7 +328,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @Override protected void onJoinPrepare(int generation, String memberId) { // commit offsets prior to rebalance if auto-commit enabled - maybeAutoCommitOffsetsSync(); + maybeAutoCommitOffsetsSync(rebalanceTimeoutMs); // execute the user's callback before rebalance ConsumerRebalanceListener listener = subscriptions.listener(); @@ -401,26 +402,21 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } } - @Override - public void close() { + public void close(long timeoutMs) { // we do not need to re-enable wakeups since we are closing already client.disableWakeups(); + + long now = time.milliseconds(); + long endTimeMs = now + timeoutMs; try { - maybeAutoCommitOffsetsSync(); - - Node coordinator; - long endTimeMs = time.milliseconds() + CLOSE_TIMEOUT_MS; - while ((coordinator = coordinator()) != null && client.pendingRequestCount(coordinator) > 0) { - long remainingTimeMs = endTimeMs - time.milliseconds(); - if (remainingTimeMs > 0) - client.poll(remainingTimeMs); - else { - log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.", client.pendingRequestCount(coordinator), groupId); - break; - } + maybeAutoCommitOffsetsSync(timeoutMs); + now = time.milliseconds(); + if (pendingAsyncCommits.get() > 0) { + ensureCoordinatorReady(now, endTimeMs - now); + now = time.milliseconds(); } } finally { - super.close(); + super.close(endTimeMs - now); } } @@ -447,14 +443,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // coordinator lookup request. This is fine because the listeners will be invoked in // the same order that they were added. Note also that AbstractCoordinator prevents // multiple concurrent coordinator lookup requests. + pendingAsyncCommits.incrementAndGet(); lookupCoordinator().addListener(new RequestFutureListener<Void>() { @Override public void onSuccess(Void value) { + pendingAsyncCommits.decrementAndGet(); doCommitOffsetsAsync(offsets, callback); } @Override public void onFailure(RuntimeException e) { + pendingAsyncCommits.decrementAndGet(); completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e))); } }); @@ -499,17 +498,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator { * or to any of the specified partitions * @throws CommitFailedException if an unrecoverable error occurs before the commit can be completed */ - public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) { + public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, long timeoutMs) { invokeCompletedOffsetCommitCallbacks(); if (offsets.isEmpty()) return; - while (true) { - ensureCoordinatorReady(); + long now = time.milliseconds(); + long startMs = now; + long remainingMs = timeoutMs; + do { + remainingMs = ensureCoordinatorReady(now, remainingMs); RequestFuture<Void> future = sendOffsetCommitRequest(offsets); - client.poll(future); + client.poll(future, remainingMs); if (future.succeeded()) { if (interceptors != null) @@ -521,7 +523,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { throw future.exception(); time.sleep(retryBackoffMs); - } + + now = time.milliseconds(); + remainingMs = timeoutMs - (now - startMs); + } while (remainingMs > 0); } private void maybeAutoCommitOffsetsAsync(long now) { @@ -555,10 +560,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { }); } - private void maybeAutoCommitOffsetsSync() { + private void maybeAutoCommitOffsetsSync(long timeoutMs) { if (autoCommitEnabled) { try { - commitOffsetsSync(subscriptions.allConsumed()); + commitOffsetsSync(subscriptions.allConsumed(), timeoutMs); } catch (WakeupException | InterruptException e) { // rethrow wakeups since they are triggered by the user throw e; http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 88f55d9..28febc1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.network.Selectable; @@ -42,12 +43,14 @@ import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.FetchResponse.PartitionData; import org.apache.kafka.common.requests.GroupCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.JoinGroupResponse; +import org.apache.kafka.common.requests.LeaveGroupResponse; import org.apache.kafka.common.requests.ListOffsetResponse; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.OffsetCommitResponse; @@ -81,16 +84,17 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import org.apache.kafka.common.errors.InterruptException; import org.junit.Rule; import org.junit.rules.ExpectedException; @@ -1124,15 +1128,40 @@ public class KafkaConsumerTest { @Test public void testGracefulClose() throws Exception { - consumerCloseTest(true); + Map<TopicPartition, Short> response = new HashMap<>(); + response.put(tp0, Errors.NONE.code()); + OffsetCommitResponse commitResponse = offsetCommitResponse(response); + LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(Errors.NONE.code()); + consumerCloseTest(5000, Arrays.asList(commitResponse, leaveGroupResponse), 0, false); } @Test public void testCloseTimeout() throws Exception { - consumerCloseTest(false); + consumerCloseTest(5000, Collections.<AbstractResponse>emptyList(), 5000, false); + } + + @Test + public void testLeaveGroupTimeout() throws Exception { + Map<TopicPartition, Short> response = new HashMap<>(); + response.put(tp0, Errors.NONE.code()); + OffsetCommitResponse commitResponse = offsetCommitResponse(response); + consumerCloseTest(5000, Arrays.asList(commitResponse), 5000, false); + } + + @Test + public void testCloseNoWait() throws Exception { + consumerCloseTest(0, Collections.<AbstractResponse>emptyList(), 0, false); } - private void consumerCloseTest(boolean graceful) throws Exception { + @Test + public void testCloseInterrupt() throws Exception { + consumerCloseTest(Long.MAX_VALUE, Collections.<AbstractResponse>emptyList(), 0, true); + } + + private void consumerCloseTest(final long closeTimeoutMs, + List<? extends AbstractResponse> responses, + long waitMs, + boolean interrupt) throws Exception { int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; int heartbeatIntervalMs = 5000; @@ -1163,32 +1192,61 @@ public class KafkaConsumerTest { // Kafka consumer is single-threaded, but the implementation allows calls on a // different thread as long as the calls are not executed concurrently. So this is safe. ExecutorService executor = Executors.newSingleThreadExecutor(); + final AtomicReference<Exception> closeException = new AtomicReference<Exception>(); try { Future<?> future = executor.submit(new Runnable() { @Override public void run() { consumer.commitAsync(); - consumer.close(); + try { + consumer.close(closeTimeoutMs, TimeUnit.MILLISECONDS); + } catch (Exception e) { + closeException.set(e); + } } }); // Close task should not complete until commit succeeds or close times out + // if close timeout is not zero, try { future.get(100, TimeUnit.MILLISECONDS); - fail("Close completed without waiting for commit response"); + if (closeTimeoutMs != 0) + fail("Close completed without waiting for commit or leave response"); } catch (TimeoutException e) { // Expected exception } // In graceful mode, commit response results in close() completing immediately without a timeout // In non-graceful mode, close() times out without an exception even though commit response is pending - if (graceful) { - Map<TopicPartition, Short> response = new HashMap<>(); - response.put(tp0, Errors.NONE.code()); - client.respondFrom(offsetCommitResponse(response), coordinator); + for (int i = 0; i < responses.size(); i++) { + client.respondFrom(responses.get(i), coordinator); + if (i != responses.size() - 1) { + try { + future.get(100, TimeUnit.MILLISECONDS); + fail("Close completed without waiting for response"); + } catch (TimeoutException e) { + // Expected exception + } + } + } + + if (waitMs > 0) + time.sleep(waitMs); + if (interrupt) + assertTrue("Close terminated prematurely", future.cancel(true)); + + // Make sure that close task completes and another task can be run on the single threaded executor + executor.submit(new Runnable() { + @Override + public void run() { + } + }).get(500, TimeUnit.MILLISECONDS); + + if (!interrupt) { + future.get(500, TimeUnit.MILLISECONDS); // Should succeed without TimeoutException or ExecutionException + assertNull("Unexpected exception during close", closeException.get()); } else - time.sleep(5000); - future.get(500, TimeUnit.MILLISECONDS); // Should succeed without TimeoutException or ExecutionException + assertTrue("Expected exception not thrown " + closeException, closeException.get() instanceof InterruptException); } finally { executor.shutdownNow(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index e7ba401..0637ea4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -66,6 +66,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; @@ -76,6 +81,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class ConsumerCoordinatorTest { @@ -482,7 +488,7 @@ public class ConsumerCoordinatorTest { leaveRequest.groupId().equals(groupId); } }, new LeaveGroupResponse(Errors.NONE.code())); - coordinator.close(); + coordinator.close(0); assertTrue(received.get()); } @@ -1028,7 +1034,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code()))); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L))); + coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), Long.MAX_VALUE); } @Test @@ -1040,7 +1046,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()))); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L))); + coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), Long.MAX_VALUE); } @Test @@ -1052,7 +1058,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L))); + coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), Long.MAX_VALUE); } @Test(expected = KafkaException.class) @@ -1061,7 +1067,7 @@ public class ConsumerCoordinatorTest { coordinator.ensureCoordinatorReady(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION.code()))); - coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata"))); + coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE); } @Test(expected = OffsetMetadataTooLarge.class) @@ -1071,7 +1077,7 @@ public class ConsumerCoordinatorTest { coordinator.ensureCoordinatorReady(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.OFFSET_METADATA_TOO_LARGE.code()))); - coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata"))); + coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE); } @Test(expected = CommitFailedException.class) @@ -1081,7 +1087,7 @@ public class ConsumerCoordinatorTest { coordinator.ensureCoordinatorReady(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.ILLEGAL_GENERATION.code()))); - coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata"))); + coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE); } @Test(expected = CommitFailedException.class) @@ -1091,7 +1097,7 @@ public class ConsumerCoordinatorTest { coordinator.ensureCoordinatorReady(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN_MEMBER_ID.code()))); - coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata"))); + coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE); } @Test(expected = CommitFailedException.class) @@ -1101,7 +1107,7 @@ public class ConsumerCoordinatorTest { coordinator.ensureCoordinatorReady(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.REBALANCE_IN_PROGRESS.code()))); - coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata"))); + coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE); } @Test(expected = KafkaException.class) @@ -1111,13 +1117,13 @@ public class ConsumerCoordinatorTest { // sync commit with invalid partitions should throw if we have no callback client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN.code())), false); - coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L))); + coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), Long.MAX_VALUE); } @Test(expected = IllegalArgumentException.class) public void testCommitSyncNegativeOffset() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); - coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(-1L))); + coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(-1L)), Long.MAX_VALUE); } @Test @@ -1220,6 +1226,181 @@ public class ConsumerCoordinatorTest { } } + @Test + public void testCloseDynamicAssignment() throws Exception { + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + gracefulCloseTest(coordinator, true); + } + + @Test + public void testCloseManualAssignment() throws Exception { + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true); + gracefulCloseTest(coordinator, false); + } + + @Test + public void testCloseCoordinatorNotKnownManualAssignment() throws Exception { + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true); + makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP); + time.sleep(autoCommitIntervalMs); + closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000); + } + + @Test + public void testCloseCoordinatorNotKnownNoCommits() throws Exception { + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false); + makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP); + closeVerifyTimeout(coordinator, 1000, 60000, 0, 0); + } + + @Test + public void testCloseCoordinatorNotKnownWithCommits() throws Exception { + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP); + time.sleep(autoCommitIntervalMs); + closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000); + } + + @Test + public void testCloseCoordinatorUnavailableNoCommits() throws Exception { + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false); + makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE); + closeVerifyTimeout(coordinator, 1000, 60000, 0, 0); + } + + @Test + public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception { + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE); + time.sleep(autoCommitIntervalMs); + closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000); + } + + @Test + public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception { + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE); + time.sleep(autoCommitIntervalMs); + closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000); + } + + @Test + public void testCloseNoResponseForCommit() throws Exception { + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + time.sleep(autoCommitIntervalMs); + closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000); + } + + @Test + public void testCloseNoResponseForLeaveGroup() throws Exception { + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false); + closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000); + } + + @Test + public void testCloseNoWait() throws Exception { + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + time.sleep(autoCommitIntervalMs); + closeVerifyTimeout(coordinator, 0, 60000, 0, 0); + } + + @Test + public void testHeartbeatThreadClose() throws Exception { + groupId = "testCloseTimeoutWithHeartbeatThread"; + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + coordinator.ensureActiveGroup(); + time.sleep(heartbeatIntervalMs + 100); + Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat + closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000); + Thread[] threads = new Thread[Thread.activeCount()]; + int threadCount = Thread.enumerate(threads); + for (int i = 0; i < threadCount; i++) + assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId)); + } + + private ConsumerCoordinator prepareCoordinatorForCloseTest(boolean useGroupManagement, boolean autoCommit) { + final String consumerId = "consumer"; + ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommit); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorReady(); + if (useGroupManagement) { + subscriptions.subscribe(singleton(topicName), rebalanceListener); + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code())); + coordinator.joinGroupIfNeeded(); + } else + subscriptions.assignFromUser(singleton(tp)); + + subscriptions.seek(tp, 100); + coordinator.poll(time.milliseconds()); + + return coordinator; + } + + private void makeCoordinatorUnknown(ConsumerCoordinator coordinator, Errors errorCode) { + time.sleep(sessionTimeoutMs); + coordinator.sendHeartbeatRequest(); + client.prepareResponse(heartbeatResponse(errorCode.code())); + time.sleep(sessionTimeoutMs); + consumerClient.poll(0); + assertTrue(coordinator.coordinatorUnknown()); + } + private void closeVerifyTimeout(final ConsumerCoordinator coordinator, + final long closeTimeoutMs, final long requestTimeoutMs, + long expectedMinTimeMs, long expectedMaxTimeMs) throws Exception { + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + Future<?> future = executor.submit(new Runnable() { + @Override + public void run() { + coordinator.close(Math.min(closeTimeoutMs, requestTimeoutMs)); + } + }); + Thread.sleep(100); + if (expectedMinTimeMs > 0) { + time.sleep(expectedMinTimeMs - 1); + try { + future.get(500, TimeUnit.MILLISECONDS); + fail("Close completed ungracefully without waiting for timeout"); + } catch (TimeoutException e) { + // Expected timeout + } + } + if (expectedMaxTimeMs >= 0) + time.sleep(expectedMaxTimeMs - expectedMinTimeMs + 2); + future.get(2000, TimeUnit.MILLISECONDS); + } finally { + executor.shutdownNow(); + } + } + + private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean dynamicAssignment) throws Exception { + final AtomicBoolean commitRequested = new AtomicBoolean(); + final AtomicBoolean leaveGroupRequested = new AtomicBoolean(); + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + commitRequested.set(true); + OffsetCommitRequest commitRequest = (OffsetCommitRequest) body; + return commitRequest.groupId().equals(groupId); + } + }, new OffsetCommitResponse(new HashMap<TopicPartition, Short>())); + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + leaveGroupRequested.set(true); + LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body; + return leaveRequest.groupId().equals(groupId); + } + }, new LeaveGroupResponse(Errors.NONE.code())); + + closeVerifyTimeout(coordinator, 1000, 60000, 0, 0); + assertTrue("Commit not requested", commitRequested.get()); + if (dynamicAssignment) + assertTrue("Leave group not requested", leaveGroupRequested.get()); + } + private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors, boolean excludeInternalTopics, http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/core/src/main/scala/kafka/admin/AdminClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index ab29c88..e43de5d 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -63,7 +63,7 @@ class AdminClient(val time: Time, throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers") } - private def findCoordinator(groupId: String): Node = { + def findCoordinator(groupId: String): Node = { val requestBuilder = new GroupCoordinatorRequest.Builder(groupId) val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse] Errors.forCode(response.errorCode()).maybeThrow() http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index 10d49f5..f98716a 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -13,20 +13,21 @@ package kafka.api -import java.util.Collections +import java.util.{Collection, Collections} +import java.util.concurrent.{Callable, Executors, ExecutorService, Future, Semaphore, TimeUnit} +import kafka.admin.AdminClient import kafka.server.KafkaConfig -import kafka.utils.{Logging, ShutdownableThread, TestUtils} +import kafka.utils.{CoreUtils, Logging, ShutdownableThread, TestUtils} import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} import org.apache.kafka.common.TopicPartition import org.junit.Assert._ -import org.junit.{Before, Test} +import org.junit.{Before, After, Test} import scala.collection.JavaConverters._ - /** * Integration tests for the new consumer that cover basic usage as well as server failures */ @@ -40,6 +41,10 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { val part = 0 val tp = new TopicPartition(topic, part) + // Time to process commit and leave group requests in tests when brokers are available + val gracefulCloseTimeMs = 1000 + val executor = Executors.newFixedThreadPool(2) + // configure the servers and clients this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset @@ -65,6 +70,15 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { TestUtils.createTopic(this.zkUtils, topic, 1, serverCount, this.servers) } + @After + override def tearDown() { + try { + executor.shutdownNow() + } finally { + super.tearDown() + } + } + @Test def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(10) @@ -147,6 +161,202 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { } } + + @Test + def testClose() { + val numRecords = 10 + sendRecords(numRecords) + + checkCloseGoodPath(numRecords, "group1") + checkCloseWithCoordinatorFailure(numRecords, "group2", "group3") + checkCloseWithClusterFailure(numRecords, "group4", "group5") + } + + /** + * Consumer is closed while cluster is healthy. Consumer should complete pending offset commits + * and leave group. New consumer instance should be able join group and start consuming from + * last committed offset. + */ + private def checkCloseGoodPath(numRecords: Int, groupId: String) { + val consumer = createConsumerAndReceive(groupId, false, numRecords) + val future = submitCloseAndValidate(consumer, Long.MaxValue, None, Some(gracefulCloseTimeMs)) + future.get + checkClosedState(groupId, numRecords) + } + + /** + * Consumer closed while coordinator is unavailable. Close of consumers using group + * management should complete after commit attempt even though commits fail due to rebalance. + * Close of consumers using manual assignment should complete with successful commits since a + * broker is available. + */ + private def checkCloseWithCoordinatorFailure(numRecords: Int, dynamicGroup: String, manualGroup: String) { + val consumer1 = createConsumerAndReceive(dynamicGroup, false, numRecords) + val consumer2 = createConsumerAndReceive(manualGroup, true, numRecords) + + val adminClient = AdminClient.createSimplePlaintext(this.brokerList) + killBroker(adminClient.findCoordinator(dynamicGroup).id) + killBroker(adminClient.findCoordinator(manualGroup).id) + + val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs)) + val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(gracefulCloseTimeMs)) + future1.get + future2.get + + restartDeadBrokers() + checkClosedState(dynamicGroup, 0) + checkClosedState(manualGroup, numRecords) + } + + /** + * Consumer is closed while all brokers are unavailable. Cannot rebalance or commit offsets since + * there is no coordinator, but close should timeout and return. If close is invoked with a very + * large timeout, close should timeout after request timeout. + */ + private def checkCloseWithClusterFailure(numRecords: Int, group1: String, group2: String) { + val consumer1 = createConsumerAndReceive(group1, false, numRecords) + + val requestTimeout = 6000 + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000") + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") + this.consumerConfig.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString) + val consumer2 = createConsumerAndReceive(group2, true, numRecords) + + servers.foreach(server => killBroker(server.config.brokerId)) + val closeTimeout = 2000 + val future1 = submitCloseAndValidate(consumer1, closeTimeout, Some(closeTimeout), Some(closeTimeout)) + val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, Some(requestTimeout), Some(requestTimeout)) + future1.get + future2.get + } + + /** + * Consumer is closed during rebalance. Close should leave group and close + * immediately if rebalance is in progress. If brokers are not available, + * close should terminate immediately without sending leave group. + */ + @Test + def testCloseDuringRebalance() { + val topic = "closetest" + TestUtils.createTopic(this.zkUtils, topic, 10, serverCount, this.servers) + this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000") + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") + this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + checkCloseDuringRebalance("group1", topic, executor, true) + } + + private def checkCloseDuringRebalance(groupId: String, topic: String, executor: ExecutorService, brokersAvailableDuringClose: Boolean) { + + def subscribeAndPoll(consumer: KafkaConsumer[Array[Byte], Array[Byte]], revokeSemaphore: Option[Semaphore] = None): Future[Any] = { + executor.submit(CoreUtils.runnable { + consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener { + def onPartitionsAssigned(partitions: Collection[TopicPartition]) { + } + def onPartitionsRevoked(partitions: Collection[TopicPartition]) { + revokeSemaphore.foreach(s => s.release()) + } + }) + consumer.poll(0) + }, 0) + } + + def waitForRebalance(timeoutMs: Long, future: Future[Any], otherConsumers: KafkaConsumer[Array[Byte], Array[Byte]]*) { + val startMs = System.currentTimeMillis + while (System.currentTimeMillis < startMs + timeoutMs && !future.isDone) + otherConsumers.foreach(consumer => consumer.poll(100)) + assertTrue("Rebalance did not complete in time", future.isDone) + } + + def createConsumerToRebalance(): Future[Any] = { + val consumer = createConsumer(groupId) + val rebalanceSemaphore = new Semaphore(0) + val future = subscribeAndPoll(consumer, Some(rebalanceSemaphore)) + // Wait for consumer to poll and trigger rebalance + assertTrue("Rebalance not triggered", rebalanceSemaphore.tryAcquire(2000, TimeUnit.MILLISECONDS)) + // Rebalance is blocked by other consumers not polling + assertFalse("Rebalance completed too early", future.isDone) + future + } + + val consumer1 = createConsumer(groupId) + waitForRebalance(2000, subscribeAndPoll(consumer1)) + val consumer2 = createConsumer(groupId) + waitForRebalance(2000, subscribeAndPoll(consumer2), consumer1) + val rebalanceFuture = createConsumerToRebalance() + + // consumer1 should leave group and close immediately even though rebalance is in progress + submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs)) + + // Rebalance should complete without waiting for consumer1 to timeout since consumer1 has left the group + waitForRebalance(2000, rebalanceFuture, consumer2) + + // Trigger another rebalance and shutdown all brokers + createConsumerToRebalance() + servers.foreach(server => killBroker(server.config.brokerId)) + + // consumer2 should close immediately without LeaveGroup request since there are no brokers available + submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(0)) + } + + private def createConsumer(groupId: String) : KafkaConsumer[Array[Byte], Array[Byte]] = { + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) + createNewConsumer + } + + private def createConsumerAndReceive(groupId: String, manualAssign: Boolean, numRecords: Int) : KafkaConsumer[Array[Byte], Array[Byte]] = { + val consumer = createConsumer(groupId) + if (manualAssign) + consumer.assign(Collections.singleton(tp)) + else + consumer.subscribe(Collections.singleton(topic)) + receiveRecords(consumer, numRecords) + consumer + } + + private def receiveRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) { + var received = 0 + while (received < numRecords) + received += consumer.poll(1000).count() + } + + private def submitCloseAndValidate(consumer: KafkaConsumer[Array[Byte], Array[Byte]], + closeTimeoutMs: Long, minCloseTimeMs: Option[Long], maxCloseTimeMs: Option[Long]): Future[Any] = { + executor.submit(CoreUtils.runnable { + val closeGraceTimeMs = 2000 + val startNanos = System.nanoTime + info("Closing consumer with timeout " + closeTimeoutMs + " ms.") + consumer.close(closeTimeoutMs, TimeUnit.MILLISECONDS) + val timeTakenMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startNanos) + maxCloseTimeMs match { + case Some(ms) => assertTrue("Close took too long " + timeTakenMs, timeTakenMs < ms + closeGraceTimeMs) + case None => + } + minCloseTimeMs match { + case Some(ms) => assertTrue("Close finished too quickly " + timeTakenMs, timeTakenMs >= ms) + case None => + } + info("consumer.close() completed in " + timeTakenMs + " ms.") + }, 0) + } + + private def checkClosedState(groupId: String, committedRecords: Int) { + // Check that close was graceful with offsets committed and leave group sent. + // New instance of consumer should be assigned partitions immediately and should see committed offsets. + val assignSemaphore = new Semaphore(0) + val consumer = createConsumer(groupId) + consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener { + def onPartitionsAssigned(partitions: Collection[TopicPartition]) { + assignSemaphore.release() + } + def onPartitionsRevoked(partitions: Collection[TopicPartition]) { + }}) + consumer.poll(3000) + assertTrue("Assigment did not complete on time", assignSemaphore.tryAcquire(1, TimeUnit.SECONDS)) + if (committedRecords > 0) + assertEquals(committedRecords, consumer.committed(tp).offset) + consumer.close() + } + private class BounceBrokerScheduler(val numIters: Int) extends ShutdownableThread("daemon-bounce-broker", false) { var iter: Int = 0 http://git-wip-us.apache.org/repos/asf/kafka/blob/f72203ee/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 13b37e1..4bbdedb 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -101,12 +101,16 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness { */ def killRandomBroker(): Int = { val index = TestUtils.random.nextInt(servers.length) + killBroker(index) + index + } + + def killBroker(index: Int) { if(alive(index)) { servers(index).shutdown() servers(index).awaitShutdown() alive(index) = false } - index } /**