Repository: ambari
Updated Branches:
  refs/heads/branch-metrics-dev e78210d58 -> 5649e80bb


AMBARI-5707. Adjust TimelineMetricAggregators for execution time delay. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5649e80b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5649e80b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5649e80b

Branch: refs/heads/branch-metrics-dev
Commit: 5649e80bbd573e53f18baf07f6dd8a3ec034cb3c
Parents: e78210d
Author: Siddharth Wagle <swa...@hortonworks.com>
Authored: Thu Oct 23 15:01:15 2014 -0700
Committer: Siddharth Wagle <swa...@hortonworks.com>
Committed: Thu Oct 23 15:01:15 2014 -0700

----------------------------------------------------------------------
 .../timeline/AbstractTimelineAggregator.java    | 27 +++++++++++++++++---
 1 file changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/5649e80b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
index e702fc0..2630846 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
@@ -36,7 +36,7 @@ public abstract class AbstractTimelineAggregator implements Runnable {
   protected final String CHECKPOINT_LOCATION;
   private final Log LOG;
   static final long checkpointDelay = 120000;
-  static final Integer RESULTSET_FETCH_SIZE = 5000;
+  static final Integer RESULTSET_FETCH_SIZE = 2000;
   private static final ObjectMapper mapper;
 
   static {
@@ -78,11 +78,30 @@ public abstract class AbstractTimelineAggregator implements Runnable {
       } catch (IOException io) {
         LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
       }
+      long sleepTime = SLEEP_INTERVAL;
 
       if (lastCheckPointTime != -1) {
-        LOG.info("Last check point time: " + lastCheckPointTime + ", " +
-          "lagBy: " + ((System.currentTimeMillis() - lastCheckPointTime)) / 1000);
+        LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
+          + ((System.currentTimeMillis() - lastCheckPointTime) / 1000)
+          + " seconds." );
+
+        long startTime = System.currentTimeMillis();
         boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
+        long executionTime = System.currentTimeMillis() - startTime;
+        long delta = SLEEP_INTERVAL - executionTime;
+
+        if (delta > 0) {
+          // Sleep for (configured sleep - time to execute task)
+          sleepTime = delta;
+        } else {
+          // No sleep because last run took too long to execute
+          LOG.info("Aggregator execution took too long, " +
+            "cancelling sleep. executionTime = " + executionTime);
+          sleepTime = 1;
+        }
+
+        LOG.debug("Aggregator sleep interval = " + sleepTime);
+
         if (success) {
           try {
             saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
@@ -94,7 +113,7 @@ public abstract class AbstractTimelineAggregator implements Runnable {
       }
 
       try {
-        Thread.sleep(SLEEP_INTERVAL);
+        Thread.sleep(sleepTime);
       } catch (InterruptedException e) {
         LOG.info("Sleep interrupted, continuing with aggregation.");
       }

Reply via email to