Repository: kafka Updated Branches: refs/heads/trunk 2d2e9adb5 -> e39104547
KAFKA-5127; Replace pattern matching with foreach where the case None is ignored Author: Balint Molnar <[email protected]> Reviewers: Vahid Hashemian <[email protected]>, Ismael Juma <[email protected]>, Jason Gustafson <[email protected]> Closes #2919 from baluchicken/KAFKA-5127 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e3910454 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e3910454 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e3910454 Branch: refs/heads/trunk Commit: e391045473f258562d55bbc07faafe75ec7213ac Parents: 2d2e9ad Author: Balint Molnar <[email protected]> Authored: Fri Jul 14 09:57:01 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Fri Jul 14 09:57:14 2017 -0700 ---------------------------------------------------------------------- .../consumer/ZookeeperConsumerConnector.scala | 35 +++++------- .../kafka/controller/KafkaController.scala | 44 +++++++-------- .../kafka/controller/ReplicaStateMachine.scala | 2 +- .../scala/kafka/metrics/KafkaMetricsGroup.scala | 6 +- .../kafka/network/RequestOrResponseSend.scala | 6 +- .../producer/async/DefaultEventHandler.scala | 5 +- .../main/scala/kafka/security/auth/Acl.scala | 26 ++++----- .../main/scala/kafka/server/KafkaServer.scala | 58 ++++++++++---------- .../kafka/tools/ConsumerOffsetChecker.scala | 47 ++++++---------- .../kafka/tools/StateChangeLogMerger.scala | 18 +++--- .../scala/kafka/utils/ReplicationUtils.scala | 18 +++--- core/src/main/scala/kafka/utils/ZkUtils.scala | 48 +++++++--------- .../kafka/api/ConsumerBounceTest.scala | 10 ++-- .../util/ReplicaFetcherMockBlockingSend.scala | 5 +- 14 files changed, 133 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index ba2fce1..cdf730f 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -220,10 +220,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, try { if (config.autoCommitEnable) scheduler.shutdown() - fetcher match { - case Some(f) => f.stopConnections - case None => - } + fetcher.foreach(_.stopConnections()) sendShutdownToAllQueues() if (config.autoCommitEnable) commitOffsets(true) @@ -780,23 +777,21 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, messageStreams: Map[String,List[KafkaStream[_,_]]], queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) { val allPartitionInfos = topicRegistry.values.map(p => p.values).flatten - fetcher match { - case Some(f) => - f.stopConnections - clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams) - /** - * here, we need to commit offsets before stopping the consumer from returning any more messages - * from the current data chunk. Since partition ownership is not yet released, this commit offsets - * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition - * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated - * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes - * successfully and the fetchers restart to fetch more data chunks - **/ + fetcher.foreach { f => + f.stopConnections() + clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams) + /** + * here, we need to commit offsets before stopping the consumer from returning any more messages + * from the current data chunk. Since partition ownership is not yet released, this commit offsets + * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition + * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated + * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes + * successfully and the fetchers restart to fetch more data chunks + **/ if (config.autoCommitEnable) { info("Committing all offsets after clearing the fetcher queues") commitOffsets(true) } - case None => } } @@ -833,11 +828,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, info("Consumer " + consumerIdString + " selected partitions : " + allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(",")) - fetcher match { - case Some(f) => - f.startConnections(allPartitionInfos, cluster) - case None => - } + fetcher.foreach(_.startConnections(allPartitionInfos, cluster)) } private def reflectPartitionOwnershipDecision(partitionAssignment: Map[TopicAndPartition, ConsumerThreadId]): Boolean = { http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index e7f98e5..0ba412b 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1273,29 +1273,27 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met override def process(): Unit = { if (!isActive) return // check if this partition is still being reassigned or not - controllerContext.partitionsBeingReassigned.get(topicAndPartition) match { - case Some(reassignedPartitionContext) => - // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object - val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topicAndPartition.topic, topicAndPartition.partition) - newLeaderAndIsrOpt match { - case Some(leaderAndIsr) => // check if new replicas have joined ISR - val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet - if(caughtUpReplicas == reassignedReplicas) { - // resume the partition reassignment process - info("%d/%d replicas have caught up with the leader for partition %s being reassigned." - .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) + - "Resuming partition reassignment") - onPartitionReassignment(topicAndPartition, reassignedPartitionContext) - } - else { - info("%d/%d replicas have caught up with the leader for partition %s being reassigned." - .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) + - "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(","))) - } - case None => error("Error handling reassignment of partition %s to replicas %s as it was never created" - .format(topicAndPartition, reassignedReplicas.mkString(","))) - } - case None => + controllerContext.partitionsBeingReassigned.get(topicAndPartition).foreach { reassignedPartitionContext => + // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object + val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topicAndPartition.topic, topicAndPartition.partition) + newLeaderAndIsrOpt match { + case Some(leaderAndIsr) => // check if new replicas have joined ISR + val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet + if(caughtUpReplicas == reassignedReplicas) { + // resume the partition reassignment process + info("%d/%d replicas have caught up with the leader for partition %s being reassigned." + .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) + + "Resuming partition reassignment") + onPartitionReassignment(topicAndPartition, reassignedPartitionContext) + } + else { + info("%d/%d replicas have caught up with the leader for partition %s being reassigned." + .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) + + "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(","))) + } + case None => error("Error handling reassignment of partition %s to replicas %s as it was never created" + .format(topicAndPartition, reassignedReplicas.mkString(","))) + } } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 60b9990..43fac19 100755 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -198,7 +198,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) case None => // that means the partition was never in OnlinePartition state, this means the broker never - // started a log for that partition and does not have a high watermark value for this partition + // started a log for that partition and does not have a high watermark value for this partition } } replicaState.put(partitionAndReplica, OnlineReplica) http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala index 0847625..ca623ae 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala @@ -68,11 +68,7 @@ trait KafkaMetricsGroup extends Logging { val scope: String = KafkaMetricsGroup.toScope(tags).getOrElse(null) val tagsName = KafkaMetricsGroup.toMBeanName(tags) - tagsName match { - case Some(tn) => - nameBuilder.append(",").append(tn) - case None => - } + tagsName.foreach(nameBuilder.append(",").append(_)) new MetricName(group, typeName, name, scope, nameBuilder.toString()) } http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/network/RequestOrResponseSend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala index 1bfbf53..7a14e5e 100644 --- a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala +++ b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala @@ -27,11 +27,7 @@ import org.apache.kafka.common.network.NetworkSend object RequestOrResponseSend { def serialize(request: RequestOrResponse): ByteBuffer = { val buffer = ByteBuffer.allocate(request.sizeInBytes + request.requestId.fold(0)(_ => 2)) - request.requestId match { - case Some(requestId) => - buffer.putShort(requestId) - case None => - } + request.requestId.foreach(buffer.putShort) request.writeTo(buffer) buffer.rewind() buffer http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 77c3b7d..3e4eaa3 100755 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -114,10 +114,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, case Some(messageSetPerBroker) => val failedTopicPartitions = send(brokerid, messageSetPerBroker) failedTopicPartitions.foreach(topicPartition => { - messagesPerBrokerMap.get(topicPartition) match { - case Some(data) => failedProduceRequests.appendAll(data) - case None => // nothing - } + messagesPerBrokerMap.get(topicPartition).foreach(failedProduceRequests.appendAll) }) case None => // failed to group messages messagesPerBrokerMap.values.foreach(m => failedProduceRequests.appendAll(m)) http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/security/auth/Acl.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala index c23dd2d..f99a088 100644 --- a/core/src/main/scala/kafka/security/auth/Acl.scala +++ b/core/src/main/scala/kafka/security/auth/Acl.scala @@ -57,20 +57,18 @@ object Acl { return collection.immutable.Set.empty[Acl] var acls: collection.mutable.HashSet[Acl] = new collection.mutable.HashSet[Acl]() - Json.parseFull(aclJson) match { - case Some(m) => - val aclMap = m.asInstanceOf[Map[String, Any]] - //the acl json version. - require(aclMap(VersionKey) == CurrentVersion) - val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]] - aclSet.foreach(item => { - val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String]) - val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String]) - val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String]) - val host: String = item(HostsKey).asInstanceOf[String] - acls += new Acl(principal, permissionType, host, operation) - }) - case None => + Json.parseFull(aclJson).foreach { m => + val aclMap = m.asInstanceOf[Map[String, Any]] + //the acl json version. + require(aclMap(VersionKey) == CurrentVersion) + val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]] + aclSet.foreach(item => { + val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String]) + val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String]) + val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String]) + val host: String = item(HostsKey).asInstanceOf[String] + acls += new Acl(principal, permissionType, host, operation) + }) } acls.toSet } http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 0a87750..cc34e14 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -410,21 +410,20 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP // Get the current controller info. This is to ensure we use the most recent info to issue the // controlled shutdown request val controllerId = zkUtils.getController() - zkUtils.getBrokerInfo(controllerId) match { - case Some(broker) => - // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous - // attempt, connect to the most recent controller - if (ioException || broker != prevController) { + //If this method returns None ignore and try again + zkUtils.getBrokerInfo(controllerId).foreach { broker => + // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous + // attempt, connect to the most recent controller + if (ioException || broker != prevController) { - ioException = false + ioException = false - if (prevController != null) - networkClient.close(node(prevController).idString) + if (prevController != null) + networkClient.close(node(prevController).idString) - prevController = broker - metadataUpdater.setNodes(Seq(node(prevController)).asJava) - } - case None => //ignore and try again + prevController = broker + metadataUpdater.setNodes(Seq(node(prevController)).asJava) + } } // 2. issue a controlled shutdown to the controller @@ -483,24 +482,23 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP // Get the current controller info. This is to ensure we use the most recent info to issue the // controlled shutdown request val controllerId = zkUtils.getController() - zkUtils.getBrokerInfo(controllerId) match { - case Some(broker) => - if (channel == null || prevController == null || !prevController.equals(broker)) { - // if this is the first attempt or if the controller has changed, create a channel to the most recent - // controller - if (channel != null) - channel.disconnect() - - val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName) - channel = new BlockingChannel(brokerEndPoint.host, - brokerEndPoint.port, - BlockingChannel.UseDefaultBufferSize, - BlockingChannel.UseDefaultBufferSize, - config.controllerSocketTimeoutMs) - channel.connect() - prevController = broker - } - case None => //ignore and try again + //If this method returns None ignore and try again + zkUtils.getBrokerInfo(controllerId).foreach { broker => + if (channel == null || prevController == null || !prevController.equals(broker)) { + // if this is the first attempt or if the controller has changed, create a channel to the most recent + // controller + if (channel != null) + channel.disconnect() + + val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName) + channel = new BlockingChannel(brokerEndPoint.host, + brokerEndPoint.port, + BlockingChannel.UseDefaultBufferSize, + BlockingChannel.UseDefaultBufferSize, + config.controllerSocketTimeoutMs) + channel.connect() + prevController = broker + } } // 2. issue a controlled shutdown to the controller http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index d5e29ac..87147dc 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -63,17 +63,15 @@ object ConsumerOffsetChecker extends Logging { zkUtils.getLeaderForPartition(topic, producerId) match { case Some(bid) => val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid)) - consumerOpt match { - case Some(consumer) => - val topicAndPartition = TopicAndPartition(topic, producerId) - val request = - OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) - val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head - - val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString) - println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, producerId, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"), - owner match {case Some(ownerStr) => ownerStr case None => "none"})) - case None => // ignore + consumerOpt.foreach { consumer => + val topicAndPartition = TopicAndPartition(topic, producerId) + val request = + OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) + val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head + + val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString) + println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, producerId, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"), + owner match {case Some(ownerStr) => ownerStr case None => "none"})) } case None => println("No broker for partition %s - %s".format(topic, producerId)) @@ -81,22 +79,18 @@ object ConsumerOffsetChecker extends Logging { } private def processTopic(zkUtils: ZkUtils, group: String, topic: String) { - topicPidMap.get(topic) match { - case Some(producerIds) => - producerIds.sorted.foreach { - producerId => processPartition(zkUtils, group, topic, producerId) + topicPidMap.get(topic).foreach { producerIds => + producerIds.sorted.foreach { + producerId => processPartition(zkUtils, group, topic, producerId) } - case None => // ignore } } private def printBrokerInfo() { println("BROKER INFO") for ((bid, consumerOpt) <- consumerMap) - consumerOpt match { - case Some(consumer) => - println("%s -> %s:%d".format(bid, consumer.host, consumer.port)) - case None => // ignore + consumerOpt.foreach { consumer => + println("%s -> %s:%d".format(bid, consumer.host, consumer.port)) } } @@ -197,23 +191,14 @@ object ConsumerOffsetChecker extends Logging { if (options.has("broker-info")) printBrokerInfo() - for ((_, consumerOpt) <- consumerMap) - consumerOpt match { - case Some(consumer) => consumer.close() - case None => // ignore - } + consumerMap.values.flatten.foreach(_.close()) } catch { case t: Throwable => println("Exiting due to: %s.".format(t.getMessage)) } finally { - for (consumerOpt <- consumerMap.values) { - consumerOpt match { - case Some(consumer) => consumer.close() - case None => // ignore - } - } + consumerMap.values.flatten.foreach(_.close()) if (zkUtils != null) zkUtils.close() http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala index f2b929a..a3c80d1 100755 --- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala +++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala @@ -167,18 +167,14 @@ object StateChangeLogMerger extends Logging { def getNextLine(itr: Iterator[String]): LineIterator = { while (itr != null && itr.hasNext) { val nextLine = itr.next - dateRegex.findFirstIn(nextLine) match { - case Some(d) => - val date = dateFormat.parse(d) - if ((date.equals(startDate) || date.after(startDate)) && (date.equals(endDate) || date.before(endDate))) { - topicPartitionRegex.findFirstMatchIn(nextLine) match { - case Some(matcher) => - if ((topic == null || topic == matcher.group(1)) && (partitions.isEmpty || partitions.contains(matcher.group(3).toInt))) - return new LineIterator(nextLine, itr) - case None => - } + dateRegex.findFirstIn(nextLine).foreach { d => + val date = dateFormat.parse(d) + if ((date.equals(startDate) || date.after(startDate)) && (date.equals(endDate) || date.before(endDate))) { + topicPartitionRegex.findFirstMatchIn(nextLine).foreach { matcher => + if ((topic == null || topic == matcher.group(1)) && (partitions.isEmpty || partitions.contains(matcher.group(3).toInt))) + return new LineIterator(nextLine, itr) } - case None => + } } } new LineIterator() http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/utils/ReplicationUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala index c0cb5aa..fe31d7f 100644 --- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala +++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala @@ -53,16 +53,14 @@ object ReplicationUtils extends Logging { val writtenLeaderOpt = writtenLeaderAndIsrInfo._1 val writtenStat = writtenLeaderAndIsrInfo._2 val expectedLeader = parseLeaderAndIsr(expectedLeaderAndIsrInfo, path, writtenStat) - writtenLeaderOpt match { - case Some(writtenData) => - val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat) - (expectedLeader,writtenLeader) match { - case (Some(expectedLeader),Some(writtenLeader)) => - if(expectedLeader == writtenLeader) - return (true, writtenStat.getVersion()) - case _ => - } - case None => + writtenLeaderOpt.foreach { writtenData => + val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat) + (expectedLeader,writtenLeader) match { + case (Some(expectedLeader),Some(writtenLeader)) => + if(expectedLeader == writtenLeader) + return (true, writtenStat.getVersion()) + case _ => + } } } catch { case _: Exception => http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index e03893c..0035120 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -187,18 +187,14 @@ object ZkUtils { def parseTopicsData(jsonData: String): Seq[String] = { var topics = List.empty[String] - Json.parseFull(jsonData) match { - case Some(m) => - m.asInstanceOf[Map[String, Any]].get("topics") match { - case Some(partitionsSeq) => - val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]] - mapPartitionSeq.foreach(p => { - val topic = p.get("topic").get.asInstanceOf[String] - topics ++= List(topic) - }) - case None => - } - case None => + Json.parseFull(jsonData).foreach { m => + m.asInstanceOf[Map[String, Any]].get("topics").foreach { partitionsSeq => + val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]] + mapPartitionSeq.foreach(p => { + val topic = p.get("topic").get.asInstanceOf[String] + topics ++= List(topic) + }) + } } topics } @@ -696,9 +692,8 @@ class ZkUtils(val zkClient: ZkClient, def getPartitionLeaderAndIsrForTopics(topicAndPartitions: Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = { val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch] for(topicAndPartition <- topicAndPartitions) { - ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topicAndPartition.topic, topicAndPartition.partition) match { - case Some(leaderIsrAndControllerEpoch) => ret.put(topicAndPartition, leaderIsrAndControllerEpoch) - case None => + ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topicAndPartition.topic, topicAndPartition.partition).foreach { leaderIsrAndControllerEpoch => + ret.put(topicAndPartition, leaderIsrAndControllerEpoch) } } ret @@ -708,21 +703,16 @@ class ZkUtils(val zkClient: ZkClient, val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]] topics.foreach { topic => val jsonPartitionMapOpt = readDataMaybeNull(getTopicPath(topic))._1 - jsonPartitionMapOpt match { - case Some(jsonPartitionMap) => - Json.parseFull(jsonPartitionMap) match { - case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match { - case Some(repl) => - val replicaMap = repl.asInstanceOf[Map[String, Seq[Int]]] - for((partition, replicas) <- replicaMap){ - ret.put(TopicAndPartition(topic, partition.toInt), replicas) - debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas)) - } - case None => - } - case None => + jsonPartitionMapOpt.foreach { jsonPartitionMap => + Json.parseFull(jsonPartitionMap).foreach { m => + m.asInstanceOf[Map[String, Any]].get("partitions").foreach { repl => + val replicaMap = repl.asInstanceOf[Map[String, Seq[Int]]] + for((partition, replicas) <- replicaMap){ + ret.put(TopicAndPartition(topic, partition.toInt), replicas) + debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas)) + } } - case None => + } } } ret http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index 2fa4d15..d146e9d 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -386,13 +386,11 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { info("Closing consumer with timeout " + closeTimeoutMs + " ms.") consumer.close(closeTimeoutMs, TimeUnit.MILLISECONDS) val timeTakenMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startNanos) - maxCloseTimeMs match { - case Some(ms) => assertTrue("Close took too long " + timeTakenMs, timeTakenMs < ms + closeGraceTimeMs) - case None => + maxCloseTimeMs.foreach { ms => + assertTrue("Close took too long " + timeTakenMs, timeTakenMs < ms + closeGraceTimeMs) } - minCloseTimeMs match { - case Some(ms) => assertTrue("Close finished too quickly " + timeTakenMs, timeTakenMs >= ms) - case None => + minCloseTimeMs.foreach { ms => + assertTrue("Close finished too quickly " + timeTakenMs, timeTakenMs >= ms) } info("consumer.close() completed in " + timeTakenMs + " ms.") }, 0) http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala index e04bd95..0692afb 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala @@ -48,10 +48,7 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc //Create a suitable response based on the API key val response = requestBuilder.apiKey() match { case ApiKeys.OFFSET_FOR_LEADER_EPOCH => - callback match { - case Some(f) => f() - case None => //nothing - } + callback.foreach(_.apply()) epochFetchCount += 1 new OffsetsForLeaderEpochResponse(offsets)
