AMBARI-10904. Provide a configurable timeout setting on MetricsTimelineSink.emitMetrics. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5226ae1b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5226ae1b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5226ae1b

Branch: refs/heads/trunk
Commit: 5226ae1be4ceafb0ca4544a48d69c85edbc4a410
Parents: 0c39d4e
Author: Siddharth Wagle <swa...@hortonworks.com>
Authored: Mon May 4 18:06:44 2015 -0700
Committer: Siddharth Wagle <swa...@hortonworks.com>
Committed: Mon May 4 18:06:44 2015 -0700

----------------------------------------------------------------------
 .../timeline/AbstractTimelineMetricsSink.java   | 11 +++-
 .../cache/HandleConnectExceptionTest.java       |  6 ++
 .../sink/flume/FlumeTimelineMetricsSink.java    | 13 +++--
 .../timeline/HadoopTimelineMetricsSink.java     |  8 +++
 .../kafka/KafkaTimelineMetricsReporter.java     | 60 +++++++++++---------
 .../storm/StormTimelineMetricsReporter.java     |  9 +++
 .../sink/storm/StormTimelineMetricsSink.java    | 14 ++++-
 7 files changed, 85 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/5226ae1b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index fd4cacd..4b93f50 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -17,26 +17,28 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline;
 
-import java.io.IOException;
-import java.net.ConnectException;
-
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.httpclient.methods.StringRequestEntity;
+import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.codehaus.jackson.map.AnnotationIntrospector;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+import java.io.IOException;
+import java.net.ConnectException;
 
 public abstract class AbstractTimelineMetricsSink {
   public static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix.";
   public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize";
   public static final String METRICS_SEND_INTERVAL = "sendInterval";
+  public static final String METRICS_POST_TIMEOUT_SECONDS = "timeout";
   public static final String COLLECTOR_HOST_PROPERTY = "collector";
   public static final String COLLECTOR_PORT_PROPERTY = "port";
 
+  protected static final int DEFAULT_POST_TIMEOUT_SECONDS = 10;
   protected final Log LOG;
   private HttpClient httpClient = new HttpClient();
 
@@ -63,6 +65,7 @@ public abstract class AbstractTimelineMetricsSink {
 
       PostMethod postMethod = new PostMethod(connectUrl);
       postMethod.setRequestEntity(requestEntity);
+      postMethod.setParameter(HttpMethodParams.SO_TIMEOUT, String.valueOf(getTimeoutSeconds() * 1000));
       int statusCode = httpClient.executeMethod(postMethod);
       if (statusCode != 200) {
         LOG.info("Unable to POST metrics to collector, " + connectUrl);
@@ -79,4 +82,6 @@ public abstract class AbstractTimelineMetricsSink {
   }
 
   abstract protected String getCollectorUri();
+
+  abstract protected int getTimeoutSeconds();
 }
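
For other sinks built on this class, the new contract is straightforward: read an optional "timeout" property (METRICS_POST_TIMEOUT_SECONDS, defaulting to DEFAULT_POST_TIMEOUT_SECONDS = 10 seconds) and return it from getTimeoutSeconds(); emitMetrics() then converts the value to milliseconds and applies it as the HttpClient SO_TIMEOUT on each POST. The sketch below is illustrative only and is not part of the patch: ExampleTimelineSink and its init(Properties) method are hypothetical, and the host/port defaults are placeholders, while the constants and the "/ws/v1/timeline/metrics" path come from the code in this commit.

import java.util.Properties;

import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;

public class ExampleTimelineSink extends AbstractTimelineMetricsSink {
  private int timeoutSeconds = DEFAULT_POST_TIMEOUT_SECONDS;
  private String collectorUri;

  // Hypothetical initialization: pick up the optional "timeout" key and fall
  // back to the 10-second default, mirroring what the concrete sinks below do.
  // Host and port defaults here are placeholders, not values from the patch.
  public void init(Properties props) {
    timeoutSeconds = Integer.parseInt(props.getProperty(METRICS_POST_TIMEOUT_SECONDS,
        String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS)));
    collectorUri = "http://" + props.getProperty(COLLECTOR_HOST_PROPERTY, "localhost")
        + ":" + props.getProperty(COLLECTOR_PORT_PROPERTY, "6188") + "/ws/v1/timeline/metrics";
  }

  @Override
  protected String getCollectorUri() {
    return collectorUri;
  }

  @Override
  protected int getTimeoutSeconds() {
    // emitMetrics() multiplies this by 1000 and sets it as SO_TIMEOUT.
    return timeoutSeconds;
  }
}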

http://git-wip-us.apache.org/repos/asf/ambari/blob/5226ae1b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
index 2786e3c..4f9b93e 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
@@ -68,6 +68,12 @@ public class HandleConnectExceptionTest {
     protected String getCollectorUri() {
       return COLLECTOR_URL;
     }
+
+    @Override
+    protected int getTimeoutSeconds() {
+      return 10;
+    }
+
     @Override
     public void emitMetrics(TimelineMetrics metrics) throws IOException {
       super.emitMetrics(metrics);

http://git-wip-us.apache.org/repos/asf/ambari/blob/5226ae1b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index a6137af..1d4c739 100644
--- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -24,16 +24,15 @@ import org.apache.flume.Context;
 import org.apache.flume.FlumeException;
 import org.apache.flume.instrumentation.MonitorService;
 import org.apache.flume.instrumentation.util.JMXPollUtil;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -45,8 +44,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-
-
 public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implements MonitorService {
   private String collectorUri;
   private TimelineMetricsCache metricsCache;
@@ -55,6 +52,7 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   private String hostname;
   private final static String COUNTER_METRICS_PROPERTY = "counters";
   private final Set<String> counterMetrics = new HashSet<String>();
+  private int timeoutSeconds = 10;
 
   @Override
   public void start() {
@@ -83,6 +81,8 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
       throw new FlumeException("Could not identify hostname.", e);
     }
     Configuration configuration = new Configuration("/flume-metrics2.properties");
+    timeoutSeconds = Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS,
+        String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS)));
     int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
         String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
     int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
@@ -102,6 +102,11 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     return collectorUri;
   }
 
+  @Override
+  protected int getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
   public void setPollFrequency(long pollFrequency) {
     this.pollFrequency = pollFrequency;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5226ae1b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index 12230f5..2d171d9 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -56,6 +56,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
   private String collectorUri;
   private static final String SERVICE_NAME_PREFIX = "serviceName-prefix";
   private static final String SERVICE_NAME = "serviceName";
+  private int timeoutSeconds = 10;
 
   @Override
   public void init(SubsetConfiguration conf) {
@@ -91,6 +92,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
 
     LOG.info("Collector Uri: " + collectorUri);
 
+    timeoutSeconds = conf.getInt(METRICS_POST_TIMEOUT_SECONDS, DEFAULT_POST_TIMEOUT_SECONDS);
+
     int maxRowCacheSize = conf.getInt(MAX_METRIC_ROW_CACHE_SIZE,
       TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT);
     int metricsSendInterval = conf.getInt(METRICS_SEND_INTERVAL,
@@ -151,6 +154,11 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
   }
 
   @Override
+  protected int getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
+  @Override
   public void putMetrics(MetricsRecord record) {
     try {
       String recordName = record.name();
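
For the Hadoop sink the value is read from its metrics2 configuration subset via conf.getInt(METRICS_POST_TIMEOUT_SECONDS, DEFAULT_POST_TIMEOUT_SECONDS), so it would sit next to the sink's existing collector/port/sendInterval entries in hadoop-metrics2.properties. A sketch, assuming the usual *.sink.timeline.* prefix under which this sink's other properties are exposed (the prefix is an assumption; only the trailing "timeout" key comes from this patch):

*.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
*.sink.timeline.timeout=10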

http://git-wip-us.apache.org/repos/asf/ambari/blob/5226ae1b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index cc365bd..00a58ee 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -18,27 +18,6 @@
 
 package org.apache.hadoop.metrics2.sink.kafka;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import kafka.metrics.KafkaMetricsConfig;
-import kafka.metrics.KafkaMetricsReporter;
-import kafka.utils.VerifiableProperties;
-
-import org.apache.commons.lang.ClassUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Gauge;
@@ -51,9 +30,31 @@ import com.yammer.metrics.core.MetricsRegistry;
 import com.yammer.metrics.core.Summarizable;
 import com.yammer.metrics.core.Timer;
 import com.yammer.metrics.stats.Snapshot;
+import kafka.metrics.KafkaMetricsConfig;
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.utils.VerifiableProperties;
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS;
+import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
 
-public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink implements KafkaMetricsReporter,
-    KafkaTimelineMetricsReporterMBean {
+public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
+    implements KafkaMetricsReporter, KafkaTimelineMetricsReporterMBean {
 
   private final static Log LOG = LogFactory.getLog(KafkaTimelineMetricsReporter.class);
 
@@ -72,12 +73,18 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
   private String hostname;
   private TimelineScheduledReporter reporter;
   private TimelineMetricsCache metricsCache;
+  private int timeoutSeconds = 10;
 
   @Override
   protected String getCollectorUri() {
     return collectorUri;
   }
 
+  @Override
+  protected int getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
   public void setMetricsCache(TimelineMetricsCache metricsCache) {
     this.metricsCache = metricsCache;
   }
@@ -93,10 +100,9 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
           throw new RuntimeException("Could not identify hostname.", e);
         }
         KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props);
-        int metricsSendInterval = Integer.parseInt(props.getString(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY,
-            String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
-        int maxRowCacheSize = Integer.parseInt(props.getString(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY,
-            String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
+        timeoutSeconds = props.getInt(METRICS_POST_TIMEOUT_SECONDS, DEFAULT_POST_TIMEOUT_SECONDS);
+        int metricsSendInterval = props.getInt(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY, MAX_EVICTION_TIME_MILLIS);
+        int maxRowCacheSize = props.getInt(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY, MAX_RECS_PER_NAME_DEFAULT);
         String metricCollectorHost = props.getString(TIMELINE_HOST_PROPERTY, TIMELINE_DEFAULT_HOST);
         String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
         setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));

http://git-wip-us.apache.org/repos/asf/ambari/blob/5226ae1b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 89fc2ca..04b4b90 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -49,6 +49,7 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private String collectorUri;
   private NimbusClient nimbusClient;
   private String applicationId;
+  private int timeoutSeconds;
 
   public StormTimelineMetricsReporter() {
 
@@ -60,6 +61,11 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   }
 
   @Override
+  protected int getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
+  @Override
   public void prepare(Map conf) {
     LOG.info("Preparing Storm Metrics Reporter");
     try {
@@ -75,6 +81,9 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
       this.nimbusClient = NimbusClient.getConfiguredClient(stormConf);
       String collectorHostname = cf.get(COLLECTOR_HOST).toString();
       String port = cf.get(COLLECTOR_PORT).toString();
+      timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ?
+        Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) :
+        DEFAULT_POST_TIMEOUT_SECONDS;
       applicationId = cf.get(APP_ID).toString();
       collectorUri = "http://"; + collectorHostname + ":" + port + 
"/ws/v1/timeline/metrics";
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/5226ae1b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 767695b..b7827bb 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -39,10 +39,13 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.*;
+
 public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
   private String collectorUri;
   private TimelineMetricsCache metricsCache;
   private String hostname;
+  private int timeoutSeconds;
 
   @Override
   protected String getCollectorUri() {
@@ -50,6 +53,11 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   }
 
   @Override
+  protected int getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
+  @Override
   public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
     LOG.info("Preparing Storm Metrics Sink");
     try {
@@ -59,10 +67,12 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
       throw new RuntimeException("Could not identify hostname.", e);
     }
     Configuration configuration = new Configuration("/storm-metrics2.properties");
+    timeoutSeconds = Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS,
+        String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS)));
     int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
-        String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
+        String.valueOf(MAX_RECS_PER_NAME_DEFAULT)));
     int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
-        String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
+        String.valueOf(MAX_EVICTION_TIME_MILLIS)));
     metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
     collectorUri = "http://" + configuration.getProperty(COLLECTOR_HOST_PROPERTY) + ":" + configuration.getProperty(COLLECTOR_PORT_PROPERTY) + "/ws/v1/timeline/metrics";
   }
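
FlumeTimelineMetricsSink and StormTimelineMetricsSink load the same key directly from /flume-metrics2.properties and /storm-metrics2.properties, so the override is a bare entry alongside the existing collector/port settings. A sketch of storm-metrics2.properties, assuming plain un-prefixed keys as the getProperty() calls above suggest (host and port values are placeholders):

collector=metrics-collector.example.com
port=6188
timeout=30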
