IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r658978444



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets 
nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava

Review comment:
       Actually, put that case based on what I saw on that consumer-group:
   
   This was the consumer-group state:
   `(groupId=order-validations, isSimpleConsumerGroup=false, 
members=(memberId=order-validations-d5fbca62-ab2b-48d7-96ba-0ae72dff72a6, 
groupInstanceId=null, clientId=order-validations, host=/127.0.0.1, 
assignment=(topicPartitions=rtl_orderReceive-0,rtl_orderReceive-1,rtl_orderReceive-2,rtl_orderReceive-3,rtl_orderReceive-4,rtl_orderReceive-5,rtl_orderReceive-6,rtl_orderReceive-7,rtl_orderReceive-8,rtl_orderReceive-9)),
 partitionAssignor=RoundRobinAssigner, state=Stable, 
coordinator=f0527.cluster.cl:31047 (id: 1 rack: null), authorizedOperations=[])`
   
   It has assigned all the partitions to rtl_orderReceive, but when getting the 
commited offsets:
   
   `Map(rtl_orderReceive-0 -> null, rtl_orderReceive-1 -> 
OffsetAndMetadata{offset=39, leaderEpoch=null, metadata=''}, rtl_orderReceive-2 
-> null, rtl_orderReceive-3 -> OffsetAndMetadata{offset=33, leaderEpoch=null, 
metadata=''}, rtl_orderReceive-4 -> null, rtl_orderReceive-5 -> null, 
rtl_orderReceive-7 -> null, rtl_orderReceive-8 -> null)`
   
   Even that partition 6 was assigned, the aren't values commited for it (even 
not a -1). 
   That is for the case of assigned partitions, but for unassigned partitions, 
thinking it now, as it is the subsets of commited offsets that aren't assigned, 
it makes no sense to have non defined commited unassigned partition (because if 
there weren't a commited partition, then wouldn't exist the unassigned 
partition). 
   
   So I am thinking on:
   
   ```
   val commitedOffsets = Map(
         testTopicPartition1 -> new OffsetAndMetadata(100),
         testTopicPartition2 -> null,
         testTopicPartition3 -> new OffsetAndMetadata(100),
         testTopicPartition4 -> new OffsetAndMetadata(100),
         testTopicPartition5 -> null,
       ).asJava
   ```
   
   Just letting testTopicPartition0 as undefined, but defining 
testTopicPartition3 (unassigned). 
   Makes sense?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to