This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5fc3fd4  MINOR: Use TopicPartition in ConsumerGroupCommand instead of TopicAndPartition where possible (#4333)
5fc3fd4 is described below

commit 5fc3fd48b0f43056e524b4c0f35aea3e3f9653e8
Author: Vahid Hashemian <[email protected]>
AuthorDate: Fri Dec 22 18:37:36 2017 -0800

    MINOR: Use TopicPartition in ConsumerGroupCommand instead of TopicAndPartition where possible (#4333)
    
    Reviewers: Jason Gustafson <[email protected]>
---
 .../scala/kafka/admin/ConsumerGroupCommand.scala   | 40 +++++++++++-----------
 1 file changed, 20 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 918593b..e835510 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -284,8 +284,8 @@ object ConsumerGroupCommand extends Logging {
 
     protected def collectConsumerAssignment(group: String,
                                             coordinator: Option[Node],
-                                            topicPartitions: Seq[TopicAndPartition],
-                                            getPartitionOffset: TopicAndPartition => Option[Long],
+                                            topicPartitions: Seq[TopicPartition],
+                                            getPartitionOffset: TopicPartition => Option[Long],
                                             consumerIdOpt: Option[String],
                                             hostOpt: Option[String],
                                             clientIdOpt: Option[String]): Array[PartitionAssignmentState] = {
@@ -328,7 +328,6 @@ object ConsumerGroupCommand extends Logging {
       }
     }
 
-
     def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = throw new UnsupportedOperationException
 
     def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = throw new UnsupportedOperationException
@@ -396,18 +395,18 @@ object ConsumerGroupCommand extends Logging {
         topicsByConsumerId(consumerId).flatMap { _ =>
           // since consumers with no topic partitions are processed here, we pass empty for topic partitions and offsets
           // since consumer id is repeated in client id, leave host and client id empty
-          collectConsumerAssignment(group, None, Array[TopicAndPartition](), Map[TopicAndPartition, Option[Long]](), Some(consumerId), None, None)
+          collectConsumerAssignment(group, None, Array[TopicPartition](), Map[TopicPartition, Option[Long]](), Some(consumerId), None, None)
         }
       }
 
       (None, Some(assignmentRows))
     }
 
-    private def getAllTopicPartitions(topics: Seq[String]): Seq[TopicAndPartition] = {
+    private def getAllTopicPartitions(topics: Seq[String]): Seq[TopicPartition] = {
       val topicPartitionMap = zkUtils.getPartitionsForTopics(topics)
       topics.flatMap { topic =>
         val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
-        partitions.map(TopicAndPartition(topic, _))
+        partitions.map(new TopicPartition(topic, _))
       }
     }
 
@@ -416,7 +415,7 @@ object ConsumerGroupCommand extends Logging {
         case Some(-1) => LogOffsetResult.Unknown
         case Some(brokerId) =>
           getZkConsumer(brokerId).map { consumer =>
-            val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition)
+            val topicAndPartition = new TopicAndPartition(topicPartition)
             val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
             val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
             consumer.close()
@@ -429,12 +428,12 @@ object ConsumerGroupCommand extends Logging {
     }
 
     private def getPartitionOffsets(group: String,
-                                    topicPartitions: Seq[TopicAndPartition],
+                                    topicPartitions: Seq[TopicPartition],
                                     channelSocketTimeoutMs: Int,
-                                    channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = {
+                                    channelRetryBackoffMs: Int): Map[TopicPartition, Long] = {
       val offsetMap = mutable.Map[TopicAndPartition, Long]()
       val channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
-      channel.send(OffsetFetchRequest(group, topicPartitions))
+      channel.send(OffsetFetchRequest(group, topicPartitions.map(new TopicAndPartition(_))))
       val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())
 
       offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
@@ -457,7 +456,9 @@ object ConsumerGroupCommand extends Logging {
         }
       }
       channel.disconnect()
-      offsetMap.toMap
+      offsetMap.map { case (topicAndPartition, offset) =>
+        (new TopicPartition(topicAndPartition.topic, topicAndPartition.partition), offset)
+      }.toMap
     }
 
     private def deleteForGroup() {
@@ -541,10 +542,10 @@ object ConsumerGroupCommand extends Logging {
                 List[PartitionAssignmentState]()
               else {
                 consumers.filter(_.assignment.nonEmpty).sortWith(_.assignment.size > _.assignment.size).flatMap { consumerSummary =>
-                  val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
+                  val topicPartitions = consumerSummary.assignment
                   assignedTopicPartitions = assignedTopicPartitions ++ consumerSummary.assignment
-                  val partitionOffsets: Map[TopicAndPartition, Option[Long]] = consumerSummary.assignment.map { topicPartition =>
-                    new TopicAndPartition(topicPartition) -> offsets.get(topicPartition)
+                  val partitionOffsets: Map[TopicPartition, Option[Long]] = consumerSummary.assignment.map { topicPartition =>
+                    new TopicPartition(topicPartition.topic, topicPartition.partition) -> offsets.get(topicPartition)
                   }.toMap
                   collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), topicPartitions,
                     partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
@@ -554,9 +555,8 @@ object ConsumerGroupCommand extends Logging {
 
             val rowsWithoutConsumer = offsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
               case (topicPartition, offset) =>
-                val topicAndPartition = new TopicAndPartition(topicPartition)
-                collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), Seq(topicAndPartition),
-                    Map(topicAndPartition -> Some(offset)), Some(MISSING_COLUMN_VALUE),
+                collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), Seq(topicPartition),
+                    Map(topicPartition -> Some(offset)), Some(MISSING_COLUMN_VALUE),
                     Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
             }
 
@@ -658,9 +658,9 @@ object ConsumerGroupCommand extends Logging {
 
     private def parseTopicPartitionsToReset(topicArgs: Seq[String]): Seq[TopicPartition] = topicArgs.flatMap {
       case topicArg if topicArg.contains(":") =>
-        val topicAndPartitions = topicArg.split(":")
-        val topic = topicAndPartitions(0)
-        topicAndPartitions(1).split(",").map(partition => new TopicPartition(topic, partition.toInt))
+        val topicPartitions = topicArg.split(":")
+        val topic = topicPartitions(0)
+        topicPartitions(1).split(",").map(partition => new TopicPartition(topic, partition.toInt))
       case topic => getConsumer.partitionsFor(topic).asScala
         .map(partitionInfo => new TopicPartition(topic, partitionInfo.partition))
     }

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to