Repository: kafka
Updated Branches:
  refs/heads/0.10.2 e393c9928 -> f0449d324


MINOR: Pass RecordingLevel to MetricConfig in the broker

This is a KIP-104/105 follow-up. Thanks to ijuma for pointing out.

Author: Eno Thereska <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #2350 from enothereska/minor-broker-level-config

(cherry picked from commit 1eb1e2f60ae006144757fc9fc10ab423c58970bb)
Signed-off-by: Ismael Juma <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f0449d32
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f0449d32
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f0449d32

Branch: refs/heads/0.10.2
Commit: f0449d3248801facca442d93552a829e8b8d06a7
Parents: e393c99
Author: Eno Thereska <[email protected]>
Authored: Mon Jan 23 14:45:30 2017 +0000
Committer: Ismael Juma <[email protected]>
Committed: Mon Jan 23 15:18:24 2017 +0000

----------------------------------------------------------------------
 .../main/scala/kafka/server/KafkaConfig.scala   |  1 +
 .../main/scala/kafka/server/KafkaServer.scala   | 12 +++--
 .../unit/kafka/server/KafkaConfigTest.scala     |  4 ++
 .../unit/kafka/server/ServerMetricsTest.scala   | 51 ++++++++++++++++++++
 4 files changed, 64 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f0449d32/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3c2a72d..c4f54c9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -941,6 +941,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean) extends Abstra
   val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)
   val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
   val metricReporterClasses: java.util.List[MetricsReporter] = 
getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, 
classOf[MetricsReporter])
+  val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp)
 
   /** ********* SSL Configuration **************/
   val principalBuilderClass = getClass(KafkaConfig.PrincipalBuilderClassProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f0449d32/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 79548e8..b5075f9 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -79,9 +79,15 @@ object KafkaServer {
     logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, 
kafkaConfig.logMessageTimestampDifferenceMaxMs)
     logProps
   }
-}
 
+  private[server] def metricConfig(kafkaConfig: KafkaConfig): MetricConfig = {
+    new MetricConfig()
+      .samples(kafkaConfig.metricNumSamples)
+      
.recordLevel(Sensor.RecordingLevel.forName(kafkaConfig.metricRecordingLevel))
+      .timeWindow(kafkaConfig.metricSampleWindowMs, TimeUnit.MILLISECONDS)
+  }
 
+}
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all 
functionality required
@@ -100,9 +106,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
 
   var metrics: Metrics = null
 
-  private val metricConfig: MetricConfig = new MetricConfig()
-    .samples(config.metricNumSamples)
-    .timeWindow(config.metricSampleWindowMs, TimeUnit.MILLISECONDS)
+  private val metricConfig: MetricConfig = KafkaServer.metricConfig(config)
 
   val brokerState: BrokerState = new BrokerState
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f0449d32/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 259178c..0f5ff5d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -24,6 +24,7 @@ import kafka.cluster.EndPoint
 import kafka.message._
 import kafka.utils.{CoreUtils, TestUtils}
 import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Assert._
@@ -684,6 +685,8 @@ class KafkaConfigTest {
     //For LogFlushIntervalMsProp
     defaults.put(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123")
     defaults.put(KafkaConfig.OffsetsTopicCompressionCodecProp, 
SnappyCompressionCodec.codec.toString)
+    // For MetricRecordingLevelProp
+    defaults.put(KafkaConfig.MetricRecordingLevelProp, 
Sensor.RecordingLevel.DEBUG.toString)
 
     val config = KafkaConfig.fromProps(defaults)
     assertEquals("127.0.0.1:2181", config.zkConnect)
@@ -701,6 +704,7 @@ class KafkaConfigTest {
     assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis)
     assertEquals(123L, config.logFlushIntervalMs)
     assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec)
+    assertEquals(Sensor.RecordingLevel.DEBUG.toString, 
config.metricRecordingLevel)
   }
 
   private def assertPropertyInvalid(validRequiredProps: => Properties, name: 
String, values: Any*) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f0449d32/core/src/test/scala/unit/kafka/server/ServerMetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerMetricsTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerMetricsTest.scala
new file mode 100755
index 0000000..dc96680
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ServerMetricsTest.scala
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.metrics.Sensor
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+
+class ServerMetricsTest extends JUnitSuite {
+
+  @Test
+  def testMetricsConfig(): Unit = {
+    val recordingLevels = List(Sensor.RecordingLevel.DEBUG, 
Sensor.RecordingLevel.INFO)
+    val illegalNames = List("IllegalName", "")
+    val props = TestUtils.createBrokerConfig(0, "localhost:2818")
+
+    for (recordingLevel <- recordingLevels) {
+      props.put(KafkaConfig.MetricRecordingLevelProp, recordingLevel.name)
+      val config = KafkaConfig.fromProps(props)
+      val metricConfig = KafkaServer.metricConfig(config)
+      assertEquals(recordingLevel, metricConfig.recordLevel)
+    }
+
+    for (illegalName <- illegalNames) {
+      intercept[IllegalArgumentException] {
+        props.put(KafkaConfig.MetricRecordingLevelProp, illegalName)
+        val config = KafkaConfig.fromProps(props)
+        KafkaServer.metricConfig(config)
+      }
+    }
+
+  }
+
+}

Reply via email to