Jason Gustafson created KAFKA-10894:
---------------------------------------
Summary: Null replica nodes included in client quota callback Cluster
Key: KAFKA-10894
URL: https://issues.apache.org/jira/browse/KAFKA-10894
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson

I noticed an NPE in the client quota callback `updateClusterMetadata` due to the presence of null nodes inside a `PartitionInfo` instance. Here is the trace:

```
java.lang.NullPointerException
	at org.apache.kafka.common.PartitionInfo.formatNodeIds(PartitionInfo.java:143)
	at org.apache.kafka.common.PartitionInfo.toString(PartitionInfo.java:132)
	at java.base/java.lang.String.valueOf(String.java:3388)
	at java.base/java.lang.StringBuilder.append(StringBuilder.java:167)
	at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:457)
	at java.base/java.util.Collections$UnmodifiableCollection.toString(Collections.java:1042)
	at java.base/java.lang.String.valueOf(String.java:3388)
	at java.base/java.lang.StringBuilder.append(StringBuilder.java:167)
	at org.apache.kafka.common.Cluster.toString(Cluster.java:348)
```

After some debugging, I found that `PartitionInfo.replicas` had a null value. The javadoc for this field is the following:

```
/**
 * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
 */
public Node[] replicas() {
    return replicas;
}
```

It's pretty clear that the expectation is that arrays do not contain null values.
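To make the failure mode concrete, here is a minimal, self-contained Java sketch. The `Node` record and `formatNodeIds` method below are simplified, hypothetical stand-ins modeled on the trace, not the actual Kafka classes. The point they illustrate is that any formatter which dereferences each element of a `Node[]` will throw as soon as one slot is null.

```java
public class NullReplicaNpeSketch {
    // Hypothetical stand-in for org.apache.kafka.common.Node; only the id matters here.
    record Node(int id, String host, int port) {
        String idString() {
            return Integer.toString(id);
        }
    }

    // Hypothetical formatter in the spirit of PartitionInfo.formatNodeIds:
    // it dereferences every array element, so a null entry throws.
    static String formatNodeIds(Node[] nodes) {
        StringBuilder b = new StringBuilder("[");
        for (int i = 0; i < nodes.length; i++) {
            b.append(nodes[i].idString()); // NPE when nodes[i] == null
            if (i < nodes.length - 1) {
                b.append(',');
            }
        }
        return b.append(']').toString();
    }

    public static void main(String[] args) {
        Node[] healthy = { new Node(1, "host1", 9092), new Node(2, "host2", 9092) };
        System.out.println(formatNodeIds(healthy)); // prints [1,2]

        // Replica 2 is not alive, so a lookup that returned null left a hole in the array.
        Node[] withHole = { new Node(1, "host1", 9092), null };
        try {
            formatNodeIds(withHole);
        } catch (NullPointerException e) {
            System.out.println("NPE while formatting node ids");
        }
    }
}
```

Because `toString` is reached indirectly (here via `Cluster.toString` in the trace), the NPE surfaces far from where the null was introduced.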
On the client in `MetadataResponse`, we use the following logic to deal with nodes which are not alive:

```
private static Node[] convertToNodeArray(List<Integer> replicaIds, Map<Integer, Node> nodesById) {
    return replicaIds.stream().map(replicaId -> {
        Node node = nodesById.get(replicaId);
        if (node == null)
            return new Node(replicaId, "", -1);
        return node;
    }).toArray(Node[]::new);
}
```

However, inside `MetadataCache.getClusterMetadata` (which is used in the quota callback), we have the following logic:

```
val nodes = snapshot.aliveNodes.map { case (id, nodes) => (id, nodes.get(listenerName).orNull) }
def node(id: Integer): Node = nodes.get(id.toLong).orNull
val partitions = getAllPartitions(snapshot)
  .filter { case (_, state) => state.leader != LeaderAndIsr.LeaderDuringDelete }
  .map { case (tp, state) =>
    new PartitionInfo(tp.topic, tp.partition, node(state.leader),
      state.replicas.asScala.map(node).toArray,
      state.isr.asScala.map(node).toArray,
      state.offlineReplicas.asScala.map(node).toArray)
  }
```

Note specifically that the nested `node` method returns null if the replica is not alive. It looks like we need to mimic the same logic from `MetadataResponse` here.
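A minimal Java sketch of the placeholder approach quoted from `MetadataResponse`, applied to a replica lookup like the one in `MetadataCache`. The class name `PlaceholderReplicaSketch`, the method `toNodeArray`, and the `Node` record are hypothetical stand-ins, not Kafka APIs. The lookup treats both "id missing" and "id mapped to null" as not available, since the quoted Scala code can also store a null value via `nodes.get(listenerName).orNull`.

```java
import java.util.List;
import java.util.Map;

public class PlaceholderReplicaSketch {
    // Hypothetical stand-in for org.apache.kafka.common.Node.
    record Node(int id, String host, int port) {}

    // Same shape as the quoted convertToNodeArray: a replica id with no usable
    // node becomes a placeholder Node(id, "", -1) instead of null, so the
    // resulting array never contains holes.
    static Node[] toNodeArray(List<Integer> replicaIds, Map<Integer, Node> aliveNodesById) {
        return replicaIds.stream().map(replicaId -> {
            // get() returns null both for an absent key and for a key mapped to null
            Node node = aliveNodesById.get(replicaId);
            if (node == null) {
                return new Node(replicaId, "", -1);
            }
            return node;
        }).toArray(Node[]::new);
    }

    public static void main(String[] args) {
        Map<Integer, Node> alive = Map.of(
                1, new Node(1, "host1", 9092),
                2, new Node(2, "host2", 9092));
        // Replica 3 is not alive, so it gets a placeholder rather than null.
        for (Node n : toNodeArray(List.of(1, 2, 3), alive)) {
            System.out.println(n.id() + " -> " + (n.port() == -1 ? "placeholder" : n.host()));
        }
    }
}
```

Using a placeholder keeps the replica id visible to callers (for example in `toString` output) while preserving the documented contract that the arrays hold no null entries.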