YARN-4062. Add the flush and compaction functionality via coprocessors and scanners for flow run table (Vrushali C via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bc698197 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bc698197 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bc698197 Branch: refs/heads/YARN-2928 Commit: bc698197cde0f40e6e85a9fb1a11f1f92952e91e Parents: c6f4c51 Author: Sangjin Lee <sj...@apache.org> Authored: Thu Mar 17 18:22:04 2016 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Thu Mar 17 18:22:04 2016 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 16 + .../src/main/resources/yarn-default.xml | 10 + .../storage/HBaseTimelineWriterImpl.java | 5 +- .../storage/common/TimelineStorageUtils.java | 55 ++ .../storage/common/TimestampGenerator.java | 13 +- .../storage/flow/AggregationOperation.java | 17 +- .../storage/flow/FlowRunColumn.java | 4 +- .../storage/flow/FlowRunColumnPrefix.java | 2 +- .../storage/flow/FlowRunCoprocessor.java | 70 +- .../storage/flow/FlowRunRowKey.java | 16 + .../storage/flow/FlowScanner.java | 269 ++++++-- .../storage/flow/FlowScannerOperation.java | 46 ++ .../storage/flow/TestFlowDataGenerator.java | 178 +++++- .../storage/flow/TestHBaseStorageFlowRun.java | 112 +++- .../flow/TestHBaseStorageFlowRunCompaction.java | 635 +++++++++++++++++++ 16 files changed, 1365 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 4b7fd2c..762e43c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -127,6 +127,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-4179. 
[reader implementation] support flow activity queries based on time (Varun Saxena via sjlee) + YARN-4062. Add the flush and compaction functionality via coprocessors and + scanners for flow run table (Vrushali C via sjlee) + IMPROVEMENTS YARN-4224. Support fetching entities by UID and change the REST interface http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6ac6fb9..863b5a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1757,6 +1757,22 @@ public class YarnConfiguration extends Configuration { public static final int DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60; + /** + * The name for setting that controls how long the final value of + * a metric of a completed app is retained before merging + * into the flow sum. + */ + public static final String APP_FINAL_VALUE_RETENTION_THRESHOLD = + TIMELINE_SERVICE_PREFIX + + "coprocessor.app-final-value-retention-milliseconds"; + + /** + * The setting that controls how long the final value of a metric + * of a completed app is retained before merging into the flow sum. 
+ */ + public static final long DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD = 3 * 24 + * 60 * 60 * 1000L; + public static final String ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS = TIMELINE_SERVICE_PREFIX + "app-collector.linger-period.ms"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 2cbc836..31b897b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2067,6 +2067,7 @@ <value>604800</value> </property> + <!-- Timeline Service v2 Configuration --> <property> <description>The setting that controls how often the timeline collector flushes the timeline writer.</description> @@ -2088,6 +2089,15 @@ <name>yarn.timeline-service.timeline-client.number-of-async-entities-to-merge</name> <value>10</value> </property> + + <property> + <description> The setting that controls how long the final value + of a metric of a completed app is retained before merging into + the flow sum.</description> + <name>yarn.timeline-service.coprocessor.app-final-value-retention-milliseconds</name> + <value>259200000</value> + </property> + <!-- Shared Cache Configuration --> <property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 1afe878..b75007d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -261,7 +261,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId); storeFlowMetrics(rowKey, metrics, - AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId), + AggregationOperation.SUM.getAttribute()); } } @@ -500,4 +501,4 @@ public class HBaseTimelineWriterImpl extends AbstractService implements super.serviceStop(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java index 605dbe7..b5fc214 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java @@ -24,9 +24,13 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.SortedSet; +import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -475,4 +479,55 @@ public final class TimelineStorageUtils { return (obj instanceof Short) || (obj instanceof Integer) || (obj instanceof Long); } + + /** + * creates a new cell based on the input cell but with the new value. + * + * @param origCell Original cell + * @param newValue new cell value + * @return cell + * @throws IOException while creating new cell. + */ + public static Cell createNewCell(Cell origCell, byte[] newValue) + throws IOException { + return CellUtil.createCell(CellUtil.cloneRow(origCell), + CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), + origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); + } + + /** + * creates a cell with the given inputs. 
+ * + * @param row row of the cell to be created + * @param family column family name of the new cell + * @param qualifier qualifier for the new cell + * @param ts timestamp of the new cell + * @param newValue value of the new cell + * @param tags tags in the new cell + * @return cell + * @throws IOException while creating the cell. + */ + public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier, + long ts, byte[] newValue, byte[] tags) throws IOException { + return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put, + newValue, tags); + } + + /** + * returns app id from the list of tags. + * + * @param tags cell tags to be looked into + * @return App Id as the AggregationCompactionDimension + */ + public static String getAggregationCompactionDimension(List<Tag> tags) { + String appId = null; + for (Tag t : tags) { + if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t + .getType()) { + appId = Bytes.toString(t.getValue()); + return appId; + } + } + return appId; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java index 7238efa..288046c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java @@ -33,7 +33,7 @@ public class TimestampGenerator { * if this is changed, then reading cell timestamps written with older * multiplier value will not work */ - public static final long TS_MULTIPLIER = 1000L; + public static final long TS_MULTIPLIER = 1000000L; private final AtomicLong lastTimestamp = new AtomicLong(); @@ -74,13 +74,14 @@ public class TimestampGenerator { } /** - * returns a timestamp multiplied with TS_MULTIPLIER and last few digits of - * application id + * Returns a timestamp multiplied with TS_MULTIPLIER and last few digits of + * application id. * * Unlikely scenario of generating a timestamp that is a duplicate: If more - * than a 1000 concurrent apps are running in one flow run AND write to same - * column at the same time, then say appId of 1001 will overlap with appId of - * 001 and there may be collisions for that flow run's specific column. + * than a 1M concurrent apps are running in one flow run AND write to same + * column at the same time, then say appId of 1M and 1 will overlap + * with appId of 001 and there may be collisions for that flow run's + * specific column. * * @param incomingTS Timestamp to be converted. * @param appId Application Id. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java index 6240e81..40cdd2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java @@ -21,19 +21,19 @@ import org.apache.hadoop.hbase.util.Bytes; /** * Identifies the attributes to be set for puts into the {@link FlowRunTable}. - * The numbers used for tagType are prime numbers + * The numbers used for tagType are prime numbers. */ public enum AggregationOperation { /** * When the flow was started. */ - MIN((byte) 71), + GLOBAL_MIN((byte) 71), /** * When it ended. */ - MAX((byte) 73), + GLOBAL_MAX((byte) 73), /** * The metrics of the flow. @@ -46,9 +46,16 @@ public enum AggregationOperation { SUM_FINAL((byte) 83), /** - * compact. + * Min value as per the latest timestamp + * seen for a given app. */ - COMPACT((byte) 89); + LATEST_MIN((byte) 89), + + /** + * Max value as per the latest timestamp + * seen for a given app. 
+ */ + LATEST_MAX((byte) 97); private byte tagType; private byte[] inBytes; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java index 148a37f..d50bb16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java @@ -41,14 +41,14 @@ public enum FlowRunColumn implements Column<FlowRunTable> { * application start times. */ MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time", - AggregationOperation.MIN, LongConverter.getInstance()), + AggregationOperation.GLOBAL_MIN, LongConverter.getInstance()), /** * When the flow ended. This is the maximum of currently known application end * times. */ MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time", - AggregationOperation.MAX, LongConverter.getInstance()), + AggregationOperation.GLOBAL_MAX, LongConverter.getInstance()), /** * The version of the flow that this flow belongs to. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java index 3d7c40e..fa94aae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java @@ -40,7 +40,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { /** * To store flow run info values. 
*/ - METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM, + METRIC(FlowRunColumnFamily.INFO, "m", null, LongConverter.getInstance()); private final ColumnHelper<FlowRunTable> column; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java index 9698f06..450640a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -40,7 +40,12 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import 
org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; @@ -51,7 +56,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGen */ public class FlowRunCoprocessor extends BaseRegionObserver { - @SuppressWarnings("unused") private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class); private HRegion region; @@ -160,8 +164,8 @@ public class FlowRunCoprocessor extends BaseRegionObserver { scan.setMaxVersions(); RegionScanner scanner = null; try { - scanner = new FlowScanner(region, scan.getBatch(), - region.getScanner(scan)); + scanner = new FlowScanner(e.getEnvironment(), scan.getBatch(), + region.getScanner(scan), FlowScannerOperation.READ); scanner.next(results); e.bypass(); } finally { @@ -209,6 +213,64 @@ public class FlowRunCoprocessor extends BaseRegionObserver { public RegionScanner postScannerOpen( ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner scanner) throws IOException { - return new FlowScanner(region, scan.getBatch(), scanner); + return new FlowScanner(e.getEnvironment(), scan.getBatch(), + scanner, FlowScannerOperation.READ); + } + + @Override + public InternalScanner preFlush( + ObserverContext<RegionCoprocessorEnvironment> c, Store store, + InternalScanner scanner) throws IOException { + if (LOG.isDebugEnabled()) { + if (store != null) { + LOG.debug("preFlush store = " + store.getColumnFamilyName() + + " flushableSize=" + store.getFlushableSize() + + " flushedCellsCount=" + store.getFlushedCellsCount() + + " compactedCellsCount=" + store.getCompactedCellsCount() + + " majorCompactedCellsCount=" + + store.getMajorCompactedCellsCount() + " memstoreFlushSize=" + + store.getMemstoreFlushSize() + " memstoreSize=" + + store.getMemStoreSize() + " size=" + store.getSize() + + " storeFilesCount=" + store.getStorefilesCount()); + } + } + return new FlowScanner(c.getEnvironment(), -1, scanner, + FlowScannerOperation.FLUSH); + 
} + + @Override + public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, + Store store, StoreFile resultFile) { + if (LOG.isDebugEnabled()) { + if (store != null) { + LOG.debug("postFlush store = " + store.getColumnFamilyName() + + " flushableSize=" + store.getFlushableSize() + + " flushedCellsCount=" + store.getFlushedCellsCount() + + " compactedCellsCount=" + store.getCompactedCellsCount() + + " majorCompactedCellsCount=" + + store.getMajorCompactedCellsCount() + " memstoreFlushSize=" + + store.getMemstoreFlushSize() + " memstoreSize=" + + store.getMemStoreSize() + " size=" + store.getSize() + + " storeFilesCount=" + store.getStorefilesCount()); + } + } + } + + @Override + public InternalScanner preCompact( + ObserverContext<RegionCoprocessorEnvironment> e, Store store, + InternalScanner scanner, ScanType scanType, CompactionRequest request) + throws IOException { + + FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION; + if (request != null) { + requestOp = (request.isMajor() ? 
FlowScannerOperation.MAJOR_COMPACTION + : FlowScannerOperation.MINOR_COMPACTION); + LOG.info("Compactionrequest= " + request.toString() + " " + + requestOp.toString() + " RegionName=" + + e.getEnvironment().getRegion().getRegionNameAsString()); + } + + return new FlowScanner(e.getEnvironment(), -1, scanner, requestOp); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java index 0585dc9..eac8f05 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java @@ -113,4 +113,20 @@ public class FlowRunRowKey { TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3])); return new FlowRunRowKey(clusterId, userId, flowName, flowRunId); } + + /** + * returns the Flow Key as a verbose String output. 
+ * @return String + */ + @Override + public String toString() { + StringBuilder flowKeyStr = new StringBuilder(); + flowKeyStr.append("{clusterId=" + clusterId); + flowKeyStr.append(" userId=" + userId); + flowKeyStr.append(" flowName=" + flowName); + flowKeyStr.append(" flowRunId="); + flowKeyStr.append(flowRunId); + flowKeyStr.append("}"); + return flowKeyStr.toString(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index 6fefd15..6baea37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -29,20 +29,26 @@ import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HRegion; import 
org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; + +import com.google.common.annotations.VisibleForTesting; /** * Invoked via the coprocessor when a Get or a Scan is issued for flow run @@ -55,23 +61,42 @@ class FlowScanner implements RegionScanner, Closeable { private static final Log LOG = LogFactory.getLog(FlowScanner.class); + /** + * use a special application id to represent the flow id this is needed since + * TimestampGenerator parses the app id to generate a cell timestamp. 
+ */ + private static final String FLOW_APP_ID = "application_00000000000_0000"; + private final HRegion region; private final InternalScanner flowRunScanner; - private RegionScanner regionScanner; private final int limit; + private final long appFinalValueRetentionThreshold; + private RegionScanner regionScanner; private boolean hasMore; private byte[] currentRow; private List<Cell> availableCells = new ArrayList<>(); private int currentIndex; + private FlowScannerOperation action = FlowScannerOperation.READ; - FlowScanner(HRegion region, int limit, InternalScanner internalScanner) { - this.region = region; + FlowScanner(RegionCoprocessorEnvironment env, int limit, + InternalScanner internalScanner, FlowScannerOperation action) { this.limit = limit; this.flowRunScanner = internalScanner; if (internalScanner instanceof RegionScanner) { this.regionScanner = (RegionScanner) internalScanner; } - // TODO: note if it's compaction/flush + this.action = action; + if (env == null) { + this.appFinalValueRetentionThreshold = + YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD; + this.region = null; + } else { + this.region = env.getRegion(); + Configuration hbaseConf = env.getConfiguration(); + this.appFinalValueRetentionThreshold = hbaseConf.getLong( + YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD, + YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD); + } } /* @@ -104,17 +129,6 @@ class FlowScanner implements RegionScanner, Closeable { return nextInternal(cells, cellLimit); } - private String getAggregationCompactionDimension(List<Tag> tags) { - String appId = null; - for (Tag t : tags) { - if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t - .getType()) { - appId = Bytes.toString(t.getValue()); - } - } - return appId; - } - /** * Get value converter associated with a column or a column prefix. If nothing * matches, generic converter is returned. 
@@ -165,6 +179,7 @@ class FlowScanner implements RegionScanner, Closeable { * @return true if next row is available for the scanner, false otherwise * @throws IOException */ + @SuppressWarnings("deprecation") private boolean nextInternal(List<Cell> cells, int cellLimit) throws IOException { Cell cell = null; @@ -183,14 +198,18 @@ class FlowScanner implements RegionScanner, Closeable { SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR); Set<String> alreadySeenAggDim = new HashSet<>(); int addedCnt = 0; + long currentTimestamp = System.currentTimeMillis(); ValueConverter converter = null; - while (((cell = peekAtNextCell(cellLimit)) != null) - && (cellLimit <= 0 || addedCnt < cellLimit)) { + while (cellLimit <= 0 || addedCnt < cellLimit) { + cell = peekAtNextCell(cellLimit); + if (cell == null) { + break; + } byte[] newColumnQualifier = CellUtil.cloneQualifier(cell); if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) { if (converter != null && isNumericConverter(converter)) { addedCnt += emitCells(cells, currentColumnCells, currentAggOp, - (NumericValueConverter)converter); + (NumericValueConverter)converter, currentTimestamp); } resetState(currentColumnCells, alreadySeenAggDim); currentColumnQualifier = newColumnQualifier; @@ -207,8 +226,17 @@ class FlowScanner implements RegionScanner, Closeable { nextCell(cellLimit); } if (!currentColumnCells.isEmpty()) { - emitCells(cells, currentColumnCells, currentAggOp, - (NumericValueConverter)converter); + addedCnt += emitCells(cells, currentColumnCells, currentAggOp, + (NumericValueConverter)converter, currentTimestamp); + if (LOG.isDebugEnabled()) { + if (addedCnt > 0) { + LOG.debug("emitted cells. 
" + addedCnt + " for " + this.action + + " rowKey=" + + FlowRunRowKey.parseRowKey(cells.get(0).getRow()).toString()); + } else { + LOG.debug("emitted no cells for " + this.action); + } + } } return hasMore(); } @@ -247,7 +275,7 @@ class FlowScanner implements RegionScanner, Closeable { } switch (currentAggOp) { - case MIN: + case GLOBAL_MIN: if (currentColumnCells.size() == 0) { currentColumnCells.add(cell); } else { @@ -260,7 +288,7 @@ class FlowScanner implements RegionScanner, Closeable { } } break; - case MAX: + case GLOBAL_MAX: if (currentColumnCells.size() == 0) { currentColumnCells.add(cell); } else { @@ -275,16 +303,32 @@ class FlowScanner implements RegionScanner, Closeable { break; case SUM: case SUM_FINAL: + if (LOG.isTraceEnabled()) { + LOG.trace("In collect cells " + + " FlowSannerOperation=" + + this.action + + " currentAggOp=" + + currentAggOp + + " cell qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(cell)) + + " cell value= " + + (Number) converter.decodeValue(CellUtil.cloneValue(cell)) + + " timestamp=" + cell.getTimestamp()); + } + // only if this app has not been seen yet, add to current column cells List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); - String aggDim = getAggregationCompactionDimension(tags); - - // If this agg dimension has already been seen, since they show up in - // sorted order, we drop the rest which are older. In other words, this - // cell is older than previously seen cells for that agg dim. 
+ String aggDim = TimelineStorageUtils + .getAggregationCompactionDimension(tags); if (!alreadySeenAggDim.contains(aggDim)) { - // Not seen this agg dim, hence consider this cell in our working set + // if this agg dimension has already been seen, + // since they show up in sorted order + // we drop the rest which are older + // in other words, this cell is older than previously seen cells + // for that agg dim + // but when this agg dim is not seen, + // consider this cell in our working set currentColumnCells.add(cell); alreadySeenAggDim.add(aggDim); } @@ -300,8 +344,8 @@ class FlowScanner implements RegionScanner, Closeable { * parameter. */ private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells, - AggregationOperation currentAggOp, NumericValueConverter converter) - throws IOException { + AggregationOperation currentAggOp, NumericValueConverter converter, + long currentTimestamp) throws IOException { if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) { return 0; } @@ -309,17 +353,36 @@ class FlowScanner implements RegionScanner, Closeable { cells.addAll(currentColumnCells); return currentColumnCells.size(); } + if (LOG.isTraceEnabled()) { + LOG.trace("In emitCells " + this.action + " currentColumnCells size= " + + currentColumnCells.size() + " currentAggOp" + currentAggOp); + } switch (currentAggOp) { - case MIN: - case MAX: + case GLOBAL_MIN: + case GLOBAL_MAX: cells.addAll(currentColumnCells); return currentColumnCells.size(); case SUM: case SUM_FINAL: - Cell sumCell = processSummation(currentColumnCells, converter); - cells.add(sumCell); - return 1; + switch (action) { + case FLUSH: + case MINOR_COMPACTION: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + case READ: + Cell sumCell = processSummation(currentColumnCells, converter); + cells.add(sumCell); + return 1; + case MAJOR_COMPACTION: + List<Cell> finalCells = processSummationMajorCompaction( + currentColumnCells, converter, 
currentTimestamp); + cells.addAll(finalCells); + return finalCells.size(); + default: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + } default: cells.addAll(currentColumnCells); return currentColumnCells.size(); @@ -349,10 +412,122 @@ class FlowScanner implements RegionScanner, Closeable { sum = converter.add(sum, currentValue); } byte[] sumBytes = converter.encodeValue(sum); - Cell sumCell = createNewCell(mostRecentCell, sumBytes); + Cell sumCell = TimelineStorageUtils.createNewCell(mostRecentCell, sumBytes); return sumCell; } + + /** + * Returns a list of cells that contains + * + * A) the latest cells for applications that haven't finished yet + * B) summation + * for the flow, based on applications that have completed and are older than + * a certain time + * + * The new cell created has the timestamp of the most recent metric cell. The + * sum of a metric for a flow run is the summation at the point of the last + * metric update in that flow till that time. 
+ */ + @VisibleForTesting + List<Cell> processSummationMajorCompaction( + SortedSet<Cell> currentColumnCells, NumericValueConverter converter, + long currentTimestamp) + throws IOException { + Number sum = 0; + Number currentValue = 0; + long ts = 0L; + boolean summationDone = false; + List<Cell> finalCells = new ArrayList<Cell>(); + if (currentColumnCells == null) { + return finalCells; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("In processSummationMajorCompaction," + + " will drop cells older than " + currentTimestamp + + " CurrentColumnCells size=" + currentColumnCells.size()); + } + + for (Cell cell : currentColumnCells) { + AggregationOperation cellAggOp = getCurrentAggOp(cell); + // if this is the existing flow sum cell + List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); + String appId = TimelineStorageUtils + .getAggregationCompactionDimension(tags); + if (appId == FLOW_APP_ID) { + sum = converter.add(sum, currentValue); + summationDone = true; + if (LOG.isTraceEnabled()) { + LOG.trace("reading flow app id sum=" + sum); + } + } else { + currentValue = (Number) converter.decodeValue(CellUtil + .cloneValue(cell)); + // read the timestamp truncated by the generator + ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp()); + if ((cellAggOp == AggregationOperation.SUM_FINAL) + && ((ts + this.appFinalValueRetentionThreshold) + < currentTimestamp)) { + sum = converter.add(sum, currentValue); + summationDone = true; + if (LOG.isTraceEnabled()) { + LOG.trace("MAJOR COMPACTION loop sum= " + sum + + " discarding now: " + " qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value=" + + (Number) converter.decodeValue(CellUtil.cloneValue(cell)) + + " timestamp=" + cell.getTimestamp() + " " + this.action); + } + } else { + // not a final value but it's the latest cell for this app + // so include this cell in the list of cells to write back + finalCells.add(cell); + } + } + } + if (summationDone) 
{ + Cell anyCell = currentColumnCells.first(); + List<Tag> tags = new ArrayList<Tag>(); + Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + Bytes.toBytes(FLOW_APP_ID)); + tags.add(t); + t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(), + Bytes.toBytes(FLOW_APP_ID)); + tags.add(t); + byte[] tagByteArray = Tag.fromList(tags); + Cell sumCell = TimelineStorageUtils.createNewCell( + CellUtil.cloneRow(anyCell), + CellUtil.cloneFamily(anyCell), + CellUtil.cloneQualifier(anyCell), + TimestampGenerator.getSupplementedTimestamp( + System.currentTimeMillis(), FLOW_APP_ID), + converter.encodeValue(sum), tagByteArray); + finalCells.add(sumCell); + if (LOG.isTraceEnabled()) { + LOG.trace("MAJOR COMPACTION final sum= " + sum + " for " + + Bytes.toString(CellUtil.cloneQualifier(sumCell)) + + " " + this.action); + } + LOG.info("After major compaction for qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(sumCell)) + + " with currentColumnCells.size=" + + currentColumnCells.size() + + " returning finalCells.size=" + finalCells.size() + + " with sum=" + sum.longValue() + + " with cell timestamp " + sumCell.getTimestamp()); + } else { + String qualifier = ""; + LOG.info("After major compaction for qualifier=" + qualifier + + " with currentColumnCells.size=" + + currentColumnCells.size() + + " returning finalCells.size=" + finalCells.size() + + " with zero sum=" + + sum.longValue()); + } + return finalCells; + } + /** * Determines which cell is to be returned based on the values in each cell * and the comparison operation MIN or MAX. 
@@ -375,7 +550,7 @@ class FlowScanner implements RegionScanner, Closeable { Number currentCellValue = (Number) converter.decodeValue(CellUtil .cloneValue(currentCell)); switch (currentAggOp) { - case MIN: + case GLOBAL_MIN: if (converter.compare( currentCellValue, previouslyChosenCellValue) < 0) { // new value is minimum, hence return this cell @@ -384,7 +559,7 @@ class FlowScanner implements RegionScanner, Closeable { // previously chosen value is miniumum, hence return previous min cell return previouslyChosenCell; } - case MAX: + case GLOBAL_MAX: if (converter.compare( currentCellValue, previouslyChosenCellValue) > 0) { // new value is max, hence return this cell @@ -402,16 +577,13 @@ class FlowScanner implements RegionScanner, Closeable { } } - private Cell createNewCell(Cell origCell, byte[] newValue) - throws IOException { - return CellUtil.createCell(CellUtil.cloneRow(origCell), - CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), - origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); - } - @Override public void close() throws IOException { - flowRunScanner.close(); + if (flowRunScanner != null) { + flowRunScanner.close(); + } else { + LOG.warn("scanner close called but scanner is null"); + } } /** @@ -423,8 +595,6 @@ class FlowScanner implements RegionScanner, Closeable { /** * Returns whether or not the underlying scanner has more rows. - * - * @return true, if there are more cells to return, false otherwise. */ public boolean hasMore() { return currentIndex < availableCells.size() ? true : hasMore; @@ -440,8 +610,7 @@ class FlowScanner implements RegionScanner, Closeable { * fetched by the wrapped scanner * @return the next available cell or null if no more cells are available for * the current row - * @throws IOException if any problem is encountered while grabbing the next - * cell. 
+ * @throws IOException */ public Cell nextCell(int cellLimit) throws IOException { Cell cell = peekAtNextCell(cellLimit); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java new file mode 100644 index 0000000..73c666f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Identifies which scanner operation is being performed on the flow run
 * table, so the scanner can decide how to aggregate cells.
 * (Declared in package
 * org.apache.hadoop.yarn.server.timelineservice.storage.flow.)
 */
public enum FlowScannerOperation {

  /** Scanner opened for a read (preGet or preScan). */
  READ,

  /** Scanner opened during preFlush. */
  FLUSH,

  /** Scanner opened during a minor compaction. */
  MINOR_COMPACTION,

  /** Scanner opened during a major compaction. */
  MAJOR_COMPACTION
}
org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.conf.Configuration; /** * Generates the data/entities for the FlowRun and FlowActivity Tables */ class TestFlowDataGenerator { - private final static String metric1 = "MAP_SLOT_MILLIS"; - private final static String metric2 = "HDFS_BYTES_READ"; + private static final String metric1 = "MAP_SLOT_MILLIS"; + private static final String metric2 = "HDFS_BYTES_READ"; + public static final long END_TS_INCR = 10000L; + + static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunMetrics_test"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + long cTime = 1425016501000L; + entity.setCreatedTime(cTime); + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId(metric1); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + long ts = insertTs; - static TimelineEntity getEntityMetricsApp1() { + for (int k=1; k< 100 ; k++) { + metricValues.put(ts - k*200000, 20L); + } + metricValues.put(ts - 80000, 40L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + + TimelineMetric m2 = new TimelineMetric(); + m2.setId(metric2); + metricValues = new HashMap<Long, Number>(); + ts = System.currentTimeMillis(); + for (int k=1; k< 100 ; k++) { + metricValues.put(ts - k*100000, 31L); + } + + metricValues.put(ts - 80000, 57L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues); + metrics.add(m2); + + entity.addMetrics(metrics); + return entity; + } + + + static TimelineEntity getEntityMetricsApp1Complete(long insertTs, Configuration c1) { TimelineEntity entity = new TimelineEntity(); String id = "flowRunMetrics_test"; String type = 
TimelineEntityType.YARN_APPLICATION.toString(); @@ -53,7 +95,48 @@ class TestFlowDataGenerator { TimelineMetric m1 = new TimelineMetric(); m1.setId(metric1); Map<Long, Number> metricValues = new HashMap<Long, Number>(); - long ts = System.currentTimeMillis(); + long ts = insertTs; + + metricValues.put(ts - 80000, 40L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + + TimelineMetric m2 = new TimelineMetric(); + m2.setId(metric2); + metricValues = new HashMap<Long, Number>(); + ts = insertTs; + metricValues.put(ts - 80000, 57L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues); + metrics.add(m2); + + entity.addMetrics(metrics); + + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + event.setTimestamp(insertTs); + event.addInfo("done", "insertTs=" + insertTs); + entity.addEvent(event); + return entity; + } + + + static TimelineEntity getEntityMetricsApp1(long insertTs) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunMetrics_test"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + long cTime = 1425016501000L; + entity.setCreatedTime(cTime); + + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId(metric1); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + long ts = insertTs; metricValues.put(ts - 100000, 2L); metricValues.put(ts - 80000, 40L); m1.setType(Type.TIME_SERIES); @@ -63,7 +146,7 @@ class TestFlowDataGenerator { TimelineMetric m2 = new TimelineMetric(); m2.setId(metric2); metricValues = new HashMap<Long, Number>(); - ts = System.currentTimeMillis(); + ts = insertTs; metricValues.put(ts - 100000, 31L); metricValues.put(ts - 80000, 57L); m2.setType(Type.TIME_SERIES); @@ -74,7 +157,8 @@ class TestFlowDataGenerator { return entity; } - static TimelineEntity getEntityMetricsApp2() { + + static 
TimelineEntity getEntityMetricsApp2(long insertTs) { TimelineEntity entity = new TimelineEntity(); String id = "flowRunMetrics_test"; String type = TimelineEntityType.YARN_APPLICATION.toString(); @@ -87,7 +171,7 @@ class TestFlowDataGenerator { TimelineMetric m1 = new TimelineMetric(); m1.setId(metric1); Map<Long, Number> metricValues = new HashMap<Long, Number>(); - long ts = System.currentTimeMillis(); + long ts = insertTs; metricValues.put(ts - 100000, 5L); metricValues.put(ts - 80000, 101L); m1.setType(Type.TIME_SERIES); @@ -140,6 +224,55 @@ class TestFlowDataGenerator { return entity; } + static TimelineEntity getAFullEntity(long ts, long endTs) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunFullEntity"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(ts); + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId(metric1); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + metricValues.put(ts - 120000, 100000000L); + metricValues.put(ts - 100000, 200000000L); + metricValues.put(ts - 80000, 300000000L); + metricValues.put(ts - 60000, 400000000L); + metricValues.put(ts - 40000, 50000000000L); + metricValues.put(ts - 20000, 60000000000L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + TimelineMetric m2 = new TimelineMetric(); + m2.setId(metric2); + metricValues = new HashMap<Long, Number>(); + metricValues.put(ts - 900000, 31L); + metricValues.put(ts - 30000, 57L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues); + metrics.add(m2); + entity.addMetrics(metrics); + + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + event.setTimestamp(ts); + String expKey = "foo_event"; + Object expVal = "test"; + event.addInfo(expKey, expVal); + entity.addEvent(event); + + event = new 
TimelineEvent(); + event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + long expTs = ts + 21600000;// start time + 6hrs + event.setTimestamp(expTs); + event.addInfo(expKey, expVal); + entity.addEvent(event); + + return entity; + } + static TimelineEntity getEntityGreaterStartTime(long startTs) { TimelineEntity entity = new TimelineEntity(); entity.setCreatedTime(startTs); @@ -184,6 +317,34 @@ class TestFlowDataGenerator { return entity; } + static TimelineEntity getMinFlushEntity(long startTs) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunHelloFlushEntityMin"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(startTs); + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + event.setTimestamp(startTs); + entity.addEvent(event); + return entity; + } + + static TimelineEntity getMaxFlushEntity(long startTs) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunHelloFlushEntityMax"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(startTs); + + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + event.setTimestamp(startTs + END_TS_INCR); + entity.addEvent(event); + return entity; + } static TimelineEntity getFlowApp1(long appCreatedTime) { TimelineEntity entity = new TimelineEntity(); @@ -203,5 +364,4 @@ class TestFlowDataGenerator { return entity; } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index b234bfd..f04dd48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -52,8 +52,8 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -216,7 +216,8 @@ public class TestHBaseStorageFlowRun { long runid = 1002345678919L; TimelineEntities te = new TimelineEntities(); - TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); + TimelineEntity entityApp1 = TestFlowDataGenerator + .getEntityMetricsApp1(System.currentTimeMillis()); te.addEntity(entityApp1); HBaseTimelineWriterImpl hbi = null; @@ -228,7 +229,8 @@ public class TestHBaseStorageFlowRun { 
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); // write another application with same metric to this flow te = new TimelineEntities(); - TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(); + TimelineEntity entityApp2 = TestFlowDataGenerator + .getEntityMetricsApp2(System.currentTimeMillis()); te.addEntity(entityApp2); appName = "application_11111111111111_2222"; hbi.write(cluster, user, flow, flowVersion, runid, appName, te); @@ -323,7 +325,8 @@ public class TestHBaseStorageFlowRun { long runid = 1002345678919L; TimelineEntities te = new TimelineEntities(); - TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); + TimelineEntity entityApp1 = TestFlowDataGenerator + .getEntityMetricsApp1(System.currentTimeMillis()); te.addEntity(entityApp1); HBaseTimelineWriterImpl hbi = null; @@ -335,7 +338,8 @@ public class TestHBaseStorageFlowRun { hbi.write(cluster, user, flow, flowVersion, runid, appName, te); // write another application with same metric to this flow te = new TimelineEntities(); - TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(); + TimelineEntity entityApp2 = TestFlowDataGenerator + .getEntityMetricsApp2(System.currentTimeMillis()); te.addEntity(entityApp2); appName = "application_11111111111111_2222"; hbi.write(cluster, user, flow, flowVersion, runid, appName, te); @@ -420,7 +424,8 @@ public class TestHBaseStorageFlowRun { long runid = 1002345678919L; TimelineEntities te = new TimelineEntities(); - TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); + TimelineEntity entityApp1 = TestFlowDataGenerator + .getEntityMetricsApp1(System.currentTimeMillis()); te.addEntity(entityApp1); HBaseTimelineWriterImpl hbi = null; @@ -432,7 +437,8 @@ public class TestHBaseStorageFlowRun { hbi.write(cluster, user, flow, flowVersion, runid, appName, te); // write another application with same metric to this flow te = new TimelineEntities(); - TimelineEntity entityApp2 = 
TestFlowDataGenerator.getEntityMetricsApp2(); + TimelineEntity entityApp2 = TestFlowDataGenerator + .getEntityMetricsApp2(System.currentTimeMillis()); te.addEntity(entityApp2); appName = "application_11111111111111_2222"; hbi.write(cluster, user, flow, flowVersion, runid, appName, te); @@ -494,6 +500,98 @@ public class TestHBaseStorageFlowRun { } } + @Test + public void testWriteFlowRunFlush() throws Exception { + String cluster = "atestFlushFlowRun_cluster1"; + String user = "atestFlushFlowRun__user1"; + String flow = "atestFlushFlowRun_flow_name"; + String flowVersion = "AF1021C19F1351"; + long runid = 1449526652000L; + + int start = 10; + int count = 20000; + int appIdSuffix = 1; + HBaseTimelineWriterImpl hbi = null; + long insertTs = 1449796654827L - count; + long minTS = insertTs + 1; + long startTs = insertTs; + Configuration c1 = util.getConfiguration(); + TimelineEntities te1 = null; + TimelineEntity entityApp1 = null; + TimelineEntity entityApp2 = null; + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + + for (int i = start; i < count; i++) { + String appName = "application_1060350000000_" + appIdSuffix; + insertTs++; + te1 = new TimelineEntities(); + entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs); + te1.addEntity(entityApp1); + entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); + te1.addEntity(entityApp2); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + Thread.sleep(1); + + appName = "application_1001199480000_7" + appIdSuffix; + insertTs++; + appIdSuffix++; + te1 = new TimelineEntities(); + entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs); + te1.addEntity(entityApp1); + entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); + te1.addEntity(entityApp2); + + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + if (i % 1000 == 0) { + hbi.flush(); + checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, + runid, false); + } + } + } finally { + 
hbi.flush(); + hbi.close(); + checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid, + true); + } + } + + private void checkMinMaxFlush(Configuration c1, long minTS, long startTs, + int count, String cluster, String user, String flow, long runid, + boolean checkMax) throws IOException { + Connection conn = ConnectionFactory.createConnection(c1); + // check in flow run table + Table table1 = conn.getTable(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + // scan the table and see that we get back the right min and max + // timestamps + byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + Get g = new Get(startRow); + g.addColumn(FlowRunColumnFamily.INFO.getBytes(), + FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()); + g.addColumn(FlowRunColumnFamily.INFO.getBytes(), + FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()); + + Result r1 = table1.get(g); + assertNotNull(r1); + assertTrue(!r1.isEmpty()); + Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO + .getBytes()); + int start = 10; + assertEquals(2, r1.size()); + long starttime = Bytes.toLong(values + .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes())); + assertEquals(minTS, starttime); + if (checkMax) { + assertEquals(startTs + 2 * (count - start) + + TestFlowDataGenerator.END_TS_INCR, + Bytes.toLong(values + .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()))); + } + } + @AfterClass public static void tearDownAfterClass() throws Exception { util.shutdownMiniCluster();