dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r659616835



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets 
nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition3 -> new OffsetAndMetadata(100),
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, 
testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", 
Optional.of("instance1"), "client1", "host1", new 
MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): 
ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+      topicPartitionOffsets => topicPartitionOffsets != null && 
topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+    }
+
+    
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+      .thenReturn(new 
DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      
.thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.asJava))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => 
unassignedTopicPartitions.contains(tp) }.asJava))
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    val returnedOffsets = assignments.map { results =>
+      results.map { assignment =>
+        new TopicPartition(assignment.topic.get, assignment.partition.get) -> 
assignment.offset
+      }.toMap
+    }.getOrElse(Map.empty)
+    // Results should have information for all assigned topic partition (even 
if there is not Offset's information at all, because they get fills with None)
+    // Results should have information only for unassigned topic partitions if 
and only if there is information about them (including with null values)

Review comment:
       Is this comment still relevant? It seems that we have a value for all 
partitions now.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets 
nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition3 -> new OffsetAndMetadata(100),
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, 
testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", 
Optional.of("instance1"), "client1", "host1", new 
MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): 
ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+      topicPartitionOffsets => topicPartitionOffsets != null && 
topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+    }
+
+    
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+      .thenReturn(new 
DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      
.thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.asJava))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => 
unassignedTopicPartitions.contains(tp) }.asJava))

Review comment:
       nit: Same here: we usually put a space before the `{`, i.e. `endOffsets.filter { ... }`.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets 
nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition3 -> new OffsetAndMetadata(100),
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, 
testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", 
Optional.of("instance1"), "client1", "host1", new 
MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): 
ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+      topicPartitionOffsets => topicPartitionOffsets != null && 
topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+    }
+
+    
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+      .thenReturn(new 
DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      
.thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.asJava))

Review comment:
       nit: We usually put a space before the `{`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to