[ https://issues.apache.org/jira/browse/KAFKA-10894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson updated KAFKA-10894:
------------------------------------
Description:
I noticed an NPE in the client quota callback `updateClusterMetadata` due to the presence of null nodes inside a `PartitionInfo` instance. Here is the trace:
{code}
java.lang.NullPointerException
    at org.apache.kafka.common.PartitionInfo.formatNodeIds(PartitionInfo.java:143)
    at org.apache.kafka.common.PartitionInfo.toString(PartitionInfo.java:132)
    at java.base/java.lang.String.valueOf(String.java:3388)
    at java.base/java.lang.StringBuilder.append(StringBuilder.java:167)
    at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:457)
    at java.base/java.util.Collections$UnmodifiableCollection.toString(Collections.java:1042)
    at java.base/java.lang.String.valueOf(String.java:3388)
    at java.base/java.lang.StringBuilder.append(StringBuilder.java:167)
    at org.apache.kafka.common.Cluster.toString(Cluster.java:348)
{code}
After some debugging, I found that the `PartitionInfo.replicas` array contained a null entry. The javadoc for this field is the following:
{code}
/**
 * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
 */
public Node[] replicas() {
    return replicas;
}
{code}
Since replicas are included "regardless of whether they are alive", the expectation is clearly that these arrays do not contain null entries.
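To see why a single null entry is enough to break `Cluster.toString`, here is a minimal Java sketch. `StubNode` and `FormatNodeIdsSketch` are hypothetical stand-ins, not Kafka classes, and the loop assumes that `PartitionInfo.formatNodeIds` dereferences every element of the array. That assumption is consistent with the trace above, but it has not been checked against the actual source.

```java
// Hypothetical stand-in for org.apache.kafka.common.Node (only what formatting needs).
final class StubNode {
    final int id;
    final String host;
    final int port;

    StubNode(int id, String host, int port) {
        this.id = id;
        this.host = host;
        this.port = port;
    }

    String idString() {
        return Integer.toString(id);
    }
}

// Hypothetical helper; assumed to behave like PartitionInfo.formatNodeIds.
final class FormatNodeIdsSketch {
    private FormatNodeIdsSketch() {}

    // Every element is dereferenced, so a null entry throws NullPointerException.
    static String formatNodeIds(StubNode[] nodes) {
        StringBuilder b = new StringBuilder("[");
        if (nodes != null) {
            for (int i = 0; i < nodes.length; i++) {
                b.append(nodes[i].idString()); // NPE when nodes[i] == null
                if (i < nodes.length - 1) {
                    b.append(',');
                }
            }
        }
        b.append(']');
        return b.toString();
    }
}
```

Formatting `{node 1, node 2}` yields `[1,2]`, while `{node 1, null}` throws a `NullPointerException` from inside `formatNodeIds`. Replacing the null with a placeholder such as `new StubNode(2, "", -1)` formats as `[1,2]` without error.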
On the client, in `MetadataResponse`, we use the following logic to handle replicas whose nodes are not alive:
{code}
private static Node[] convertToNodeArray(List<Integer> replicaIds, Map<Integer, Node> nodesById) {
    return replicaIds.stream().map(replicaId -> {
        Node node = nodesById.get(replicaId);
        if (node == null)
            return new Node(replicaId, "", -1);
        return node;
    }).toArray(Node[]::new);
}
{code}
However, inside `MetadataCache.getClusterMetadata` (which is used in the quota callback), we have the following logic:
{code}
val nodes = snapshot.aliveNodes.map { case (id, nodes) => (id, nodes.get(listenerName).orNull) }
def node(id: Integer): Node = nodes.get(id.toLong).orNull
val partitions = getAllPartitions(snapshot)
  .filter { case (_, state) => state.leader != LeaderAndIsr.LeaderDuringDelete }
  .map { case (tp, state) =>
    new PartitionInfo(tp.topic, tp.partition, node(state.leader),
      state.replicas.asScala.map(node).toArray,
      state.isr.asScala.map(node).toArray,
      state.offlineReplicas.asScala.map(node).toArray)
  }
{code}
Note specifically that the nested `node` method returns null if the replica is not alive, so the `replicas`, `isr`, and `offlineReplicas` arrays can all contain null entries. It looks like we need to apply the same fallback as `MetadataResponse` here.
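A minimal Java sketch of that fallback applied to the broker-side lookup, under stated assumptions: `StubNode` and `ClusterNodeLookup` are hypothetical stand-ins, and the map is assumed to hold the per-listener node for each alive broker, as `nodes` does in the Scala snippet. In that snippet, an alive broker without an endpoint for `listenerName` also appears to map to null, so the sketch checks the looked-up value rather than only whether the key is present.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.Node (id, host, port only).
final class StubNode {
    final int id;
    final String host;
    final int port;

    StubNode(int id, String host, int port) {
        this.id = id;
        this.host = host;
        this.port = port;
    }
}

// Hypothetical helper sketching the fallback from MetadataResponse.convertToNodeArray.
final class ClusterNodeLookup {
    private ClusterNodeLookup() {}

    // aliveNodesForListener: broker id -> node for the current listener.
    // A missing key (broker not alive) and a null value (alive broker with no
    // endpoint for this listener) are both replaced by a placeholder.
    static StubNode nodeOrPlaceholder(int id, Map<Integer, StubNode> aliveNodesForListener) {
        StubNode node = aliveNodesForListener.get(id);
        // Placeholder keeps the broker id, with an empty host and port -1.
        return node != null ? node : new StubNode(id, "", -1);
    }

    // Maps replica ids to nodes without ever producing a null array entry.
    static StubNode[] toNodeArray(List<Integer> replicaIds, Map<Integer, StubNode> aliveNodesForListener) {
        return replicaIds.stream()
                .map(id -> nodeOrPlaceholder(id, aliveNodesForListener))
                .toArray(StubNode[]::new);
    }
}
```

Keeping the replica id with an empty host and port -1 preserves the partition's full replica set, in line with the `replicas()` javadoc, while signalling that the node has no usable endpoint. The sketch only covers the replica, ISR, and offline-replica arrays; how `state.leader` should be treated is left aside here.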
> Null replica nodes included in client quota callback Cluster
> ------------------------------------------------------------
>
>                 Key: KAFKA-10894
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10894
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.3.4#803005)