This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 8d665c42e23 MINOR: Small cleanups in integration.kafka tests (#12480)
8d665c42e23 is described below

commit 8d665c42e231fc74ab21e6c0226b572d6328bcc6
Author: Mickael Maison <mimai...@users.noreply.github.com>
AuthorDate: Thu Aug 25 09:55:43 2022 +0200

    MINOR: Small cleanups in integration.kafka tests (#12480)

    Reviewers: Luke Chen <show...@gmail.com>, Divij Vaidya <di...@amazon.com>, Christo Lolov <christo_lo...@yahoo.com>
---
 .../admin/ReassignPartitionsIntegrationTest.scala  |  2 +-
 .../kafka/api/AbstractConsumerTest.scala           |  8 ++--
 .../AdminClientWithPoliciesIntegrationTest.scala   |  2 +-
 .../kafka/api/BaseProducerSendTest.scala           | 10 ++---
 .../integration/kafka/api/BaseQuotaTest.scala      |  4 +-
 .../kafka/api/EndToEndClusterIdTest.scala          |  2 +-
 .../scala/integration/kafka/api/MetricsTest.scala  |  4 +-
 .../SaslPlainSslEndToEndAuthorizationTest.scala    | 29 +++++++--------
 .../kafka/network/DynamicConnectionQuotaTest.scala |  2 +-
 .../server/DynamicBrokerReconfigurationTest.scala  | 43 ++++++++++------------
 .../kafka/server/KRaftClusterTest.scala            | 30 +++++++--------
 ...ListenersWithSameSecurityProtocolBaseTest.scala |  3 +-
 .../kafka/server/QuorumTestHarness.scala           |  6 +--
 .../kafka/server/RaftClusterSnapshotTest.scala     |  4 +-
 14 files changed, 71 insertions(+), 78 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index d1c1c0919a2..9b3b935f23e 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -643,7 +643,7 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
       case (topicName, parts) =>
         val partMap = new HashMap[Integer, List[Integer]]()
         parts.zipWithIndex.foreach {
-          case (part, index) => partMap.put(index, part.map(Integer.valueOf(_)).asJava)
+          case (part, index) => partMap.put(index, part.map(Integer.valueOf).asJava)
         }
         new NewTopic(topicName, partMap)
     }.toList.asJava).all().get()
diff --git a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
index 23b56b8e91f..b2a983c1035 100644
--- a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
@@ -134,7 +134,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
       if (timestampType == TimestampType.CREATE_TIME) {
         assertEquals(timestampType, record.timestampType)
         val timestamp = startingTimestamp + i
-        assertEquals(timestamp.toLong, record.timestamp)
+        assertEquals(timestamp, record.timestamp)
       } else
         assertTrue(record.timestamp >= startingTimestamp && record.timestamp <= now,
           s"Got unexpected timestamp ${record.timestamp}. Timestamp should be between [$startingTimestamp, $now}]")
@@ -185,7 +185,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
   protected def sendAndAwaitAsyncCommit[K, V](consumer: Consumer[K, V],
                                               offsetsOpt: Option[Map[TopicPartition, OffsetAndMetadata]] = None): Unit = {

-    def sendAsyncCommit(callback: OffsetCommitCallback) = {
+    def sendAsyncCommit(callback: OffsetCommitCallback): Unit = {
       offsetsOpt match {
         case Some(offsets) => consumer.commitAsync(offsets.asJava, callback)
         case None => consumer.commitAsync(callback)
@@ -362,13 +362,13 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
     private var topicsSubscription = topicsToSubscribe

     val rebalanceListener: ConsumerRebalanceListener = new ConsumerRebalanceListener {
-      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
         partitionAssignment ++= partitions.toArray(new Array[TopicPartition](0))
         if (userRebalanceListener != null)
           userRebalanceListener.onPartitionsAssigned(partitions)
       }

-      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
         partitionAssignment --= partitions.toArray(new Array[TopicPartition](0))
         if (userRebalanceListener != null)
           userRebalanceListener.onPartitionsRevoked(partitions)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index 44fe4ef64a2..caf5c4bed20 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -238,7 +238,7 @@ object AdminClientWithPoliciesIntegrationTest {
     def validate(requestMetadata: AlterConfigPolicy.RequestMetadata): Unit = {
       validations.append(requestMetadata)
       require(!closed, "Policy should not be closed")
-      require(!configs.isEmpty, "configure should have been called with non empty configs")
+      require(configs.nonEmpty, "configure should have been called with non empty configs")
       require(!requestMetadata.configs.isEmpty, "request configs should not be empty")
       require(requestMetadata.resource.name.nonEmpty, "resource name should not be empty")
       if (requestMetadata.configs.containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG))
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index cbd65e1cf2f..65ba9099d26 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -431,9 +431,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         "value".getBytes(StandardCharsets.UTF_8))
       for (_ <- 0 until 50) {
         val responses = (0 until numRecords) map (_ => producer.send(record))
-        assertTrue(responses.forall(!_.isDone()), "No request is complete.")
+        assertTrue(responses.forall(!_.isDone), "No request is complete.")
         producer.flush()
-        assertTrue(responses.forall(_.isDone()), "All requests are complete.")
+        assertTrue(responses.forall(_.isDone), "All requests are complete.")
       }
     } finally {
       producer.close()
@@ -456,7 +456,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     for (_ <- 0 until 50) {
       val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
       val responses = (0 until numRecords) map (_ => producer.send(record0))
-      assertTrue(responses.forall(!_.isDone()), "No request is complete.")
+      assertTrue(responses.forall(!_.isDone), "No request is complete.")
       producer.close(Duration.ZERO)
       responses.foreach { future =>
         val e = assertThrows(classOf[ExecutionException], () => future.get())
@@ -497,10 +497,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       // Only send the records in the first callback since we close the producer in the callback and no records
       // can be sent afterwards.
       val responses = (0 until numRecords) map (i => producer.send(record, new CloseCallback(producer, i == 0)))
-      assertTrue(responses.forall(!_.isDone()), "No request is complete.")
+      assertTrue(responses.forall(!_.isDone), "No request is complete.")
       // flush the messages.
       producer.flush()
-      assertTrue(responses.forall(_.isDone()), "All requests are complete.")
+      assertTrue(responses.forall(_.isDone), "All requests are complete.")
       // Check the messages received by broker.
       TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords)
     } finally {
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 40d4cef7f82..006ce508130 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -385,9 +385,9 @@ abstract class QuotaTestClients(topic: String,
       s"ClientId $producerClientId of user $userPrincipal must have producer quota")
     assertEquals(Quota.upperBound(consumerQuota.toDouble), overrideConsumerQuota,
       s"ClientId $consumerClientId of user $userPrincipal must have consumer quota")
-    assertEquals(Quota.upperBound(requestQuota.toDouble), overrideProducerRequestQuota,
+    assertEquals(Quota.upperBound(requestQuota), overrideProducerRequestQuota,
       s"ClientId $producerClientId of user $userPrincipal must have request quota")
-    assertEquals(Quota.upperBound(requestQuota.toDouble), overrideConsumerRequestQuota,
+    assertEquals(Quota.upperBound(requestQuota), overrideConsumerRequestQuota,
       s"ClientId $consumerClientId of user $userPrincipal must have request quota")
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index 25f7ce6a8c0..70ddb474e96 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -110,7 +110,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    MockDeserializer.resetStaticVariables
+    MockDeserializer.resetStaticVariables()
     // create the consumer offset topic
     createTopic(topic, 2, serverCount)
   }
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 612092f41eb..107442649ee 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -108,7 +108,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
   }

   private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int,
-                          recordSize: Int, tp: TopicPartition) = {
+                          recordSize: Int, tp: TopicPartition): Unit = {
     val bytes = new Array[Byte](recordSize)
     (0 until numRecords).map { i =>
       producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, bytes))
@@ -226,7 +226,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {

   private def verifyBrokerErrorMetrics(server: KafkaServer): Unit = {

-    def errorMetricCount = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala.filter(_.getName == "ErrorsPerSec").size
+    def errorMetricCount = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala.count(_.getName == "ErrorsPerSec")

     val startErrorMetricCount = errorMetricCount
     val errorMetricPrefix = "kafka.network:type=RequestMetrics,name=ErrorsPerSec"
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index 772780381ee..b8cb83d133b 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -70,13 +70,12 @@ object SaslPlainSslEndToEndAuthorizationTest {
     def handle(callbacks: Array[Callback]): Unit = {
       var username: String = null
       for (callback <- callbacks) {
-        if (callback.isInstanceOf[NameCallback])
-          username = callback.asInstanceOf[NameCallback].getDefaultName
-        else if (callback.isInstanceOf[PlainAuthenticateCallback]) {
-          val plainCallback = callback.asInstanceOf[PlainAuthenticateCallback]
-          plainCallback.authenticated(Credentials.allUsers(username) == new String(plainCallback.password))
-        } else
-          throw new UnsupportedCallbackException(callback)
+        callback match {
+          case nameCallback: NameCallback => username = nameCallback.getDefaultName
+          case plainCallback: PlainAuthenticateCallback =>
+            plainCallback.authenticated(Credentials.allUsers(username) == new String(plainCallback.password))
+          case _ => throw new UnsupportedCallbackException(callback)
+        }
       }
     }
     def close(): Unit = {}
@@ -85,16 +84,16 @@ class TestClientCallbackHandler extends AuthenticateCallbackHandler {
   def configure(configs: java.util.Map[String, _], saslMechanism: String, jaasConfigEntries: java.util.List[AppConfigurationEntry]): Unit = {}
   def handle(callbacks: Array[Callback]): Unit = {
-    val subject = Subject.getSubject(AccessController.getContext())
+    val subject = Subject.getSubject(AccessController.getContext)
     val username = subject.getPublicCredentials(classOf[String]).iterator().next()
     for (callback <- callbacks) {
-      if (callback.isInstanceOf[NameCallback])
-        callback.asInstanceOf[NameCallback].setName(username)
-      else if (callback.isInstanceOf[PasswordCallback]) {
-        if (username == KafkaPlainUser || username == KafkaPlainAdmin)
-          callback.asInstanceOf[PasswordCallback].setPassword(Credentials.allUsers(username).toCharArray)
-      } else
-        throw new UnsupportedCallbackException(callback)
+      callback match {
+        case nameCallback: NameCallback => nameCallback.setName(username)
+        case passwordCallback: PasswordCallback =>
+          if (username == KafkaPlainUser || username == KafkaPlainAdmin)
+            passwordCallback.setPassword(Credentials.allUsers(username).toCharArray)
+        case _ => throw new UnsupportedCallbackException(callback)
+      }
     }
   }
   def close(): Unit = {}
diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
index 14f3e8f75dd..d705231cd9b 100644
--- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
@@ -51,7 +51,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
   var executor: ExecutorService = _

   override def brokerPropertyOverrides(properties: Properties): Unit = {
-    properties.put(KafkaConfig.NumQuotaSamplesProp, "2".toString)
+    properties.put(KafkaConfig.NumQuotaSamplesProp, "2")
     properties.put("listener.name.plaintext.max.connection.creation.rate", plaintextListenerDefaultQuota.toString)
   }
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 295ad061211..8da5b456cde 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -102,9 +102,6 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
   private val sslProperties2 = TestUtils.sslConfigs(Mode.SERVER, clientCert = false, Some(trustStoreFile2), "kafka")
   private val invalidSslProperties = invalidSslConfigs

-  def addExtraProps(props: Properties): Unit = {
-  }
-
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)))
@@ -136,7 +133,6 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret")
       props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString)
       props.put(KafkaConfig.LogRetentionTimeHoursProp, 168.toString)
-      addExtraProps(props)

       props ++= sslProperties1
       props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureInternal))
@@ -144,7 +140,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       // Set invalid top-level properties to ensure that listener config is used
       // Don't set any dynamic configs here since they get overridden in tests
       props ++= invalidSslProperties
-      props ++= securityProps(invalidSslProperties, KEYSTORE_PROPS, "")
+      props ++= securityProps(invalidSslProperties, KEYSTORE_PROPS)
       props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureExternal))

       val kafkaConfig = KafkaConfig.fromProps(props)
@@ -232,7 +228,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     keyStoreProps.forEach { configName =>
       val desc = configEntry(configDesc, s"$prefix$configName")
       val isSensitive = configName.contains("password")
-      verifyConfig(configName, desc, isSensitive, isReadOnly = prefix.nonEmpty, if (prefix.isEmpty) invalidSslProperties else sslProperties1)
+      verifyConfig(configName, desc, isSensitive, isReadOnly = prefix.nonEmpty, expectedProps)
       val defaultValue = if (configName == SSL_KEYSTORE_TYPE_CONFIG) Some("JKS") else None
       verifySynonyms(configName, desc.synonyms, isSensitive, prefix, defaultValue)
     }
@@ -383,8 +379,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     verifyProduceConsume(producer, consumer, 10, topic2)

     // Broker keystore update for internal listener with incompatible keystore should fail without update
-    val adminClient = adminClients.head
-    alterSslKeystore(adminClient, sslProperties2, SecureInternal, expectFailure = true)
+    alterSslKeystore(sslProperties2, SecureInternal, expectFailure = true)
     verifyProduceConsume(producer, consumer, 10, topic2)

     // Broker keystore update for internal listener with compatible keystore should succeed
@@ -393,7 +388,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     val newFile = File.createTempFile("keystore", ".jks")
     Files.copy(oldFile.toPath, newFile.toPath, StandardCopyOption.REPLACE_EXISTING)
     sslPropertiesCopy.setProperty(SSL_KEYSTORE_LOCATION_CONFIG, newFile.getPath)
-    alterSslKeystore(adminClient, sslPropertiesCopy, SecureInternal)
+    alterSslKeystore(sslPropertiesCopy, SecureInternal)
     verifyProduceConsume(producer, consumer, 10, topic2)

     // Verify that keystores can be updated using same file name.
@@ -402,7 +397,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     reusableProps.setProperty(SSL_KEYSTORE_LOCATION_CONFIG, reusableFile.getPath)
     Files.copy(new File(sslProperties1.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)).toPath, reusableFile.toPath,
       StandardCopyOption.REPLACE_EXISTING)
-    alterSslKeystore(adminClient, reusableProps, SecureExternal)
+    alterSslKeystore(reusableProps, SecureExternal)
     val producer3 = ProducerBuilder().trustStoreProps(sslProperties2).maxRetries(0).build()
     verifyAuthenticationFailure(producer3)
     // Now alter using same file name. We can't check if the update has completed by comparing config on
@@ -410,7 +405,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     Files.copy(new File(sslProperties2.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)).toPath, reusableFile.toPath,
       StandardCopyOption.REPLACE_EXISTING)
     reusableFile.setLastModified(System.currentTimeMillis() + 1000)
-    alterSslKeystore(adminClient, reusableProps, SecureExternal)
+    alterSslKeystore(reusableProps, SecureExternal)
     TestUtils.waitUntilTrue(() => {
       try {
         producer3.partitionsFor(topic).size() == numPartitions
@@ -819,7 +814,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
      fetcherThreadPrefix -> leftOverThreadCount(fetcherThreadPrefix, servers.head.config.numReplicaFetchers)
    )

-    def maybeVerifyThreadPoolSize(propName: String, size: Int, threadPrefix: String): Unit = {
+    def maybeVerifyThreadPoolSize(size: Int, threadPrefix: String): Unit = {
       val ignoreCount = leftOverThreads.getOrElse(threadPrefix, 0)
       val expectedCountPerBroker = threadMultiplier.getOrElse(threadPrefix, 0) * size
       if (expectedCountPerBroker > 0)
@@ -842,11 +837,11 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       val props = new Properties
       props.put(propName, newSize.toString)
       reconfigureServers(props, perBrokerConfig = false, (propName, newSize.toString))
-      maybeVerifyThreadPoolSize(propName, newSize, threadPrefix)
+      maybeVerifyThreadPoolSize(newSize, threadPrefix)
     }

     def verifyThreadPoolResize(propName: String, currentSize: => Int, threadPrefix: String, mayReceiveDuplicates: Boolean): Unit = {
-      maybeVerifyThreadPoolSize(propName, currentSize, threadPrefix)
+      maybeVerifyThreadPoolSize(currentSize, threadPrefix)
       val numRetries = if (mayReceiveDuplicates) 100 else 0
       val (producerThread, consumerThread) = startProduceConsume(retries = numRetries)
       var threadPoolSize = currentSize
@@ -858,7 +853,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       }
       stopAndVerifyProduceConsume(producerThread, consumerThread, mayReceiveDuplicates)
       // Verify that all threads are alive
-      maybeVerifyThreadPoolSize(propName, threadPoolSize, threadPrefix)
+      maybeVerifyThreadPoolSize(threadPoolSize, threadPrefix)
     }

     val config = servers.head.config
@@ -900,8 +895,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     assertEquals(2, kafkaMetrics.size) // 2 listeners
     // 2 threads per listener
-    assertEquals(2, kafkaMetrics.get("INTERNAL").get.groupBy(_.tags().get(Processor.NetworkProcessorMetricTag)).size)
-    assertEquals(2, kafkaMetrics.get("EXTERNAL").get.groupBy(_.tags().get(Processor.NetworkProcessorMetricTag)).size)
+    assertEquals(2, kafkaMetrics("INTERNAL").groupBy(_.tags().get(Processor.NetworkProcessorMetricTag)).size)
+    assertEquals(2, kafkaMetrics("EXTERNAL").groupBy(_.tags().get(Processor.NetworkProcessorMetricTag)).size)

     KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
       .filter(isProcessorMetric)
@@ -1307,7 +1302,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       .saslMechanism(mechanism)
       .maxRetries(retries)
       .build()
-    val consumer = ConsumerBuilder(s"add-listener-group-$securityProtocol-$mechanism")
+    val consumer = ConsumerBuilder(groupId)
       .listenerName(securityProtocol.name)
       .securityProtocol(securityProtocol)
       .saslMechanism(mechanism)
@@ -1369,7 +1364,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
         verifyAuthenticationFailure(producerBuilder.build())
         true
       } catch {
-        case e: Error => false
+        case _: Error => false
       }
     }, "Did not fail authentication with invalid config")
   }
@@ -1422,7 +1417,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     newStoreProps
   }

-  private def alterSslKeystore(adminClient: Admin, props: Properties, listener: String, expectFailure: Boolean = false): Unit = {
+  private def alterSslKeystore(props: Properties, listener: String, expectFailure: Boolean = false): Unit = {
     val configPrefix = listenerPrefix(listener)
     val newProps = securityProps(props, KEYSTORE_PROPS, configPrefix)
     reconfigureServers(newProps, perBrokerConfig = true,
@@ -1505,7 +1500,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       Seq(new ConfigResource(ConfigResource.Type.BROKER, ""))
     brokerResources.foreach { brokerResource =>
       val exception = assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get)
-      assertEquals(classOf[InvalidRequestException], exception.getCause().getClass())
+      assertEquals(classOf[InvalidRequestException], exception.getCause.getClass)
     }
     servers.foreach { server =>
       assertEquals(oldProps, server.config.values.asScala.filter { case (k, _) => newProps.containsKey(k) })
@@ -1708,9 +1703,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
   private abstract class ClientBuilder[T]() {
     protected var _bootstrapServers: Option[String] = None
-    protected var _listenerName = SecureExternal
+    protected var _listenerName: String = SecureExternal
     protected var _securityProtocol = SecurityProtocol.SASL_SSL
-    protected var _saslMechanism = kafkaClientSaslMechanism
+    protected var _saslMechanism: String = kafkaClientSaslMechanism
     protected var _clientId = "test-client"
     protected val _propsOverride: Properties = new Properties
@@ -1961,6 +1956,6 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
 class MockFileConfigProvider extends FileConfigProvider {
   @throws(classOf[IOException])
   override def reader(path: String): Reader = {
-    new StringReader("key=testKey\npassword=ServerPassword\ninterval=1000\nupdinterval=2000\nstoretype=JKS");
+    new StringReader("key=testKey\npassword=ServerPassword\ninterval=1000\nupdinterval=2000\nstoretype=JKS")
   }
 }
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index c550553917b..d31222a0f9d 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -51,7 +51,7 @@ import scala.jdk.CollectionConverters._
 @Tag("integration")
 class KRaftClusterTest {
   val log = LoggerFactory.getLogger(classOf[KRaftClusterTest])
-  val log2 = LoggerFactory.getLogger(classOf[KRaftClusterTest].getCanonicalName() + "2")
+  val log2 = LoggerFactory.getLogger(classOf[KRaftClusterTest].getCanonicalName + "2")

   @Test
   def testCreateClusterAndClose(): Unit = {
@@ -411,7 +411,7 @@ class KRaftClusterTest {
           Optional.of(new NewPartitionReassignment(Arrays.asList(3, 2, 0, 1))))
         admin.alterPartitionReassignments(reassignments).all().get()
         TestUtils.waitUntilTrue(
-          () => admin.listPartitionReassignments().reassignments().get().isEmpty(),
+          () => admin.listPartitionReassignments().reassignments().get().isEmpty,
           "The reassignment never completed.")
         var currentMapping: Seq[Seq[Int]] = Seq()
         val expectedMapping = Seq(Seq(2, 1, 0), Seq(0, 1, 2), Seq(2, 3), Seq(3, 2, 0, 1))
@@ -475,7 +475,7 @@ class KRaftClusterTest {
                          expectedAbsent: Seq[String]): Unit = {
     val topicsNotFound = new util.HashSet[String]
     var extraTopics: mutable.Set[String] = null
-    expectedPresent.foreach(topicsNotFound.add(_))
+    expectedPresent.foreach(topicsNotFound.add)
     TestUtils.waitUntilTrue(() => {
       admin.listTopics().names().get().forEach(name => topicsNotFound.remove(name))
       extraTopics = admin.listTopics().names().get().asScala.filter(expectedAbsent.contains(_))
@@ -619,29 +619,29 @@ class KRaftClusterTest {
           new ApiError(INVALID_REQUEST, "APPEND operation is not allowed for the BROKER_LOGGER resource")),
           incrementalAlter(admin, Seq(
             (broker2, Seq(
-              new AlterConfigOp(new ConfigEntry(log.getName(), "TRACE"), OpType.SET),
-              new AlterConfigOp(new ConfigEntry(log2.getName(), "TRACE"), OpType.SET))),
+              new AlterConfigOp(new ConfigEntry(log.getName, "TRACE"), OpType.SET),
+              new AlterConfigOp(new ConfigEntry(log2.getName, "TRACE"), OpType.SET))),
             (broker3, Seq(
-              new AlterConfigOp(new ConfigEntry(log.getName(), "TRACE"), OpType.APPEND),
-              new AlterConfigOp(new ConfigEntry(log2.getName(), "TRACE"), OpType.APPEND))))))
+              new AlterConfigOp(new ConfigEntry(log.getName, "TRACE"), OpType.APPEND),
+              new AlterConfigOp(new ConfigEntry(log2.getName, "TRACE"), OpType.APPEND))))))

         validateConfigs(admin, Map(broker2 -> Seq(
-          (log.getName(), "TRACE"),
-          (log2.getName(), "TRACE"))))
+          (log.getName, "TRACE"),
+          (log2.getName, "TRACE"))))

         assertEquals(Seq(ApiError.NONE,
           new ApiError(INVALID_REQUEST, "SUBTRACT operation is not allowed for the BROKER_LOGGER resource")),
           incrementalAlter(admin, Seq(
             (broker2, Seq(
-              new AlterConfigOp(new ConfigEntry(log.getName(), ""), OpType.DELETE),
-              new AlterConfigOp(new ConfigEntry(log2.getName(), ""), OpType.DELETE))),
+              new AlterConfigOp(new ConfigEntry(log.getName, ""), OpType.DELETE),
+              new AlterConfigOp(new ConfigEntry(log2.getName, ""), OpType.DELETE))),
             (broker3, Seq(
-              new AlterConfigOp(new ConfigEntry(log.getName(), "TRACE"), OpType.SUBTRACT),
-              new AlterConfigOp(new ConfigEntry(log2.getName(), "TRACE"), OpType.SUBTRACT))))))
+              new AlterConfigOp(new ConfigEntry(log.getName, "TRACE"), OpType.SUBTRACT),
+              new AlterConfigOp(new ConfigEntry(log2.getName, "TRACE"), OpType.SUBTRACT))))))

         validateConfigs(admin, Map(broker2 -> Seq(
-          (log.getName(), initialLog4j.get(broker2).get.get(log.getName())),
-          (log2.getName(), initialLog4j.get(broker2).get.get(log2.getName())))))
+          (log.getName, initialLog4j(broker2).get(log.getName)),
+          (log2.getName, initialLog4j(broker2).get(log2.getName)))))
       } finally {
         admin.close()
       }
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 1d865f916e9..30c9b6d2670 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -27,7 +27,6 @@ import kafka.coordinator.group.OffsetConfig
 import kafka.utils.JaasTestUtils.JaasSection
 import kafka.utils.{JaasTestUtils, TestUtils}
 import kafka.utils.Implicits._
-import kafka.server.QuorumTestHarness
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.config.SslConfigs
@@ -180,7 +179,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
     props.put(s"${prefix}${KafkaConfig.SaslJaasConfigProp}", jaasConfig)
   }

-  case class ClientMetadata(val listenerName: ListenerName, val saslMechanism: String, topic: String) {
+  case class ClientMetadata(listenerName: ListenerName, saslMechanism: String, topic: String) {
     override def hashCode: Int = Objects.hash(listenerName, saslMechanism)
     override def equals(obj: Any): Boolean = obj match {
       case other: ClientMetadata => listenerName == other.listenerName && saslMechanism == other.saslMechanism && topic == other.topic
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 0e2280dd705..d1152b022be 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -287,10 +287,10 @@ abstract class QuorumTestHarness extends Logging {
     val nodeId = Integer.parseInt(props.getProperty(KafkaConfig.NodeIdProp))
     val metadataDir = TestUtils.tempDir()
     val metaProperties = new MetaProperties(Uuid.randomUuid().toString, nodeId)
-    formatDirectories(immutable.Seq(metadataDir.getAbsolutePath()), metaProperties)
+    formatDirectories(immutable.Seq(metadataDir.getAbsolutePath), metaProperties)
     val controllerMetrics = new Metrics()
-    props.setProperty(KafkaConfig.MetadataLogDirProp, metadataDir.getAbsolutePath())
-    val proto = controllerListenerSecurityProtocol.toString()
+    props.setProperty(KafkaConfig.MetadataLogDirProp, metadataDir.getAbsolutePath)
+    val proto = controllerListenerSecurityProtocol.toString
     props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"CONTROLLER:${proto}")
     props.setProperty(KafkaConfig.ListenersProp, s"CONTROLLER://localhost:0")
     props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
diff --git a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
index f8dccd17d0d..f1c47ff1bea 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
@@ -83,10 +83,10 @@ class RaftClusterSnapshotTest {
         )
       ) { snapshot =>
         // Check that the snapshot is non-empty
-        assertTrue(snapshot.hasNext())
+        assertTrue(snapshot.hasNext)

         // Check that we can read the entire snapshot
-        while (snapshot.hasNext()) {
+        while (snapshot.hasNext) {
           val batch = snapshot.next()
           assertTrue(batch.sizeInBytes > 0)
           assertNotEquals(Collections.emptyList(), batch.records())
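
For readers skimming the diff, the recurring Scala idioms this cleanup applies are: type-based pattern matching in place of isInstanceOf/asInstanceOf chains, explicit `: Unit` return types on side-effecting methods, `count(p)` in place of `filter(p).size`, `nonEmpty` in place of `!isEmpty`, and dropping empty parentheses on parameterless accessors such as `getName` or `isDone`. A minimal standalone sketch of these idioms follows; the Callback types here are hypothetical stand-ins defined only so the example compiles on its own, not the javax.security classes used in the tests above.

    object CleanupIdioms {
      // Hypothetical callback hierarchy, for illustration only.
      sealed trait Callback
      final class NameCallback(val name: String) extends Callback
      final class PasswordCallback(val password: String) extends Callback

      // Idiom: a type-based pattern match replaces isInstanceOf/asInstanceOf
      // chains, and the side-effecting method declares an explicit `: Unit`.
      def handle(callbacks: Seq[Callback]): Unit = {
        callbacks.foreach {
          case nameCallback: NameCallback =>
            println(s"name: ${nameCallback.name}")
          case passwordCallback: PasswordCallback =>
            println(s"password length: ${passwordCallback.password.length}")
        }
      }

      def main(args: Array[String]): Unit = {
        val names = Seq("ErrorsPerSec", "BytesInPerSec", "ErrorsPerSec")

        // Idiom: count(p) instead of filter(p).size
        val errorMetricCount = names.count(_ == "ErrorsPerSec")

        // Idiom: nonEmpty instead of !isEmpty
        require(names.nonEmpty, "expected at least one metric name")

        println(errorMetricCount) // prints 2
        handle(Seq(new NameCallback("alice"), new PasswordCallback("secret")))
      }
    }

None of these changes alter runtime behavior; they only bring the tests in line with idiomatic Scala, which is why the patch touches many files yet nets a small negative line count.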