Repository: kafka
Updated Branches:
  refs/heads/trunk 71f7e7c3a -> 9aaeb33c1


KAFKA-4098: NetworkClient should not intercept user metadata requests on 
disconnect

Author: Jason Gustafson <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>

Closes #1798 from hachikuji/KAFKA-4098


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9aaeb33c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9aaeb33c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9aaeb33c

Branch: refs/heads/trunk
Commit: 9aaeb33c16d4a319967560215e2ea72082367f92
Parents: 71f7e7c
Author: Jason Gustafson <[email protected]>
Authored: Mon Aug 29 13:13:08 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Mon Aug 29 13:13:08 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java |  2 +-
 .../apache/kafka/clients/NetworkClientTest.java | 38 +++++++++++++++-----
 2 files changed, 30 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9aaeb33c/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index bd54a29..8ab634d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -559,7 +559,7 @@ public class NetworkClient implements KafkaClient {
         public boolean maybeHandleDisconnection(ClientRequest request) {
             ApiKeys requestKey = 
ApiKeys.forId(request.request().header().apiKey());
 
-            if (requestKey == ApiKeys.METADATA) {
+            if (requestKey == ApiKeys.METADATA && 
request.isInitiatedByNetworkClient()) {
                 Cluster cluster = metadata.fetch();
                 if (cluster.isBootstrapConfigured()) {
                     int nodeId = 
Integer.parseInt(request.request().destination());

http://git-wip-us.apache.org/repos/asf/kafka/blob/9aaeb33c/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 18f7ecb..b556240 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -16,15 +16,6 @@
  */
 package org.apache.kafka.clients;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -43,6 +34,15 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 public class NetworkClientTest {
 
     private final int requestTimeoutMs = 1000;
@@ -204,6 +204,26 @@ public class NetworkClientTest {
 
         assertEquals(reconnectBackoffMsTest, delay);
     }
+
+    @Test
+    public void testDisconnectDuringUserMetadataRequest() {
+        // this test ensures that the default metadata updater does not 
intercept a user-initiated
+        // metadata request when the remote node disconnects with the request 
in-flight.
+        awaitReady(client, node);
+        RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.METADATA);
+
+        Struct req = new 
MetadataRequest(Collections.<String>emptyList()).toStruct();
+        RequestSend send = new RequestSend(node.idString(), reqHeader, req);
+        ClientRequest request = new ClientRequest(time.milliseconds(), true, 
send, null);
+        client.send(request, time.milliseconds());
+        client.poll(requestTimeoutMs, time.milliseconds());
+        assertEquals(1, client.inFlightRequestCount(node.idString()));
+
+        selector.close(node.idString());
+        List<ClientResponse> responses = client.poll(requestTimeoutMs, 
time.milliseconds());
+        assertEquals(1, responses.size());
+        assertTrue(responses.iterator().next().wasDisconnected());
+    }
     
     private static class TestCallbackHandler implements 
RequestCompletionHandler {
         public boolean executed = false;

Reply via email to