Repository: kafka Updated Branches: refs/heads/trunk ff300c9d4 -> ab3560606
KAFKA-3768; Replace all pattern match on boolean value by if/else block. Author: Satendra kumar <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #1445 from satendrakumar06/remove_boolean_pattern_match Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ab356060 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ab356060 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ab356060 Branch: refs/heads/trunk Commit: ab356060665b3b6502c7d531366b26e1e0f48f9c Parents: ff300c9 Author: Satendra kumar <[email protected]> Authored: Sat Jun 4 21:24:11 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Sat Jun 4 21:24:45 2016 +0100 ---------------------------------------------------------------------- .../kafka/controller/KafkaController.scala | 98 ++++++++++---------- .../controller/PartitionLeaderSelector.scala | 32 +++---- .../controller/PartitionStateMachine.scala | 11 +-- .../kafka/controller/ReplicaStateMachine.scala | 15 ++- .../message/ByteBufferBackedInputStream.scala | 24 +++-- .../main/scala/kafka/producer/Producer.scala | 14 +-- core/src/main/scala/kafka/tools/JmxTool.scala | 7 +- .../security/auth/ZkAuthorizationTest.scala | 26 +++--- 8 files changed, 111 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 6c503a5..d533a85 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -564,46 +564,45 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat */ def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) { val reassignedReplicas = reassignedPartitionContext.newReplicas - areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match { - case false => - info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) + - "reassigned not yet caught up with the leader") - val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet - val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet - //1. Update AR in ZK with OAR + RAR. - updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq) - //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). - updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition), - newAndOldReplicas.toSeq) - //3. replicas in RAR - OAR -> NewReplica - startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList) - info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) + - "reassigned to catch up with the leader") - case true => - //4. Wait until all replicas in RAR are in sync with the leader. - val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet - //5. replicas in RAR -> OnlineReplica - reassignedReplicas.foreach { replica => - replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, - replica)), OnlineReplica) - } - //6. Set AR to RAR in memory. - //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and - // a new AR (using RAR) and same isr to every broker in RAR - moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext) - //8. replicas in OAR - RAR -> Offline (force those replicas out of isr) - //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted) - stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas) - //10. Update AR in ZK with RAR. - updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas) - //11. Update the /admin/reassign_partitions path in ZK to remove this partition. - removePartitionFromReassignedPartitions(topicAndPartition) - info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition)) - controllerContext.partitionsBeingReassigned.remove(topicAndPartition) - //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker - sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition)) - // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed - deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic)) + if (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) { + info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) + + "reassigned not yet caught up with the leader") + val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet + val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet + //1. Update AR in ZK with OAR + RAR. + updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq) + //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). + updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition), + newAndOldReplicas.toSeq) + //3. replicas in RAR - OAR -> NewReplica + startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList) + info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) + + "reassigned to catch up with the leader") + } else { + //4. Wait until all replicas in RAR are in sync with the leader. + val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet + //5. replicas in RAR -> OnlineReplica + reassignedReplicas.foreach { replica => + replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, + replica)), OnlineReplica) + } + //6. Set AR to RAR in memory. + //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and + // a new AR (using RAR) and same isr to every broker in RAR + moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext) + //8. replicas in OAR - RAR -> Offline (force those replicas out of isr) + //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted) + stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas) + //10. Update AR in ZK with RAR. + updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas) + //11. Update the /admin/reassign_partitions path in ZK to remove this partition. + removePartitionFromReassignedPartitions(topicAndPartition) + info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition)) + controllerContext.partitionsBeingReassigned.remove(topicAndPartition) + //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker + sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition)) + // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed + deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic)) } } @@ -853,16 +852,15 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector) } else { // check if the leader is alive or not - controllerContext.liveBrokerIds.contains(currentLeader) match { - case true => - info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) + - "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(","))) - // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest - updateLeaderEpochAndSendRequest(topicAndPartition, oldAndNewReplicas, reassignedReplicas) - case false => - info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) + - "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(","))) - partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector) + if (controllerContext.liveBrokerIds.contains(currentLeader)) { + info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) + + "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(","))) + // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest + updateLeaderEpochAndSendRequest(topicAndPartition, oldAndNewReplicas, reassignedReplicas) + } else { + info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) + + "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(","))) + partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 9d8b0b6..682ce1d 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -57,8 +57,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r)) val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion - val newLeaderAndIsr = liveBrokersInIsr.isEmpty match { - case true => + val newLeaderAndIsr = + if (liveBrokersInIsr.isEmpty) { // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration // for unclean leader election. if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils, @@ -67,28 +67,26 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(","))) } - debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s" .format(topicAndPartition, liveAssignedReplicas.mkString(","))) - liveAssignedReplicas.isEmpty match { - case true => - throw new NoReplicaOnlineException(("No replica for partition " + - "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + - " Assigned replicas are: [%s]".format(assignedReplicas)) - case false => - ControllerStats.uncleanLeaderElectionRate.mark() - val newLeader = liveAssignedReplicas.head - warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss." - .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(","))) - new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) + if (liveAssignedReplicas.isEmpty) { + throw new NoReplicaOnlineException(("No replica for partition " + + "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + + " Assigned replicas are: [%s]".format(assignedReplicas)) + } else { + ControllerStats.uncleanLeaderElectionRate.mark() + val newLeader = liveAssignedReplicas.head + warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss." + .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(","))) + new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) } - case false => + } else { val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r)) val newLeader = liveReplicasInIsr.head debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader." - .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(","))) + .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(","))) new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1) - } + } info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition)) (newLeaderAndIsr, liveAssignedReplicas) case None => http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/main/scala/kafka/controller/PartitionStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index ec03b84..47efc51 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -245,12 +245,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { controllerContext.partitionLeadershipInfo.get(topicPartition) match { case Some(currentLeaderIsrAndEpoch) => // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state - controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader) match { - case true => // leader is alive - partitionState.put(topicPartition, OnlinePartition) - case false => - partitionState.put(topicPartition, OfflinePartition) - } + if (controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader)) + // leader is alive + partitionState.put(topicPartition, OnlinePartition) + else + partitionState.put(topicPartition, OfflinePartition) case None => partitionState.put(topicPartition, NewPartition) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 2fd8b95..d49b6af 100755 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -330,14 +330,13 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { val partition = topicPartition.partition assignedReplicas.foreach { replicaId => val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId) - controllerContext.liveBrokerIds.contains(replicaId) match { - case true => replicaState.put(partitionAndReplica, OnlineReplica) - case false => - // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted. - // This is required during controller failover since during controller failover a broker can go down, - // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side. - replicaState.put(partitionAndReplica, ReplicaDeletionIneligible) - } + if (controllerContext.liveBrokerIds.contains(replicaId)) + replicaState.put(partitionAndReplica, OnlineReplica) + else + // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted. + // This is required during controller failover since during controller failover a broker can go down, + // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side. + replicaState.put(partitionAndReplica, ReplicaDeletionIneligible) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala b/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala index ce55c16..73dfd34 100644 --- a/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala +++ b/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala @@ -22,21 +22,19 @@ import java.nio.ByteBuffer class ByteBufferBackedInputStream(buffer:ByteBuffer) extends InputStream { override def read():Int = { - buffer.hasRemaining match { - case true => - (buffer.get() & 0xFF) - case false => -1 - } + if (buffer.hasRemaining) + buffer.get() & 0xFF + else + -1 } override def read(bytes:Array[Byte], off:Int, len:Int):Int = { - buffer.hasRemaining match { - case true => - // Read only what's left - val realLen = math.min(len, buffer.remaining()) - buffer.get(bytes, off, realLen) - realLen - case false => -1 - } + if (buffer.hasRemaining) { + // Read only what's left + val realLen = math.min(len, buffer.remaining()) + buffer.get(bytes, off, realLen) + realLen + } else + -1 } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/main/scala/kafka/producer/Producer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index c11ad21..c2f95ea 100755 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -74,13 +74,14 @@ class Producer[K,V](val config: ProducerConfig, if (hasShutdown.get) throw new ProducerClosedException recordStats(messages) - sync match { - case true => eventHandler.handle(messages) - case false => asyncSend(messages) - } + if (sync) + eventHandler.handle(messages) + else + asyncSend(messages) } } + private def recordStats(messages: Seq[KeyedMessage[K,V]]) { for (message <- messages) { producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark() @@ -95,11 +96,10 @@ class Producer[K,V](val config: ProducerConfig, queue.offer(message) case _ => try { - config.queueEnqueueTimeoutMs < 0 match { - case true => + if (config.queueEnqueueTimeoutMs < 0) { queue.put(message) true - case _ => + } else { queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/main/scala/kafka/tools/JmxTool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala index 8112f9e..1dcfb19 100644 --- a/core/src/main/scala/kafka/tools/JmxTool.scala +++ b/core/src/main/scala/kafka/tools/JmxTool.scala @@ -92,9 +92,10 @@ object JmxTool extends Logging { val names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]) val numExpectedAttributes: Map[ObjectName, Int] = - attributesWhitelistExists match { - case true => queries.map((_, attributesWhitelist.get.size)).toMap - case false => names.map{(name: ObjectName) => + if (attributesWhitelistExists) + queries.map((_, attributesWhitelist.get.size)).toMap + else { + names.map{(name: ObjectName) => val mbean = mbsc.getMBeanInfo(name) (name, mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).size)}.toMap } http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala index ab5324c..bbec5b1 100644 --- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala @@ -196,14 +196,14 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { // Additionally, we create the consumers znode (not in // securePersistentZkPaths) to make sure that we don't // add ACLs to it. - val secureOpt: String = secondZk.isSecure match { - case true => + val secureOpt: String = + if (secondZk.isSecure) { firstZk.createPersistentPath(ZkUtils.ConsumersPath) "secure" - case false => + } else { secondZk.createPersistentPath(ZkUtils.ConsumersPath) "unsecure" - } + } ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl")) info("Done with migration") for (path <- secondZk.securePersistentZkPaths) { @@ -231,15 +231,17 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { * Verifies ACL. */ private def isAclCorrect(list: java.util.List[ACL], secure: Boolean): Boolean = { - val isListSizeCorrect = secure match { - case true => list.size == 2 - case false => list.size == 1 - } + val isListSizeCorrect = + if (secure) + list.size == 2 + else + list.size == 1 isListSizeCorrect && list.asScala.forall( - secure match { - case true => isAclSecure - case false => isAclUnsecure - }) + if (secure) + isAclSecure + else + isAclUnsecure + ) } /**
