AMBARI-10904. Provide a configurable timeout setting on MetricsTimelineSink.emitMetrics. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5226ae1b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5226ae1b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5226ae1b Branch: refs/heads/trunk Commit: 5226ae1be4ceafb0ca4544a48d69c85edbc4a410 Parents: 0c39d4e Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Mon May 4 18:06:44 2015 -0700 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Mon May 4 18:06:44 2015 -0700 ---------------------------------------------------------------------- .../timeline/AbstractTimelineMetricsSink.java | 11 +++- .../cache/HandleConnectExceptionTest.java | 6 ++ .../sink/flume/FlumeTimelineMetricsSink.java | 13 +++-- .../timeline/HadoopTimelineMetricsSink.java | 8 +++ .../kafka/KafkaTimelineMetricsReporter.java | 60 +++++++++++--------- .../storm/StormTimelineMetricsReporter.java | 9 +++ .../sink/storm/StormTimelineMetricsSink.java | 14 ++++- 7 files changed, 85 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/5226ae1b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java index fd4cacd..4b93f50 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@ -17,26 +17,28 @@ */ package org.apache.hadoop.metrics2.sink.timeline; -import java.io.IOException; -import java.net.ConnectException; - import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.httpclient.methods.StringRequestEntity; +import org.apache.commons.httpclient.params.HttpMethodParams; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.codehaus.jackson.map.AnnotationIntrospector; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.annotate.JsonSerialize; import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; +import java.io.IOException; +import java.net.ConnectException; public abstract class AbstractTimelineMetricsSink { public static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix."; public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize"; public static final String METRICS_SEND_INTERVAL = "sendInterval"; + public static final String METRICS_POST_TIMEOUT_SECONDS = "timeout"; public static final String COLLECTOR_HOST_PROPERTY = "collector"; public static final String COLLECTOR_PORT_PROPERTY = "port"; + protected static final int DEFAULT_POST_TIMEOUT_SECONDS = 10; protected final Log LOG; private HttpClient httpClient = new HttpClient(); @@ -63,6 +65,7 @@ public abstract class AbstractTimelineMetricsSink { PostMethod postMethod = new PostMethod(connectUrl); postMethod.setRequestEntity(requestEntity); + postMethod.setParameter(HttpMethodParams.SO_TIMEOUT, String.valueOf(getTimeoutSeconds() * 1000)); int statusCode = httpClient.executeMethod(postMethod); if (statusCode != 200) { LOG.info("Unable to POST metrics to collector, " + connectUrl); @@ -79,4 +82,6 @@ public abstract class AbstractTimelineMetricsSink { } abstract protected String getCollectorUri(); + + abstract protected int getTimeoutSeconds(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/5226ae1b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java index 2786e3c..4f9b93e 100644 --- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java +++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java @@ -68,6 +68,12 @@ public class HandleConnectExceptionTest { protected String getCollectorUri() { return COLLECTOR_URL; } + + @Override + protected int getTimeoutSeconds() { + return 10; + } + @Override public void emitMetrics(TimelineMetrics metrics) throws IOException { super.emitMetrics(metrics); http://git-wip-us.apache.org/repos/asf/ambari/blob/5226ae1b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java index a6137af..1d4c739 100644 --- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java @@ -24,16 +24,15 @@ import org.apache.flume.Context; import org.apache.flume.FlumeException; import org.apache.flume.instrumentation.MonitorService; import org.apache.flume.instrumentation.util.JMXPollUtil; +import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException; import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration; import java.io.IOException; import java.net.InetAddress; -import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; @@ -45,8 +44,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - - public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implements MonitorService { private String collectorUri; private TimelineMetricsCache metricsCache; @@ -55,6 +52,7 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem private String hostname; private final static String COUNTER_METRICS_PROPERTY = "counters"; private final Set<String> counterMetrics = new HashSet<String>(); + private int timeoutSeconds = 10; @Override public void start() { @@ -83,6 +81,8 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem throw new FlumeException("Could not identify hostname.", e); } Configuration configuration = new Configuration("/flume-metrics2.properties"); + timeoutSeconds = Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS, + String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS))); int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE, String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT))); int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL, @@ -102,6 +102,11 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem return collectorUri; } + @Override + protected int getTimeoutSeconds() { + return timeoutSeconds; + } + public void setPollFrequency(long pollFrequency) { this.pollFrequency = pollFrequency; } http://git-wip-us.apache.org/repos/asf/ambari/blob/5226ae1b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java index 12230f5..2d171d9 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java @@ -56,6 +56,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple private String collectorUri; private static final String SERVICE_NAME_PREFIX = "serviceName-prefix"; private static final String SERVICE_NAME = "serviceName"; + private int timeoutSeconds = 10; @Override public void init(SubsetConfiguration conf) { @@ -91,6 +92,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple LOG.info("Collector Uri: " + collectorUri); + timeoutSeconds = conf.getInt(METRICS_POST_TIMEOUT_SECONDS, DEFAULT_POST_TIMEOUT_SECONDS); + int maxRowCacheSize = conf.getInt(MAX_METRIC_ROW_CACHE_SIZE, TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT); int metricsSendInterval = conf.getInt(METRICS_SEND_INTERVAL, @@ -151,6 +154,11 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple } @Override + protected int getTimeoutSeconds() { + return timeoutSeconds; + } + + @Override public void putMetrics(MetricsRecord record) { try { String recordName = record.name(); http://git-wip-us.apache.org/repos/asf/ambari/blob/5226ae1b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java index cc365bd..00a58ee 100644 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java @@ -18,27 +18,6 @@ package org.apache.hadoop.metrics2.sink.kafka; -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import kafka.metrics.KafkaMetricsConfig; -import kafka.metrics.KafkaMetricsReporter; -import kafka.utils.VerifiableProperties; - -import org.apache.commons.lang.ClassUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; - import com.yammer.metrics.Metrics; import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.Gauge; @@ -51,9 +30,31 @@ import com.yammer.metrics.core.MetricsRegistry; import com.yammer.metrics.core.Summarizable; import com.yammer.metrics.core.Timer; import com.yammer.metrics.stats.Snapshot; +import kafka.metrics.KafkaMetricsConfig; +import kafka.metrics.KafkaMetricsReporter; +import kafka.utils.VerifiableProperties; +import org.apache.commons.lang.ClassUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS; +import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT; -public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink implements KafkaMetricsReporter, - KafkaTimelineMetricsReporterMBean { +public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink + implements KafkaMetricsReporter, KafkaTimelineMetricsReporterMBean { private final static Log LOG = LogFactory.getLog(KafkaTimelineMetricsReporter.class); @@ -72,12 +73,18 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im private String hostname; private TimelineScheduledReporter reporter; private TimelineMetricsCache metricsCache; + private int timeoutSeconds = 10; @Override protected String getCollectorUri() { return collectorUri; } + @Override + protected int getTimeoutSeconds() { + return timeoutSeconds; + } + public void setMetricsCache(TimelineMetricsCache metricsCache) { this.metricsCache = metricsCache; } @@ -93,10 +100,9 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im throw new RuntimeException("Could not identify hostname.", e); } KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props); - int metricsSendInterval = Integer.parseInt(props.getString(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY, - String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS))); - int maxRowCacheSize = Integer.parseInt(props.getString(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY, - String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT))); + timeoutSeconds = props.getInt(METRICS_POST_TIMEOUT_SECONDS, DEFAULT_POST_TIMEOUT_SECONDS); + int metricsSendInterval = props.getInt(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY, MAX_EVICTION_TIME_MILLIS); + int maxRowCacheSize = props.getInt(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY, MAX_RECS_PER_NAME_DEFAULT); String metricCollectorHost = props.getString(TIMELINE_HOST_PROPERTY, TIMELINE_DEFAULT_HOST); String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT); setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval)); http://git-wip-us.apache.org/repos/asf/ambari/blob/5226ae1b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java index 89fc2ca..04b4b90 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java @@ -49,6 +49,7 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink private String collectorUri; private NimbusClient nimbusClient; private String applicationId; + private int timeoutSeconds; public StormTimelineMetricsReporter() { @@ -60,6 +61,11 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink } @Override + protected int getTimeoutSeconds() { + return timeoutSeconds; + } + + @Override public void prepare(Map conf) { LOG.info("Preparing Storm Metrics Reporter"); try { @@ -75,6 +81,9 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink this.nimbusClient = NimbusClient.getConfiguredClient(stormConf); String collectorHostname = cf.get(COLLECTOR_HOST).toString(); String port = cf.get(COLLECTOR_PORT).toString(); + timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ? + Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) : + DEFAULT_POST_TIMEOUT_SECONDS; applicationId = cf.get(APP_ID).toString(); collectorUri = "http://" + collectorHostname + ":" + port + "/ws/v1/timeline/metrics"; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/ambari/blob/5226ae1b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index 767695b..b7827bb 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -39,10 +39,13 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.*; + public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer { private String collectorUri; private TimelineMetricsCache metricsCache; private String hostname; + private int timeoutSeconds; @Override protected String getCollectorUri() { @@ -50,6 +53,11 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem } @Override + protected int getTimeoutSeconds() { + return timeoutSeconds; + } + + @Override public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) { LOG.info("Preparing Storm Metrics Sink"); try { @@ -59,10 +67,12 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem throw new RuntimeException("Could not identify hostname.", e); } Configuration configuration = new Configuration("/storm-metrics2.properties"); + timeoutSeconds = Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS, + String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS))); int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE, - String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT))); + String.valueOf(MAX_RECS_PER_NAME_DEFAULT))); int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL, - String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS))); + String.valueOf(MAX_EVICTION_TIME_MILLIS))); metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval); collectorUri = "http://" + configuration.getProperty(COLLECTOR_HOST_PROPERTY) + ":" + configuration.getProperty(COLLECTOR_PORT_PROPERTY) + "/ws/v1/timeline/metrics"; }