http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index c514c20..889ae19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -20,13 +20,8 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
 import java.util.Set;
-import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,47 +29,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-
-import com.google.common.base.Preconditions;
 
 public class HBaseTimelineReaderImpl
     extends AbstractService implements TimelineReader {
 
   private static final Log LOG = LogFactory
       .getLog(HBaseTimelineReaderImpl.class);
-  private static final long DEFAULT_BEGIN_TIME = 0L;
-  private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
 
   private Configuration hbaseConf = null;
   private Connection conn;
-  private EntityTable entityTable;
-  private AppToFlowTable appToFlowTable;
-  private ApplicationTable applicationTable;
 
   public HBaseTimelineReaderImpl() {
     super(HBaseTimelineReaderImpl.class.getName());
@@ -85,9 +50,6 @@ public class HBaseTimelineReaderImpl
     super.serviceInit(conf);
     hbaseConf = HBaseConfiguration.create(conf);
     conn = ConnectionFactory.createConnection(hbaseConf);
-    entityTable = new EntityTable();
-    appToFlowTable = new AppToFlowTable();
-    applicationTable = new ApplicationTable();
   }
 
   @Override
@@ -104,35 +66,10 @@ public class HBaseTimelineReaderImpl
       String flowId, Long flowRunId, String appId, String entityType,
       String entityId, EnumSet<Field> fieldsToRetrieve)
       throws IOException {
-    validateParams(userId, clusterId, appId, entityType, entityId, true);
-    // In reality both should be null or neither should be null
-    if (flowId == null || flowRunId == null) {
-      FlowContext context = lookupFlowContext(clusterId, appId);
-      flowId = context.flowId;
-      flowRunId = context.flowRunId;
-    }
-    if (fieldsToRetrieve == null) {
-      fieldsToRetrieve = EnumSet.noneOf(Field.class);
-    }
-
-    boolean isApplication = isApplicationEntity(entityType);
-    byte[] rowKey = isApplication ?
-        ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
-            appId) :
-        EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
-            entityType, entityId);
-    Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
-    Result result = isApplication ?
-        applicationTable.getResult(hbaseConf, conn, get) :
-        entityTable.getResult(hbaseConf, conn, get);
-    return parseEntity(result, fieldsToRetrieve,
-        false, DEFAULT_BEGIN_TIME, DEFAULT_END_TIME, false, DEFAULT_BEGIN_TIME,
-        DEFAULT_END_TIME, null, null, null, null, null, null, isApplication);
-  }
-
-  private static boolean isApplicationEntity(String entityType) {
-    return TimelineEntityType.YARN_APPLICATION.toString().equals(entityType);
+    TimelineEntityReader reader =
+        TimelineEntityReaderFactory.createSingleEntityReader(userId, clusterId,
+            flowId, flowRunId, appId, entityType, entityId, fieldsToRetrieve);
+    return reader.readEntity(hbaseConf, conn);
   }
 
   @Override
@@ -144,361 +81,12 @@ public class HBaseTimelineReaderImpl
       Map<String, Object> infoFilters, Map<String, String> configFilters,
       Set<String> metricFilters, Set<String> eventFilters,
       EnumSet<Field> fieldsToRetrieve) throws IOException {
-    validateParams(userId, clusterId, appId, entityType, null, false);
-    // In reality both should be null or neither should be null
-    if (flowId == null || flowRunId == null) {
-      FlowContext context = lookupFlowContext(clusterId, appId);
-      flowId = context.flowId;
-      flowRunId = context.flowRunId;
-    }
-    if (limit == null) {
-      limit = TimelineReader.DEFAULT_LIMIT;
-    }
-    if (createdTimeBegin == null) {
-      createdTimeBegin = DEFAULT_BEGIN_TIME;
-    }
-    if (createdTimeEnd == null) {
-      createdTimeEnd = DEFAULT_END_TIME;
-    }
-    if (modifiedTimeBegin == null) {
-      modifiedTimeBegin = DEFAULT_BEGIN_TIME;
-    }
-    if (modifiedTimeEnd == null) {
-      modifiedTimeEnd = DEFAULT_END_TIME;
-    }
-    if (fieldsToRetrieve == null) {
-      fieldsToRetrieve = EnumSet.noneOf(Field.class);
-    }
-
-    NavigableSet<TimelineEntity> entities = new TreeSet<>();
-    boolean isApplication = isApplicationEntity(entityType);
-    if (isApplication) {
-      // If getEntities() is called for an application, there can be at most
-      // one entity. If the entity passes the filter, it is returned. Otherwise,
-      // an empty set is returned.
-      byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId,
-          flowRunId, appId);
-      Get get = new Get(rowKey);
-      get.setMaxVersions(Integer.MAX_VALUE);
-      Result result = applicationTable.getResult(hbaseConf, conn, get);
-      TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
-          true, createdTimeBegin, createdTimeEnd, true, modifiedTimeBegin,
-          modifiedTimeEnd, isRelatedTo, relatesTo, infoFilters, configFilters,
-          eventFilters, metricFilters, isApplication);
-      if (entity != null) {
-        entities.add(entity);
-      }
-    } else {
-      // Scan through part of the table to find the entities belong to one app
-      // and one type
-      Scan scan = new Scan();
-      scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
-          clusterId, userId, flowId, flowRunId, appId, entityType));
-      scan.setMaxVersions(Integer.MAX_VALUE);
-      ResultScanner scanner =
-          entityTable.getResultScanner(hbaseConf, conn, scan);
-      for (Result result : scanner) {
-        TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
-            true, createdTimeBegin, createdTimeEnd,
-            true, modifiedTimeBegin, modifiedTimeEnd,
-            isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters,
-            metricFilters, isApplication);
-        if (entity == null) {
-          continue;
-        }
-        if (entities.size() > limit) {
-          entities.pollLast();
-        }
-        entities.add(entity);
-      }
-    }
-    return entities;
-  }
-
-  private FlowContext lookupFlowContext(String clusterId, String appId)
-      throws IOException {
-    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
-    Get get = new Get(rowKey);
-    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
-    if (result != null && !result.isEmpty()) {
-      return new FlowContext(
-          AppToFlowColumn.FLOW_ID.readResult(result).toString(),
-          ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
-    } else {
-      throw new IOException(
-          "Unable to find the context flow ID and flow run ID for clusterId=" +
-          clusterId + ", appId=" + appId);
-    }
-  }
-
-  private static class FlowContext {
-    private String flowId;
-    private Long flowRunId;
-    public FlowContext(String flowId, Long flowRunId) {
-      this.flowId = flowId;
-      this.flowRunId = flowRunId;
-    }
-  }
-
-  private static void validateParams(String userId, String clusterId,
-      String appId, String entityType, String entityId, boolean checkEntityId) {
-    Preconditions.checkNotNull(userId, "userId shouldn't be null");
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-    Preconditions.checkNotNull(appId, "appId shouldn't be null");
-    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
-    if (checkEntityId) {
-      Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
-    }
-  }
-
-  private static TimelineEntity parseEntity(
-      Result result, EnumSet<Field> fieldsToRetrieve,
-      boolean checkCreatedTime, long createdTimeBegin, long createdTimeEnd,
-      boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd,
-      Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> eventFilters, Set<String> metricFilters,
-      boolean isApplication)
-      throws IOException {
-    if (result == null || result.isEmpty()) {
-      return null;
-    }
-    TimelineEntity entity = new TimelineEntity();
-    String entityType = isApplication ?
-        TimelineEntityType.YARN_APPLICATION.toString() :
-        EntityColumn.TYPE.readResult(result).toString();
-    entity.setType(entityType);
-    String entityId = isApplication ?
-        ApplicationColumn.ID.readResult(result).toString() :
-        EntityColumn.ID.readResult(result).toString();
-    entity.setId(entityId);
-
-    // fetch created time
-    Number createdTime = isApplication ?
-        (Number)ApplicationColumn.CREATED_TIME.readResult(result) :
-        (Number)EntityColumn.CREATED_TIME.readResult(result);
-    entity.setCreatedTime(createdTime.longValue());
-    if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin ||
-        entity.getCreatedTime() > createdTimeEnd)) {
-      return null;
-    }
-
-    // fetch modified time
-    Number modifiedTime = isApplication ?
-        (Number)ApplicationColumn.MODIFIED_TIME.readResult(result) :
-        (Number)EntityColumn.MODIFIED_TIME.readResult(result);
-    entity.setModifiedTime(modifiedTime.longValue());
-    if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin ||
-        entity.getModifiedTime() > modifiedTimeEnd)) {
-      return null;
-    }
-
-    // fetch is related to entities
-    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      if (isApplication) {
-        readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
-            true);
-      } else {
-        readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO,
-            true);
-      }
-      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
-          entity.getIsRelatedToEntities(), isRelatedTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
-        entity.getIsRelatedToEntities().clear();
-      }
-    }
-
-    // fetch relates to entities
-    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
-      if (isApplication) {
-        readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
-            false);
-      } else {
-        readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
-      }
-      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
-          entity.getRelatesToEntities(), relatesTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
-        entity.getRelatesToEntities().clear();
-      }
-    }
-
-    // fetch info
-    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
-      if (isApplication) {
-        readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
-      } else {
-        readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
-      }
-      if (checkInfo &&
-          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.INFO)) {
-        entity.getInfo().clear();
-      }
-    }
-
-    // fetch configs
-    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
-      if (isApplication) {
-        readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
-      } else {
-        readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
-      }
-      if (checkConfigs && !TimelineReaderUtils.matchFilters(
-          entity.getConfigs(), configFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.CONFIGS)) {
-        entity.getConfigs().clear();
-      }
-    }
-
-    // fetch events
-    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
-      readEvents(entity, result, isApplication);
-      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
-          entity.getEvents(), eventFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.EVENTS)) {
-        entity.getEvents().clear();
-      }
-    }
-
-    // fetch metrics
-    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
-      readMetrics(entity, result, isApplication);
-      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
-          entity.getMetrics(), metricFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.METRICS)) {
-        entity.getMetrics().clear();
-      }
-    }
-    return entity;
-  }
-
-  private static <T> void readRelationship(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isRelatedTo) throws IOException {
-    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
-    Map<String, Object> columns = prefix.readResults(result);
-    for (Map.Entry<String, Object> column : columns.entrySet()) {
-      for (String id : Separator.VALUES.splitEncoded(
-          column.getValue().toString())) {
-        if (isRelatedTo) {
-          entity.addIsRelatedToEntity(column.getKey(), id);
-        } else {
-          entity.addRelatesToEntity(column.getKey(), id);
-        }
-      }
-    }
-  }
-
-  private static <T> void readKeyValuePairs(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isConfig) throws IOException {
-    // info and configuration are of type Map<String, Object or String>
-    Map<String, Object> columns = prefix.readResults(result);
-    if (isConfig) {
-      for (Map.Entry<String, Object> column : columns.entrySet()) {
-        entity.addConfig(column.getKey(), column.getValue().toString());
-      }
-    } else {
-      entity.addInfo(columns);
-    }
-  }
-
-  /**
-   * Read events from the entity table or the application table. The column
-   * name is of the form "eventId=timestamp=infoKey" where "infoKey" may be
-   * omitted if there is no info associated with the event.
-   *
-   * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
-   * schema description.
-   */
-  private static void readEvents(TimelineEntity entity, Result result,
-      boolean isApplication) throws IOException {
-    Map<String, TimelineEvent> eventsMap = new HashMap<>();
-    Map<?, Object> eventsResult = isApplication ?
-        ApplicationColumnPrefix.EVENT.
-            readResultsHavingCompoundColumnQualifiers(result) :
-        EntityColumnPrefix.EVENT.
-            readResultsHavingCompoundColumnQualifiers(result);
-    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
-      byte[][] karr = (byte[][])eventResult.getKey();
-      // the column name is of the form "eventId=timestamp=infoKey"
-      if (karr.length == 3) {
-        String id = Bytes.toString(karr[0]);
-        long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
-        String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
-        TimelineEvent event = eventsMap.get(key);
-        if (event == null) {
-          event = new TimelineEvent();
-          event.setId(id);
-          event.setTimestamp(ts);
-          eventsMap.put(key, event);
-        }
-        // handle empty info
-        String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
-        if (infoKey != null) {
-          event.addInfo(infoKey, eventResult.getValue());
-        }
-      } else {
-        LOG.warn("incorrectly formatted column name: it will be discarded");
-        continue;
-      }
-    }
-    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
-    entity.addEvents(eventsSet);
-  }
-
-  private static void readMetrics(TimelineEntity entity, Result result,
-      boolean isApplication) throws IOException {
-    NavigableMap<String, NavigableMap<Long, Number>> metricsResult;
-    if (isApplication) {
-      metricsResult =
-          ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
-    } else {
-      metricsResult =
-          EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
-    }
-    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
-        metricsResult.entrySet()) {
-      TimelineMetric metric = new TimelineMetric();
-      metric.setId(metricResult.getKey());
-      // Simply assume that if the value set contains more than 1 elements, the
-      // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
-      metric.setType(metricResult.getValue().size() > 1 ?
-          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
-      metric.addValues(metricResult.getValue());
-      entity.addMetric(metric);
-    }
+    TimelineEntityReader reader =
+        TimelineEntityReaderFactory.createMultipleEntitiesReader(userId,
+            clusterId, flowId, flowRunId, appId, entityType, limit,
+            createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
+            modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
+            metricFilters, eventFilters, fieldsToRetrieve);
+    return reader.readEntities(hbaseConf, conn);
   }
 }
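[Editor's sketch] For orientation, the snippet below shows roughly how a caller exercises the refactored read path: HBaseTimelineReaderImpl no longer parses results itself, but delegates to a type-specific TimelineEntityReader obtained from the factory. This is an illustrative sketch, not part of the patch; the IDs are made up, the argument order follows the factory signatures in this commit, and the null arguments stand for "derive the flow context from the app-to-flow table" or "use the defaults".

import java.util.EnumSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;

public class ReaderUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseTimelineReaderImpl reader = new HBaseTimelineReaderImpl();
    reader.init(conf);
    reader.start();
    try {
      // Single-entity read; routed to ApplicationEntityReader because the
      // type matches YARN_APPLICATION (hypothetical IDs).
      TimelineEntity app = reader.getEntity("user1", "cluster1",
          null /* flowId */, null /* flowRunId */, "application_1111_0001",
          TimelineEntityType.YARN_APPLICATION.toString(),
          "application_1111_0001", EnumSet.of(Field.METRICS));
      // Multi-entity read; an unrecognized type falls through to
      // GenericEntityReader, which scans the entity table by row key prefix.
      Set<TimelineEntity> containers = reader.getEntities("user1", "cluster1",
          null, null, "application_1111_0001", "YARN_CONTAINER", 100L,
          null, null, null, null, null, null, null, null, null, null,
          EnumSet.noneOf(Field.class));
    } finally {
      reader.stop();
    }
  }
}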
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
new file mode 100644
index 0000000..0d1134c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+
+/**
+ * The base class for reading and deserializing timeline entities from the
+ * HBase storage. Subclasses can be defined for the different types of
+ * entities that are being requested.
+ */
+abstract class TimelineEntityReader {
+  protected final boolean singleEntityRead;
+
+  protected String userId;
+  protected String clusterId;
+  protected String flowId;
+  protected Long flowRunId;
+  protected String appId;
+  protected String entityType;
+  protected EnumSet<Field> fieldsToRetrieve;
+  // used only for a single entity read mode
+  protected String entityId;
+  // used only for multiple entity read mode
+  protected Long limit;
+  protected Long createdTimeBegin;
+  protected Long createdTimeEnd;
+  protected Long modifiedTimeBegin;
+  protected Long modifiedTimeEnd;
+  protected Map<String, Set<String>> relatesTo;
+  protected Map<String, Set<String>> isRelatedTo;
+  protected Map<String, Object> infoFilters;
+  protected Map<String, String> configFilters;
+  protected Set<String> metricFilters;
+  protected Set<String> eventFilters;
+
+  /**
+   * Main table the entity reader uses.
+   */
+  protected BaseTable<?> table;
+
+  /**
+   * Instantiates a reader for multiple-entity reads.
+   */
+  protected TimelineEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    this.singleEntityRead = false;
+    this.userId = userId;
+    this.clusterId = clusterId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.fieldsToRetrieve = fieldsToRetrieve;
+    this.limit = limit;
+    this.createdTimeBegin = createdTimeBegin;
+    this.createdTimeEnd = createdTimeEnd;
+    this.modifiedTimeBegin = modifiedTimeBegin;
+    this.modifiedTimeEnd = modifiedTimeEnd;
+    this.relatesTo = relatesTo;
+    this.isRelatedTo = isRelatedTo;
+    this.infoFilters = infoFilters;
+    this.configFilters = configFilters;
+    this.metricFilters = metricFilters;
+    this.eventFilters = eventFilters;
+
+    this.table = getTable();
+  }
+
+  /**
+   * Instantiates a reader for single-entity reads.
+   */
+  protected TimelineEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) {
+    this.singleEntityRead = true;
+    this.userId = userId;
+    this.clusterId = clusterId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.fieldsToRetrieve = fieldsToRetrieve;
+    this.entityId = entityId;
+
+    this.table = getTable();
+  }
+
+  /**
+   * Reads and deserializes a single timeline entity from the HBase storage.
+   */
+  public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    Result result = getResult(hbaseConf, conn);
+    return parseEntity(result);
+  }
+
+  /**
+   * Reads and deserializes a set of timeline entities from the HBase storage.
+   * It goes through all the results available, and returns up to the
+   * specified limit of entries in the entities' natural sort order.
+   */
+  public Set<TimelineEntity> readEntities(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    Iterable<Result> results = getResults(hbaseConf, conn);
+    for (Result result : results) {
+      TimelineEntity entity = parseEntity(result);
+      if (entity == null) {
+        continue;
+      }
+      entities.add(entity);
+      if (entities.size() > limit) {
+        entities.pollLast();
+      }
+    }
+    return entities;
+  }
+
+  /**
+   * Returns the main table to be used by the entity reader.
+   */
+  protected abstract BaseTable<?> getTable();
+
+  /**
+   * Validates the required parameters to read the entities.
+   */
+  protected abstract void validateParams();
+
+  /**
+   * Sets certain parameters to defaults if the values are not provided.
+   */
+  protected abstract void augmentParams(Configuration hbaseConf,
+      Connection conn) throws IOException;
+
+  /**
+   * Fetches a {@link Result} instance for a single-entity read.
+   *
+   * @return the {@link Result} instance or null if no such record is found.
+   */
+  protected abstract Result getResult(Configuration hbaseConf, Connection conn)
+      throws IOException;
+
+  /**
+   * Fetches an iterator for {@link Result} instances for a multi-entity read.
+   */
+  protected abstract Iterable<Result> getResults(Configuration hbaseConf,
+      Connection conn) throws IOException;
+
+  /**
+   * Given a {@link Result} instance, deserializes and creates a
+   * {@link TimelineEntity}.
+   *
+   * @return the {@link TimelineEntity} instance, or null if the {@link Result}
+   *     is null or empty.
+   */
+  protected abstract TimelineEntity parseEntity(Result result)
+      throws IOException;
+
+  /**
+   * Helper method for reading and deserializing {@link TimelineMetric} objects
+   * using the specified column prefix. The timeline metrics then are added to
+   * the given timeline entity.
+   */
+  protected void readMetrics(TimelineEntity entity, Result result,
+      ColumnPrefix<?> columnPrefix) throws IOException {
+    NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+        columnPrefix.readResultsWithTimestamps(result);
+    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
+        metricsResult.entrySet()) {
+      TimelineMetric metric = new TimelineMetric();
+      metric.setId(metricResult.getKey());
+      // Simply assume that if the value set contains more than one element,
+      // the metric is a TIME_SERIES metric; otherwise it's a SINGLE_VALUE
+      // metric
+      metric.setType(metricResult.getValue().size() > 1 ?
+          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
+      metric.addValues(metricResult.getValue());
+      entity.addMetric(metric);
+    }
+  }
+}
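[Editor's sketch] readEntity() and readEntities() are template methods: the base class owns validation, default-filling, iteration, and limit handling, while each subclass supplies the table, the fetch, and the per-result parsing. Below is a minimal, hypothetical subclass illustrating the contract — it lives in the same package because the base class is package-private, and the trivial method bodies are placeholders, not patch code.

package org.apache.hadoop.yarn.server.timelineservice.storage;

import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;

class NoOpEntityReader extends TimelineEntityReader {
  NoOpEntityReader(String userId, String clusterId, String flowId,
      Long flowRunId, String appId, String entityType, String entityId) {
    // Uses the single-entity constructor; singleEntityRead becomes true.
    super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
        null);
  }

  @Override
  protected BaseTable<?> getTable() {
    // The main table this reader targets; assigned in the base constructor.
    return new EntityTable();
  }

  @Override
  protected void validateParams() {
    // A real reader rejects null userId/clusterId/appId/entityType here.
  }

  @Override
  protected void augmentParams(Configuration hbaseConf, Connection conn)
      throws IOException {
    if (fieldsToRetrieve == null) {
      fieldsToRetrieve = EnumSet.noneOf(Field.class);
    }
  }

  @Override
  protected Result getResult(Configuration hbaseConf, Connection conn)
      throws IOException {
    return null;  // a real reader issues a Get against its table
  }

  @Override
  protected Iterable<Result> getResults(Configuration hbaseConf,
      Connection conn) throws IOException {
    return Collections.emptyList();  // a real reader returns a ResultScanner
  }

  @Override
  protected TimelineEntity parseEntity(Result result) throws IOException {
    return null;  // a real reader deserializes the Result's columns
  }
}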
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
new file mode 100644
index 0000000..4fdef40
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+
+/**
+ * Factory methods for instantiating a timeline entity reader.
+ */
+class TimelineEntityReaderFactory {
+  /**
+   * Creates a timeline entity reader instance for reading a single entity with
+   * the specified input.
+   */
+  public static TimelineEntityReader createSingleEntityReader(String userId,
+      String clusterId, String flowId, Long flowRunId, String appId,
+      String entityType, String entityId, EnumSet<Field> fieldsToRetrieve) {
+    // currently the types that are handled separately from the generic entity
+    // table are application, flow run, and flow activity entities
+    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
+      return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
+      return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
+      return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else {
+      // assume we're dealing with a generic entity read
+      return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    }
+  }
+
+  /**
+   * Creates a timeline entity reader instance for reading a set of entities
+   * with the specified input and predicates.
+   */
+  public static TimelineEntityReader createMultipleEntitiesReader(String userId,
+      String clusterId, String flowId, Long flowRunId, String appId,
+      String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    // currently the types that are handled separately from the generic entity
+    // table are application, flow run, and flow activity entities
+    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
+      return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
+      return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
+      return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else {
+      // assume we're dealing with a generic entity read
+      return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    }
+  }
+}
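[Editor's sketch] A quick sketch of the resulting dispatch, again package-local since both classes are package-private; the flow run entity ID here is made up for illustration.

package org.apache.hadoop.yarn.server.timelineservice.storage;

import java.util.EnumSet;

import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;

class FactoryDispatchSketch {
  static void demo() {
    // YARN_FLOW_RUN is one of the specially handled types, so this returns
    // a FlowRunEntityReader.
    TimelineEntityReader flowRunReader =
        TimelineEntityReaderFactory.createSingleEntityReader("user1",
            "cluster1", "flow1", 1002345678919L, "application_1111_0001",
            TimelineEntityType.YARN_FLOW_RUN.toString(), "flowrun_entity_id",
            EnumSet.noneOf(Field.class));
    // Anything else (e.g. a container type) gets a GenericEntityReader.
    TimelineEntityReader genericReader =
        TimelineEntityReaderFactory.createSingleEntityReader("user1",
            "cluster1", "flow1", 1002345678919L, "application_1111_0001",
            "YARN_CONTAINER", "container_1111_0001_01_000001",
            EnumSet.noneOf(Field.class));
  }
}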
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
index 5f3868b..e3b5a87 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
@@ -19,14 +19,46 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.application;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 
 /**
  * Represents a rowkey for the application table.
  */
 public class ApplicationRowKey {
-  // TODO: more methods are needed for this class.
+  private final String clusterId;
+  private final String userId;
+  private final String flowId;
+  private final long flowRunId;
+  private final String appId;
 
-  // TODO: API needs to be cleaned up.
+  public ApplicationRowKey(String clusterId, String userId, String flowId,
+      long flowRunId, String appId) {
+    this.clusterId = clusterId;
+    this.userId = userId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  public long getFlowRunId() {
+    return flowRunId;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
 
   /**
    * Constructs a row key for the application table as follows:
@@ -46,22 +78,32 @@ public class ApplicationRowKey {
         flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
    // time.
-    byte[] second = Bytes.toBytes(ApplicationRowKey.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
     byte[] third = Bytes.toBytes(appId);
     return Separator.QUALIFIERS.join(first, second, third);
   }
 
   /**
-   * Converts a timestamp into its inverse timestamp to be used in (row) keys
-   * where we want to have the most recent timestamp in the top of the table
-   * (scans start at the most recent timestamp first).
-   *
-   * @param key value to be inverted so that the latest version will be first in
-   *     a scan.
-   * @return inverted long
+   * Given the raw row key as bytes, returns the row key as an object.
    */
-  public static long invert(Long key) {
-    return Long.MAX_VALUE - key;
-  }
+  public static ApplicationRowKey parseRowKey(byte[] rowKey) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    if (rowKeyComponents.length < 5) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "an application");
+    }
+    String clusterId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
+    String userId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
+    String flowId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
+    long flowRunId =
+        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
+    String appId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
+    return new ApplicationRowKey(clusterId, userId, flowId, flowRunId, appId);
+  }
 }
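[Editor's sketch] The new parseRowKey methods make row keys symmetric with their getRowKey counterparts, which is what lets the updated tests below compare against the original values instead of hand-splitting bytes. A small round-trip sketch with made-up IDs (run with -ea to enable the asserts):

import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;

public class RowKeyRoundTripSketch {
  public static void main(String[] args) {
    byte[] raw = ApplicationRowKey.getRowKey("cluster1", "user1", "flow1",
        1002345678919L, "application_1111_0001");
    ApplicationRowKey key = ApplicationRowKey.parseRowKey(raw);
    // parseRowKey re-inverts the stored flowRunId (invert is its own
    // inverse), so the original value comes back out.
    assert key.getFlowRunId() == 1002345678919L;
    assert "cluster1".equals(key.getClusterId());
    assert "user1".equals(key.getUserId());
    assert "application_1111_0001".equals(key.getAppId());
  }
}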
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
index ad4fec6..ca88056 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
@@ -24,6 +24,22 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
  * Represents a rowkey for the app_flow table.
  */
 public class AppToFlowRowKey {
+  private final String clusterId;
+  private final String appId;
+
+  public AppToFlowRowKey(String clusterId, String appId) {
+    this.clusterId = clusterId;
+    this.appId = appId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
   /**
    * Constructs a row key prefix for the app_flow table as follows:
    * {@code clusterId!AppId}
@@ -36,4 +52,19 @@ public class AppToFlowRowKey {
     return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId));
   }
 
+  /**
+   * Given the raw row key as bytes, returns the row key as an object.
+   */
+  public static AppToFlowRowKey parseRowKey(byte[] rowKey) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    if (rowKeyComponents.length < 2) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "the app-to-flow table");
+    }
+
+    String clusterId = Bytes.toString(rowKeyComponents[0]);
+    String appId = Bytes.toString(rowKeyComponents[1]);
+    return new AppToFlowRowKey(clusterId, appId);
+  }
 }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
index abba79a..9545438 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 
 /**
- * Implements behavior common to tables used in the timeline service storage.
+ * Implements behavior common to tables used in the timeline service storage. It
+ * is thread-safe, and can be used by multiple threads concurrently.
  *
  * @param <T> reference to the table instance class itself for type safety.
  */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 9a72be0..6a534ed73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 
@@ -26,9 +25,52 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWrit
  * Represents a rowkey for the entity table.
  */
 public class EntityRowKey {
-  // TODO: more methods are needed for this class.
+  private final String clusterId;
+  private final String userId;
+  private final String flowId;
+  private final long flowRunId;
+  private final String appId;
+  private final String entityType;
+  private final String entityId;
 
-  // TODO: API needs to be cleaned up.
+  public EntityRowKey(String clusterId, String userId, String flowId,
+      long flowRunId, String appId, String entityType, String entityId) {
+    this.clusterId = clusterId;
+    this.userId = userId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.entityId = entityId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  public long getFlowRunId() {
+    return flowRunId;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public String getEntityType() {
+    return entityType;
+  }
+
+  public String getEntityId() {
+    return entityId;
+  }
 
   /**
    * Constructs a row key prefix for the entity table as follows:
@@ -106,4 +148,32 @@ public class EntityRowKey {
     return Separator.QUALIFIERS.join(first, second, third);
   }
 
+  /**
+   * Given the raw row key as bytes, returns the row key as an object.
+   */
+  public static EntityRowKey parseRowKey(byte[] rowKey) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    if (rowKeyComponents.length < 7) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "an entity");
+    }
+
+    String userId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
+    String clusterId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
+    String flowId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
+    long flowRunId =
+        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
+    String appId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
+    String entityType =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[5]));
+    String entityId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[6]));
+    return new EntityRowKey(clusterId, userId, flowId, flowRunId, appId,
+        entityType, entityId);
+  }
 }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index 19e4e83..18ca599 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -55,6 +55,10 @@ public class FlowActivityRowKey {
     return flowId;
   }
 
+  public static byte[] getRowKeyPrefix(String clusterId) {
+    return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, ""));
+  }
+
   /**
    * Constructs a row key for the flow activity table as follows:
    * {@code clusterId!dayTimestamp!user!flowId}
@@ -65,7 +69,8 @@ public class FlowActivityRowKey {
    * @param flowId
    * @return byte array with the row key prefix
    */
-  public static byte[] getRowKey(String clusterId, String userId, String flowId) {
+  public static byte[] getRowKey(String clusterId, String userId,
+      String flowId) {
     long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
         .currentTimeMillis());
     return getRowKey(clusterId, dayTs, userId, flowId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index e133241..880d481 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -25,7 +25,34 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWrit
  * Represents a rowkey for the flow run table.
  */
 public class FlowRunRowKey {
-  // TODO: more methods are needed for this class like parse row key
+  private final String clusterId;
+  private final String userId;
+  private final String flowId;
+  private final long flowRunId;
+
+  public FlowRunRowKey(String clusterId, String userId, String flowId,
+      long flowRunId) {
+    this.clusterId = clusterId;
+    this.userId = userId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  public long getFlowRunId() {
+    return flowRunId;
+  }
 
   /**
    * Constructs a row key for the entity table as follows: {
@@ -47,4 +74,25 @@ public class FlowRunRowKey {
     return Separator.QUALIFIERS.join(first, second);
   }
 
+  /**
+   * Given the raw row key as bytes, returns the row key as an object.
+   */
+  public static FlowRunRowKey parseRowKey(byte[] rowKey) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    if (rowKeyComponents.length < 4) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "a flow run");
+    }
+
+    String clusterId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
+    String userId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
+    String flowId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
+    long flowRunId =
+        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
+    return new FlowRunRowKey(clusterId, userId, flowId, flowRunId);
+  }
 }
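[Editor's sketch] Two details worth calling out in these row key classes: flowRunId is stored inverted (Long.MAX_VALUE - value, per the invert() javadoc removed above) so that the most recent run sorts to the top of the table, and the new FlowActivityRowKey.getRowKeyPrefix(clusterId) enables cluster-wide prefix scans. A sketch of both; the local invert mirrors TimelineWriterUtils.invert for illustration only.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;

public class InvertAndPrefixSketch {
  // Mirrors TimelineWriterUtils.invert(): larger (newer) values sort earlier
  // in byte order, and applying it twice restores the original value.
  static long invert(long key) {
    return Long.MAX_VALUE - key;
  }

  public static void main(String[] args) {
    long olderRun = 1002345678919L;
    long newerRun = olderRun + 1;
    assert invert(newerRun) < invert(olderRun); // newer run sorts first
    assert invert(invert(olderRun)) == olderRun; // round trip

    // Scan every flow activity row in a cluster via the new prefix helper.
    Scan scan = new Scan();
    scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix("cluster1"));
  }
}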
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index a1948aa..651bb3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -18,6 +18,15 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
@@ -33,15 +42,6 @@ import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
 /**
  * Invoked via the coprocessor when a Get or a Scan is issued for flow run
  * table. Looks through the list of cells per row, checks their tags and does
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 3962341..01920b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -508,32 +508,28 @@ public class TestHBaseTimelineStorage {
 
   private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
       String flow, long runid, String appName, TimelineEntity te) {
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
-    assertTrue(rowKeyComponents.length == 7);
-    assertEquals(user, Bytes.toString(rowKeyComponents[0]));
-    assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
-    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid),
-        Bytes.toLong(rowKeyComponents[3]));
-    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
-    assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
-    assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
+    EntityRowKey key = EntityRowKey.parseRowKey(rowKey);
+
+    assertEquals(user, key.getUserId());
+    assertEquals(cluster, key.getClusterId());
+    assertEquals(flow, key.getFlowId());
+    assertEquals(runid, key.getFlowRunId());
+    assertEquals(appName, key.getAppId());
+    assertEquals(te.getType(), key.getEntityType());
+    assertEquals(te.getId(), key.getEntityId());
     return true;
   }
 
   private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
       String user, String flow, long runid, String appName) {
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+    ApplicationRowKey key = ApplicationRowKey.parseRowKey(rowKey);
 
-    assertTrue(rowKeyComponents.length == 5);
-    assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
-    assertEquals(user, Bytes.toString(rowKeyComponents[1]));
-    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid),
-        Bytes.toLong(rowKeyComponents[3]));
-    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
+    assertEquals(cluster, key.getClusterId());
+    assertEquals(user, key.getUserId());
+    assertEquals(flow, key.getFlowId());
+    assertEquals(runid, key.getFlowRunId());
+    assertEquals(appName, key.getAppId());
     return true;
   }
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java index f8331fa..d18613a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java @@ -45,7 +45,7 @@ class TestFlowDataGenerator { String type = TimelineEntityType.YARN_APPLICATION.toString(); entity.setId(id); entity.setType(type); - Long cTime = 1425016501000L; + long cTime = 1425016501000L; entity.setCreatedTime(cTime); // add metrics @@ -54,8 +54,8 @@ class TestFlowDataGenerator { m1.setId(metric1); Map<Long, Number> metricValues = new HashMap<Long, Number>(); long ts = System.currentTimeMillis(); - metricValues.put(ts - 100000, 2); - metricValues.put(ts - 80000, 40); + metricValues.put(ts - 100000, 2L); + metricValues.put(ts - 80000, 40L); m1.setType(Type.TIME_SERIES); m1.setValues(metricValues); metrics.add(m1); @@ -64,8 +64,8 @@ class TestFlowDataGenerator { m2.setId(metric2); metricValues = new HashMap<Long, Number>(); ts = System.currentTimeMillis(); - metricValues.put(ts - 100000, 31); - metricValues.put(ts - 80000, 57); + metricValues.put(ts - 100000, 31L); + metricValues.put(ts - 80000, 57L); m2.setType(Type.TIME_SERIES); m2.setValues(metricValues); metrics.add(m2); @@ -80,7 +80,7 @@ class TestFlowDataGenerator { String type = TimelineEntityType.YARN_APPLICATION.toString(); entity.setId(id); entity.setType(type); - Long cTime = 1425016501000L; + long cTime = 1425016501000L; entity.setCreatedTime(cTime); // add metrics Set<TimelineMetric> metrics = new HashSet<>(); @@ -103,8 +103,8 @@ class TestFlowDataGenerator { String type = TimelineEntityType.YARN_APPLICATION.toString(); entity.setId(id); entity.setType(type); - Long cTime = 20000000000000L; - Long mTime = 1425026901000L; + long cTime = 20000000000000L; + long mTime = 1425026901000L; entity.setCreatedTime(cTime); entity.setModifiedTime(mTime); // add metrics @@ -113,10 +113,10 @@ class TestFlowDataGenerator { m1.setId(metric1); Map<Long, Number> metricValues = new HashMap<Long, Number>(); long ts = System.currentTimeMillis(); - metricValues.put(ts - 120000, 100000000); - metricValues.put(ts - 100000, 200000000); - metricValues.put(ts - 80000, 300000000); - metricValues.put(ts - 60000, 400000000); + metricValues.put(ts - 120000, 100000000L); + metricValues.put(ts - 100000, 200000000L); + metricValues.put(ts - 80000, 300000000L); + metricValues.put(ts - 60000, 400000000L); metricValues.put(ts - 40000, 50000000000L); metricValues.put(ts - 20000, 60000000000L); m1.setType(Type.TIME_SERIES); @@ -126,7 +126,7 @@ class TestFlowDataGenerator { TimelineEvent event = new TimelineEvent(); event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - Long expTs = 1436512802000L; + long expTs = 1436512802000L; event.setTimestamp(expTs); String expKey = "foo_event"; Object expVal = "test"; @@ -142,9 +142,9 @@ class TestFlowDataGenerator { return entity; } - static TimelineEntity getEntityGreaterStartTime() { + static TimelineEntity 
@@ -142,9 +142,9 @@
     return entity;
   }

-  static TimelineEntity getEntityGreaterStartTime() {
+  static TimelineEntity getEntityGreaterStartTime(long startTs) {
     TimelineEntity entity = new TimelineEntity();
-    entity.setCreatedTime(30000000000000L);
+    entity.setCreatedTime(startTs);
     entity.setId("flowRunHello with greater start time");
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setType(type);
@@ -173,14 +173,13 @@
     return entity;
   }

-  static TimelineEntity getEntityMinStartTime() {
+  static TimelineEntity getEntityMinStartTime(long startTs) {
     TimelineEntity entity = new TimelineEntity();
     String id = "flowRunHelloMInStartTime";
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setId(id);
     entity.setType(type);
-    Long cTime = 10000000000000L;
-    entity.setCreatedTime(cTime);
+    entity.setCreatedTime(startTs);
     TimelineEvent event = new TimelineEvent();
     event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
     event.setTimestamp(System.currentTimeMillis());
@@ -195,12 +194,12 @@
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setId(id);
     entity.setType(type);
-    Long cTime = 1425016501000L;
+    long cTime = 1425016501000L;
     entity.setCreatedTime(cTime);

     TimelineEvent event = new TimelineEvent();
     event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    Long expTs = 1436512802000L;
+    long expTs = 1436512802000L;
     event.setTimestamp(expTs);
     String expKey = "foo_event";
     Object expVal = "test";
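With the two generators now taking the start time as a parameter, the tests can assert against values they chose themselves instead of constants buried inside the generator. Usage as it appears at the call sites later in this commit:

    // The test fixes the expected min and "greater" start times up front,
    // then verifies that MIN_START_TIME in the flow run table equals
    // minStartTs after both entities are written.
    long minStartTs = 10000000000000L;
    long greaterStartTs = 30000000000000L;
    TimelineEntity entityMinStartTime =
        TestFlowDataGenerator.getEntityMinStartTime(minStartTs);
    TimelineEntity entityGreaterStartTime =
        TestFlowDataGenerator.getEntityGreaterStartTime(greaterStartTs);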
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index b4a0c74..6bdec6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -21,19 +21,16 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;

 import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;

-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
@@ -42,26 +39,17 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -119,11 +107,13 @@ public class TestHBaseStorageFlowActivity {
     String user = "testWriteFlowRunMinMaxToHBase_user1";
     String flow = "testing_flowRun_flow_name";
     String flowVersion = "CF7022C10F1354";
-    Long runid = 1002345678919L;
+    long runid = 1002345678919L;
     String appName = "application_100000000000_1111";
+    long minStartTs = 10000000000000L;
+    long greaterStartTs = 30000000000000L;
     long endTs = 1439750690000L;
     TimelineEntity entityMinStartTime = TestFlowDataGenerator
-        .getEntityMinStartTime();
+        .getEntityMinStartTime(minStartTs);

     try {
       hbi = new HBaseTimelineWriterImpl(c1);
@@ -146,7 +136,7 @@ public class TestHBaseStorageFlowActivity {

       // writer another entity with greater start time
       TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
-          .getEntityGreaterStartTime();
+          .getEntityGreaterStartTime(greaterStartTs);
       te = new TimelineEntities();
       te.addEntity(entityGreaterStartTime);
       appName = "application_1000000000000000_2222";
@@ -181,6 +171,31 @@
     assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
     assertEquals(1, values.size());
     checkFlowActivityRunId(runid, flowVersion, values);
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      // get the flow activity entity
+      Set<TimelineEntity> entities =
+          hbr.getEntities(null, cluster, null, null, null,
+              TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
+              null, null, null, null, null, null, null, null, null);
+      assertEquals(1, entities.size());
+      for (TimelineEntity e : entities) {
+        FlowActivityEntity flowActivity = (FlowActivityEntity)e;
+        assertEquals(cluster, flowActivity.getCluster());
+        assertEquals(user, flowActivity.getUser());
+        assertEquals(flow, flowActivity.getFlowName());
+        assertEquals(dayTs, flowActivity.getDate().getTime());
+        Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
+        assertEquals(1, flowRuns.size());
+      }
+    } finally {
+      hbr.close();
+    }
   }

   /**
@@ -193,7 +208,7 @@ public class TestHBaseStorageFlowActivity {
     String user = "testWriteFlowActivityOneFlow_user1";
     String flow = "flow_activity_test_flow_name";
     String flowVersion = "A122110F135BC4";
-    Long runid = 1001111178919L;
+    long runid = 1001111178919L;

     TimelineEntities te = new TimelineEntities();
     TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
@@ -212,10 +227,35 @@ public class TestHBaseStorageFlowActivity {
     }
     // check flow activity
     checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1);
+
+    // use the reader to verify the data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+
+      Set<TimelineEntity> entities =
+          hbr.getEntities(user, cluster, flow, null, null,
+              TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
+              null, null, null, null, null, null, null, null, null);
+      assertEquals(1, entities.size());
+      for (TimelineEntity e : entities) {
+        FlowActivityEntity entity = (FlowActivityEntity)e;
+        NavigableSet<FlowRunEntity> flowRuns = entity.getFlowRuns();
+        assertEquals(1, flowRuns.size());
+        for (FlowRunEntity flowRun : flowRuns) {
+          assertEquals(runid, flowRun.getRunId());
+          assertEquals(flowVersion, flowRun.getVersion());
+        }
+      }
+    } finally {
+      hbr.close();
+    }
   }

   private void checkFlowActivityTable(String cluster, String user, String flow,
-      String flowVersion, Long runid, Configuration c1) throws IOException {
+      String flowVersion, long runid, Configuration c1) throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
     byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
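The reader returns one FlowActivityEntity per flow per day, and its date is the same top-of-the-day timestamp that keys the flow activity table. A small sketch of that relationship, assuming getTopOfTheDayTimestamp truncates the timestamp down to a multiple of a day in milliseconds (which is how the assertions above use it):

    // Any run written "now" lands in the activity row whose date equals the
    // day-aligned timestamp derived from the same clock reading.
    long now = System.currentTimeMillis();
    long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(now);
    assertTrue(dayTs <= now);
    assertEquals(0, dayTs % (24 * 60 * 60 * 1000));  // assumed day alignment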
@@ -263,7 +303,7 @@ public class TestHBaseStorageFlowActivity {
     String user = "testManyRunsFlowActivity_c_user1";
     String flow = "flow_activity_test_flow_name";
     String flowVersion1 = "A122110F135BC4";
-    Long runid1 = 11111111111L;
+    long runid1 = 11111111111L;

     String flowVersion2 = "A12222222222C4";
     long runid2 = 2222222222222L;
@@ -303,11 +343,50 @@ public class TestHBaseStorageFlowActivity {
     checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
         runid1, flowVersion2, runid2, flowVersion3, runid3);

+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+
+      Set<TimelineEntity> entities =
+          hbr.getEntities(null, cluster, null, null, null,
+              TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
+              null, null, null, null, null, null, null, null, null);
+      assertEquals(1, entities.size());
+      for (TimelineEntity e : entities) {
+        FlowActivityEntity flowActivity = (FlowActivityEntity)e;
+        assertEquals(cluster, flowActivity.getCluster());
+        assertEquals(user, flowActivity.getUser());
+        assertEquals(flow, flowActivity.getFlowName());
+        long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+            .currentTimeMillis());
+        assertEquals(dayTs, flowActivity.getDate().getTime());
+        Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
+        assertEquals(3, flowRuns.size());
+        for (FlowRunEntity flowRun : flowRuns) {
+          long runId = flowRun.getRunId();
+          String version = flowRun.getVersion();
+          if (runId == runid1) {
+            assertEquals(flowVersion1, version);
+          } else if (runId == runid2) {
+            assertEquals(flowVersion2, version);
+          } else if (runId == runid3) {
+            assertEquals(flowVersion3, version);
+          } else {
+            fail("unknown run id: " + runId);
+          }
+        }
+      }
+    } finally {
+      hbr.close();
+    }
   }

   private void checkFlowActivityTableSeveralRuns(String cluster, String user,
-      String flow, Configuration c1, String flowVersion1, Long runid1,
-      String flowVersion2, Long runid2, String flowVersion3, Long runid3)
+      String flow, Configuration c1, String flowVersion1, long runid1,
+      String flowVersion2, long runid2, String flowVersion3, long runid3)
       throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
@@ -351,7 +430,7 @@ public class TestHBaseStorageFlowActivity {
     assertEquals(1, rowCount);
   }

-  private void checkFlowActivityRunId(Long runid, String flowVersion,
+  private void checkFlowActivityRunId(long runid, String flowVersion,
       Map<byte[], byte[]> values) throws IOException {
     byte[] rq = ColumnHelper.getColumnQualifier(
         FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),
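The if/else chain in the three-run test pairs each run id with its expected version. With more runs the same check stays table-driven if the expectations go into a map; an illustrative alternative, not code from the patch:

    // Build the expectation table once, then look each run up by id.
    Map<Long, String> expectedVersions = new HashMap<Long, String>();
    expectedVersions.put(runid1, flowVersion1);
    expectedVersions.put(runid2, flowVersion2);
    expectedVersions.put(runid3, flowVersion3);
    for (FlowRunEntity flowRun : flowRuns) {
      String expected = expectedVersions.get(flowRun.getRunId());
      assertNotNull("unknown run id: " + flowRun.getRunId(), expected);
      assertEquals(expected, flowRun.getVersion());
    }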
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index bf524ea..b0f83b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -21,20 +21,15 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;

 import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;

-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
@@ -42,32 +37,16 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -125,11 +104,13 @@ public class TestHBaseStorageFlowRun {
     String user = "testWriteFlowRunMinMaxToHBase_user1";
     String flow = "testing_flowRun_flow_name";
     String flowVersion = "CF7022C10F1354";
-    Long runid = 1002345678919L;
+    long runid = 1002345678919L;
     String appName = "application_100000000000_1111";
+    long minStartTs = 10000000000000L;
+    long greaterStartTs = 30000000000000L;
     long endTs = 1439750690000L;
     TimelineEntity entityMinStartTime = TestFlowDataGenerator
-        .getEntityMinStartTime();
+        .getEntityMinStartTime(minStartTs);

     try {
       hbi = new HBaseTimelineWriterImpl(c1);
@@ -152,7 +133,7 @@ public class TestHBaseStorageFlowRun {

       // writer another entity with greater start time
       TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
-          .getEntityGreaterStartTime();
+          .getEntityGreaterStartTime(greaterStartTs);
       te = new TimelineEntities();
       te.addEntity(entityGreaterStartTime);
       appName = "application_1000000000000000_2222";
@@ -183,24 +164,29 @@ public class TestHBaseStorageFlowRun {
         .getBytes());
     assertEquals(2, r1.size());

-    Long starttime = (Long) GenericObjectMapper.read(values
+    long starttime = (Long) GenericObjectMapper.read(values
         .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
-    Long expmin = entityMinStartTime.getCreatedTime();
-    assertEquals(expmin, starttime);
+    assertEquals(minStartTs, starttime);
     assertEquals(endTs, GenericObjectMapper.read(values
         .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
-  }
-
-  boolean isFlowRunRowKeyCorrect(byte[] rowKey, String cluster, String user,
-      String flow, Long runid) {
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
-    assertTrue(rowKeyComponents.length == 4);
-    assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
-    assertEquals(user, Bytes.toString(rowKeyComponents[1]));
-    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid),
-        Bytes.toLong(rowKeyComponents[3]));
-    return true;
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      // get the flow run entity
+      TimelineEntity entity =
+          hbr.getEntity(user, cluster, flow, runid, null,
+              TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
+      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
+      FlowRunEntity flowRun = (FlowRunEntity)entity;
+      assertEquals(minStartTs, flowRun.getStartTime());
+      assertEquals(endTs, flowRun.getMaxEndTime());
+    } finally {
+      hbr.close();
+    }
   }

   /**
@@ -218,7 +204,7 @@ public class TestHBaseStorageFlowRun {
     String user = "testWriteFlowRunMetricsOneFlow_user1";
     String flow = "testing_flowRun_metrics_flow_name";
     String flowVersion = "CF7022C10F1354";
-    Long runid = 1002345678919L;
+    long runid = 1002345678919L;

     TimelineEntities te = new TimelineEntities();
     TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
@@ -244,6 +230,41 @@ public class TestHBaseStorageFlowRun {

     // check flow run
     checkFlowRunTable(cluster, user, flow, runid, c1);
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      TimelineEntity entity =
+          hbr.getEntity(user, cluster, flow, runid, null,
+              TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
+      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
+      Set<TimelineMetric> metrics = entity.getMetrics();
+      assertEquals(2, metrics.size());
+      for (TimelineMetric metric : metrics) {
+        String id = metric.getId();
+        Map<Long, Number> values = metric.getValues();
+        assertEquals(1, values.size());
+        Number value = null;
+        for (Number n : values.values()) {
+          value = n;
+        }
+        switch (id) {
+        case metric1:
+          assertEquals(141, value);
+          break;
+        case metric2:
+          assertEquals(57, value);
+          break;
+        default:
+          fail("unrecognized metric: " + id);
+        }
+      }
+    } finally {
+      hbr.close();
+    }
   }

   private void checkFlowRunTable(String cluster, String user, String flow,
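All of the reader-side checks added in this patch share one lifecycle: construct the reader, init it with the mini-cluster configuration, start it, query, and close it in a finally block. A condensed sketch of that pattern; the null guard before close() is a defensive addition for the sketch only (the patch calls close() unconditionally), since the constructor inside the try block could in principle leave hbr null:

    // c1 is the HBaseTestingUtility configuration used throughout these tests.
    HBaseTimelineReaderImpl hbr = null;
    try {
      hbr = new HBaseTimelineReaderImpl();
      hbr.init(c1);
      hbr.start();
      TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null,
          TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
    } finally {
      if (hbr != null) {  // guard not present in the patch itself
        hbr.close();
      }
    }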