Repository: kafka
Updated Branches:
  refs/heads/trunk efe4f6540 -> 334a30bcf


KAFKA-5730; Consumer should invoke async commit callback before sync commit returns

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Randall Hauch <rha...@gmail.com>, Ewen Cheslack-Postava <m...@ewencp.org>

Closes #3666 from hachikuji/KAFKA-5730
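
For context, a minimal usage sketch of the behavior this patch guarantees (application code, not part of the commit; assumes an already-configured KafkaConsumer named consumer): the callback of an earlier commitAsync is now always invoked before a later commitSync returns.

    import java.util.Map;
    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.OffsetCommitCallback;
    import org.apache.kafka.common.TopicPartition;

    final AtomicBoolean asyncCallbackFired = new AtomicBoolean(false);
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            asyncCallbackFired.set(true);
        }
    });
    consumer.commitSync(); // blocks; with this fix the async callback above has fired by this point
    assert asyncCallbackFired.get();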


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/334a30bc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/334a30bc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/334a30bc

Branch: refs/heads/trunk
Commit: 334a30bcfffc3759aeded08f2089cea4ed6e9937
Parents: efe4f65
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Aug 17 15:04:03 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Aug 17 15:04:03 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/KafkaClient.java   |  2 +-
 .../org/apache/kafka/clients/NetworkClient.java |  8 +++
 .../kafka/clients/consumer/KafkaConsumer.java   | 16 +++++
 .../consumer/internals/AbstractCoordinator.java | 10 +--
 .../consumer/internals/ConsumerCoordinator.java |  5 ++
 .../internals/ConsumerNetworkClient.java        | 15 ++++-
 .../apache/kafka/clients/NetworkClientTest.java | 14 ++--
 .../internals/ConsumerCoordinatorTest.java      | 68 ++++++++++++++++++++
 .../internals/ConsumerNetworkClientTest.java    | 22 +++++++
 9 files changed, 147 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 5bca261..2faebfd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -86,7 +86,7 @@ public interface KafkaClient extends Closeable {
     List<ClientResponse> poll(long timeout, long now);
 
     /**
-     * Diconnects the connection to a particular node, if there is one.
+     * Disconnects the connection to a particular node, if there is one.
      * Any pending ClientRequests for this connection will receive disconnections.
      *
      * @param nodeId The id of the node

http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 60b1598..4fe55ae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -226,6 +226,11 @@ public class NetworkClient implements KafkaClient {
         return false;
     }
 
+    // Visible for testing
+    boolean canConnect(Node node, long now) {
+        return connectionStates.canConnect(node.idString(), now);
+    }
+
     /**
      * Disconnects the connection to a particular node, if there is one.
      * Any pending ClientRequests for this connection will receive disconnections.
@@ -234,6 +239,9 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public void disconnect(String nodeId) {
+        if (connectionStates.isDisconnected(nodeId))
+            return;
+
         selector.close(nodeId);
         List<ApiKeys> requestTypes = new ArrayList<>();
         long now = time.milliseconds();
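
The early return added above makes disconnect idempotent: calling it again for a node that is already disconnected no longer resets the connection state, so the reconnect backoff is preserved. A hedged sketch of the caller-visible effect (illustrative only; reconnectBackoffMs is a placeholder, and the same behavior is exercised by the NetworkClientTest change below):

    client.disconnect(node.idString());                   // closes the connection and starts the backoff
    time.sleep(reconnectBackoffMs);                        // let the backoff elapse
    client.disconnect(node.idString());                    // now a no-op for the already-disconnected node
    assert client.canConnect(node, time.milliseconds());   // still eligible to reconnect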

http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 4ddd648..f1351b7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1119,6 +1119,9 @@ public class KafkaConsumer<K, V> {
      * <p>
      * This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is
      * encountered (in which case it is thrown to the caller).
+     * <p>
+     * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
+     * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
      *
      * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
      *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
@@ -1152,6 +1155,9 @@ public class KafkaConsumer<K, V> {
      * <p>
      * This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is
      * encountered (in which case it is thrown to the caller).
+     * <p>
+     * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
+     * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
      *
      * @param offsets A map of offsets by partition with associated metadata
      * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
@@ -1194,6 +1200,11 @@ public class KafkaConsumer<K, V> {
      * <p>
      * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
      * (if provided) or discarded.
+     * <p>
+     * Offsets committed through multiple calls to this API are guaranteed to be sent in the same order as
+     * the invocations. Corresponding commit callbacks are also invoked in the same order. Additionally note that
+     * offsets committed through this API are guaranteed to complete before a subsequent call to {@link #commitSync()}
+     * (and variants) returns.
      *
      * @param callback Callback to invoke when the commit completes
      */
@@ -1217,6 +1228,11 @@ public class KafkaConsumer<K, V> {
      * <p>
      * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
      * (if provided) or discarded.
+     * <p>
+     * Offsets committed through multiple calls to this API are guaranteed to be sent in the same order as
+     * the invocations. Corresponding commit callbacks are also invoked in the same order. Additionally note that
+     * offsets committed through this API are guaranteed to complete before a subsequent call to {@link #commitSync()}
+     * (and variants) returns.
      *
      * @param offsets A map of offsets by partition with associate metadata. This map will be copied internally, so it
      *                is safe to mutate the map after returning.
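
To make the new javadoc guarantees concrete, a brief hedged sketch of application code (not part of this patch; partition and cb are placeholders, and java.util.Collections plus the consumer classes are assumed to be imported): the two callbacks fire in send order, and both have completed by the time commitSync() returns.

    consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(100L)), cb);
    consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(200L)), cb);
    consumer.commitSync(); // both callbacks above have already been invoked, in order, when this returns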

http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 0f594df..74ef20a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -59,7 +59,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.kafka.common.errors.InterruptException;
 
 /**
  * AbstractCoordinator implements group management for a single group member by interacting with
@@ -247,8 +246,6 @@ public abstract class AbstractCoordinator implements Closeable {
             // find a node to ask about the coordinator
             Node node = this.client.leastLoadedNode();
             if (node == null) {
-                // TODO: If there are no brokers left, perhaps we should use the bootstrap set
-                // from configuration?
                 log.debug("No broker available to send GroupCoordinator request for group {}", groupId);
                 return RequestFuture.noBrokersAvailable();
             } else
@@ -651,7 +648,10 @@ public abstract class AbstractCoordinator implements Closeable {
     protected synchronized void coordinatorDead() {
         if (this.coordinator != null) {
             log.info("Marking the coordinator {} dead for group {}", 
this.coordinator, groupId);
-            client.failUnsentRequests(this.coordinator, 
CoordinatorNotAvailableException.INSTANCE);
+
+            // Disconnect from the coordinator to ensure that there are no 
in-flight requests remaining.
+            // Pending callbacks will be invoked with a DisconnectException.
+            client.disconnect(this.coordinator);
             this.coordinator = null;
         }
     }
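
From an application's point of view, the practical effect is that async commits that were in flight when the coordinator is marked dead now fail promptly rather than lingering. A hedged sketch of how such a failure surfaces (illustrative application code, not part of this patch; it mirrors the new ConsumerCoordinatorTest cases below):

    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception instanceof RetriableCommitFailedException) {
                // the coordinator connection was dropped; the commit can safely be retried later
            }
        }
    });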

http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index d39b369..5ba9ccb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -604,6 +604,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
             client.poll(future, remainingMs);
 
+            // We may have had in-flight offset commits when the synchronous commit began. If so, ensure that
+            // the corresponding callbacks are invoked prior to returning in order to preserve the order that
+            // the offset commits were applied.
+            invokeCompletedOffsetCommitCallbacks();
+
             if (future.succeeded()) {
                 if (interceptors != null)
                     interceptors.onCommit(offsets);
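
The call above relies on the coordinator's completion-queue pattern: async commit responses enqueue their user callbacks, and the queue is drained on the calling thread. A simplified sketch of that pattern using java.util.concurrent.ConcurrentLinkedQueue (hypothetical field and method bodies, not the coordinator's actual code):

    private final ConcurrentLinkedQueue<Runnable> completedCommitCallbacks = new ConcurrentLinkedQueue<>();

    void invokeCompletedOffsetCommitCallbacks() {
        Runnable callback;
        while ((callback = completedCommitCallbacks.poll()) != null)
            callback.run(); // user callbacks always run on the consumer's own thread
    }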

http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 84e9a81..bb7f9f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -263,8 +263,7 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     /**
-     * Poll for network IO and return immediately. This will not trigger wakeups,
-     * nor will it execute any delayed tasks.
+     * Poll for network IO and return immediately. This will not trigger wakeups.
      */
     public void pollNoWakeup() {
         poll(0, time.milliseconds(), null, true);
@@ -374,6 +373,16 @@ public class ConsumerNetworkClient implements Closeable {
         }
     }
 
+    public void disconnect(Node node) {
+        synchronized (this) {
+            failUnsentRequests(node, DisconnectException.INSTANCE);
+            client.disconnect(node.idString());
+        }
+
+        // We need to poll to ensure callbacks from in-flight requests on the disconnected socket are fired
+        pollNoWakeup();
+    }
+
     private void failExpiredRequests(long now) {
         // clear all expired unsent requests and fail their corresponding futures
         Collection<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now, unsentExpiryMs);
@@ -383,7 +392,7 @@ public class ConsumerNetworkClient implements Closeable {
         }
     }
 
-    public void failUnsentRequests(Node node, RuntimeException e) {
+    private void failUnsentRequests(Node node, RuntimeException e) {
         // clear unsent requests to node and fail their corresponding futures
         synchronized (this) {
             Collection<ClientRequest> unsentRequests = unsent.remove(node);
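
The caller-visible contract of the new disconnect(Node) method is that every pending request for that node, unsent or in flight, has been failed by the time the call returns. A hedged usage sketch (illustrative only; requestBuilder is a placeholder, and the same contract is asserted by the ConsumerNetworkClientTest cases below):

    RequestFuture<ClientResponse> future = consumerClient.send(node, requestBuilder);
    consumerClient.disconnect(node);
    if (future.failed() && future.exception() instanceof DisconnectException) {
        // treat as a retriable network error, e.g. rediscover the coordinator and resend
    }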

http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 77960e1..069fb7a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -22,14 +22,13 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
 import org.apache.kafka.test.TestUtils;
@@ -270,14 +269,21 @@ public class NetworkClientTest {
     public void testCallDisconnect() throws Exception {
         awaitReady(client, node);
         assertTrue("Expected NetworkClient to be ready to send to node " + 
node.idString(),
-            client.isReady(node, Time.SYSTEM.milliseconds()));
+            client.isReady(node, time.milliseconds()));
         assertFalse("Did not expect connection to node " + node.idString() + " 
to be failed",
             client.connectionFailed(node));
         client.disconnect(node.idString());
         assertFalse("Expected node " + node.idString() + " to be 
disconnected.",
-            client.isReady(node, Time.SYSTEM.milliseconds()));
+            client.isReady(node, time.milliseconds()));
         assertTrue("Expected connection to node " + node.idString() + " to be 
failed after disconnect",
             client.connectionFailed(node));
+        assertFalse(client.canConnect(node, time.milliseconds()));
+
+        // ensure disconnect does not reset blackout period if already disconnected
+        time.sleep(reconnectBackoffMsTest);
+        assertTrue(client.canConnect(node, time.milliseconds()));
+        client.disconnect(node.idString());
+        assertTrue(client.canConnect(node, time.milliseconds()));
     }
 
     private static class TestCallbackHandler implements RequestCompletionHandler {

http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3c1b411..18e18ed 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -58,6 +58,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -951,6 +952,37 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testCoordinatorDisconnectAfterNotCoordinatorError() {
+        testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors.NOT_COORDINATOR);
+    }
+
+    @Test
+    public void testCoordinatorDisconnectAfterCoordinatorNotAvailableError() {
+        testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors.COORDINATOR_NOT_AVAILABLE);
+    }
+
+    private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors error) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady();
+
+        // Send two async commits and fail the first one with an error.
+        // This should cause a coordinator disconnect which will cancel the second request.
+
+        MockCommitCallback firstCommitCallback = new MockCommitCallback();
+        MockCommitCallback secondCommitCallback = new MockCommitCallback();
+        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), firstCommitCallback);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), secondCommitCallback);
+
+        client.respond(offsetCommitResponse(Collections.singletonMap(t1p, error)));
+        consumerClient.pollNoWakeup();
+        coordinator.invokeCompletedOffsetCommitCallbacks();
+
+        assertTrue(coordinator.coordinatorUnknown());
+        assertTrue(firstCommitCallback.exception instanceof RetriableCommitFailedException);
+        assertTrue(secondCommitCallback.exception instanceof RetriableCommitFailedException);
+    }
+
+    @Test
     public void testAutoCommitDynamicAssignment() {
         final String consumerId = "consumer";
 
@@ -1212,6 +1244,42 @@ public class ConsumerCoordinatorTest {
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
     }
 
+    @Test
+    public void testAsyncCommitCallbacksInvokedPriorToSyncCommitCompletion() throws Exception {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady();
+
+        final List<OffsetAndMetadata> committedOffsets = Collections.synchronizedList(new ArrayList<OffsetAndMetadata>());
+        final OffsetAndMetadata firstOffset = new OffsetAndMetadata(0L);
+        final OffsetAndMetadata secondOffset = new OffsetAndMetadata(1L);
+
+        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, firstOffset), new OffsetCommitCallback() {
+            @Override
+            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+                committedOffsets.add(firstOffset);
+            }
+        });
+
+        // Do a synchronous commit in the background so that we can send both responses at the same time
+        Thread thread = new Thread() {
+            @Override
+            public void run() {
+                coordinator.commitOffsetsSync(Collections.singletonMap(t1p, secondOffset), 10000);
+                committedOffsets.add(secondOffset);
+            }
+        };
+
+        thread.start();
+
+        client.waitForRequests(2, 5000);
+        client.respond(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+        client.respond(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+
+        thread.join();
+
+        assertEquals(Arrays.asList(firstOffset, secondOffset), committedOffsets);
+    }
+
     @Test(expected = KafkaException.class)
     public void testCommitUnknownTopicOrPartition() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));

http://git-wip-us.apache.org/repos/asf/kafka/blob/334a30bc/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index b46b657..8e71dd5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.HeartbeatRequest;
@@ -81,6 +82,27 @@ public class ConsumerNetworkClientTest {
     }
 
     @Test
+    public void testDisconnectWithUnsentRequests() {
+        RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
+        assertTrue(consumerClient.hasPendingRequests(node));
+        assertFalse(client.hasInFlightRequests(node.idString()));
+        consumerClient.disconnect(node);
+        assertTrue(future.failed());
+        assertTrue(future.exception() instanceof DisconnectException);
+    }
+
+    @Test
+    public void testDisconnectWithInFlightRequests() {
+        RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
+        consumerClient.pollNoWakeup();
+        assertTrue(consumerClient.hasPendingRequests(node));
+        assertTrue(client.hasInFlightRequests(node.idString()));
+        consumerClient.disconnect(node);
+        assertTrue(future.failed());
+        assertTrue(future.exception() instanceof DisconnectException);
+    }
+
+    @Test
     public void doNotBlockIfPollConditionIsSatisfied() {
         NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
         ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(mockNetworkClient, metadata, time, 100, 1000);
