KAFKA-2998: log warnings when client is disconnected from bootstrap brokers
Author: Jason Gustafson <[email protected]> Reviewers: Grant Henke, Guozhang Wang Closes #769 from hachikuji/KAFKA-2998 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c36268f7 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c36268f7 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c36268f7 Branch: refs/heads/0.10.0 Commit: c36268f77fbf7f6a47a1e09ec3e38c20173a06c5 Parents: 9897813 Author: Jason Gustafson <[email protected]> Authored: Mon Apr 4 21:28:59 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Tue Apr 5 17:08:53 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/clients/NetworkClient.java | 8 ++++++++ .../main/java/org/apache/kafka/common/Cluster.java | 17 +++++++++++++++-- 2 files changed, 23 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c36268f7/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index d22b508..d2eaace 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -556,6 +556,14 @@ public class NetworkClient implements KafkaClient { ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey()); if (requestKey == ApiKeys.METADATA) { + Cluster cluster = metadata.fetch(); + if (cluster.isBootstrapConfigured()) { + int nodeId = Integer.parseInt(request.request().destination()); + Node node = cluster.nodeById(nodeId); + if (node != null) + log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port()); + } + metadataFetchInProgress = false; return true; } http://git-wip-us.apache.org/repos/asf/kafka/blob/c36268f7/clients/src/main/java/org/apache/kafka/common/Cluster.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index 8e85df8..e1bf581 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -29,6 +29,7 @@ import java.util.Set; */ public final class Cluster { + private final boolean isBootstrapConfigured; private final List<Node> nodes; private final Set<String> unauthorizedTopics; private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; @@ -45,11 +46,19 @@ public final class Cluster { public Cluster(Collection<Node> nodes, Collection<PartitionInfo> partitions, Set<String> unauthorizedTopics) { + this(false, nodes, partitions, unauthorizedTopics); + } + + private Cluster(boolean isBootstrapConfigured, + Collection<Node> nodes, + Collection<PartitionInfo> partitions, + Set<String> unauthorizedTopics) { + this.isBootstrapConfigured = isBootstrapConfigured; + // make a randomized, unmodifiable copy of the nodes List<Node> copy = new ArrayList<>(nodes); Collections.shuffle(copy); this.nodes = Collections.unmodifiableList(copy); - this.nodesById = new HashMap<>(); for (Node node : nodes) this.nodesById.put(node.id(), node); @@ -115,7 +124,7 @@ public final class Cluster { int nodeId = -1; for (InetSocketAddress address : addresses) nodes.add(new Node(nodeId--, address.getHostString(), address.getPort())); - return new Cluster(nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet()); + return new Cluster(true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet()); } /** @@ -214,6 +223,10 @@ public final class Cluster { return unauthorizedTopics; } + public boolean isBootstrapConfigured() { + return isBootstrapConfigured; + } + @Override public String toString() { return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
