MINOR: Enable a number of Xlint scalac warnings. Update the code where possible to fix the warnings. The `unused` warning introduced in Scala 2.12 is quite handy and provides a reason to compile with Scala 2.12.
Author: Ismael Juma <[email protected]> Reviewers: Jason Gustafson <[email protected]> Closes #3464 from ijuma/scala-xlint Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1685e711 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1685e711 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1685e711 Branch: refs/heads/trunk Commit: 1685e7112c5d4dc723ffcfa219febaed045b6426 Parents: e391045 Author: Ismael Juma <[email protected]> Authored: Fri Jul 14 11:44:42 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Fri Jul 14 11:44:42 2017 -0700 ---------------------------------------------------------------------- build.gradle | 22 ++++++++++++++- .../kafka/admin/BrokerApiVersionsCommand.scala | 3 +-- .../kafka/admin/DeleteRecordsCommand.scala | 2 -- .../main/scala/kafka/admin/TopicCommand.scala | 2 +- .../scala/kafka/admin/ZkSecurityMigrator.scala | 2 -- .../kafka/api/ControlledShutdownResponse.scala | 2 +- .../src/main/scala/kafka/api/FetchRequest.scala | 1 - .../scala/kafka/api/TopicMetadataRequest.scala | 2 +- core/src/main/scala/kafka/cluster/Replica.scala | 2 -- .../scala/kafka/common/TopicAndPartition.scala | 1 - .../kafka/consumer/ConsumerConnector.scala | 2 +- .../kafka/consumer/ConsumerFetcherManager.scala | 2 -- .../scala/kafka/consumer/ConsumerIterator.scala | 2 +- .../main/scala/kafka/consumer/KafkaStream.scala | 2 +- .../kafka/consumer/PartitionAssignor.scala | 1 - .../controller/ControllerChannelManager.scala | 20 ++------------ .../kafka/controller/KafkaController.scala | 2 +- ...nsactionMarkerRequestCompletionHandler.scala | 10 +++---- .../kafka/javaapi/TopicMetadataRequest.scala | 2 +- .../scala/kafka/javaapi/producer/Producer.scala | 2 +- .../main/scala/kafka/log/AbstractIndex.scala | 6 ++--- .../scala/kafka/log/LogCleanerManager.scala | 2 +- core/src/main/scala/kafka/log/OffsetIndex.scala | 5 ++-- core/src/main/scala/kafka/log/TimeIndex.scala | 9 +++---- 
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 4 +-- .../main/scala/kafka/producer/Producer.scala | 2 +- .../kafka/producer/async/EventHandler.scala | 2 +- .../producer/async/ProducerSendThread.scala | 2 +- .../main/scala/kafka/security/auth/Acl.scala | 21 +++++++-------- .../scala/kafka/server/ClientQuotaManager.scala | 2 +- .../scala/kafka/utils/IteratorTemplate.scala | 4 +-- .../main/scala/kafka/utils/KafkaScheduler.scala | 4 +-- .../admin/BrokerApiVersionsCommandTest.scala | 2 +- .../kafka/api/AdminClientIntegrationTest.scala | 2 +- .../kafka/api/ConsumerBounceTest.scala | 2 +- .../kafka/api/EndToEndAuthorizationTest.scala | 28 +++++++++----------- .../kafka/api/EndToEndClusterIdTest.scala | 2 +- .../kafka/api/ProducerBounceTest.scala | 2 +- .../kafka/api/ProducerFailureHandlingTest.scala | 2 +- .../api/RackAwareAutoTopicCreationTest.scala | 2 +- .../api/SaslEndToEndAuthorizationTest.scala | 2 +- .../kafka/api/TransactionsBounceTest.scala | 13 +++++---- .../tools/MirrorMakerIntegrationTest.scala | 4 +-- .../unit/kafka/admin/AddPartitionsTest.scala | 12 ++++----- .../kafka/admin/DeleteConsumerGroupTest.scala | 2 +- .../kafka/admin/DescribeConsumerGroupTest.scala | 3 ++- .../kafka/admin/ListConsumerGroupTest.scala | 3 ++- .../admin/ReassignPartitionsClusterTest.scala | 4 +-- .../admin/ReassignPartitionsCommandTest.scala | 2 +- .../unit/kafka/cluster/BrokerEndPointTest.scala | 2 +- .../ZkNodeChangeNotificationListenerTest.scala | 4 +-- .../kafka/consumer/ConsumerIteratorTest.scala | 2 +- .../ZookeeperConsumerConnectorTest.scala | 4 +-- .../controller/ControllerFailoverTest.scala | 2 +- .../kafka/integration/AutoOffsetResetTest.scala | 2 +- .../unit/kafka/integration/FetcherTest.scala | 2 +- ...MetricsDuringTopicCreationDeletionTest.scala | 2 +- .../kafka/integration/MinIsrConfigTest.scala | 2 +- .../kafka/integration/PrimitiveApiTest.scala | 2 +- .../kafka/integration/TopicMetadataTest.scala | 22 +++++++-------- .../integration/UncleanLeaderElectionTest.scala | 
14 +++++----- .../ZookeeperConsumerConnectorTest.scala | 3 ++- .../message/BaseMessageSetTestCases.scala | 2 +- .../scala/unit/kafka/log/TimeIndexTest.scala | 2 +- .../unit/kafka/log/TransactionIndexTest.scala | 4 +-- .../scala/unit/kafka/message/MessageTest.scala | 2 +- .../scala/unit/kafka/metrics/MetricsTest.scala | 2 +- .../unit/kafka/producer/AsyncProducerTest.scala | 2 +- .../unit/kafka/producer/SyncProducerTest.scala | 2 +- .../security/auth/ZkAuthorizationTest.scala | 2 +- .../unit/kafka/server/AdvertiseBrokerTest.scala | 6 ++--- .../unit/kafka/server/ApiVersionsTest.scala | 2 +- .../unit/kafka/server/BaseRequestTest.scala | 2 +- .../kafka/server/DynamicConfigChangeTest.scala | 11 ++++---- .../unit/kafka/server/EdgeCaseRequestTest.scala | 2 +- .../unit/kafka/server/KafkaConfigTest.scala | 2 +- .../unit/kafka/server/LeaderElectionTest.scala | 2 +- .../unit/kafka/server/LogRecoveryTest.scala | 8 +++--- .../unit/kafka/server/ServerStartupTest.scala | 10 +++---- .../epoch/LeaderEpochIntegrationTest.scala | 2 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 2 +- .../scala/unit/kafka/zk/ZKEphemeralTest.scala | 8 +++--- .../test/scala/unit/kafka/zk/ZKPathTest.scala | 16 +++++------ 83 files changed, 186 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index a493493..2921372 100644 --- a/build.gradle +++ b/build.gradle @@ -309,9 +309,29 @@ subprojects { "-feature", "-language:postfixOps", "-language:implicitConversions", - "-language:existentials" + "-language:existentials", + "-Xlint:by-name-right-associative", + "-Xlint:delayedinit-select", + "-Xlint:doc-detached", + "-Xlint:missing-interpolator", + "-Xlint:nullary-override", + "-Xlint:nullary-unit", + "-Xlint:option-implicit", + "-Xlint:package-object-classes", + 
"-Xlint:poly-implicit-overload", + "-Xlint:private-shadow", + "-Xlint:stars-align", + "-Xlint:type-parameter-shadow", + "-Xlint:unsound-match", ] + if (versions.baseScala != '2.11') { + scalaCompileOptions.additionalParameters += [ + "-Xlint:constant", + "-Xlint:unused" + ] + } + configure(scalaCompileOptions.forkOptions) { memoryMaximumSize = '1g' jvmArgs = ['-Xss2m'] + maxPermSizeArgs http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala index 4aea3c0..b25a8da 100644 --- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala +++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala @@ -24,7 +24,6 @@ import kafka.utils.CommandLineUtils import org.apache.kafka.common.utils.Utils import org.apache.kafka.clients.CommonClientConfigs import joptsimple._ -import org.apache.kafka.common.Node import scala.util.{Failure, Success} @@ -41,7 +40,7 @@ object BrokerApiVersionsCommand { val opts = new BrokerVersionCommandOptions(args) val adminClient = createAdminClient(opts) adminClient.awaitBrokers() - var brokerMap = adminClient.listAllBrokerVersionInfo() + val brokerMap = adminClient.listAllBrokerVersionInfo() brokerMap.foreach { case (broker, versionInfoOrError) => versionInfoOrError match { case Success(v) => out.print(s"${broker} -> ${v.toString(true)}\n") http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala index 71dae8a..1a3b116 100644 --- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala +++ 
b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala @@ -28,8 +28,6 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.clients.CommonClientConfigs import joptsimple._ -import scala.util.{Failure, Success} - /** * A command for delete records of the given partitions down to the specified offset. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/admin/TopicCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 9e516b0..882fe21 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -366,7 +366,7 @@ object TopicCommand extends Logging { } } - def askToProceed: Unit = { + def askToProceed(): Unit = { println("Are you sure you want to continue? [y/n]") if (!Console.readLine().equalsIgnoreCase("y")) { println("Ending your session") http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index 71153d1..e1d6e02 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -17,8 +17,6 @@ package kafka.admin -import java.util.concurrent.LinkedBlockingQueue - import joptsimple.OptionParser import org.I0Itec.zkclient.exception.ZkException import kafka.utils.{CommandLineUtils, Logging, ZkUtils} http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala 
b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala index e0a03e8..15992d2 100644 --- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala +++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala @@ -44,7 +44,7 @@ case class ControlledShutdownResponse(correlationId: Int, error: Errors = Errors.NONE, partitionsRemaining: Set[TopicAndPartition]) extends RequestOrResponse() { - def sizeInBytes(): Int ={ + def sizeInBytes: Int = { var size = 4 /* correlation id */ + 2 /* error code */ + http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/api/FetchRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index ceed815..1f23e40 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -22,7 +22,6 @@ import kafka.api.ApiUtils._ import kafka.common.TopicAndPartition import kafka.consumer.ConsumerConfig import kafka.network.RequestChannel -import kafka.message.MessageSet import java.util.concurrent.atomic.AtomicInteger import java.nio.ByteBuffer import java.util http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/api/TopicMetadataRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index 6bbcab5..032ff77 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -47,7 +47,7 @@ case class TopicMetadataRequest(versionId: Short, topics.foreach(topic => writeShortString(buffer, topic)) } - def sizeInBytes(): Int = { + def sizeInBytes: Int = { 2 + /* version id */ 4 + /* correlation id */ shortStringLength(clientId) + /* client id */ 
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/cluster/Replica.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 8f08089..183dc25 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -22,8 +22,6 @@ import kafka.utils.Logging import kafka.server.{LogOffsetMetadata, LogReadResult} import kafka.common.KafkaException import org.apache.kafka.common.errors.OffsetOutOfRangeException -import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile} -import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache} import org.apache.kafka.common.utils.Time class Replica(val brokerId: Int, http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/common/TopicAndPartition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala index 35b6bcd..4c94c73 100644 --- a/core/src/main/scala/kafka/common/TopicAndPartition.scala +++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala @@ -1,7 +1,6 @@ package kafka.common import kafka.cluster.{Partition, Replica} -import kafka.utils.Json import org.apache.kafka.common.TopicPartition /** http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/consumer/ConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala index 46fbab7..f6d4a74 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala @@ -79,7 +79,7 @@ trait ConsumerConnector { 
/** * KAFKA-1743: This method added for backward compatibility. */ - def commitOffsets + def commitOffsets() /** * Commit offsets from an external offsets map. http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 51a7a04..7cccfe1 100755 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -45,7 +45,6 @@ class ConsumerFetcherManager(private val consumerIdString: String, extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(Time.SYSTEM.milliseconds), config.clientId, config.numConsumerFetchers) { private var partitionMap: immutable.Map[TopicPartition, PartitionTopicInfo] = null - private var cluster: Cluster = null private val noLeaderPartitionSet = new mutable.HashSet[TopicPartition] private val lock = new ReentrantLock private val cond = lock.newCondition() @@ -126,7 +125,6 @@ class ConsumerFetcherManager(private val consumerIdString: String, inLock(lock) { partitionMap = topicInfos.map(tpi => (new TopicPartition(tpi.topic, tpi.partitionId), tpi)).toMap - this.cluster = cluster noLeaderPartitionSet ++= topicInfos.map(tpi => new TopicPartition(tpi.topic, tpi.partitionId)) cond.signalAll() } http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/consumer/ConsumerIterator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index 9ca2253..f096c55 100755 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -17,7 +17,7 @@ 
package kafka.consumer -import kafka.utils.{IteratorTemplate, Logging, CoreUtils} +import kafka.utils.{IteratorTemplate, Logging} import java.util.concurrent.{TimeUnit, BlockingQueue} import kafka.serializer.Decoder import java.util.concurrent.atomic.AtomicReference http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/consumer/KafkaStream.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala index faba42f..914cedd 100644 --- a/core/src/main/scala/kafka/consumer/KafkaStream.scala +++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala @@ -37,7 +37,7 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk], /** * Create an iterator over messages in the stream. */ - def iterator(): ConsumerIterator[K,V] = iter + def iterator: ConsumerIterator[K,V] = iter /** * This method clears the queue being iterated during the consumer rebalancing. 
This is mainly http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/consumer/PartitionAssignor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index 52c3d8b..5d4fb8b 100755 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -17,7 +17,6 @@ package kafka.consumer -import org.I0Itec.zkclient.ZkClient import kafka.common.TopicAndPartition import kafka.utils.{Pool, CoreUtils, ZkUtils, Logging} http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 369da05..ee8fa1e 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -502,33 +502,17 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient, case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit = null) -class Callbacks private (var leaderAndIsrResponseCallback: AbstractResponse => Unit = null, - var updateMetadataResponseCallback: AbstractResponse => Unit = null, - var stopReplicaResponseCallback: (AbstractResponse, Int) => Unit = null) +class Callbacks private (var stopReplicaResponseCallback: (AbstractResponse, Int) => Unit) object Callbacks { class CallbackBuilder { - var leaderAndIsrResponseCbk: AbstractResponse => Unit = null - var updateMetadataResponseCbk: AbstractResponse => Unit = null var stopReplicaResponseCbk: (AbstractResponse, Int) => Unit = null - def 
leaderAndIsrCallback(cbk: AbstractResponse => Unit): CallbackBuilder = { - leaderAndIsrResponseCbk = cbk - this - } - - def updateMetadataCallback(cbk: AbstractResponse => Unit): CallbackBuilder = { - updateMetadataResponseCbk = cbk - this - } - def stopReplicaCallback(cbk: (AbstractResponse, Int) => Unit): CallbackBuilder = { stopReplicaResponseCbk = cbk this } - def build: Callbacks = { - new Callbacks(leaderAndIsrResponseCbk, updateMetadataResponseCbk, stopReplicaResponseCbk) - } + def build: Callbacks = new Callbacks(stopReplicaResponseCbk) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 0ba412b..ff47f14 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -613,7 +613,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met def incrementControllerEpoch() = { try { - var newControllerEpoch = controllerContext.epoch + 1 + val newControllerEpoch = controllerContext.epoch + 1 val (updateSucceeded, newVersion) = zkUtils.conditionalUpdatePersistentPathIfExists( ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion) if(!updateSucceeded) http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala index 54960b9..19c37fa 100644 --- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala @@ -24,7 +24,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.WriteTxnMarkersResponse import scala.collection.mutable -import collection.JavaConversions._ +import scala.collection.JavaConverters._ class TransactionMarkerRequestCompletionHandler(brokerId: Int, txnStateManager: TransactionStateManager, @@ -41,7 +41,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, val correlation = requestHeader.correlationId trace(s"Cancelled $api request $requestHeader with correlation id $correlation due to node ${response.destination} being disconnected") - for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries) { + for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) { val transactionalId = txnIdAndMarker.txnId val txnMarker = txnIdAndMarker.txnMarkerEntry @@ -82,7 +82,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, txnMarker.producerEpoch, txnMarker.transactionResult, txnMarker.coordinatorEpoch, - txnMarker.partitions.toSet) + txnMarker.partitions.asScala.toSet) } } } @@ -91,7 +91,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse] - for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries) { + for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) { val transactionalId = txnIdAndMarker.txnId val txnMarker = txnIdAndMarker.txnMarkerEntry val errors = writeTxnMarkerResponse.errors(txnMarker.producerId) @@ -132,7 +132,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, abortSending = true } else { txnMetadata synchronized { - for ((topicPartition: TopicPartition, error: Errors) <- errors) { + for ((topicPartition, error) <- errors.asScala) { error 
match { case Errors.NONE => txnMetadata.removePartition(topicPartition) http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala index efd5405..fdb14cb 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala @@ -39,7 +39,7 @@ class TopicMetadataRequest(val versionId: Short, def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer) - def sizeInBytes: Int = underlying.sizeInBytes() + def sizeInBytes: Int = underlying.sizeInBytes override def toString: String = { describe(true) http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/javaapi/producer/Producer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala index 44f9245..b0b40b9 100644 --- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala +++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala @@ -48,5 +48,5 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for * Close API to close the producer pool connections to all Kafka brokers. 
Also closes * the zookeeper client connection if one exists */ - def close = underlying.close + def close() = underlying.close() } http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/log/AbstractIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index a125676..bfc6828 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -37,8 +37,8 @@ import scala.math.ceil * @param baseOffset the base offset of the segment that this index is corresponding to. * @param maxIndexSize The maximum index size in bytes. */ -abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1, val writable: Boolean) - extends Logging { +abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long, + val maxIndexSize: Int = -1, val writable: Boolean) extends Logging { protected def entrySize: Int @@ -109,7 +109,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon /* Windows won't let us modify the file length while the file is mmapped :-( */ if (OperatingSystem.IS_WINDOWS) - forceUnmap(mmap); + forceUnmap(mmap) try { raf.setLength(roundedNewSize) mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/log/LogCleanerManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 6e0ebfb..4a4a59f 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -24,7 +24,7 @@ import java.util.concurrent.locks.ReentrantLock import 
com.yammer.metrics.core.Gauge import kafka.common.LogCleaningAbortedException import kafka.metrics.KafkaMetricsGroup -import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile} +import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils.CoreUtils._ import kafka.utils.{Logging, Pool} import org.apache.kafka.common.TopicPartition http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/log/OffsetIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index e4939e8..53c18fe 100755 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -48,8 +48,9 @@ import kafka.common.InvalidOffsetException * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal * storage format. */ -class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true) - extends AbstractIndex[Long, Int](file, baseOffset, maxIndexSize, writable) { +// Avoid shadowing mutable `file` in AbstractIndex +class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true) + extends AbstractIndex[Long, Int](_file, baseOffset, maxIndexSize, writable) { override def entrySize = 8 http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/log/TimeIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala index 19ab71a..6c9c32b 100644 --- a/core/src/main/scala/kafka/log/TimeIndex.scala +++ b/core/src/main/scala/kafka/log/TimeIndex.scala @@ -49,11 +49,9 @@ import org.apache.kafka.common.record.RecordBatch * No attempt is made to checksum the contents of this file, in the event of a crash 
it is rebuilt. * */ -class TimeIndex(file: File, - baseOffset: Long, - maxIndexSize: Int = -1, - writable: Boolean = true) - extends AbstractIndex[Long, Long](file, baseOffset, maxIndexSize, writable) with Logging { +// Avoid shadowing mutable file in AbstractIndex +class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true) + extends AbstractIndex[Long, Long](_file, baseOffset, maxIndexSize, writable) with Logging { override def entrySize = 12 @@ -206,5 +204,4 @@ class TimeIndex(file: File, "Time index file " + file.getAbsolutePath + " is corrupt, found " + len + " bytes which is not positive or not a multiple of 12.") } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala index ca623ae..1894213 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala @@ -40,7 +40,7 @@ trait KafkaMetricsGroup extends Logging { * @param tags Additional attributes which mBean will have. * @return Sanitized metric name object. 
*/ - private def metricName(name: String, tags: scala.collection.Map[String, String] = Map.empty) = { + private def metricName(name: String, tags: scala.collection.Map[String, String]) = { val klass = this.getClass val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName val simpleName = klass.getSimpleName.replaceAll("\\$$", "") @@ -52,7 +52,7 @@ trait KafkaMetricsGroup extends Logging { } - private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String] = Map.empty) = { + private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String]) = { val nameBuilder: StringBuilder = new StringBuilder nameBuilder.append(group) http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/producer/Producer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index 2d2bfdb..d6cf4c8 100755 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -132,7 +132,7 @@ class Producer[K,V](val config: ProducerConfig, KafkaMetricsGroup.removeAllProducerMetrics(config.clientId) if (producerSendThread != null) producerSendThread.shutdown - eventHandler.close + eventHandler.close() info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/producer/async/EventHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/EventHandler.scala b/core/src/main/scala/kafka/producer/async/EventHandler.scala index 3a17bfb..44fb1eb 100644 --- a/core/src/main/scala/kafka/producer/async/EventHandler.scala +++ 
b/core/src/main/scala/kafka/producer/async/EventHandler.scala @@ -33,5 +33,5 @@ trait EventHandler[K,V] { /** * Cleans up and shuts down the event handler */ - def close + def close(): Unit } http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala index 79ed1b8..0377093 100644 --- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala +++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala @@ -53,7 +53,7 @@ class ProducerSendThread[K,V](val threadName: String, } } - def shutdown = { + def shutdown(): Unit = { info("Begin shutting down ProducerSendThread") queue.put(shutdownCommand) shutdownLatch.await http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/security/auth/Acl.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala index f99a088..b84d75c 100644 --- a/core/src/main/scala/kafka/security/auth/Acl.scala +++ b/core/src/main/scala/kafka/security/auth/Acl.scala @@ -56,21 +56,20 @@ object Acl { if (aclJson == null || aclJson.isEmpty) return collection.immutable.Set.empty[Acl] - var acls: collection.mutable.HashSet[Acl] = new collection.mutable.HashSet[Acl]() - Json.parseFull(aclJson).foreach { m => + Json.parseFull(aclJson).toSet[Any].flatMap { m => val aclMap = m.asInstanceOf[Map[String, Any]] //the acl json version. 
require(aclMap(VersionKey) == CurrentVersion) - val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]] - aclSet.foreach(item => { - val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String]) - val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String]) - val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String]) - val host: String = item(HostsKey).asInstanceOf[String] - acls += new Acl(principal, permissionType, host, operation) - }) + val aclSet = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]] + aclSet.map { item => + val principal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String]) + val permissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String]) + val operation = Operation.fromString(item(OperationKey).asInstanceOf[String]) + val host = item(HostsKey).asInstanceOf[String] + new Acl(principal, permissionType, host, operation) + } } - acls.toSet + } def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = { http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/server/ClientQuotaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 5c85eef..3970a4b 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -215,7 +215,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, // Compute the delay val clientQuotaEntity = clientSensors.quotaEntity val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId)) - throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).round.toInt 
+ throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).toInt clientSensors.throttleTimeSensor.record(throttleTimeMs) // If delayed, add the element to the delayQueue delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback)) http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/utils/IteratorTemplate.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/IteratorTemplate.scala b/core/src/main/scala/kafka/utils/IteratorTemplate.scala index 17c152d..7cd161e 100644 --- a/core/src/main/scala/kafka/utils/IteratorTemplate.scala +++ b/core/src/main/scala/kafka/utils/IteratorTemplate.scala @@ -42,12 +42,12 @@ abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T } def peek(): T = { - if(!hasNext()) + if(!hasNext) throw new NoSuchElementException() nextItem } - def hasNext(): Boolean = { + def hasNext: Boolean = { if(state == FAILED) throw new IllegalStateException("Iterator is in failed state") state match { http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/utils/KafkaScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala index 8e130cf..d20fdd7 100755 --- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala +++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala @@ -127,8 +127,8 @@ class KafkaScheduler(val threads: Int, } } - private def ensureRunning = { - if(!isStarted) + private def ensureRunning(): Unit = { + if (!isStarted) throw new IllegalStateException("Kafka scheduler is not running.") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala ---------------------------------------------------------------------- diff 
--git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala index 35bdded..00a7c9f 100644 --- a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala +++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala @@ -30,7 +30,7 @@ import org.junit.Test class BrokerApiVersionsCommandTest extends KafkaServerTestHarness { - def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps) + def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps) @Test(timeout=120000) def checkBrokerApiVersionCommandOutput() { http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 4c74bca..012f254 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -364,7 +364,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging { client.close() } - override def generateConfigs() = { + override def generateConfigs = { val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties) cfgs.foreach { config => http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index d146e9d..27cafd7 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -59,7 +59,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000") this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - override def generateConfigs() = { + override def generateConfigs = { FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect, enableControlledShutdown = false) .map(KafkaConfig.fromProps(_, serverConfig)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 3866cc1..3376d23 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -187,14 +187,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas * Tests the ability of producing and consuming with the appropriate ACLs set. 
*/ @Test - def testProduceConsumeViaAssign { + def testProduceConsumeViaAssign(): Unit = { setAclsAndProduce() consumers.head.assign(List(tp).asJava) consumeRecords(this.consumers.head, numRecords) } @Test - def testProduceConsumeViaSubscribe { + def testProduceConsumeViaSubscribe(): Unit = { setAclsAndProduce() consumers.head.subscribe(List(topic).asJava) consumeRecords(this.consumers.head, numRecords) @@ -215,12 +215,12 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas * isn't set. */ @Test(expected = classOf[TimeoutException]) - def testNoProduceWithoutDescribeAcl { + def testNoProduceWithoutDescribeAcl(): Unit = { sendRecords(numRecords, tp) } @Test - def testNoProduceWithDescribeAcl { + def testNoProduceWithDescribeAcl(): Unit = { AclCommand.main(describeAclArgs) servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource) @@ -239,7 +239,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas * ACL set. 
*/ @Test(expected = classOf[KafkaException]) - def testNoConsumeWithoutDescribeAclViaAssign { + def testNoConsumeWithoutDescribeAclViaAssign(): Unit = { noConsumeWithoutDescribeAclSetup consumers.head.assign(List(tp).asJava) // the exception is expected when the consumer attempts to lookup offsets @@ -247,14 +247,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } @Test(expected = classOf[TimeoutException]) - def testNoConsumeWithoutDescribeAclViaSubscribe { + def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = { noConsumeWithoutDescribeAclSetup consumers.head.subscribe(List(topic).asJava) // this should timeout since the consumer will not be able to fetch any metadata for the topic consumeRecords(this.consumers.head, timeout = 3000) } - private def noConsumeWithoutDescribeAclSetup { + private def noConsumeWithoutDescribeAclSetup(): Unit = { AclCommand.main(produceAclArgs) AclCommand.main(groupAclArgs) servers.foreach { s => @@ -270,13 +270,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) } } - - /** - * Tests that a consumer fails to consume messages without the appropriate - * ACL set. 
- */ + @Test - def testNoConsumeWithDescribeAclViaAssign { + def testNoConsumeWithDescribeAclViaAssign(): Unit = { noConsumeWithDescribeAclSetup consumers.head.assign(List(tp).asJava) @@ -290,7 +286,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } @Test - def testNoConsumeWithDescribeAclViaSubscribe { + def testNoConsumeWithDescribeAclViaSubscribe(): Unit = { noConsumeWithDescribeAclSetup consumers.head.subscribe(List(topic).asJava) @@ -303,7 +299,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } } - private def noConsumeWithDescribeAclSetup { + private def noConsumeWithDescribeAclSetup(): Unit = { AclCommand.main(produceAclArgs) AclCommand.main(groupAclArgs) servers.foreach { s => @@ -318,7 +314,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas * ACL set. */ @Test - def testNoGroupAcl { + def testNoGroupAcl(): Unit = { AclCommand.main(produceAclArgs) servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala index 6a4c552..6c61cd9 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala @@ -102,7 +102,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness { val topicAndPartition = new TopicAndPartition(topic, part) this.serverConfig.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.api.EndToEndClusterIdTest$MockBrokerMetricsReporter") - override def generateConfigs() = { + override def generateConfigs = { val cfgs = 
TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties) cfgs.foreach(_.putAll(serverConfig)) http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala index 9fe0e5c..aa92f40 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala @@ -51,7 +51,7 @@ class ProducerBounceTest extends KafkaServerTestHarness { // // Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving // a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems. 
- override def generateConfigs() = { + override def generateConfigs = { FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true) .map(KafkaConfig.fromProps(_, overridingProps)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 0c44ca9..49a096a 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -49,7 +49,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString) - def generateConfigs() = + def generateConfigs = TestUtils.createBrokerConfigs(numServers, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps)) private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = null http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala index a2f2041..cb5262d 100644 --- a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala +++ b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala @@ -35,7 +35,7 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa 
overridingProps.put(KafkaConfig.NumPartitionsProp, numPartitions.toString) overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, replicationFactor.toString) - def generateConfigs() = + def generateConfigs = (0 until numServers) map { node => TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = false, rack = Some((node / 2).toString)) } map (KafkaConfig.fromProps(_, overridingProps)) http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala index 7e549c8..cc9ee3e 100644 --- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala @@ -53,7 +53,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { * the second one connects ok, but fails to consume messages due to the ACL. 
*/ @Test(timeout = 15000) - def testTwoConsumersWithDifferentSaslCredentials { + def testTwoConsumersWithDifferentSaslCredentials(): Unit = { setAclsAndProduce() val consumer1 = consumers.head http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala index 0e216a2..810f481 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala @@ -22,13 +22,12 @@ import java.util.Properties import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig import kafka.utils.{ShutdownableThread, TestUtils} -import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} -import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.common.protocol.SecurityProtocol -import org.junit.{Ignore, Test} +import org.junit.Test -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.junit.Assert._ @@ -67,7 +66,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness { // // Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving // a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems. 
- override def generateConfigs() = { + override def generateConfigs = { FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true) .map(KafkaConfig.fromProps(_, overridingProps)) } @@ -105,7 +104,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness { !shouldAbort), new ErrorLoggingCallback(outputTopic, record.key, record.value, true)) } trace(s"Sent ${records.size} messages. Committing offsets.") - producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroup) + producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, consumerGroup) if (shouldAbort) { trace(s"Committed offsets. Aborting transaction of ${records.size} messages.") @@ -150,7 +149,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness { val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = groupId, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props)) - consumer.subscribe(topics) + consumer.subscribe(topics.asJava) consumer } http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala index b7b1a12..1f9851d 100644 --- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala @@ -30,8 +30,8 @@ import org.junit.Test class MirrorMakerIntegrationTest extends KafkaServerTestHarness { - override def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect) - .map(KafkaConfig.fromProps(_, new Properties())) + override def generateConfigs: Seq[KafkaConfig] = + TestUtils.createBrokerConfigs(1, 
zkConnect).map(KafkaConfig.fromProps(_, new Properties())) @Test def testCommaSeparatedRegex(): Unit = { http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index d08552e..9bc362c 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -64,7 +64,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { } @Test - def testTopicDoesNotExist { + def testTopicDoesNotExist(): Unit = { try { AdminUtils.addPartitions(zkUtils, "Blah", 1) fail("Topic should not exist") @@ -74,7 +74,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { } @Test - def testWrongReplicaCount { + def testWrongReplicaCount(): Unit = { try { AdminUtils.addPartitions(zkUtils, topic1, 2, "0:1,0:1:2") fail("Add partitions should fail") @@ -84,7 +84,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { } @Test - def testIncrementPartitions { + def testIncrementPartitions(): Unit = { AdminUtils.addPartitions(zkUtils, topic1, 3) // wait until leader is elected val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1) @@ -111,7 +111,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { } @Test - def testManualAssignmentOfReplicas { + def testManualAssignmentOfReplicas(): Unit = { AdminUtils.addPartitions(zkUtils, topic2, 3, "1:2,0:1,2:3") // wait until leader is elected val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1) @@ -139,7 +139,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { } @Test - def testReplicaPlacementAllServers { + def testReplicaPlacementAllServers(): Unit = { AdminUtils.addPartitions(zkUtils, topic3, 7) // read metadata from a broker and verify the new topic partitions exist 
@@ -166,7 +166,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { } @Test - def testReplicaPlacementPartialServers { + def testReplicaPlacementPartialServers(): Unit = { AdminUtils.addPartitions(zkUtils, topic2, 3) // read metadata from a broker and verify the new topic partitions exist http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala index aa202bc..a8955f5 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala @@ -28,7 +28,7 @@ import kafka.integration.KafkaServerTestHarness @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0") class DeleteConsumerGroupTest extends KafkaServerTestHarness { - def generateConfigs() = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps) + def generateConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps) @Test def testGroupWideDeleteInZK() { http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala index 2c09cc4..7000308 100644 --- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala @@ -47,7 +47,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { private var consumerGroupExecutor: ConsumerGroupExecutor = _ // configure the 
servers and clients - override def generateConfigs() = { + override def generateConfigs = { TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { props => KafkaConfig.fromProps(props) } @@ -274,6 +274,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { } } + @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.1.0") private def createOldConsumer(): Unit = { val consumerProps = new Properties consumerProps.setProperty("group.id", group) http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala index c03be66..6727fad 100644 --- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala @@ -40,7 +40,8 @@ class ListConsumerGroupTest extends KafkaServerTestHarness { val props = new Properties // configure the servers and clients - override def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) + override def generateConfigs = + TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) @Before override def setUp() { http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index e3b0aa8..dadd002 100644 --- 
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -30,7 +30,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { var servers: Seq[KafkaServer] = null val topicName = "my-topic" val delayMs = 1000 - def zkUpdateDelay = {Thread.sleep(delayMs)} + def zkUpdateDelay(): Unit = Thread.sleep(delayMs) @Before override def setUp() { @@ -49,7 +49,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { } @Test - def shouldMoveSinglePartition { + def shouldMoveSinglePartition(): Unit = { //Given a single replica on server 100 startBrokers(Seq(100, 101)) val partition = 0 http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala index 9e23983..c75c28a 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala @@ -133,7 +133,7 @@ class ReassignPartitionsCommandTest extends Logging { case "topic2" => assertEquals("0:101,0:102", configChange.get(LeaderReplicationThrottledReplicasProp)) assertEquals("0:100", configChange.get(FollowerReplicationThrottledReplicasProp)) - case _ => fail("Unexpected topic $topic") + case _ => fail(s"Unexpected topic $topic") } calls += 1 } http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala index 
20b7e25..2578243 100644 --- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala @@ -67,7 +67,7 @@ class BrokerEndPointTest extends Logging { } @Test - def testFromJsonV2 { + def testFromJsonV2(): Unit = { val brokerInfoStr = """{ "version":2, "host":"localhost", http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala index f7dd40f..368ee0d 100644 --- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala +++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala @@ -18,12 +18,12 @@ package kafka.common import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig -import kafka.utils.{TestUtils, ZkUtils} +import kafka.utils.TestUtils import org.junit.Test class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness { - override def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) + override def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) @Test def testProcessNotification() { http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index 0d38e10..5571e03 100755 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ 
b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -38,7 +38,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness { val numNodes = 1 - def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps) + def generateConfigs = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps) val messages = new mutable.HashMap[Int, Seq[Message]] val topic = "topic" http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index df80d1d..bbf05e4 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -45,8 +45,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging val overridingProps = new Properties() overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString) - override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect) - .map(KafkaConfig.fromProps(_, overridingProps)) + override def generateConfigs = + TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps)) val group = "group1" val consumer0 = "consumer0" http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala index 13b7285..446d8ae 100644 --- 
a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala @@ -40,7 +40,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging { val metrics = new Metrics() overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString) - override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect) + override def generateConfigs = TestUtils.createBrokerConfigs(numNodes, zkConnect) .map(KafkaConfig.fromProps(_, overridingProps)) @After http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index 2221d90..fb76ca1 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -31,7 +31,7 @@ import org.junit.Assert._ @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0") class AutoOffsetResetTest extends KafkaServerTestHarness with Logging { - def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) + def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) val topic = "test_topic" val group = "default_group" http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/FetcherTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 6076089..f23225c 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ 
b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -32,7 +32,7 @@ import kafka.utils.TestUtils @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0") class FetcherTest extends KafkaServerTestHarness { val numNodes = 1 - def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps) + def generateConfigs = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps) val messages = new mutable.HashMap[Int, Seq[Array[Byte]]] val topic = "topic" http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala index 60a6fb6..bec5026 100644 --- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala @@ -48,7 +48,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with @volatile private var running = true - override def generateConfigs() = TestUtils.createBrokerConfigs(nodesNum, zkConnect) + override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum, zkConnect) .map(KafkaConfig.fromProps(_, overridingProps)) @Before http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala index 3977601..455bbde 100644 --- a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala +++ 
b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala @@ -27,7 +27,7 @@ class MinIsrConfigTest extends KafkaServerTestHarness { val overridingProps = new Properties() overridingProps.put(KafkaConfig.MinInSyncReplicasProp, "5") - def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, overridingProps)) + def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, overridingProps)) @Test def testDefaultKafkaConfig() { http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index ff573bc..bc0b81a 100755 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -42,7 +42,7 @@ import org.apache.kafka.common.TopicPartition class PrimitiveApiTest extends ProducerConsumerTestHarness { val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) - def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) + def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) @Test def testFetchRequestCanProperlySerialize() { http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 07af590..66103cc 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ 
-17,14 +17,12 @@ package kafka.integration -import java.io.File - import kafka.admin.AdminUtils import kafka.api.TopicMetadataResponse import kafka.client.ClientUtils import kafka.cluster.BrokerEndPoint import kafka.server.{KafkaConfig, KafkaServer, NotRunning} -import kafka.utils.{CoreUtils, TestUtils} +import kafka.utils.TestUtils import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.Errors @@ -59,7 +57,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { } @Test - def testBasicTopicMetadata { + def testBasicTopicMetadata(): Unit = { // create topic val topic = "test" createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) @@ -77,7 +75,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { } @Test - def testGetAllTopicMetadata { + def testGetAllTopicMetadata(): Unit = { // create topic val topic1 = "testGetAllTopicMetadata1" val topic2 = "testGetAllTopicMetadata2" @@ -102,7 +100,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { } @Test - def testAutoCreateTopic { + def testAutoCreateTopic(): Unit = { // auto create topic val topic = "testAutoCreateTopic" var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic", @@ -129,7 +127,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { } @Test - def testAutoCreateTopicWithInvalidReplication { + def testAutoCreateTopicWithInvalidReplication(): Unit = { val adHocProps = createBrokerConfig(2, zkConnect) // Set default replication higher than the number of live brokers adHocProps.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3") @@ -152,7 +150,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { } @Test - def testAutoCreateTopicWithCollision { + def testAutoCreateTopicWithCollision(): Unit = { // auto create topic val topic1 = "testAutoCreate_Topic" val topic2 = "testAutoCreate.Topic" @@ -212,7 +210,7 @@ class TopicMetadataTest 
extends ZooKeeperTestHarness { } @Test - def testIsrAfterBrokerShutDownAndJoinsBack { + def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = { val numBrokers = 2 //just 2 brokers are enough for the test // start adHoc brokers @@ -260,12 +258,12 @@ class TopicMetadataTest extends ZooKeeperTestHarness { } @Test - def testAliveBrokerListWithNoTopics { + def testAliveBrokerListWithNoTopics(): Unit = { checkMetadata(Seq(server1), 1) } @Test - def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup { + def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup(): Unit = { adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p)) checkMetadata(adHocServers, numConfigs - 1) @@ -278,7 +276,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { @Test - def testAliveBrokersListWithNoTopicsAfterABrokerShutdown { + def testAliveBrokersListWithNoTopicsAfterABrokerShutdown(): Unit = { adHocServers = adHocConfigs.map(p => createServer(p)) checkMetadata(adHocServers, numConfigs) http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 25ed480..24421d0 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -102,7 +102,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { } @Test - def testUncleanLeaderElectionEnabled { + def testUncleanLeaderElectionEnabled(): Unit = { // enable unclean leader election configProps1.put("unclean.leader.election.enable", "true") configProps2.put("unclean.leader.election.enable", "true") @@ -116,7 +116,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @Test 
@Ignore // Should be re-enabled after KAFKA-3096 is fixed - def testUncleanLeaderElectionDisabled { + def testUncleanLeaderElectionDisabled(): Unit = { // unclean leader election is disabled by default startBrokers(Seq(configProps1, configProps2)) @@ -127,7 +127,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { } @Test - def testUncleanLeaderElectionEnabledByTopicOverride { + def testUncleanLeaderElectionEnabledByTopicOverride(): Unit = { // disable unclean leader election globally, but enable for our specific test topic configProps1.put("unclean.leader.election.enable", "false") configProps2.put("unclean.leader.election.enable", "false") @@ -144,7 +144,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @Test @Ignore // Should be re-enabled after KAFKA-3096 is fixed - def testCleanLeaderElectionDisabledByTopicOverride { + def testCleanLeaderElectionDisabledByTopicOverride(): Unit = { // enable unclean leader election globally, but disable for our specific test topic configProps1.put("unclean.leader.election.enable", "true") configProps2.put("unclean.leader.election.enable", "true") @@ -160,7 +160,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { } @Test - def testUncleanLeaderElectionInvalidTopicOverride { + def testUncleanLeaderElectionInvalidTopicOverride(): Unit = { startBrokers(Seq(configProps1)) // create topic with an invalid value for unclean leader election @@ -172,7 +172,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { } } - def verifyUncleanLeaderElectionEnabled { + def verifyUncleanLeaderElectionEnabled(): Unit = { // wait until leader is elected val leaderId = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) debug("Leader for " + topic + " is elected to be: %s".format(leaderId)) @@ -205,7 +205,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { assertEquals(List("first", "third"), consumeAllMessages(topic)) } - def verifyUncleanLeaderElectionDisabled { 
+ def verifyUncleanLeaderElectionDisabled(): Unit = { // wait until leader is elected val leaderId = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) debug("Leader for " + topic + " is elected to be: %s".format(leaderId)) http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index 7d8e0c2..2a0525b 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -45,7 +45,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging val overridingProps = new Properties() overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString) - def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps)) + def generateConfigs = + TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps)) val group = "group1" val consumer1 = "consumer1" http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala index a53602d..199bbbd 100644 --- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala @@ -31,7 +31,7 @@ trait BaseMessageSetTestCases extends JUnitSuite { def 
createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet @Test - def testWrittenEqualsRead { + def testWrittenEqualsRead(): Unit = { val messageSet = createMessageSet(messages) assertEquals(messages.toSeq, messageSet.asScala.map(m => m.message)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala index bc60c72..c6112a1 100644 --- a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala @@ -35,7 +35,7 @@ class TimeIndexTest extends JUnitSuite { @Before def setup() { - this.idx = new TimeIndex(file = nonExistantTempFile(), baseOffset = baseOffset, maxIndexSize = maxEntries * 12) + this.idx = new TimeIndex(nonExistantTempFile(), baseOffset = baseOffset, maxIndexSize = maxEntries * 12) } @After http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala index 16173eb..9b90e91 100644 --- a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala @@ -30,13 +30,13 @@ class TransactionIndexTest extends JUnitSuite { val offset = 0L @Before - def setup: Unit = { + def setup(): Unit = { file = TestUtils.tempFile() index = new TransactionIndex(offset, file) } @After - def teardown: Unit = { + def teardown(): Unit = { index.close() } http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/message/MessageTest.scala 
---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala index 75a86d2..2390b5b 100755 --- a/core/src/test/scala/unit/kafka/message/MessageTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala @@ -57,7 +57,7 @@ class MessageTest extends JUnitSuite { } @Test - def testFieldValues { + def testFieldValues(): Unit = { for(v <- messages) { // check payload if(v.payload == null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index 37c2619..e32f429 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -44,7 +44,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { val overridingProps = new Properties overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString) - def generateConfigs() = + def generateConfigs = TestUtils.createBrokerConfigs(numNodes, zkConnect, enableDeleteTopic=true).map(KafkaConfig.fromProps(_, overridingProps)) val nMessages = 2 http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index de0f901..6e7353c 100755 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -62,7 +62,7 @@ class AsyncProducerTest { Thread.sleep(500) } - def close {} + def close(): Unit = 
() } val props = new Properties() http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index cde49de..fa1174d 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -37,7 +37,7 @@ import org.junit.Assert._ class SyncProducerTest extends KafkaServerTestHarness { private val messageBytes = new Array[Byte](2) // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool. - def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head)) + def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head)) private def produceRequest(topic: String, partition: Int, http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala index 4d50cb8..646143c 100644 --- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala @@ -166,7 +166,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { * Tests the migration tool when chroot is being used. */ @Test - def testChroot { + def testChroot(): Unit = { val zkUrl = zkConnect + "/kafka" zkUtils.createPersistentPath("/kafka") val unsecureZkUtils = ZkUtils(zkUrl, 6000, 6000, false)
