Repository: kafka Updated Branches: refs/heads/trunk 71f7e7c3a -> 9aaeb33c1
KAFKA-4098: NetworkClient should not intercept user metadata requests on disconnect Author: Jason Gustafson <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1798 from hachikuji/KAFKA-4098 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9aaeb33c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9aaeb33c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9aaeb33c Branch: refs/heads/trunk Commit: 9aaeb33c16d4a319967560215e2ea72082367f92 Parents: 71f7e7c Author: Jason Gustafson <[email protected]> Authored: Mon Aug 29 13:13:08 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon Aug 29 13:13:08 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/clients/NetworkClient.java | 2 +- .../apache/kafka/clients/NetworkClientTest.java | 38 +++++++++++++++----- 2 files changed, 30 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9aaeb33c/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index bd54a29..8ab634d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -559,7 +559,7 @@ public class NetworkClient implements KafkaClient { public boolean maybeHandleDisconnection(ClientRequest request) { ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey()); - if (requestKey == ApiKeys.METADATA) { + if (requestKey == ApiKeys.METADATA && request.isInitiatedByNetworkClient()) { Cluster cluster = metadata.fetch(); if (cluster.isBootstrapConfigured()) { int 
nodeId = Integer.parseInt(request.request().destination()); http://git-wip-us.apache.org/repos/asf/kafka/blob/9aaeb33c/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 18f7ecb..b556240 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -16,15 +16,6 @@ */ package org.apache.kafka.clients; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -43,6 +34,15 @@ import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class NetworkClientTest { private final int requestTimeoutMs = 1000; @@ -204,6 +204,26 @@ public class NetworkClientTest { assertEquals(reconnectBackoffMsTest, delay); } + + @Test + public void testDisconnectDuringUserMetadataRequest() { + // this test ensures that the default metadata updater does not intercept a user-initiated + // metadata request when the remote node disconnects with the request in-flight. 
+ awaitReady(client, node); + RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.METADATA); + + Struct req = new MetadataRequest(Collections.<String>emptyList()).toStruct(); + RequestSend send = new RequestSend(node.idString(), reqHeader, req); + ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null); + client.send(request, time.milliseconds()); + client.poll(requestTimeoutMs, time.milliseconds()); + assertEquals(1, client.inFlightRequestCount(node.idString())); + + selector.close(node.idString()); + List<ClientResponse> responses = client.poll(requestTimeoutMs, time.milliseconds()); + assertEquals(1, responses.size()); + assertTrue(responses.iterator().next().wasDisconnected()); + } private static class TestCallbackHandler implements RequestCompletionHandler { public boolean executed = false;
