http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8e8a03b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index d8f73d4..6696ac5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -19,13 +19,8 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; import java.util.Set; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Get; @@ -33,28 +28,22 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; -import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; import org.apache.hadoop.hbase.filter.FamilyFilter; import org.apache.hadoop.hbase.filter.FilterList; import 
org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FilterList.Operator; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; @@ -71,7 +60,6 @@ import com.google.common.base.Preconditions; */ class GenericEntityReader extends TimelineEntityReader { private static final EntityTable ENTITY_TABLE = 
new EntityTable(); - private static final Log LOG = LogFactory.getLog(GenericEntityReader.class); /** * Used to look up the flow context. @@ -97,92 +85,322 @@ class GenericEntityReader extends TimelineEntityReader { } @Override - protected FilterList constructFilterListBasedOnFields() { - FilterList list = new FilterList(Operator.MUST_PASS_ONE); - TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); - // Fetch all the columns. - if (dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (dataToRetrieve.getConfsToRetrieve() == null || - dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) && - (dataToRetrieve.getMetricsToRetrieve() == null || - dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) { - return list; + protected FilterList constructFilterListBasedOnFilters() throws IOException { + // Filters here cannot be null for multiple entity reads as they are set in + // augmentParams if null. + FilterList listBasedOnFilters = new FilterList(); + TimelineEntityFilters filters = getFilters(); + // Create filter list based on created time range and add it to + // listBasedOnFilters. + long createdTimeBegin = filters.getCreatedTimeBegin(); + long createdTimeEnd = filters.getCreatedTimeEnd(); + if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { + listBasedOnFilters.addFilter( + TimelineFilterUtils.createSingleColValueFiltersByRange( + EntityColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd)); } - FilterList infoColFamilyList = new FilterList(); - // By default fetch everything in INFO column family. - FamilyFilter infoColumnFamily = - new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(EntityColumnFamily.INFO.getBytes())); - infoColFamilyList.addFilter(infoColumnFamily); + // Create filter list based on metric filters and add it to + // listBasedOnFilters. 
+ TimelineFilterList metricFilters = filters.getMetricFilters(); + if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter( + TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.METRIC, metricFilters)); + } + // Create filter list based on config filters and add it to + // listBasedOnFilters. + TimelineFilterList configFilters = filters.getConfigFilters(); + if (configFilters != null && !configFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter( + TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.CONFIG, configFilters)); + } + // Create filter list based on info filters and add it to listBasedOnFilters + TimelineFilterList infoFilters = filters.getInfoFilters(); + if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter( + TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.INFO, infoFilters)); + } + return listBasedOnFilters; + } + + /** + * Check if we need to fetch only some of the event columns. + * + * @return true if we need to fetch some of the columns, false otherwise. + */ + private static boolean fetchPartialEventCols(TimelineFilterList eventFilters, + EnumSet<Field> fieldsToRetrieve) { + return (eventFilters != null && !eventFilters.getFilterList().isEmpty() && + !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)); + } + + /** + * Check if we need to fetch only some of the relates_to columns. + * + * @return true if we need to fetch some of the columns, false otherwise. + */ + private static boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo, + EnumSet<Field> fieldsToRetrieve) { + return (relatesTo != null && !relatesTo.getFilterList().isEmpty() && + !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)); + } + + /** + * Check if we need to fetch only some of the is_related_to columns. + * + * @return true if we need to fetch some of the columns, false otherwise. 
+ */ + private static boolean fetchPartialIsRelatedToCols( + TimelineFilterList isRelatedTo, EnumSet<Field> fieldsToRetrieve) { + return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() && + !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)); + } + + /** + * Check if we need to fetch only some of the columns based on event filters, + * relatesTo and isRelatedTo from info family. + * + * @return true, if we need to fetch only some of the columns, false if we + * need to fetch all the columns under info column family. + */ + protected boolean fetchPartialColsFromInfoFamily() { + EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); TimelineEntityFilters filters = getFilters(); + return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) || + fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) || + fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), fieldsToRetrieve); + } + + /** + * Check if we need to create filter list based on fields. We need to create + * a filter list iff not all fields need to be retrieved or we have some specific + * configs or metrics to retrieve. We also need to create a filter list if we + * have relationships (relatesTo/isRelatedTo) or event filters specified for + * the query. + * + * @return true if we need to create the filter list, false otherwise. + */ + protected boolean needCreateFilterListBasedOnFields() { + TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); + // Check if all fields are to be retrieved or not. If all fields have to + // be retrieved, also check if we have some metrics or configs to + // retrieve specified for the query because then a filter list will have + // to be created. 
+ boolean flag = !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) || + (dataToRetrieve.getConfsToRetrieve() != null && + !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) || + (dataToRetrieve.getMetricsToRetrieve() != null && + !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()); + // Filters need to be checked only if we are reading multiple entities. If + // condition above is false, we check if there are relationships(relatesTo/ + // isRelatedTo) and event filters specified for the query. + if (!flag && !isSingleEntityRead()) { + TimelineEntityFilters filters = getFilters(); + flag = (filters.getEventFilters() != null && + !filters.getEventFilters().getFilterList().isEmpty()) || + (filters.getIsRelatedTo() != null && + !filters.getIsRelatedTo().getFilterList().isEmpty()) || + (filters.getRelatesTo() != null && + !filters.getRelatesTo().getFilterList().isEmpty()); + } + return flag; + } + + /** + * Add {@link QualifierFilter} filters to filter list for each column of + * entity table. + * + * @param list filter list to which qualifier filters have to be added. + */ + protected void updateFixedColumns(FilterList list) { + for (EntityColumn column : EntityColumn.values()) { + list.addFilter(new QualifierFilter(CompareOp.EQUAL, + new BinaryComparator(column.getColumnQualifierBytes()))); + } + } + + /** + * Creates a filter list which indicates that only some of the column + * qualifiers in the info column family will be returned in result. + * + * The qualifiers are chosen from the entity table's fixed columns and its + * INFO, RELATES_TO, IS_RELATED_TO and EVENT column prefixes. + * @return filter list. + * @throws IOException if any problem occurs while creating filter list. + */ + private FilterList createFilterListForColsOfInfoFamily() + throws IOException { + FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE); + // Add filters for each column in entity table. 
+ updateFixedColumns(infoFamilyColsFilter); + EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); + // If INFO field has to be retrieved, add a filter for fetching columns + // with INFO column prefix. + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.EQUAL, EntityColumnPrefix.INFO)); + } + TimelineFilterList relatesTo = getFilters().getRelatesTo(); + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { + // If RELATES_TO field has to be retrieved, add a filter for fetching + // columns with RELATES_TO column prefix. + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.EQUAL, EntityColumnPrefix.RELATES_TO)); + } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) { + // Even if fields to retrieve does not contain RELATES_TO, we still + // need to have a filter to fetch some of the column qualifiers if + // relatesTo filters are specified. relatesTo filters will then be + // matched after fetching rows from HBase. + Set<String> relatesToCols = + TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo); + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createFiltersFromColumnQualifiers( + EntityColumnPrefix.RELATES_TO, relatesToCols)); + } + TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo(); + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { + // If IS_RELATED_TO field has to be retrieved, add a filter for fetching + // columns with IS_RELATED_TO column prefix. 
+ infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.EQUAL, EntityColumnPrefix.IS_RELATED_TO)); + } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) { + // Even if fields to retrieve does not contain IS_RELATED_TO, we still + // need to have a filter to fetch some of the column qualifiers if + // isRelatedTo filters are specified. isRelatedTo filters will then be + // matched after fetching rows from HBase. + Set<String> isRelatedToCols = + TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo); + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createFiltersFromColumnQualifiers( + EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols)); + } + TimelineFilterList eventFilters = getFilters().getEventFilters(); + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { + // If EVENTS field has to be retrieved, add a filter for fetching columns + // with EVENT column prefix. + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.EQUAL, EntityColumnPrefix.EVENT)); + } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){ + // Even if fields to retrieve does not contain EVENTS, we still need to + // have a filter to fetch some of the column qualifiers on the basis of + // event filters specified. Event filters will then be matched after + // fetching rows from HBase. + Set<String> eventCols = + TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters); + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createFiltersFromColumnQualifiers( + EntityColumnPrefix.EVENT, eventCols)); + } + return infoFamilyColsFilter; + } + + /** + * Exclude column prefixes via filters which are not required(based on fields + * to retrieve) from info column family. These filters are added to filter + * list which contains a filter for getting info column family. + * + * @param infoColFamilyList filter list for info column family. 
+ */ + private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { + EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); // Events not required. - if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.EVENTS) && - !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (isSingleEntityRead() || filters.getEventFilters() == null)) { + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - EntityColumnPrefix.EVENT.getColumnPrefixBytes("")))); + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.NOT_EQUAL, EntityColumnPrefix.EVENT)); } // info not required. - if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.INFO) && - !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (isSingleEntityRead() || filters.getInfoFilters() == null)) { + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - EntityColumnPrefix.INFO.getColumnPrefixBytes("")))); + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.NOT_EQUAL, EntityColumnPrefix.INFO)); } // is related to not required. - if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.IS_RELATED_TO) && - !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (isSingleEntityRead() || filters.getIsRelatedTo() == null)) { + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.NOT_EQUAL, EntityColumnPrefix.IS_RELATED_TO)); } // relates to not required. 
- if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.RELATES_TO) && - !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (isSingleEntityRead() || filters.getRelatesTo() == null)) { + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.NOT_EQUAL, EntityColumnPrefix.RELATES_TO)); } - list.addFilter(infoColFamilyList); - if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS) || - (!isSingleEntityRead() && filters.getConfigFilters() != null)) || - (dataToRetrieve.getConfsToRetrieve() != null && - !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty())) { - FilterList filterCfg = - new FilterList(new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes()))); - if (dataToRetrieve.getConfsToRetrieve() != null && - !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) { - filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.CONFIG, dataToRetrieve.getConfsToRetrieve())); - } - list.addFilter(filterCfg); + } + + /** + * Updates filter list based on fields for confs and metrics to retrieve. + * + * @param listBasedOnFields filter list based on fields. + * @throws IOException if any problem occurs while updating filter list. + */ + private void updateFilterForConfsAndMetricsToRetrieve( + FilterList listBasedOnFields) throws IOException { + TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); + // Please note that if confsToRetrieve is specified, we would have added + // CONFIGS to fields to retrieve in augmentParams() even if not specified. + if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) { + // Create a filter list for configs. + listBasedOnFields.addFilter(TimelineFilterUtils. 
+ createFilterForConfsOrMetricsToRetrieve( + dataToRetrieve.getConfsToRetrieve(), + EntityColumnFamily.CONFIGS, EntityColumnPrefix.CONFIG)); } - if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) || - (!isSingleEntityRead() && filters.getMetricFilters() != null)) || - (dataToRetrieve.getMetricsToRetrieve() != null && - !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) { - FilterList filterMetrics = - new FilterList(new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(EntityColumnFamily.METRICS.getBytes()))); - if (dataToRetrieve.getMetricsToRetrieve() != null && - !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()) { - filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.METRIC, dataToRetrieve.getMetricsToRetrieve())); - } - list.addFilter(filterMetrics); + + // Please note that if metricsToRetrieve is specified, we would have added + // METRICS to fields to retrieve in augmentParams() even if not specified. + if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) { + // Create a filter list for metrics. + listBasedOnFields.addFilter(TimelineFilterUtils. + createFilterForConfsOrMetricsToRetrieve( + dataToRetrieve.getMetricsToRetrieve(), + EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC)); + } + } + + @Override + protected FilterList constructFilterListBasedOnFields() throws IOException { + if (!needCreateFilterListBasedOnFields()) { + // Fetch all the columns. No need of a filter. + return null; + } + FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE); + FilterList infoColFamilyList = new FilterList(); + // By default fetch everything in INFO column family. 
+ FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.INFO.getBytes())); + infoColFamilyList.addFilter(infoColumnFamily); + if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) { + // We can fetch only some of the columns from info family. + infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily()); + } else { + // Exclude column prefixes in info column family which are not required + // based on fields to retrieve. + excludeFieldsFromInfoColFamily(infoColFamilyList); } - return list; + listBasedOnFields.addFilter(infoColFamilyList); + updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); + return listBasedOnFields; } + /** + * Looks up flow context from AppToFlow table. + * + * @param clusterId Cluster Id. + * @param appId App Id. + * @param hbaseConf HBase configuration. + * @param conn HBase Connection. + * @return flow context information. + * @throws IOException if any problem occurs while fetching flow information. + */ protected FlowContext lookupFlowContext(String clusterId, String appId, Configuration hbaseConf, Connection conn) throws IOException { byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); @@ -200,6 +418,9 @@ class GenericEntityReader extends TimelineEntityReader { } } + /** + * Encapsulates flow context information. 
+ */ protected static class FlowContext { private final String userId; private final String flowName; @@ -222,6 +443,9 @@ class GenericEntityReader extends TimelineEntityReader { @Override protected void validateParams() { + Preconditions.checkNotNull(getContext(), "context shouldn't be null"); + Preconditions.checkNotNull( + getDataToRetrieve(), "data to retrieve shouldn't be null"); Preconditions.checkNotNull(getContext().getClusterId(), "clusterId shouldn't be null"); Preconditions.checkNotNull(getContext().getAppId(), @@ -241,13 +465,19 @@ class GenericEntityReader extends TimelineEntityReader { // In reality all three should be null or neither should be null if (context.getFlowName() == null || context.getFlowRunId() == null || context.getUserId() == null) { + // Get flow context information from AppToFlow table. FlowContext flowContext = lookupFlowContext( context.getClusterId(), context.getAppId(), hbaseConf, conn); context.setFlowName(flowContext.flowName); context.setFlowRunId(flowContext.flowRunId); context.setUserId(flowContext.userId); } + // Add configs/metrics to fields to retrieve if confsToRetrieve and/or + // metricsToRetrieve are specified. getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); + if (!isSingleEntityRead()) { + createFiltersIfNull(); + } } @Override @@ -298,215 +528,84 @@ class GenericEntityReader extends TimelineEntityReader { // fetch created time Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result); entity.setCreatedTime(createdTime.longValue()); - if (!isSingleEntityRead() && - (entity.getCreatedTime() < filters.getCreatedTimeBegin() || - entity.getCreatedTime() > filters.getCreatedTimeEnd())) { - return null; - } + EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); - // fetch is related to entities + // fetch is related to entities and match isRelatedTo filter. If isRelatedTo + // filters do not match, entity would be dropped. 
We have to match filters + // locally as relevant HBase filters to filter out rows on the basis of + // isRelatedTo are not set in HBase scan. boolean checkIsRelatedTo = - filters != null && filters.getIsRelatedTo() != null && - filters.getIsRelatedTo().size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { - readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true); - if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( - entity.getIsRelatedToEntities(), filters.getIsRelatedTo())) { + !isSingleEntityRead() && filters.getIsRelatedTo() != null && + filters.getIsRelatedTo().getFilterList().size() > 0; + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || + checkIsRelatedTo) { + TimelineStorageUtils.readRelationship( + entity, result, EntityColumnPrefix.IS_RELATED_TO, true); + if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity, + filters.getIsRelatedTo())) { return null; } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, + Field.IS_RELATED_TO)) { entity.getIsRelatedToEntities().clear(); } } - // fetch relates to entities + // fetch relates to entities and match relatesTo filter. If relatesTo + // filters do not match, entity would be dropped. We have to match filters + // locally as relevant HBase filters to filter out rows on the basis of + // relatesTo are not set in HBase scan. 
boolean checkRelatesTo = - filters != null && filters.getRelatesTo() != null && - filters.getRelatesTo().size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { - readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); - if (checkRelatesTo && !TimelineStorageUtils.matchRelations( - entity.getRelatesToEntities(), filters.getRelatesTo())) { + !isSingleEntityRead() && filters.getRelatesTo() != null && + filters.getRelatesTo().getFilterList().size() > 0; + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) || + checkRelatesTo) { + TimelineStorageUtils.readRelationship( + entity, result, EntityColumnPrefix.RELATES_TO, false); + if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity, + filters.getRelatesTo())) { return null; } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.RELATES_TO)) { + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { entity.getRelatesToEntities().clear(); } } - // fetch info - boolean checkInfo = filters != null && filters.getInfoFilters() != null && - filters.getInfoFilters().size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.INFO) || checkInfo) { - readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); - if (checkInfo && - !TimelineStorageUtils.matchFilters( - entity.getInfo(), filters.getInfoFilters())) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.INFO)) { - entity.getInfo().clear(); - } + // fetch info if fieldsToRetrieve contains INFO or ALL. 
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { + TimelineStorageUtils.readKeyValuePairs( + entity, result, EntityColumnPrefix.INFO, false); } - // fetch configs - boolean checkConfigs = - filters != null && filters.getConfigFilters() != null && - filters.getConfigFilters().size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { - readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); - if (checkConfigs && !TimelineStorageUtils.matchFilters( - entity.getConfigs(), filters.getConfigFilters())) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.CONFIGS)) { - entity.getConfigs().clear(); - } + // fetch configs if fieldsToRetrieve contains CONFIGS or ALL. + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) { + TimelineStorageUtils.readKeyValuePairs( + entity, result, EntityColumnPrefix.CONFIG, true); } - // fetch events + // fetch events and match event filters if they exist. If event filters do + // not match, entity would be dropped. We have to match filters locally + // as relevant HBase filters to filter out rows on the basis of events + // are not set in HBase scan. 
boolean checkEvents = - filters != null && filters.getEventFilters() != null && - filters.getEventFilters().size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { - readEvents(entity, result, false); - if (checkEvents && !TimelineStorageUtils.matchEventFilters( - entity.getEvents(), filters.getEventFilters())) { + !isSingleEntityRead() && filters.getEventFilters() != null && + filters.getEventFilters().getFilterList().size() > 0; + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) || + checkEvents) { + TimelineStorageUtils.readEvents(entity, result, EntityColumnPrefix.EVENT); + if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity, + filters.getEventFilters())) { return null; } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.EVENTS)) { + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { entity.getEvents().clear(); } } - // fetch metrics - boolean checkMetrics = - filters != null && filters.getMetricFilters() != null && - filters.getMetricFilters().size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { + // fetch metrics if fieldsToRetrieve contains METRICS or ALL. + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) { readMetrics(entity, result, EntityColumnPrefix.METRIC); - if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( - entity.getMetrics(), filters.getMetricFilters())) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.METRICS)) { - entity.getMetrics().clear(); - } } return entity; } - - /** - * Helper method for reading relationship. - * - * @param <T> Describes the type of column prefix. - * @param entity entity to fill. - * @param result result from HBase. - * @param prefix column prefix. 
- * @param isRelatedTo if true, means relationship is to be added to - * isRelatedTo, otherwise its added to relatesTo. - * @throws IOException if any problem is encountered while reading result. - */ - protected <T> void readRelationship( - TimelineEntity entity, Result result, ColumnPrefix<T> prefix, - boolean isRelatedTo) throws IOException { - // isRelatedTo and relatesTo are of type Map<String, Set<String>> - Map<String, Object> columns = prefix.readResults(result); - for (Map.Entry<String, Object> column : columns.entrySet()) { - for (String id : Separator.VALUES.splitEncoded( - column.getValue().toString())) { - if (isRelatedTo) { - entity.addIsRelatedToEntity(column.getKey(), id); - } else { - entity.addRelatesToEntity(column.getKey(), id); - } - } - } - } - - /** - * Helper method for reading key-value pairs for either info or config. - * - * @param <T> Describes the type of column prefix. - * @param entity entity to fill. - * @param result result from HBase. - * @param prefix column prefix. - * @param isConfig if true, means we are reading configs, otherwise info. - * @throws IOException if any problem is encountered while reading result. - */ - protected <T> void readKeyValuePairs( - TimelineEntity entity, Result result, ColumnPrefix<T> prefix, - boolean isConfig) throws IOException { - // info and configuration are of type Map<String, Object or String> - Map<String, Object> columns = prefix.readResults(result); - if (isConfig) { - for (Map.Entry<String, Object> column : columns.entrySet()) { - entity.addConfig(column.getKey(), column.getValue().toString()); - } - } else { - entity.addInfo(columns); - } - } - - /** - * Read events from the entity table or the application table. The column name - * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted - * if there is no info associated with the event. - * - * @param entity entity to fill. - * @param result HBase Result. 
- * @param isApplication if true, event read is for application table, - * otherwise its being read for entity table. - * @throws IOException if any problem is encountered while reading result. - * - * See {@link EntityTable} and {@link ApplicationTable} for a more detailed - * schema description. - */ - protected void readEvents(TimelineEntity entity, Result result, - boolean isApplication) throws IOException { - Map<String, TimelineEvent> eventsMap = new HashMap<>(); - Map<?, Object> eventsResult = isApplication ? - ApplicationColumnPrefix.EVENT. - readResultsHavingCompoundColumnQualifiers(result) : - EntityColumnPrefix.EVENT. - readResultsHavingCompoundColumnQualifiers(result); - for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) { - byte[][] karr = (byte[][])eventResult.getKey(); - // the column name is of the form "eventId=timestamp=infoKey" - if (karr.length == 3) { - String id = Bytes.toString(karr[0]); - long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1])); - String key = Separator.VALUES.joinEncoded(id, Long.toString(ts)); - TimelineEvent event = eventsMap.get(key); - if (event == null) { - event = new TimelineEvent(); - event.setId(id); - event.setTimestamp(ts); - eventsMap.put(key, event); - } - // handle empty info - String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]); - if (infoKey != null) { - event.addInfo(infoKey, eventResult.getValue()); - } - } else { - LOG.warn("incorrectly formatted column name: it will be discarded"); - continue; - } - } - Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values()); - entity.addEvents(eventsSet); - } }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8e8a03b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java index 281e901..4299de9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -107,11 +107,60 @@ public abstract class TimelineEntityReader { /** * Creates a {@link FilterList} based on fields, confs and metrics to * retrieve. This filter list will be set in Scan/Get objects to trim down - * results fetched from HBase back-end storage. + * results fetched from HBase back-end storage. This is called only for + * multiple entity reads. * * @return a {@link FilterList} object. + * @throws IOException if any problem occurs while creating filter list. */ - protected abstract FilterList constructFilterListBasedOnFields(); + protected abstract FilterList constructFilterListBasedOnFields() + throws IOException; + + /** + * Creates a {@link FilterList} based on info, config and metric filters. This + * filter list will be set in HBase Get to trim down results fetched from + * HBase back-end storage. + * + * @return a {@link FilterList} object. 
+ * @throws IOException if any problem occurs while creating filter list. + */ + protected abstract FilterList constructFilterListBasedOnFilters() + throws IOException; + + /** + * Combines filter lists created based on fields and based on filters. + * + * @return a {@link FilterList} object if it can be constructed. Returns null, + * if filter list cannot be created either on the basis of filters or on the + * basis of fields. + * @throws IOException if any problem occurs while creating filter list. + */ + private FilterList createFilterList() throws IOException { + FilterList listBasedOnFilters = constructFilterListBasedOnFilters(); + boolean hasListBasedOnFilters = listBasedOnFilters != null && + !listBasedOnFilters.getFilters().isEmpty(); + FilterList listBasedOnFields = constructFilterListBasedOnFields(); + boolean hasListBasedOnFields = listBasedOnFields != null && + !listBasedOnFields.getFilters().isEmpty(); + // If filter lists based on both filters and fields can be created, + // combine them in a new filter list and return it. + // If either one of them has been created, return that filter list. + // Return null, if none of the filter lists can be created. This indicates + // that no filter list needs to be added to HBase Scan as filters are not + // specified for the query or only the default view of entity needs to be + // returned. + if (hasListBasedOnFilters && hasListBasedOnFields) { + FilterList list = new FilterList(); + list.addFilter(listBasedOnFilters); + list.addFilter(listBasedOnFields); + return list; + } else if (hasListBasedOnFilters) { + return listBasedOnFilters; + } else if (hasListBasedOnFields) { + return listBasedOnFields; + } + return null; + } protected TimelineReaderContext getContext() { return context; @@ -126,6 +175,16 @@ public abstract class TimelineEntityReader { } /** + * Create a {@link TimelineEntityFilters} object with default values for + * filters. 
+ */ + protected void createFiltersIfNull() { + if (filters == null) { + filters = new TimelineEntityFilters(); + } + } + + /** * Reads and deserializes a single timeline entity from the HBase storage. * * @param hbaseConf HBase Configuration. @@ -140,6 +199,9 @@ public abstract class TimelineEntityReader { augmentParams(hbaseConf, conn); FilterList filterList = constructFilterListBasedOnFields(); + if (LOG.isDebugEnabled() && filterList != null) { + LOG.debug("FilterList created for get is - " + filterList); + } Result result = getResult(hbaseConf, conn, filterList); if (result == null || result.isEmpty()) { // Could not find a matching row. @@ -166,7 +228,10 @@ public abstract class TimelineEntityReader { augmentParams(hbaseConf, conn); NavigableSet<TimelineEntity> entities = new TreeSet<>(); - FilterList filterList = constructFilterListBasedOnFields(); + FilterList filterList = createFilterList(); + if (LOG.isDebugEnabled() && filterList != null) { + LOG.debug("FilterList created for scan is - " + filterList); + } ResultScanner results = getResults(hbaseConf, conn, filterList); try { for (Result result : results) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8e8a03b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java index b6e23a9..2bd2830 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java @@ -221,7 +221,7 @@ public class TestTimelineReaderWebServices { assertTrue("UID should be present", entity.getInfo().containsKey(TimelineReaderManager.UID_KEY)); // Includes UID. - assertEquals(2, entity.getInfo().size()); + assertEquals(3, entity.getInfo().size()); // No events will be returned as events are not part of fields. assertEquals(0, entity.getEvents().size()); } finally { @@ -247,7 +247,7 @@ public class TestTimelineReaderWebServices { assertTrue("UID should be present", entity.getInfo().containsKey(TimelineReaderManager.UID_KEY)); // Includes UID. - assertEquals(2, entity.getInfo().size()); + assertEquals(3, entity.getInfo().size()); assertEquals(2, entity.getEvents().size()); } finally { client.destroy(); @@ -443,10 +443,8 @@ public class TestTimelineReaderWebServices { resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); assertNotNull(entities); - assertEquals(2, entities.size()); - assertTrue("Entities with id_1 and id_3 should have been present" + - " in response.", - entities.contains(newEntity("app", "id_1")) && + assertEquals(1, entities.size()); + assertTrue("Entity with id_3 should have been present in response.", entities.contains(newEntity("app", "id_3"))); } finally { client.destroy(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8e8a03b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java index a8a2ff8..23d64e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -40,6 +41,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.AfterClass; @@ -112,6 +120,7 @@ public class TestFileSystemTimelineReaderImpl { entity11.setCreatedTime(1425016502000L); Map<String, Object> info1 = new HashMap<String, Object>(); info1.put("info1", "val1"); + info1.put("info2", "val5"); entity11.addInfo(info1); TimelineEvent event = new TimelineEvent(); event.setId("event_1"); @@ -121,7 +130,7 @@ public class TestFileSystemTimelineReaderImpl { TimelineMetric metric1 = new TimelineMetric(); metric1.setId("metric1"); metric1.setType(TimelineMetric.Type.SINGLE_VALUE); - metric1.addValue(1425016502006L, 113.2F); + metric1.addValue(1425016502006L, 113); metrics.add(metric1); TimelineMetric metric2 = new TimelineMetric(); metric2.setId("metric2"); @@ -130,7 +139,7 @@ public class TestFileSystemTimelineReaderImpl { metrics.add(metric2); entity11.setMetrics(metrics); Map<String,String> configs = new HashMap<String, String>(); - configs.put("config_1", "123"); + configs.put("config_1", "127"); entity11.setConfigs(configs); entity11.addRelatesToEntity("flow", "flow1"); entity11.addIsRelatedToEntity("type1", "tid1_1"); @@ -171,7 +180,7 @@ public class TestFileSystemTimelineReaderImpl { info1.put("info2", 4); entity2.addInfo(info2); Map<String,String> configs2 = new HashMap<String, String>(); - configs2.put("config_1", "123"); + configs2.put("config_1", "129"); configs2.put("config_3", "def"); entity2.setConfigs(configs2); TimelineEvent event2 = new TimelineEvent(); @@ -182,7 +191,7 @@ public class TestFileSystemTimelineReaderImpl { TimelineMetric metric21 = new TimelineMetric(); metric21.setId("metric1"); metric21.setType(TimelineMetric.Type.SINGLE_VALUE); - metric21.addValue(1425016501006L, 123.2F); + metric21.addValue(1425016501006L, 300); metrics2.add(metric21); TimelineMetric 
metric22 = new TimelineMetric(); metric22.setId("metric2"); @@ -205,6 +214,7 @@ public class TestFileSystemTimelineReaderImpl { entity3.setCreatedTime(1425016501050L); Map<String, Object> info3 = new HashMap<String, Object>(); info3.put("info2", 3.5); + info3.put("info4", 20); entity3.addInfo(info3); Map<String,String> configs3 = new HashMap<String, String>(); configs3.put("config_1", "123"); @@ -222,7 +232,7 @@ public class TestFileSystemTimelineReaderImpl { TimelineMetric metric31 = new TimelineMetric(); metric31.setId("metric1"); metric31.setType(TimelineMetric.Type.SINGLE_VALUE); - metric31.addValue(1425016501006L, 124.8F); + metric31.addValue(1425016501006L, 124); metrics3.add(metric31); TimelineMetric metric32 = new TimelineMetric(); metric32.setId("metric2"); @@ -317,7 +327,7 @@ public class TestFileSystemTimelineReaderImpl { Assert.assertEquals(1425016502000L, result.getCreatedTime()); Assert.assertEquals(3, result.getConfigs().size()); Assert.assertEquals(3, result.getMetrics().size()); - Assert.assertEquals(1, result.getInfo().size()); + Assert.assertEquals(2, result.getInfo().size()); // No events will be returned Assert.assertEquals(0, result.getEvents().size()); } @@ -344,8 +354,8 @@ public class TestFileSystemTimelineReaderImpl { Set<TimelineEntity> result = reader.getEntities( new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", "app", null), new TimelineEntityFilters(), - new TimelineDataToRetrieve()); - // All 3 entities will be returned + new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL))); + // All 4 entities will be returned Assert.assertEquals(4, result.size()); } @@ -425,12 +435,13 @@ public class TestFileSystemTimelineReaderImpl { @Test public void testGetFilteredEntities() throws Exception { // Get entities based on info filters. 
- Map<String, Object> infoFilters = new HashMap<String, Object>(); - infoFilters.put("info2", 3.5); + TimelineFilterList infoFilterList = new TimelineFilterList(); + infoFilterList.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5)); Set<TimelineEntity> result = reader.getEntities( new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", "app", null), - new TimelineEntityFilters(null, null, null, null, null, infoFilters, + new TimelineEntityFilters(null, null, null, null, null, infoFilterList, null, null, null), new TimelineDataToRetrieve()); Assert.assertEquals(1, result.size()); @@ -442,26 +453,30 @@ public class TestFileSystemTimelineReaderImpl { } // Get entities based on config filters. - Map<String, String> configFilters = new HashMap<String, String>(); - configFilters.put("config_1", "123"); - configFilters.put("config_3", "abc"); + TimelineFilterList confFilterList = new TimelineFilterList(); + confFilterList.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "123")); + confFilterList.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc")); result = reader.getEntities( new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", "app", null), new TimelineEntityFilters(null, null, null, null, null, null, - configFilters, null, null), + confFilterList, null, null), new TimelineDataToRetrieve()); - Assert.assertEquals(2, result.size()); + Assert.assertEquals(1, result.size()); for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) { + if (!entity.getId().equals("id_3")) { Assert.fail("Incorrect filtering based on config filters"); } } // Get entities based on event filters. 
- Set<String> eventFilters = new HashSet<String>(); - eventFilters.add("event_2"); - eventFilters.add("event_4"); + TimelineFilterList eventFilters = new TimelineFilterList(); + eventFilters.addFilter( + new TimelineExistsFilter(TimelineCompareOp.EQUAL,"event_2")); + eventFilters.addFilter( + new TimelineExistsFilter(TimelineCompareOp.EQUAL,"event_4")); result = reader.getEntities( new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", "app", null), @@ -476,13 +491,14 @@ public class TestFileSystemTimelineReaderImpl { } // Get entities based on metric filters. - Set<String> metricFilters = new HashSet<String>(); - metricFilters.add("metric3"); + TimelineFilterList metricFilterList = new TimelineFilterList(); + metricFilterList.addFilter(new TimelineCompareFilter( + TimelineCompareOp.GREATER_OR_EQUAL, "metric3", 0L)); result = reader.getEntities( new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", "app", null), new TimelineEntityFilters(null, null, null, null, null, null, null, - metricFilters, null), + metricFilterList, null), new TimelineDataToRetrieve()); Assert.assertEquals(2, result.size()); // Two entities with IDs' id_1 and id_2 should be returned. @@ -491,15 +507,266 @@ public class TestFileSystemTimelineReaderImpl { Assert.fail("Incorrect filtering based on metric filters"); } } - } + + // Get entities based on complex config filters. 
+ TimelineFilterList list1 = new TimelineFilterList(); + list1.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "129")); + list1.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "def")); + TimelineFilterList list2 = new TimelineFilterList(); + list2.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23")); + list2.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc")); + TimelineFilterList confFilterList1 = + new TimelineFilterList(Operator.OR, list1, list2); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, + confFilterList1, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(2, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) { + Assert.fail("Incorrect filtering based on config filters"); + } + } + + TimelineFilterList list3 = new TimelineFilterList(); + list3.addFilter(new TimelineKeyValueFilter( + TimelineCompareOp.NOT_EQUAL, "config_1", "123")); + list3.addFilter(new TimelineKeyValueFilter( + TimelineCompareOp.NOT_EQUAL, "config_3", "abc")); + TimelineFilterList list4 = new TimelineFilterList(); + list4.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23")); + TimelineFilterList confFilterList2 = + new TimelineFilterList(Operator.OR, list3, list4); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, + confFilterList2, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(2, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) { + Assert.fail("Incorrect filtering based on config 
filters"); + } + } + + TimelineFilterList confFilterList3 = new TimelineFilterList(); + confFilterList3.addFilter(new TimelineKeyValueFilter( + TimelineCompareOp.NOT_EQUAL, "config_1", "127")); + confFilterList3.addFilter(new TimelineKeyValueFilter( + TimelineCompareOp.NOT_EQUAL, "config_3", "abc")); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, + confFilterList3, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(1, result.size()); + for(TimelineEntity entity : result) { + if (!entity.getId().equals("id_2")) { + Assert.fail("Incorrect filtering based on config filters"); + } + } + + TimelineFilterList confFilterList4 = new TimelineFilterList(); + confFilterList4.addFilter(new TimelineKeyValueFilter( + TimelineCompareOp.EQUAL, "config_dummy", "dummy")); + confFilterList4.addFilter(new TimelineKeyValueFilter( + TimelineCompareOp.EQUAL, "config_3", "def")); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, + confFilterList4, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(0, result.size()); + + TimelineFilterList confFilterList5 = new TimelineFilterList(Operator.OR); + confFilterList5.addFilter(new TimelineKeyValueFilter( + TimelineCompareOp.EQUAL, "config_dummy", "dummy")); + confFilterList5.addFilter(new TimelineKeyValueFilter( + TimelineCompareOp.EQUAL, "config_3", "def")); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, + confFilterList5, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(1, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_2")) { + Assert.fail("Incorrect 
filtering based on config filters"); + } + } + + // Get entities based on complex metric filters. + TimelineFilterList list6 = new TimelineFilterList(); + list6.addFilter(new TimelineCompareFilter( + TimelineCompareOp.GREATER_THAN, "metric1", 200)); + list6.addFilter(new TimelineCompareFilter( + TimelineCompareOp.EQUAL, "metric3", 23)); + TimelineFilterList list7 = new TimelineFilterList(); + list7.addFilter(new TimelineCompareFilter( + TimelineCompareOp.GREATER_OR_EQUAL, "metric2", 74)); + TimelineFilterList metricFilterList1 = + new TimelineFilterList(Operator.OR, list6, list7); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, null, + metricFilterList1, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(2, result.size()); + // Two entities with IDs' id_2 and id_3 should be returned. + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_2") && !entity.getId().equals("id_3")) { + Assert.fail("Incorrect filtering based on metric filters"); + } + } + + TimelineFilterList metricFilterList2 = new TimelineFilterList(); + metricFilterList2.addFilter(new TimelineCompareFilter( + TimelineCompareOp.LESS_THAN, "metric2", 70)); + metricFilterList2.addFilter(new TimelineCompareFilter( + TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23)); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, null, + metricFilterList2, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(1, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1")) { + Assert.fail("Incorrect filtering based on metric filters"); + } + } + + TimelineFilterList metricFilterList3 = new TimelineFilterList(); + metricFilterList3.addFilter(new TimelineCompareFilter( + 
TimelineCompareOp.LESS_THAN, "dummy_metric", 30)); + metricFilterList3.addFilter(new TimelineCompareFilter( + TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23)); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, null, + metricFilterList3, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(0, result.size()); + + TimelineFilterList metricFilterList4 = new TimelineFilterList(Operator.OR); + metricFilterList4.addFilter(new TimelineCompareFilter( + TimelineCompareOp.LESS_THAN, "dummy_metric", 30)); + metricFilterList4.addFilter(new TimelineCompareFilter( + TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23)); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, null, + metricFilterList4, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(2, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) { + Assert.fail("Incorrect filtering based on metric filters"); + } + } + + TimelineFilterList metricFilterList5 = + new TimelineFilterList(new TimelineCompareFilter( + TimelineCompareOp.NOT_EQUAL, "metric2", 74)); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, null, + metricFilterList5, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(2, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) { + Assert.fail("Incorrect filtering based on metric filters"); + } + } + + TimelineFilterList infoFilterList1 = new TimelineFilterList(); + infoFilterList1.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 
3.5)); + infoFilterList1.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.NOT_EQUAL, "info4", 20)); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, infoFilterList1, + null, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(0, result.size()); + + TimelineFilterList infoFilterList2 = new TimelineFilterList(Operator.OR); + infoFilterList2.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5)); + infoFilterList2.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info1", "val1")); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, infoFilterList2, + null, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(2, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) { + Assert.fail("Incorrect filtering based on info filters"); + } + } + + TimelineFilterList infoFilterList3 = new TimelineFilterList(); + infoFilterList3.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1)); + infoFilterList3.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5")); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, infoFilterList3, + null, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(0, result.size()); + + TimelineFilterList infoFilterList4 = new TimelineFilterList(Operator.OR); + infoFilterList4.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1)); + infoFilterList4.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5")); + result = 
reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, infoFilterList4, + null, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(1, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1")) { + Assert.fail("Incorrect filtering based on info filters"); + } + } + } @Test public void testGetEntitiesByRelations() throws Exception { // Get entities based on relatesTo. - Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>(); - Set<String> relatesToIds = new HashSet<String>(); - relatesToIds.add("flow1"); - relatesTo.put("flow", relatesToIds); + TimelineFilterList relatesTo = new TimelineFilterList(Operator.OR); + Set<Object> relatesToIds = + new HashSet<Object>(Arrays.asList((Object)"flow1")); + relatesTo.addFilter(new TimelineKeyValuesFilter( + TimelineCompareOp.EQUAL, "flow", relatesToIds)); Set<TimelineEntity> result = reader.getEntities( new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", "app", null), @@ -515,10 +782,11 @@ public class TestFileSystemTimelineReaderImpl { } // Get entities based on isRelatedTo. - Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>(); - Set<String> isRelatedToIds = new HashSet<String>(); - isRelatedToIds.add("tid1_2"); - isRelatedTo.put("type1", isRelatedToIds); + TimelineFilterList isRelatedTo = new TimelineFilterList(Operator.OR); + Set<Object> isRelatedToIds = + new HashSet<Object>(Arrays.asList((Object)"tid1_2")); + isRelatedTo.addFilter(new TimelineKeyValuesFilter( + TimelineCompareOp.EQUAL, "type1", isRelatedToIds)); result = reader.getEntities( new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", "app", null),