Repository: kafka
Updated Branches:
  refs/heads/trunk 1756a22f7 -> 54371e63d


MINOR: Make PushHttpMetricsReporter API compatible with releases back to 0.8.2.2

This is follow up to #4072 which added the PushHttpMetricsReporter and 
converted some services to use it. We somehow missed some compatibility issues 
that made the ProducerPerformance tool fail when using a newer tools jar with 
older common/clients jar, which we do with some system tests so we have all the 
features we need in the tool but can build compatibility tests for older 
releases.

This just adjusts some API usage to make the tool compatible with all previous 
releases.

I have a full run of the tests starting 
[here](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1122/)

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #4214 from ewencp/fix-compatibility-sanity-check-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/54371e63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/54371e63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/54371e63

Branch: refs/heads/trunk
Commit: 54371e63d3c703c932777c2c0f95700c8cb5ba61
Parents: 1756a22
Author: Ewen Cheslack-Postava <[email protected]>
Authored: Wed Nov 15 13:27:58 2017 -0800
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Wed Nov 15 13:27:58 2017 -0800

----------------------------------------------------------------------
 .../kafka/tools/PushHttpMetricsReporter.java    | 29 ++++++++++++++------
 1 file changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/54371e63/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
----------------------------------------------------------------------
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java 
b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
index d5839a4..c5764b4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
@@ -24,9 +24,9 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,15 +87,15 @@ public class PushHttpMetricsReporter implements 
MetricsReporter {
                     "The URL to report metrics to")
             .define(METRICS_PERIOD_CONFIG, ConfigDef.Type.INT, 
ConfigDef.Importance.HIGH,
                     "The frequency at which metrics should be reported, in 
second")
-            .define(METRICS_HOST_CONFIG, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.LOW,
-                    "The hostname to report with each metric; if null, 
defaults to the FQDN that can be automatically" +
+            .define(METRICS_HOST_CONFIG, ConfigDef.Type.STRING, "", 
ConfigDef.Importance.LOW,
+                    "The hostname to report with each metric; if empty, 
defaults to the FQDN that can be automatically" +
                             "determined")
             .define(CLIENT_ID_CONFIG, ConfigDef.Type.STRING, "", 
ConfigDef.Importance.LOW,
                     "Client ID to identify the application, generally 
inherited from the " +
                             "producer/consumer/streams/connect instance");
 
     public PushHttpMetricsReporter() {
-        time = Time.SYSTEM;
+        time = new SystemTime();
         executor = Executors.newSingleThreadScheduledExecutor();
     }
 
@@ -106,17 +106,17 @@ public class PushHttpMetricsReporter implements 
MetricsReporter {
 
     @Override
     public void configure(Map<String, ?> configs) {
-        AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs, true) 
{ };
+        PushHttpMetricsReporterConfig config = new 
PushHttpMetricsReporterConfig(CONFIG_DEF, configs);
         try {
             url = new URL(config.getString(METRICS_URL_CONFIG));
         } catch (MalformedURLException e) {
             throw new ConfigException("Malformed metrics.url", e);
         }
-        int period = config.getInt(METRICS_PERIOD_CONFIG);
+        int period = config.getInteger(METRICS_PERIOD_CONFIG);
         clientId = config.getString(CLIENT_ID_CONFIG);
 
         host = config.getString(METRICS_HOST_CONFIG);
-        if (host == null) {
+        if (host == null || host.isEmpty()) {
             try {
                 host = InetAddress.getLocalHost().getCanonicalHostName();
             } catch (UnknownHostException e) {
@@ -161,7 +161,7 @@ public class PushHttpMetricsReporter implements 
MetricsReporter {
         try {
             executor.awaitTermination(30, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
-            throw new InterruptException("Interrupted when shutting down 
PushHttpMetricsReporter", e);
+            throw new KafkaException("Interrupted when shutting down 
PushHttpMetricsReporter", e);
         }
     }
 
@@ -316,4 +316,17 @@ public class PushHttpMetricsReporter implements 
MetricsReporter {
             return value;
         }
     }
+
+    // The signature for getInt changed from returning int to Integer so to 
remain compatible with 0.8.2.2 jars
+    // for system tests we replace it with a custom version that works for all 
versions.
+    private static class PushHttpMetricsReporterConfig extends AbstractConfig {
+        public PushHttpMetricsReporterConfig(ConfigDef definition, Map<?, ?> 
originals) {
+            super(definition, originals);
+        }
+
+        public Integer getInteger(String key) {
+            return (Integer) get(key);
+        }
+
+    }
 }

Reply via email to