This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c8a228d122b MINOR: Improve Scala collection usage (#12900)
c8a228d122b is described below
commit c8a228d122b32d9df9f56b43fe55a27a07ee6a12
Author: Divij Vaidya <[email protected]>
AuthorDate: Fri Nov 25 11:14:21 2022 +0100
MINOR: Improve Scala collection usage (#12900)
Reviewers: Mickael Maison <[email protected]>, Christo Lolov <[email protected]>
---
core/src/main/scala/kafka/Kafka.scala | 2 +-
core/src/main/scala/kafka/admin/ConfigCommand.scala | 2 +-
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala | 6 +++---
core/src/main/scala/kafka/admin/TopicCommand.scala | 4 ++--
core/src/main/scala/kafka/cluster/Partition.scala | 2 +-
core/src/main/scala/kafka/controller/ControllerChannelManager.scala | 2 +-
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala | 2 +-
core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala | 2 +-
core/src/main/scala/kafka/server/DynamicConfig.scala | 2 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 6 +++---
core/src/main/scala/kafka/server/ZkAdminManager.scala | 6 +++---
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala | 4 ++--
core/src/main/scala/kafka/tools/GetOffsetShell.scala | 2 +-
core/src/main/scala/kafka/tools/JmxTool.scala | 2 +-
core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala | 2 +-
core/src/main/scala/kafka/tools/StateChangeLogMerger.scala | 2 +-
core/src/main/scala/kafka/utils/CommandLineUtils.scala | 4 ++--
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 4 ++--
core/src/test/scala/kafka/tools/LogCompactionTester.scala | 2 +-
core/src/test/scala/unit/kafka/admin/AclCommandTest.scala | 2 +-
.../src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala | 6 +++---
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala | 4 ++--
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala | 4 ++--
core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala | 2 +-
.../src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala | 2 +-
core/src/test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
26 files changed, 40 insertions(+), 40 deletions(-)
diff --git a/core/src/main/scala/kafka/Kafka.scala
b/core/src/main/scala/kafka/Kafka.scala
index 5dc829fd3b1..e1bd575f0c2 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -40,7 +40,7 @@ object Kafka extends Logging {
// This is a bit of an ugly crutch till we get a chance to rework the
entire command line parsing
optionParser.accepts("version", "Print version information and exit.")
- if (args.length == 0 || args.contains("--help")) {
+ if (args.isEmpty || args.contains("--help")) {
CommandLineUtils.printUsageAndDie(optionParser,
"USAGE: java [options] %s server.properties [--override
property=value]*".format(this.getClass.getCanonicalName.split('$').head))
}
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 9a42f9b874d..c5a62e7eb95 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -213,7 +213,7 @@ object ConfigCommand extends Logging {
throw new IllegalArgumentException("Password encoder secret not
specified"))
PasswordEncoder.encrypting(new Password(encoderSecret),
None,
-
encoderConfigs.get(KafkaConfig.PasswordEncoderCipherAlgorithmProp).getOrElse(Defaults.PasswordEncoderCipherAlgorithm),
+ encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderCipherAlgorithmProp,
Defaults.PasswordEncoderCipherAlgorithm),
encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderKeyLength),
encoderConfigs.get(KafkaConfig.PasswordEncoderIterationsProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderIterations))
}
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 8cc056f617e..f3b67a030f9 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -298,7 +298,7 @@ object ConsumerGroupCommand extends Logging {
maxGroupInstanceIdLen = Math.max(maxGroupInstanceIdLen,
memberAssignment.groupInstanceId.length)
maxHostLen = Math.max(maxHostLen, memberAssignment.host.length)
maxClientIdLen = Math.max(maxClientIdLen,
memberAssignment.clientId.length)
- includeGroupInstanceId = includeGroupInstanceId ||
memberAssignment.groupInstanceId.length > 0
+ includeGroupInstanceId = includeGroupInstanceId ||
memberAssignment.groupInstanceId.nonEmpty
}
}
@@ -563,8 +563,8 @@ object ConsumerGroupCommand extends Logging {
// The admin client returns `null` as a value to indicate that there
is not committed offset for a partition.
def getPartitionOffset(tp: TopicPartition): Option[Long] =
committedOffsets.get(tp).filter(_ != null).map(_.offset)
var assignedTopicPartitions = ListBuffer[TopicPartition]()
- val rowsWithConsumer =
consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
- .sortWith(_.assignment.topicPartitions.size >
_.assignment.topicPartitions.size).flatMap { consumerSummary =>
+ val rowsWithConsumer =
consumerGroup.members.asScala.filterNot(_.assignment.topicPartitions.isEmpty).toSeq
+
.sortBy(_.assignment.topicPartitions.size)(Ordering[Int].reverse).flatMap {
consumerSummary =>
val topicPartitions =
consumerSummary.assignment.topicPartitions.asScala
assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
collectConsumerAssignment(groupId,
Option(consumerGroup.coordinator), topicPartitions.toList,
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala
b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 5e7d98c1d2e..e5d60670892 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -104,7 +104,7 @@ object TopicCommand extends Logging {
markedForDeletion: Boolean) {
def printDescription(): Unit = {
- val configsAsString = config.entries.asScala.filter(!_.isDefault).map {
ce => s"${ce.name}=${ce.value}" }.mkString(",")
+ val configsAsString = config.entries.asScala.filterNot(_.isDefault).map
{ ce => s"${ce.name}=${ce.value}" }.mkString(",")
print(s"Topic: $topic")
if(topicId != Uuid.ZERO_UUID) print(s"\tTopicId: $topicId")
print(s"\tPartitionCount: $numPartitions")
@@ -604,7 +604,7 @@ object TopicCommand extends Logging {
def configsToDelete: Option[util.List[String]] =
valuesAsOption(deleteConfigOpt)
def checkArgs(): Unit = {
- if (args.length == 0)
+ if (args.isEmpty)
CommandLineUtils.printUsageAndDie(parser, "Create, delete, describe,
or change a topic.")
CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to
create, delete, describe, or change a topic.")
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index e1d2d855783..63f8f908859 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -808,7 +808,7 @@ class Partition(val topicPartition: TopicPartition,
): Unit = {
if (isLeader) {
val followers = replicas.filter(_ != localBrokerId)
- val removedReplicas =
remoteReplicasMap.keys.filter(!followers.contains(_))
+ val removedReplicas =
remoteReplicasMap.keys.filterNot(followers.contains(_))
// Due to code paths accessing remoteReplicasMap without a lock,
// first add the new replicas and then remove the old ones
diff --git
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index d900a7ccea9..f6b9cbd07dd 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -432,7 +432,7 @@ abstract class AbstractControllerBrokerRequestBatch(config:
KafkaConfig,
controllerContext.partitionLeadershipInfo(partition) match {
case Some(LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch))
=>
val replicas =
controllerContext.partitionReplicaAssignment(partition)
- val offlineReplicas =
replicas.filter(!controllerContext.isReplicaOnline(_, partition))
+ val offlineReplicas =
replicas.filterNot(controllerContext.isReplicaOnline(_, partition))
val updatedLeaderAndIsr =
if (beingDeleted) LeaderAndIsr.duringDelete(leaderAndIsr.isr)
else leaderAndIsr
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index c2e2176856c..3c146dd9c13 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -384,7 +384,7 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
def currentState: GroupState = state
- def notYetRejoinedMembers: Map[String, MemberMetadata] =
members.filter(!_._2.isAwaitingJoin).toMap
+ def notYetRejoinedMembers: Map[String, MemberMetadata] =
members.filterNot(_._2.isAwaitingJoin).toMap
def hasAllMembersJoined: Boolean = members.size == numMembersAwaitingJoin &&
pendingMembers.isEmpty
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 161d1f2f364..8a9ed89f3fc 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -97,7 +97,7 @@ trait KafkaMetricsGroup extends Logging {
if (filteredTags.nonEmpty) {
// convert dot to _ since reporters like Graphite typically use dot to
represent hierarchy
val tagsString = filteredTags
- .toList.sortWith((t1, t2) => t1._1 < t2._1)
+ .toList.sortBy(_._1)
.map { case (key, value) => "%s.%s".format(key,
value.replaceAll("\\.", "_"))}
.mkString(".")
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala
b/core/src/main/scala/kafka/server/DynamicConfig.scala
index ddcda030ad6..e3479e3641a 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -117,7 +117,7 @@ object DynamicConfig {
val names = configDef.names()
val propKeys = props.keySet.asScala.map(_.asInstanceOf[String])
if (!customPropsAllowed) {
- val unknownKeys = propKeys.filter(!names.contains(_))
+ val unknownKeys = propKeys.filterNot(names.contains(_))
require(unknownKeys.isEmpty, s"Unknown Dynamic Configuration:
$unknownKeys.")
}
val propResolved = DynamicBrokerConfig.resolveVariableConfigs(props)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2f9e9e6dba0..98953e338de 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -667,7 +667,7 @@ object KafkaConfig {
val ZkSslClientEnableDoc = "Set client to use TLS when connecting to
ZooKeeper." +
" An explicit value overrides any value set via the
<code>zookeeper.client.secure</code> system property (note the different
name)." +
s" Defaults to false if neither is set; when true,
<code>$ZkClientCnxnSocketProp</code> must be set (typically to
<code>org.apache.zookeeper.ClientCnxnSocketNetty</code>); other values to set
may include " +
- ZkSslConfigToSystemPropertyMap.keys.toList.sorted.filter(x => x !=
ZkSslClientEnableProp && x != ZkClientCnxnSocketProp).mkString("<code>",
"</code>, <code>", "</code>")
+ ZkSslConfigToSystemPropertyMap.keys.toList.filter(x => x !=
ZkSslClientEnableProp && x != ZkClientCnxnSocketProp).sorted.mkString("<code>",
"</code>, <code>", "</code>")
val ZkClientCnxnSocketDoc = "Typically set to
<code>org.apache.zookeeper.ClientCnxnSocketNetty</code> when using TLS
connectivity to ZooKeeper." +
s" Overrides any explicit value set via the same-named
<code>${ZkSslConfigToSystemPropertyMap(ZkClientCnxnSocketProp)}</code> system
property."
val ZkSslKeyStoreLocationDoc = "Keystore location when using a client-side
certificate with TLS connectivity to ZooKeeper." +
@@ -1732,7 +1732,7 @@ class KafkaConfig private(doLog: Boolean, val props:
java.util.Map[_, _], dynami
Option(getString(KafkaConfig.EarlyStartListenersProp)) match {
case None => controllerListenersSet
case Some(str) =>
- str.split(",").map(_.trim()).filter(!_.isEmpty).map { str =>
+ str.split(",").map(_.trim()).filterNot(_.isEmpty).map { str =>
val listenerName = new ListenerName(str)
if (!listenersSet.contains(listenerName) &&
!controllerListenersSet.contains(listenerName))
throw new ConfigException(s"${KafkaConfig.EarlyStartListenersProp}
contains " +
@@ -2086,7 +2086,7 @@ class KafkaConfig private(doLog: Boolean, val props:
java.util.Map[_, _], dynami
mapValue // don't add default mappings since we found something that
is SSL or SASL_*
} else {
// add the PLAINTEXT mappings for all controller listener names that
are not explicitly PLAINTEXT
- mapValue ++
controllerListenerNames.filter(!SecurityProtocol.PLAINTEXT.name.equals(_)).map(
+ mapValue ++
controllerListenerNames.filterNot(SecurityProtocol.PLAINTEXT.name.equals(_)).map(
new ListenerName(_) -> SecurityProtocol.PLAINTEXT)
}
} else {
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala
b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index f65367606da..634ce6b097c 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -493,7 +493,7 @@ class ZkAdminManager(val config: KafkaConfig,
resource.`type` match {
case ConfigResource.Type.TOPIC =>
- if (resource.name.isEmpty()) {
+ if (resource.name.isEmpty) {
throw new InvalidRequestException("Default topic resources are
not allowed.")
}
val configProps =
adminZkClient.fetchEntityConfig(ConfigType.Topic, resource.name)
@@ -696,11 +696,11 @@ class ZkAdminManager(val config: KafkaConfig,
def matches(nameComponent: Option[ClientQuotaFilterComponent], name:
Option[String]): Boolean = nameComponent match {
case Some(component) =>
toOption(component.`match`) match {
- case Some(n) => name.exists(_ == n)
+ case Some(n) => name.contains(n)
case None => name.isDefined
}
case None =>
- !name.isDefined || !strict
+ name.isEmpty || !strict
}
(userEntries ++ clientIdEntries ++ bothEntries).flatMap { case ((u, c), p)
=>
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 52577211503..7bd1c6343d9 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -213,7 +213,7 @@ class KRaftMetadataCache(val brokerId: Int) extends
MetadataCache with Logging w
override def getAliveBrokers(): Iterable[BrokerMetadata] =
getAliveBrokers(_currentImage)
private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata]
= {
- image.cluster().brokers().values().asScala.filter(!_.fenced()).
+ image.cluster().brokers().values().asScala.filterNot(_.fenced()).
map(b => BrokerMetadata(b.id, b.rack.asScala))
}
@@ -223,7 +223,7 @@ class KRaftMetadataCache(val brokerId: Int) extends
MetadataCache with Logging w
}
override def getAliveBrokerNodes(listenerName: ListenerName): Seq[Node] = {
- _currentImage.cluster().brokers().values().asScala.filter(!_.fenced()).
+ _currentImage.cluster().brokers().values().asScala.filterNot(_.fenced()).
flatMap(_.node(listenerName.value()).asScala).toSeq
}
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 03f9c819260..a8fd87cbe87 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -81,7 +81,7 @@ object GetOffsetShell {
.ofType(classOf[String])
val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics",
s"By default, internal topics are included. If specified, internal topics are
excluded.")
- if (args.length == 0)
+ if (args.isEmpty)
CommandLineUtils.printUsageAndDie(parser, "An interactive shell for
getting topic-partition offsets.")
val options = parser.parse(args : _*)
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala
b/core/src/main/scala/kafka/tools/JmxTool.scala
index f7ace833728..c0f6d4a5ead 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -98,7 +98,7 @@ object JmxTool extends Logging {
val helpOpt = parser.accepts("help", "Print usage information.")
- if(args.length == 0)
+ if(args.isEmpty)
CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard
output.")
val options = parser.parse(args : _*)
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index a9bd2c936e1..fb687e4f8e8 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -114,7 +114,7 @@ object ReplicaVerificationTool extends Logging {
val options = parser.parse(args: _*)
- if (args.length == 0 || options.has(helpOpt)) {
+ if (args.isEmpty || options.has(helpOpt)) {
CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas
for a set of topics have the same data.")
}
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index 1a7f679fe36..32e43a6606d 100755
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -88,7 +88,7 @@ object StateChangeLogMerger extends Logging {
.ofType(classOf[String])
.defaultsTo("9999-12-31 23:59:59,999")
- if(args.length == 0)
+ if(args.isEmpty)
CommandLineUtils.printUsageAndDie(parser, "A tool for merging the log
files from several brokers to reconnstruct a unified history of what happened.")
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index 1eaee484168..0aa233babcd 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -33,7 +33,7 @@ object CommandLineUtils extends Logging {
* @return true on matching the help check condition
*/
def isPrintHelpNeeded(commandOpts: CommandDefaultOptions): Boolean = {
- commandOpts.args.length == 0 ||
commandOpts.options.has(commandOpts.helpOpt)
+ commandOpts.args.isEmpty || commandOpts.options.has(commandOpts.helpOpt)
}
def isPrintVersionNeeded(commandOpts: CommandDefaultOptions): Boolean = {
@@ -113,7 +113,7 @@ object CommandLineUtils extends Logging {
* value may contain equals sign
*/
def parseKeyValueArgs(args: Iterable[String], acceptMissingValue: Boolean =
true): Properties = {
- val splits = args.map(_.split("=", 2)).filterNot(_.length == 0)
+ val splits = args.map(_.split("=", 2)).filterNot(_.isEmpty)
val props = new Properties
for (a <- splits) {
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index fa7ce00882a..747673d37db 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -211,7 +211,7 @@ class KafkaZkClient private[zk] (zooKeeperClient:
ZooKeeperClient, isSecure: Boo
* @return sequence of CreateResponse whose contexts are the partitions they
are associated with.
*/
def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs:
Map[TopicPartition, LeaderIsrAndControllerEpoch],
expectedControllerEpochZkVersion: Int): Seq[CreateResponse] = {
-
createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq,
expectedControllerEpochZkVersion)
+
createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSeq.distinct,
expectedControllerEpochZkVersion)
createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq,
expectedControllerEpochZkVersion)
val createRequests = leaderIsrAndControllerEpochs.map { case (partition,
leaderIsrAndControllerEpoch) =>
val path = TopicPartitionStateZNode.path(partition)
@@ -701,7 +701,7 @@ class KafkaZkClient private[zk] (zooKeeperClient:
ZooKeeperClient, isSecure: Boo
getPartitionAssignmentForTopics(topics).map { topicAndPartitionMap =>
val topic = topicAndPartitionMap._1
val partitionMap = topicAndPartitionMap._2
- topic -> partitionMap.keys.toSeq.sortWith((s, t) => s < t)
+ topic -> partitionMap.keys.toSeq.sorted
}
}
diff --git a/core/src/test/scala/kafka/tools/LogCompactionTester.scala
b/core/src/test/scala/kafka/tools/LogCompactionTester.scala
index da8a3c03a17..1b3b9f5b18f 100755
--- a/core/src/test/scala/kafka/tools/LogCompactionTester.scala
+++ b/core/src/test/scala/kafka/tools/LogCompactionTester.scala
@@ -97,7 +97,7 @@ object LogCompactionTester {
val options = parser.parse(args: _*)
- if (args.length == 0)
+ if (args.isEmpty)
CommandLineUtils.printUsageAndDie(parser, "A tool to test log
compaction. Valid options are: ")
CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt,
numMessagesOpt)
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index b5d9692040e..dd81ede4604 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -175,7 +175,7 @@ class AclCommandTest extends QuorumTestHarness with Logging
{
private def assertOutputContains(prefix: String, resources:
Set[ResourcePattern], resourceCmd: Array[String], output: String): Unit = {
resources.foreach { resource =>
val resourceType = resource.resourceType.toString
- (if (resource == ClusterResource) Array("kafka-cluster") else
resourceCmd.filter(!_.startsWith("--"))).foreach { name =>
+ (if (resource == ClusterResource) Array("kafka-cluster") else
resourceCmd.filterNot(_.startsWith("--"))).foreach { name =>
val expected = s"$prefix for resource
`ResourcePattern(resourceType=$resourceType, name=$name, patternType=LITERAL)`:"
assertTrue(output.contains(expected), s"Substring $expected not in
output:\n$output")
}
diff --git
a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 3815705971c..d8127746163 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -184,7 +184,7 @@ class DescribeConsumerGroupTest extends
ConsumerGroupCommandTest {
TestUtils.waitUntilTrue(() => {
val (output, error) =
TestUtils.grabConsoleOutputAndError(service.describeGroups())
- val numLines = output.trim.split("\n").filterNot(line =>
line.isEmpty).length
+ val numLines = output.trim.split("\n").count(line => line.nonEmpty)
(numLines == expectedNumLines) && error.isEmpty
}, s"Expected a data row and no error in describe results with describe
type ${describeType.mkString(" ")}.")
}
@@ -208,7 +208,7 @@ class DescribeConsumerGroupTest extends
ConsumerGroupCommandTest {
TestUtils.waitUntilTrue(() => {
val (output, error) =
TestUtils.grabConsoleOutputAndError(service.describeGroups())
- val numLines = output.trim.split("\n").filterNot(line =>
line.isEmpty).length
+ val numLines = output.trim.split("\n").count(line => line.nonEmpty)
(numLines == expectedNumLines) && error.isEmpty
}, s"Expected a data row and no error in describe results with describe
type ${describeType.mkString(" ")}.")
}
@@ -468,7 +468,7 @@ class DescribeConsumerGroupTest extends
ConsumerGroupCommandTest {
assignments.get.count(_.group == group) == 2 &&
assignments.get.count { x => x.group == group && x.numPartitions == 1
} == 1 &&
assignments.get.count { x => x.group == group && x.numPartitions == 0
} == 1 &&
- assignments.get.count(_.assignment.nonEmpty) == 0
+ !assignments.get.exists(_.assignment.nonEmpty)
}, "Expected rows for consumers with no assigned partitions in describe
group results")
val (state, assignments) = service.collectGroupMembers(group, true)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 7abff0dc3ff..0c5453b54d3 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -2334,9 +2334,9 @@ class PartitionTest extends AbstractPartitionTest {
"AtMinIsr")
def getMetric(metric: String): Option[Metric] = {
- KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.filter { case
(metricName, _) =>
+ KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.find { case
(metricName, _) =>
metricName.getName == metric && metricName.getType == "Partition"
- }.headOption.map(_._2)
+ }.map(_._2)
}
assertTrue(metricsToCheck.forall(getMetric(_).isDefined))
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 34963a41e06..819278bd280 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -86,7 +86,7 @@ class LogCleanerTest {
val segments = log.logSegments.take(3).toSeq
val stats = new CleanerStats()
val expectedBytesRead = segments.map(_.size).sum
- val shouldRemain = LogTestUtils.keysInLog(log).filter(!keys.contains(_))
+ val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
cleaner.cleanSegments(log, segments, map, 0L, stats, new
CleanedTransactionMetadata, -1)
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
assertEquals(expectedBytesRead, stats.bytesRead)
@@ -844,7 +844,7 @@ class LogCleanerTest {
// clean the log
val stats = new CleanerStats()
cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new
CleanedTransactionMetadata, -1)
- val shouldRemain = LogTestUtils.keysInLog(log).filter(!keys.contains(_))
+ val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
diff --git
a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index 904fbbc2165..4ff43cfc6af 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -588,7 +588,7 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
private def alterClientQuotas(request: Map[ClientQuotaEntity, Map[String,
Option[Double]]], validateOnly: Boolean) = {
val entries = request.map { case (entity, alter) =>
val ops = alter.map { case (key, value) =>
- new ClientQuotaAlteration.Op(key,
value.map(Double.box).getOrElse(null))
+ new ClientQuotaAlteration.Op(key, value.map(Double.box).orNull)
}.asJavaCollection
new ClientQuotaAlteration(entity, ops)
}
diff --git
a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index 40b839fdd38..db13f225359 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -48,7 +48,7 @@ object KafkaMetricsReporterTest {
}
private def contextLabelOrNull(name: String, metricsContext:
MetricsContext): String = {
- Option(metricsContext.contextLabels().get(name)).flatMap(v =>
Option(v.toString())).getOrElse(null)
+ Option(metricsContext.contextLabels().get(name)).flatMap(v =>
Option(v.toString())).orNull
}
override def configure(configs: util.Map[String, _]): Unit = {}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 882f64b569c..0162ebdd5e3 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1895,7 +1895,7 @@ object TestUtils extends Logging {
def alterClientQuotas(adminClient: Admin, request: Map[ClientQuotaEntity,
Map[String, Option[Double]]]): AlterClientQuotasResult = {
val entries = request.map { case (entity, alter) =>
val ops = alter.map { case (key, value) =>
- new ClientQuotaAlteration.Op(key,
value.map(Double.box).getOrElse(null))
+ new ClientQuotaAlteration.Op(key, value.map(Double.box).orNull)
}.asJavaCollection
new ClientQuotaAlteration(entity, ops)
}.asJavaCollection