YARN-3816. [Aggregation] App-level aggregation and accumulation for YARN system metrics (Li Lu via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/74c1b597 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/74c1b597 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/74c1b597 Branch: refs/heads/YARN-2928 Commit: 74c1b5977c1e2af820fe3a37a0b31af86f08430c Parents: e9cfce4 Author: Sangjin Lee <sj...@apache.org> Authored: Fri Apr 22 10:24:40 2016 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Fri Jul 8 10:19:01 2016 -0700 ---------------------------------------------------------------------- .../records/timelineservice/TimelineMetric.java | 140 ++++++++++-- .../TimelineMetricCalculator.java | 115 ++++++++++ .../TimelineMetricOperation.java | 167 +++++++++++++++ .../timelineservice/TestTimelineMetric.java | 100 +++++++++ .../TestTimelineServiceRecords.java | 6 +- .../timelineservice/NMTimelinePublisher.java | 4 + .../collector/AppLevelTimelineCollector.java | 72 +++++++ .../collector/TimelineCollector.java | 213 ++++++++++++++++++- .../storage/TimelineAggregationTrack.java | 2 +- .../collector/TestTimelineCollector.java | 127 +++++++++++ .../TestFileSystemTimelineWriterImpl.java | 43 +++- .../storage/TestHBaseTimelineStorage.java | 35 ++- 12 files changed, 998 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/74c1b597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java index 2f60515..f0c6849 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java @@ -19,12 +19,13 @@ package org.apache.hadoop.yarn.api.records.timelineservice; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; -import java.util.Comparator; +import java.util.Collections; import java.util.Map; import java.util.TreeMap; @@ -48,13 +49,13 @@ public class TimelineMetric { private Type type; private String id; - private Comparator<Long> reverseComparator = new Comparator<Long>() { - @Override - public int compare(Long l1, Long l2) { - return l2.compareTo(l1); - } - }; - private TreeMap<Long, Number> values = new TreeMap<>(reverseComparator); + // By default, not to do any aggregation operations. This field will NOT be + // persisted (like a "transient" member). + private TimelineMetricOperation realtimeAggregationOp + = TimelineMetricOperation.NOP; + + private TreeMap<Long, Number> values + = new TreeMap<>(Collections.reverseOrder()); public TimelineMetric() { this(Type.SINGLE_VALUE); @@ -83,6 +84,26 @@ public class TimelineMetric { this.id = metricId; } + /** + * Get the real time aggregation operation of this metric. + * + * @return Real time aggregation operation + */ + public TimelineMetricOperation getRealtimeAggregationOp() { + return realtimeAggregationOp; + } + + /** + * Set the real time aggregation operation of this metric. + * + * @param op A timeline metric operation that the metric should perform on + * real time aggregations + */ + public void setRealtimeAggregationOp( + final TimelineMetricOperation op) { + this.realtimeAggregationOp = op; + } + // required by JAXB @InterfaceAudience.Private @XmlElement(name = "values") @@ -98,8 +119,8 @@ public class TimelineMetric { if (type == Type.SINGLE_VALUE) { overwrite(vals); } else { - if (values != null) { - this.values = new TreeMap<Long, Number>(reverseComparator); + if (vals != null) { + this.values = new TreeMap<>(Collections.reverseOrder()); this.values.putAll(vals); } else { this.values = null; @@ -166,11 +187,100 @@ public class TimelineMetric { @Override public String toString() { - String str = "{id:" + id + ", type:" + type; - if (!values.isEmpty()) { - str += ", values:" + values; + return "{id: " + id + ", type: " + type + + ", realtimeAggregationOp: " + + realtimeAggregationOp + "; " + values.toString() + + "}"; + } + + /** + * Get the latest timeline metric as single value type. + * + * @param metric Incoming timeline metric + * @return The latest metric in the incoming metric + */ + public static TimelineMetric getLatestSingleValueMetric( + TimelineMetric metric) { + if (metric.getType() == Type.SINGLE_VALUE) { + return metric; + } else { + TimelineMetric singleValueMetric = new TimelineMetric(Type.SINGLE_VALUE); + Long firstKey = metric.values.firstKey(); + if (firstKey != null) { + Number firstValue = metric.values.get(firstKey); + singleValueMetric.addValue(firstKey, firstValue); + } + return singleValueMetric; } - str += "}"; - return str; } + + /** + * Get single data timestamp of the metric. + * + * @return the single data timestamp + */ + public long getSingleDataTimestamp() { + if (this.type == Type.SINGLE_VALUE) { + if (values.size() == 0) { + throw new YarnRuntimeException("Values for this timeline metric is " + + "empty."); + } else { + return values.firstKey(); + } + } else { + throw new YarnRuntimeException("Type for this timeline metric is not " + + "SINGLE_VALUE."); + } + } + + /** + * Get single data value of the metric. + * + * @return the single data value + */ + public Number getSingleDataValue() { + if (this.type == Type.SINGLE_VALUE) { + if (values.size() == 0) { + return null; + } else { + return values.get(values.firstKey()); + } + } else { + throw new YarnRuntimeException("Type for this timeline metric is not " + + "SINGLE_VALUE."); + } + } + + /** + * Aggregate an incoming metric to the base aggregated metric with the given + * operation state in a stateless fashion. The assumption here is + * baseAggregatedMetric and latestMetric should be single value data if not + * null. + * + * @param incomingMetric Incoming timeline metric to aggregate + * @param baseAggregatedMetric Base timeline metric + * @return Result metric after aggregation + */ + public static TimelineMetric aggregateTo(TimelineMetric incomingMetric, + TimelineMetric baseAggregatedMetric) { + return aggregateTo(incomingMetric, baseAggregatedMetric, null); + } + + /** + * Aggregate an incoming metric to the base aggregated metric with the given + * operation state. The assumption here is baseAggregatedMetric and + * latestMetric should be single value data if not null. + * + * @param incomingMetric Incoming timeline metric to aggregate + * @param baseAggregatedMetric Base timeline metric + * @param state Operation state + * @return Result metric after aggregation + */ + public static TimelineMetric aggregateTo(TimelineMetric incomingMetric, + TimelineMetric baseAggregatedMetric, Map<Object, Object> state) { + TimelineMetricOperation operation + = incomingMetric.getRealtimeAggregationOp(); + return operation.aggregate(incomingMetric, baseAggregatedMetric, state); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/74c1b597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java new file mode 100644 index 0000000..4c9045f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +/** + * A calculator for timeline metrics. + */ +public final class TimelineMetricCalculator { + + private TimelineMetricCalculator() { + // do nothing. + } + + /** + * Compare two not-null numbers. + * @param n1 Number n1 + * @param n2 Number n2 + * @return 0 if n1 equals n2, a negative int if n1 is less than n2, a + * positive int otherwise. + */ + public static int compare(Number n1, Number n2) { + if (n1 == null || n2 == null) { + throw new YarnRuntimeException( + "Number to be compared shouldn't be null."); + } + + if (n1 instanceof Integer || n1 instanceof Long) { + if (n1.longValue() == n2.longValue()) { + return 0; + } else { + return (n1.longValue() < n2.longValue()) ? -1 : 1; + } + } + + if (n1 instanceof Float || n1 instanceof Double) { + if (n1.doubleValue() == n2.doubleValue()) { + return 0; + } else { + return (n1.doubleValue() < n2.doubleValue()) ? -1 : 1; + } + } + + // TODO throw warnings/exceptions for other types of number. + throw new YarnRuntimeException("Unsupported types for number comparison: " + + n1.getClass().getName() + ", " + n2.getClass().getName()); + } + + /** + * Subtract operation between two Numbers. + * @param n1 Number n1 + * @param n2 Number n2 + * @return Number represent to (n1 - n2). + */ + public static Number sub(Number n1, Number n2) { + if (n1 == null) { + throw new YarnRuntimeException( + "Number to be subtracted shouldn't be null."); + } else if (n2 == null) { + return n1; + } + + if (n1 instanceof Integer || n1 instanceof Long) { + return n1.longValue() - n2.longValue(); + } + + if (n1 instanceof Float || n1 instanceof Double) { + return n1.doubleValue() - n2.doubleValue(); + } + + // TODO throw warnings/exceptions for other types of number. + return null; + } + + /** + * Sum up two Numbers. + * @param n1 Number n1 + * @param n2 Number n2 + * @return Number represent to (n1 + n2). + */ + public static Number sum(Number n1, Number n2) { + if (n1 == null) { + return n2; + } else if (n2 == null) { + return n1; + } + + if (n1 instanceof Integer || n1 instanceof Long) { + return n1.longValue() + n2.longValue(); + } + + if (n1 instanceof Float || n1 instanceof Double) { + return n1.doubleValue() + n2.doubleValue(); + } + + // TODO throw warnings/exceptions for other types of number. + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/74c1b597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java new file mode 100644 index 0000000..58e5c38 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.timelineservice; + +import java.util.Map; + +/** + * Aggregation operations. + */ +public enum TimelineMetricOperation { + NOP("NOP") { + /** + * Do nothing on the base metric. + * + * @param incoming Metric a + * @param base Metric b + * @param state Operation state (not used) + * @return Metric b + */ + @Override + public TimelineMetric exec(TimelineMetric incoming, + TimelineMetric base, Map<Object, Object> state) { + return base; + } + }, + MAX("MAX") { + /** + * Keep the greater value of incoming and base. Stateless operation. + * + * @param incoming Metric a + * @param base Metric b + * @param state Operation state (not used) + * @return the greater value of a and b + */ + @Override + public TimelineMetric exec(TimelineMetric incoming, + TimelineMetric base, Map<Object, Object> state) { + if (base == null) { + return incoming; + } + Number incomingValue = incoming.getSingleDataValue(); + Number aggregateValue = base.getSingleDataValue(); + if (aggregateValue == null) { + aggregateValue = Long.MIN_VALUE; + } + if (TimelineMetricCalculator.compare(incomingValue, aggregateValue) > 0) { + base.addValue(incoming.getSingleDataTimestamp(), incomingValue); + } + return base; + } + }, + REPLACE("REPLACE") { + /** + * Replace the base metric with the incoming value. Stateless operation. + * + * @param incoming Metric a + * @param base Metric b + * @param state Operation state (not used) + * @return Metric a + */ + @Override + public TimelineMetric exec(TimelineMetric incoming, + TimelineMetric base, + Map<Object, Object> state) { + return incoming; + } + }, + SUM("SUM") { + /** + * Return the sum of the incoming metric and the base metric if the + * operation is stateless. For stateful operations, also subtract the + * value of the timeline metric mapped to the PREV_METRIC_STATE_KEY + * in the state object. + * + * @param incoming Metric a + * @param base Metric b + * @param state Operation state (PREV_METRIC_STATE_KEY's value as Metric p) + * @return A metric with value a + b - p + */ + @Override + public TimelineMetric exec(TimelineMetric incoming, TimelineMetric base, + Map<Object, Object> state) { + if (base == null) { + return incoming; + } + Number incomingValue = incoming.getSingleDataValue(); + Number aggregateValue = base.getSingleDataValue(); + Number result + = TimelineMetricCalculator.sum(incomingValue, aggregateValue); + + // If there are previous value in the state, we will take it off from the + // sum + if (state != null) { + Object prevMetric = state.get(PREV_METRIC_STATE_KEY); + if (prevMetric instanceof TimelineMetric) { + result = TimelineMetricCalculator.sub(result, + ((TimelineMetric) prevMetric).getSingleDataValue()); + } + } + base.addValue(incoming.getSingleDataTimestamp(), result); + return base; + } + }, + AVG("AVERAGE") { + /** + * Return the average value of the incoming metric and the base metric, + * with a given state. Not supported yet. + * + * @param incoming Metric a + * @param base Metric b + * @param state Operation state + * @return Not finished yet + */ + @Override + public TimelineMetric exec(TimelineMetric incoming, TimelineMetric base, + Map<Object, Object> state) { + // Not supported yet + throw new UnsupportedOperationException( + "Unsupported aggregation operation: AVERAGE"); + } + }; + + public static final String PREV_METRIC_STATE_KEY = "PREV_METRIC"; + + /** + * Perform the aggregation operation. + * + * @param incoming Incoming metric + * @param aggregate Base aggregation metric + * @param state Operation state + * @return Result metric for this aggregation operation + */ + public TimelineMetric aggregate(TimelineMetric incoming, + TimelineMetric aggregate, Map<Object, Object> state) { + return exec(incoming, aggregate, state); + } + + private final String opName; + + TimelineMetricOperation(String opString) { + opName = opString; + } + + @Override + public String toString() { + return this.opName; + } + + abstract TimelineMetric exec(TimelineMetric incoming, TimelineMetric base, + Map<Object, Object> state); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/74c1b597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java new file mode 100644 index 0000000..3244bc3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; + +import org.junit.Test; + +public class TestTimelineMetric { + + @Test + public void testTimelineMetricAggregation() { + long ts = System.currentTimeMillis(); + // single_value metric add against null metric + TimelineMetric m1 = getSingleValueMetric("MEGA_BYTES_MILLIS", + TimelineMetricOperation.SUM, ts, 10000L); + TimelineMetric aggregatedMetric = TimelineMetric.aggregateTo(m1, null); + assertEquals(10000L, aggregatedMetric.getSingleDataValue()); + + TimelineMetric m2 = getSingleValueMetric("MEGA_BYTES_MILLIS", + TimelineMetricOperation.SUM, ts, 20000L); + aggregatedMetric = TimelineMetric.aggregateTo(m2, aggregatedMetric); + assertEquals(30000L, aggregatedMetric.getSingleDataValue()); + + // stateful sum test + Map<Object, Object> state = new HashMap<>(); + state.put(TimelineMetricOperation.PREV_METRIC_STATE_KEY, m2); + TimelineMetric m2New = getSingleValueMetric("MEGA_BYTES_MILLIS", + TimelineMetricOperation.SUM, ts, 10000L); + aggregatedMetric = TimelineMetric.aggregateTo(m2New, aggregatedMetric, + state); + assertEquals(20000L, aggregatedMetric.getSingleDataValue()); + + // single_value metric max against single_value metric + TimelineMetric m3 = getSingleValueMetric("TRANSFER_RATE", + TimelineMetricOperation.MAX, ts, 150L); + TimelineMetric aggregatedMax = TimelineMetric.aggregateTo(m3, null); + assertEquals(150L, aggregatedMax.getSingleDataValue()); + + TimelineMetric m4 = getSingleValueMetric("TRANSFER_RATE", + TimelineMetricOperation.MAX, ts, 170L); + aggregatedMax = TimelineMetric.aggregateTo(m4, aggregatedMax); + assertEquals(170L, aggregatedMax.getSingleDataValue()); + + // single_value metric avg against single_value metric + TimelineMetric m5 = getSingleValueMetric("TRANSFER_RATE", + TimelineMetricOperation.AVG, ts, 150L); + try { + TimelineMetric.aggregateTo(m5, null); + fail("Taking average among metrics is not supported! "); + } catch (UnsupportedOperationException e) { + // Expected + } + + } + + private static TimelineMetric getSingleValueMetric(String id, + TimelineMetricOperation op, long timestamp, long value) { + TimelineMetric m = new TimelineMetric(); + m.setId(id); + m.setType(Type.SINGLE_VALUE); + m.setRealtimeAggregationOp(op); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + metricValues.put(timestamp, value); + m.setValues(metricValues); + return m; + } + + private static TimelineMetric getTimeSeriesMetric(String id, + TimelineMetricOperation op, Map<Long, Number> metricValues) { + TimelineMetric m = new TimelineMetric(); + m.setId(id); + m.setType(Type.TIME_SERIES); + m.setRealtimeAggregationOp(op); + m.setValues(metricValues); + return m; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/74c1b597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java index 51ec762..592bfa3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java @@ -64,13 +64,13 @@ public class TestTimelineServiceRecords { metric1.getValues().entrySet().iterator(); Map.Entry<Long, Number> entry = itr.next(); Assert.assertEquals(new Long(3L), entry.getKey()); - Assert.assertEquals(new Double(3.0D), entry.getValue()); + Assert.assertEquals(3.0D, entry.getValue()); entry = itr.next(); Assert.assertEquals(new Long(2L), entry.getKey()); - Assert.assertEquals(new Integer(2), entry.getValue()); + Assert.assertEquals(2, entry.getValue()); entry = itr.next(); Assert.assertEquals(new Long(1L), entry.getKey()); - Assert.assertEquals(new Float(1.0F), entry.getValue()); + Assert.assertEquals(1.0F, entry.getValue()); Assert.assertFalse(itr.hasNext()); entity.addMetric(metric1); http://git-wip-us.apache.org/repos/asf/hadoop/blob/74c1b597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index 4d3dafd..39a6181 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; @@ -119,12 +120,15 @@ public class NMTimelinePublisher extends CompositeService { if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) { TimelineMetric memoryMetric = new TimelineMetric(); memoryMetric.setId(ContainerMetric.MEMORY.toString()); + memoryMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM); memoryMetric.addValue(currentTimeMillis, pmemUsage); entity.addMetric(memoryMetric); } if (cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) { TimelineMetric cpuMetric = new TimelineMetric(); cpuMetric.setId(ContainerMetric.CPU.toString()); + // TODO: support average + cpuMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM); cpuMetric.addValue(currentTimeMillis, Math.round(cpuUsagePercentPerCore)); entity.addMetric(cpuMetric); http://git-wip-us.apache.org/repos/asf/hadoop/blob/74c1b597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index 4fe445a..eb05262 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -18,15 +18,26 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import com.google.common.base.Preconditions; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + /** * Service that handles writes to the timeline service and writes them to the * backing storage for a given YARN application. @@ -36,8 +47,16 @@ import com.google.common.base.Preconditions; @Private @Unstable public class AppLevelTimelineCollector extends TimelineCollector { + private static final Log LOG = LogFactory.getLog(TimelineCollector.class); + + private final static int AGGREGATION_EXECUTOR_NUM_THREADS = 1; + private final static int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15; + private static Set<String> entityTypesSkipAggregation + = initializeSkipSet(); + private final ApplicationId appId; private final TimelineCollectorContext context; + private ScheduledThreadPoolExecutor appAggregationExecutor; public AppLevelTimelineCollector(ApplicationId appId) { super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString()); @@ -46,6 +65,14 @@ public class AppLevelTimelineCollector extends TimelineCollector { context = new TimelineCollectorContext(); } + private static Set<String> initializeSkipSet() { + Set<String> result = new HashSet<>(); + result.add(TimelineEntityType.YARN_APPLICATION.toString()); + result.add(TimelineEntityType.YARN_FLOW_RUN.toString()); + result.add(TimelineEntityType.YARN_FLOW_ACTIVITY.toString()); + return result; + } + @Override protected void serviceInit(Configuration conf) throws Exception { context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID, @@ -60,11 +87,25 @@ public class AppLevelTimelineCollector extends TimelineCollector { @Override protected void serviceStart() throws Exception { + // Launch the aggregation thread + appAggregationExecutor = new ScheduledThreadPoolExecutor( + AppLevelTimelineCollector.AGGREGATION_EXECUTOR_NUM_THREADS, + new ThreadFactoryBuilder() + .setNameFormat("TimelineCollector Aggregation thread #%d") + .build()); + appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), 0, + AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, + TimeUnit.SECONDS); super.serviceStart(); } @Override protected void serviceStop() throws Exception { + appAggregationExecutor.shutdown(); + if (!appAggregationExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + LOG.info("App-level aggregator shutdown timed out, shutdown now. "); + appAggregationExecutor.shutdownNow(); + } super.serviceStop(); } @@ -73,4 +114,35 @@ public class AppLevelTimelineCollector extends TimelineCollector { return context; } + @Override + protected Set<String> getEntityTypesSkipAggregation() { + return entityTypesSkipAggregation; + } + + private class AppLevelAggregator implements Runnable { + + @Override + public void run() { + if (LOG.isDebugEnabled()) { + LOG.debug("App-level real-time aggregating"); + } + try { + TimelineCollectorContext currContext = getTimelineEntityContext(); + TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId( + getAggregationGroups(), currContext.getAppId(), + TimelineEntityType.YARN_APPLICATION.toString()); + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(resultEntity); + getWriter().write(currContext.getClusterId(), currContext.getUserId(), + currContext.getFlowName(), currContext.getFlowVersion(), + currContext.getFlowRunId(), currContext.getAppId(), entities); + } catch (Exception e) { + LOG.error("Error aggregating timeline metrics", e); + } + if (LOG.isDebugEnabled()) { + LOG.debug("App-level real-time aggregation complete"); + } + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/74c1b597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 15187d1..8cd645c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -19,6 +19,12 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -27,7 +33,10 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; @@ -41,9 +50,15 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; @Private @Unstable public abstract class TimelineCollector extends CompositeService { + private static final Log LOG = LogFactory.getLog(TimelineCollector.class); + public static final String SEPARATOR = "_"; private TimelineWriter writer; + private ConcurrentMap<String, AggregationStatusTable> aggregationGroups + = new ConcurrentHashMap<>(); + private static Set<String> entityTypesSkipAggregation + = new HashSet<>(); public TimelineCollector(String name) { super(name); @@ -68,6 +83,28 @@ public abstract class TimelineCollector extends CompositeService { this.writer = w; } + protected TimelineWriter getWriter() { + return writer; + } + + protected Map<String, AggregationStatusTable> getAggregationGroups() { + return aggregationGroups; + } + + /** + * Method to decide the set of timeline entity types the collector should + * skip on aggregations. Subclasses may want to override this method to + * customize their own behaviors. + * + * @return A set of strings consists of all types the collector should skip. + */ + protected Set<String> getEntityTypesSkipAggregation() { + return entityTypesSkipAggregation; + } + + public abstract TimelineCollectorContext getTimelineEntityContext(); + + /** * Handles entity writes. These writes are synchronous and are written to the * backing storage without buffering/batching. If any entity already exists, @@ -90,8 +127,12 @@ public abstract class TimelineCollector extends CompositeService { LOG.debug("putEntities(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } - TimelineCollectorContext context = getTimelineEntityContext(); + + // Update application metrics for aggregation + updateAggregateStatus(entities, aggregationGroups, + getEntityTypesSkipAggregation()); + return writer.write(context.getClusterId(), context.getUserId(), context.getFlowName(), context.getFlowVersion(), context.getFlowRunId(), context.getAppId(), entities); @@ -117,6 +158,174 @@ public abstract class TimelineCollector extends CompositeService { } } - public abstract TimelineCollectorContext getTimelineEntityContext(); + /** + * Aggregate all metrics in given timeline entities with no predefined states. + * + * @param entities Entities to aggregate + * @param resultEntityId Id of the result entity + * @param resultEntityType Type of the result entity + * @param needsGroupIdInResult Marks if we want the aggregation group id in + * each aggregated metrics. + * @return A timeline entity that contains all aggregated TimelineMetric. + */ + public static TimelineEntity aggregateEntities( + TimelineEntities entities, String resultEntityId, + String resultEntityType, boolean needsGroupIdInResult) { + ConcurrentMap<String, AggregationStatusTable> aggregationGroups + = new ConcurrentHashMap<>(); + updateAggregateStatus(entities, aggregationGroups, null); + if (needsGroupIdInResult) { + return aggregate(aggregationGroups, resultEntityId, resultEntityType); + } else { + return aggregateWithoutGroupId( + aggregationGroups, resultEntityId, resultEntityType); + } + } + /** + * Update the aggregation status table for a timeline collector. + * + * @param entities Entities to update + * @param aggregationGroups Aggregation status table + * @param typesToSkip Entity types that we can safely assume to skip updating + */ + static void updateAggregateStatus( + TimelineEntities entities, + ConcurrentMap<String, AggregationStatusTable> aggregationGroups, + Set<String> typesToSkip) { + for (TimelineEntity e : entities.getEntities()) { + if ((typesToSkip != null && typesToSkip.contains(e.getType())) + || e.getMetrics().isEmpty()) { + continue; + } + AggregationStatusTable aggrTable = aggregationGroups.get(e.getType()); + if (aggrTable == null) { + AggregationStatusTable table = new AggregationStatusTable(); + aggrTable = aggregationGroups.putIfAbsent(e.getType(), + table); + if (aggrTable == null) { + aggrTable = table; + } + } + aggrTable.update(e); + } + } + + /** + * Aggregate internal status and generate timeline entities for the + * aggregation results. + * + * @param aggregationGroups Aggregation status table + * @param resultEntityId Id of the result entity + * @param resultEntityType Type of the result entity + * @return A timeline entity that contains all aggregated TimelineMetric. + */ + static TimelineEntity aggregate( + Map<String, AggregationStatusTable> aggregationGroups, + String resultEntityId, String resultEntityType) { + TimelineEntity result = new TimelineEntity(); + result.setId(resultEntityId); + result.setType(resultEntityType); + for (Map.Entry<String, AggregationStatusTable> entry + : aggregationGroups.entrySet()) { + entry.getValue().aggregateAllTo(result, entry.getKey()); + } + return result; + } + + /** + * Aggregate internal status and generate timeline entities for the + * aggregation results. The result metrics will not have aggregation group + * information. + * + * @param aggregationGroups Aggregation status table + * @param resultEntityId Id of the result entity + * @param resultEntityType Type of the result entity + * @return A timeline entity that contains all aggregated TimelineMetric. + */ + static TimelineEntity aggregateWithoutGroupId( + Map<String, AggregationStatusTable> aggregationGroups, + String resultEntityId, String resultEntityType) { + TimelineEntity result = new TimelineEntity(); + result.setId(resultEntityId); + result.setType(resultEntityType); + for (Map.Entry<String, AggregationStatusTable> entry + : aggregationGroups.entrySet()) { + entry.getValue().aggregateAllTo(result, ""); + } + return result; + } + + // Note: In memory aggregation is performed in an eventually consistent + // fashion. + private static class AggregationStatusTable { + // On aggregation, for each metric, aggregate all per-entity accumulated + // metrics. We only use the id and type for TimelineMetrics in the key set + // of this table. + private ConcurrentMap<TimelineMetric, Map<String, TimelineMetric>> + aggregateTable; + + public AggregationStatusTable() { + aggregateTable = new ConcurrentHashMap<>(); + } + + public void update(TimelineEntity incoming) { + String entityId = incoming.getId(); + for (TimelineMetric m : incoming.getMetrics()) { + // Skip if the metric does not need aggregation + if (m.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) { + continue; + } + // Update aggregateTable + Map<String, TimelineMetric> aggrRow = aggregateTable.get(m); + if (aggrRow == null) { + Map<String, TimelineMetric> tempRow = new ConcurrentHashMap<>(); + aggrRow = aggregateTable.putIfAbsent(m, tempRow); + if (aggrRow == null) { + aggrRow = tempRow; + } + } + aggrRow.put(entityId, m); + } + } + + public TimelineEntity aggregateTo(TimelineMetric metric, TimelineEntity e, + String aggregationGroupId) { + if (metric.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) { + return e; + } + Map<String, TimelineMetric> aggrRow = aggregateTable.get(metric); + if (aggrRow != null) { + TimelineMetric aggrMetric = new TimelineMetric(); + if (aggregationGroupId.length() > 0) { + aggrMetric.setId(metric.getId() + SEPARATOR + aggregationGroupId); + } else { + aggrMetric.setId(metric.getId()); + } + aggrMetric.setRealtimeAggregationOp(TimelineMetricOperation.NOP); + Map<Object, Object> status = new HashMap<>(); + for (TimelineMetric m : aggrRow.values()) { + TimelineMetric.aggregateTo(m, aggrMetric, status); + // getRealtimeAggregationOp returns an enum so we can directly + // compare with "!=". + if (m.getRealtimeAggregationOp() + != aggrMetric.getRealtimeAggregationOp()) { + aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp()); + } + } + Set<TimelineMetric> metrics = e.getMetrics(); + metrics.remove(aggrMetric); + metrics.add(aggrMetric); + } + return e; + } + + public TimelineEntity aggregateAllTo(TimelineEntity e, + String aggregationGroupId) { + for (TimelineMetric m : aggregateTable.keySet()) { + aggregateTo(m, e, aggregationGroupId); + } + return e; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/74c1b597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java index f0b1e47..6a1e086 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java @@ -24,5 +24,5 @@ package org.apache.hadoop.yarn.server.timelineservice.storage; * */ public enum TimelineAggregationTrack { - FLOW, USER, QUEUE + APP, FLOW, USER, QUEUE } http://git-wip-us.apache.org/repos/asf/hadoop/blob/74c1b597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java new file mode 100644 index 0000000..5b4dc50 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class TestTimelineCollector { + + private TimelineEntities generateTestEntities(int groups, int entities) { + TimelineEntities te = new TimelineEntities(); + for (int j = 0; j < groups; j++) { + for (int i = 0; i < entities; i++) { + TimelineEntity entity = new TimelineEntity(); + String containerId = "container_1000178881110_2002_" + i; + entity.setId(containerId); + String entityType = "TEST_" + j; + entity.setType(entityType); + long cTime = 1425016501000L; + entity.setCreatedTime(cTime); + + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId("HDFS_BYTES_WRITE"); + m1.setRealtimeAggregationOp(TimelineMetricOperation.SUM); + long ts = System.currentTimeMillis(); + m1.addValue(ts - 20000, 100L); + metrics.add(m1); + + TimelineMetric m2 = new TimelineMetric(); + m2.setId("VCORES_USED"); + m2.setRealtimeAggregationOp(TimelineMetricOperation.SUM); + m2.addValue(ts - 20000, 3L); + metrics.add(m2); + + // m3 should not show up in the aggregation + TimelineMetric m3 = new TimelineMetric(); + m3.setId("UNRELATED_VALUES"); + m3.addValue(ts - 20000, 3L); + metrics.add(m3); + + TimelineMetric m4 = new TimelineMetric(); + m4.setId("TXN_FINISH_TIME"); + m4.setRealtimeAggregationOp(TimelineMetricOperation.MAX); + m4.addValue(ts - 20000, i); + metrics.add(m4); + + entity.addMetrics(metrics); + te.addEntity(entity); + } + } + + return te; + } + + @Test + public void testAggregation() throws Exception { + // Test aggregation with multiple groups. + int groups = 3; + int n = 50; + TimelineEntities testEntities = generateTestEntities(groups, n); + TimelineEntity resultEntity = TimelineCollector.aggregateEntities( + testEntities, "test_result", "TEST_AGGR", true); + assertEquals(resultEntity.getMetrics().size(), groups * 3); + + for (int i = 0; i < groups; i++) { + Set<TimelineMetric> metrics = resultEntity.getMetrics(); + for (TimelineMetric m : metrics) { + if (m.getId().startsWith("HDFS_BYTES_WRITE")) { + assertEquals(100 * n, m.getSingleDataValue().intValue()); + } else if (m.getId().startsWith("VCORES_USED")) { + assertEquals(3 * n, m.getSingleDataValue().intValue()); + } else if (m.getId().startsWith("TXN_FINISH_TIME")) { + assertEquals(n - 1, m.getSingleDataValue()); + } else { + fail("Unrecognized metric! " + m.getId()); + } + } + } + + // Test aggregation with a single group. + TimelineEntities testEntities1 = generateTestEntities(1, n); + TimelineEntity resultEntity1 = TimelineCollector.aggregateEntities( + testEntities1, "test_result", "TEST_AGGR", false); + assertEquals(resultEntity1.getMetrics().size(), 3); + + Set<TimelineMetric> metrics = resultEntity1.getMetrics(); + for (TimelineMetric m : metrics) { + if (m.getId().equals("HDFS_BYTES_WRITE")) { + assertEquals(100 * n, m.getSingleDataValue().intValue()); + } else if (m.getId().equals("VCORES_USED")) { + assertEquals(3 * n, m.getSingleDataValue().intValue()); + } else if (m.getId().equals("TXN_FINISH_TIME")) { + assertEquals(n - 1, m.getSingleDataValue()); + } else { + fail("Unrecognized metric! " + m.getId()); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/74c1b597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java index 5ce7d3b..2f79daa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java @@ -25,11 +25,15 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.Test; @@ -51,6 +55,26 @@ public class TestFileSystemTimelineWriterImpl { entity.setCreatedTime(1425016501000L); te.addEntity(entity); + TimelineMetric metric = new TimelineMetric(); + String metricId = "CPU"; + metric.setId(metricId); + metric.setType(TimelineMetric.Type.SINGLE_VALUE); + metric.setRealtimeAggregationOp(TimelineMetricOperation.SUM); + metric.addValue(1425016501000L, 1234567L); + + TimelineEntity entity2 = new TimelineEntity(); + String id2 = "metric"; + String type2 = "app"; + entity2.setId(id2); + entity2.setType(type2); + entity2.setCreatedTime(1425016503000L); + entity2.addMetric(metric); + te.addEntity(entity2); + + Map<String, TimelineMetric> aggregatedMetrics = + new HashMap<String, TimelineMetric>(); + aggregatedMetrics.put(metricId, metric); + FileSystemTimelineWriterImpl fsi = null; try { fsi = new FileSystemTimelineWriterImpl(); @@ -68,11 +92,27 @@ public class TestFileSystemTimelineWriterImpl { assertTrue(f.exists() && !f.isDirectory()); List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8); // ensure there's only one entity + 1 new line - assertTrue(data.size() == 2); + assertTrue("data size is:" + data.size(), data.size() == 2); String d = data.get(0); // confirm the contents same as what was written assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity)); + // verify aggregated metrics + String fileName2 = fsi.getOutputRoot() + + "/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/" + + type2 + "/" + id2 + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + Path path2 = Paths.get(fileName2); + File file = new File(fileName2); + assertTrue(file.exists() && !file.isDirectory()); + List<String> data2 = Files.readAllLines(path2, StandardCharsets.UTF_8); + // ensure there's only one entity + 1 new line + assertTrue("data size is:" + data.size(), data2.size() == 2); + String metricToString = data2.get(0); + // confirm the contents same as what was written + assertEquals(metricToString, + TimelineUtils.dumpTimelineRecordtoJSON(entity2)); + // delete the directory File outputDir = new File(fsi.getOutputRoot()); FileUtils.deleteDirectory(outputDir); @@ -84,4 +124,5 @@ public class TestFileSystemTimelineWriterImpl { } } } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/74c1b597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java index 6b57ec4..8ab54bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -539,6 +540,26 @@ public class TestHBaseTimelineStorage { metrics.add(m1); entity.addMetrics(metrics); + // add aggregated metrics + TimelineEntity aggEntity = new TimelineEntity(); + String type = TimelineEntityType.YARN_APPLICATION.toString(); + aggEntity.setId(appId); + aggEntity.setType(type); + long cTime2 = 1425016502000L; + long mTime2 = 1425026902000L; + aggEntity.setCreatedTime(cTime2); + + TimelineMetric aggMetric = new TimelineMetric(); + aggMetric.setId("MEM_USAGE"); + Map<Long, Number> aggMetricValues = new HashMap<Long, Number>(); + ts = System.currentTimeMillis(); + aggMetricValues.put(ts - 120000, 102400000); + aggMetric.setType(Type.SINGLE_VALUE); + aggMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM); + aggMetric.setValues(aggMetricValues); + Set<TimelineMetric> aggMetrics = new HashSet<>(); + aggMetrics.add(aggMetric); + entity.addMetrics(aggMetrics); te.addEntity(entity); HBaseTimelineWriterImpl hbi = null; @@ -564,7 +585,7 @@ public class TestHBaseTimelineStorage { Result result = new ApplicationTable().getResult(c1, conn, get); assertTrue(result != null); - assertEquals(15, result.size()); + assertEquals(16, result.size()); // check the row key byte[] row1 = result.getRow(); @@ -652,10 +673,17 @@ public class TestHBaseTimelineStorage { assertEquals(conf, conf2); Set<TimelineMetric> metrics2 = e1.getMetrics(); - assertEquals(metrics, metrics2); + assertEquals(2, metrics2.size()); for (TimelineMetric metric2 : metrics2) { Map<Long, Number> metricValues2 = metric2.getValues(); - matchMetrics(metricValues, metricValues2); + assertTrue(metric2.getId().equals("MAP_SLOT_MILLIS") || + metric2.getId().equals("MEM_USAGE")); + if (metric2.getId().equals("MAP_SLOT_MILLIS")) { + matchMetrics(metricValues, metricValues2); + } + if (metric2.getId().equals("MEM_USAGE")) { + matchMetrics(aggMetricValues, metricValues2); + } } } finally { if (hbi != null) { @@ -724,7 +752,6 @@ public class TestHBaseTimelineStorage { m1.setValues(metricValues); metrics.add(m1); entity.addMetrics(metrics); - te.addEntity(entity); HBaseTimelineWriterImpl hbi = null; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org