Repository: kafka Updated Branches: refs/heads/trunk b31a7a902 -> 48aec6ef1
KAFKA-4441; Monitoring incorrect during topic creation and deletion OfflinePartitionsCount PreferredReplicaImbalanceCount metrics check for topic being deleted Added integration test which polls the metrics while topics are being created and deleted Developed with mimaison Author: Edoardo Comar <eco...@uk.ibm.com> Reviewers: Dong Lin <lindon...@gmail.com>, Ismael Juma <ism...@juma.me.uk>, Jun Rao <jun...@gmail.com> Closes #2325 from edoardocomar/KAFKA-4441 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/48aec6ef Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/48aec6ef Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/48aec6ef Branch: refs/heads/trunk Commit: 48aec6ef1e8065bd14e54172d6443144fb80738b Parents: b31a7a9 Author: Edoardo Comar <eco...@uk.ibm.com> Authored: Mon Feb 6 12:23:24 2017 -0800 Committer: Jun Rao <jun...@gmail.com> Committed: Mon Feb 6 12:23:24 2017 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/cluster/Replica.scala | 5 +- .../kafka/controller/KafkaController.scala | 10 +- ...MetricsDuringTopicCreationDeletionTest.scala | 161 +++++++++++++++++++ 3 files changed, 172 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/48aec6ef/core/src/main/scala/kafka/cluster/Replica.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 346e5d6..8597b06 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -68,9 +68,9 @@ class Replica(val brokerId: Int, */ def updateLogReadResult(logReadResult : LogReadResult) { if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset) - _lastCaughtUpTimeMs = 
logReadResult.fetchTimeMs + _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs) /* math.max: never move lastCaughtUpTimeMs backwards */ else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset) - _lastCaughtUpTimeMs = lastFetchTimeMs + _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs) /* math.max: never move lastCaughtUpTimeMs backwards */ logEndOffset = logReadResult.info.fetchOffsetMetadata lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset @@ -130,6 +130,7 @@ class Replica(val brokerId: Int, replicaString.append("; Topic: " + partition.topic) replicaString.append("; Partition: " + partition.partitionId) replicaString.append("; isLocal: " + isLocal) + replicaString.append("; lastCaughtUpTimeMs: " + lastCaughtUpTimeMs) if (isLocal) replicaString.append("; Highwatermark: " + highWatermark) replicaString.toString } http://git-wip-us.apache.org/repos/asf/kafka/blob/48aec6ef/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index e38adf8..774316b 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -188,7 +188,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState if (!isActive) 0 else - controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveOrShuttingDownBrokerIds.contains(p._2.leaderAndIsr.leader)) + controllerContext.partitionLeadershipInfo.count(p => + (!controllerContext.liveOrShuttingDownBrokerIds.contains(p._2.leaderAndIsr.leader)) + && (!deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)) /* partitions of topics queued for deletion are not counted as offline */ + ) } } } @@ -203,7 +206,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val 
brokerState 0 else controllerContext.partitionReplicaAssignment.count { - case (topicPartition, replicas) => 
controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != replicas.head + case (topicPartition, replicas) => + (controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != replicas.head + && (!deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic)) /* topics queued for deletion are not counted as imbalanced. NOTE(review): the partitionLeadershipInfo(topicPartition) lookup on the previous line is evaluated first; if partitionLeadershipInfo is a Map, apply() throws NoSuchElementException for a partition with no leadership entry yet (possibly a just-created topic) - consider testing the deletion condition first and using .get(topicPartition).exists(...) */ + ) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/48aec6ef/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala new file mode 100644 index 0000000..19a0f9d --- /dev/null +++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package kafka.integration + + import java.util.Properties + import kafka.server.KafkaConfig + import kafka.utils.{Logging, TestUtils} + import scala.collection.JavaConverters.mapAsScalaMapConverter + + import org.junit.{Before, Test} + import com.yammer.metrics.Metrics + import com.yammer.metrics.core.Gauge + + /** Integration test: while topics are repeatedly created and deleted, a background thread polls the OfflinePartitionsCount and PreferredReplicaImbalanceCount gauges and every broker's under-replicated partition count; all of them must stay 0. */ + class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with Logging { + + private val nodesNum = 3 + private val topicName = "topic" + private val topicNum = 2 + private val replicationFactor = 3 + private val partitionNum = 3 + private val createDeleteIterations = 3 + + private val overridingProps = new Properties + overridingProps.put(KafkaConfig.DeleteTopicEnableProp, "true") + overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, "false") + // speed up the test for UnderReplicatedPartitions + // which relies on the ISR expiry thread to execute concurrently with topic creation + overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, "2000") + + private val testedMetrics = List("OfflinePartitionsCount","PreferredReplicaImbalanceCount","UnderReplicatedPartitions") + private val topics = List.tabulate(topicNum) (n => topicName + n) + + @volatile private var running = true /* cleared by the polling thread on the first non-zero metric, or by the test when it is done; createDeleteTopics also stops early once it is false */ + + override def generateConfigs() = TestUtils.createBrokerConfigs(nodesNum, zkConnect) + .map(KafkaConfig.fromProps(_, overridingProps)) + + @Before + override def setUp { + // Do some Metrics Registry cleanup by removing the metrics that this test checks. + // This is a test workaround for the issue that prior harness runs may have left a populated registry. 
+ // see https://issues.apache.org/jira/browse/KAFKA-4605 + for (m <- (testedMetrics)) { + Metrics.defaultRegistry.allMetrics.asScala + .filterKeys(k => k.getName.endsWith(m)) + .headOption match { + case Some(e) => Metrics.defaultRegistry.removeMetric(e._1) + case None => + } + } + + super.setUp + } + + /* + * Checking all the metrics we care about in a single test is faster, though it would be more elegant to have 3 @Test methods. + */ + @Test + def testMetricsDuringTopicCreateDelete() { + + // For UnderReplicatedPartitions, because of https://issues.apache.org/jira/browse/KAFKA-4605 + // we can't access the metrics value of each server. So instead we directly invoke the method + // replicaManager.underReplicatedPartitionCount() that defines the metrics value. + @volatile var underReplicatedPartitionCount = 0 + + // For OfflinePartitionsCount and PreferredReplicaImbalanceCount even with https://issues.apache.org/jira/browse/KAFKA-4605 + // the test has worked reliably because the metric that gets triggered is the one generated by the first started server (controller) + val offlinePartitionsCountGauge = getGauge("OfflinePartitionsCount") + @volatile var offlinePartitionsCount = offlinePartitionsCountGauge.value + assert(offlinePartitionsCount == 0) + + val preferredReplicaImbalanceCountGauge = getGauge("PreferredReplicaImbalanceCount") + @volatile var preferredReplicaImbalanceCount = preferredReplicaImbalanceCountGauge.value + assert(preferredReplicaImbalanceCount == 0) + + // Background thread polling the metrics continuously; it clears `running` as soon as any of them is non-zero + running = true + val thread = new Thread(new Runnable { + def run() { + while (running) { + for ( s <- servers if running) { + underReplicatedPartitionCount = s.replicaManager.underReplicatedPartitionCount + if (underReplicatedPartitionCount > 0) { + running = false + } + } + + preferredReplicaImbalanceCount = preferredReplicaImbalanceCountGauge.value + if (preferredReplicaImbalanceCount > 0) { + running = false + } + + offlinePartitionsCount = 
offlinePartitionsCountGauge.value + if (offlinePartitionsCount > 0) { + running = false + } + } + } + }) + thread.start + + // create and delete topics; the loop stops early once the polling thread has cleared `running` + createDeleteTopics() + + // if the thread checking the gauges is still running, stop it + running = false; + thread.join /* NOTE(review): an exception thrown inside the polling thread is not propagated by join(); the asserts below would then check the last values it wrote */ + + assert(offlinePartitionsCount==0, "OfflinePartitionCount not 0: "+ offlinePartitionsCount) + assert(preferredReplicaImbalanceCount==0, "PreferredReplicaImbalanceCount not 0: " + preferredReplicaImbalanceCount) + assert(underReplicatedPartitionCount==0, "UnderReplicatedPartitionCount not 0: " + underReplicatedPartitionCount) + } + + private def getGauge(metricName: String) = { + Metrics.defaultRegistry.allMetrics.asScala + .filterKeys(k => k.getName.endsWith(metricName)) + .headOption + .getOrElse { fail( "Unable to find metric " + metricName ) } + ._2.asInstanceOf[Gauge[Int]] + } + + private def createDeleteTopics() { + for (l <- 1 to createDeleteIterations if running) { + // Create topics + for (t <- topics if running) { + try { + kafka.admin.AdminUtils.createTopic(zkUtils, t, partitionNum, replicationFactor) + } catch { + case e: Exception => e.printStackTrace /* failures are only printed; the test does not fail on them */ + } + } + Thread.sleep(500) + + // Delete topics + for (t <- topics if running) { + try { + kafka.admin.AdminUtils.deleteTopic(zkUtils, t) + } catch { + case e: Exception => e.printStackTrace /* failures are only printed; the test does not fail on them */ + } + } + Thread.sleep(500) + } + } +}