Repository: kafka
Updated Branches:
  refs/heads/trunk 5dabca025 -> ed91af512


KAFKA-4756; The auto-generated broker id should be passed to MetricReporter.configure

Author: Colin P. Mccabe <cmcc...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jun Rao <jun...@gmail.com>

Closes #2540 from cmccabe/KAFKA-4756


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed91af51
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed91af51
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed91af51

Branch: refs/heads/trunk
Commit: ed91af512fcb5c0ea3032ed259451817c96b5ee1
Parents: 5dabca0
Author: Colin P. Mccabe <cmcc...@confluent.io>
Authored: Mon Feb 13 13:46:37 2017 -0800
Committer: Jun Rao <jun...@gmail.com>
Committed: Mon Feb 13 13:46:37 2017 -0800

----------------------------------------------------------------------
 .../kafka/common/config/AbstractConfig.java     | 17 ++++++++++++-
 .../apache/kafka/common/metrics/Metrics.java    |  4 ++++
 .../main/scala/kafka/server/KafkaConfig.scala   |  2 +-
 .../main/scala/kafka/server/KafkaServer.scala   | 23 +++++++++---------
 .../KafkaMetricReporterClusterIdTest.scala      | 25 +++++++++++++++++++-
 5 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ed91af51/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 84047e0..6d985ae 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -233,10 +233,25 @@ public class AbstractConfig {
      * @return The list of configured instances
      */
     public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
+        return getConfiguredInstances(key, t, Collections.EMPTY_MAP);
+    }
+
+    /**
+     * Get a list of configured instances of the given class specified by the 
given configuration key. The configuration
+     * may specify either null or an empty string to indicate no configured 
instances. In both cases, this method
+     * returns an empty list to indicate no configured instances.
+     * @param key The configuration key for the class
+     * @param t The interface the class should implement
+     * @param configOverrides Configuration overrides to use.
+     * @return The list of configured instances
+     */
+    public <T> List<T> getConfiguredInstances(String key, Class<T> t, 
Map<String, Object> configOverrides) {
         List<String> klasses = getList(key);
         List<T> objects = new ArrayList<T>();
         if (klasses == null)
             return objects;
+        Map<String, Object> configPairs = originals();
+        configPairs.putAll(configOverrides);
         for (Object klass : klasses) {
             Object o;
             if (klass instanceof String) {
@@ -252,7 +267,7 @@ public class AbstractConfig {
             if (!t.isInstance(o))
                 throw new KafkaException(klass + " is not an instance of " + 
t.getName());
             if (o instanceof Configurable)
-                ((Configurable) o).configure(originals());
+                ((Configurable) o).configure(configPairs);
             objects.add(t.cast(o));
         }
         return objects;

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed91af51/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 7b303fa..512c18e 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -443,6 +443,10 @@ public class Metrics implements Closeable {
         return this.metrics;
     }
 
+    public List<MetricsReporter> reporters() {
+        return this.reporters;
+    }
+
     public KafkaMetric metric(MetricName metricName) {
         return this.metrics.get(metricName);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed91af51/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ab7e1ae..0180a2c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -822,6 +822,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean) extends Abstra
   val brokerIdGenerationEnable: Boolean = 
getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
   val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
   var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
+
   val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
   val backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)
   val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
@@ -938,7 +939,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean) extends Abstra
   /** ********* Metric Configuration **************/
   val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)
   val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
-  val metricReporterClasses: java.util.List[MetricsReporter] = 
getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, 
classOf[MetricsReporter])
   val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp)
 
   /** ********* SSL Configuration **************/

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed91af51/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 54431d9..2a247ec 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -101,13 +101,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
   private var shutdownLatch = new CountDownLatch(1)
 
   private val jmxPrefix: String = "kafka.server"
-  private val reporters: java.util.List[MetricsReporter] = 
config.metricReporterClasses
-  reporters.add(new JmxReporter(jmxPrefix))
 
   var metrics: Metrics = null
 
-  private val metricConfig: MetricConfig = KafkaServer.metricConfig(config)
-
   val brokerState: BrokerState = new BrokerState
 
   var apis: KafkaApis = null
@@ -182,9 +178,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
 
       val canStartup = isStartingUp.compareAndSet(false, true)
       if (canStartup) {
-        metrics = new Metrics(metricConfig, reporters, time, true)
-        quotaManagers = QuotaFactory.instantiate(config, metrics, time)
-
         brokerState.newState(Starting)
 
         /* start scheduler */
@@ -197,16 +190,24 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
         _clusterId = getOrGenerateClusterId(zkUtils)
         info(s"Cluster ID = $clusterId")
 
+        /* generate brokerId */
+        config.brokerId =  getBrokerId
+        this.logIdent = "[Kafka Server " + config.brokerId + "], "
+
+        /* create and configure metrics */
+        val reporters = 
config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, 
classOf[MetricsReporter],
+            Map[String, AnyRef](KafkaConfig.BrokerIdProp -> 
(config.brokerId.toString)).asJava)
+        reporters.add(new JmxReporter(jmxPrefix))
+        val metricConfig = KafkaServer.metricConfig(config)
+        metrics = new Metrics(metricConfig, reporters, time, true)
+
+        quotaManagers = QuotaFactory.instantiate(config, metrics, time)
         notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
 
         /* start log manager */
         logManager = createLogManager(zkUtils.zkClient, brokerState)
         logManager.startup()
 
-        /* generate brokerId */
-        config.brokerId =  getBrokerId
-        this.logIdent = "[Kafka Server " + config.brokerId + "], "
-
         metadataCache = new MetadataCache(config.brokerId)
         credentialProvider = new 
CredentialProvider(config.saslEnabledMechanisms)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed91af51/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
index d235d02..dfcb4ac 100755
--- 
a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
@@ -28,6 +28,7 @@ import org.junit.{After, Before, Test}
 import org.apache.kafka.test.TestUtils.isValidClusterId
 
 object KafkaMetricReporterClusterIdTest {
+  val setupError = new AtomicReference[String]("")
 
   class MockKafkaMetricsReporter extends KafkaMetricsReporter with 
ClusterResourceListener {
 
@@ -52,8 +53,26 @@ object KafkaMetricReporterClusterIdTest {
     override def onUpdate(clusterMetadata: ClusterResource) {
       MockBrokerMetricsReporter.CLUSTER_META.set(clusterMetadata)
     }
-  }
 
+    override def configure(configs: java.util.Map[String, _]): Unit = {
+      // Check that the configuration passed to the MetricsReporter includes 
the broker id as a String that parses as an integer.
+      // This is a regression test for KAFKA-4756.
+      //
+      // Because this code is run during the test setUp phase, if we throw an 
exception here,
+      // it just results in the test itself being declared "not found" rather 
than failing.
+      // So we track an error message which we will check later in the test 
body.
+      val brokerId = configs.get(KafkaConfig.BrokerIdProp)
+      if (brokerId == null)
+        setupError.compareAndSet("", "No value was set for the broker id.")
+      else if (!brokerId.isInstanceOf[String])
+        setupError.compareAndSet("", "The value set for the broker id was not 
a string.")
+      try
+        Integer.parseInt(brokerId.asInstanceOf[String])
+      catch {
+        case e: Exception => setupError.compareAndSet("", "Error parsing 
broker id " + e.toString)
+      }
+    }
+  }
 }
 
 class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness {
@@ -66,6 +85,8 @@ class KafkaMetricReporterClusterIdTest extends 
ZooKeeperTestHarness {
     val props = TestUtils.createBrokerConfig(1, zkConnect)
     props.setProperty("kafka.metrics.reporters", 
"kafka.server.KafkaMetricReporterClusterIdTest$MockKafkaMetricsReporter")
     props.setProperty(KafkaConfig.MetricReporterClassesProp, 
"kafka.server.KafkaMetricReporterClusterIdTest$MockBrokerMetricsReporter")
+    props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true")
+    props.setProperty(KafkaConfig.BrokerIdProp, "-1")
     config = KafkaConfig.fromProps(props)
     server = KafkaServerStartable.fromProps(props)
     server.startup()
@@ -73,6 +94,8 @@ class KafkaMetricReporterClusterIdTest extends 
ZooKeeperTestHarness {
 
   @Test
   def testClusterIdPresent() {
+    assertEquals("", KafkaMetricReporterClusterIdTest.setupError.get())
+
     
assertNotNull(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META)
     
isValidClusterId(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META.get().clusterId())
 

Reply via email to