IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657960159



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned 
topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular 
information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned 
topic partition
+      // testTopicPartition3 ->  there is no offset information for an 
unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular 
information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned 
topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) 
=> assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+    def describeGroupsResult(groupState: ConsumerGroupState): 
DescribeConsumerGroupsResult = {
+      val member1 = new MemberDescription("member1", Optional.of("instance1"), 
"client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))
+      val description = new ConsumerGroupDescription(group,
+        true,
+        Collections.singleton(member1),
+        classOf[RangeAssignor].getName,
+        groupState,
+        new Node(1, "localhost", 9092))
+      new DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(description)))
+    }
+
+    def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] 
= {
+      val expectedOffsets = endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && 
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    def offsetsArgMatcherUnassignedTopics: util.Map[TopicPartition, 
OffsetSpec] = {
+      val expectedOffsets = offsets.asScala.filter{ case (tp, _) => 
unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && 
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets))
+    doAnswer(_ => new 
ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherAssignedTopics,
 any())
+    doAnswer(_ => new 
ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherUnassignedTopics,
 any())

Review comment:
       Following this up on the stack with some printlns: it is in fact not 
Mockito that throws the NullPointerException. On the second call it doesn't 
recognize the ArgumentMatcher, so it invokes the function as normal. That 
normal invocation throws the NullPointerException (because there is no data).
   
   My guess is that Mockito doesn't register the second ArgumentMatcher, and 
for that reason it doesn't return the expected values.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to