IgnacioAcunaF commented on a change in pull request #10858: URL: https://github.com/apache/kafka/pull/10858#discussion_r659051878
########## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ########## @@ -62,6 +62,84 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { + val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") + val groupService = consumerGroupService(args) + + val testTopicPartition0 = new TopicPartition("testTopic1", 0); + val testTopicPartition1 = new TopicPartition("testTopic1", 1); + val testTopicPartition2 = new TopicPartition("testTopic1", 2); + val testTopicPartition3 = new TopicPartition("testTopic2", 0); + val testTopicPartition4 = new TopicPartition("testTopic2", 1); + val testTopicPartition5 = new TopicPartition("testTopic2", 2); + + // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined + val commitedOffsets = Map( + testTopicPartition1 -> new OffsetAndMetadata(100), + testTopicPartition2 -> null, + testTopicPartition4 -> new OffsetAndMetadata(100), + testTopicPartition5 -> null, + ).asJava + + val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) + val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), + ) + val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2) + val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5) + + val consumerGroupDescription = new ConsumerGroupDescription(group, + true, + Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))), + classOf[RangeAssignor].getName, + ConsumerGroupState.STABLE, + new Node(1, "localhost", 9092)) + + def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = { + val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map => + (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec]) + } + } + + when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription)))) + when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())) + .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets)) + when(admin.listOffsets(offsetsArgMatcher, any())) + .thenReturn(new ListOffsetsResult(endOffsets.asJava)) Review comment: And why the test was passing before, was because of this: ``` def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = { val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map => (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec]) } } ``` The expectedOffsetsUnassigned was a subset of commitedOffsets (`commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap`) The expectedOffsetsAssignedTopics was a subset of endOffsets (`endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap`) It was defined that way to preserve the logic of `val unassignedPartitions = committedOffsets.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }`. But your suggestion of use endOffsets on both as a filter seems cleaner to me. Just needed to adjust the test case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org