dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r650349885



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -559,7 +559,7 @@ object ConsumerGroupCommand extends Logging {
 
       val groupOffsets = TreeMap[String, (Option[String], 
Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- 
consumerGroups) yield {
         val state = consumerGroup.state
-        val committedOffsets = getCommittedOffsets(groupId)
+        val committedOffsets = 
getCommittedOffsets(groupId).filter(_._2.isInstanceOf[OffsetAndMetadata])

Review comment:
       Did you consider handling the 'null' as follow?
   
   '''
   topicPartition -> committedOffsets.get(topicPartition).filter(_ != 
null).map(_.offset)
   '''
   
   I feel like that this would be a bit more comprehensive as it explicitly 
handle the 'null' case. We could also add a small comment to explain why it is 
here.
   
   Could you also add a unit test for the bug please?

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -559,7 +559,7 @@ object ConsumerGroupCommand extends Logging {
 
       val groupOffsets = TreeMap[String, (Option[String], 
Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- 
consumerGroups) yield {
         val state = consumerGroup.state
-        val committedOffsets = getCommittedOffsets(groupId)
+        val committedOffsets = 
getCommittedOffsets(groupId).filter(_._2.isInstanceOf[OffsetAndMetadata])

Review comment:
       Thanks. Regarding the test, we could perhaps add a unit test in 
ConsumerGroupServiceTest which relies on a mocked admin client. Would this work?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to