YARN-3906. Split the application table from the entity table. Contributed by Sangjin Lee.
(cherry picked from commit bcd755eba9466ce277d3c14192c31da6462c4ab3) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e0137ad Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e0137ad Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e0137ad Branch: refs/heads/YARN-2928-rebase Commit: 8e0137ad714cc3b98b786f3089ebf645d85dd2f6 Parents: 6aa4c6d Author: Junping Du <junping...@apache.org> Authored: Tue Aug 11 16:59:21 2015 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Mon Nov 9 16:13:11 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../storage/HBaseTimelineReaderImpl.java | 202 ++++++++---- .../storage/HBaseTimelineWriterImpl.java | 145 ++++++--- .../storage/TimelineSchemaCreator.java | 10 + .../storage/application/ApplicationColumn.java | 136 ++++++++ .../application/ApplicationColumnFamily.java | 65 ++++ .../application/ApplicationColumnPrefix.java | 217 +++++++++++++ .../storage/application/ApplicationRowKey.java | 67 ++++ .../storage/application/ApplicationTable.java | 164 ++++++++++ .../storage/application/package-info.java | 25 ++ .../storage/entity/EntityColumnPrefix.java | 2 +- .../storage/entity/EntityTable.java | 59 ++-- .../storage/TestHBaseTimelineWriterImpl.java | 321 ++++++++++++++++--- 13 files changed, 1230 insertions(+), 186 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0137ad/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 45a7f93..735aa62 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -85,6 +85,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3049. [Storage Implementation] Implement storage reader interface to fetch raw data from HBase backend (Zhijie Shen via sjlee) + YARN-3906. Split the application table from the entity table. (Sangjin Lee + via junping_du) + IMPROVEMENTS YARN-3276. Code cleanup for timeline service API records. (Junping Du via http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0137ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index 5258b9c..094f868 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -18,7 +18,19 @@ package org.apache.hadoop.yarn.server.timelineservice.storage; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -32,11 +44,17 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; @@ -45,18 +63,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Collection; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.NavigableMap; -import java.util.NavigableSet; -import java.util.Set; -import java.util.TreeSet; +import com.google.common.base.Preconditions; public class HBaseTimelineReaderImpl extends AbstractService implements TimelineReader { @@ -70,6 +77,7 @@ public class HBaseTimelineReaderImpl private Connection conn; private EntityTable entityTable; private AppToFlowTable appToFlowTable; + private ApplicationTable applicationTable; public HBaseTimelineReaderImpl() { super(HBaseTimelineReaderImpl.class.getName()); @@ -82,6 +90,7 @@ public class HBaseTimelineReaderImpl conn = ConnectionFactory.createConnection(hbaseConf); entityTable = new EntityTable(); appToFlowTable = new AppToFlowTable(); + applicationTable = new ApplicationTable(); } @Override @@ -109,14 +118,24 @@ public class HBaseTimelineReaderImpl fieldsToRetrieve = EnumSet.noneOf(Field.class); } - byte[] rowKey = EntityRowKey.getRowKey( - clusterId, userId, flowId, flowRunId, appId, entityType, entityId); + boolean isApplication = isApplicationEntity(entityType); + byte[] rowKey = isApplication ? + ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId, + appId) : + EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId, + entityType, entityId); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); - return parseEntity( - entityTable.getResult(hbaseConf, conn, get), fieldsToRetrieve, + Result result = isApplication ? + applicationTable.getResult(hbaseConf, conn, get) : + entityTable.getResult(hbaseConf, conn, get); + return parseEntity(result, fieldsToRetrieve, false, DEFAULT_BEGIN_TIME, DEFAULT_END_TIME, false, DEFAULT_BEGIN_TIME, - DEFAULT_END_TIME, null, null, null, null, null, null); + DEFAULT_END_TIME, null, null, null, null, null, null, isApplication); + } + + private static boolean isApplicationEntity(String entityType) { + return TimelineEntityType.YARN_APPLICATION.toString().equals(entityType); } @Override @@ -155,26 +174,46 @@ public class HBaseTimelineReaderImpl } NavigableSet<TimelineEntity> entities = new TreeSet<>(); - // Scan through part of the table to find the entities belong to one app and - // one type - Scan scan = new Scan(); - scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix( - clusterId, userId, flowId, flowRunId, appId, entityType)); - scan.setMaxVersions(Integer.MAX_VALUE); - ResultScanner scanner = entityTable.getResultScanner(hbaseConf, conn, scan); - for (Result result : scanner) { + boolean isApplication = isApplicationEntity(entityType); + if (isApplication) { + // If getEntities() is called for an application, there can be at most + // one entity. If the entity passes the filter, it is returned. Otherwise, + // an empty set is returned. + byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId, + flowRunId, appId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + Result result = applicationTable.getResult(hbaseConf, conn, get); TimelineEntity entity = parseEntity(result, fieldsToRetrieve, - true, createdTimeBegin, createdTimeEnd, - true, modifiedTimeBegin, modifiedTimeEnd, - isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters, - metricFilters); - if (entity == null) { - continue; + true, createdTimeBegin, createdTimeEnd, true, modifiedTimeBegin, + modifiedTimeEnd, isRelatedTo, relatesTo, infoFilters, configFilters, + eventFilters, metricFilters, isApplication); + if (entity != null) { + entities.add(entity); } - if (entities.size() > limit) { - entities.pollLast(); + } else { + // Scan through part of the table to find the entities belong to one app + // and one type + Scan scan = new Scan(); + scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix( + clusterId, userId, flowId, flowRunId, appId, entityType)); + scan.setMaxVersions(Integer.MAX_VALUE); + ResultScanner scanner = + entityTable.getResultScanner(hbaseConf, conn, scan); + for (Result result : scanner) { + TimelineEntity entity = parseEntity(result, fieldsToRetrieve, + true, createdTimeBegin, createdTimeEnd, + true, modifiedTimeBegin, modifiedTimeEnd, + isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters, + metricFilters, isApplication); + if (entity == null) { + continue; + } + if (entities.size() > limit) { + entities.pollLast(); + } + entities.add(entity); } - entities.add(entity); } return entities; } @@ -221,26 +260,37 @@ public class HBaseTimelineReaderImpl boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd, Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo, Map<String, Object> infoFilters, Map<String, String> configFilters, - Set<String> eventFilters, Set<String> metricFilters) + Set<String> eventFilters, Set<String> metricFilters, + boolean isApplication) throws IOException { if (result == null || result.isEmpty()) { return null; } TimelineEntity entity = new TimelineEntity(); - entity.setType(EntityColumn.TYPE.readResult(result).toString()); - entity.setId(EntityColumn.ID.readResult(result).toString()); + String entityType = isApplication ? + TimelineEntityType.YARN_APPLICATION.toString() : + EntityColumn.TYPE.readResult(result).toString(); + entity.setType(entityType); + String entityId = isApplication ? + ApplicationColumn.ID.readResult(result).toString() : + EntityColumn.ID.readResult(result).toString(); + entity.setId(entityId); // fetch created time - entity.setCreatedTime( - ((Number) EntityColumn.CREATED_TIME.readResult(result)).longValue()); + Number createdTime = isApplication ? + (Number)ApplicationColumn.CREATED_TIME.readResult(result) : + (Number)EntityColumn.CREATED_TIME.readResult(result); + entity.setCreatedTime(createdTime.longValue()); if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin || entity.getCreatedTime() > createdTimeEnd)) { return null; } // fetch modified time - entity.setCreatedTime( - ((Number) EntityColumn.MODIFIED_TIME.readResult(result)).longValue()); + Number modifiedTime = isApplication ? + (Number)ApplicationColumn.MODIFIED_TIME.readResult(result) : + (Number)EntityColumn.MODIFIED_TIME.readResult(result); + entity.setModifiedTime(modifiedTime.longValue()); if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin || entity.getModifiedTime() > modifiedTimeEnd)) { return null; @@ -250,7 +300,13 @@ public class HBaseTimelineReaderImpl boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { - readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO); + if (isApplication) { + readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, + true); + } else { + readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, + true); + } if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations( entity.getIsRelatedToEntities(), isRelatedTo)) { return null; @@ -265,7 +321,12 @@ public class HBaseTimelineReaderImpl boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { - readRelationship(entity, result, EntityColumnPrefix.RELATES_TO); + if (isApplication) { + readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, + false); + } else { + readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); + } if (checkRelatesTo && !TimelineReaderUtils.matchRelations( entity.getRelatesToEntities(), relatesTo)) { return null; @@ -280,7 +341,11 @@ public class HBaseTimelineReaderImpl boolean checkInfo = infoFilters != null && infoFilters.size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.INFO) || checkInfo) { - readKeyValuePairs(entity, result, EntityColumnPrefix.INFO); + if (isApplication) { + readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); + } else { + readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); + } if (checkInfo && !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) { return null; @@ -295,7 +360,11 @@ public class HBaseTimelineReaderImpl boolean checkConfigs = configFilters != null && configFilters.size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { - readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG); + if (isApplication) { + readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); + } else { + readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); + } if (checkConfigs && !TimelineReaderUtils.matchFilters( entity.getConfigs(), configFilters)) { return null; @@ -310,7 +379,7 @@ public class HBaseTimelineReaderImpl boolean checkEvents = eventFilters != null && eventFilters.size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { - readEvents(entity, result); + readEvents(entity, result, isApplication); if (checkEvents && !TimelineReaderUtils.matchEventFilters( entity.getEvents(), eventFilters)) { return null; @@ -325,7 +394,7 @@ public class HBaseTimelineReaderImpl boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { - readMetrics(entity, result); + readMetrics(entity, result, isApplication); if (checkMetrics && !TimelineReaderUtils.matchMetricFilters( entity.getMetrics(), metricFilters)) { return null; @@ -338,15 +407,15 @@ public class HBaseTimelineReaderImpl return entity; } - private static void readRelationship( - TimelineEntity entity, Result result, EntityColumnPrefix prefix) - throws IOException { + private static <T> void readRelationship( + TimelineEntity entity, Result result, ColumnPrefix<T> prefix, + boolean isRelatedTo) throws IOException { // isRelatedTo and relatesTo are of type Map<String, Set<String>> Map<String, Object> columns = prefix.readResults(result); for (Map.Entry<String, Object> column : columns.entrySet()) { for (String id : Separator.VALUES.splitEncoded( column.getValue().toString())) { - if (prefix.equals(EntityColumnPrefix.IS_RELATED_TO)) { + if (isRelatedTo) { entity.addIsRelatedToEntity(column.getKey(), id); } else { entity.addRelatesToEntity(column.getKey(), id); @@ -355,12 +424,12 @@ public class HBaseTimelineReaderImpl } } - private static void readKeyValuePairs( - TimelineEntity entity, Result result, EntityColumnPrefix prefix) - throws IOException { + private static <T> void readKeyValuePairs( + TimelineEntity entity, Result result, ColumnPrefix<T> prefix, + boolean isConfig) throws IOException { // info and configuration are of type Map<String, Object or String> Map<String, Object> columns = prefix.readResults(result); - if (prefix.equals(EntityColumnPrefix.CONFIG)) { + if (isConfig) { for (Map.Entry<String, Object> column : columns.entrySet()) { entity.addConfig(column.getKey(), column.getKey().toString()); } @@ -369,10 +438,11 @@ public class HBaseTimelineReaderImpl } } - private static void readEvents(TimelineEntity entity, Result result) - throws IOException { + private static void readEvents(TimelineEntity entity, Result result, + boolean isApplication) throws IOException { Map<String, TimelineEvent> eventsMap = new HashMap<>(); - Map<String, Object> eventsResult = + Map<String, Object> eventsResult = isApplication ? + ApplicationColumnPrefix.EVENT.readResults(result) : EntityColumnPrefix.EVENT.readResults(result); for (Map.Entry<String,Object> eventResult : eventsResult.entrySet()) { Collection<String> tokens = @@ -405,10 +475,16 @@ public class HBaseTimelineReaderImpl entity.addEvents(eventsSet); } - private static void readMetrics(TimelineEntity entity, Result result) - throws IOException { - NavigableMap<String, NavigableMap<Long, Number>> metricsResult = - EntityColumnPrefix.METRIC.readResultsWithTimestamps(result); + private static void readMetrics(TimelineEntity entity, Result result, + boolean isApplication) throws IOException { + NavigableMap<String, NavigableMap<Long, Number>> metricsResult; + if (isApplication) { + metricsResult = + ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result); + } else { + metricsResult = + EntityColumnPrefix.METRIC.readResultsWithTimestamps(result); + } for (Map.Entry<String, NavigableMap<Long, Number>> metricResult: metricsResult.entrySet()) { TimelineMetric metric = new TimelineMetric(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0137ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 5290415..96192cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -38,9 +38,14 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; @@ -61,6 +66,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements private Connection conn; private TypedBufferedMutator<EntityTable> entityTable; private TypedBufferedMutator<AppToFlowTable> appToFlowTable; + private TypedBufferedMutator<ApplicationTable> applicationTable; private static final Log LOG = LogFactory .getLog(HBaseTimelineWriterImpl.class); @@ -84,6 +90,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements conn = ConnectionFactory.createConnection(hbaseConf); entityTable = new EntityTable().getTableMutator(hbaseConf, conn); appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn); + applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn); } /** @@ -102,18 +109,20 @@ public class HBaseTimelineWriterImpl extends AbstractService implements continue; } - byte[] rowKey = + // if the entity is the application, the destination is the application + // table + boolean isApplication = isApplicationEntity(te); + byte[] rowKey = isApplication ? + ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId, + appId) : EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId, te.getType(), te.getId()); - storeInfo(rowKey, te, flowVersion); - storeEvents(rowKey, te.getEvents()); - storeConfig(rowKey, te.getConfigs()); - storeMetrics(rowKey, te.getMetrics()); - storeRelations(rowKey, te.getIsRelatedToEntities(), - EntityColumnPrefix.IS_RELATED_TO); - storeRelations(rowKey, te.getRelatesToEntities(), - EntityColumnPrefix.RELATES_TO); + storeInfo(rowKey, te, flowVersion, isApplication); + storeEvents(rowKey, te.getEvents(), isApplication); + storeConfig(rowKey, te.getConfigs(), isApplication); + storeMetrics(rowKey, te.getMetrics(), isApplication); + storeRelations(rowKey, te, isApplication); if (isApplicationCreated(te)) { onApplicationCreated( @@ -123,9 +132,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements return putStatus; } + private static boolean isApplicationEntity(TimelineEntity te) { + return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString()); + } + private static boolean isApplicationCreated(TimelineEntity te) { - if (te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString())) { - boolean isAppCreated = false; + if (isApplicationEntity(te)) { for (TimelineEvent event : te.getEvents()) { if (event.getId().equals( ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { @@ -145,41 +157,74 @@ public class HBaseTimelineWriterImpl extends AbstractService implements rowKey, appToFlowTable, null, flowRunId); } + private void storeRelations(byte[] rowKey, TimelineEntity te, + boolean isApplication) throws IOException { + if (isApplication) { + storeRelations(rowKey, te.getIsRelatedToEntities(), + ApplicationColumnPrefix.IS_RELATED_TO, applicationTable); + storeRelations(rowKey, te.getRelatesToEntities(), + ApplicationColumnPrefix.RELATES_TO, applicationTable); + } else { + storeRelations(rowKey, te.getIsRelatedToEntities(), + EntityColumnPrefix.IS_RELATED_TO, entityTable); + storeRelations(rowKey, te.getRelatesToEntities(), + EntityColumnPrefix.RELATES_TO, entityTable); + } + } + /** * Stores the Relations from the {@linkplain TimelineEntity} object */ - private void storeRelations(byte[] rowKey, + private <T> void storeRelations(byte[] rowKey, Map<String, Set<String>> connectedEntities, - EntityColumnPrefix entityColumnPrefix) throws IOException { + ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table) + throws IOException { for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities .entrySet()) { // id3?id4?id5 String compoundValue = Separator.VALUES.joinEncoded(connectedEntity.getValue()); - entityColumnPrefix.store(rowKey, entityTable, connectedEntity.getKey(), - null, compoundValue); + columnPrefix.store(rowKey, table, connectedEntity.getKey(), null, + compoundValue); } } /** * Stores information from the {@linkplain TimelineEntity} object */ - private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion) - throws IOException { + private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion, + boolean isApplication) throws IOException { - EntityColumn.ID.store(rowKey, entityTable, null, te.getId()); - EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType()); - EntityColumn.CREATED_TIME.store(rowKey, entityTable, null, - te.getCreatedTime()); - EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null, - te.getModifiedTime()); - EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion); - Map<String, Object> info = te.getInfo(); - if (info != null) { - for (Map.Entry<String, Object> entry : info.entrySet()) { - EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(), - null, entry.getValue()); + if (isApplication) { + ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId()); + ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null, + te.getCreatedTime()); + ApplicationColumn.MODIFIED_TIME.store(rowKey, applicationTable, null, + te.getModifiedTime()); + ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null, + flowVersion); + Map<String, Object> info = te.getInfo(); + if (info != null) { + for (Map.Entry<String, Object> entry : info.entrySet()) { + ApplicationColumnPrefix.INFO.store(rowKey, applicationTable, + entry.getKey(), null, entry.getValue()); + } + } + } else { + EntityColumn.ID.store(rowKey, entityTable, null, te.getId()); + EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType()); + EntityColumn.CREATED_TIME.store(rowKey, entityTable, null, + te.getCreatedTime()); + EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null, + te.getModifiedTime()); + EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion); + Map<String, Object> info = te.getInfo(); + if (info != null) { + for (Map.Entry<String, Object> entry : info.entrySet()) { + EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(), + null, entry.getValue()); + } } } } @@ -187,14 +232,19 @@ public class HBaseTimelineWriterImpl extends AbstractService implements /** * stores the config information from {@linkplain TimelineEntity} */ - private void storeConfig(byte[] rowKey, Map<String, String> config) - throws IOException { + private void storeConfig(byte[] rowKey, Map<String, String> config, + boolean isApplication) throws IOException { if (config == null) { return; } for (Map.Entry<String, String> entry : config.entrySet()) { - EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(), + if (isApplication) { + ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable, + entry.getKey(), null, entry.getValue()); + } else { + EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(), null, entry.getValue()); + } } } @@ -202,16 +252,21 @@ public class HBaseTimelineWriterImpl extends AbstractService implements * stores the {@linkplain TimelineMetric} information from the * {@linkplain TimelineEvent} object */ - private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics) - throws IOException { + private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics, + boolean isApplication) throws IOException { if (metrics != null) { for (TimelineMetric metric : metrics) { String metricColumnQualifier = metric.getId(); Map<Long, Number> timeseries = metric.getValues(); for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { Long timestamp = timeseriesEntry.getKey(); - EntityColumnPrefix.METRIC.store(rowKey, entityTable, + if (isApplication) { + ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable, metricColumnQualifier, timestamp, timeseriesEntry.getValue()); + } else { + EntityColumnPrefix.METRIC.store(rowKey, entityTable, + metricColumnQualifier, timestamp, timeseriesEntry.getValue()); + } } } } @@ -220,8 +275,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements /** * Stores the events from the {@linkplain TimelineEvent} object */ - private void storeEvents(byte[] rowKey, Set<TimelineEvent> events) - throws IOException { + private void storeEvents(byte[] rowKey, Set<TimelineEvent> events, + boolean isApplication) throws IOException { if (events != null) { for (TimelineEvent event : events) { if (event != null) { @@ -258,8 +313,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements // convert back to string to avoid additional API on store. String compoundColumnQualifier = Bytes.toString(compoundColumnQualifierBytes); - EntityColumnPrefix.EVENT.store(rowKey, entityTable, + if (isApplication) { + ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, compoundColumnQualifier, null, info.getValue()); + } else { + EntityColumnPrefix.EVENT.store(rowKey, entityTable, + compoundColumnQualifier, null, info.getValue()); + } } // for info: eventInfo } } @@ -279,6 +339,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements // flush all buffered mutators entityTable.flush(); appToFlowTable.flush(); + applicationTable.flush(); } /** @@ -288,15 +349,19 @@ public class HBaseTimelineWriterImpl extends AbstractService implements @Override protected void serviceStop() throws Exception { if (entityTable != null) { - LOG.info("closing entity table"); + LOG.info("closing the entity table"); // The close API performs flushing and releases any resources held entityTable.close(); } if (appToFlowTable != null) { - LOG.info("closing app_flow table"); + LOG.info("closing the app_flow table"); // The close API performs flushing and releases any resources held appToFlowTable.close(); } + if (applicationTable != null) { + LOG.info("closing the application table"); + applicationTable.close(); + } if (conn != null) { LOG.info("closing the hbase Connection"); conn.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0137ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java index 2c3897d..3a22ed6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; @@ -76,6 +77,12 @@ public class TimelineSchemaCreator { if (StringUtils.isNotBlank(appToflowTableName)) { hbaseConf.set(AppToFlowTable.TABLE_NAME_CONF_NAME, appToflowTableName); } + // Grab the applicationTableName argument + String applicationTableName = commandLine.getOptionValue("a"); + if (StringUtils.isNotBlank(applicationTableName)) { + hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME, + applicationTableName); + } createAllTables(hbaseConf); } @@ -103,6 +110,8 @@ public class TimelineSchemaCreator { o = new Option("a2f", "appToflowTableName", true, "app to flow table name"); o.setArgName("appToflowTableName"); + o = new Option("a", "applicationTableName", true, "application table name"); + o.setArgName("applicationTableName"); o.setRequired(false); options.addOption(o); @@ -132,6 +141,7 @@ public class TimelineSchemaCreator { } new EntityTable().createTable(admin, hbaseConf); new AppToFlowTable().createTable(admin, hbaseConf); + new ApplicationTable().createTable(admin, hbaseConf); } finally { if (conn != null) { conn.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0137ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java new file mode 100644 index 0000000..c028386 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; + +/** + * Identifies fully qualified columns for the {@link ApplicationTable}. + */ +public enum ApplicationColumn implements Column<ApplicationTable> { + + /** + * App id + */ + ID(ApplicationColumnFamily.INFO, "id"), + + /** + * When the application was created. + */ + CREATED_TIME(ApplicationColumnFamily.INFO, "created_time"), + + /** + * When it was modified. + */ + MODIFIED_TIME(ApplicationColumnFamily.INFO, "modified_time"), + + /** + * The version of the flow that this app belongs to. + */ + FLOW_VERSION(ApplicationColumnFamily.INFO, "flow_version"); + + private final ColumnHelper<ApplicationTable> column; + private final ColumnFamily<ApplicationTable> columnFamily; + private final String columnQualifier; + private final byte[] columnQualifierBytes; + + private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily, + String columnQualifier) { + this.columnFamily = columnFamily; + this.columnQualifier = columnQualifier; + // Future-proof by ensuring the right column prefix hygiene. + this.columnQualifierBytes = + Bytes.toBytes(Separator.SPACE.encode(columnQualifier)); + this.column = new ColumnHelper<ApplicationTable>(columnFamily); + } + + /** + * @return the column name value + */ + private String getColumnQualifier() { + return columnQualifier; + } + + public void store(byte[] rowKey, + TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp, + Object inputValue) throws IOException { + column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, + inputValue); + } + + public Object readResult(Result result) throws IOException { + return column.readResult(result, columnQualifierBytes); + } + + /** + * Retrieve an {@link ApplicationColumn} given a name, or null if there is no + * match. The following holds true: {@code columnFor(x) == columnFor(y)} if + * and only if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnQualifier Name of the column to retrieve + * @return the corresponding {@link ApplicationColumn} or null + */ + public static final ApplicationColumn columnFor(String columnQualifier) { + + // Match column based on value, assume column family matches. + for (ApplicationColumn ac : ApplicationColumn.values()) { + // Find a match based only on name. + if (ac.getColumnQualifier().equals(columnQualifier)) { + return ac; + } + } + + // Default to null + return null; + } + + /** + * Retrieve an {@link ApplicationColumn} given a name, or null if there is no + * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)} + * if and only if {@code a.equals(b) & x.equals(y)} or + * {@code (x == y == null)} + * + * @param columnFamily The columnFamily for which to retrieve the column. + * @param name Name of the column to retrieve + * @return the corresponding {@link ApplicationColumn} or null if both + * arguments don't match. + */ + public static final ApplicationColumn columnFor( + ApplicationColumnFamily columnFamily, String name) { + + for (ApplicationColumn ac : ApplicationColumn.values()) { + // Find a match based column family and on name. + if (ac.columnFamily.equals(columnFamily) + && ac.getColumnQualifier().equals(name)) { + return ac; + } + } + + // Default to null + return null; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0137ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java new file mode 100644 index 0000000..97e5f7b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + +/** + * Represents the application table column families. + */ +public enum ApplicationColumnFamily implements ColumnFamily<ApplicationTable> { + + /** + * Info column family houses known columns, specifically ones included in + * columnfamily filters. + */ + INFO("i"), + + /** + * Configurations are in a separate column family for two reasons: a) the size + * of the config values can be very large and b) we expect that config values + * are often separately accessed from other metrics and info columns. + */ + CONFIGS("c"), + + /** + * Metrics have a separate column family, because they have a separate TTL. + */ + METRICS("m"); + + /** + * Byte representation of this column family. + */ + private final byte[] bytes; + + /** + * @param value create a column family with this name. Must be lower case and + * without spaces. + */ + private ApplicationColumnFamily(String value) { + // column families should be lower case and not contain any spaces. + this.bytes = Bytes.toBytes(Separator.SPACE.encode(value)); + } + + public byte[] getBytes() { + return Bytes.copy(bytes); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0137ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java new file mode 100644 index 0000000..cd9e845 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; + +/** + * Identifies partially qualified columns for the application table. + */ +public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { + + /** + * To store TimelineEntity getIsRelatedToEntities values. + */ + IS_RELATED_TO(ApplicationColumnFamily.INFO, "s"), + + /** + * To store TimelineEntity getRelatesToEntities values. + */ + RELATES_TO(ApplicationColumnFamily.INFO, "r"), + + /** + * To store TimelineEntity info values. + */ + INFO(ApplicationColumnFamily.INFO, "i"), + + /** + * Lifecycle events for an application + */ + EVENT(ApplicationColumnFamily.INFO, "e"), + + /** + * Config column stores configuration with config key as the column name. + */ + CONFIG(ApplicationColumnFamily.CONFIGS, null), + + /** + * Metrics are stored with the metric name as the column name. + */ + METRIC(ApplicationColumnFamily.METRICS, null); + + private final ColumnHelper<ApplicationTable> column; + private final ColumnFamily<ApplicationTable> columnFamily; + + /** + * Can be null for those cases where the provided column qualifier is the + * entire column name. + */ + private final String columnPrefix; + private final byte[] columnPrefixBytes; + + /** + * Private constructor, meant to be used by the enum definition. + * + * @param columnFamily that this column is stored in. + * @param columnPrefix for this column. + */ + private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily, + String columnPrefix) { + column = new ColumnHelper<ApplicationTable>(columnFamily); + this.columnFamily = columnFamily; + this.columnPrefix = columnPrefix; + if (columnPrefix == null) { + this.columnPrefixBytes = null; + } else { + // Future-proof by ensuring the right column prefix hygiene. + this.columnPrefixBytes = + Bytes.toBytes(Separator.SPACE.encode(columnPrefix)); + } + } + + /** + * @return the column name value + */ + private String getColumnPrefix() { + return columnPrefix; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #store(byte[], + * org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object) + */ + public void store(byte[] rowKey, + TypedBufferedMutator<ApplicationTable> tableMutator, String qualifier, + Long timestamp, Object inputValue) throws IOException { + + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = + ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String) + */ + public Object readResult(Result result, String qualifier) throws IOException { + byte[] columnQualifier = + ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + return column.readResult(result, columnQualifier); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResults(org.apache.hadoop.hbase.client.Result) + */ + public Map<String, Object> readResults(Result result) throws IOException { + return column.readResults(result, columnPrefixBytes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) + */ + public <V> NavigableMap<String, NavigableMap<Long, V>> + readResultsWithTimestamps(Result result) throws IOException { + return column.readResultsWithTimestamps(result, columnPrefixBytes); + } + + /** + * Retrieve an {@link ApplicationColumnPrefix} given a name, or null if there + * is no match. The following holds true: {@code columnFor(x) == columnFor(y)} + * if and only if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnPrefix Name of the column to retrieve + * @return the corresponding {@link ApplicationColumnPrefix} or null + */ + public static final ApplicationColumnPrefix columnFor(String columnPrefix) { + + // Match column based on value, assume column family matches. + for (ApplicationColumnPrefix acp : ApplicationColumnPrefix.values()) { + // Find a match based only on name. + if (acp.getColumnPrefix().equals(columnPrefix)) { + return acp; + } + } + + // Default to null + return null; + } + + /** + * Retrieve an {@link ApplicationColumnPrefix} given a name, or null if there + * is no match. The following holds true: + * {@code columnFor(a,x) == columnFor(b,y)} if and only if + * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)} + * + * @param columnFamily The columnFamily for which to retrieve the column. + * @param columnPrefix Name of the column to retrieve + * @return the corresponding {@link ApplicationColumnPrefix} or null if both + * arguments don't match. + */ + public static final ApplicationColumnPrefix columnFor( + ApplicationColumnFamily columnFamily, String columnPrefix) { + + // TODO: needs unit test to confirm and need to update javadoc to explain + // null prefix case. + + for (ApplicationColumnPrefix acp : ApplicationColumnPrefix.values()) { + // Find a match based column family and on name. + if (acp.columnFamily.equals(columnFamily) + && (((columnPrefix == null) && (acp.getColumnPrefix() == null)) || (acp + .getColumnPrefix().equals(columnPrefix)))) { + return acp; + } + } + + // Default to null + return null; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0137ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java new file mode 100644 index 0000000..5f3868b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + +/** + * Represents a rowkey for the application table. + */ +public class ApplicationRowKey { + // TODO: more methods are needed for this class. + + // TODO: API needs to be cleaned up. + + /** + * Constructs a row key for the application table as follows: + * {@code clusterId!userName!flowId!flowRunId!AppId} + * + * @param clusterId + * @param userId + * @param flowId + * @param flowRunId + * @param appId + * @return byte array with the row key + */ + public static byte[] getRowKey(String clusterId, String userId, + String flowId, Long flowRunId, String appId) { + byte[] first = + Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId, + flowId)); + // Note that flowRunId is a long, so we can't encode them all at the same + // time. + byte[] second = Bytes.toBytes(ApplicationRowKey.invert(flowRunId)); + byte[] third = Bytes.toBytes(appId); + return Separator.QUALIFIERS.join(first, second, third); + } + + /** + * Converts a timestamp into its inverse timestamp to be used in (row) keys + * where we want to have the most recent timestamp in the top of the table + * (scans start at the most recent timestamp first). + * + * @param key value to be inverted so that the latest version will be first in + * a scan. + * @return inverted long + */ + public static long invert(Long key) { + return Long.MAX_VALUE - key; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0137ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java new file mode 100644 index 0000000..d2a2cb9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants; + +/** + * The application table as column families info, config and metrics. Info + * stores information about a YARN application entity, config stores + * configuration data of a YARN application, metrics stores the metrics of a + * YARN application. This table is entirely analogous to the entity table but + * created for better performance. + * + * Example application table record: + * + * <pre> + * |-------------------------------------------------------------------------| + * | Row | Column Family | Column Family| Column Family| + * | key | info | metrics | config | + * |-------------------------------------------------------------------------| + * | clusterId! | id:appId | metricId1: | configKey1: | + * | userName! | | metricValue1 | configValue1 | + * | flowId! | created_time: | @timestamp1 | | + * | flowRunId! | 1392993084018 | | configKey2: | + * | AppId | | metriciD1: | configValue2 | + * | | modified_time: | metricValue2 | | + * | | 1392995081012 | @timestamp2 | | + * | | | | | + * | | i!infoKey: | metricId2: | | + * | | infoValue | metricValue1 | | + * | | | @timestamp2 | | + * | | r!relatesToKey: | | | + * | | id3?id4?id5 | | | + * | | | | | + * | | s!isRelatedToKey: | | | + * | | id7?id9?id6 | | | + * | | | | | + * | | e!eventId?timestamp?infoKey: | | | + * | | eventInfoValue | | | + * | | | | | + * | | flowVersion: | | | + * | | versionValue | | | + * |-------------------------------------------------------------------------| + * </pre> + */ +public class ApplicationTable extends BaseTable<ApplicationTable> { + /** application prefix */ + private static final String PREFIX = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".application"; + + /** config param name that specifies the application table name */ + public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; + + /** + * config param name that specifies the TTL for metrics column family in + * application table + */ + private static final String METRICS_TTL_CONF_NAME = PREFIX + + ".table.metrics.ttl"; + + /** default value for application table name */ + private static final String DEFAULT_TABLE_NAME = + "timelineservice.application"; + + /** default TTL is 30 days for metrics timeseries */ + private static final int DEFAULT_METRICS_TTL = 2592000; + + /** default max number of versions */ + private static final int DEFAULT_METRICS_MAX_VERSIONS = 1000; + + private static final Log LOG = LogFactory.getLog(ApplicationTable.class); + + public ApplicationTable() { + super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable + * (org.apache.hadoop.hbase.client.Admin, + * org.apache.hadoop.conf.Configuration) + */ + public void createTable(Admin admin, Configuration hbaseConf) + throws IOException { + + TableName table = getTableName(hbaseConf); + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + throw new IOException("Table " + table.getNameAsString() + + " already exists."); + } + + HTableDescriptor applicationTableDescp = new HTableDescriptor(table); + HColumnDescriptor infoCF = + new HColumnDescriptor(ApplicationColumnFamily.INFO.getBytes()); + infoCF.setBloomFilterType(BloomType.ROWCOL); + applicationTableDescp.addFamily(infoCF); + + HColumnDescriptor configCF = + new HColumnDescriptor(ApplicationColumnFamily.CONFIGS.getBytes()); + configCF.setBloomFilterType(BloomType.ROWCOL); + configCF.setBlockCacheEnabled(true); + applicationTableDescp.addFamily(configCF); + + HColumnDescriptor metricsCF = + new HColumnDescriptor(ApplicationColumnFamily.METRICS.getBytes()); + applicationTableDescp.addFamily(metricsCF); + metricsCF.setBlockCacheEnabled(true); + // always keep 1 version (the latest) + metricsCF.setMinVersions(1); + metricsCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS); + metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME, + DEFAULT_METRICS_TTL)); + applicationTableDescp + .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy"); + applicationTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length", + TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH); + admin.createTable(applicationTableDescp, + TimelineHBaseSchemaConstants.getUsernameSplits()); + LOG.info("Status of table creation for " + table.getNameAsString() + "=" + + admin.tableExists(table)); + } + + /** + * @param metricsTTL time to live parameter for the metrics in this table. + * @param hbaseConf configuration in which to set the metrics TTL config + * variable. + */ + public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) { + hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0137ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java new file mode 100644 index 0000000..c60e6f5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0137ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index 58272ab..c8485c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -157,7 +157,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) */ - public <T> NavigableMap<String, NavigableMap<Long, T>> + public <V> NavigableMap<String, NavigableMap<Long, V>> readResultsWithTimestamps(Result result) throws IOException { return column.readResultsWithTimestamps(result, columnPrefixBytes); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0137ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java index f657a14..9a8bd8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java @@ -40,36 +40,35 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBas * Example entity table record: * * <pre> - * |--------------------------------------------------------------------| - * | Row | Column Family | Column Family| Column Family| - * | key | info | metrics | config | - * |--------------------------------------------------------------------| - * | userName! | id:entityId | metricId1: | configKey1: | - * | clusterId! | | metricValue1 | configValue1 | - * | flowId! | type:entityType | @timestamp1 | | - * | flowRunId! | | | configKey2: | - * | AppId! | created_time: | metriciD1: | configValue2 | - * | entityType!| 1392993084018 | metricValue2 | | - * | entityId | | @timestamp2 | | - * | | modified_time: | | | - * | | 1392995081012 | metricId2: | | - * | | | metricValue1 | | - * | | i!infoKey: | @timestamp2 | | - * | | infoValue | | | - * | | | | | - * | | r!relatesToKey: | | | - * | | id3?id4?id5 | | | - * | | | | | - * | | s!isRelatedToKey | | | - * | | id7?id9?id6 | | | - * | | | | | - * | | e!eventId?eventInfoKey: | | | - * | | eventInfoValue | | | - * | | @timestamp | | | - * | | | | | - * | | flowVersion: | | | - * | | versionValue | | | - * |--------------------------------------------------------------------| + * |-------------------------------------------------------------------------| + * | Row | Column Family | Column Family| Column Family| + * | key | info | metrics | config | + * |-------------------------------------------------------------------------| + * | userName! | id:entityId | metricId1: | configKey1: | + * | clusterId! | | metricValue1 | configValue1 | + * | flowId! | type:entityType | @timestamp1 | | + * | flowRunId! | | | configKey2: | + * | AppId! | created_time: | metriciD1: | configValue2 | + * | entityType!| 1392993084018 | metricValue2 | | + * | entityId | | @timestamp2 | | + * | | modified_time: | | | + * | | 1392995081012 | metricId2: | | + * | | | metricValue1 | | + * | | i!infoKey: | @timestamp2 | | + * | | infoValue | | | + * | | | | | + * | | r!relatesToKey: | | | + * | | id3?id4?id5 | | | + * | | | | | + * | | s!isRelatedToKey | | | + * | | id7?id9?id6 | | | + * | | | | | + * | | e!eventId?timestamp?infoKey: | | | + * | | eventInfoValue | | | + * | | | | | + * | | flowVersion: | | | + * | | versionValue | | | + * |-------------------------------------------------------------------------| * </pre> */ public class EntityTable extends BaseTable<EntityTable> {