http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java index b5fc214..2d85bab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java @@ -17,21 +17,26 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common; +import java.io.IOException; import java.util.ArrayList; +import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.SortedSet; -import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import 
org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -39,6 +44,15 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter.TimelineFilterType; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; @@ -53,6 +67,8 @@ public final class TimelineStorageUtils { private TimelineStorageUtils() { } + private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class); + /** empty bytes. */ public static final byte[] EMPTY_BYTES = new byte[0]; @@ -312,6 +328,21 @@ public final class TimelineStorageUtils { } /** + * Check if we have a certain field amongst fields to retrieve. 
This method + * checks against {@link Field#ALL} as well because that would mean field + * passed needs to be matched. + * + * @param fieldsToRetrieve fields to be retrieved. + * @param requiredField fields to be checked in fieldsToRetrieve. + * @return true if has the required field, false otherwise. + */ + public static boolean hasField(EnumSet<Field> fieldsToRetrieve, + Field requiredField) { + return fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(requiredField); + } + + /** * Checks if the input TimelineEntity object is an ApplicationEntity. * * @param te TimelineEntity object. @@ -385,87 +416,317 @@ public final class TimelineStorageUtils { } /** + * Matches key-values filter. Used for relatesTo/isRelatedTo filters. * - * @param entityRelations the relations of an entity - * @param relationFilters the relations for filtering - * @return a boolean flag to indicate if both match + * @param entity entity which holds relatesTo/isRelatedTo relations which we + * will match against. + * @param keyValuesFilter key-values filter. + * @param entityFiltersType type of filters we are trying to match. + * @return true, if filter matches, false otherwise. 
*/ - public static boolean matchRelations( - Map<String, Set<String>> entityRelations, - Map<String, Set<String>> relationFilters) { - for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) { - Set<String> ids = entityRelations.get(relation.getKey()); - if (ids == null) { + private static boolean matchKeyValuesFilter(TimelineEntity entity, + TimelineKeyValuesFilter keyValuesFilter, + TimelineEntityFiltersType entityFiltersType) { + Map<String, Set<String>> relations = null; + if (entityFiltersType == TimelineEntityFiltersType.IS_RELATED_TO) { + relations = entity.getIsRelatedToEntities(); + } else if (entityFiltersType == TimelineEntityFiltersType.RELATES_TO) { + relations = entity.getRelatesToEntities(); + } + if (relations == null) { + return false; + } + Set<String> ids = relations.get(keyValuesFilter.getKey()); + if (ids == null) { + return false; + } + boolean matched = false; + for (Object id : keyValuesFilter.getValues()) { + // Matches if id is found amongst the relationships for an entity and + // filter's compare op is EQUAL. + // If compare op is NOT_EQUAL, for a match to occur, id should not be + // found amongst relationships for an entity. + matched = !(ids.contains(id) ^ + keyValuesFilter.getCompareOp() == TimelineCompareOp.EQUAL); + if (!matched) { return false; } - for (String id : relation.getValue()) { - if (!ids.contains(id)) { - return false; - } - } } return true; } /** + * Matches relatesto. * - * @param map the map of key/value pairs in an entity - * @param filters the map of key/value pairs for filtering - * @return a boolean flag to indicate if both match + * @param entity entity which holds relatesto relations. + * @param relatesTo the relations for filtering. + * @return true, if filter matches, false otherwise. + * @throws IOException if an unsupported filter for matching relations is + * being matched. */ - public static boolean matchFilters(Map<String, ? extends Object> map, - Map<String, ? 
extends Object> filters) { - for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) { - Object value = map.get(filter.getKey()); - if (value == null) { - return false; - } - if (!value.equals(filter.getValue())) { - return false; - } + public static boolean matchRelatesTo(TimelineEntity entity, + TimelineFilterList relatesTo) throws IOException { + return matchFilters( + entity, relatesTo, TimelineEntityFiltersType.RELATES_TO); + } + + /** + * Matches isrelatedto. + * + * @param entity entity which holds isRelatedTo relations. + * @param isRelatedTo the relations for filtering. + * @return true, if filter matches, false otherwise. + * @throws IOException if an unsupported filter for matching relations is + * being matched. + */ + public static boolean matchIsRelatedTo(TimelineEntity entity, + TimelineFilterList isRelatedTo) throws IOException { + return matchFilters( + entity, isRelatedTo, TimelineEntityFiltersType.IS_RELATED_TO); + } + + /** + * Matches key-value filter. Used for config and info filters. + * + * @param entity entity which holds the config/info which we will match + * against. + * @param kvFilter a key-value filter. + * @param entityFiltersType type of filters we are trying to match. + * @return true, if filter matches, false otherwise. + */ + private static boolean matchKeyValueFilter(TimelineEntity entity, + TimelineKeyValueFilter kvFilter, + TimelineEntityFiltersType entityFiltersType) { + Map<String, ? extends Object> map = null; + // Supported only for config and info filters. + if (entityFiltersType == TimelineEntityFiltersType.CONFIG) { + map = entity.getConfigs(); + } else if (entityFiltersType == TimelineEntityFiltersType.INFO) { + map = entity.getInfo(); } - return true; + if (map == null) { + return false; + } + Object value = map.get(kvFilter.getKey()); + if (value == null) { + return false; + } + // Matches if filter's value is equal to the value of the key and filter's + // compare op is EQUAL. 
+ // If compare op is NOT_EQUAL, for a match to occur, value should not be + // equal to the value of the key. + return !(value.equals(kvFilter.getValue()) ^ + kvFilter.getCompareOp() == TimelineCompareOp.EQUAL); + } + + /** + * Matches config filters. + * + * @param entity entity which holds a map of config key-value pairs. + * @param configFilters list of config filters. + * @return a boolean flag to indicate if both match. + * @throws IOException if an unsupported filter for matching config filters is + * being matched. + */ + public static boolean matchConfigFilters(TimelineEntity entity, + TimelineFilterList configFilters) throws IOException { + return + matchFilters(entity, configFilters, TimelineEntityFiltersType.CONFIG); + } + + /** + * Matches info filters. + * + * @param entity entity which holds a map of info key-value pairs. + * @param infoFilters list of info filters. + * @return a boolean flag to indicate if both match. + * @throws IOException if an unsupported filter for matching info filters is + * being matched. + */ + public static boolean matchInfoFilters(TimelineEntity entity, + TimelineFilterList infoFilters) throws IOException { + return matchFilters(entity, infoFilters, TimelineEntityFiltersType.INFO); } /** + * Matches exists filter. Used for event filters. * - * @param entityEvents the set of event objects in an entity - * @param eventFilters the set of event Ids for filtering - * @return a boolean flag to indicate if both match + * @param entity entity which holds the events which we will match against. + * @param existsFilter exists filter. + * @param entityFiltersType type of filters we are trying to match. + * @return true, if filter matches, false otherwise.
*/ - public static boolean matchEventFilters(Set<TimelineEvent> entityEvents, - Set<String> eventFilters) { + private static boolean matchExistsFilter(TimelineEntity entity, + TimelineExistsFilter existsFilter, + TimelineEntityFiltersType entityFiltersType) { + // Currently exists filter is only supported for event filters. + if (entityFiltersType != TimelineEntityFiltersType.EVENT) { + return false; + } Set<String> eventIds = new HashSet<String>(); - for (TimelineEvent event : entityEvents) { + for (TimelineEvent event : entity.getEvents()) { eventIds.add(event.getId()); } - for (String eventFilter : eventFilters) { - if (!eventIds.contains(eventFilter)) { - return false; - } + // Matches if filter's value is contained in the list of events and filter's + // compare op is EQUAL. + // If compare op is NOT_EQUAL, for a match to occur, value should not be + // contained in the list of events. + return !(eventIds.contains(existsFilter.getValue()) ^ + existsFilter.getCompareOp() == TimelineCompareOp.EQUAL); + } + + /** + * Matches event filters. + * + * @param entity entity which holds a set of event objects. + * @param eventFilters list of event filters. + * @return a boolean flag to indicate if both match. + * @throws IOException if an unsupported filter for matching event filters is + * being matched. + */ + public static boolean matchEventFilters(TimelineEntity entity, + TimelineFilterList eventFilters) throws IOException { + return matchFilters(entity, eventFilters, TimelineEntityFiltersType.EVENT); + } + + /** + * Compare two values based on comparison operator. + * + * @param compareOp comparison operator. + * @param val1 value 1. + * @param val2 value 2.
+ * @return true, if relation matches, false otherwise + */ + private static boolean compareValues(TimelineCompareOp compareOp, + long val1, long val2) { + switch (compareOp) { + case LESS_THAN: + return val1 < val2; + case LESS_OR_EQUAL: + return val1 <= val2; + case EQUAL: + return val1 == val2; + case NOT_EQUAL: + return val1 != val2; + case GREATER_OR_EQUAL: + return val1 >= val2; + case GREATER_THAN: + return val1 > val2; + default: + throw new RuntimeException("Unknown TimelineCompareOp " + + compareOp.name()); } - return true; } /** + * Matches compare filter. Used for metric filters. * - * @param metrics the set of metric objects in an entity - * @param metricFilters the set of metric Ids for filtering - * @return a boolean flag to indicate if both match + * @param entity entity which holds the metrics which we will match against. + * @param compareFilter compare filter. + * @param entityFiltersType type of filters we are trying to match. + * @return true, if filter matches, false otherwise. + * @throws IOException if metric filters hold non-integral values. */ - public static boolean matchMetricFilters(Set<TimelineMetric> metrics, - Set<String> metricFilters) { - Set<String> metricIds = new HashSet<String>(); - for (TimelineMetric metric : metrics) { - metricIds.add(metric.getId()); + private static boolean matchCompareFilter(TimelineEntity entity, + TimelineCompareFilter compareFilter, + TimelineEntityFiltersType entityFiltersType) throws IOException { + // Currently compare filter is only supported for metric filters. + if (entityFiltersType != TimelineEntityFiltersType.METRIC) { + return false; + } + // We expect only integral values(short/int/long) for metric filters.
+ if (!isIntegralValue(compareFilter.getValue())) { + throw new IOException("Metric filters has non integral values"); + } + Map<String, TimelineMetric> metricMap = + new HashMap<String, TimelineMetric>(); + for (TimelineMetric metric : entity.getMetrics()) { + metricMap.put(metric.getId(), metric); } + TimelineMetric metric = metricMap.get(compareFilter.getKey()); + if (metric == null) { + return false; + } + // We will be using the latest value of metric to compare. + return compareValues(compareFilter.getCompareOp(), + metric.getValuesJAXB().firstEntry().getValue().longValue(), + ((Number)compareFilter.getValue()).longValue()); + } - for (String metricFilter : metricFilters) { - if (!metricIds.contains(metricFilter)) { - return false; + /** + * Matches metric filters. + * + * @param entity entity which holds a set of metric objects. + * @param metricFilters list of metric filters. + * @return a boolean flag to indicate if both match. + * @throws IOException if an unsupported filter for matching metric filters is + * being matched. + */ + public static boolean matchMetricFilters(TimelineEntity entity, + TimelineFilterList metricFilters) throws IOException { + return matchFilters( + entity, metricFilters, TimelineEntityFiltersType.METRIC); + } + + /** + * Common routine to match different filters. Iterates over a filter list and + * calls routines based on filter type. + * + * @param entity Timeline entity. + * @param filters filter list. + * @param entityFiltersType type of filters which are being matched. + * @return a boolean flag to indicate if filter matches. + * @throws IOException if an unsupported filter for matching this specific + * filter is being matched. 
+ */ + private static boolean matchFilters(TimelineEntity entity, + TimelineFilterList filters, TimelineEntityFiltersType entityFiltersType) + throws IOException { + if (filters == null || filters.getFilterList().isEmpty()) { + return false; + } + TimelineFilterList.Operator operator = filters.getOperator(); + for (TimelineFilter filter : filters.getFilterList()) { + TimelineFilterType filterType = filter.getFilterType(); + if (!entityFiltersType.isValidFilter(filterType)) { + throw new IOException("Unsupported filter " + filterType); + } + boolean matched = false; + switch (filterType) { + case LIST: + matched = matchFilters(entity, (TimelineFilterList)filter, + entityFiltersType); + break; + case COMPARE: + matched = matchCompareFilter(entity, (TimelineCompareFilter)filter, + entityFiltersType); + break; + case EXISTS: + matched = matchExistsFilter(entity, (TimelineExistsFilter)filter, + entityFiltersType); + break; + case KEY_VALUE: + matched = matchKeyValueFilter(entity, (TimelineKeyValueFilter)filter, + entityFiltersType); + break; + case KEY_VALUES: + matched = matchKeyValuesFilter(entity, (TimelineKeyValuesFilter)filter, + entityFiltersType); + break; + default: + throw new IOException("Unsupported filter " + filterType); + } + if (!matched) { + if(operator == TimelineFilterList.Operator.AND) { + return false; + } + } else { + if(operator == TimelineFilterList.Operator.OR) { + return true; + } } } - return true; + return operator == TimelineFilterList.Operator.AND; } /** @@ -530,4 +791,100 @@ public final class TimelineStorageUtils { } return appId; } + + /** + * Helper method for reading relationship. + * + * @param <T> Describes the type of column prefix. + * @param entity entity to fill. + * @param result result from HBase. + * @param prefix column prefix. + * @param isRelatedTo if true, means relationship is to be added to + * isRelatedTo, otherwise its added to relatesTo. + * @throws IOException if any problem is encountered while reading result. 
+ */ + public static <T> void readRelationship( + TimelineEntity entity, Result result, ColumnPrefix<T> prefix, + boolean isRelatedTo) throws IOException { + // isRelatedTo and relatesTo are of type Map<String, Set<String>> + Map<String, Object> columns = prefix.readResults(result); + for (Map.Entry<String, Object> column : columns.entrySet()) { + for (String id : Separator.VALUES.splitEncoded( + column.getValue().toString())) { + if (isRelatedTo) { + entity.addIsRelatedToEntity(column.getKey(), id); + } else { + entity.addRelatesToEntity(column.getKey(), id); + } + } + } + } + + /** + * Helper method for reading key-value pairs for either info or config. + * + * @param <T> Describes the type of column prefix. + * @param entity entity to fill. + * @param result result from HBase. + * @param prefix column prefix. + * @param isConfig if true, means we are reading configs, otherwise info. + * @throws IOException if any problem is encountered while reading result. + */ + public static <T> void readKeyValuePairs( + TimelineEntity entity, Result result, ColumnPrefix<T> prefix, + boolean isConfig) throws IOException { + // info and configuration are of type Map<String, Object or String> + Map<String, Object> columns = prefix.readResults(result); + if (isConfig) { + for (Map.Entry<String, Object> column : columns.entrySet()) { + entity.addConfig(column.getKey(), column.getValue().toString()); + } + } else { + entity.addInfo(columns); + } + } + + /** + * Read events from the entity table or the application table. The column name + * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted + * if there is no info associated with the event. + * + * @param <T> Describes the type of column prefix. + * @param entity entity to fill. + * @param result HBase Result. + * @param prefix column prefix. + * @throws IOException if any problem is encountered while reading result. 
+ */ + public static <T> void readEvents(TimelineEntity entity, Result result, + ColumnPrefix<T> prefix) throws IOException { + Map<String, TimelineEvent> eventsMap = new HashMap<>(); + Map<?, Object> eventsResult = + prefix.readResultsHavingCompoundColumnQualifiers(result); + for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) { + byte[][] karr = (byte[][])eventResult.getKey(); + // the column name is of the form "eventId=timestamp=infoKey" + if (karr.length == 3) { + String id = Bytes.toString(karr[0]); + long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1])); + String key = Separator.VALUES.joinEncoded(id, Long.toString(ts)); + TimelineEvent event = eventsMap.get(key); + if (event == null) { + event = new TimelineEvent(); + event.setId(id); + event.setTimestamp(ts); + eventsMap.put(key, event); + } + // handle empty info + String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]); + if (infoKey != null) { + event.addInfo(infoKey, eventResult.getValue()); + } + } else { + LOG.warn("incorrectly formatted column name: it will be discarded"); + continue; + } + } + Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values()); + entity.addEvents(eventsSet); + } }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java index f47ba93..775879a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java @@ -24,8 +24,11 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** @@ -46,7 +49,8 @@ public enum EntityColumn implements Column<EntityTable> { /** * When the entity was 
created. */ - CREATED_TIME(EntityColumnFamily.INFO, "created_time"), + CREATED_TIME(EntityColumnFamily.INFO, "created_time", + LongConverter.getInstance()), /** * The version of the flow that this entity belongs to. @@ -60,12 +64,17 @@ public enum EntityColumn implements Column<EntityTable> { EntityColumn(ColumnFamily<EntityTable> columnFamily, String columnQualifier) { + this(columnFamily, columnQualifier, GenericConverter.getInstance()); + } + + EntityColumn(ColumnFamily<EntityTable> columnFamily, + String columnQualifier, ValueConverter converter) { this.columnFamily = columnFamily; this.columnQualifier = columnQualifier; // Future-proof by ensuring the right column prefix hygiene. this.columnQualifierBytes = Bytes.toBytes(Separator.SPACE.encode(columnQualifier)); - this.column = new ColumnHelper<EntityTable>(columnFamily); + this.column = new ColumnHelper<EntityTable>(columnFamily, converter); } /** @@ -108,6 +117,21 @@ public enum EntityColumn implements Column<EntityTable> { return null; } + @Override + public byte[] getColumnQualifierBytes() { + return columnQualifierBytes.clone(); + } + + @Override + public byte[] getColumnFamilyBytes() { + return columnFamily.getBytes(); + } + + @Override + public ValueConverter getValueConverter() { + return column.getValueConverter(); + } + /** * Retrieve an {@link EntityColumn} given a name, or null if there is no * match. 
The following holds true: {@code columnFor(a,x) == columnFor(b,y)} http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index f3c7e7f..de2b29d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -56,7 +56,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { /** * Lifecycle events for an entity. */ - EVENT(EntityColumnFamily.INFO, "e"), + EVENT(EntityColumnFamily.INFO, "e", true), /** * Config column stores configuration with config key as the column name. @@ -78,6 +78,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { */ private final String columnPrefix; private final byte[] columnPrefixBytes; + private final boolean compoundColQual; /** * Private constructor, meant to be used by the enum definition. 
@@ -87,7 +88,18 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { */ EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily, String columnPrefix) { - this(columnFamily, columnPrefix, GenericConverter.getInstance()); + this(columnFamily, columnPrefix, false, GenericConverter.getInstance()); + } + + EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily, + String columnPrefix, boolean compondColQual) { + this(columnFamily, columnPrefix, compondColQual, + GenericConverter.getInstance()); + } + + EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily, + String columnPrefix, ValueConverter converter) { + this(columnFamily, columnPrefix, false, converter); } /** @@ -99,7 +111,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { * this column prefix. */ EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily, - String columnPrefix, ValueConverter converter) { + String columnPrefix, boolean compondColQual, ValueConverter converter) { column = new ColumnHelper<EntityTable>(columnFamily, converter); this.columnFamily = columnFamily; this.columnPrefix = columnPrefix; @@ -110,6 +122,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE.encode(columnPrefix)); } + this.compoundColQual = compondColQual; } /** @@ -131,6 +144,24 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { this.columnPrefixBytes, qualifierPrefix); } + @Override + public byte[] getColumnFamilyBytes() { + return columnFamily.getBytes(); + } + + @Override + public ValueConverter getValueConverter() { + return column.getValueConverter(); + } + + public byte[] getCompoundColQualBytes(String qualifier, + byte[]...components) { + if (!compoundColQual) { + return ColumnHelper.getColumnQualifier(null, qualifier); + } + return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components); + } + /* * (non-Javadoc) * @@ -287,5 +318,4 @@ public enum 
EntityColumnPrefix implements ColumnPrefix<EntityTable> { // Default to null return null; } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java index a5933da..188c2fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; /** * Identifies partially qualified columns for the {@link FlowActivityTable}. 
@@ -50,6 +51,7 @@ public enum FlowActivityColumnPrefix */ private final String columnPrefix; private final byte[] columnPrefixBytes; + private final boolean compoundColQual; private final AggregationOperation aggOp; @@ -64,6 +66,12 @@ public enum FlowActivityColumnPrefix private FlowActivityColumnPrefix( ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix, AggregationOperation aggOp) { + this(columnFamily, columnPrefix, aggOp, false); + } + + private FlowActivityColumnPrefix( + ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix, + AggregationOperation aggOp, boolean compoundColQual) { column = new ColumnHelper<FlowActivityTable>(columnFamily); this.columnFamily = columnFamily; this.columnPrefix = columnPrefix; @@ -75,6 +83,7 @@ public enum FlowActivityColumnPrefix .encode(columnPrefix)); } this.aggOp = aggOp; + this.compoundColQual = compoundColQual; } /** @@ -100,6 +109,16 @@ public enum FlowActivityColumnPrefix return columnPrefixBytes.clone(); } + @Override + public byte[] getColumnFamilyBytes() { + return columnFamily.getBytes(); + } + + @Override + public ValueConverter getValueConverter() { + return column.getValueConverter(); + } + public AggregationOperation getAttribute() { return aggOp; } @@ -251,4 +270,20 @@ public enum FlowActivityColumnPrefix column.store(rowKey, tableMutator, columnQualifier, null, inputValue, combinedAttributes); } + + @Override + public byte[] getCompoundColQualBytes(String qualifier, + byte[]...components) { + if (!compoundColQual) { + return ColumnHelper.getColumnQualifier(null, qualifier); + } + return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components); + } + + @Override + public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result) + throws IOException { + // There are no compound column qualifiers for flow activity table. 
+ return null; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java index d50bb16..f1553b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java @@ -86,10 +86,12 @@ public enum FlowRunColumn implements Column<FlowRunTable> { return columnQualifier; } + @Override public byte[] getColumnQualifierBytes() { return columnQualifierBytes.clone(); } + @Override public byte[] getColumnFamilyBytes() { return columnFamily.getBytes(); } @@ -144,6 +146,7 @@ public enum FlowRunColumn implements Column<FlowRunTable> { return null; } + @Override public ValueConverter getValueConverter() { return column.getValueConverter(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java index fa94aae..77f2ab2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java @@ -52,6 +52,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { */ private final String columnPrefix; private final byte[] columnPrefixBytes; + private final boolean compoundColQual; private final AggregationOperation aggOp; @@ -65,6 +66,12 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { */ private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily, String columnPrefix, AggregationOperation fra, ValueConverter converter) { + this(columnFamily, columnPrefix, fra, converter, false); + } + + private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily, + String columnPrefix, AggregationOperation fra, ValueConverter converter, + boolean compoundColQual) { column = new ColumnHelper<FlowRunTable>(columnFamily, converter); this.columnFamily = columnFamily; this.columnPrefix = columnPrefix; @@ -76,6 +83,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { .encode(columnPrefix)); } this.aggOp = fra; + this.compoundColQual = compoundColQual; } /** @@ -101,6 +109,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { this.columnPrefixBytes, qualifierPrefix); } + @Override public byte[] 
getColumnFamilyBytes() { return columnFamily.getBytes(); } @@ -222,6 +231,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { return null; } + @Override public ValueConverter getValueConverter() { return column.getValueConverter(); } @@ -257,4 +267,20 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { // Default to null return null; } + + @Override + public byte[] getCompoundColQualBytes(String qualifier, + byte[]...components) { + if (!compoundColQual) { + return ColumnHelper.getColumnQualifier(null, qualifier); + } + return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components); + } + + @Override + public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result) + throws IOException { + // There are no compound column qualifiers for flow run table. + return null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index 6baea37..0ace529 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -200,6 +200,7 @@ class FlowScanner implements RegionScanner, Closeable 
{ int addedCnt = 0; long currentTimestamp = System.currentTimeMillis(); ValueConverter converter = null; + while (cellLimit <= 0 || addedCnt < cellLimit) { cell = peekAtNextCell(cellLimit); if (cell == null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java index 0de09e0..53210f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; import java.util.EnumSet; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; @@ -27,7 +28,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; -import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import 
org.apache.hadoop.hbase.filter.FamilyFilter; import org.apache.hadoop.hbase.filter.FilterList; @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; @@ -76,93 +77,231 @@ class ApplicationEntityReader extends GenericEntityReader { return APPLICATION_TABLE; } + /** + * This method is called only for multiple entity reads. + */ @Override - protected FilterList constructFilterListBasedOnFields() { - FilterList list = new FilterList(Operator.MUST_PASS_ONE); - TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); - // Fetch all the columns. - if (dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (dataToRetrieve.getConfsToRetrieve() == null || - dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) && - (dataToRetrieve.getMetricsToRetrieve() == null || - dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) { - return list; + protected FilterList constructFilterListBasedOnFilters() throws IOException { + // Filters here cannot be null for multiple entity reads as they are set in + // augmentParams if null. + TimelineEntityFilters filters = getFilters(); + FilterList listBasedOnFilters = new FilterList(); + // Create filter list based on created time range and add it to + // listBasedOnFilters. 
+ long createdTimeBegin = filters.getCreatedTimeBegin(); + long createdTimeEnd = filters.getCreatedTimeEnd(); + if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { + listBasedOnFilters.addFilter( + TimelineFilterUtils.createSingleColValueFiltersByRange( + ApplicationColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd)); } - FilterList infoColFamilyList = new FilterList(); - // By default fetch everything in INFO column family. - FamilyFilter infoColumnFamily = - new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(ApplicationColumnFamily.INFO.getBytes())); - infoColFamilyList.addFilter(infoColumnFamily); + // Create filter list based on metric filters and add it to + // listBasedOnFilters. + TimelineFilterList metricFilters = filters.getMetricFilters(); + if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter( + TimelineFilterUtils.createHBaseFilterList( + ApplicationColumnPrefix.METRIC, metricFilters)); + } + // Create filter list based on config filters and add it to + // listBasedOnFilters. + TimelineFilterList configFilters = filters.getConfigFilters(); + if (configFilters != null && !configFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter( + TimelineFilterUtils.createHBaseFilterList( + ApplicationColumnPrefix.CONFIG, configFilters)); + } + // Create filter list based on info filters and add it to listBasedOnFilters + TimelineFilterList infoFilters = filters.getInfoFilters(); + if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter( + TimelineFilterUtils.createHBaseFilterList( + ApplicationColumnPrefix.INFO, infoFilters)); + } + return listBasedOnFilters; + } + + /** + * Add {@link QualifierFilter} filters to filter list for each column of + * application table. + * + * @param list filter list to which qualifier filters have to be added. 
+ */ + @Override + protected void updateFixedColumns(FilterList list) { + for (ApplicationColumn column : ApplicationColumn.values()) { + list.addFilter(new QualifierFilter(CompareOp.EQUAL, + new BinaryComparator(column.getColumnQualifierBytes()))); + } + } + + /** + * Creates a filter list which indicates that only some of the column + * qualifiers in the info column family will be returned in result. + * + * @return filter list. + * @throws IOException if any problem occurs while creating filter list. + */ + private FilterList createFilterListForColsOfInfoFamily() + throws IOException { + FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE); + // Add filters for each column in application table. + updateFixedColumns(infoFamilyColsFilter); + EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); + // If INFO field has to be retrieved, add a filter for fetching columns + // with INFO column prefix. + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.EQUAL, ApplicationColumnPrefix.INFO)); + } + TimelineFilterList relatesTo = getFilters().getRelatesTo(); + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { + // If RELATES_TO field has to be retrieved, add a filter for fetching + // columns with RELATES_TO column prefix. + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.EQUAL, ApplicationColumnPrefix.RELATES_TO)); + } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) { + // Even if fields to retrieve does not contain RELATES_TO, we still + // need to have a filter to fetch some of the column qualifiers if + // relatesTo filters are specified. relatesTo filters will then be + // matched after fetching rows from HBase.
+ Set<String> relatesToCols = + TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo); + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createFiltersFromColumnQualifiers( + ApplicationColumnPrefix.RELATES_TO, relatesToCols)); + } + TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo(); + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { + // If IS_RELATED_TO field has to be retrieved, add a filter for fetching + // columns with IS_RELATED_TO column prefix. + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.EQUAL, ApplicationColumnPrefix.IS_RELATED_TO)); + } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) { + // Even if fields to retrieve does not contain IS_RELATED_TO, we still + // need to have a filter to fetch some of the column qualifiers if + // isRelatedTo filters are specified. isRelatedTo filters will then be + // matched after fetching rows from HBase. + Set<String> isRelatedToCols = + TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo); + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createFiltersFromColumnQualifiers( + ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols)); + } + TimelineFilterList eventFilters = getFilters().getEventFilters(); + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { + // If EVENTS field has to be retrieved, add a filter for fetching columns + // with EVENT column prefix. + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.EQUAL, ApplicationColumnPrefix.EVENT)); + } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){ + // Even if fields to retrieve does not contain EVENTS, we still need to + // have a filter to fetch some of the column qualifiers on the basis of + // event filters specified. Event filters will then be matched after + // fetching rows from HBase. 
+ Set<String> eventCols = + TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters); + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createFiltersFromColumnQualifiers( + ApplicationColumnPrefix.EVENT, eventCols)); + } + return infoFamilyColsFilter; + } + + /** + * Exclude column prefixes via filters which are not required (based on fields + * to retrieve) from info column family. These filters are added to filter + * list which contains a filter for getting info column family. + * + * @param infoColFamilyList filter list for info column family. + */ + private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { + EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); // Events not required. - TimelineEntityFilters filters = getFilters(); - if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.EVENTS) && - !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (isSingleEntityRead() || filters.getEventFilters() == null)) { + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - ApplicationColumnPrefix.EVENT.getColumnPrefixBytes("")))); + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.NOT_EQUAL, ApplicationColumnPrefix.EVENT)); } // info not required. - if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.INFO) && - !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (isSingleEntityRead() || filters.getInfoFilters() == null)) { + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - ApplicationColumnPrefix.INFO.getColumnPrefixBytes("")))); + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.NOT_EQUAL, ApplicationColumnPrefix.INFO)); } - // is releated to not required.
- if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.IS_RELATED_TO) && - !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (isSingleEntityRead() || filters.getIsRelatedTo() == null)) { + // is related to not required. + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.NOT_EQUAL, ApplicationColumnPrefix.IS_RELATED_TO)); } // relates to not required. - if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.RELATES_TO) && - !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (isSingleEntityRead() || filters.getRelatesTo() == null)) { + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.NOT_EQUAL, ApplicationColumnPrefix.RELATES_TO)); } - list.addFilter(infoColFamilyList); - if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS) || - (!isSingleEntityRead() && filters.getConfigFilters() != null)) || - (dataToRetrieve.getConfsToRetrieve() != null && - !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty())) { - FilterList filterCfg = - new FilterList(new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes()))); - if (dataToRetrieve.getConfsToRetrieve() != null && - !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) { - filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList( - ApplicationColumnPrefix.CONFIG, - dataToRetrieve.getConfsToRetrieve())); - } - list.addFilter(filterCfg); + } + + /** + * Updates filter list based on fields for confs and metrics to 
retrieve. + * + * @param listBasedOnFields filter list based on fields. + * @throws IOException if any problem occurs while updating filter list. + */ + private void updateFilterForConfsAndMetricsToRetrieve( + FilterList listBasedOnFields) throws IOException { + TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); + // Please note that if confsToRetrieve is specified, we would have added + // CONFIGS to fields to retrieve in augmentParams() even if not specified. + if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) { + // Create a filter list for configs. + listBasedOnFields.addFilter(TimelineFilterUtils. + createFilterForConfsOrMetricsToRetrieve( + dataToRetrieve.getConfsToRetrieve(), + ApplicationColumnFamily.CONFIGS, ApplicationColumnPrefix.CONFIG)); } - if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) || - (!isSingleEntityRead() && filters.getMetricFilters() != null)) || - (dataToRetrieve.getMetricsToRetrieve() != null && - !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) { - FilterList filterMetrics = - new FilterList(new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes()))); - if (dataToRetrieve.getMetricsToRetrieve() != null && - !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()) { - filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList( - ApplicationColumnPrefix.METRIC, - dataToRetrieve.getMetricsToRetrieve())); - } - list.addFilter(filterMetrics); + + // Please note that if metricsToRetrieve is specified, we would have added + // METRICS to fields to retrieve in augmentParams() even if not specified. + if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) { + // Create a filter list for metrics. + listBasedOnFields.addFilter(TimelineFilterUtils.
+ createFilterForConfsOrMetricsToRetrieve( + dataToRetrieve.getMetricsToRetrieve(), + ApplicationColumnFamily.METRICS, ApplicationColumnPrefix.METRIC)); + } + } + + @Override + protected FilterList constructFilterListBasedOnFields() throws IOException { + if (!needCreateFilterListBasedOnFields()) { + // Fetch all the columns. No need of a filter. + return null; + } + FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE); + FilterList infoColFamilyList = new FilterList(); + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.INFO.getBytes())); + infoColFamilyList.addFilter(infoColumnFamily); + if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) { + // We can fetch only some of the columns from info family. + infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily()); + } else { + // Exclude column prefixes in info column family which are not required + // based on fields to retrieve. + excludeFieldsFromInfoColFamily(infoColFamilyList); } - return list; + listBasedOnFields.addFilter(infoColFamilyList); + + updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); + return listBasedOnFields; } @Override @@ -182,6 +321,9 @@ class ApplicationEntityReader extends GenericEntityReader { @Override protected void validateParams() { + Preconditions.checkNotNull(getContext(), "context shouldn't be null"); + Preconditions.checkNotNull( + getDataToRetrieve(), "data to retrieve shouldn't be null"); Preconditions.checkNotNull(getContext().getClusterId(), "clusterId shouldn't be null"); Preconditions.checkNotNull(getContext().getEntityType(), @@ -202,6 +344,7 @@ class ApplicationEntityReader extends GenericEntityReader { throws IOException { TimelineReaderContext context = getContext(); if (isSingleEntityRead()) { + // Get flow context information from AppToFlow table. 
if (context.getFlowName() == null || context.getFlowRunId() == null || context.getUserId() == null) { FlowContext flowContext = lookupFlowContext( @@ -211,7 +354,12 @@ class ApplicationEntityReader extends GenericEntityReader { context.setUserId(flowContext.getUserId()); } } + // Add configs/metrics to fields to retrieve if confsToRetrieve and/or + // metricsToRetrieve are specified. getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); + if (!isSingleEntityRead()) { + createFiltersIfNull(); + } } @Override @@ -252,114 +400,84 @@ class ApplicationEntityReader extends GenericEntityReader { Number createdTime = (Number)ApplicationColumn.CREATED_TIME.readResult(result); entity.setCreatedTime(createdTime.longValue()); - if (!isSingleEntityRead() && - (entity.getCreatedTime() < filters.getCreatedTimeBegin() || - entity.getCreatedTime() > filters.getCreatedTimeEnd())) { - return null; - } + EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); - // fetch is related to entities + // fetch is related to entities and match isRelatedTo filter. If isRelatedTo + // filters do not match, entity would be dropped. We have to match filters + // locally as relevant HBase filters to filter out rows on the basis of + // isRelatedTo are not set in HBase scan. 
boolean checkIsRelatedTo = - filters != null && filters.getIsRelatedTo() != null && - filters.getIsRelatedTo().size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { - readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, - true); - if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( - entity.getIsRelatedToEntities(), filters.getIsRelatedTo())) { + !isSingleEntityRead() && filters.getIsRelatedTo() != null && + filters.getIsRelatedTo().getFilterList().size() > 0; + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || + checkIsRelatedTo) { + TimelineStorageUtils.readRelationship( + entity, result, ApplicationColumnPrefix.IS_RELATED_TO, true); + if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity, + filters.getIsRelatedTo())) { return null; } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, + Field.IS_RELATED_TO)) { entity.getIsRelatedToEntities().clear(); } } - // fetch relates to entities + // fetch relates to entities and match relatesTo filter. If relatesTo + // filters do not match, entity would be dropped. We have to match filters + // locally as relevant HBase filters to filter out rows on the basis of + // relatesTo are not set in HBase scan. 
boolean checkRelatesTo = - filters != null && filters.getRelatesTo() != null && - filters.getRelatesTo().size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { - readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, - false); - if (checkRelatesTo && !TimelineStorageUtils.matchRelations( - entity.getRelatesToEntities(), filters.getRelatesTo())) { + !isSingleEntityRead() && filters.getRelatesTo() != null && + filters.getRelatesTo().getFilterList().size() > 0; + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) || + checkRelatesTo) { + TimelineStorageUtils.readRelationship( + entity, result, ApplicationColumnPrefix.RELATES_TO, false); + if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity, + filters.getRelatesTo())) { return null; } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.RELATES_TO)) { + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { entity.getRelatesToEntities().clear(); } } - // fetch info - boolean checkInfo = filters != null && filters.getInfoFilters() != null && - filters.getInfoFilters().size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.INFO) || checkInfo) { - readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); - if (checkInfo && - !TimelineStorageUtils.matchFilters( - entity.getInfo(), filters.getInfoFilters())) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.INFO)) { - entity.getInfo().clear(); - } + // fetch info if fieldsToRetrieve contains INFO or ALL. 
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { + TimelineStorageUtils.readKeyValuePairs( + entity, result, ApplicationColumnPrefix.INFO, false); } - // fetch configs - boolean checkConfigs = - filters != null && filters.getConfigFilters() != null && - filters.getConfigFilters().size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { - readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); - if (checkConfigs && !TimelineStorageUtils.matchFilters( - entity.getConfigs(), filters.getConfigFilters())) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.CONFIGS)) { - entity.getConfigs().clear(); - } + // fetch configs if fieldsToRetrieve contains CONFIGS or ALL. + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) { + TimelineStorageUtils.readKeyValuePairs( + entity, result, ApplicationColumnPrefix.CONFIG, true); } - // fetch events + // fetch events and match event filters if they exist. If event filters do + // not match, entity would be dropped. We have to match filters locally + // as relevant HBase filters to filter out rows on the basis of events + // are not set in HBase scan. 
boolean checkEvents = - filters != null && filters.getEventFilters() != null && - filters.getEventFilters().size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { - readEvents(entity, result, true); - if (checkEvents && !TimelineStorageUtils.matchEventFilters( - entity.getEvents(), filters.getEventFilters())) { + !isSingleEntityRead() && filters.getEventFilters() != null && + filters.getEventFilters().getFilterList().size() > 0; + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) || + checkEvents) { + TimelineStorageUtils.readEvents( + entity, result, ApplicationColumnPrefix.EVENT); + if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity, + filters.getEventFilters())) { return null; } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.EVENTS)) { + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { entity.getEvents().clear(); } } - // fetch metrics - boolean checkMetrics = - filters != null && filters.getMetricFilters() != null && - filters.getMetricFilters().size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { + // fetch metrics if fieldsToRetrieve contains METRICS or ALL. 
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) { readMetrics(entity, result, ApplicationColumnPrefix.METRIC); - if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( - entity.getMetrics(), filters.getMetricFilters())) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.METRICS)) { - entity.getMetrics().clear(); - } } return entity; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index 0d2bdd8..d8ca038 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; @@ -75,6 +76,12 @@ class FlowActivityEntityReader extends TimelineEntityReader { @Override protected void augmentParams(Configuration hbaseConf, Connection conn) throws IOException { + createFiltersIfNull(); + } + + @Override + protected FilterList constructFilterListBasedOnFilters() throws IOException { + return null; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java index 743315c..b2de2e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -38,9 +38,11 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; @@ -76,6 +78,9 @@ class FlowRunEntityReader extends TimelineEntityReader { @Override protected void validateParams() { + Preconditions.checkNotNull(getContext(), "context shouldn't be null"); + Preconditions.checkNotNull( + getDataToRetrieve(), "data to retrieve shouldn't be null"); Preconditions.checkNotNull(getContext().getClusterId(), "clusterId shouldn't be null"); Preconditions.checkNotNull(getContext().getUserId(), @@ -90,37 +95,87 @@ class FlowRunEntityReader extends TimelineEntityReader { @Override protected void augmentParams(Configuration hbaseConf, Connection conn) { + // Add metrics to fields to retrieve if metricsToRetrieve is specified. getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); + if (!isSingleEntityRead()) { + createFiltersIfNull(); + } + } + + protected FilterList constructFilterListBasedOnFilters() throws IOException { + FilterList listBasedOnFilters = new FilterList(); + // Filter based on created time range. + Long createdTimeBegin = getFilters().getCreatedTimeBegin(); + Long createdTimeEnd = getFilters().getCreatedTimeEnd(); + if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { + listBasedOnFilters.addFilter( + TimelineFilterUtils.createSingleColValueFiltersByRange( + FlowRunColumn.MIN_START_TIME, createdTimeBegin, createdTimeEnd)); + } + // Filter based on metric filters. 
+ TimelineFilterList metricFilters = getFilters().getMetricFilters(); + if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter( + TimelineFilterUtils.createHBaseFilterList( + FlowRunColumnPrefix.METRIC, metricFilters)); + } + return listBasedOnFilters; + } + + /** + * Add {@link QualifierFilter} filters to filter list for each column of + * flow run table. + * + * @return filter list to which qualifier filters have been added. + */ + private FilterList updateFixedColumns() { + FilterList columnsList = new FilterList(Operator.MUST_PASS_ONE); + for (FlowRunColumn column : FlowRunColumn.values()) { + columnsList.addFilter(new QualifierFilter(CompareOp.EQUAL, + new BinaryComparator(column.getColumnQualifierBytes()))); + } + return columnsList; } @Override - protected FilterList constructFilterListBasedOnFields() { + protected FilterList constructFilterListBasedOnFields() throws IOException { FilterList list = new FilterList(Operator.MUST_PASS_ONE); - // By default fetch everything in INFO column family. FamilyFilter infoColumnFamily = new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(FlowRunColumnFamily.INFO.getBytes())); TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); - // Metrics not required. - if (!isSingleEntityRead() && - !dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) && - !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL)) { + // If multiple entities have to be retrieved, check if metrics have to be + // retrieved and if not, add a filter so that metrics can be excluded. + // Metrics are always returned if we are reading a single entity. 
+ if (!isSingleEntityRead() && !TimelineStorageUtils.hasField( + dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) { FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); infoColFamilyList.addFilter(infoColumnFamily); infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( - FlowRunColumnPrefix.METRIC.getColumnPrefixBytes("")))); - list.addFilter(infoColFamilyList); - } - if (dataToRetrieve.getMetricsToRetrieve() != null && - !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()) { - FilterList infoColFamilyList = new FilterList(); - infoColFamilyList.addFilter(infoColumnFamily); - infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList( - FlowRunColumnPrefix.METRIC, dataToRetrieve.getMetricsToRetrieve())); + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes("")))); list.addFilter(infoColFamilyList); + } else { + // Check if metricsToRetrieve are specified and if they are, create a + // filter list for info column family by adding flow run tables columns + // and a list for metrics to retrieve. Pls note that fieldsToRetrieve + // will have METRICS added to it if metricsToRetrieve are specified + // (in augmentParams()). 
+ TimelineFilterList metricsToRetrieve = + dataToRetrieve.getMetricsToRetrieve(); + if (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + FilterList infoColFamilyList = new FilterList(); + infoColFamilyList.addFilter(infoColumnFamily); + FilterList columnsList = updateFixedColumns(); + columnsList.addFilter( + TimelineFilterUtils.createHBaseFilterList( + FlowRunColumnPrefix.METRIC, metricsToRetrieve)); + infoColFamilyList.addFilter(columnsList); + list.addFilter(infoColFamilyList); + } } return list; } @@ -175,11 +230,6 @@ class FlowRunEntityReader extends TimelineEntityReader { if (startTime != null) { flowRun.setStartTime(startTime.longValue()); } - if (!isSingleEntityRead() && - (flowRun.getStartTime() < getFilters().getCreatedTimeBegin() || - flowRun.getStartTime() > getFilters().getCreatedTimeEnd())) { - return null; - } // read the end time if available Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result); @@ -193,9 +243,10 @@ class FlowRunEntityReader extends TimelineEntityReader { flowRun.setVersion(version); } - // read metrics - if (isSingleEntityRead() || - getDataToRetrieve().getFieldsToRetrieve().contains(Field.METRICS)) { + // read metrics if its a single entity query or if METRICS are part of + // fieldsToRetrieve. + if (isSingleEntityRead() || TimelineStorageUtils.hasField( + getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) { readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org