AMBARI-13173. Provide Kafka metrics filter to reduce write volume. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3b1fb3e3
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3b1fb3e3
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3b1fb3e3

Branch: refs/heads/branch-2.1.2
Commit: 3b1fb3e3452c1f9c6f81202012af58874b8e370c
Parents: 69cce9c
Author: Siddharth Wagle <swa...@hortonworks.com>
Authored: Tue Sep 22 17:39:44 2015 -0700
Committer: Siddharth Wagle <swa...@hortonworks.com>
Committed: Tue Sep 22 17:40:02 2015 -0700

----------------------------------------------------------------------
 .../kafka/KafkaTimelineMetricsReporter.java     |   72 +-
 .../kafka/KafkaTimelineMetricsReporterTest.java |   66 +-
 .../sink/kafka/ScheduledReporterTest.java       |   31 +-
 .../ambari-metrics-storm-sink/pom.xml           |    2 +-
 .../sink/storm/StormTimelineMetricsSink.java    |    2 +-
 .../server/upgrade/UpgradeCatalog212.java       |   19 +-
 .../package/files/service-metrics/KAFKA.txt     | 1058 ++----------------
 .../KAFKA/configuration/kafka-broker.xml        |   14 +
 8 files changed, 225 insertions(+), 1039 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index a259864..dd7604b 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -33,8 +33,9 @@ import com.yammer.metrics.stats.Snapshot;
 import kafka.metrics.KafkaMetricsConfig;
 import kafka.metrics.KafkaMetricsReporter;
 import kafka.utils.VerifiableProperties;
-import org.apache.commons.lang.ClassUtils;
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
@@ -46,7 +47,11 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -64,6 +69,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private static final String TIMELINE_HOST_PROPERTY = "kafka.timeline.metrics.host";
   private static final String TIMELINE_PORT_PROPERTY = "kafka.timeline.metrics.port";
   private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = "kafka.timeline.metrics.reporter.enabled";
+  private static final String EXCLUDED_METRICS_PROPERTY = "external.kafka.metrics.exclude.prefix";
+  private static final String INCLUDED_METRICS_PROPERTY = "external.kafka.metrics.include.prefix";
   private static final String TIMELINE_DEFAULT_HOST = "localhost";
   private static final String TIMELINE_DEFAULT_PORT = "8188";
@@ -76,6 +83,11 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private TimelineMetricsCache metricsCache;
   private int timeoutSeconds = 10;
 
+  private String[] excludedMetricsPrefixes;
+  private String[] includedMetricsPrefixes;
+  // Local cache to avoid prefix matching everytime
+  private Set<String> excludedMetrics = new HashSet<>();
+
   @Override
   protected String getCollectorUri() {
     return collectorUri;
@@ -108,14 +120,28 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
       String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
       setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
       collectorUri = "http://" + metricCollectorHost + ":" + metricCollectorPort + "/ws/v1/timeline/metrics";
+
+      // Exclusion policy
+      String excludedMetricsStr = props.getString(EXCLUDED_METRICS_PROPERTY, "");
+      if (!StringUtils.isEmpty(excludedMetricsStr.trim())) {
+        excludedMetricsPrefixes = excludedMetricsStr.trim().split(",");
+      }
+      // Inclusion override
+      String includedMetricsStr = props.getString(INCLUDED_METRICS_PROPERTY, "");
+      if (!StringUtils.isEmpty(includedMetricsStr.trim())) {
+        includedMetricsPrefixes = includedMetricsStr.trim().split(",");
+      }
+
       initializeReporter();
       if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) {
         startReporter(metricsConfig.pollingIntervalSecs());
       }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("CollectorUri = " + collectorUri);
-        LOG.trace("MetricsSendInterval = " + metricsSendInterval);
-        LOG.trace("MaxRowCacheSize = " + maxRowCacheSize);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("CollectorUri = " + collectorUri);
+        LOG.debug("MetricsSendInterval = " + metricsSendInterval);
+        LOG.debug("MaxRowCacheSize = " + maxRowCacheSize);
+        LOG.debug("Excluded metrics prefixes = " + excludedMetricsStr);
+        LOG.debug("Included metrics prefixes = " + includedMetricsStr);
       }
     }
   }
@@ -156,6 +182,24 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
     public List<TimelineMetric> getTimelineMetricList();
   }
 
+  protected boolean isExcludedMetric(String metricName) {
+    if (excludedMetrics.contains(metricName)) {
+      return true;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("metricName => " + metricName +
+        ", exclude: " + StringUtils.startsWithAny(metricName, excludedMetricsPrefixes) +
+        ", include: " + StringUtils.startsWithAny(metricName, includedMetricsPrefixes));
+    }
+    if (StringUtils.startsWithAny(metricName, excludedMetricsPrefixes)) {
+      if (!StringUtils.startsWithAny(metricName, includedMetricsPrefixes)) {
+        excludedMetrics.add(metricName);
+        return true;
+      }
+    }
+    return false;
+  }
+
   class TimelineScheduledReporter extends ScheduledReporter implements MetricProcessor<Context> {
 
     private static final String APP_ID = "kafka_broker";
@@ -187,21 +231,21 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
           final MetricName metricName = entry.getKey();
           final Metric metric = entry.getValue();
           Context context = new Context() {
-
             public List<TimelineMetric> getTimelineMetricList() {
               return metricsList;
             }
-
           };
           metric.processWith(this, metricName, context);
         }
       } catch (Throwable t) {
        LOG.error("Exception processing Kafka metric", t);
      }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Metrics List size: " + metricsList.size());
-        LOG.trace("Metics Set size: " + metrics.size());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Metrics List size: " + metricsList.size());
+        LOG.debug("Metics Set size: " + metrics.size());
+        LOG.debug("Excluded metrics set: " + excludedMetrics);
       }
+
       if (!metricsList.isEmpty()) {
         TimelineMetrics timelineMetrics = new TimelineMetrics();
         timelineMetrics.setMetrics(metricsList);
@@ -333,7 +377,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
       final String ninetyNinthPercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
         NINETY_NINTH_PERCENTILE_SUFIX, snapshot.get99thPercentile());
       final String ninetyNinePointNinePercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
-          NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX, snapshot.get999thPercentile());
+        NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX, snapshot.get999thPercentile());
 
       return new String[] { medianName, ninetyEighthPercentileName, ninetyFifthPercentileName, ninetyNinePointNinePercentileName,
@@ -343,7 +387,11 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
     private String cacheSanitizedTimelineMetric(long currentTimeMillis, String sanitizedName, String suffix, Number metricValue) {
       final String meterName = sanitizedName + suffix;
       final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, meterName, metricValue);
-      metricsCache.putTimelineMetric(metric);
+      // Skip cache if we decide not to include the metric
+      // Cannot do this before calculations of percentiles
+      if (!isExcludedMetric(meterName)) {
+        metricsCache.putTimelineMetric(metric);
+      }
       return meterName;
     }
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
index 67c61e1..70f4850 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
@@ -18,18 +18,16 @@ package org.apache.hadoop.metrics2.sink.kafka;
 
-import static org.mockito.Mockito.mock;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replay;
-import static org.powermock.api.easymock.PowerMock.verifyAll;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+import junit.framework.Assert;
 import kafka.utils.VerifiableProperties;
-
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
@@ -40,19 +38,19 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
-
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Timer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replay;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ Metrics.class, HttpClient.class })
-@PowerMockIgnore("javax.management.*")
+@PrepareForTest({ Metrics.class, HttpClient.class,
+  KafkaTimelineMetricsReporter.TimelineScheduledReporter.class })
+@PowerMockIgnore({"javax.management.*", "org.apache.log4j.*", "org.slf4j.*"})
 public class KafkaTimelineMetricsReporterTest {
 
   private final List<Metric> list = new ArrayList<Metric>();
@@ -81,6 +79,8 @@ public class KafkaTimelineMetricsReporterTest {
     properties.setProperty("kafka.timeline.metrics.host", "localhost");
     properties.setProperty("kafka.timeline.metrics.port", "8188");
     properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
+    properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c");
+    properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d");
     props = new VerifiableProperties(properties);
   }
@@ -98,6 +98,27 @@ public class KafkaTimelineMetricsReporterTest {
     verifyAll();
   }
 
+  @Test
+  public void testMetricsExclusionPolicy() throws Exception {
+    mockStatic(Metrics.class);
+    EasyMock.expect(Metrics.defaultRegistry()).andReturn(registry).times(2);
+    TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(kafkaTimelineMetricsReporter);
+    kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache);
+    HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+    kafkaTimelineMetricsReporter.setHttpClient(httpClient);
+
+    replay(Metrics.class, httpClient, timelineMetricsCache);
+    kafkaTimelineMetricsReporter.init(props);
+
+    Assert.assertTrue(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c"));
+    Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b"));
+    Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c.d"));
+    Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c.d.e"));
+
+    kafkaTimelineMetricsReporter.stopReporter();
+    verifyAll();
+  }
+
   private TimelineMetricsCache getTimelineMetricsCache(KafkaTimelineMetricsReporter kafkaTimelineMetricsReporter) {
     TimelineMetricsCache timelineMetricsCache = EasyMock.createNiceMock(TimelineMetricsCache.class);
     kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache);
@@ -106,4 +127,5 @@ public class KafkaTimelineMetricsReporterTest {
     EasyMock.expectLastCall().once();
     return timelineMetricsCache;
   }
+
 }
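Outside the unit test, the same wiring can be expressed directly through properties. In a real broker the reporter is instantiated by Kafka's metrics-reporter machinery rather than constructed by hand, so the direct init() call below is only a sketch mirroring the test above; the property keys are the ones the patch reads, and the host, port, and prefix values are placeholders.

    import java.util.Properties;

    import kafka.utils.VerifiableProperties;

    public class FilterWiringSketch {
      public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
        properties.setProperty("kafka.timeline.metrics.host", "localhost");
        properties.setProperty("kafka.timeline.metrics.port", "8188");
        // Drop everything under a.b.c ...
        properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c");
        // ... except the a.b.c.d subtree, which the include prefix re-admits.
        properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d");

        KafkaTimelineMetricsReporter reporter = new KafkaTimelineMetricsReporter();
        reporter.init(new VerifiableProperties(properties));
      }
    }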
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
index 41f9126..de8026f 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
@@ -18,10 +18,17 @@ package org.apache.hadoop.metrics2.sink.kafka;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -31,18 +38,10 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Timer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class ScheduledReporterTest {
 
   private final Gauge gauge = mock(Gauge.class);


http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-metrics/ambari-metrics-storm-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/pom.xml b/ambari-metrics/ambari-metrics-storm-sink/pom.xml
index c666de0..d3ffe22 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/pom.xml
+++ b/ambari-metrics/ambari-metrics-storm-sink/pom.xml
@@ -32,7 +32,7 @@ limitations under the License.
 
   <properties>
     <!--<storm.version>0.9.3.2.2.1.0-2340</storm.version>-->
-    <storm.version>0.10.0.2.3.2.0-2650</storm.version>
+    <storm.version>0.10.0.2.3.0.0-2557</storm.version>
   </properties>
 
   <build>


http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 36339c5..3a49e0a 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -82,7 +82,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
     for (DataPoint dataPoint : dataPoints) {
       if (dataPoint.value != null && NumberUtils.isNumber(dataPoint.value.toString())) {
-        LOG.info(dataPoint.name + " = " + dataPoint.value);
+        LOG.debug(dataPoint.name + " = " + dataPoint.value);
         TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp,
           taskInfo.srcComponentId, dataPoint.name, dataPoint.value.toString());
         // Put intermediate values into the cache until it is time to send


http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
index 610ab14..cab9d3c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
@@ -119,7 +119,7 @@ public class UpgradeCatalog212 extends AbstractUpgradeCatalog {
 
   private void executeTopologyDDLUpdates() throws AmbariException, SQLException {
     dbAccessor.addColumn(TOPOLOGY_REQUEST_TABLE, new DBColumnInfo(TOPOLOGY_REQUEST_CLUSTER_ID_COLUMN,
-      Long.class, null, null, true));
+        Long.class, null, null, true));
 
     // TOPOLOGY_REQUEST_CLUSTER_NAME_COLUMN will be deleted in PreDML. We need a cluster name to set cluster id.
     // dbAccessor.dropColumn(TOPOLOGY_REQUEST_TABLE, TOPOLOGY_REQUEST_CLUSTER_NAME_COLUMN);
     // dbAccessor.setColumnNullable(TOPOLOGY_REQUEST_TABLE, TOPOLOGY_REQUEST_CLUSTER_ID_COLUMN, false);
@@ -140,7 +140,7 @@ public class UpgradeCatalog212 extends AbstractUpgradeCatalog {
     dbAccessor.dropColumn(TOPOLOGY_REQUEST_TABLE, TOPOLOGY_REQUEST_CLUSTER_NAME_COLUMN);
     dbAccessor.setColumnNullable(TOPOLOGY_REQUEST_TABLE, TOPOLOGY_REQUEST_CLUSTER_ID_COLUMN, false);
     dbAccessor.addFKConstraint(TOPOLOGY_REQUEST_TABLE, TOPOLOGY_REQUEST_CLUSTER_ID_FK_CONSTRAINT_NAME,
-      TOPOLOGY_REQUEST_CLUSTER_ID_COLUMN, CLUSTERS_TABLE, CLUSTERS_TABLE_CLUSTER_ID_COLUMN, false);
+        TOPOLOGY_REQUEST_CLUSTER_ID_COLUMN, CLUSTERS_TABLE, CLUSTERS_TABLE_CLUSTER_ID_COLUMN, false);
   }
 
   /**
@@ -196,6 +196,21 @@ public class UpgradeCatalog212 extends AbstractUpgradeCatalog {
     updateHiveConfigs();
     updateOozieConfigs();
     updateHbaseAndClusterConfigurations();
+    updateKafkaConfigurations();
+  }
+
+  protected void updateKafkaConfigurations() throws AmbariException {
+    Map<String, String> properties = new HashMap<>();
+    properties.put("external.kafka.metrics.exclude.prefix",
+      "kafka.network.RequestMetrics,kafka.server.DelayedOperationPurgatory," +
+      "kafka.server.BrokerTopicMetrics.BytesRejectedPerSec");
+    properties.put("external.kafka.metrics.include.prefix",
+      "kafka.network.RequestMetrics.ResponseQueueTimeMs.request.OffsetCommit.98percentile," +
+      "kafka.network.RequestMetrics.ResponseQueueTimeMs.request.Offsets.95percentile," +
+      "kafka.network.RequestMetrics.ResponseSendTimeMs.request.Fetch.95percentile," +
+      "kafka.network.RequestMetrics.RequestsPerSec.request");
+
+    updateConfigurationProperties("kafka-broker", properties, false, false);
   }
 
   protected void updateHbaseAndClusterConfigurations() throws AmbariException {
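Applied to the defaults that updateKafkaConfigurations writes into kafka-broker, the filter keeps only a handful of named percentile series out of the otherwise-excluded RequestMetrics tree. A quick check with the illustrative PrefixFilterSketch class from earlier; the probe metric names below are hypothetical, modeled on the suffixed names the reporter produces.

    public class UpgradeDefaultsDemo {
      public static void main(String[] args) {
        // The exact prefix lists written by updateKafkaConfigurations above.
        String[] exclude = {
            "kafka.network.RequestMetrics",
            "kafka.server.DelayedOperationPurgatory",
            "kafka.server.BrokerTopicMetrics.BytesRejectedPerSec"
        };
        String[] include = {
            "kafka.network.RequestMetrics.ResponseQueueTimeMs.request.OffsetCommit.98percentile",
            "kafka.network.RequestMetrics.ResponseQueueTimeMs.request.Offsets.95percentile",
            "kafka.network.RequestMetrics.ResponseSendTimeMs.request.Fetch.95percentile",
            "kafka.network.RequestMetrics.RequestsPerSec.request"
        };
        PrefixFilterSketch filter = new PrefixFilterSketch(exclude, include);

        // Dropped: inside the broad RequestMetrics exclude, no include override.
        System.out.println(filter.isExcluded(
            "kafka.network.RequestMetrics.LocalTimeMs.request.Fetch.mean"));          // true
        // Kept: an include prefix re-admits this specific percentile series.
        System.out.println(filter.isExcluded(
            "kafka.network.RequestMetrics.ResponseQueueTimeMs.request.OffsetCommit.98percentile")); // false
      }
    }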