IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r659039997



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets 
nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, 
testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", 
Optional.of("instance1"), "client1", "host1", new 
MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ 
case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+      val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || 
map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && 
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+
+    
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+      .thenReturn(new 
DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      
.thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(new ListOffsetsResult(endOffsets.asJava))

Review comment:
       Actually yes, it is because not all the end offsets are returned all 
the time now, but only in the unassigned-partitions case.
   
   This is because ConsumerGroupCommand defines unassigned topic partitions as:
   `val unassignedPartitions = committedOffsets.filterNot { case (tp, _) => 
assignedTopicPartitions.contains(tp) }`
   
   The definition that I gave in the test breaks that logic, because if a 
topic partition is not in commitedOffsets, then by definition it shouldn't be 
in unassignedPartitions.
   The test, as I originally defined it, declares testTopicPartition3 as an 
unassigned partition, but that partition is not also defined in 
commitedOffsets. So it does not respect the previous statement `val 
unassignedPartitions = committedOffsets.filterNot { case (tp, _) => 
assignedTopicPartitions.contains(tp) }`. That is the reason the test is 
failing.
   
   So the test case should be:
   ```
   val commitedOffsets = Map(
         testTopicPartition1 -> new OffsetAndMetadata(100),
         testTopicPartition2 -> null,
         testTopicPartition3 -> new OffsetAndMetadata(100),
         testTopicPartition4 -> new OffsetAndMetadata(100),
         testTopicPartition5 -> null,
       ).asJava
   ```
   Only testTopicPartition0 (which is assigned) is left undefined, because as 
far as I can see there is never going to be a case with an undefined 
unassigned partition: a partition must be defined in commitedOffsets in order 
to be considered unassigned.
   On the other hand, there could be assigned partitions with no committed 
offset.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets 
nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, 
testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", 
Optional.of("instance1"), "client1", "host1", new 
MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ 
case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+      val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || 
map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && 
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+
+    
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+      .thenReturn(new 
DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      
.thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(new ListOffsetsResult(endOffsets.asJava))

Review comment:
       Makes sense?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to