Repository: hadoop Updated Branches: refs/heads/YARN-2928 290349837 -> 5e374b46a
YARN-5018. Online aggregation logic should not run immediately after collectors got started (Li Lu via sjlee) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5e374b46 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5e374b46 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5e374b46 Branch: refs/heads/YARN-2928 Commit: 5e374b46a132a0c148397c0538ce58d2f53d56d9 Parents: 2903498 Author: Sangjin Lee <sj...@apache.org> Authored: Tue May 24 11:02:56 2016 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Tue May 24 11:02:56 2016 -0700 ---------------------------------------------------------------------- .../RMTimelineCollectorManager.java | 2 +- .../collector/AppLevelTimelineCollector.java | 17 +++++++++++++++-- .../collector/NodeTimelineCollectorManager.java | 2 +- .../collector/TimelineCollector.java | 12 +++++++++++- .../collector/TimelineCollectorManager.java | 18 +++++++++++++++++- 5 files changed, 45 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e374b46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java index a4f1084..64c3749 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java @@ -49,7 +49,7 @@ public class RMTimelineCollectorManager extends TimelineCollectorManager { } @Override - public void postPut(ApplicationId appId, TimelineCollector collector) { + protected void doPostPut(ApplicationId appId, TimelineCollector collector) { RMApp app = rmContext.getRMApps().get(appId); if (app == null) { throw new YarnRuntimeException( http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e374b46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index eb05262..d276269 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import com.google.common.base.Preconditions; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -93,7 +94,8 @@ public class AppLevelTimelineCollector extends TimelineCollector { new ThreadFactoryBuilder() .setNameFormat("TimelineCollector Aggregation thread #%d") .build()); - appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), 0, + appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), + AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, TimeUnit.SECONDS); super.serviceStart(); @@ -126,10 +128,21 @@ public class AppLevelTimelineCollector extends TimelineCollector { if (LOG.isDebugEnabled()) { LOG.debug("App-level real-time aggregating"); } + if (!isReadyToAggregate()) { + LOG.warn("App-level collector is not ready, skip aggregation. "); + return; + } try { TimelineCollectorContext currContext = getTimelineEntityContext(); + Map<String, AggregationStatusTable> aggregationGroups + = getAggregationGroups(); + if (aggregationGroups == null + || aggregationGroups.isEmpty()) { + LOG.debug("App-level collector is empty, skip aggregation. "); + return; + } TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId( - getAggregationGroups(), currContext.getAppId(), + aggregationGroups, currContext.getAppId(), TimelineEntityType.YARN_APPLICATION.toString()); TimelineEntities entities = new TimelineEntities(); entities.addEntity(resultEntity); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e374b46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java index 75557a8..0323d7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java @@ -87,7 +87,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { } @Override - public void postPut(ApplicationId appId, TimelineCollector collector) { + protected void doPostPut(ApplicationId appId, TimelineCollector collector) { try { // Get context info from NM updateTimelineCollectorContext(appId, collector); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e374b46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 8cd645c..2fc3033 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -60,6 +60,8 @@ public abstract class TimelineCollector extends CompositeService { private static Set<String> entityTypesSkipAggregation = new HashSet<>(); + private volatile boolean readyToAggregate = false; + public TimelineCollector(String name) { super(name); } @@ -91,6 +93,14 @@ public abstract class TimelineCollector extends CompositeService { return aggregationGroups; } + protected void setReadyToAggregate() { + readyToAggregate = true; + } + + protected boolean isReadyToAggregate() { + return readyToAggregate; + } + /** * Method to decide the set of timeline entity types the collector should * skip on aggregations. Subclasses may want to override this method to @@ -258,7 +268,7 @@ public abstract class TimelineCollector extends CompositeService { // Note: In memory aggregation is performed in an eventually consistent // fashion. - private static class AggregationStatusTable { + protected static class AggregationStatusTable { // On aggregation, for each metric, aggregate all per-entity accumulated // metrics. We only use the id and type for TimelineMetrics in the key set // of this table. http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e374b46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 8f74ffb..a8f88e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -136,8 +136,24 @@ public class TimelineCollectorManager extends AbstractService { return collectorInTable; } - protected void postPut(ApplicationId appId, TimelineCollector collector) { + /** + * Callback handler for the timeline collector manager when a collector has + * been added into the collector map. + * @param appId Application id of the collector. + * @param collector The actual timeline collector that has been added. + */ + public void postPut(ApplicationId appId, TimelineCollector collector) { + doPostPut(appId, collector); + collector.setReadyToAggregate(); + } + /** + * A template method that will be called by + * {@link #postPut(ApplicationId, TimelineCollector)}. + * @param appId Application id of the collector. + * @param collector The actual timeline collector that has been added. + */ + protected void doPostPut(ApplicationId appId, TimelineCollector collector) { } /** --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org