This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 8d1e961 MINOR: Replace unused variables by underscore (#5003) 8d1e961 is described below commit 8d1e96181da777800e82b5c34457dc313539eced Author: Chia-Ping Tsai <chia7...@gmail.com> AuthorDate: Sat May 26 15:54:41 2018 +0800 MINOR: Replace unused variables by underscore (#5003) And remove one unused expression. Reviewers: Ismael Juma <ism...@juma.me.uk> --- core/src/main/scala/kafka/admin/LogDirsCommand.scala | 2 +- core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala | 4 ++-- core/src/main/scala/kafka/consumer/PartitionAssignor.scala | 2 +- core/src/main/scala/kafka/controller/PartitionStateMachine.scala | 8 ++++---- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala | 2 +- .../main/scala/kafka/coordinator/group/GroupMetadataManager.scala | 2 +- .../kafka/coordinator/transaction/TransactionStateManager.scala | 2 +- core/src/main/scala/kafka/log/Log.scala | 2 +- core/src/main/scala/kafka/log/LogManager.scala | 4 ++-- core/src/main/scala/kafka/log/ProducerStateManager.scala | 2 +- core/src/main/scala/kafka/server/KafkaApis.scala | 2 +- core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala | 6 +++--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala | 2 +- core/src/main/scala/kafka/server/ReplicaManager.scala | 4 ++-- core/src/main/scala/kafka/zk/AdminZkClient.scala | 3 +-- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 2 +- 16 files changed, 24 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala index d8e1beb..9257942 100644 --- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala +++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala @@ -65,7 +65,7 @@ object LogDirsCommand { Map( "logDir" -> logDir, "error" -> logDirInfo.error.exceptionName(), - "partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, replicaInfo) => + "partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, _) => topicSet.isEmpty || topicSet.contains(topicPartition.topic) }.map { case (topicPartition, replicaInfo) => Map( diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index ed9414b..f765b94 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -256,7 +256,7 @@ object ReassignPartitionsCommand extends Logging { val newReplicas = partitionFields("replicas").to[Seq[Int]] val newLogDirs = partitionFields.get("log_dirs") match { case Some(jsonValue) => jsonValue.to[Seq[String]] - case None => newReplicas.map(r => AnyLogDir) + case None => newReplicas.map(_ => AnyLogDir) } if (newReplicas.size != newLogDirs.size) throw new AdminCommandFailedException(s"Size of replicas list $newReplicas is different from " + @@ -569,7 +569,7 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient, } catch { case t: ExecutionException => t.getCause match { - case e: ReplicaNotAvailableException => None // It is OK if the replica is not available at this moment + case _: ReplicaNotAvailableException => None // It is OK if the replica is not available at this moment case e: Throwable => throw new AdminCommandFailedException(s"Failed to alter dir for $replica", e) } } diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index 5d4fb8b..7d49b99 100755 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -79,7 +79,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { if (ctx.consumersForTopic.nonEmpty) { // Collect consumer thread ids across all topics, remove duplicates, and sort to ensure determinism - val allThreadIds = ctx.consumersForTopic.flatMap { case (topic, threadIds) => + val allThreadIds = ctx.consumersForTopic.flatMap { case (_, threadIds) => threadIds }.toSet.toSeq.sorted diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 6805e32..db4c716 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -300,7 +300,7 @@ class PartitionStateMachine(config: KafkaConfig, failedElections.put(partition, getDataResponse.resultException.get) } } - val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (partition, leaderIsrAndControllerEpoch) => + val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (_, leaderIsrAndControllerEpoch) => leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch } invalidPartitionsForElection.foreach { case (partition, leaderIsrAndControllerEpoch) => @@ -323,12 +323,12 @@ class PartitionStateMachine(config: KafkaConfig, case ControlledShutdownPartitionLeaderElectionStrategy => leaderForControlledShutdown(validPartitionsForElection, shuttingDownBrokers).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty } } - partitionsWithoutLeaders.foreach { case (partition, leaderAndIsrOpt, recipients) => + partitionsWithoutLeaders.foreach { case (partition, _, _) => val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy" failedElections.put(partition, new StateChangeFailedException(failMsg)) } - val recipientsPerPartition = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> recipients }.toMap - val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> leaderAndIsrOpt.get }.toMap + val recipientsPerPartition = partitionsWithLeaders.map { case (partition, _, recipients) => partition -> recipients }.toMap + val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, _) => partition -> leaderAndIsrOpt.get }.toMap val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr( adjustedLeaderAndIsrs, controllerContext.epoch) successfulUpdates.foreach { case (partition, leaderAndIsr) => diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 5fafcc4..c9f0640 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -292,7 +292,7 @@ class ReplicaStateMachine(config: KafkaConfig, Seq[TopicPartition], Map[TopicPartition, Exception]) = { val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk, failedStateReads) = getTopicPartitionStatesFromZk(partitions) - val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (partition, leaderAndIsr) => leaderAndIsr.isr.contains(replicaId) } + val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (_, leaderAndIsr) => leaderAndIsr.isr.contains(replicaId) } val adjustedLeaderAndIsrs = leaderAndIsrsWithReplica.mapValues { leaderAndIsr => val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index c31735b..2787251 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -465,7 +465,7 @@ class GroupMetadataManager(brokerId: Int, } case Some(topicPartitions) => - topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => + topicPartitions.map { topicPartition => val partitionData = group.offset(topicPartition) match { case None => new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index da61077..5b82be4 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -140,7 +140,7 @@ class TransactionStateManager(brokerId: Int, val now = time.milliseconds() inReadLock(stateLock) { val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] = - transactionMetadataCache.flatMap { case (partition, entry) => + transactionMetadataCache.flatMap { case (_, entry) => entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match { case Empty | CompleteCommit | CompleteAbort => true case _ => false diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index af83775..118288b 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -755,7 +755,7 @@ class Log(@volatile var dir: File, records = validRecords) // update the producer state - for ((producerId, producerAppendInfo) <- updatedProducers) { + for ((_, producerAppendInfo) <- updatedProducers) { producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata) producerStateManager.update(producerAppendInfo) } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index f26a84c..c0ac3b8 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -878,9 +878,9 @@ class LogManager(logDirs: Seq[File], def allLogs: Iterable[Log] = currentLogs.values ++ futureLogs.values def logsByTopic(topic: String): Seq[Log] = { - (currentLogs.toList ++ futureLogs.toList).filter { case (topicPartition, log) => + (currentLogs.toList ++ futureLogs.toList).filter { case (topicPartition, _) => topicPartition.topic() == topic - }.map { case (topicPartition, log) => log } + }.map { case (_, log) => log } } /** diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index d2c3b39..abeac6e 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -582,7 +582,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, * Expire any producer ids which have been idle longer than the configured maximum expiration timeout. */ def removeExpiredProducers(currentTimeMs: Long) { - producers.retain { case (producerId, lastEntry) => + producers.retain { case (_, lastEntry) => !isProducerExpired(currentTimeMs, lastEntry) } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 8c17a82..f86026d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -512,7 +512,7 @@ class KafkaApis(val requestChannel: RequestChannel, } }) } else { - fetchContext.foreachPartition((part, data) => { + fetchContext.foreachPartition((part, _) => { erroneous += part -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 30e6c07..ba7203e 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -150,7 +150,7 @@ class ReplicaAlterLogDirsThread(name: String, .filter { case (_, state) => state.isTruncatingLog } .map { case (tp, _) => tp -> epochCacheOpt(tp) }.toMap - val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty } + val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty } val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() } ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet) @@ -217,7 +217,7 @@ class ReplicaAlterLogDirsThread(name: String, def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[FetchRequest] = { // Only include replica in the fetch request if it is not throttled. - val maxPartitionOpt = partitionMap.filter { case (topicPartition, partitionFetchState) => + val maxPartitionOpt = partitionMap.filter { case (_, partitionFetchState) => partitionFetchState.isReadyForFetch && !quota.isQuotaExceeded }.reduceLeftOption { (left, right) => if ((left._1.topic > right._1.topic()) || (left._1.topic == right._1.topic() && left._1.partition() >= right._1.partition())) @@ -237,7 +237,7 @@ class ReplicaAlterLogDirsThread(name: String, val logStartOffset = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId).logStartOffset requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize)) } catch { - case e: KafkaStorageException => + case _: KafkaStorageException => partitionsWithError += topicPartition } } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 68fa873..72b6616 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -333,7 +333,7 @@ class ReplicaFetcherThread(name: String, .filter { case (_, state) => state.isTruncatingLog } .map { case (tp, _) => tp -> epochCacheOpt(tp) }.toMap - val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty } + val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty } debug(s"Build leaderEpoch request $partitionsWithEpoch") val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 0518e03..5dbe25b 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -560,7 +560,7 @@ class ReplicaManager(val config: KafkaConfig, // 1. the delete records operation on this partition is successful // 2. low watermark of this partition is smaller than the specified offset private def delayedDeleteRecordsRequired(localDeleteRecordsResults: Map[TopicPartition, LogDeleteRecordsResult]): Boolean = { - localDeleteRecordsResults.exists{ case (tp, deleteRecordsResult) => + localDeleteRecordsResults.exists{ case (_, deleteRecordsResult) => deleteRecordsResult.exception.isEmpty && deleteRecordsResult.lowWatermark < deleteRecordsResult.requestedOffset } } @@ -654,7 +654,7 @@ class ReplicaManager(val config: KafkaConfig, } } catch { - case e: KafkaStorageException => + case _: KafkaStorageException => (absolutePath, new LogDirInfo(Errors.KAFKA_STORAGE_ERROR, Map.empty[TopicPartition, ReplicaInfo].asJava)) case t: Throwable => error(s"Error while describing replica in dir $absolutePath", t) diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index 6d7df3f..2f8da36 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -372,8 +372,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging { */ def changeBrokerConfig(broker: Option[Int], configs: Properties): Unit = { validateBrokerConfig(configs) - val entityName = broker.map(_.toString).getOrElse(ConfigEntityName.Default) - changeEntityConfig(ConfigType.Broker, broker.map(String.valueOf).getOrElse(ConfigEntityName.Default), configs) + changeEntityConfig(ConfigType.Broker, broker.map(_.toString).getOrElse(ConfigEntityName.Default), configs) } /** diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 42f352b..0cf158e 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -1315,7 +1315,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean createRecursive(ClusterIdZNode.path, ClusterIdZNode.toJson(proposedClusterId)) proposedClusterId } catch { - case e: NodeExistsException => getClusterId.getOrElse( + case _: NodeExistsException => getClusterId.getOrElse( throw new KafkaException("Failed to get cluster id from Zookeeper. This can happen if /cluster/id is deleted from Zookeeper.")) } } -- To stop receiving notification emails like this one, please contact ij...@apache.org.