Repository: kafka Updated Branches: refs/heads/trunk efe4f6540 -> 334a30bcf
KAFKA-5730; Consumer should invoke async commit callback before sync commit returns Author: Jason Gustafson <ja...@confluent.io> Reviewers: Randall Hauch <rha...@gmail.com>, Ewen Cheslack-Postava <m...@ewencp.org> Closes #3666 from hachikuji/KAFKA-5730 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/334a30bc Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/334a30bc Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/334a30bc Branch: refs/heads/trunk Commit: 334a30bcfffc3759aeded08f2089cea4ed6e9937 Parents: efe4f65 Author: Jason Gustafson <ja...@confluent.io> Authored: Thu Aug 17 15:04:03 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Aug 17 15:04:03 2017 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/clients/KafkaClient.java | 2 +- .../org/apache/kafka/clients/NetworkClient.java | 8 +++ .../kafka/clients/consumer/KafkaConsumer.java | 16 +++++ .../consumer/internals/AbstractCoordinator.java | 10 +-- .../consumer/internals/ConsumerCoordinator.java | 5 ++ .../internals/ConsumerNetworkClient.java | 15 ++++- .../apache/kafka/clients/NetworkClientTest.java | 14 ++-- .../internals/ConsumerCoordinatorTest.java | 68 ++++++++++++++++++++ .../internals/ConsumerNetworkClientTest.java | 22 +++++++ 9 files changed, 147 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java index 5bca261..2faebfd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -86,7 +86,7 @@ public interface KafkaClient extends Closeable { List<ClientResponse> poll(long timeout, long now); /** - * Diconnects the connection to a particular node, if there is one. + * Disconnects the connection to a particular node, if there is one. * Any pending ClientRequests for this connection will receive disconnections. * * @param nodeId The id of the node http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 60b1598..4fe55ae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -226,6 +226,11 @@ public class NetworkClient implements KafkaClient { return false; } + // Visible for testing + boolean canConnect(Node node, long now) { + return connectionStates.canConnect(node.idString(), now); + } + /** * Disconnects the connection to a particular node, if there is one. * Any pending ClientRequests for this connection will receive disconnections. @@ -234,6 +239,9 @@ public class NetworkClient implements KafkaClient { */ @Override public void disconnect(String nodeId) { + if (connectionStates.isDisconnected(nodeId)) + return; + selector.close(nodeId); List<ApiKeys> requestTypes = new ArrayList<>(); long now = time.milliseconds(); http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 4ddd648..f1351b7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1119,6 +1119,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * <p> * This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is * encountered (in which case it is thrown to the caller). + * <p> + * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)} + * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method. * * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried. * This can only occur if you are using automatic group management with {@link #subscribe(Collection)}, @@ -1152,6 +1155,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * <p> * This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is * encountered (in which case it is thrown to the caller). + * <p> + * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)} + * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method. * * @param offsets A map of offsets by partition with associated metadata * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried. @@ -1194,6 +1200,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * <p> * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback * (if provided) or discarded. + * <p> + * Offsets committed through multiple calls to this API are guaranteed to be sent in the same order as + * the invocations. Corresponding commit callbacks are also invoked in the same order. Additionally note that + * offsets committed through this API are guaranteed to complete before a subsequent call to {@link #commitSync()} + * (and variants) returns. * * @param callback Callback to invoke when the commit completes */ @@ -1217,6 +1228,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * <p> * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback * (if provided) or discarded. + * <p> + * Offsets committed through multiple calls to this API are guaranteed to be sent in the same order as + * the invocations. Corresponding commit callbacks are also invoked in the same order. Additionally note that + * offsets committed through this API are guaranteed to complete before a subsequent call to {@link #commitSync()} + * (and variants) returns. * * @param offsets A map of offsets by partition with associate metadata. This map will be copied internally, so it * is safe to mutate the map after returning. http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 0f594df..74ef20a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -21,8 +21,8 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.GroupAuthorizationException; -import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.errors.IllegalGenerationException; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.UnknownMemberIdException; @@ -59,7 +59,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.apache.kafka.common.errors.InterruptException; /** * AbstractCoordinator implements group management for a single group member by interacting with @@ -247,8 +246,6 @@ public abstract class AbstractCoordinator implements Closeable { // find a node to ask about the coordinator Node node = this.client.leastLoadedNode(); if (node == null) { - // TODO: If there are no brokers left, perhaps we should use the bootstrap set - // from configuration? log.debug("No broker available to send GroupCoordinator request for group {}", groupId); return RequestFuture.noBrokersAvailable(); } else @@ -651,7 +648,10 @@ public abstract class AbstractCoordinator implements Closeable { protected synchronized void coordinatorDead() { if (this.coordinator != null) { log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId); - client.failUnsentRequests(this.coordinator, CoordinatorNotAvailableException.INSTANCE); + + // Disconnect from the coordinator to ensure that there are no in-flight requests remaining. + // Pending callbacks will be invoked with a DisconnectException. + client.disconnect(this.coordinator); this.coordinator = null; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index d39b369..5ba9ccb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -604,6 +604,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator { RequestFuture<Void> future = sendOffsetCommitRequest(offsets); client.poll(future, remainingMs); + // We may have had in-flight offset commits when the synchronous commit began. If so, ensure that + // the corresponding callbacks are invoked prior to returning in order to preserve the order that + // the offset commits were applied. + invokeCompletedOffsetCommitCallbacks(); + if (future.succeeded()) { if (interceptors != null) interceptors.onCommit(offsets); http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 84e9a81..bb7f9f2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -263,8 +263,7 @@ public class ConsumerNetworkClient implements Closeable { } /** - * Poll for network IO and return immediately. This will not trigger wakeups, - * nor will it execute any delayed tasks. + * Poll for network IO and return immediately. This will not trigger wakeups. */ public void pollNoWakeup() { poll(0, time.milliseconds(), null, true); @@ -374,6 +373,16 @@ public class ConsumerNetworkClient implements Closeable { } } + public void disconnect(Node node) { + synchronized (this) { + failUnsentRequests(node, DisconnectException.INSTANCE); + client.disconnect(node.idString()); + } + + // We need to poll to ensure callbacks from in-flight requests on the disconnected socket are fired + pollNoWakeup(); + } + private void failExpiredRequests(long now) { // clear all expired unsent requests and fail their corresponding futures Collection<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now, unsentExpiryMs); @@ -383,7 +392,7 @@ public class ConsumerNetworkClient implements Closeable { } } - public void failUnsentRequests(Node node, RuntimeException e) { + private void failUnsentRequests(Node node, RuntimeException e) { // clear unsent requests to node and fail their corresponding futures synchronized (this) { Collection<ClientRequest> unsentRequests = unsent.remove(node); http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 77960e1..069fb7a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -22,14 +22,13 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.test.DelayedReceive; import org.apache.kafka.test.MockSelector; import org.apache.kafka.test.TestUtils; @@ -270,14 +269,21 @@ public class NetworkClientTest { public void testCallDisconnect() throws Exception { awaitReady(client, node); assertTrue("Expected NetworkClient to be ready to send to node " + node.idString(), - client.isReady(node, Time.SYSTEM.milliseconds())); + client.isReady(node, time.milliseconds())); assertFalse("Did not expect connection to node " + node.idString() + " to be failed", client.connectionFailed(node)); client.disconnect(node.idString()); assertFalse("Expected node " + node.idString() + " to be disconnected.", - client.isReady(node, Time.SYSTEM.milliseconds())); + client.isReady(node, time.milliseconds())); assertTrue("Expected connection to node " + node.idString() + " to be failed after disconnect", client.connectionFailed(node)); + assertFalse(client.canConnect(node, time.milliseconds())); + + // ensure disconnect does not reset blackout period if already disconnected + time.sleep(reconnectBackoffMsTest); + assertTrue(client.canConnect(node, time.milliseconds())); + client.disconnect(node.idString()); + assertTrue(client.canConnect(node, time.milliseconds())); } private static class TestCallbackHandler implements RequestCompletionHandler { http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 3c1b411..18e18ed 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -58,6 +58,7 @@ import org.junit.Before; import org.junit.Test; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -951,6 +952,37 @@ public class ConsumerCoordinatorTest { } @Test + public void testCoordinatorDisconnectAfterNotCoordinatorError() { + testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors.NOT_COORDINATOR); + } + + @Test + public void testCoordinatorDisconnectAfterCoordinatorNotAvailableError() { + testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors.COORDINATOR_NOT_AVAILABLE); + } + + private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors error) { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(); + + // Send two async commits and fail the first one with an error. + // This should cause a coordinator disconnect which will cancel the second request. + + MockCommitCallback firstCommitCallback = new MockCommitCallback(); + MockCommitCallback secondCommitCallback = new MockCommitCallback(); + coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), firstCommitCallback); + coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), secondCommitCallback); + + client.respond(offsetCommitResponse(Collections.singletonMap(t1p, error))); + consumerClient.pollNoWakeup(); + coordinator.invokeCompletedOffsetCommitCallbacks(); + + assertTrue(coordinator.coordinatorUnknown()); + assertTrue(firstCommitCallback.exception instanceof RetriableCommitFailedException); + assertTrue(secondCommitCallback.exception instanceof RetriableCommitFailedException); + } + + @Test public void testAutoCommitDynamicAssignment() { final String consumerId = "consumer"; @@ -1212,6 +1244,42 @@ public class ConsumerCoordinatorTest { coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE); } + @Test + public void testAsyncCommitCallbacksInvokedPriorToSyncCommitCompletion() throws Exception { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(); + + final List<OffsetAndMetadata> committedOffsets = Collections.synchronizedList(new ArrayList<OffsetAndMetadata>()); + final OffsetAndMetadata firstOffset = new OffsetAndMetadata(0L); + final OffsetAndMetadata secondOffset = new OffsetAndMetadata(1L); + + coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, firstOffset), new OffsetCommitCallback() { + @Override + public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { + committedOffsets.add(firstOffset); + } + }); + + // Do a synchronous commit in the background so that we can send both responses at the same time + Thread thread = new Thread() { + @Override + public void run() { + coordinator.commitOffsetsSync(Collections.singletonMap(t1p, secondOffset), 10000); + committedOffsets.add(secondOffset); + } + }; + + thread.start(); + + client.waitForRequests(2, 5000); + client.respond(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); + client.respond(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); + + thread.join(); + + assertEquals(Arrays.asList(firstOffset, secondOffset), committedOffsets); + } + @Test(expected = KafkaException.class) public void testCommitUnknownTopicOrPartition() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index b46b657..8e71dd5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.HeartbeatRequest; @@ -81,6 +82,27 @@ public class ConsumerNetworkClientTest { } @Test + public void testDisconnectWithUnsentRequests() { + RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat()); + assertTrue(consumerClient.hasPendingRequests(node)); + assertFalse(client.hasInFlightRequests(node.idString())); + consumerClient.disconnect(node); + assertTrue(future.failed()); + assertTrue(future.exception() instanceof DisconnectException); + } + + @Test + public void testDisconnectWithInFlightRequests() { + RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat()); + consumerClient.pollNoWakeup(); + assertTrue(consumerClient.hasPendingRequests(node)); + assertTrue(client.hasInFlightRequests(node.idString())); + consumerClient.disconnect(node); + assertTrue(future.failed()); + assertTrue(future.exception() instanceof DisconnectException); + } + + @Test public void doNotBlockIfPollConditionIsSatisfied() { NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class); ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(mockNetworkClient, metadata, time, 100, 1000);