This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c8a228d122b MINOR: Improve Scala collection usage (#12900)
c8a228d122b is described below

commit c8a228d122b32d9df9f56b43fe55a27a07ee6a12
Author: Divij Vaidya <[email protected]>
AuthorDate: Fri Nov 25 11:14:21 2022 +0100

    MINOR: Improve Scala collection usage (#12900)
    
    
    Reviewers: Mickael Maison <[email protected]>, Christo Lolov <[email protected]>
---
 core/src/main/scala/kafka/Kafka.scala                               | 2 +-
 core/src/main/scala/kafka/admin/ConfigCommand.scala                 | 2 +-
 core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala          | 6 +++---
 core/src/main/scala/kafka/admin/TopicCommand.scala                  | 4 ++--
 core/src/main/scala/kafka/cluster/Partition.scala                   | 2 +-
 core/src/main/scala/kafka/controller/ControllerChannelManager.scala | 2 +-
 core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala     | 2 +-
 core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala           | 2 +-
 core/src/main/scala/kafka/server/DynamicConfig.scala                | 2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala                  | 6 +++---
 core/src/main/scala/kafka/server/ZkAdminManager.scala               | 6 +++---
 core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala  | 4 ++--
 core/src/main/scala/kafka/tools/GetOffsetShell.scala                | 2 +-
 core/src/main/scala/kafka/tools/JmxTool.scala                       | 2 +-
 core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala       | 2 +-
 core/src/main/scala/kafka/tools/StateChangeLogMerger.scala          | 2 +-
 core/src/main/scala/kafka/utils/CommandLineUtils.scala              | 4 ++--
 core/src/main/scala/kafka/zk/KafkaZkClient.scala                    | 4 ++--
 core/src/test/scala/kafka/tools/LogCompactionTester.scala           | 2 +-
 core/src/test/scala/unit/kafka/admin/AclCommandTest.scala           | 2 +-
 .../src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala | 6 +++---
 core/src/test/scala/unit/kafka/cluster/PartitionTest.scala          | 4 ++--
 core/src/test/scala/unit/kafka/log/LogCleanerTest.scala             | 4 ++--
 core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala | 2 +-
 .../src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala | 2 +-
 core/src/test/scala/unit/kafka/utils/TestUtils.scala                | 2 +-
 26 files changed, 40 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/kafka/Kafka.scala 
b/core/src/main/scala/kafka/Kafka.scala
index 5dc829fd3b1..e1bd575f0c2 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -40,7 +40,7 @@ object Kafka extends Logging {
     // This is a bit of an ugly crutch till we get a chance to rework the 
entire command line parsing
     optionParser.accepts("version", "Print version information and exit.")
 
-    if (args.length == 0 || args.contains("--help")) {
+    if (args.isEmpty || args.contains("--help")) {
       CommandLineUtils.printUsageAndDie(optionParser,
         "USAGE: java [options] %s server.properties [--override 
property=value]*".format(this.getClass.getCanonicalName.split('$').head))
     }
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 9a42f9b874d..c5a62e7eb95 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -213,7 +213,7 @@ object ConfigCommand extends Logging {
       throw new IllegalArgumentException("Password encoder secret not 
specified"))
     PasswordEncoder.encrypting(new Password(encoderSecret),
       None,
-      
encoderConfigs.get(KafkaConfig.PasswordEncoderCipherAlgorithmProp).getOrElse(Defaults.PasswordEncoderCipherAlgorithm),
+      encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderCipherAlgorithmProp, 
Defaults.PasswordEncoderCipherAlgorithm),
       
encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderKeyLength),
       
encoderConfigs.get(KafkaConfig.PasswordEncoderIterationsProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderIterations))
   }
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 8cc056f617e..f3b67a030f9 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -298,7 +298,7 @@ object ConsumerGroupCommand extends Logging {
                 maxGroupInstanceIdLen =  Math.max(maxGroupInstanceIdLen, 
memberAssignment.groupInstanceId.length)
                 maxHostLen = Math.max(maxHostLen, memberAssignment.host.length)
                 maxClientIdLen = Math.max(maxClientIdLen, 
memberAssignment.clientId.length)
-                includeGroupInstanceId = includeGroupInstanceId || 
memberAssignment.groupInstanceId.length > 0
+                includeGroupInstanceId = includeGroupInstanceId || 
memberAssignment.groupInstanceId.nonEmpty
               }
           }
 
@@ -563,8 +563,8 @@ object ConsumerGroupCommand extends Logging {
         // The admin client returns `null` as a value to indicate that there 
is not committed offset for a partition.
         def getPartitionOffset(tp: TopicPartition): Option[Long] = 
committedOffsets.get(tp).filter(_ != null).map(_.offset)
         var assignedTopicPartitions = ListBuffer[TopicPartition]()
-        val rowsWithConsumer = 
consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
-          .sortWith(_.assignment.topicPartitions.size > 
_.assignment.topicPartitions.size).flatMap { consumerSummary =>
+        val rowsWithConsumer = 
consumerGroup.members.asScala.filterNot(_.assignment.topicPartitions.isEmpty).toSeq
+          
.sortBy(_.assignment.topicPartitions.size)(Ordering[Int].reverse).flatMap { 
consumerSummary =>
           val topicPartitions = 
consumerSummary.assignment.topicPartitions.asScala
           assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
           collectConsumerAssignment(groupId, 
Option(consumerGroup.coordinator), topicPartitions.toList,
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala 
b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 5e7d98c1d2e..e5d60670892 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -104,7 +104,7 @@ object TopicCommand extends Logging {
                               markedForDeletion: Boolean) {
 
     def printDescription(): Unit = {
-      val configsAsString = config.entries.asScala.filter(!_.isDefault).map { 
ce => s"${ce.name}=${ce.value}" }.mkString(",")
+      val configsAsString = config.entries.asScala.filterNot(_.isDefault).map 
{ ce => s"${ce.name}=${ce.value}" }.mkString(",")
       print(s"Topic: $topic")
       if(topicId != Uuid.ZERO_UUID) print(s"\tTopicId: $topicId")
       print(s"\tPartitionCount: $numPartitions")
@@ -604,7 +604,7 @@ object TopicCommand extends Logging {
     def configsToDelete: Option[util.List[String]] = 
valuesAsOption(deleteConfigOpt)
 
     def checkArgs(): Unit = {
-      if (args.length == 0)
+      if (args.isEmpty)
         CommandLineUtils.printUsageAndDie(parser, "Create, delete, describe, 
or change a topic.")
 
       CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to 
create, delete, describe, or change a topic.")
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index e1d2d855783..63f8f908859 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -808,7 +808,7 @@ class Partition(val topicPartition: TopicPartition,
   ): Unit = {
     if (isLeader) {
       val followers = replicas.filter(_ != localBrokerId)
-      val removedReplicas = 
remoteReplicasMap.keys.filter(!followers.contains(_))
+      val removedReplicas = 
remoteReplicasMap.keys.filterNot(followers.contains(_))
 
       // Due to code paths accessing remoteReplicasMap without a lock,
       // first add the new replicas and then remove the old ones
diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index d900a7ccea9..f6b9cbd07dd 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -432,7 +432,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: 
KafkaConfig,
       controllerContext.partitionLeadershipInfo(partition) match {
         case Some(LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)) 
=>
           val replicas = 
controllerContext.partitionReplicaAssignment(partition)
-          val offlineReplicas = 
replicas.filter(!controllerContext.isReplicaOnline(_, partition))
+          val offlineReplicas = 
replicas.filterNot(controllerContext.isReplicaOnline(_, partition))
           val updatedLeaderAndIsr =
             if (beingDeleted) LeaderAndIsr.duringDelete(leaderAndIsr.isr)
             else leaderAndIsr
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index c2e2176856c..3c146dd9c13 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -384,7 +384,7 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
 
   def currentState: GroupState = state
 
-  def notYetRejoinedMembers: Map[String, MemberMetadata] = 
members.filter(!_._2.isAwaitingJoin).toMap
+  def notYetRejoinedMembers: Map[String, MemberMetadata] = 
members.filterNot(_._2.isAwaitingJoin).toMap
 
   def hasAllMembersJoined: Boolean = members.size == numMembersAwaitingJoin && 
pendingMembers.isEmpty
 
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala 
b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 161d1f2f364..8a9ed89f3fc 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -97,7 +97,7 @@ trait KafkaMetricsGroup extends Logging {
     if (filteredTags.nonEmpty) {
       // convert dot to _ since reporters like Graphite typically use dot to 
represent hierarchy
       val tagsString = filteredTags
-        .toList.sortWith((t1, t2) => t1._1 < t2._1)
+        .toList.sortBy(_._1)
         .map { case (key, value) => "%s.%s".format(key, 
value.replaceAll("\\.", "_"))}
         .mkString(".")
 
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala 
b/core/src/main/scala/kafka/server/DynamicConfig.scala
index ddcda030ad6..e3479e3641a 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -117,7 +117,7 @@ object DynamicConfig {
     val names = configDef.names()
     val propKeys = props.keySet.asScala.map(_.asInstanceOf[String])
     if (!customPropsAllowed) {
-      val unknownKeys = propKeys.filter(!names.contains(_))
+      val unknownKeys = propKeys.filterNot(names.contains(_))
       require(unknownKeys.isEmpty, s"Unknown Dynamic Configuration: 
$unknownKeys.")
     }
     val propResolved = DynamicBrokerConfig.resolveVariableConfigs(props)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2f9e9e6dba0..98953e338de 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -667,7 +667,7 @@ object KafkaConfig {
   val ZkSslClientEnableDoc = "Set client to use TLS when connecting to 
ZooKeeper." +
     " An explicit value overrides any value set via the 
<code>zookeeper.client.secure</code> system property (note the different 
name)." +
     s" Defaults to false if neither is set; when true, 
<code>$ZkClientCnxnSocketProp</code> must be set (typically to 
<code>org.apache.zookeeper.ClientCnxnSocketNetty</code>); other values to set 
may include " +
-    ZkSslConfigToSystemPropertyMap.keys.toList.sorted.filter(x => x != 
ZkSslClientEnableProp && x != ZkClientCnxnSocketProp).mkString("<code>", 
"</code>, <code>", "</code>")
+    ZkSslConfigToSystemPropertyMap.keys.toList.filter(x => x != 
ZkSslClientEnableProp && x != ZkClientCnxnSocketProp).sorted.mkString("<code>", 
"</code>, <code>", "</code>")
   val ZkClientCnxnSocketDoc = "Typically set to 
<code>org.apache.zookeeper.ClientCnxnSocketNetty</code> when using TLS 
connectivity to ZooKeeper." +
     s" Overrides any explicit value set via the same-named 
<code>${ZkSslConfigToSystemPropertyMap(ZkClientCnxnSocketProp)}</code> system 
property."
   val ZkSslKeyStoreLocationDoc = "Keystore location when using a client-side 
certificate with TLS connectivity to ZooKeeper." +
@@ -1732,7 +1732,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
     Option(getString(KafkaConfig.EarlyStartListenersProp)) match {
       case None => controllerListenersSet
       case Some(str) =>
-        str.split(",").map(_.trim()).filter(!_.isEmpty).map { str =>
+        str.split(",").map(_.trim()).filterNot(_.isEmpty).map { str =>
           val listenerName = new ListenerName(str)
           if (!listenersSet.contains(listenerName) && 
!controllerListenersSet.contains(listenerName))
             throw new ConfigException(s"${KafkaConfig.EarlyStartListenersProp} 
contains " +
@@ -2086,7 +2086,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
         mapValue // don't add default mappings since we found something that 
is SSL or SASL_*
       } else {
         // add the PLAINTEXT mappings for all controller listener names that 
are not explicitly PLAINTEXT
-        mapValue ++ 
controllerListenerNames.filter(!SecurityProtocol.PLAINTEXT.name.equals(_)).map(
+        mapValue ++ 
controllerListenerNames.filterNot(SecurityProtocol.PLAINTEXT.name.equals(_)).map(
           new ListenerName(_) -> SecurityProtocol.PLAINTEXT)
       }
     } else {
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala 
b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index f65367606da..634ce6b097c 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -493,7 +493,7 @@ class ZkAdminManager(val config: KafkaConfig,
 
         resource.`type` match {
           case ConfigResource.Type.TOPIC =>
-            if (resource.name.isEmpty()) {
+            if (resource.name.isEmpty) {
               throw new InvalidRequestException("Default topic resources are 
not allowed.")
             }
             val configProps = 
adminZkClient.fetchEntityConfig(ConfigType.Topic, resource.name)
@@ -696,11 +696,11 @@ class ZkAdminManager(val config: KafkaConfig,
     def matches(nameComponent: Option[ClientQuotaFilterComponent], name: 
Option[String]): Boolean = nameComponent match {
       case Some(component) =>
         toOption(component.`match`) match {
-          case Some(n) => name.exists(_ == n)
+          case Some(n) => name.contains(n)
           case None => name.isDefined
         }
       case None =>
-        !name.isDefined || !strict
+        name.isEmpty || !strict
     }
 
     (userEntries ++ clientIdEntries ++ bothEntries).flatMap { case ((u, c), p) 
=>
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala 
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 52577211503..7bd1c6343d9 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -213,7 +213,7 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
   override def getAliveBrokers(): Iterable[BrokerMetadata] = 
getAliveBrokers(_currentImage)
 
   private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] 
= {
-    image.cluster().brokers().values().asScala.filter(!_.fenced()).
+    image.cluster().brokers().values().asScala.filterNot(_.fenced()).
       map(b => BrokerMetadata(b.id, b.rack.asScala))
   }
 
@@ -223,7 +223,7 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
   }
 
   override def getAliveBrokerNodes(listenerName: ListenerName): Seq[Node] = {
-    _currentImage.cluster().brokers().values().asScala.filter(!_.fenced()).
+    _currentImage.cluster().brokers().values().asScala.filterNot(_.fenced()).
       flatMap(_.node(listenerName.value()).asScala).toSeq
   }
 
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala 
b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 03f9c819260..a8fd87cbe87 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -81,7 +81,7 @@ object GetOffsetShell {
                            .ofType(classOf[String])
     val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", 
s"By default, internal topics are included. If specified, internal topics are 
excluded.")
 
-    if (args.length == 0)
+    if (args.isEmpty)
       CommandLineUtils.printUsageAndDie(parser, "An interactive shell for 
getting topic-partition offsets.")
 
     val options = parser.parse(args : _*)
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala 
b/core/src/main/scala/kafka/tools/JmxTool.scala
index f7ace833728..c0f6d4a5ead 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -98,7 +98,7 @@ object JmxTool extends Logging {
     val helpOpt = parser.accepts("help", "Print usage information.")
 
 
-    if(args.length == 0)
+    if(args.isEmpty)
       CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard 
output.")
 
     val options = parser.parse(args : _*)
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index a9bd2c936e1..fb687e4f8e8 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -114,7 +114,7 @@ object ReplicaVerificationTool extends Logging {
 
     val options = parser.parse(args: _*)
 
-    if (args.length == 0 || options.has(helpOpt)) {
+    if (args.isEmpty || options.has(helpOpt)) {
       CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas 
for a set of topics have the same data.")
     }
 
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala 
b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index 1a7f679fe36..32e43a6606d 100755
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -88,7 +88,7 @@ object StateChangeLogMerger extends Logging {
                               .ofType(classOf[String])
                               .defaultsTo("9999-12-31 23:59:59,999")
                               
-    if(args.length == 0)
+    if(args.isEmpty)
       CommandLineUtils.printUsageAndDie(parser, "A tool for merging the log 
files from several brokers to reconnstruct a unified history of what happened.")
 
 
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala 
b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index 1eaee484168..0aa233babcd 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -33,7 +33,7 @@ object CommandLineUtils extends Logging {
     * @return true on matching the help check condition
     */
   def isPrintHelpNeeded(commandOpts: CommandDefaultOptions): Boolean = {
-    commandOpts.args.length == 0 || 
commandOpts.options.has(commandOpts.helpOpt)
+    commandOpts.args.isEmpty || commandOpts.options.has(commandOpts.helpOpt)
   }
 
   def isPrintVersionNeeded(commandOpts: CommandDefaultOptions): Boolean = {
@@ -113,7 +113,7 @@ object CommandLineUtils extends Logging {
    * value may contain equals sign
    */
   def parseKeyValueArgs(args: Iterable[String], acceptMissingValue: Boolean = 
true): Properties = {
-    val splits = args.map(_.split("=", 2)).filterNot(_.length == 0)
+    val splits = args.map(_.split("=", 2)).filterNot(_.isEmpty)
 
     val props = new Properties
     for (a <- splits) {
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala 
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index fa7ce00882a..747673d37db 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -211,7 +211,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
    * @return sequence of CreateResponse whose contexts are the partitions they 
are associated with.
    */
   def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: 
Map[TopicPartition, LeaderIsrAndControllerEpoch], 
expectedControllerEpochZkVersion: Int): Seq[CreateResponse] = {
-    
createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq,
 expectedControllerEpochZkVersion)
+    
createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSeq.distinct,
 expectedControllerEpochZkVersion)
     createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq, 
expectedControllerEpochZkVersion)
     val createRequests = leaderIsrAndControllerEpochs.map { case (partition, 
leaderIsrAndControllerEpoch) =>
       val path = TopicPartitionStateZNode.path(partition)
@@ -701,7 +701,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
     getPartitionAssignmentForTopics(topics).map { topicAndPartitionMap =>
       val topic = topicAndPartitionMap._1
       val partitionMap = topicAndPartitionMap._2
-      topic -> partitionMap.keys.toSeq.sortWith((s, t) => s < t)
+      topic -> partitionMap.keys.toSeq.sorted
     }
   }
 
diff --git a/core/src/test/scala/kafka/tools/LogCompactionTester.scala 
b/core/src/test/scala/kafka/tools/LogCompactionTester.scala
index da8a3c03a17..1b3b9f5b18f 100755
--- a/core/src/test/scala/kafka/tools/LogCompactionTester.scala
+++ b/core/src/test/scala/kafka/tools/LogCompactionTester.scala
@@ -97,7 +97,7 @@ object LogCompactionTester {
 
     val options = parser.parse(args: _*)
 
-    if (args.length == 0)
+    if (args.isEmpty)
       CommandLineUtils.printUsageAndDie(parser, "A tool to test log 
compaction. Valid options are: ")
 
     CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, 
numMessagesOpt)
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index b5d9692040e..dd81ede4604 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -175,7 +175,7 @@ class AclCommandTest extends QuorumTestHarness with Logging 
{
   private def assertOutputContains(prefix: String, resources: 
Set[ResourcePattern], resourceCmd: Array[String], output: String): Unit = {
     resources.foreach { resource =>
       val resourceType = resource.resourceType.toString
-      (if (resource == ClusterResource) Array("kafka-cluster") else 
resourceCmd.filter(!_.startsWith("--"))).foreach { name =>
+      (if (resource == ClusterResource) Array("kafka-cluster") else 
resourceCmd.filterNot(_.startsWith("--"))).foreach { name =>
         val expected = s"$prefix for resource 
`ResourcePattern(resourceType=$resourceType, name=$name, patternType=LITERAL)`:"
         assertTrue(output.contains(expected), s"Substring $expected not in 
output:\n$output")
       }
diff --git 
a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala 
b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 3815705971c..d8127746163 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -184,7 +184,7 @@ class DescribeConsumerGroupTest extends 
ConsumerGroupCommandTest {
 
       TestUtils.waitUntilTrue(() => {
         val (output, error) = 
TestUtils.grabConsoleOutputAndError(service.describeGroups())
-        val numLines = output.trim.split("\n").filterNot(line => 
line.isEmpty).length
+        val numLines = output.trim.split("\n").count(line => line.nonEmpty)
         (numLines == expectedNumLines) && error.isEmpty
       }, s"Expected a data row and no error in describe results with describe 
type ${describeType.mkString(" ")}.")
     }
@@ -208,7 +208,7 @@ class DescribeConsumerGroupTest extends 
ConsumerGroupCommandTest {
 
       TestUtils.waitUntilTrue(() => {
         val (output, error) = 
TestUtils.grabConsoleOutputAndError(service.describeGroups())
-        val numLines = output.trim.split("\n").filterNot(line => 
line.isEmpty).length
+        val numLines = output.trim.split("\n").count(line => line.nonEmpty)
         (numLines == expectedNumLines) && error.isEmpty
       }, s"Expected a data row and no error in describe results with describe 
type ${describeType.mkString(" ")}.")
     }
@@ -468,7 +468,7 @@ class DescribeConsumerGroupTest extends 
ConsumerGroupCommandTest {
         assignments.get.count(_.group == group) == 2 &&
         assignments.get.count { x => x.group == group && x.numPartitions == 1 
} == 1 &&
         assignments.get.count { x => x.group == group && x.numPartitions == 0 
} == 1 &&
-        assignments.get.count(_.assignment.nonEmpty) == 0
+        !assignments.get.exists(_.assignment.nonEmpty)
     }, "Expected rows for consumers with no assigned partitions in describe 
group results")
 
     val (state, assignments) = service.collectGroupMembers(group, true)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 7abff0dc3ff..0c5453b54d3 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -2334,9 +2334,9 @@ class PartitionTest extends AbstractPartitionTest {
       "AtMinIsr")
 
     def getMetric(metric: String): Option[Metric] = {
-      KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.filter { case 
(metricName, _) =>
+      KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.find { case 
(metricName, _) =>
         metricName.getName == metric && metricName.getType == "Partition"
-      }.headOption.map(_._2)
+      }.map(_._2)
     }
 
     assertTrue(metricsToCheck.forall(getMetric(_).isDefined))
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 34963a41e06..819278bd280 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -86,7 +86,7 @@ class LogCleanerTest {
     val segments = log.logSegments.take(3).toSeq
     val stats = new CleanerStats()
     val expectedBytesRead = segments.map(_.size).sum
-    val shouldRemain = LogTestUtils.keysInLog(log).filter(!keys.contains(_))
+    val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
     cleaner.cleanSegments(log, segments, map, 0L, stats, new 
CleanedTransactionMetadata, -1)
     assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
     assertEquals(expectedBytesRead, stats.bytesRead)
@@ -844,7 +844,7 @@ class LogCleanerTest {
     // clean the log
     val stats = new CleanerStats()
     cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new 
CleanedTransactionMetadata, -1)
-    val shouldRemain = LogTestUtils.keysInLog(log).filter(!keys.contains(_))
+    val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
     assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index 904fbbc2165..4ff43cfc6af 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -588,7 +588,7 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
   private def alterClientQuotas(request: Map[ClientQuotaEntity, Map[String, 
Option[Double]]], validateOnly: Boolean) = {
     val entries = request.map { case (entity, alter) =>
       val ops = alter.map { case (key, value) =>
-        new ClientQuotaAlteration.Op(key, 
value.map(Double.box).getOrElse(null))
+        new ClientQuotaAlteration.Op(key, value.map(Double.box).orNull)
       }.asJavaCollection
       new ClientQuotaAlteration(entity, ops)
     }
diff --git 
a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index 40b839fdd38..db13f225359 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -48,7 +48,7 @@ object KafkaMetricsReporterTest {
     }
 
     private def contextLabelOrNull(name: String, metricsContext: 
MetricsContext): String = {
-      Option(metricsContext.contextLabels().get(name)).flatMap(v => 
Option(v.toString())).getOrElse(null)
+      Option(metricsContext.contextLabels().get(name)).flatMap(v => 
Option(v.toString())).orNull
     }
 
     override def configure(configs: util.Map[String, _]): Unit = {}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 882f64b569c..0162ebdd5e3 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1895,7 +1895,7 @@ object TestUtils extends Logging {
   def alterClientQuotas(adminClient: Admin, request: Map[ClientQuotaEntity, 
Map[String, Option[Double]]]): AlterClientQuotasResult = {
     val entries = request.map { case (entity, alter) =>
       val ops = alter.map { case (key, value) =>
-        new ClientQuotaAlteration.Op(key, 
value.map(Double.box).getOrElse(null))
+        new ClientQuotaAlteration.Op(key, value.map(Double.box).orNull)
       }.asJavaCollection
       new ClientQuotaAlteration(entity, ops)
     }.asJavaCollection

Reply via email to