YARN-3901. Populate flow run data in the flow_run & flow activity tables (Vrushali C via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e1f30320 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e1f30320 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e1f30320 Branch: refs/heads/YARN-2928-rebase Commit: e1f303201940b34de6bf3a46e8171aafe6742d02 Parents: 75a4c73 Author: Sangjin Lee <sj...@apache.org> Authored: Thu Sep 17 10:34:52 2015 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Mon Nov 9 16:13:13 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop-yarn-server-timelineservice/pom.xml | 13 + .../storage/HBaseTimelineWriterImpl.java | 179 ++++++- .../storage/TimelineSchemaCreator.java | 22 +- .../storage/application/ApplicationColumn.java | 5 +- .../application/ApplicationColumnPrefix.java | 15 +- .../storage/apptoflow/AppToFlowColumn.java | 6 +- .../timelineservice/storage/common/Column.java | 6 +- .../storage/common/ColumnHelper.java | 93 +++- .../storage/common/ColumnPrefix.java | 28 +- .../storage/common/TimelineWriterUtils.java | 185 +++++++ .../storage/common/TimestampGenerator.java | 112 +++++ .../storage/common/package-info.java | 24 - .../storage/entity/EntityColumn.java | 6 +- .../storage/entity/EntityColumnPrefix.java | 20 +- .../flow/AggregationCompactionDimension.java | 63 +++ .../storage/flow/AggregationOperation.java | 87 ++++ .../timelineservice/storage/flow/Attribute.java | 39 ++ .../storage/flow/FlowActivityColumnFamily.java | 54 +++ .../storage/flow/FlowActivityColumnPrefix.java | 243 ++++++++++ .../storage/flow/FlowActivityRowKey.java | 113 +++++ .../storage/flow/FlowActivityTable.java | 107 ++++ .../storage/flow/FlowRunColumn.java | 161 ++++++ .../storage/flow/FlowRunColumnFamily.java | 54 +++ .../storage/flow/FlowRunColumnPrefix.java | 239 +++++++++ .../storage/flow/FlowRunCoprocessor.java | 210 ++++++++ .../storage/flow/FlowRunRowKey.java | 50 ++ 
.../storage/flow/FlowRunTable.java | 141 ++++++ .../storage/flow/FlowScanner.java | 486 +++++++++++++++++++ .../storage/TestHBaseTimelineStorage.java | 28 +- .../storage/flow/TestFlowDataGenerator.java | 213 ++++++++ .../flow/TestHBaseStorageFlowActivity.java | 372 ++++++++++++++ .../storage/flow/TestHBaseStorageFlowRun.java | 290 +++++++++++ 33 files changed, 3562 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 93fe33f..6f5c8d6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -103,6 +103,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-4102. Add a "skip existing table" mode for timeline schema creator (Li Lu via sjlee) + YARN-3901. Populate flow run data in the flow_run & flow activity tables + (Vrushali C via sjlee) + IMPROVEMENTS YARN-3276. Code cleanup for timeline service API records. 
(Junping Du via http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml index da7fadf..758feb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml @@ -174,6 +174,19 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <configuration> + <additionnalDependencies> + <additionnalDependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + </additionnalDependency> + </additionnalDependencies> + </configuration> + </plugin> </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 772002d..7c4a5da 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -33,11 +33,10 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; -import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; @@ -53,23 +52,36 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; /** - * This implements a hbase based backend for storing application timeline entity + * This implements a hbase based backend for storing the timeline entity * information. + * It writes to multiple tables at the backend */ @InterfaceAudience.Private @InterfaceStability.Unstable public class HBaseTimelineWriterImpl extends AbstractService implements TimelineWriter { + private static final Log LOG = LogFactory + .getLog(HBaseTimelineWriterImpl.class); + private Connection conn; private TypedBufferedMutator<EntityTable> entityTable; private TypedBufferedMutator<AppToFlowTable> appToFlowTable; private TypedBufferedMutator<ApplicationTable> applicationTable; - - private static final Log LOG = LogFactory - .getLog(HBaseTimelineWriterImpl.class); + private TypedBufferedMutator<FlowActivityTable> flowActivityTable; + private TypedBufferedMutator<FlowRunTable> flowRunTable; public HBaseTimelineWriterImpl() { super(HBaseTimelineWriterImpl.class.getName()); @@ -91,6 +103,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements entityTable = new EntityTable().getTableMutator(hbaseConf, conn); appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn); applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn); + flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn); + flowActivityTable = new FlowActivityTable().getTableMutator(hbaseConf, 
conn); } /** @@ -111,7 +125,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements // if the entity is the application, the destination is the application // table - boolean isApplication = isApplicationEntity(te); + boolean isApplication = TimelineWriterUtils.isApplicationEntity(te); byte[] rowKey = isApplication ? ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId) : @@ -124,37 +138,139 @@ public class HBaseTimelineWriterImpl extends AbstractService implements storeMetrics(rowKey, te.getMetrics(), isApplication); storeRelations(rowKey, te, isApplication); - if (isApplicationCreated(te)) { - onApplicationCreated( - clusterId, userId, flowName, flowVersion, flowRunId, appId, te); + if (isApplication) { + if (TimelineWriterUtils.isApplicationCreated(te)) { + onApplicationCreated(clusterId, userId, flowName, flowVersion, + flowRunId, appId, te); + } + // if it's an application entity, store metrics + storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId, + appId, te); + // if application has finished, store it's finish time and write final + // values + // of all metrics + if (TimelineWriterUtils.isApplicationFinished(te)) { + onApplicationFinished(clusterId, userId, flowName, flowVersion, + flowRunId, appId, te); + } } } return putStatus; } - private static boolean isApplicationEntity(TimelineEntity te) { - return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString()); + private void onApplicationCreated(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntity te) throws IOException { + // store in App to flow table + storeInAppToFlowTable(clusterId, userId, flowName, flowVersion, flowRunId, + appId, te); + // store in flow run table + storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion, + flowRunId, appId, te); + // store in flow activity table + storeInFlowActivityTable(clusterId, userId, flowName, flowVersion, + 
flowRunId, appId, te); } - private static boolean isApplicationCreated(TimelineEntity te) { - if (isApplicationEntity(te)) { - for (TimelineEvent event : te.getEvents()) { - if (event.getId().equals( - ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { - return true; - } - } - } - return false; + /* + * updates the {@link FlowActivityTable} with the Application TimelineEntity + * information + */ + private void storeInFlowActivityTable(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntity te) throws IOException { + byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, userId, flowName); + byte[] qualifier = GenericObjectMapper.write(flowRunId); + FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier, + null, flowVersion, + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); } - private void onApplicationCreated(String clusterId, String userId, + /* + * updates the {@link FlowRunTable} with Application Created information + */ + private void storeAppCreatedInFlowRunTable(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntity te) throws IOException { + byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, + flowRunId); + FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null, + te.getCreatedTime(), + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); + } + + private void storeInAppToFlowTable(String clusterId, String userId, String flowName, String flowVersion, long flowRunId, String appId, TimelineEntity te) throws IOException { byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName); - AppToFlowColumn.FLOW_RUN_ID.store( - rowKey, appToFlowTable, null, flowRunId); + AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId); + } + + /* + * updates the {@link FlowRunTable} 
and {@link FlowActivityTable} when an + * application has finished + */ + private void onApplicationFinished(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntity te) throws IOException { + // store in flow run table + storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId, + appId, te); + + // indicate in the flow activity table that the app has finished + storeInFlowActivityTable(clusterId, userId, flowName, flowVersion, + flowRunId, appId, te); + } + + /* + * Update the {@link FlowRunTable} with Application Finished information + */ + private void storeAppFinishedInFlowRunTable(String clusterId, String userId, + String flowName, long flowRunId, String appId, TimelineEntity te) + throws IOException { + byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, + flowRunId); + Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID + .getAttribute(appId); + FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null, + TimelineWriterUtils.getApplicationFinishedTime(te), attributeAppId); + + // store the final value of metrics since application has finished + Set<TimelineMetric> metrics = te.getMetrics(); + if (metrics != null) { + storeFlowMetrics(rowKey, metrics, attributeAppId, + AggregationOperation.SUM_FINAL.getAttribute()); + } + } + + /* + * Updates the {@link FlowRunTable} with Application Metrics + */ + private void storeFlowMetricsAppRunning(String clusterId, String userId, + String flowName, long flowRunId, String appId, TimelineEntity te) + throws IOException { + Set<TimelineMetric> metrics = te.getMetrics(); + if (metrics != null) { + byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, + flowRunId); + storeFlowMetrics(rowKey, metrics, + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); + } + } + + private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics, + Attribute... 
attributes) throws IOException { + for (TimelineMetric metric : metrics) { + String metricColumnQualifier = metric.getId(); + Map<Long, Number> timeseries = metric.getValues(); + for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { + Long timestamp = timeseriesEntry.getKey(); + FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable, + metricColumnQualifier, timestamp, timeseriesEntry.getValue(), + attributes); + } + } } private void storeRelations(byte[] rowKey, TimelineEntity te, @@ -184,7 +300,6 @@ public class HBaseTimelineWriterImpl extends AbstractService implements // id3?id4?id5 String compoundValue = Separator.VALUES.joinEncoded(connectedEntity.getValue()); - columnPrefix.store(rowKey, table, connectedEntity.getKey(), null, compoundValue); } @@ -342,6 +457,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements entityTable.flush(); appToFlowTable.flush(); applicationTable.flush(); + flowRunTable.flush(); + flowActivityTable.flush(); } /** @@ -364,6 +481,16 @@ public class HBaseTimelineWriterImpl extends AbstractService implements LOG.info("closing the application table"); applicationTable.close(); } + if (flowRunTable != null) { + LOG.info("closing the flow run table"); + // The close API performs flushing and releases any resources held + flowRunTable.close(); + } + if (flowActivityTable != null) { + LOG.info("closing the flowActivityTable table"); + // The close API performs flushing and releases any resources held + flowActivityTable.close(); + } if (conn != null) { LOG.info("closing the hbase Connection"); conn.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java index e7e51a7..cbcff4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java @@ -42,6 +42,8 @@ import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; /** * This creates the schema for a hbase based backend for storing application @@ -199,7 +201,7 @@ public class TimelineSchemaCreator { return commandLine; } - private static void createAllTables(Configuration hbaseConf, + public static void createAllTables(Configuration hbaseConf, boolean skipExisting) throws IOException { Connection conn = null; @@ -236,6 +238,24 @@ public class TimelineSchemaCreator { throw e; } } + try { + new FlowRunTable().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new FlowActivityTable().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + 
LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } } finally { if (conn != null) { conn.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java index c028386..802626d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * Identifies fully qualified columns for the {@link ApplicationTable}. 
@@ -76,9 +77,9 @@ public enum ApplicationColumn implements Column<ApplicationTable> { public void store(byte[] rowKey, TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp, - Object inputValue) throws IOException { + Object inputValue, Attribute... attributes) throws IOException { column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, - inputValue); + inputValue, attributes); } public Object readResult(Result result) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java index ad1def6..d7b5773 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * Identifies partially qualified columns for the application table. @@ -112,7 +113,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { */ public void store(byte[] rowKey, TypedBufferedMutator<ApplicationTable> tableMutator, byte[] qualifier, - Long timestamp, Object inputValue) throws IOException { + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { // Null check if (qualifier == null) { @@ -123,8 +125,9 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { byte[] columnQualifier = ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); - } + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); + } /* * (non-Javadoc) @@ -137,7 +140,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { */ public void store(byte[] rowKey, TypedBufferedMutator<ApplicationTable> tableMutator, String qualifier, - Long timestamp, Object inputValue) throws IOException { + Long timestamp, Object inputValue, Attribute...attributes) + throws IOException { // Null check if (qualifier == null) { @@ -148,7 +152,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { byte[] columnQualifier = ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); } /* http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java ---------------------------------------------------------------------- diff 
--git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java index 423037a..859fdca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java @@ -25,8 +25,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; import java.io.IOException; +import java.util.Map; /** * Identifies fully qualified columns for the {@link AppToFlowTable}. @@ -67,9 +69,9 @@ public enum AppToFlowColumn implements Column<AppToFlowTable> { public void store(byte[] rowKey, TypedBufferedMutator<AppToFlowTable> tableMutator, Long timestamp, - Object inputValue) throws IOException { + Object inputValue, Attribute... 
attributes) throws IOException { column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, - inputValue); + inputValue, attributes); } public Object readResult(Result result) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java index 3397d62..64c1cda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * A Column represents the way to store a fully qualified column in a specific @@ -38,12 +39,15 @@ public interface Column<T> { * column. * @param timestamp version timestamp. When null the server timestamp will be * used. + * @param attributes Map of attributes for this mutation. used in the coprocessor + * to set/read the cell tags. Can be null. * @param inputValue the value to write to the rowKey and column qualifier. * Nothing gets written when null. 
* @throws IOException */ public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator, - Long timestamp, Object inputValue) throws IOException; + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException; /** * Get the latest version of this specified column. Note: this call clones the http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java index f1b7c58..3a2e088 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java @@ -31,7 +31,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; - +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * This class is meant to be used only by explicit Columns, and not directly to * write by clients. @@ -58,31 +59,66 @@ public class ColumnHelper<T> { * Sends a Mutation to the table. 
The mutations will be buffered and sent over * the wire as part of a batch. * - * @param rowKey identifying the row to write. Nothing gets written when null. - * @param tableMutator used to modify the underlying HBase table - * @param columnQualifier column qualifier. Nothing gets written when null. - * @param timestamp version timestamp. When null the server timestamp will be - * used. - * @param inputValue the value to write to the rowKey and column qualifier. - * Nothing gets written when null. + * @param rowKey + * identifying the row to write. Nothing gets written when null. + * @param tableMutator + * used to modify the underlying HBase table + * @param columnQualifier + * column qualifier. Nothing gets written when null. + * @param timestamp + * version timestamp. When null the current timestamp multiplied with + * TimestampGenerator.TS_MULTIPLIER and added with last 3 digits of + * app id will be used + * @param inputValue + * the value to write to the rowKey and column qualifier. Nothing + * gets written when null. * @throws IOException */ public void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator, - byte[] columnQualifier, Long timestamp, Object inputValue) - throws IOException { + byte[] columnQualifier, Long timestamp, Object inputValue, + Attribute... attributes) throws IOException { if ((rowKey == null) || (columnQualifier == null) || (inputValue == null)) { return; } Put p = new Put(rowKey); + timestamp = getPutTimestamp(timestamp, attributes); + p.addColumn(columnFamilyBytes, columnQualifier, timestamp, + GenericObjectMapper.write(inputValue)); + if ((attributes != null) && (attributes.length > 0)) { + for (Attribute attribute : attributes) { + p.setAttribute(attribute.getName(), attribute.getValue()); + } + } + tableMutator.mutate(p); + } + /* + * Figures out the cell timestamp used in the Put For storing into flow run + * table. 
We would like to left shift the timestamp and supplement it with the + * AppId id so that there are no collisions in the flow run table's cells + */ + private long getPutTimestamp(Long timestamp, Attribute[] attributes) { if (timestamp == null) { - p.addColumn(columnFamilyBytes, columnQualifier, - GenericObjectMapper.write(inputValue)); - } else { - p.addColumn(columnFamilyBytes, columnQualifier, timestamp, - GenericObjectMapper.write(inputValue)); + timestamp = System.currentTimeMillis(); } - tableMutator.mutate(p); + String appId = getAppIdFromAttributes(attributes); + long supplementedTS = TimestampGenerator.getSupplementedTimestamp( + timestamp, appId); + return supplementedTS; + } + + private String getAppIdFromAttributes(Attribute[] attributes) { + if (attributes == null) { + return null; + } + String appId = null; + for (Attribute attribute : attributes) { + if (AggregationCompactionDimension.APPLICATION_ID.toString().equals( + attribute.getName())) { + appId = Bytes.toString(attribute.getValue()); + } + } + return appId; } /** @@ -171,7 +207,9 @@ public class ColumnHelper<T> { for (Entry<Long, byte[]> cell : cells.entrySet()) { V value = (V) GenericObjectMapper.read(cell.getValue()); - cellResults.put(cell.getKey(), value); + cellResults.put( + TimestampGenerator.getTruncatedTimestamp(cell.getKey()), + value); } } results.put(columnName, cellResults); @@ -315,6 +353,27 @@ public class ColumnHelper<T> { /** * @param columnPrefixBytes The byte representation for the column prefix. * Should not contain {@link Separator#QUALIFIERS}. + * @param qualifier for the remainder of the column. + * @return fully sanitized column qualifier that is a combination of prefix + * and qualifier. If prefix is null, the result is simply the encoded + * qualifier without any separator. 
+ */ + public static byte[] getColumnQualifier(byte[] columnPrefixBytes, + long qualifier) { + + if (columnPrefixBytes == null) { + return Bytes.toBytes(qualifier); + } + + // Convert qualifier to lower case, strip of separators and tag on column + // prefix. + byte[] columnQualifier = + Separator.QUALIFIERS.join(columnPrefixBytes, Bytes.toBytes(qualifier)); + return columnQualifier; + } + /** + * @param columnPrefixBytes The byte representation for the column prefix. + * Should not contain {@link Separator#QUALIFIERS}. * @param qualifier the byte representation for the remainder of the column. * @return fully sanitized column qualifier that is a combination of prefix * and qualifier. If prefix is null, the result is simply the encoded http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java index 509ff49..db49098 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java @@ -23,6 +23,7 @@ import java.util.NavigableMap; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Result; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * Used to represent a partially qualified column, where the actual column name @@ -43,12 +44,36 @@ public interface ColumnPrefix<T> { * @param qualifier column qualifier. Nothing gets written when null. * @param timestamp version timestamp. When null the server timestamp will be * used. + *@param attributes attributes for the mutation that are used by the coprocessor + * to set/read the cell tags * @param inputValue the value to write to the rowKey and column qualifier. * Nothing gets written when null. * @throws IOException */ public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator, - String qualifier, Long timestamp, Object inputValue) throws IOException; + byte[] qualifier, Long timestamp, Object inputValue, + Attribute... attributes) throws IOException; + + /** + * Sends a Mutation to the table. The mutations will be buffered and sent over + * the wire as part of a batch. + * + * @param rowKey identifying the row to write. Nothing gets written when null. + * @param tableMutator used to modify the underlying HBase table. Caller is + * responsible to pass a mutator for the table that actually has this + * column. + * @param qualifier column qualifier. Nothing gets written when null. + * @param timestamp version timestamp. When null the server timestamp will be + * used. + *@param attributes attributes for the mutation that are used by the coprocessor + * to set/read the cell tags + * @param inputValue the value to write to the rowKey and column qualifier. + * Nothing gets written when null. + * @throws IOException + */ + public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator, + String qualifier, Long timestamp, Object inputValue, + Attribute... attributes) throws IOException; /** * Get the latest version of this specified column. 
Note: this call clones the @@ -81,4 +106,5 @@ public interface ColumnPrefix<T> { */ public <V> NavigableMap<String, NavigableMap<Long, V>> readResultsWithTimestamps(Result result) throws IOException; + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java index 58bdedc7e..371371a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java @@ -19,9 +19,19 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common; import java.util.ArrayList; import java.util.List; +import java.util.SortedSet; +import java.util.Map.Entry; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import 
org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * bunch of utility functions used across TimelineWriter classes @@ -36,6 +46,9 @@ public class TimelineWriterUtils { /** indicator for no limits for splitting */ public static final int NO_LIMIT_SPLIT = -1; + /** milliseconds in one day */ + public static final long MILLIS_ONE_DAY = 86400000L; + /** * Splits the source array into multiple array segments using the given * separator, up to a maximum of count items. This will naturally produce @@ -140,4 +153,176 @@ public class TimelineWriterUtils { return Long.MAX_VALUE - key; } + /** + * returns the timestamp of that day's start (which is midnight 00:00:00 AM) + * for a given input timestamp + * + * @param ts + * @return timestamp of that day's beginning (midnight) + */ + public static long getTopOfTheDayTimestamp(long ts) { + long dayTimestamp = ts - (ts % MILLIS_ONE_DAY); + return dayTimestamp; + } + + /** + * Combines the input array of attributes and the input aggregation operation + * into a new array of attributes. + * + * @param attributes + * @param aggOp + * @return array of combined attributes + */ + public static Attribute[] combineAttributes(Attribute[] attributes, + AggregationOperation aggOp) { + int newLength = getNewLengthCombinedAttributes(attributes, aggOp); + Attribute[] combinedAttributes = new Attribute[newLength]; + + if (attributes != null) { + System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length); + } + + if (aggOp != null) { + Attribute a2 = aggOp.getAttribute(); + combinedAttributes[newLength - 1] = a2; + } + return combinedAttributes; + } + + /** + * Returns a number for the new array size. 
The new array is the combination + * of input array of attributes and the input aggregation operation. + * + * @param attributes + * @param aggOp + * @return the size for the new array + */ + private static int getNewLengthCombinedAttributes(Attribute[] attributes, + AggregationOperation aggOp) { + int oldLength = getAttributesLength(attributes); + int aggLength = getAppOpLength(aggOp); + return oldLength + aggLength; + } + + private static int getAppOpLength(AggregationOperation aggOp) { + if (aggOp != null) { + return 1; + } + return 0; + } + + private static int getAttributesLength(Attribute[] attributes) { + if (attributes != null) { + return attributes.length; + } + return 0; + } + + /** + * checks if an application has finished + * + * @param te + * @return true if application has finished else false + */ + public static boolean isApplicationFinished(TimelineEntity te) { + SortedSet<TimelineEvent> allEvents = te.getEvents(); + if ((allEvents != null) && (allEvents.size() > 0)) { + TimelineEvent event = allEvents.last(); + if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { + return true; + } + } + return false; + } + + /** + * get the time at which an app finished + * + * @param te + * @return true if application has finished else false + */ + public static long getApplicationFinishedTime(TimelineEntity te) { + SortedSet<TimelineEvent> allEvents = te.getEvents(); + if ((allEvents != null) && (allEvents.size() > 0)) { + TimelineEvent event = allEvents.last(); + if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { + return event.getTimestamp(); + } + } + return 0l; + } + + /** + * Checks if the input TimelineEntity object is an ApplicationEntity. 
+ * + * @param te + * @return true if input is an ApplicationEntity, false otherwise + */ + public static boolean isApplicationEntity(TimelineEntity te) { + return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString()); + } + + /** + * Checks for the APPLICATION_CREATED event. + * + * @param te + * @return true is application event exists, false otherwise + */ + public static boolean isApplicationCreated(TimelineEntity te) { + if (isApplicationEntity(te)) { + for (TimelineEvent event : te.getEvents()) { + if (event.getId() + .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { + return true; + } + } + } + return false; + } + + /** + * Returns the first seen aggregation operation as seen in the list of input + * tags or null otherwise + * + * @param tags + * @return AggregationOperation + */ + public static AggregationOperation getAggregationOperationFromTagsList( + List<Tag> tags) { + for (AggregationOperation aggOp : AggregationOperation.values()) { + for (Tag tag : tags) { + if (tag.getType() == aggOp.getTagType()) { + return aggOp; + } + } + } + return null; + } + + /** + * Creates a {@link Tag} from the input attribute. 
+ * + * @param attribute + * @return Tag + */ + public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) { + // attribute could be either an Aggregation Operation or + // an Aggregation Dimension + // Get the Tag type from either + AggregationOperation aggOp = AggregationOperation + .getAggregationOperation(attribute.getKey()); + if (aggOp != null) { + Tag t = new Tag(aggOp.getTagType(), attribute.getValue()); + return t; + } + + AggregationCompactionDimension aggCompactDim = AggregationCompactionDimension + .getAggregationCompactionDimension(attribute.getKey()); + if (aggCompactDim != null) { + Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue()); + return t; + } + return null; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java new file mode 100644 index 0000000..555b64e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.ConverterUtils; + +/** + * Utility class that allows HBase coprocessors to interact with unique + * timestamps. + */ +public class TimestampGenerator { + + /* + * if this is changed, then reading cell timestamps written with older + * multiplier value will not work + */ + public static final long TS_MULTIPLIER = 1000L; + + private final AtomicLong lastTimestamp = new AtomicLong(); + + /** + * Returns the current wall clock time in milliseconds, multiplied by the + * required precision. + */ + public long currentTime() { + // We want to align cell timestamps with current time. + // cell timestamps are not be less than + // System.currentTimeMillis() * TS_MULTIPLIER. + return System.currentTimeMillis() * TS_MULTIPLIER; + } + + /** + * Returns a timestamp value unique within the scope of this + * {@code TimestampGenerator} instance. For usage by HBase + * {@code RegionObserver} coprocessors, this normally means unique within a + * given region. 
+ * + * Unlikely scenario of generating a non-unique timestamp: if there is a + * sustained rate of more than 1M hbase writes per second AND if region fails + * over within that time range of timestamps being generated then there may be + * collisions writing to a cell version of the same column. + */ + public long getUniqueTimestamp() { + long lastTs; + long nextTs; + do { + lastTs = lastTimestamp.get(); + nextTs = Math.max(lastTs + 1, currentTime()); + } while (!lastTimestamp.compareAndSet(lastTs, nextTs)); + return nextTs; + } + + /** + * returns a timestamp multiplied with TS_MULTIPLIER and last few digits of + * application id + * + * Unlikely scenario of generating a timestamp that is a duplicate: If more + * than a 1000 concurrent apps are running in one flow run AND write to same + * column at the same time, then say appId of 1001 will overlap with appId of + * 001 and there may be collisions for that flow run's specific column. + * + * @param incomingTS + * @param appId + * @return a timestamp multiplied with TS_MULTIPLIER and last few digits of + * application id + */ + public static long getSupplementedTimestamp(long incomingTS, String appId) { + long suffix = getAppIdSuffix(appId); + long outgoingTS = incomingTS * TS_MULTIPLIER + suffix; + return outgoingTS; + + } + + private static long getAppIdSuffix(String appIdStr) { + if (appIdStr == null) { + return 0L; + } + ApplicationId appId = ConverterUtils.toApplicationId(appIdStr); + long id = appId.getId() % TS_MULTIPLIER; + return id; + } + + /** + * truncates the last few digits of the timestamp which were supplemented by + * the TimestampGenerator#getSupplementedTimestamp function + * + * @param incomingTS + * @return a truncated timestamp value + */ + public static long getTruncatedTimestamp(long incomingTS) { + return incomingTS / TS_MULTIPLIER; + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java deleted file mode 100644 index 32577fb..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -@InterfaceAudience.Private -@InterfaceStability.Unstable -package org.apache.hadoop.yarn.server.timelineservice.storage.common; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java index 26e7748..8ae19b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.entity; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; @@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; 
/** * Identifies fully qualified columns for the {@link EntityTable}. @@ -81,9 +83,9 @@ public enum EntityColumn implements Column<EntityTable> { public void store(byte[] rowKey, TypedBufferedMutator<EntityTable> tableMutator, Long timestamp, - Object inputValue) throws IOException { + Object inputValue, Attribute... attributes) throws IOException { column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, - inputValue); + inputValue, attributes); } public Object readResult(Result result) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index 75ff742..0d4e5a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * Identifies partially qualified columns for the entity table. @@ -108,11 +109,13 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix * #store(byte[], * org.apache.hadoop.yarn.server.timelineservice.storage.common. - * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object) + * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object, + * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[]) */ public void store(byte[] rowKey, TypedBufferedMutator<EntityTable> tableMutator, String qualifier, - Long timestamp, Object inputValue) throws IOException { + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { // Null check if (qualifier == null) { @@ -123,8 +126,9 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { byte[] columnQualifier = ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); - } + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); + } /* * (non-Javadoc) @@ -137,7 +141,8 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { */ public void store(byte[] rowKey, TypedBufferedMutator<EntityTable> tableMutator, byte[] qualifier, - Long timestamp, Object inputValue) throws IOException { + Long timestamp, Object inputValue, Attribute... 
attributes) + throws IOException { // Null check if (qualifier == null) { @@ -148,8 +153,9 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { byte[] columnQualifier = ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); - } + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); + } /* * (non-Javadoc) http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java new file mode 100644 index 0000000..ff12c7b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Identifies the compaction dimensions for the data in the {@link FlowRunTable} + * . + */ +public enum AggregationCompactionDimension { + + /** + * the application id + */ + APPLICATION_ID((byte) 101); + + private byte tagType; + private byte[] inBytes; + + private AggregationCompactionDimension(byte tagType) { + this.tagType = tagType; + this.inBytes = Bytes.toBytes(this.name()); + } + + public Attribute getAttribute(String attributeValue) { + return new Attribute(this.name(), Bytes.toBytes(attributeValue)); + } + + public byte getTagType() { + return tagType; + } + + public byte[] getInBytes() { + return this.inBytes.clone(); + } + + public static AggregationCompactionDimension getAggregationCompactionDimension( + String aggCompactDimStr) { + for (AggregationCompactionDimension aggDim : AggregationCompactionDimension + .values()) { + if (aggDim.name().equals(aggCompactDimStr)) { + return aggDim; + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java new file mode 100644 index 0000000..c635ce6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Identifies the attributes to be set for puts into the {@link FlowRunTable}. + * The numbers used for tagType are prime numbers + */ +public enum AggregationOperation { + + /** + * When the flow was started. + */ + MIN((byte) 71), + + /** + * When it ended. 
+ */ + MAX((byte) 73), + + /** + * The metrics of the flow + */ + SUM((byte) 79), + + /** + * application running + */ + SUM_FINAL((byte) 83), + + /** + * compact + */ + COMPACT((byte) 89); + + private byte tagType; + private byte[] inBytes; + + private AggregationOperation(byte tagType) { + this.tagType = tagType; + this.inBytes = Bytes.toBytes(this.name()); + } + + public Attribute getAttribute() { + return new Attribute(this.name(), this.inBytes); + } + + public byte getTagType() { + return tagType; + } + + public byte[] getInBytes() { + return this.inBytes.clone(); + } + + /** + * returns the AggregationOperation enum that represents that string + * @param aggOpStr + * @return the AggregationOperation enum that represents that string + */ + public static AggregationOperation getAggregationOperation(String aggOpStr) { + for (AggregationOperation aggOp : AggregationOperation.values()) { + if (aggOp.name().equals(aggOpStr)) { + return aggOp; + } + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java new file mode 100644 index 0000000..d3de518 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Defines the attribute tuple to be set for puts into the {@link FlowRunTable}.
 *
 * An attribute is an immutable (name, value) pair. The byte[] value is
 * defensively copied both on construction and on every read, so callers can
 * never mutate this object's state through a shared array reference.
 */
public class Attribute {
  // Attribute name; stored as-is (String is immutable).
  private final String name;
  // Private snapshot of the value bytes taken at construction time.
  private final byte[] value;

  public Attribute(String name, byte[] value) {
    this.name = name;
    // Snapshot the incoming array so later caller-side mutation is invisible.
    this.value = value.clone();
  }

  /**
   * @return a fresh copy of the value bytes; mutating it does not affect
   *         this attribute.
   */
  public byte[] getValue() {
    return value.clone();
  }

  /**
   * @return the attribute name.
   */
  public String getName() {
    return name;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;

/**
 * Represents the flow activity table column families.
 */
public enum FlowActivityColumnFamily implements ColumnFamily<FlowActivityTable> {

  /**
   * Info column family houses known columns, specifically ones included in
   * columnfamily filters.
   */
  INFO("i");

  /**
   * Byte representation of this column family.
   */
  private final byte[] bytes;

  /**
   * @param value
   *          create a column family with this name. Must be lower case and
   *          without spaces.
   */
  private FlowActivityColumnFamily(String value) {
    // column families should be lower case and not contain any spaces.
    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
  }

  /**
   * @return a defensive copy of the column family bytes; the internal array
   *         is never exposed directly.
   */
  public byte[] getBytes() {
    return Bytes.copy(bytes);
  }

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;

import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;

/**
 * Identifies partially qualified columns for the {@link FlowActivityTable}.
 * Each enum constant pairs a column family with a column-name prefix and an
 * optional {@link AggregationOperation} that is combined into the attributes
 * of every put issued through it.
 */
public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable> {

  /**
   * To store run ids of the flows
   */
  RUN_ID(FlowActivityColumnFamily.INFO, "r", null);

  // Helper that performs the actual reads/writes for this column family.
  private final ColumnHelper<FlowActivityTable> column;
  private final ColumnFamily<FlowActivityTable> columnFamily;

  /**
   * Can be null for those cases where the provided column qualifier is the
   * entire column name.
   */
  private final String columnPrefix;
  private final byte[] columnPrefixBytes;

  // Aggregation operation folded into put attributes; may be null (RUN_ID).
  private final AggregationOperation aggOp;

  /**
   * Private constructor, meant to be used by the enum definition.
   *
   * @param columnFamily
   *          that this column is stored in.
   * @param columnPrefix
   *          for this column.
   */
  private FlowActivityColumnPrefix(
      ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix,
      AggregationOperation aggOp) {
    column = new ColumnHelper<FlowActivityTable>(columnFamily);
    this.columnFamily = columnFamily;
    this.columnPrefix = columnPrefix;
    if (columnPrefix == null) {
      this.columnPrefixBytes = null;
    } else {
      // Future-proof by ensuring the right column prefix hygiene.
      this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
          .encode(columnPrefix));
    }
    this.aggOp = aggOp;
  }

  /**
   * @return the column name value
   */
  public String getColumnPrefix() {
    return columnPrefix;
  }

  /**
   * @return a copy of the byte representation of the column prefix.
   * NOTE(review): throws NPE when columnPrefix was null (constructor leaves
   * columnPrefixBytes null in that case) — no current constant hits this,
   * but confirm before adding a prefix-less constant.
   */
  public byte[] getColumnPrefixBytes() {
    return columnPrefixBytes.clone();
  }

  public AggregationOperation getAttribute() {
    return aggOp;
  }

  /*
   * (non-Javadoc)
   *
   * Stores a value under {@code columnPrefix + separator + qualifier},
   * combining this prefix's aggregation operation into the put attributes.
   *
   * @see
   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
   * #store(byte[],
   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
   * TypedBufferedMutator, byte[], java.lang.Long, java.lang.Object,
   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
   */
  @Override
  public void store(byte[] rowKey,
      TypedBufferedMutator<FlowActivityTable> tableMutator, byte[] qualifier,
      Long timestamp, Object inputValue, Attribute... attributes)
      throws IOException {
    // Null check
    if (qualifier == null) {
      throw new IOException("Cannot store column with null qualifier in "
          + tableMutator.getName().getNameAsString());
    }

    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
        this.columnPrefixBytes, qualifier);
    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
        attributes, this.aggOp);
    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
        combinedAttributes);
  }

  /*
   * (non-Javadoc)
   *
   * @see
   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
   * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
   */
  public Object readResult(Result result, String qualifier) throws IOException {
    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
        this.columnPrefixBytes, qualifier);
    return column.readResult(result, columnQualifier);
  }

  /*
   * (non-Javadoc)
   *
   * @see
   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
   * #readResults(org.apache.hadoop.hbase.client.Result)
   */
  public Map<String, Object> readResults(Result result) throws IOException {
    return column.readResults(result, columnPrefixBytes);
  }

  /*
   * (non-Javadoc)
   *
   * @see
   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
   */
  public <T> NavigableMap<String, NavigableMap<Long, T>> readResultsWithTimestamps(
      Result result) throws IOException {
    return column.readResultsWithTimestamps(result, columnPrefixBytes);
  }

  /**
   * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there
   * is no match. The following holds true: {@code columnFor(x) == columnFor(y)}
   * if and only if {@code x.equals(y)} or {@code (x == y == null)}
   *
   * @param columnPrefix
   *          Name of the column to retrieve
   * @return the corresponding {@link FlowActivityColumnPrefix} or null
   */
  public static final FlowActivityColumnPrefix columnFor(String columnPrefix) {

    // Match column based on value, assume column family matches.
    for (FlowActivityColumnPrefix flowActivityColPrefix : FlowActivityColumnPrefix
        .values()) {
      // Find a match based only on name.
      if (flowActivityColPrefix.getColumnPrefix().equals(columnPrefix)) {
        return flowActivityColPrefix;
      }
    }
    // Default to null
    return null;
  }

  /**
   * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there
   * is no match. The following holds true:
   * {@code columnFor(a,x) == columnFor(b,y)} if and only if
   * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
   *
   * @param columnFamily
   *          The columnFamily for which to retrieve the column.
   * @param columnPrefix
   *          Name of the column to retrieve
   * @return the corresponding {@link FlowActivityColumnPrefix} or null if both
   *         arguments don't match.
   */
  public static final FlowActivityColumnPrefix columnFor(
      FlowActivityColumnFamily columnFamily, String columnPrefix) {

    // TODO: needs unit test to confirm and need to update javadoc to explain
    // null prefix case.

    for (FlowActivityColumnPrefix flowActivityColumnPrefix : FlowActivityColumnPrefix
        .values()) {
      // Find a match based column family and on name.
      if (flowActivityColumnPrefix.columnFamily.equals(columnFamily)
          && (((columnPrefix == null) && (flowActivityColumnPrefix
              .getColumnPrefix() == null)) || (flowActivityColumnPrefix
              .getColumnPrefix().equals(columnPrefix)))) {
        return flowActivityColumnPrefix;
      }
    }
    // Default to null
    return null;
  }

  /*
   * (non-Javadoc)
   *
   * NOTE(review): unlike the byte[]-qualifier overload above, this overload
   * passes null instead of the caller-supplied timestamp to column.store(),
   * silently dropping it — confirm whether that is intentional (e.g. the
   * flow activity table relying on server-side cell timestamps) or a bug.
   *
   * @see
   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
   * #store(byte[],
   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object,
   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
   */
  @Override
  public void store(byte[] rowKey,
      TypedBufferedMutator<FlowActivityTable> tableMutator, String qualifier,
      Long timestamp, Object inputValue, Attribute... attributes)
      throws IOException {
    // Null check
    if (qualifier == null) {
      throw new IOException("Cannot store column with null qualifier in "
          + tableMutator.getName().getNameAsString());
    }

    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
        this.columnPrefixBytes, qualifier);
    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
        attributes, this.aggOp);
    column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
        combinedAttributes);
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; + +/** + * Represents a rowkey for the flow activity table. + */ +public class FlowActivityRowKey { + + private final String clusterId; + private final long dayTs; + private final String userId; + private final String flowId; + + public FlowActivityRowKey(String clusterId, long dayTs, String userId, + String flowId) { + this.clusterId = clusterId; + this.dayTs = dayTs; + this.userId = userId; + this.flowId = flowId; + } + + public String getClusterId() { + return clusterId; + } + + public long getDayTimestamp() { + return dayTs; + } + + public String getUserId() { + return userId; + } + + public String getFlowId() { + return flowId; + } + + /** + * Constructs a row key for the flow activity table as follows: + * {@code clusterId!dayTimestamp!user!flowId} + * + * Will insert into current day's record in the table + * @param clusterId + * @param userId + * @param flowId + * @return byte array with the row key prefix + */ + public static byte[] getRowKey(String clusterId, String userId, String flowId) { + long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System + .currentTimeMillis()); + return getRowKey(clusterId, dayTs, userId, flowId); + } + + /** + * Constructs a row key for the flow activity table as follows: + * {@code clusterId!dayTimestamp!user!flowId} + * 
+ * @param clusterId + * @param dayTs + * @param userId + * @param flowId + * @return byte array for the row key + */ + public static byte[] getRowKey(String clusterId, long dayTs, String userId, + String flowId) { + return Separator.QUALIFIERS.join( + Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)), + Bytes.toBytes(TimelineWriterUtils.invert(dayTs)), + Bytes.toBytes(Separator.QUALIFIERS.encode(userId)), + Bytes.toBytes(Separator.QUALIFIERS.encode(flowId))); + } + + /** + * Given the raw row key as bytes, returns the row key as an object. + */ + public static FlowActivityRowKey parseRowKey(byte[] rowKey) { + byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey); + + if (rowKeyComponents.length < 4) { + throw new IllegalArgumentException("the row key is not valid for " + + "a flow activity"); + } + + String clusterId = Separator.QUALIFIERS.decode(Bytes + .toString(rowKeyComponents[0])); + long dayTs = TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[1])); + String userId = Separator.QUALIFIERS.decode(Bytes + .toString(rowKeyComponents[2])); + String flowId = Separator.QUALIFIERS.decode(Bytes + .toString(rowKeyComponents[3])); + return new FlowActivityRowKey(clusterId, dayTs, userId, flowId); + } +}