This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d95c191 KAFKA-12926: ConsumerGroupCommand's
java.lang.NullPointerException at negative offsets while running
kafka-consumer-groups.sh (#10858)
d95c191 is described below
commit d95c1919458bd5621774394c9eb61698ce2187b8
Author: Ignacio Acuña Frías <[email protected]>
AuthorDate: Tue Jun 29 03:00:56 2021 -0400
KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at
negative offsets while running kafka-consumer-groups.sh (#10858)
This patch fixes the `ConsumerGroupCommand` to correctly handle missing
offsets, which are returned as `null` by the admin API.
Reviewers: David Jacot <[email protected]>
---
.../scala/kafka/admin/ConsumerGroupCommand.scala | 11 ++-
.../kafka/admin/ConsumerGroupServiceTest.scala | 82 ++++++++++++++++++++++
2 files changed, 86 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 6c6090f..74b7224 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -560,27 +560,24 @@ object ConsumerGroupCommand extends Logging {
val groupOffsets = TreeMap[String, (Option[String],
Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <-
consumerGroups) yield {
val state = consumerGroup.state
val committedOffsets = getCommittedOffsets(groupId)
+ // The admin client returns `null` as a value to indicate that there
is no committed offset for a partition.
+ def getPartitionOffset(tp: TopicPartition): Option[Long] =
committedOffsets.get(tp).filter(_ != null).map(_.offset)
var assignedTopicPartitions = ListBuffer[TopicPartition]()
val rowsWithConsumer =
consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
.sortWith(_.assignment.topicPartitions.size >
_.assignment.topicPartitions.size).flatMap { consumerSummary =>
val topicPartitions =
consumerSummary.assignment.topicPartitions.asScala
assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
- val partitionOffsets =
consumerSummary.assignment.topicPartitions.asScala
- .map { topicPartition =>
- topicPartition ->
committedOffsets.get(topicPartition).map(_.offset)
- }.toMap
collectConsumerAssignment(groupId,
Option(consumerGroup.coordinator), topicPartitions.toList,
- partitionOffsets, Some(s"${consumerSummary.consumerId}"),
Some(s"${consumerSummary.host}"),
+ getPartitionOffset, Some(s"${consumerSummary.consumerId}"),
Some(s"${consumerSummary.host}"),
Some(s"${consumerSummary.clientId}"))
}
-
val unassignedPartitions = committedOffsets.filterNot { case (tp, _)
=> assignedTopicPartitions.contains(tp) }
val rowsWithoutConsumer = if (unassignedPartitions.nonEmpty) {
collectConsumerAssignment(
groupId,
Option(consumerGroup.coordinator),
unassignedPartitions.keySet.toSeq,
- unassignedPartitions.map { case (tp, offset) => tp ->
Some(offset.offset) },
+ getPartitionOffset,
Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE)).toSeq
diff --git
a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
index 86bf674..3b3b781 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
+import org.mockito.ArgumentMatcher
import scala.jdk.CollectionConverters._
@@ -63,6 +64,87 @@ class ConsumerGroupServiceTest {
}
@Test
+ def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+ val args = Array("--bootstrap-server", "localhost:9092", "--group", group,
"--describe", "--offsets")
+ val groupService = consumerGroupService(args)
+
+ val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+ val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+ val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+ val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+ val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+ val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+ // Some topic partitions get valid OffsetAndMetadata values, others get
null values (negative offsets), and others aren't defined
+ val commitedOffsets = Map(
+ testTopicPartition1 -> new OffsetAndMetadata(100),
+ testTopicPartition2 -> null,
+ testTopicPartition3 -> new OffsetAndMetadata(100),
+ testTopicPartition4 -> new OffsetAndMetadata(100),
+ testTopicPartition5 -> null,
+ ).asJava
+
+ val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100,
System.currentTimeMillis, Optional.of(1))
+ val endOffsets = Map(
+ testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+ testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+ testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+ testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+ testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+ testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+ )
+ val assignedTopicPartitions = Set(testTopicPartition0,
testTopicPartition1, testTopicPartition2)
+ val unassignedTopicPartitions = Set(testTopicPartition3,
testTopicPartition4, testTopicPartition5)
+
+ val consumerGroupDescription = new ConsumerGroupDescription(group,
+ true,
+ Collections.singleton(new MemberDescription("member1",
Optional.of("instance1"), "client1", "host1", new
MemberAssignment(assignedTopicPartitions.asJava))),
+ classOf[RangeAssignor].getName,
+ ConsumerGroupState.STABLE,
+ new Node(1, "localhost", 9092))
+
+ def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]):
ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+ topicPartitionOffsets => topicPartitionOffsets != null &&
topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+ }
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
any()))
+ .thenReturn(new
DescribeConsumerGroupsResult(Collections.singletonMap(group,
KafkaFuture.completedFuture(consumerGroupDescription))))
+ when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+
.thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+ when(admin.listOffsets(
+ ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+ any()
+ )).thenReturn(new ListOffsetsResult(endOffsets.filter { case (tp, _) =>
assignedTopicPartitions.contains(tp) }.asJava))
+ when(admin.listOffsets(
+ ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
+ any()
+ )).thenReturn(new ListOffsetsResult(endOffsets.filter { case (tp, _) =>
unassignedTopicPartitions.contains(tp) }.asJava))
+
+ val (state, assignments) = groupService.collectGroupOffsets(group)
+ val returnedOffsets = assignments.map { results =>
+ results.map { assignment =>
+ new TopicPartition(assignment.topic.get, assignment.partition.get) ->
assignment.offset
+ }.toMap
+ }.getOrElse(Map.empty)
+
+ val expectedOffsets = Map(
+ testTopicPartition0 -> None,
+ testTopicPartition1 -> Some(100),
+ testTopicPartition2 -> None,
+ testTopicPartition3 -> Some(100),
+ testTopicPartition4 -> Some(100),
+ testTopicPartition5 -> None
+ )
+ assertEquals(Some("Stable"), state)
+ assertEquals(expectedOffsets, returnedOffsets)
+
+ verify(admin,
times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
any())
+ verify(admin,
times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())
+ verify(admin,
times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
any())
+ verify(admin,
times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
any())
+ }
+
+ @Test
def testAdminRequestsForResetOffsets(): Unit = {
val args = Seq("--bootstrap-server", "localhost:9092", "--group", group,
"--reset-offsets", "--to-latest")
val topicsWithoutPartitionsSpecified = topics.tail