This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8d665c42e23 MINOR: Small cleanups in integration.kafka tests (#12480)
8d665c42e23 is described below

commit 8d665c42e231fc74ab21e6c0226b572d6328bcc6
Author: Mickael Maison <mimai...@users.noreply.github.com>
AuthorDate: Thu Aug 25 09:55:43 2022 +0200

    MINOR: Small cleanups in integration.kafka tests (#12480)
    
    
    Reviewers: Luke Chen <show...@gmail.com>, Divij Vaidya <di...@amazon.com>, 
Christo Lolov <christo_lo...@yahoo.com>
---
 .../admin/ReassignPartitionsIntegrationTest.scala  |  2 +-
 .../kafka/api/AbstractConsumerTest.scala           |  8 ++--
 .../AdminClientWithPoliciesIntegrationTest.scala   |  2 +-
 .../kafka/api/BaseProducerSendTest.scala           | 10 ++---
 .../integration/kafka/api/BaseQuotaTest.scala      |  4 +-
 .../kafka/api/EndToEndClusterIdTest.scala          |  2 +-
 .../scala/integration/kafka/api/MetricsTest.scala  |  4 +-
 .../SaslPlainSslEndToEndAuthorizationTest.scala    | 29 +++++++--------
 .../kafka/network/DynamicConnectionQuotaTest.scala |  2 +-
 .../server/DynamicBrokerReconfigurationTest.scala  | 43 ++++++++++------------
 .../kafka/server/KRaftClusterTest.scala            | 30 +++++++--------
 ...ListenersWithSameSecurityProtocolBaseTest.scala |  3 +-
 .../kafka/server/QuorumTestHarness.scala           |  6 +--
 .../kafka/server/RaftClusterSnapshotTest.scala     |  4 +-
 14 files changed, 71 insertions(+), 78 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index d1c1c0919a2..9b3b935f23e 100644
--- 
a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -643,7 +643,7 @@ class ReassignPartitionsIntegrationTest extends 
QuorumTestHarness {
         case (topicName, parts) =>
           val partMap = new HashMap[Integer, List[Integer]]()
           parts.zipWithIndex.foreach {
-            case (part, index) => partMap.put(index, 
part.map(Integer.valueOf(_)).asJava)
+            case (part, index) => partMap.put(index, 
part.map(Integer.valueOf).asJava)
           }
           new NewTopic(topicName, partMap)
       }.toList.asJava).all().get()
diff --git 
a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
index 23b56b8e91f..b2a983c1035 100644
--- a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
@@ -134,7 +134,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest 
{
       if (timestampType == TimestampType.CREATE_TIME) {
         assertEquals(timestampType, record.timestampType)
         val timestamp = startingTimestamp + i
-        assertEquals(timestamp.toLong, record.timestamp)
+        assertEquals(timestamp, record.timestamp)
       } else
         assertTrue(record.timestamp >= startingTimestamp && record.timestamp 
<= now,
           s"Got unexpected timestamp ${record.timestamp}. Timestamp should be 
between [$startingTimestamp, $now}]")
@@ -185,7 +185,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest 
{
   protected def sendAndAwaitAsyncCommit[K, V](consumer: Consumer[K, V],
                                               offsetsOpt: 
Option[Map[TopicPartition, OffsetAndMetadata]] = None): Unit = {
 
-    def sendAsyncCommit(callback: OffsetCommitCallback) = {
+    def sendAsyncCommit(callback: OffsetCommitCallback): Unit = {
       offsetsOpt match {
         case Some(offsets) => consumer.commitAsync(offsets.asJava, callback)
         case None => consumer.commitAsync(callback)
@@ -362,13 +362,13 @@ abstract class AbstractConsumerTest extends 
BaseRequestTest {
     private var topicsSubscription = topicsToSubscribe
 
     val rebalanceListener: ConsumerRebalanceListener = new 
ConsumerRebalanceListener {
-      override def onPartitionsAssigned(partitions: 
util.Collection[TopicPartition]) = {
+      override def onPartitionsAssigned(partitions: 
util.Collection[TopicPartition]): Unit = {
         partitionAssignment ++= partitions.toArray(new 
Array[TopicPartition](0))
         if (userRebalanceListener != null)
           userRebalanceListener.onPartitionsAssigned(partitions)
       }
 
-      override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]) = {
+      override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]): Unit = {
         partitionAssignment --= partitions.toArray(new 
Array[TopicPartition](0))
         if (userRebalanceListener != null)
           userRebalanceListener.onPartitionsRevoked(partitions)
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index 44fe4ef64a2..caf5c4bed20 100644
--- 
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -238,7 +238,7 @@ object AdminClientWithPoliciesIntegrationTest {
     def validate(requestMetadata: AlterConfigPolicy.RequestMetadata): Unit = {
       validations.append(requestMetadata)
       require(!closed, "Policy should not be closed")
-      require(!configs.isEmpty, "configure should have been called with non 
empty configs")
+      require(configs.nonEmpty, "configure should have been called with non 
empty configs")
       require(!requestMetadata.configs.isEmpty, "request configs should not be 
empty")
       require(requestMetadata.resource.name.nonEmpty, "resource name should 
not be empty")
       if 
(requestMetadata.configs.containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG))
diff --git 
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index cbd65e1cf2f..65ba9099d26 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -431,9 +431,9 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
         "value".getBytes(StandardCharsets.UTF_8))
       for (_ <- 0 until 50) {
         val responses = (0 until numRecords) map (_ => producer.send(record))
-        assertTrue(responses.forall(!_.isDone()), "No request is complete.")
+        assertTrue(responses.forall(!_.isDone), "No request is complete.")
         producer.flush()
-        assertTrue(responses.forall(_.isDone()), "All requests are complete.")
+        assertTrue(responses.forall(_.isDone), "All requests are complete.")
       }
     } finally {
       producer.close()
@@ -456,7 +456,7 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
     for (_ <- 0 until 50) {
       val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs 
= Int.MaxValue)
       val responses = (0 until numRecords) map (_ => producer.send(record0))
-      assertTrue(responses.forall(!_.isDone()), "No request is complete.")
+      assertTrue(responses.forall(!_.isDone), "No request is complete.")
       producer.close(Duration.ZERO)
       responses.foreach { future =>
         val e = assertThrows(classOf[ExecutionException], () => future.get())
@@ -497,10 +497,10 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
         // Only send the records in the first callback since we close the 
producer in the callback and no records
         // can be sent afterwards.
         val responses = (0 until numRecords) map (i => producer.send(record, 
new CloseCallback(producer, i == 0)))
-        assertTrue(responses.forall(!_.isDone()), "No request is complete.")
+        assertTrue(responses.forall(!_.isDone), "No request is complete.")
         // flush the messages.
         producer.flush()
-        assertTrue(responses.forall(_.isDone()), "All requests are complete.")
+        assertTrue(responses.forall(_.isDone), "All requests are complete.")
         // Check the messages received by broker.
         TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords)
       } finally {
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 40d4cef7f82..006ce508130 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -385,9 +385,9 @@ abstract class QuotaTestClients(topic: String,
         s"ClientId $producerClientId of user $userPrincipal must have producer 
quota")
       assertEquals(Quota.upperBound(consumerQuota.toDouble), 
overrideConsumerQuota,
         s"ClientId $consumerClientId of user $userPrincipal must have consumer 
quota")
-      assertEquals(Quota.upperBound(requestQuota.toDouble), 
overrideProducerRequestQuota,
+      assertEquals(Quota.upperBound(requestQuota), 
overrideProducerRequestQuota,
         s"ClientId $producerClientId of user $userPrincipal must have request 
quota")
-      assertEquals(Quota.upperBound(requestQuota.toDouble), 
overrideConsumerRequestQuota,
+      assertEquals(Quota.upperBound(requestQuota), 
overrideConsumerRequestQuota,
         s"ClientId $consumerClientId of user $userPrincipal must have request 
quota")
     }
   }
diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index 25f7ce6a8c0..70ddb474e96 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -110,7 +110,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    MockDeserializer.resetStaticVariables
+    MockDeserializer.resetStaticVariables()
     // create the consumer offset topic
     createTopic(topic, 2, serverCount)
   }
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala 
b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 612092f41eb..107442649ee 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -108,7 +108,7 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
   }
 
   private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], 
numRecords: Int,
-      recordSize: Int, tp: TopicPartition) = {
+      recordSize: Int, tp: TopicPartition): Unit = {
     val bytes = new Array[Byte](recordSize)
     (0 until numRecords).map { i =>
       producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key 
$i".getBytes, bytes))
@@ -226,7 +226,7 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
 
   private def verifyBrokerErrorMetrics(server: KafkaServer): Unit = {
 
-    def errorMetricCount = 
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala.filter(_.getName 
== "ErrorsPerSec").size
+    def errorMetricCount = 
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala.count(_.getName == 
"ErrorsPerSec")
 
     val startErrorMetricCount = errorMetricCount
     val errorMetricPrefix = 
"kafka.network:type=RequestMetrics,name=ErrorsPerSec"
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index 772780381ee..b8cb83d133b 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -70,13 +70,12 @@ object SaslPlainSslEndToEndAuthorizationTest {
     def handle(callbacks: Array[Callback]): Unit = {
       var username: String = null
       for (callback <- callbacks) {
-        if (callback.isInstanceOf[NameCallback])
-          username = callback.asInstanceOf[NameCallback].getDefaultName
-        else if (callback.isInstanceOf[PlainAuthenticateCallback]) {
-          val plainCallback = callback.asInstanceOf[PlainAuthenticateCallback]
-          plainCallback.authenticated(Credentials.allUsers(username) == new 
String(plainCallback.password))
-        } else
-          throw new UnsupportedCallbackException(callback)
+        callback match {
+          case nameCallback: NameCallback => username = 
nameCallback.getDefaultName
+          case plainCallback: PlainAuthenticateCallback =>
+            plainCallback.authenticated(Credentials.allUsers(username) == new 
String(plainCallback.password))
+          case _ => throw new UnsupportedCallbackException(callback)
+        }
       }
     }
     def close(): Unit = {}
@@ -85,16 +84,16 @@ object SaslPlainSslEndToEndAuthorizationTest {
   class TestClientCallbackHandler extends AuthenticateCallbackHandler {
     def configure(configs: java.util.Map[String, _], saslMechanism: String, 
jaasConfigEntries: java.util.List[AppConfigurationEntry]): Unit = {}
     def handle(callbacks: Array[Callback]): Unit = {
-      val subject = Subject.getSubject(AccessController.getContext())
+      val subject = Subject.getSubject(AccessController.getContext)
       val username = 
subject.getPublicCredentials(classOf[String]).iterator().next()
       for (callback <- callbacks) {
-        if (callback.isInstanceOf[NameCallback])
-          callback.asInstanceOf[NameCallback].setName(username)
-        else if (callback.isInstanceOf[PasswordCallback]) {
-          if (username == KafkaPlainUser || username == KafkaPlainAdmin)
-            
callback.asInstanceOf[PasswordCallback].setPassword(Credentials.allUsers(username).toCharArray)
-        } else
-          throw new UnsupportedCallbackException(callback)
+        callback match {
+          case nameCallback: NameCallback => nameCallback.setName(username)
+          case passwordCallback: PasswordCallback =>
+            if (username == KafkaPlainUser || username == KafkaPlainAdmin)
+              
passwordCallback.setPassword(Credentials.allUsers(username).toCharArray)
+          case _ => throw new UnsupportedCallbackException(callback)
+        }
       }
     }
     def close(): Unit = {}
diff --git 
a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
 
b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
index 14f3e8f75dd..d705231cd9b 100644
--- 
a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
+++ 
b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
@@ -51,7 +51,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
   var executor: ExecutorService = _
 
   override def brokerPropertyOverrides(properties: Properties): Unit = {
-    properties.put(KafkaConfig.NumQuotaSamplesProp, "2".toString)
+    properties.put(KafkaConfig.NumQuotaSamplesProp, "2")
     properties.put("listener.name.plaintext.max.connection.creation.rate", 
plaintextListenerDefaultQuota.toString)
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 295ad061211..8da5b456cde 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -102,9 +102,6 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
   private val sslProperties2 = TestUtils.sslConfigs(Mode.SERVER, clientCert = 
false, Some(trustStoreFile2), "kafka")
   private val invalidSslProperties = invalidSslConfigs
 
-  def addExtraProps(props: Properties): Unit = {
-  }
-
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(kafkaServerSaslMechanisms, 
Some(kafkaClientSaslMechanism)))
@@ -136,7 +133,6 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
       props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret")
       props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString)
       props.put(KafkaConfig.LogRetentionTimeHoursProp, 168.toString)
-      addExtraProps(props)
 
       props ++= sslProperties1
       props ++= securityProps(sslProperties1, KEYSTORE_PROPS, 
listenerPrefix(SecureInternal))
@@ -144,7 +140,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
       // Set invalid top-level properties to ensure that listener config is 
used
       // Don't set any dynamic configs here since they get overridden in tests
       props ++= invalidSslProperties
-      props ++= securityProps(invalidSslProperties, KEYSTORE_PROPS, "")
+      props ++= securityProps(invalidSslProperties, KEYSTORE_PROPS)
       props ++= securityProps(sslProperties1, KEYSTORE_PROPS, 
listenerPrefix(SecureExternal))
 
       val kafkaConfig = KafkaConfig.fromProps(props)
@@ -232,7 +228,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
       keyStoreProps.forEach { configName =>
         val desc = configEntry(configDesc, s"$prefix$configName")
         val isSensitive = configName.contains("password")
-        verifyConfig(configName, desc, isSensitive, isReadOnly = 
prefix.nonEmpty, if (prefix.isEmpty) invalidSslProperties else sslProperties1)
+        verifyConfig(configName, desc, isSensitive, isReadOnly = 
prefix.nonEmpty, expectedProps)
         val defaultValue = if (configName == SSL_KEYSTORE_TYPE_CONFIG) 
Some("JKS") else None
         verifySynonyms(configName, desc.synonyms, isSensitive, prefix, 
defaultValue)
       }
@@ -383,8 +379,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     verifyProduceConsume(producer, consumer, 10, topic2)
 
     // Broker keystore update for internal listener with incompatible keystore 
should fail without update
-    val adminClient = adminClients.head
-    alterSslKeystore(adminClient, sslProperties2, SecureInternal, 
expectFailure = true)
+    alterSslKeystore(sslProperties2, SecureInternal, expectFailure = true)
     verifyProduceConsume(producer, consumer, 10, topic2)
 
     // Broker keystore update for internal listener with compatible keystore 
should succeed
@@ -393,7 +388,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     val newFile = File.createTempFile("keystore", ".jks")
     Files.copy(oldFile.toPath, newFile.toPath, 
StandardCopyOption.REPLACE_EXISTING)
     sslPropertiesCopy.setProperty(SSL_KEYSTORE_LOCATION_CONFIG, 
newFile.getPath)
-    alterSslKeystore(adminClient, sslPropertiesCopy, SecureInternal)
+    alterSslKeystore(sslPropertiesCopy, SecureInternal)
     verifyProduceConsume(producer, consumer, 10, topic2)
 
     // Verify that keystores can be updated using same file name.
@@ -402,7 +397,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     reusableProps.setProperty(SSL_KEYSTORE_LOCATION_CONFIG, 
reusableFile.getPath)
     Files.copy(new 
File(sslProperties1.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)).toPath,
       reusableFile.toPath, StandardCopyOption.REPLACE_EXISTING)
-    alterSslKeystore(adminClient, reusableProps, SecureExternal)
+    alterSslKeystore(reusableProps, SecureExternal)
     val producer3 = 
ProducerBuilder().trustStoreProps(sslProperties2).maxRetries(0).build()
     verifyAuthenticationFailure(producer3)
     // Now alter using same file name. We can't check if the update has 
completed by comparing config on
@@ -410,7 +405,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     Files.copy(new 
File(sslProperties2.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)).toPath,
       reusableFile.toPath, StandardCopyOption.REPLACE_EXISTING)
     reusableFile.setLastModified(System.currentTimeMillis() + 1000)
-    alterSslKeystore(adminClient, reusableProps, SecureExternal)
+    alterSslKeystore(reusableProps, SecureExternal)
     TestUtils.waitUntilTrue(() => {
       try {
         producer3.partitionsFor(topic).size() == numPartitions
@@ -819,7 +814,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
       fetcherThreadPrefix -> leftOverThreadCount(fetcherThreadPrefix, 
servers.head.config.numReplicaFetchers)
     )
 
-    def maybeVerifyThreadPoolSize(propName: String, size: Int, threadPrefix: 
String): Unit = {
+    def maybeVerifyThreadPoolSize(size: Int, threadPrefix: String): Unit = {
       val ignoreCount = leftOverThreads.getOrElse(threadPrefix, 0)
       val expectedCountPerBroker = threadMultiplier.getOrElse(threadPrefix, 0) 
* size
       if (expectedCountPerBroker > 0)
@@ -842,11 +837,11 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
       val props = new Properties
       props.put(propName, newSize.toString)
       reconfigureServers(props, perBrokerConfig = false, (propName, 
newSize.toString))
-      maybeVerifyThreadPoolSize(propName, newSize, threadPrefix)
+      maybeVerifyThreadPoolSize(newSize, threadPrefix)
     }
 
     def verifyThreadPoolResize(propName: String, currentSize: => Int, 
threadPrefix: String, mayReceiveDuplicates: Boolean): Unit = {
-      maybeVerifyThreadPoolSize(propName, currentSize, threadPrefix)
+      maybeVerifyThreadPoolSize(currentSize, threadPrefix)
       val numRetries = if (mayReceiveDuplicates) 100 else 0
       val (producerThread, consumerThread) = startProduceConsume(retries = 
numRetries)
       var threadPoolSize = currentSize
@@ -858,7 +853,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
       }
       stopAndVerifyProduceConsume(producerThread, consumerThread, 
mayReceiveDuplicates)
       // Verify that all threads are alive
-      maybeVerifyThreadPoolSize(propName, threadPoolSize, threadPrefix)
+      maybeVerifyThreadPoolSize(threadPoolSize, threadPrefix)
     }
 
     val config = servers.head.config
@@ -900,8 +895,8 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
 
     assertEquals(2, kafkaMetrics.size) // 2 listeners
     // 2 threads per listener
-    assertEquals(2, 
kafkaMetrics.get("INTERNAL").get.groupBy(_.tags().get(Processor.NetworkProcessorMetricTag)).size)
-    assertEquals(2, 
kafkaMetrics.get("EXTERNAL").get.groupBy(_.tags().get(Processor.NetworkProcessorMetricTag)).size)
+    assertEquals(2, 
kafkaMetrics("INTERNAL").groupBy(_.tags().get(Processor.NetworkProcessorMetricTag)).size)
+    assertEquals(2, 
kafkaMetrics("EXTERNAL").groupBy(_.tags().get(Processor.NetworkProcessorMetricTag)).size)
 
     KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
       .filter(isProcessorMetric)
@@ -1307,7 +1302,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
       .saslMechanism(mechanism)
       .maxRetries(retries)
       .build()
-    val consumer = 
ConsumerBuilder(s"add-listener-group-$securityProtocol-$mechanism")
+    val consumer = ConsumerBuilder(groupId)
       .listenerName(securityProtocol.name)
       .securityProtocol(securityProtocol)
       .saslMechanism(mechanism)
@@ -1369,7 +1364,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
         verifyAuthenticationFailure(producerBuilder.build())
         true
       } catch {
-        case e: Error => false
+        case _: Error => false
       }
     }, "Did not fail authentication with invalid config")
   }
@@ -1422,7 +1417,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     newStoreProps
   }
 
-  private def alterSslKeystore(adminClient: Admin, props: Properties, 
listener: String, expectFailure: Boolean  = false): Unit = {
+  private def alterSslKeystore(props: Properties, listener: String, 
expectFailure: Boolean  = false): Unit = {
     val configPrefix = listenerPrefix(listener)
     val newProps = securityProps(props, KEYSTORE_PROPS, configPrefix)
     reconfigureServers(newProps, perBrokerConfig = true,
@@ -1505,7 +1500,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
         Seq(new ConfigResource(ConfigResource.Type.BROKER, ""))
       brokerResources.foreach { brokerResource =>
         val exception = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(brokerResource).get)
-        assertEquals(classOf[InvalidRequestException], 
exception.getCause().getClass())
+        assertEquals(classOf[InvalidRequestException], 
exception.getCause.getClass)
       }
       servers.foreach { server =>
         assertEquals(oldProps, server.config.values.asScala.filter { case (k, 
_) => newProps.containsKey(k) })
@@ -1708,9 +1703,9 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
 
   private abstract class ClientBuilder[T]() {
     protected var _bootstrapServers: Option[String] = None
-    protected var _listenerName = SecureExternal
+    protected var _listenerName: String = SecureExternal
     protected var _securityProtocol = SecurityProtocol.SASL_SSL
-    protected var _saslMechanism = kafkaClientSaslMechanism
+    protected var _saslMechanism: String = kafkaClientSaslMechanism
     protected var _clientId = "test-client"
     protected val _propsOverride: Properties = new Properties
 
@@ -1961,6 +1956,6 @@ class TestMetricsReporter extends MetricsReporter with 
Reconfigurable with Close
 class MockFileConfigProvider extends FileConfigProvider {
   @throws(classOf[IOException])
   override def reader(path: String): Reader = {
-    new 
StringReader("key=testKey\npassword=ServerPassword\ninterval=1000\nupdinterval=2000\nstoretype=JKS");
+    new 
StringReader("key=testKey\npassword=ServerPassword\ninterval=1000\nupdinterval=2000\nstoretype=JKS")
   }
 }
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index c550553917b..d31222a0f9d 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -51,7 +51,7 @@ import scala.jdk.CollectionConverters._
 @Tag("integration")
 class KRaftClusterTest {
   val log = LoggerFactory.getLogger(classOf[KRaftClusterTest])
-  val log2 = 
LoggerFactory.getLogger(classOf[KRaftClusterTest].getCanonicalName() + "2")
+  val log2 = 
LoggerFactory.getLogger(classOf[KRaftClusterTest].getCanonicalName + "2")
 
   @Test
   def testCreateClusterAndClose(): Unit = {
@@ -411,7 +411,7 @@ class KRaftClusterTest {
           Optional.of(new NewPartitionReassignment(Arrays.asList(3, 2, 0, 1))))
         admin.alterPartitionReassignments(reassignments).all().get()
         TestUtils.waitUntilTrue(
-          () => 
admin.listPartitionReassignments().reassignments().get().isEmpty(),
+          () => 
admin.listPartitionReassignments().reassignments().get().isEmpty,
           "The reassignment never completed.")
         var currentMapping: Seq[Seq[Int]] = Seq()
         val expectedMapping = Seq(Seq(2, 1, 0), Seq(0, 1, 2), Seq(2, 3), 
Seq(3, 2, 0, 1))
@@ -475,7 +475,7 @@ class KRaftClusterTest {
                                   expectedAbsent: Seq[String]): Unit = {
     val topicsNotFound = new util.HashSet[String]
     var extraTopics: mutable.Set[String] = null
-    expectedPresent.foreach(topicsNotFound.add(_))
+    expectedPresent.foreach(topicsNotFound.add)
     TestUtils.waitUntilTrue(() => {
       admin.listTopics().names().get().forEach(name => 
topicsNotFound.remove(name))
       extraTopics = 
admin.listTopics().names().get().asScala.filter(expectedAbsent.contains(_))
@@ -619,29 +619,29 @@ class KRaftClusterTest {
             new ApiError(INVALID_REQUEST, "APPEND operation is not allowed for 
the BROKER_LOGGER resource")),
           incrementalAlter(admin, Seq(
             (broker2, Seq(
-              new AlterConfigOp(new ConfigEntry(log.getName(), "TRACE"), 
OpType.SET),
-              new AlterConfigOp(new ConfigEntry(log2.getName(), "TRACE"), 
OpType.SET))),
+              new AlterConfigOp(new ConfigEntry(log.getName, "TRACE"), 
OpType.SET),
+              new AlterConfigOp(new ConfigEntry(log2.getName, "TRACE"), 
OpType.SET))),
             (broker3, Seq(
-              new AlterConfigOp(new ConfigEntry(log.getName(), "TRACE"), 
OpType.APPEND),
-              new AlterConfigOp(new ConfigEntry(log2.getName(), "TRACE"), 
OpType.APPEND))))))
+              new AlterConfigOp(new ConfigEntry(log.getName, "TRACE"), 
OpType.APPEND),
+              new AlterConfigOp(new ConfigEntry(log2.getName, "TRACE"), 
OpType.APPEND))))))
 
         validateConfigs(admin, Map(broker2 -> Seq(
-          (log.getName(), "TRACE"),
-          (log2.getName(), "TRACE"))))
+          (log.getName, "TRACE"),
+          (log2.getName, "TRACE"))))
 
         assertEquals(Seq(ApiError.NONE,
           new ApiError(INVALID_REQUEST, "SUBTRACT operation is not allowed for 
the BROKER_LOGGER resource")),
           incrementalAlter(admin, Seq(
             (broker2, Seq(
-              new AlterConfigOp(new ConfigEntry(log.getName(), ""), 
OpType.DELETE),
-              new AlterConfigOp(new ConfigEntry(log2.getName(), ""), 
OpType.DELETE))),
+              new AlterConfigOp(new ConfigEntry(log.getName, ""), 
OpType.DELETE),
+              new AlterConfigOp(new ConfigEntry(log2.getName, ""), 
OpType.DELETE))),
             (broker3, Seq(
-              new AlterConfigOp(new ConfigEntry(log.getName(), "TRACE"), 
OpType.SUBTRACT),
-              new AlterConfigOp(new ConfigEntry(log2.getName(), "TRACE"), 
OpType.SUBTRACT))))))
+              new AlterConfigOp(new ConfigEntry(log.getName, "TRACE"), 
OpType.SUBTRACT),
+              new AlterConfigOp(new ConfigEntry(log2.getName, "TRACE"), 
OpType.SUBTRACT))))))
 
         validateConfigs(admin, Map(broker2 -> Seq(
-          (log.getName(), initialLog4j.get(broker2).get.get(log.getName())),
-          (log2.getName(), 
initialLog4j.get(broker2).get.get(log2.getName())))))
+          (log.getName, initialLog4j(broker2).get(log.getName)),
+          (log2.getName, initialLog4j(broker2).get(log2.getName)))))
       } finally {
         admin.close()
       }
diff --git 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 1d865f916e9..30c9b6d2670 100644
--- 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -27,7 +27,6 @@ import kafka.coordinator.group.OffsetConfig
 import kafka.utils.JaasTestUtils.JaasSection
 import kafka.utils.{JaasTestUtils, TestUtils}
 import kafka.utils.Implicits._
-import kafka.server.QuorumTestHarness
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.config.SslConfigs
@@ -180,7 +179,7 @@ abstract class 
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
     props.put(s"${prefix}${KafkaConfig.SaslJaasConfigProp}", jaasConfig)
   }
 
-  case class ClientMetadata(val listenerName: ListenerName, val saslMechanism: 
String, topic: String) {
+  case class ClientMetadata(listenerName: ListenerName, saslMechanism: String, 
topic: String) {
     override def hashCode: Int = Objects.hash(listenerName, saslMechanism)
     override def equals(obj: Any): Boolean = obj match {
       case other: ClientMetadata => listenerName == other.listenerName && 
saslMechanism == other.saslMechanism && topic == other.topic
diff --git 
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala 
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 0e2280dd705..d1152b022be 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -287,10 +287,10 @@ abstract class QuorumTestHarness extends Logging {
     val nodeId = Integer.parseInt(props.getProperty(KafkaConfig.NodeIdProp))
     val metadataDir = TestUtils.tempDir()
     val metaProperties = new MetaProperties(Uuid.randomUuid().toString, nodeId)
-    formatDirectories(immutable.Seq(metadataDir.getAbsolutePath()), 
metaProperties)
+    formatDirectories(immutable.Seq(metadataDir.getAbsolutePath), 
metaProperties)
     val controllerMetrics = new Metrics()
-    props.setProperty(KafkaConfig.MetadataLogDirProp, 
metadataDir.getAbsolutePath())
-    val proto = controllerListenerSecurityProtocol.toString()
+    props.setProperty(KafkaConfig.MetadataLogDirProp, 
metadataDir.getAbsolutePath)
+    val proto = controllerListenerSecurityProtocol.toString
     props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, 
s"CONTROLLER:${proto}")
     props.setProperty(KafkaConfig.ListenersProp, s"CONTROLLER://localhost:0")
     props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
diff --git 
a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala 
b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
index f8dccd17d0d..f1c47ff1bea 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
@@ -83,10 +83,10 @@ class RaftClusterSnapshotTest {
           )
         ) { snapshot =>
           // Check that the snapshot is non-empty
-          assertTrue(snapshot.hasNext())
+          assertTrue(snapshot.hasNext)
 
           // Check that we can read the entire snapshot
-          while (snapshot.hasNext()) {
+          while (snapshot.hasNext) {
             val batch = snapshot.next()
             assertTrue(batch.sizeInBytes > 0)
             assertNotEquals(Collections.emptyList(), batch.records())

Reply via email to