Repository: kafka
Updated Branches:
  refs/heads/trunk b31a7a902 -> 48aec6ef1


KAFKA-4441; Monitoring incorrect during topic creation and deletion

The OfflinePartitionsCount and PreferredReplicaImbalanceCount metrics now skip
partitions of topics that are queued up for deletion.

Added an integration test that polls the metrics while topics are being
created and deleted.

Developed with mimaison

Author: Edoardo Comar <eco...@uk.ibm.com>

Reviewers: Dong Lin <lindon...@gmail.com>, Ismael Juma <ism...@juma.me.uk>,
Jun Rao <jun...@gmail.com>

Closes #2325 from edoardocomar/KAFKA-4441


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/48aec6ef
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/48aec6ef
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/48aec6ef

Branch: refs/heads/trunk
Commit: 48aec6ef1e8065bd14e54172d6443144fb80738b
Parents: b31a7a9
Author: Edoardo Comar <eco...@uk.ibm.com>
Authored: Mon Feb 6 12:23:24 2017 -0800
Committer: Jun Rao <jun...@gmail.com>
Committed: Mon Feb 6 12:23:24 2017 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/cluster/Replica.scala |   5 +-
 .../kafka/controller/KafkaController.scala      |  10 +-
 ...MetricsDuringTopicCreationDeletionTest.scala | 161 +++++++++++++++++++
 3 files changed, 172 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/48aec6ef/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala 
b/core/src/main/scala/kafka/cluster/Replica.scala
index 346e5d6..8597b06 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -68,9 +68,9 @@ class Replica(val brokerId: Int,
    */
   def updateLogReadResult(logReadResult : LogReadResult) {
     if (logReadResult.info.fetchOffsetMetadata.messageOffset >= 
logReadResult.leaderLogEndOffset)
-      _lastCaughtUpTimeMs = logReadResult.fetchTimeMs
+      _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, 
logReadResult.fetchTimeMs)
     else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= 
lastFetchLeaderLogEndOffset)
-      _lastCaughtUpTimeMs = lastFetchTimeMs
+      _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)
 
     logEndOffset = logReadResult.info.fetchOffsetMetadata
     lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
@@ -130,6 +130,7 @@ class Replica(val brokerId: Int,
     replicaString.append("; Topic: " + partition.topic)
     replicaString.append("; Partition: " + partition.partitionId)
     replicaString.append("; isLocal: " + isLocal)
+    replicaString.append("; lastCaughtUpTimeMs: " + lastCaughtUpTimeMs)
     if (isLocal) replicaString.append("; Highwatermark: " + highWatermark)
     replicaString.toString
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/48aec6ef/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index e38adf8..774316b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -188,7 +188,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: 
ZkUtils, val brokerState
           if (!isActive)
             0
           else
-            controllerContext.partitionLeadershipInfo.count(p => 
!controllerContext.liveOrShuttingDownBrokerIds.contains(p._2.leaderAndIsr.leader))
+            controllerContext.partitionLeadershipInfo.count(p => 
+              
(!controllerContext.liveOrShuttingDownBrokerIds.contains(p._2.leaderAndIsr.leader))
+              && (!deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic))
+            )
         }
       }
     }
@@ -203,7 +206,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: 
ZkUtils, val brokerState
             0
           else
             controllerContext.partitionReplicaAssignment.count {
-              case (topicPartition, replicas) => 
controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader 
!= replicas.head
+              case (topicPartition, replicas) => 
+                
(controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader 
!= replicas.head 
+                && 
(!deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic))
+                )
             }
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/48aec6ef/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
 
b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
new file mode 100644
index 0000000..19a0f9d
--- /dev/null
+++ 
b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import java.util.Properties
+import kafka.server.KafkaConfig
+import kafka.utils.{Logging, TestUtils}
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
+import org.junit.{Before, Test}
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.Gauge
+
+
+class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness 
with Logging {
+
+  private val nodesNum = 3
+  private val topicName = "topic"
+  private val topicNum = 2
+  private val replicationFactor = 3
+  private val partitionNum = 3
+  private val createDeleteIterations = 3
+  
+  private val overridingProps = new Properties
+  overridingProps.put(KafkaConfig.DeleteTopicEnableProp, "true")
+  overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")
+  // speed up the test for UnderReplicatedPartitions 
+  // which relies on the ISR expiry thread to execute concurrently with topic 
creation
+  overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, "2000") 
+
+  private val testedMetrics = 
List("OfflinePartitionsCount","PreferredReplicaImbalanceCount","UnderReplicatedPartitions")
+  private val topics = List.tabulate(topicNum) (n => topicName + n)
+
+  @volatile private var running = true
+  
+  override def generateConfigs() = TestUtils.createBrokerConfigs(nodesNum, 
zkConnect)
+    .map(KafkaConfig.fromProps(_, overridingProps))
+
+  @Before
+  override def setUp {
+    // Do some Metrics Registry cleanup by removing the metrics that this test 
checks. 
+    // This is a test workaround for the issue that prior harness runs may have 
left a populated registry.
+    // see https://issues.apache.org/jira/browse/KAFKA-4605
+    for (m <- (testedMetrics)) {
+        Metrics.defaultRegistry.allMetrics.asScala
+        .filterKeys(k => k.getName.endsWith(m))
+        .headOption match {
+           case Some(e) => Metrics.defaultRegistry.removeMetric(e._1)
+           case None =>
+        }
+    }
+    
+    super.setUp
+  }
+
+  /*
+   * Checking all the metrics we care about in a single test is faster, though it would 
be more elegant to have 3 @Test methods.
+   */
+  @Test
+  def testMetricsDuringTopicCreateDelete() {
+
+    // For UnderReplicatedPartitions, because of 
https://issues.apache.org/jira/browse/KAFKA-4605
+    // we can't access the metrics value of each server. So instead we 
directly invoke the method 
+    // replicaManager.underReplicatedPartitionCount() that defines the metrics 
value.
+    @volatile var underReplicatedPartitionCount = 0
+
+    // For OfflinePartitionsCount and PreferredReplicaImbalanceCount even with 
https://issues.apache.org/jira/browse/KAFKA-4605
+    // the test has worked reliably because the metric that gets triggered is 
the one generated by the first started server (controller)
+    val offlinePartitionsCountGauge = getGauge("OfflinePartitionsCount")
+    @volatile var offlinePartitionsCount = offlinePartitionsCountGauge.value
+    assert(offlinePartitionsCount == 0)
+
+    val preferredReplicaImbalanceCountGauge = 
getGauge("PreferredReplicaImbalanceCount")
+    @volatile var preferredReplicaImbalanceCount = 
preferredReplicaImbalanceCountGauge.value
+    assert(preferredReplicaImbalanceCount == 0)
+
+    // Thread checking the metric continuously
+    running = true
+    val thread = new Thread(new Runnable {
+      def run() {
+        while (running) {
+          for ( s <- servers if running) {
+            underReplicatedPartitionCount = 
s.replicaManager.underReplicatedPartitionCount
+            if (underReplicatedPartitionCount > 0) {
+              running = false
+            }
+          }
+
+          preferredReplicaImbalanceCount = 
preferredReplicaImbalanceCountGauge.value
+          if (preferredReplicaImbalanceCount > 0) {
+             running = false
+          }
+
+          offlinePartitionsCount = offlinePartitionsCountGauge.value
+          if (offlinePartitionsCount > 0) {
+             running = false
+          }
+        }
+      }
+    })
+    thread.start
+
+    // breakable loop that creates and deletes topics
+    createDeleteTopics()
+
+    // if the thread checking the gauges is still running, stop it
+    running = false;
+    thread.join
+    
+    assert(offlinePartitionsCount==0, "OfflinePartitionCount not 0: "+ 
offlinePartitionsCount)
+    assert(preferredReplicaImbalanceCount==0, "PreferredReplicaImbalanceCount 
not 0: " + preferredReplicaImbalanceCount)
+    assert(underReplicatedPartitionCount==0, "UnderReplicatedPartitionCount 
not 0: " + underReplicatedPartitionCount)
+  }
+
+  private def getGauge(metricName: String) = {
+    Metrics.defaultRegistry.allMetrics.asScala
+           .filterKeys(k => k.getName.endsWith(metricName))
+           .headOption
+           .getOrElse { fail( "Unable to find metric " + metricName ) }
+           ._2.asInstanceOf[Gauge[Int]]
+  }
+  
+  private def createDeleteTopics() {
+    for (l <- 1 to createDeleteIterations if running) {
+      // Create topics
+      for (t <- topics if running) {
+        try {
+          kafka.admin.AdminUtils.createTopic(zkUtils, t, partitionNum, 
replicationFactor)
+        } catch {
+          case e: Exception => e.printStackTrace
+        }
+      }
+      Thread.sleep(500)
+
+      // Delete topics
+      for (t <- topics if running) {
+          try {
+              kafka.admin.AdminUtils.deleteTopic(zkUtils, t)
+          } catch {
+          case e: Exception => e.printStackTrace
+          }
+      }
+      Thread.sleep(500)
+    }
+  }
+}

Reply via email to