This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e3f60c2  KAFKA-12479: Batch partition offset requests in ConsumerGroupCommand (#10371)
e3f60c2 is described below

commit e3f60c254c66d7021d3e0b61968a59a70e00cb39
Author: Rajini Sivaram <[email protected]>
AuthorDate: Tue Mar 23 09:56:56 2021 +0000

    KAFKA-12479: Batch partition offset requests in ConsumerGroupCommand (#10371)
    
    Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai <[email protected]>, Ismael Juma <[email protected]>
---
 .../kafka/clients/admin/AdminClientTestUtils.java  |  13 ++
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |  65 ++++++----
 .../kafka/admin/ConsumerGroupServiceTest.scala     | 133 +++++++++++++++++++++
 3 files changed, 186 insertions(+), 25 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index b0f0055..a64dab6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -18,7 +18,11 @@ package org.apache.kafka.clients.admin;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.stream.Collectors;
+
 import 
org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 
@@ -89,4 +93,13 @@ public class AdminClientTestUtils {
         future.complete(description);
         return new DescribeTopicsResult(Collections.singletonMap(topic, 
future));
     }
+
+    public static DescribeTopicsResult describeTopicsResult(Map<String, 
TopicDescription> topicDescriptions) {
+        return new DescribeTopicsResult(topicDescriptions.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
KafkaFuture.completedFuture(e.getValue()))));
+    }
+
+    public static ListConsumerGroupOffsetsResult 
listConsumerGroupOffsetsResult(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        return new 
ListConsumerGroupOffsetsResult(KafkaFuture.completedFuture(offsets));
+    }
 }
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index e948f6f..2c0dc8c 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -459,13 +459,7 @@ object ConsumerGroupCommand extends Logging {
       val partitionLevelResult = mutable.Map[TopicPartition, Throwable]()
 
       val (topicWithPartitions, topicWithoutPartitions) = 
topics.partition(_.contains(":"))
-
-      val knownPartitions = topicWithPartitions.flatMap { topicArg =>
-        val split = topicArg.split(":")
-        split(1).split(",").map { partition =>
-          new TopicPartition(split(0), partition.toInt)
-        }
-      }
+      val knownPartitions = 
topicWithPartitions.flatMap(parseTopicsWithPartitions)
 
       // Get the partitions of topics that the user did not explicitly specify 
the partitions
       val describeTopicsResult = adminClient.describeTopics(
@@ -580,18 +574,20 @@ object ConsumerGroupCommand extends Logging {
             partitionOffsets, Some(s"${consumerSummary.consumerId}"), 
Some(s"${consumerSummary.host}"),
             Some(s"${consumerSummary.clientId}"))
         }
-        val rowsWithoutConsumer = committedOffsets.filter { case (tp, _) =>
-          !assignedTopicPartitions.contains(tp)
-        }.flatMap { case (topicPartition, offset) =>
+
+        val unassignedPartitions = committedOffsets.filterNot { case (tp, _) 
=> assignedTopicPartitions.contains(tp) }
+        val rowsWithoutConsumer = if (unassignedPartitions.nonEmpty) {
           collectConsumerAssignment(
             groupId,
             Option(consumerGroup.coordinator),
-            Seq(topicPartition),
-            Map(topicPartition -> Some(offset.offset)),
+            unassignedPartitions.keySet.toSeq,
+            unassignedPartitions.map { case (tp, offset) => tp -> 
Some(offset.offset) },
             Some(MISSING_COLUMN_VALUE),
             Some(MISSING_COLUMN_VALUE),
             Some(MISSING_COLUMN_VALUE)).toSeq
-        }
+        } else
+          Seq.empty
+
         groupId -> (Some(state.toString), Some(rowsWithConsumer ++ 
rowsWithoutConsumer))
       }).toMap
 
@@ -696,7 +692,8 @@ object ConsumerGroupCommand extends Logging {
       adminClient.close()
     }
 
-    private def createAdminClient(configOverrides: Map[String, String]): Admin 
= {
+    // Visibility for testing
+    protected def createAdminClient(configOverrides: Map[String, String]): 
Admin = {
       val props = if (opts.options.has(opts.commandConfigOpt)) 
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new 
Properties()
       props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt))
       configOverrides.forKeyValue { (k, v) => props.put(k, v)}
@@ -708,22 +705,40 @@ object ConsumerGroupCommand extends Logging {
       options.timeoutMs(t)
     }
 
-    private def parseTopicPartitionsToReset(groupId: String, topicArgs: 
Seq[String]): Seq[TopicPartition] = topicArgs.flatMap {
-      case topicArg if topicArg.contains(":") =>
-        val topicPartitions = topicArg.split(":")
-        val topic = topicPartitions(0)
-        topicPartitions(1).split(",").map(partition => new 
TopicPartition(topic, partition.toInt))
-      case topic =>
+    private def parseTopicsWithPartitions(topicArg: String): 
Seq[TopicPartition] = {
+      def partitionNum(partition: String): Int = {
+        try {
+          partition.toInt
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalArgumentException(s"Invalid partition 
'$partition' specified in topic arg '$topicArg''")
+        }
+      }
+      topicArg.split(":") match {
+        case Array(topic, partitions) =>
+          partitions.split(",").map(partition => new TopicPartition(topic, 
partitionNum(partition)))
+        case _ =>
+          throw new IllegalArgumentException(s"Invalid topic arg '$topicArg', 
expected topic name and partitions")
+      }
+    }
+
+    private def parseTopicPartitionsToReset(topicArgs: Seq[String]): 
Seq[TopicPartition] = {
+      val (topicsWithPartitions, topics) = topicArgs.partition(_.contains(":"))
+      val specifiedPartitions = 
topicsWithPartitions.flatMap(parseTopicsWithPartitions)
+
+      val unspecifiedPartitions = if (topics.nonEmpty) {
         val descriptionMap = adminClient.describeTopics(
-          Seq(topic).asJava,
+          topics.asJava,
           withTimeoutMs(new DescribeTopicsOptions)
         ).all().get.asScala
-        val r = descriptionMap.flatMap{ case(topic, description) =>
-          description.partitions().asScala.map{ tpInfo =>
+        descriptionMap.flatMap { case (topic, description) =>
+          description.partitions().asScala.map { tpInfo =>
             new TopicPartition(topic, tpInfo.partition)
           }
         }
-        r
+      } else
+        Seq.empty
+      specifiedPartitions ++ unspecifiedPartitions
     }
 
     private def getPartitionsToReset(groupId: String): Seq[TopicPartition] = {
@@ -731,7 +746,7 @@ object ConsumerGroupCommand extends Logging {
         getCommittedOffsets(groupId).keys.toSeq
       } else if (opts.options.has(opts.topicOpt)) {
         val topics = opts.options.valuesOf(opts.topicOpt).asScala
-        parseTopicPartitionsToReset(groupId, topics)
+        parseTopicPartitionsToReset(topics)
       } else {
         if (opts.options.has(opts.resetFromFileOpt))
           Nil
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
new file mode 100644
index 0000000..86bf674
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.util
+import java.util.{Collections, Optional}
+
+import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, 
ConsumerGroupService}
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.consumer.{OffsetAndMetadata, RangeAssignor}
+import org.apache.kafka.common.{ConsumerGroupState, KafkaFuture, Node, 
TopicPartition, TopicPartitionInfo}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
+
+import scala.jdk.CollectionConverters._
+
+class ConsumerGroupServiceTest {
+
+  private val group = "testGroup"
+  private val topics = (0 until 5).map(i => s"testTopic$i")
+  private val numPartitions = 10
+  private val topicPartitions = topics.flatMap(topic => (0 until 
numPartitions).map(i => new TopicPartition(topic, i)))
+  private val admin = mock(classOf[Admin])
+
+  @Test
+  def testAdminRequestsForDescribeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(listGroupOffsetsResult)
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(listOffsetsResult)
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    assertEquals(Some("Stable"), state)
+    assertTrue(assignments.nonEmpty)
+    assertEquals(topicPartitions.size, assignments.get.size)
+
+    verify(admin, 
times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any())
+    verify(admin, 
times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())
+    verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
+  }
+
+  @Test
+  def testAdminRequestsForResetOffsets(): Unit = {
+    val args = Seq("--bootstrap-server", "localhost:9092", "--group", group, 
"--reset-offsets", "--to-latest")
+    val topicsWithoutPartitionsSpecified = topics.tail
+    val topicArgs = Seq("--topic", s"${topics.head}:${(0 until 
numPartitions).mkString(",")}") ++
+      topicsWithoutPartitionsSpecified.flatMap(topic => Seq("--topic", topic))
+    val groupService = consumerGroupService((args ++ topicArgs).toArray)
+
+    
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.DEAD))
+    
when(admin.describeTopics(ArgumentMatchers.eq(topicsWithoutPartitionsSpecified.asJava),
 any()))
+      .thenReturn(describeTopicsResult(topicsWithoutPartitionsSpecified))
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(listOffsetsResult)
+
+    val resetResult = groupService.resetOffsets()
+    assertEquals(Set(group), resetResult.keySet)
+    assertEquals(topicPartitions.toSet, resetResult(group).keySet)
+
+    verify(admin, 
times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any())
+    verify(admin, 
times(1)).describeTopics(ArgumentMatchers.eq(topicsWithoutPartitionsSpecified.asJava),
 any())
+    verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
+  }
+
+  private def consumerGroupService(args: Array[String]): ConsumerGroupService 
= {
+    new ConsumerGroupService(new ConsumerGroupCommandOptions(args)) {
+      override protected def createAdminClient(configOverrides: 
collection.Map[String, String]): Admin = {
+        admin
+      }
+    }
+  }
+
+  private def describeGroupsResult(groupState: ConsumerGroupState): 
DescribeConsumerGroupsResult = {
+    val member1 = new MemberDescription("member1", Optional.of("instance1"), 
"client1", "host1", null)
+    val description = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(member1),
+      classOf[RangeAssignor].getName,
+      groupState,
+      new Node(1, "localhost", 9092))
+    new DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(description)))
+  }
+
+  private def listGroupOffsetsResult: ListConsumerGroupOffsetsResult = {
+    val offsets = topicPartitions.map(_ -> new 
OffsetAndMetadata(100)).toMap.asJava
+    AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets)
+  }
+
+  private def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
+    val expectedOffsets = topicPartitions.map(tp => tp -> 
OffsetSpec.latest).toMap
+    ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+      map.keySet.asScala == expectedOffsets.keySet && 
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+    }
+  }
+
+  private def listOffsetsResult: ListOffsetsResult = {
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+    val futures = topicPartitions.map(_ -> 
KafkaFuture.completedFuture(resultInfo)).toMap
+    new ListOffsetsResult(futures.asJava)
+  }
+
+  private def describeTopicsResult(topics: Seq[String]): DescribeTopicsResult 
= {
+   val topicDescriptions = topics.map { topic =>
+      val partitions = (0 until numPartitions).map(i => new 
TopicPartitionInfo(i, null, Collections.emptyList[Node], 
Collections.emptyList[Node]))
+      topic -> new TopicDescription(topic, false, partitions.asJava)
+    }.toMap
+    AdminClientTestUtils.describeTopicsResult(topicDescriptions.asJava)
+  }
+}

Reply via email to