IgnacioAcunaF commented on a change in pull request #10858: URL: https://github.com/apache/kafka/pull/10858#discussion_r659039997
########## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ########## @@ -62,6 +62,84 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { + val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") + val groupService = consumerGroupService(args) + + val testTopicPartition0 = new TopicPartition("testTopic1", 0); + val testTopicPartition1 = new TopicPartition("testTopic1", 1); + val testTopicPartition2 = new TopicPartition("testTopic1", 2); + val testTopicPartition3 = new TopicPartition("testTopic2", 0); + val testTopicPartition4 = new TopicPartition("testTopic2", 1); + val testTopicPartition5 = new TopicPartition("testTopic2", 2); + + // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined + val commitedOffsets = Map( + testTopicPartition1 -> new OffsetAndMetadata(100), + testTopicPartition2 -> null, + testTopicPartition4 -> new OffsetAndMetadata(100), + testTopicPartition5 -> null, + ).asJava + + val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) + val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), + ) + val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2) + val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5) + + val consumerGroupDescription = new ConsumerGroupDescription(group, + true, + Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))), + classOf[RangeAssignor].getName, + ConsumerGroupState.STABLE, + new Node(1, "localhost", 9092)) + + def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = { + val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map => + (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec]) + } + } + + when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription)))) + when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())) + .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets)) + when(admin.listOffsets(offsetsArgMatcher, any())) + .thenReturn(new ListOffsetsResult(endOffsets.asJava)) Review comment: Actually yes, is because of the fact that not all the end offsets are returned all the time now, but only for unassigned partitions case. Because the ConsumerGroupCommand defines unassigned topic partition as: `val unassignedPartitions = committedOffsets.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }` The definition that I gave to the test is breaking that logic, because if the topic partition is not in the commitedOffsets, by definition shouldn't be on the unassignedPartitions. The test, as as I primarily defined, is declaring testTopicPartition3 as an unassigned partition, but that partition is not being defined also on the commitedOffsets. So is not respecting the previous statment `val unassignedPartitions = committedOffsets.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }`. That is the reason that the test is failing. So the test case should be: ``` val commitedOffsets = Map( testTopicPartition1 -> new OffsetAndMetadata(100), testTopicPartition2 -> null, testTopicPartition3 -> new OffsetAndMetadata(100), testTopicPartition4 -> new OffsetAndMetadata(100), testTopicPartition5 -> null, ).asJava ``` Just the testTopicPartition0 (which is assigned) as non defined, because as I seem there is never going to be a case where there is a non defined unassigned partition, because it is a requirement to be defined on commitedOffsets in order to be consider as unasigned. On the other way, there could be assigned but not commited partitions. ########## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ########## @@ -62,6 +62,84 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { + val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") + val groupService = consumerGroupService(args) + + val testTopicPartition0 = new TopicPartition("testTopic1", 0); + val testTopicPartition1 = new TopicPartition("testTopic1", 1); + val testTopicPartition2 = new TopicPartition("testTopic1", 2); + val testTopicPartition3 = new TopicPartition("testTopic2", 0); + val testTopicPartition4 = new TopicPartition("testTopic2", 1); + val testTopicPartition5 = new TopicPartition("testTopic2", 2); + + // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined + val commitedOffsets = Map( + testTopicPartition1 -> new OffsetAndMetadata(100), + testTopicPartition2 -> null, + testTopicPartition4 -> new OffsetAndMetadata(100), + testTopicPartition5 -> null, + ).asJava + + val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) + val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), + ) + val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2) + val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5) + + val consumerGroupDescription = new ConsumerGroupDescription(group, + true, + Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))), + classOf[RangeAssignor].getName, + ConsumerGroupState.STABLE, + new Node(1, "localhost", 9092)) + + def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = { + val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map => + (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec]) + } + } + + when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription)))) + when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())) + .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets)) + when(admin.listOffsets(offsetsArgMatcher, any())) + .thenReturn(new ListOffsetsResult(endOffsets.asJava)) Review comment: Makes sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org