Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 2ddbf61be -> 501492c44


AMBARI-19661. Kafka Brokers go down after Wire Encryption. (dsen via swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/501492c4
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/501492c4
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/501492c4

Branch: refs/heads/branch-2.5
Commit: 501492c44ac248067f2e6dd09a9c97015095ee2a
Parents: 2ddbf61
Author: Siddharth Wagle <swa...@hortonworks.com>
Authored: Wed Jan 25 12:33:02 2017 -0800
Committer: Siddharth Wagle <swa...@hortonworks.com>
Committed: Wed Jan 25 12:33:02 2017 -0800

----------------------------------------------------------------------
 .../kafka/KafkaTimelineMetricsReporter.java     | 26 +++++++++++---------
 .../kafka/KafkaTimelineMetricsReporterTest.java | 26 ++++++++++++++++++++
 2 files changed, 41 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/501492c4/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index fef1f24..5892599 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -60,17 +60,21 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
 
   private final static Log LOG = LogFactory.getLog(KafkaTimelineMetricsReporter.class);
 
-  private static final String TIMELINE_METRICS_SEND_INTERVAL_PROPERTY = "kafka.timeline.metrics.sendInterval";
-  private static final String TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY = "kafka.timeline.metrics.maxRowCacheSize";
-  private static final String TIMELINE_HOSTS_PROPERTY = "kafka.timeline.metrics.hosts";
-  private static final String TIMELINE_PORT_PROPERTY = "kafka.timeline.metrics.port";
-  private static final String TIMELINE_PROTOCOL_PROPERTY = "kafka.timeline.metrics.protocol";
-  private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = "kafka.timeline.metrics.reporter.enabled";
-  private static final String EXCLUDED_METRICS_PROPERTY = "external.kafka.metrics.exclude.prefix";
-  private static final String INCLUDED_METRICS_PROPERTY = "external.kafka.metrics.include.prefix";
+  private static final String TIMELINE_METRICS_KAFKA_PREFIX = "kafka.timeline.metrics.";
+  private static final String TIMELINE_METRICS_SEND_INTERVAL_PROPERTY = "sendInterval";
+  private static final String TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "maxRowCacheSize";
+  private static final String TIMELINE_HOSTS_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "hosts";
+  private static final String TIMELINE_PORT_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "port";
+  private static final String TIMELINE_PROTOCOL_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "protocol";
+  private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "reporter.enabled";
+  private static final String TIMELINE_METRICS_SSL_KEYSTORE_PATH_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PATH_PROPERTY;
+  private static final String TIMELINE_METRICS_SSL_KEYSTORE_TYPE_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_TYPE_PROPERTY;
+  private static final String TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PASSWORD_PROPERTY;
   private static final String TIMELINE_DEFAULT_HOST = "localhost";
   private static final String TIMELINE_DEFAULT_PORT = "6188";
   private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
+  private static final String EXCLUDED_METRICS_PROPERTY = "external.kafka.metrics.exclude.prefix";
+  private static final String INCLUDED_METRICS_PROPERTY = "external.kafka.metrics.include.prefix";
 
   private volatile boolean initialized = false;
   private boolean running = false;
@@ -159,9 +163,9 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
         setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
 
         if (metricCollectorProtocol.contains("https")) {
-          String trustStorePath = props.getString(SSL_KEYSTORE_PATH_PROPERTY).trim();
-          String trustStoreType = props.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim();
-          String trustStorePwd = props.getString(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
+          String trustStorePath = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PATH_PROPERTY).trim();
+          String trustStoreType = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_TYPE_PROPERTY).trim();
+          String trustStorePwd = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
           loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
         }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/501492c4/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
index c569d88..e1ac48c 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
@@ -100,6 +100,32 @@ public class KafkaTimelineMetricsReporterTest {
   }
 
   @Test
+  public void testReporterStartStopHttps() {
+    mockStatic(Metrics.class);
+    EasyMock.expect(Metrics.defaultRegistry()).andReturn(registry).times(2);
+    TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(kafkaTimelineMetricsReporter);
+    kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache);
+    replay(Metrics.class, timelineMetricsCache);
+
+    Properties properties = new Properties();
+    properties.setProperty("zookeeper.connect", "localhost:2181");
+    properties.setProperty("kafka.timeline.metrics.sendInterval", "5900");
+    properties.setProperty("kafka.timeline.metrics.maxRowCacheSize", "10000");
+    properties.setProperty("kafka.timeline.metrics.hosts", "localhost:6188");
+    properties.setProperty("kafka.timeline.metrics.port", "6188");
+    properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
+    properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c");
+    properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d");
+    properties.setProperty("kafka.timeline.metrics.protocol", "https");
+    properties.setProperty("kafka.timeline.metrics.truststore.path", "");
+    properties.setProperty("kafka.timeline.metrics.truststore.type", "");
+    properties.setProperty("kafka.timeline.metrics.truststore.password", "");
+    kafkaTimelineMetricsReporter.init(new VerifiableProperties(properties));
+    kafkaTimelineMetricsReporter.stopReporter();
+    verifyAll();
+  }
+
+  @Test
   public void testMetricsExclusionPolicy() throws Exception {
     mockStatic(Metrics.class);
     EasyMock.expect(Metrics.defaultRegistry()).andReturn(registry).times(2);

Reply via email to