Repository: kafka Updated Branches: refs/heads/trunk 5dabca025 -> ed91af512
KAFKA-4756; The auto-generated broker id should be passed to MetricReporter.configure Author: Colin P. Mccabe <cmcc...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jun Rao <jun...@gmail.com> Closes #2540 from cmccabe/KAFKA-4756 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed91af51 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed91af51 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed91af51 Branch: refs/heads/trunk Commit: ed91af512fcb5c0ea3032ed259451817c96b5ee1 Parents: 5dabca0 Author: Colin P. Mccabe <cmcc...@confluent.io> Authored: Mon Feb 13 13:46:37 2017 -0800 Committer: Jun Rao <jun...@gmail.com> Committed: Mon Feb 13 13:46:37 2017 -0800 ---------------------------------------------------------------------- .../kafka/common/config/AbstractConfig.java | 17 ++++++++++++- .../apache/kafka/common/metrics/Metrics.java | 4 ++++ .../main/scala/kafka/server/KafkaConfig.scala | 2 +- .../main/scala/kafka/server/KafkaServer.scala | 23 +++++++++--------- .../KafkaMetricReporterClusterIdTest.scala | 25 +++++++++++++++++++- 5 files changed, 57 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ed91af51/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 84047e0..6d985ae 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -233,10 +233,25 @@ public class AbstractConfig { * @return The list of configured instances */ public <T> List<T> getConfiguredInstances(String key, Class<T> t) { + return 
getConfiguredInstances(key, t, Collections.EMPTY_MAP); + } + + /** + * Get a list of configured instances of the given class specified by the given configuration key. The configuration + * may specify either null or an empty string to indicate no configured instances. In both cases, this method + * returns an empty list to indicate no configured instances. + * @param key The configuration key for the class + * @param t The interface the class should implement + * @param configOverrides Configuration overrides to use. + * @return The list of configured instances + */ + public <T> List<T> getConfiguredInstances(String key, Class<T> t, Map<String, Object> configOverrides) { List<String> klasses = getList(key); List<T> objects = new ArrayList<T>(); if (klasses == null) return objects; + Map<String, Object> configPairs = originals(); + configPairs.putAll(configOverrides); for (Object klass : klasses) { Object o; if (klass instanceof String) { @@ -252,7 +267,7 @@ public class AbstractConfig { if (!t.isInstance(o)) throw new KafkaException(klass + " is not an instance of " + t.getName()); if (o instanceof Configurable) - ((Configurable) o).configure(originals()); + ((Configurable) o).configure(configPairs); objects.add(t.cast(o)); } return objects; http://git-wip-us.apache.org/repos/asf/kafka/blob/ed91af51/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index 7b303fa..512c18e 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -443,6 +443,10 @@ public class Metrics implements Closeable { return this.metrics; } + public List<MetricsReporter> reporters() { + return this.reporters; + } + public KafkaMetric metric(MetricName metricName) { 
return this.metrics.get(metricName); } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed91af51/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index ab7e1ae..0180a2c 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -822,6 +822,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp) val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp) var brokerId: Int = getInt(KafkaConfig.BrokerIdProp) + val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp) val backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp) val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp) @@ -938,7 +939,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra /** ********* Metric Configuration **************/ val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp) val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp) - val metricReporterClasses: java.util.List[MetricsReporter] = getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter]) val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp) /** ********* SSL Configuration **************/ http://git-wip-us.apache.org/repos/asf/kafka/blob/ed91af51/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 54431d9..2a247ec 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ 
b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -101,13 +101,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP private var shutdownLatch = new CountDownLatch(1) private val jmxPrefix: String = "kafka.server" - private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses - reporters.add(new JmxReporter(jmxPrefix)) var metrics: Metrics = null - private val metricConfig: MetricConfig = KafkaServer.metricConfig(config) - val brokerState: BrokerState = new BrokerState var apis: KafkaApis = null @@ -182,9 +178,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP val canStartup = isStartingUp.compareAndSet(false, true) if (canStartup) { - metrics = new Metrics(metricConfig, reporters, time, true) - quotaManagers = QuotaFactory.instantiate(config, metrics, time) - brokerState.newState(Starting) /* start scheduler */ @@ -197,16 +190,24 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP _clusterId = getOrGenerateClusterId(zkUtils) info(s"Cluster ID = $clusterId") + /* generate brokerId */ + config.brokerId = getBrokerId + this.logIdent = "[Kafka Server " + config.brokerId + "], " + + /* create and configure metrics */ + val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter], + Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava) + reporters.add(new JmxReporter(jmxPrefix)) + val metricConfig = KafkaServer.metricConfig(config) + metrics = new Metrics(metricConfig, reporters, time, true) + + quotaManagers = QuotaFactory.instantiate(config, metrics, time) notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala) /* start log manager */ logManager = createLogManager(zkUtils.zkClient, brokerState) logManager.startup() - /* generate brokerId */ - config.brokerId = getBrokerId - this.logIdent = "[Kafka Server " + config.brokerId + "], " - metadataCache = 
new MetadataCache(config.brokerId) credentialProvider = new CredentialProvider(config.saslEnabledMechanisms) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed91af51/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala index d235d02..dfcb4ac 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala @@ -28,6 +28,7 @@ import org.junit.{After, Before, Test} import org.apache.kafka.test.TestUtils.isValidClusterId object KafkaMetricReporterClusterIdTest { + val setupError = new AtomicReference[String]("") class MockKafkaMetricsReporter extends KafkaMetricsReporter with ClusterResourceListener { @@ -52,8 +53,26 @@ object KafkaMetricReporterClusterIdTest { override def onUpdate(clusterMetadata: ClusterResource) { MockBrokerMetricsReporter.CLUSTER_META.set(clusterMetadata) } - } + override def configure(configs: java.util.Map[String, _]): Unit = { + // Check that the configuration passed to the MetricsReporter includes the broker id as an Integer. + // This is a regression test for KAFKA-4756. + // + // Because this code is run during the test setUp phase, if we throw an exception here, + // it just results in the test itself being declared "not found" rather than failing. + // So we track an error message which we will check later in the test body. 
+ val brokerId = configs.get(KafkaConfig.BrokerIdProp) + if (brokerId == null) + setupError.compareAndSet("", "No value was set for the broker id.") + else if (!brokerId.isInstanceOf[String]) + setupError.compareAndSet("", "The value set for the broker id was not a string.") + try + Integer.parseInt(brokerId.asInstanceOf[String]) + catch { + case e: Exception => setupError.compareAndSet("", "Error parsing broker id " + e.toString) + } + } + } } class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness { @@ -66,6 +85,8 @@ class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness { val props = TestUtils.createBrokerConfig(1, zkConnect) props.setProperty("kafka.metrics.reporters", "kafka.server.KafkaMetricReporterClusterIdTest$MockKafkaMetricsReporter") props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricReporterClusterIdTest$MockBrokerMetricsReporter") + props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true") + props.setProperty(KafkaConfig.BrokerIdProp, "-1") config = KafkaConfig.fromProps(props) server = KafkaServerStartable.fromProps(props) server.startup() @@ -73,6 +94,8 @@ class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness { @Test def testClusterIdPresent() { + assertEquals("", KafkaMetricReporterClusterIdTest.setupError.get()) + assertNotNull(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META) isValidClusterId(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META.get().clusterId())