Repository: ambari
Updated Branches: refs/heads/branch-metrics-dev e78210d58 -> 5649e80bb
AMBARI-5707. Adjust TimelineMetricAggregators for execution time delay. (swagle) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5649e80b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5649e80b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5649e80b Branch: refs/heads/branch-metrics-dev Commit: 5649e80bbd573e53f18baf07f6dd8a3ec034cb3c Parents: e78210d Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Thu Oct 23 15:01:15 2014 -0700 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Thu Oct 23 15:01:15 2014 -0700 ---------------------------------------------------------------------- .../timeline/AbstractTimelineAggregator.java | 27 +++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/5649e80b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java index e702fc0..2630846 100644 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java @@ -36,7 +36,7 @@ public abstract class 
AbstractTimelineAggregator implements Runnable { protected final String CHECKPOINT_LOCATION; private final Log LOG; static final long checkpointDelay = 120000; - static final Integer RESULTSET_FETCH_SIZE = 5000; + static final Integer RESULTSET_FETCH_SIZE = 2000; private static final ObjectMapper mapper; static { @@ -78,11 +78,30 @@ public abstract class AbstractTimelineAggregator implements Runnable { } catch (IOException io) { LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io); } + long sleepTime = SLEEP_INTERVAL; if (lastCheckPointTime != -1) { - LOG.info("Last check point time: " + lastCheckPointTime + ", " + - "lagBy: " + ((System.currentTimeMillis() - lastCheckPointTime)) / 1000); + LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: " + + ((System.currentTimeMillis() - lastCheckPointTime) / 1000) + + " seconds." ); + + long startTime = System.currentTimeMillis(); boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL); + long executionTime = System.currentTimeMillis() - startTime; + long delta = SLEEP_INTERVAL - executionTime; + + if (delta > 0) { + // Sleep for (configured sleep - time to execute task) + sleepTime = delta; + } else { + // No sleep because last run took too long to execute + LOG.info("Aggregator execution took too long, " + + "cancelling sleep. executionTime = " + executionTime); + sleepTime = 1; + } + + LOG.debug("Aggregator sleep interval = " + sleepTime); + if (success) { try { saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL); @@ -94,7 +113,7 @@ public abstract class AbstractTimelineAggregator implements Runnable { } try { - Thread.sleep(SLEEP_INTERVAL); + Thread.sleep(sleepTime); } catch (InterruptedException e) { LOG.info("Sleep interrupted, continuing with aggregation."); }