AMBARI-19661. Kafka Brokers go down after Wire Encryption. (dsen via swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/114f0e8b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/114f0e8b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/114f0e8b Branch: refs/heads/branch-dev-patch-upgrade Commit: 114f0e8ba660170d700ac3a5b47d6697f9ac90c5 Parents: c00399c Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Wed Jan 25 12:33:21 2017 -0800 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Wed Jan 25 12:33:21 2017 -0800 ---------------------------------------------------------------------- .../kafka/KafkaTimelineMetricsReporter.java | 26 +++++++++++--------- .../kafka/KafkaTimelineMetricsReporterTest.java | 26 ++++++++++++++++++++ 2 files changed, 41 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/114f0e8b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java index fef1f24..5892599 100644 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java @@ -60,17 +60,21 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink private final static Log LOG = LogFactory.getLog(KafkaTimelineMetricsReporter.class); - private static final String TIMELINE_METRICS_SEND_INTERVAL_PROPERTY = "kafka.timeline.metrics.sendInterval"; - private static final String TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY = "kafka.timeline.metrics.maxRowCacheSize"; - private static final String TIMELINE_HOSTS_PROPERTY = "kafka.timeline.metrics.hosts"; - private static final String TIMELINE_PORT_PROPERTY = "kafka.timeline.metrics.port"; - private static final String TIMELINE_PROTOCOL_PROPERTY = "kafka.timeline.metrics.protocol"; - private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = "kafka.timeline.metrics.reporter.enabled"; - private static final String EXCLUDED_METRICS_PROPERTY = "external.kafka.metrics.exclude.prefix"; - private static final String INCLUDED_METRICS_PROPERTY = "external.kafka.metrics.include.prefix"; + private static final String TIMELINE_METRICS_KAFKA_PREFIX = "kafka.timeline.metrics."; + private static final String TIMELINE_METRICS_SEND_INTERVAL_PROPERTY = "sendInterval"; + private static final String TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "maxRowCacheSize"; + private static final String TIMELINE_HOSTS_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "hosts"; + private static final String TIMELINE_PORT_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "port"; + private static final String TIMELINE_PROTOCOL_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "protocol"; + private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "reporter.enabled"; + private static final String TIMELINE_METRICS_SSL_KEYSTORE_PATH_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PATH_PROPERTY; + private static final String TIMELINE_METRICS_SSL_KEYSTORE_TYPE_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_TYPE_PROPERTY; + private static final String TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PASSWORD_PROPERTY; private static final String TIMELINE_DEFAULT_HOST = "localhost"; private static final String TIMELINE_DEFAULT_PORT = "6188"; private static final String TIMELINE_DEFAULT_PROTOCOL = "http"; + private static final String EXCLUDED_METRICS_PROPERTY = "external.kafka.metrics.exclude.prefix"; + private static final String INCLUDED_METRICS_PROPERTY = "external.kafka.metrics.include.prefix"; private volatile boolean initialized = false; private boolean running = false; @@ -159,9 +163,9 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval)); if (metricCollectorProtocol.contains("https")) { - String trustStorePath = props.getString(SSL_KEYSTORE_PATH_PROPERTY).trim(); - String trustStoreType = props.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim(); - String trustStorePwd = props.getString(SSL_KEYSTORE_PASSWORD_PROPERTY).trim(); + String trustStorePath = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PATH_PROPERTY).trim(); + String trustStoreType = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_TYPE_PROPERTY).trim(); + String trustStorePwd = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY).trim(); loadTruststore(trustStorePath, trustStoreType, trustStorePwd); } http://git-wip-us.apache.org/repos/asf/ambari/blob/114f0e8b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java index c569d88..e1ac48c 100644 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java @@ -100,6 +100,32 @@ public class KafkaTimelineMetricsReporterTest { } @Test + public void testReporterStartStopHttps() { + mockStatic(Metrics.class); + EasyMock.expect(Metrics.defaultRegistry()).andReturn(registry).times(2); + TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(kafkaTimelineMetricsReporter); + kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache); + replay(Metrics.class, timelineMetricsCache); + + Properties properties = new Properties(); + properties.setProperty("zookeeper.connect", "localhost:2181"); + properties.setProperty("kafka.timeline.metrics.sendInterval", "5900"); + properties.setProperty("kafka.timeline.metrics.maxRowCacheSize", "10000"); + properties.setProperty("kafka.timeline.metrics.hosts", "localhost:6188"); + properties.setProperty("kafka.timeline.metrics.port", "6188"); + properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true"); + properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c"); + properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d"); + properties.setProperty("kafka.timeline.metrics.protocol", "https"); + properties.setProperty("kafka.timeline.metrics.truststore.path", ""); + properties.setProperty("kafka.timeline.metrics.truststore.type", ""); + properties.setProperty("kafka.timeline.metrics.truststore.password", ""); + kafkaTimelineMetricsReporter.init(new VerifiableProperties(properties)); + kafkaTimelineMetricsReporter.stopReporter(); + verifyAll(); + } + + @Test public void testMetricsExclusionPolicy() throws Exception { mockStatic(Metrics.class); EasyMock.expect(Metrics.defaultRegistry()).andReturn(registry).times(2);