[ 
https://issues.apache.org/jira/browse/KAFKA-7044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554905#comment-16554905
 ] 

ErikPearson commented on KAFKA-7044:
------------------------------------

I tried explicitly increasing the Duration passed to getConsumer.endOffsets(), but I 
get the same error.

I narrowed down where the request loses the partition.  It looks like 
Fetcher.groupListOffsetRequests() is where the topicPartition is being dropped. 
 When the function is called the second time with a timestampsToSearch Map of 
16 entries, the total number of topicPartitions in the returned Map of Maps is 
only 15.

It looks like Fetcher.groupListOffsetRequests() is called for each Client ID in 
the consumer group.  On the first invocation there is no metadata and an empty 
map is returned, which eventually causes a metadata refresh in 
Fetcher.sendListOffsetsRequests():
{code:java}
Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode =
        groupListOffsetRequests(timestampsToSearch);

if (timestampsToSearchByNode.isEmpty())
    return RequestFuture.failure(new StaleMetadataException());
{code}
On the second invocation of Fetcher.groupListOffsetRequests(), the dropped 
partition is the last partition in the for loop.  It's also the first reference 
to a specific topic whose remaining partitions are being handled by a 
yet-to-be-processed Client ID.  The topic wasn't refreshed after the first 
invocation, since the first Client ID didn't process any partitions for this 
topic.  Calling:
{code:java}
PartitionInfo info = metadata.fetch().partition(tp);
{code}
returns null.  There's a call to add the topic to the metadata and request a 
metadata refresh, but the topicPartition is never added to the result.  The 
loop ends and the result is returned with one fewer partition in the Map of 
Maps.
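
To illustrate the behavior described above, here is a simplified, self-contained simulation (not the actual Fetcher code; the class, method, and map shapes are invented for illustration): a partition whose topic is missing from the cached metadata is skipped with only a metadata-refresh request, so the returned Map of Maps ends up smaller than the input:
{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the dropping behavior, using String partition names
// and Integer node ids in place of TopicPartition and Node.
public class GroupListOffsetsSketch {

    // knownLeaders maps a partition name to its leader node id; a partition
    // absent from this map stands in for metadata.fetch().partition(tp) == null.
    static Map<Integer, Map<String, Long>> group(Map<String, Long> timestampsToSearch,
                                                 Map<String, Integer> knownLeaders) {
        Map<Integer, Map<String, Long>> byNode = new HashMap<>();
        for (Map.Entry<String, Long> e : timestampsToSearch.entrySet()) {
            Integer leader = knownLeaders.get(e.getKey());
            if (leader == null) {
                // The real code adds the topic and requests a metadata update
                // here, but never adds this partition to the result.
                continue;
            }
            byNode.computeIfAbsent(leader, n -> new HashMap<>()).put(e.getKey(), e.getValue());
        }
        return byNode;
    }

    public static void main(String[] args) {
        Map<String, Long> search = new HashMap<>();
        search.put("topicA-0", -1L);
        search.put("topicB-0", -1L);
        // Metadata only knows the leader for topicA-0.
        Map<String, Integer> leaders = Collections.singletonMap("topicA-0", 1);
        Map<Integer, Map<String, Long>> result = group(search, leaders);
        int total = 0;
        for (Map<String, Long> m : result.values())
            total += m.size();
        System.out.println("requested=" + search.size() + " grouped=" + total);
    }
}
{code}
Running this prints requested=2 grouped=1, mirroring the 16-vs-15 mismatch above.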

I'm not too familiar with the code, but I wanted to see if a metadata refresh 
would fix it, since the topic was added for metadata refresh.  I added a second 
check after the .isEmpty() check to verify that the number of offsets to search 
is the same.  Please take this with many grains of salt; there are probably 
better ways to fix this issue:
{code:java}
// Hacking around to avoid a null exception. Assume that if timestampsToSearchByNode
// does NOT have the same number of offsets as timestampsToSearch, then the metadata is stale.
int sumTopicPartitionsByNode = 0;
for (Map<TopicPartition, Long> nodeMap : timestampsToSearchByNode.values()) {
    sumTopicPartitionsByNode += nodeMap.size();
}
if (sumTopicPartitionsByNode != timestampsToSearch.size()) {
    log.warn("Expected offsets: " + timestampsToSearch.size()
            + " Offsets across nodes: " + sumTopicPartitionsByNode);
    return RequestFuture.failure(new StaleMetadataException());
}
{code}
Now the kafka-consumer-groups command works.  The log also shows that the 
warning fires for at least one other Client ID in my problematic group.
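
As a minor style point (not part of the fix), the same partition count from the workaround above could be computed more compactly with streams; this standalone sketch uses hypothetical String keys instead of Node/TopicPartition:
{code:java}
import java.util.HashMap;
import java.util.Map;

public class PartitionCount {

    // Total number of partitions across all per-node maps; equivalent to the
    // sumTopicPartitionsByNode loop in the workaround above.
    static int totalPartitions(Map<?, ? extends Map<?, ?>> byNode) {
        return byNode.values().stream().mapToInt(Map::size).sum();
    }

    public static void main(String[] args) {
        Map<String, Map<String, Long>> byNode = new HashMap<>();
        Map<String, Long> node1 = new HashMap<>();
        node1.put("t-0", 0L);
        node1.put("t-1", 0L);
        Map<String, Long> node2 = new HashMap<>();
        node2.put("t-2", 0L);
        byNode.put("node1", node1);
        byNode.put("node2", node2);
        System.out.println(totalPartitions(byNode)); // prints 3
    }
}
{code}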


> kafka-consumer-groups.sh NullPointerException describing round robin or 
> sticky assignors
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7044
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7044
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>    Affects Versions: 1.1.0
>         Environment: CentOS 7.4, Oracle JDK 1.8
>            Reporter: Jeff Field
>            Assignee: Vahid Hashemian
>            Priority: Minor
>
> We've recently moved to using the round robin assignor for one of our 
> consumer groups, and started testing the sticky assignor. In both cases, 
> using Kafka 1.1.0 we get a null pointer exception *unless* the group being 
> described is rebalancing:
> {code:java}
> kafka-consumer-groups --bootstrap-server fqdn:9092 --describe --group 
> groupname-for-consumer
> Error: Executing consumer group command failed due to null
> [2018-06-12 01:32:34,179] DEBUG Exception in consumer group command 
> (kafka.admin.ConsumerGroupCommand$)
> java.lang.NullPointerException
> at scala.Predef$.Long2long(Predef.scala:363)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:612)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:610)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:296)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.getLogEndOffsets(ConsumerGroupCommand.scala:610)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describePartitions(ConsumerGroupCommand.scala:328)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.collectConsumerAssignment(ConsumerGroupCommand.scala:308)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectConsumerAssignment(ConsumerGroupCommand.scala:544)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:571)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:565)
> at scala.collection.immutable.List.flatMap(List.scala:338)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:565)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:558)
> at scala.Option.map(Option.scala:146)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:558)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describeGroup(ConsumerGroupCommand.scala:271)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:544)
> at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:77)
> at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> [2018-06-12 01:32:34,255] DEBUG Removed sensor with name connections-closed: 
> (org.apache.kafka.common.metrics.Metrics){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
