YARN-6375 App level aggregation should not consider metric values reported in the previous aggregation cycle (Varun Saxena via Vrushali C)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1ab397c3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1ab397c3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1ab397c3 Branch: refs/heads/YARN-5355_branch2 Commit: 1ab397c30a8d9499c7ffeba845eb07a969f31c38 Parents: 047d204 Author: Vrushali Channapattan <vrush...@apache.org> Authored: Thu May 4 15:30:29 2017 -0700 Committer: Varun Saxena <varunsax...@apache.org> Committed: Thu Aug 31 01:10:04 2017 +0530 ---------------------------------------------------------------------- .../collector/TimelineCollector.java | 23 +++-- .../collector/TestTimelineCollector.java | 95 +++++++++++++++++++- 2 files changed, 108 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ab397c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index c94c505..5416b26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -310,13 +310,15 @@ public abstract class TimelineCollector extends CompositeService { // Update aggregateTable Map<String, TimelineMetric> aggrRow = aggregateTable.get(m); if (aggrRow == null) { - Map<String, TimelineMetric> tempRow = new ConcurrentHashMap<>(); + Map<String, TimelineMetric> tempRow = new HashMap<>(); aggrRow = aggregateTable.putIfAbsent(m, tempRow); if (aggrRow == null) { aggrRow = tempRow; } } - aggrRow.put(entityId, m); + synchronized (aggrRow) { + aggrRow.put(entityId, m); + } } } @@ -335,14 +337,17 @@ public abstract class TimelineCollector extends CompositeService { } aggrMetric.setRealtimeAggregationOp(TimelineMetricOperation.NOP); Map<Object, Object> status = new HashMap<>(); - for (TimelineMetric m : aggrRow.values()) { - TimelineMetric.aggregateTo(m, aggrMetric, status); - // getRealtimeAggregationOp returns an enum so we can directly - // compare with "!=". - if (m.getRealtimeAggregationOp() - != aggrMetric.getRealtimeAggregationOp()) { - aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp()); + synchronized (aggrRow) { + for (TimelineMetric m : aggrRow.values()) { + TimelineMetric.aggregateTo(m, aggrMetric, status); + // getRealtimeAggregationOp returns an enum so we can directly + // compare with "!=". + if (m.getRealtimeAggregationOp() + != aggrMetric.getRealtimeAggregationOp()) { + aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp()); + } } + aggrRow.clear(); } Set<TimelineMetric> metrics = e.getMetrics(); metrics.remove(aggrMetric); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ab397c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java index a55f227..0f17553 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java @@ -18,19 +18,27 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector.AggregationStatusTable; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.junit.Test; +import com.google.common.collect.Sets; + import java.io.IOException; import java.util.HashSet; +import java.util.Map; import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; @@ -187,4 +195,89 @@ public class TestTimelineCollector { return context; } } -} + + private static TimelineEntity createEntity(String id, String type) { + TimelineEntity entity = new TimelineEntity(); + entity.setId(id); + entity.setType(type); + return entity; + } + + private static TimelineMetric createDummyMetric(long ts, Long value) { + TimelineMetric metric = new TimelineMetric(); + metric.setId("dummy_metric"); + metric.addValue(ts, value); + metric.setRealtimeAggregationOp(TimelineMetricOperation.SUM); + return metric; + } + + @Test + public void testClearPreviousEntitiesOnAggregation() throws Exception { + final long ts = System.currentTimeMillis(); + TimelineCollector collector = new TimelineCollector("") { + @Override + public TimelineCollectorContext getTimelineEntityContext() { + return new TimelineCollectorContext("cluster", "user", "flow", "1", + 1L, ApplicationId.newInstance(ts, 1).toString()); + } + }; + collector.init(new Configuration()); + collector.setWriter(mock(TimelineWriter.class)); + + // Put 5 entities with different metric values. + TimelineEntities entities = new TimelineEntities(); + for (int i = 1; i <=5; i++) { + TimelineEntity entity = createEntity("e" + i, "type"); + entity.addMetric(createDummyMetric(ts + i, Long.valueOf(i * 50))); + entities.addEntity(entity); + } + collector.putEntities(entities, UserGroupInformation.getCurrentUser()); + + TimelineCollectorContext currContext = collector.getTimelineEntityContext(); + // Aggregate the entities. + Map<String, AggregationStatusTable> aggregationGroups + = collector.getAggregationGroups(); + assertEquals(Sets.newHashSet("type"), aggregationGroups.keySet()); + TimelineEntity aggregatedEntity = TimelineCollector. + aggregateWithoutGroupId(aggregationGroups, currContext.getAppId(), + TimelineEntityType.YARN_APPLICATION.toString()); + TimelineMetric aggregatedMetric = + aggregatedEntity.getMetrics().iterator().next(); + assertEquals(750L, aggregatedMetric.getValues().values().iterator().next()); + assertEquals(TimelineMetricOperation.SUM, + aggregatedMetric.getRealtimeAggregationOp()); + + // Aggregate entities. + aggregatedEntity = TimelineCollector. + aggregateWithoutGroupId(aggregationGroups, currContext.getAppId(), + TimelineEntityType.YARN_APPLICATION.toString()); + aggregatedMetric = aggregatedEntity.getMetrics().iterator().next(); + // No values aggregated as no metrics put for an entity between this + // aggregation and the previous one. + assertTrue(aggregatedMetric.getValues().isEmpty()); + assertEquals(TimelineMetricOperation.NOP, + aggregatedMetric.getRealtimeAggregationOp()); + + // Put 3 entities. + entities = new TimelineEntities(); + for (int i = 1; i <=3; i++) { + TimelineEntity entity = createEntity("e" + i, "type"); + entity.addMetric(createDummyMetric(System.currentTimeMillis() + i, 50L)); + entities.addEntity(entity); + } + aggregationGroups = collector.getAggregationGroups(); + collector.putEntities(entities, UserGroupInformation.getCurrentUser()); + + // Aggregate entities. + aggregatedEntity = TimelineCollector. + aggregateWithoutGroupId(aggregationGroups, currContext.getAppId(), + TimelineEntityType.YARN_APPLICATION.toString()); + // Last 3 entities picked up for aggregation. + aggregatedMetric = aggregatedEntity.getMetrics().iterator().next(); + assertEquals(150L, aggregatedMetric.getValues().values().iterator().next()); + assertEquals(TimelineMetricOperation.SUM, + aggregatedMetric.getRealtimeAggregationOp()); + + collector.close(); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org