[ https://issues.apache.org/jira/browse/KAFKA-7044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554905#comment-16554905 ]
ErikPearson commented on KAFKA-7044:
------------------------------------

I tried increasing the Duration explicitly in getConsumer.endOffsets(), but I get the same error.

I narrowed down where the request loses the partition. It looks like Fetcher.groupListOffsetRequests() is where the topicPartition is being dropped. When the function is called the second time with a timestampsToSearch map of 16 entries, the total number of topicPartitions in the returned map of maps is 15.

It looks like Fetcher.groupListOffsetRequests() is called for each Client ID in the consumer group. On the first invocation there is no metadata and an empty map is returned, which eventually causes a metadata refresh in Fetcher.sendListOffsetsRequests():
{code:java}
Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode =
        groupListOffsetRequests(timestampsToSearch);
if (timestampsToSearchByNode.isEmpty())
    return RequestFuture.failure(new StaleMetadataException());
{code}
On the second invocation of Fetcher.groupListOffsetRequests(), the dropped partition is the last partition in the for loop. It's also the first reference to a specific topic whose remaining partitions are assigned to a yet-to-be-processed Client ID. The topic's metadata wasn't refreshed by the first invocation, since the first Client ID didn't process any partitions for this topic. Calling:
{code:java}
PartitionInfo info = metadata.fetch().partition(tp);
{code}
returns null. There's a call to add the topic to the metadata and request a metadata refresh, but the topicPartition is never added to the result. The loop ends and the result is returned with one fewer partition in the map of maps.

I'm not too familiar with the code, but I wanted to see if a metadata refresh would fix it, since the topic was added for metadata refresh. I added a second check after the .isEmpty() check to verify that the number of offsets to search is the same. Please take this with many grains of salt; there are probably better ways to fix this issue:
{code:java}
// Hacking around to avoid null exception. Assume if the timestampsToSearchByNode
// does NOT have the same number of offsets as timestampsToSearch then the metadata is stale.
int sumTopicPartitionsByNode = 0;
for (Map<TopicPartition, Long> nodeMap : timestampsToSearchByNode.values()) {
    sumTopicPartitionsByNode += nodeMap.size();
}
if (sumTopicPartitionsByNode != timestampsToSearch.size()) {
    log.warn("Expected offsets: " + timestampsToSearch.size()
            + " Offsets across nodes: " + sumTopicPartitionsByNode);
    return RequestFuture.failure(new StaleMetadataException());
}
{code}
Now the kafka-consumer-groups command works. The log also shows it hits that log.warn() for at least one other Client ID in my problematic group.
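To make the skip path above concrete, here is a rough paraphrase of the grouping loop as I understand it from stepping through the debugger. This is an illustration only, not the actual Fetcher.groupListOffsetRequests() source from 1.1.0 (the method name here carries a "Sketch" suffix to make that clear, and "metadata" stands for the fetcher's consumer metadata field):
{code:java}
// Paraphrased sketch of the grouping loop, for illustration only; the real
// Fetcher.groupListOffsetRequests() in 1.1.0 may differ in its details.
private Map<Node, Map<TopicPartition, Long>> groupListOffsetRequestsSketch(
        Map<TopicPartition, Long> timestampsToSearch) {
    Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
        TopicPartition tp = entry.getKey();
        PartitionInfo info = metadata.fetch().partition(tp); // null for the unrefreshed topic
        if (info == null) {
            // The topic is registered for the next metadata refresh, but tp itself
            // is never put into timestampsToSearchByNode, so the returned map of
            // maps ends up one partition short.
            metadata.add(tp.topic());
            metadata.requestUpdate();
        } else if (info.leader() != null) {
            timestampsToSearchByNode
                    .computeIfAbsent(info.leader(), node -> new HashMap<>())
                    .put(tp, entry.getValue());
        }
    }
    return timestampsToSearchByNode;
}
{code}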
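For completeness, this is also why the tool fails with a NullPointerException rather than a clearer error: if the map returned by endOffsets() is missing one of the requested partitions, the lookup returns null and unboxing it throws. A minimal, self-contained Java illustration follows (the topic and partition names are made up for the demo; the tool hits the equivalent via the scala.Predef$.Long2long frame in the stack trace quoted below):
{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class MissingEndOffsetDemo {
    public static void main(String[] args) {
        // Pretend the offsets lookup came back one partition short, as described above.
        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(new TopicPartition("some-topic", 0), 42L);

        TopicPartition dropped = new TopicPartition("some-topic", 1); // missing from the map

        // Map.get() returns null for the missing key; auto-unboxing null into a
        // primitive long throws NullPointerException, the Java analogue of
        // scala.Predef$.Long2long(null) in the trace.
        long endOffset = endOffsets.get(dropped);
        System.out.println(endOffset);
    }
}
{code}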
> kafka-consumer-groups.sh NullPointerException describing round robin or sticky assignors
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7044
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7044
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>    Affects Versions: 1.1.0
>         Environment: CentOS 7.4, Oracle JDK 1.8
>            Reporter: Jeff Field
>            Assignee: Vahid Hashemian
>            Priority: Minor
>
> We've recently moved to using the round robin assignor for one of our consumer groups, and started testing the sticky assignor. In both cases, using Kafka 1.1.0 we get a null pointer exception *unless* the group being described is rebalancing:
> {code:java}
> kafka-consumer-groups --bootstrap-server fqdn:9092 --describe --group groupname-for-consumer
> Error: Executing consumer group command failed due to null
> [2018-06-12 01:32:34,179] DEBUG Exception in consumer group command (kafka.admin.ConsumerGroupCommand$)
> java.lang.NullPointerException
>   at scala.Predef$.Long2long(Predef.scala:363)
>   at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:612)
>   at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:610)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:296)
>   at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.getLogEndOffsets(ConsumerGroupCommand.scala:610)
>   at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describePartitions(ConsumerGroupCommand.scala:328)
>   at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.collectConsumerAssignment(ConsumerGroupCommand.scala:308)
>   at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectConsumerAssignment(ConsumerGroupCommand.scala:544)
>   at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:571)
>   at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:565)
>   at scala.collection.immutable.List.flatMap(List.scala:338)
>   at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:565)
>   at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:558)
>   at scala.Option.map(Option.scala:146)
>   at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:558)
>   at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describeGroup(ConsumerGroupCommand.scala:271)
>   at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:544)
>   at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:77)
>   at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> [2018-06-12 01:32:34,255] DEBUG Removed sensor with name connections-closed: (org.apache.kafka.common.metrics.Metrics)
> {code}