[ https://issues.apache.org/jira/browse/KAFKA-10894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-10894:
------------------------------------
    Description: 
I noticed an NPE in the client quota callback `updateClusterMetadata` due to 
the presence of null nodes inside a `PartitionInfo` instance. Here is the trace:
{code}
java.lang.NullPointerException
        at org.apache.kafka.common.PartitionInfo.formatNodeIds(PartitionInfo.java:143)
        at org.apache.kafka.common.PartitionInfo.toString(PartitionInfo.java:132)
        at java.base/java.lang.String.valueOf(String.java:3388)
        at java.base/java.lang.StringBuilder.append(StringBuilder.java:167)
        at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:457)
        at java.base/java.util.Collections$UnmodifiableCollection.toString(Collections.java:1042)
        at java.base/java.lang.String.valueOf(String.java:3388)
        at java.base/java.lang.StringBuilder.append(StringBuilder.java:167)
        at org.apache.kafka.common.Cluster.toString(Cluster.java:348)
{code}

After some debugging, I found that the array returned by 
`PartitionInfo.replicas` contained a null element. The javadoc for this 
accessor is the following:
{code}
    /**
     * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
     */
    public Node[] replicas() {
        return replicas;
    }
{code}
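
To make the failure mode concrete, here is a minimal, self-contained sketch of how a single null element in such an array produces the NPE in the trace above. The `Node` class and the body of `formatNodeIds` are hypothetical stand-ins written for illustration, not the real `org.apache.kafka.common` code; the only assumption carried over is that the formatter dereferences every array element.

```java
// Hypothetical stand-ins for illustration; not the real Kafka classes.
final class NullNodeRepro {
    static final class Node {
        private final int id;
        Node(int id) { this.id = id; }
        String idString() { return Integer.toString(id); }
    }

    // Assumed shape of a formatter that dereferences every element.
    static String formatNodeIds(Node[] nodes) {
        StringBuilder b = new StringBuilder("[");
        for (int i = 0; i < nodes.length; i++) {
            b.append(nodes[i].idString()); // throws NPE when nodes[i] is null
            if (i < nodes.length - 1)
                b.append(',');
        }
        return b.append(']').toString();
    }

    // Helper that reports whether formatting the array throws an NPE.
    static boolean throwsNpe(Node[] nodes) {
        try {
            formatNodeIds(nodes);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Node[] allAlive = { new Node(1), new Node(2) };
        Node[] withDeadReplica = { new Node(1), null };
        System.out.println(formatNodeIds(allAlive));        // prints "[1,2]"
        System.out.println(throwsNpe(withDeadReplica));     // prints "true"
    }
}
```

Any caller that logs or stringifies the `Cluster` would hit the same path, which matches the trace starting at `Cluster.toString`.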

It's pretty clear that the expectation is that arrays do not contain null 
values. On the client in `MetadataResponse`, we use the following logic to deal 
with nodes which are not alive:
{code}
    private static Node[] convertToNodeArray(List<Integer> replicaIds, Map<Integer, Node> nodesById) {
        return replicaIds.stream().map(replicaId -> {
            Node node = nodesById.get(replicaId);
            if (node == null)
                return new Node(replicaId, "", -1);
            return node;
        }).toArray(Node[]::new);
    }
{code}

However, inside `MetadataCache.getClusterMetadata` (which is used in the quota 
callback), we have the following logic:
{code}
    val nodes = snapshot.aliveNodes.map { case (id, nodes) => (id, nodes.get(listenerName).orNull) }
    def node(id: Integer): Node = nodes.get(id.toLong).orNull
    val partitions = getAllPartitions(snapshot)
      .filter { case (_, state) => state.leader != LeaderAndIsr.LeaderDuringDelete }
      .map { case (tp, state) =>
        new PartitionInfo(tp.topic, tp.partition, node(state.leader),
          state.replicas.asScala.map(node).toArray,
          state.isr.asScala.map(node).toArray,
          state.offlineReplicas.asScala.map(node).toArray)
      }
{code}
Note specifically that the nested `node` method returns null if the replica is 
not alive. It looks like we need to mimic the same logic from 
`MetadataResponse` here.
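
One possible shape for that change, sketched in Java to match the `MetadataResponse` snippet rather than the Scala in `MetadataCache`. This is an illustration under stated assumptions, not the actual patch: `PlaceholderNodes`, `nodeOrPlaceholder`, and `toNodeArray` are hypothetical names, and `Node` is a stand-in class that only mirrors the `(id, host, port)` constructor used above.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch; Node is a stand-in, not org.apache.kafka.common.Node.
final class PlaceholderNodes {
    static final class Node {
        final int id;
        final String host;
        final int port;

        Node(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
        }

        // True for the placeholder used when a broker is not alive.
        boolean isEmpty() {
            return host == null || host.isEmpty() || port < 0;
        }
    }

    // Returns the alive node, or a placeholder that still carries the replica id.
    static Node nodeOrPlaceholder(int replicaId, Map<Integer, Node> aliveNodes) {
        Node node = aliveNodes.get(replicaId);
        return node != null ? node : new Node(replicaId, "", -1);
    }

    // Builds a replica array that never contains null elements.
    static Node[] toNodeArray(List<Integer> replicaIds, Map<Integer, Node> aliveNodes) {
        return replicaIds.stream()
            .map(id -> nodeOrPlaceholder(id, aliveNodes))
            .toArray(Node[]::new);
    }

    public static void main(String[] args) {
        Map<Integer, Node> alive = Map.of(1, new Node(1, "broker1", 9092));
        // Replica 2 is not alive, so it is represented by a placeholder.
        for (Node n : toNodeArray(List.of(1, 2), alive))
            System.out.println(n.id + " placeholder=" + n.isEmpty());
    }
}
```

The placeholder keeps the replica id visible to the quota callback while making it obvious, through the empty host and negative port, that the broker is not currently reachable.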




  was:
I noticed an NPE in the client quota callback `updateClusterMetadata` due to 
the presence of null nodes inside a `PartitionInfo` instance. Here is the trace:
```
java.lang.NullPointerException
        at org.apache.kafka.common.PartitionInfo.formatNodeIds(PartitionInfo.java:143)
        at org.apache.kafka.common.PartitionInfo.toString(PartitionInfo.java:132)
        at java.base/java.lang.String.valueOf(String.java:3388)
        at java.base/java.lang.StringBuilder.append(StringBuilder.java:167)
        at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:457)
        at java.base/java.util.Collections$UnmodifiableCollection.toString(Collections.java:1042)
        at java.base/java.lang.String.valueOf(String.java:3388)
        at java.base/java.lang.StringBuilder.append(StringBuilder.java:167)
        at org.apache.kafka.common.Cluster.toString(Cluster.java:348)
```

After some debugging, I found that the array returned by 
`PartitionInfo.replicas` contained a null element. The javadoc for this 
accessor is the following:
```
    /**
     * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
     */
    public Node[] replicas() {
        return replicas;
    }
```

It's pretty clear that the expectation is that arrays do not contain null 
values. On the client in `MetadataResponse`, we use the following logic to deal 
with nodes which are not alive:
```
    private static Node[] convertToNodeArray(List<Integer> replicaIds, Map<Integer, Node> nodesById) {
        return replicaIds.stream().map(replicaId -> {
            Node node = nodesById.get(replicaId);
            if (node == null)
                return new Node(replicaId, "", -1);
            return node;
        }).toArray(Node[]::new);
    }
```

However, inside `MetadataCache.getClusterMetadata` (which is used in the quota 
callback), we have the following logic:
```
    val nodes = snapshot.aliveNodes.map { case (id, nodes) => (id, nodes.get(listenerName).orNull) }
    def node(id: Integer): Node = nodes.get(id.toLong).orNull
    val partitions = getAllPartitions(snapshot)
      .filter { case (_, state) => state.leader != LeaderAndIsr.LeaderDuringDelete }
      .map { case (tp, state) =>
        new PartitionInfo(tp.topic, tp.partition, node(state.leader),
          state.replicas.asScala.map(node).toArray,
          state.isr.asScala.map(node).toArray,
          state.offlineReplicas.asScala.map(node).toArray)
      }
```
Note specifically that the nested `node` method returns null if the replica is 
not alive. It looks like we need to mimic the same logic from 
`MetadataResponse` here.





> Null replica nodes included in client quota callback Cluster
> ------------------------------------------------------------
>
>                 Key: KAFKA-10894
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10894
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
