Repository: kafka Updated Branches: refs/heads/0.10.2 e393c9928 -> f0449d324
MINOR: Pass RecordingLevel to MetricConfig in the broker This is a KIP-104/105 follow-up. Thanks to ijuma for pointing out. Author: Eno Thereska <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #2350 from enothereska/minor-broker-level-config (cherry picked from commit 1eb1e2f60ae006144757fc9fc10ab423c58970bb) Signed-off-by: Ismael Juma <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f0449d32 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f0449d32 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f0449d32 Branch: refs/heads/0.10.2 Commit: f0449d3248801facca442d93552a829e8b8d06a7 Parents: e393c99 Author: Eno Thereska <[email protected]> Authored: Mon Jan 23 14:45:30 2017 +0000 Committer: Ismael Juma <[email protected]> Committed: Mon Jan 23 15:18:24 2017 +0000 ---------------------------------------------------------------------- .../main/scala/kafka/server/KafkaConfig.scala | 1 + .../main/scala/kafka/server/KafkaServer.scala | 12 +++-- .../unit/kafka/server/KafkaConfigTest.scala | 4 ++ .../unit/kafka/server/ServerMetricsTest.scala | 51 ++++++++++++++++++++ 4 files changed, 64 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f0449d32/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3c2a72d..c4f54c9 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -941,6 +941,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp) val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp) val metricReporterClasses: java.util.List[MetricsReporter] = getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter]) + val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp) /** ********* SSL Configuration **************/ val principalBuilderClass = getClass(KafkaConfig.PrincipalBuilderClassProp) http://git-wip-us.apache.org/repos/asf/kafka/blob/f0449d32/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 79548e8..b5075f9 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -79,9 +79,15 @@ object KafkaServer { logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.logMessageTimestampDifferenceMaxMs) logProps } -} + private[server] def metricConfig(kafkaConfig: KafkaConfig): MetricConfig = { + new MetricConfig() + .samples(kafkaConfig.metricNumSamples) + .recordLevel(Sensor.RecordingLevel.forName(kafkaConfig.metricRecordingLevel)) + .timeWindow(kafkaConfig.metricSampleWindowMs, TimeUnit.MILLISECONDS) + } +} /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required @@ -100,9 +106,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP var metrics: Metrics = null - private val metricConfig: MetricConfig = new MetricConfig() - .samples(config.metricNumSamples) - .timeWindow(config.metricSampleWindowMs, TimeUnit.MILLISECONDS) + private val metricConfig: MetricConfig = KafkaServer.metricConfig(config) val brokerState: BrokerState = new BrokerState http://git-wip-us.apache.org/repos/asf/kafka/blob/f0449d32/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 259178c..0f5ff5d 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -24,6 +24,7 @@ import kafka.cluster.EndPoint import kafka.message._ import kafka.utils.{CoreUtils, TestUtils} import org.apache.kafka.common.config.ConfigException +import org.apache.kafka.common.metrics.Sensor import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.Assert._ @@ -684,6 +685,8 @@ class KafkaConfigTest { //For LogFlushIntervalMsProp defaults.put(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123") defaults.put(KafkaConfig.OffsetsTopicCompressionCodecProp, SnappyCompressionCodec.codec.toString) + // For MetricRecordingLevelProp + defaults.put(KafkaConfig.MetricRecordingLevelProp, Sensor.RecordingLevel.DEBUG.toString) val config = KafkaConfig.fromProps(defaults) assertEquals("127.0.0.1:2181", config.zkConnect) @@ -701,6 +704,7 @@ class KafkaConfigTest { assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis) assertEquals(123L, config.logFlushIntervalMs) assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec) + assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel) } private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*) { http://git-wip-us.apache.org/repos/asf/kafka/blob/f0449d32/core/src/test/scala/unit/kafka/server/ServerMetricsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerMetricsTest.scala b/core/src/test/scala/unit/kafka/server/ServerMetricsTest.scala new file mode 100755 index 0000000..dc96680 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ServerMetricsTest.scala @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.utils.TestUtils +import org.apache.kafka.common.metrics.Sensor +import org.junit.Assert._ +import org.junit.Test +import org.scalatest.junit.JUnitSuite + +class ServerMetricsTest extends JUnitSuite { + + @Test + def testMetricsConfig(): Unit = { + val recordingLevels = List(Sensor.RecordingLevel.DEBUG, Sensor.RecordingLevel.INFO) + val illegalNames = List("IllegalName", "") + val props = TestUtils.createBrokerConfig(0, "localhost:2818") + + for (recordingLevel <- recordingLevels) { + props.put(KafkaConfig.MetricRecordingLevelProp, recordingLevel.name) + val config = KafkaConfig.fromProps(props) + val metricConfig = KafkaServer.metricConfig(config) + assertEquals(recordingLevel, metricConfig.recordLevel) + } + + for (illegalName <- illegalNames) { + intercept[IllegalArgumentException] { + props.put(KafkaConfig.MetricRecordingLevelProp, illegalName) + val config = KafkaConfig.fromProps(props) + KafkaServer.metricConfig(config) + } + } + + } + +}
