urbandan commented on a change in pull request #9430:
URL: https://github.com/apache/kafka/pull/9430#discussion_r563673881



##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
         }
     }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { 
case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0
+    }).foreach { case (tp, offset) =>
+      println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
     }
 
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, 
`None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | 
:PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, 
partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-    val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k 
== topic }.values.flatMap(_.asScala).toBuffer
-    if (partitionInfos.isEmpty)
-      None
-    else if (partitionIds.isEmpty)
-      Some(partitionInfos)
-    else
-      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
+  private def createTopicPartitionFilterWithPatternList(topicPartitions: 
String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+    val ruleSpecs = topicPartitions.split(",")
+    val rules = ruleSpecs.map(ruleSpec => {
+      val parts = ruleSpec.split(":")
+      if (parts.length == 1) {
+        val whitelist = IncludeList(parts(0))
+        tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, 
excludeInternalTopics)
+      } else if (parts.length == 2) {
+        val partitionFilter = createPartitionFilter(parts(1))
+
+        if (parts(0).trim().isEmpty) {
+          tp: PartitionInfo => partitionFilter.apply(tp.partition)
+        } else {
+          val whitelist = IncludeList(parts(0))
+          tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, 
excludeInternalTopics) && partitionFilter.apply(tp.partition)
+        }
+      } else {
+        throw new IllegalArgumentException(s"Invalid topic-partition rule: 
$ruleSpec")
+      }
+    })
+
+    tp => rules.exists(rule => rule.apply(tp))
+  }
+
+  /**
+   * Creates a partition filter based on a single id or a range.
+   * Expected format:
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+   */
+  private def createPartitionFilter(spec: String): Int => Boolean = {
+    if (spec.indexOf('-') != -1) {
+      val rangeParts = spec.split("-", -1)
+      if(rangeParts.length != 2 || rangeParts(0).isEmpty && 
rangeParts(1).isEmpty) {
+        throw new IllegalArgumentException(s"Invalid range specification: 
$spec")
+      }
+
+      if(rangeParts(0).isEmpty) {
+        val max = rangeParts(1).toInt
+        partition: Int => partition < max
+      } else if(rangeParts(1).isEmpty) {
+        val min = rangeParts(0).toInt
+        partition: Int => partition >= min
+      } else {
+        val min = rangeParts(0).toInt
+        val max = rangeParts(1).toInt
+
+        if (min > max) {
+          throw new IllegalArgumentException(s"Range lower bound cannot be 
greater than upper bound: $spec")
+        }
+
+        partition: Int => partition >= min && partition < max
+      }
+    } else {
+      val number = spec.toInt
+      partition: Int => partition == number
+    }
   }
 
+  /**
+   * Creates a topic-partition filter based on a topic pattern and a set of 
partition ids.
+   */
+  private def createTopicPartitionFilterWithTopicAndPartitionPattern(topicOpt: 
Option[String], excludeInternalTopics: Boolean, partitionIds: Set[Int]): 
Option[PartitionInfo => Boolean] = {
+    topicOpt match {
+      case Some(topic) =>
+        val topicsFilter = IncludeList(topic)
+        if(partitionIds.isEmpty)
+          Some(t => topicsFilter.isTopicAllowed(t.topic, 
excludeInternalTopics))
+        else
+          Some(t => topicsFilter.isTopicAllowed(t.topic, 
excludeInternalTopics) && partitionIds.contains(t.partition))
+      case None =>
+        if(excludeInternalTopics) {
+          if(partitionIds.isEmpty)
+            Some(t => !Topic.isInternal(t.topic))
+          else
+            Some(t => !Topic.isInternal(t.topic) && 
partitionIds.contains(t.partition))
+        } else {
+          if(partitionIds.isEmpty)
+            None
+          else
+            Some(t => partitionIds.contains(t.partition))
+        }
+    }

Review comment:
       I was mostly focusing on avoiding unnecessary filtering, but you are 
right: it shouldn't be an issue, and your code is much cleaner.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to