http://git-wip-us.apache.org/repos/asf/hadoop/blob/10a4f8ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java index 0eeb195..ccb33b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java @@ -20,17 +20,14 @@ package org.apache.hadoop.yarn.server.timelineservice.storage; import java.io.IOException; -import java.util.EnumSet; -import java.util.Map; import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.service.Service; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; 
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; /** ATSv2 reader interface. */ @Private @@ -38,11 +35,6 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefi public interface TimelineReader extends Service { /** - * Default limit for {@link #getEntities}. - */ - long DEFAULT_LIMIT = 100; - - /** * Possible fields to retrieve for {@link #getEntities} and * {@link #getEntity}. */ @@ -57,55 +49,61 @@ public interface TimelineReader extends Service { } /** - * <p>The API to fetch the single entity given the entity identifier in the - * scope of the given context.</p> - * - * @param userId - * Context user Id(optional). - * @param clusterId - * Context cluster Id(mandatory). - * @param flowName - * Context flow Id (optional). - * @param flowRunId - * Context flow run Id (optional). - * @param appId - * Context app Id (mandatory) - * @param entityType - * Entity type (mandatory) - * @param entityId - * Entity Id (mandatory) - * @param confsToRetrieve - * Used for deciding which configs to return in response. This is - * represented as a {@link TimelineFilterList} object containing - * {@link TimelinePrefixFilter} objects. These can either be exact config - * keys' or prefixes which are then compared against config keys' to decide - * configs to return in response. - * @param metricsToRetrieve - * Used for deciding which metrics to return in response. This is - * represented as a {@link TimelineFilterList} object containing - * {@link TimelinePrefixFilter} objects. These can either be exact metric - * ids' or prefixes which are then compared against metric ids' to decide - * metrics to return in response. - * @param fieldsToRetrieve - * Specifies which fields of the entity object to retrieve(optional), see - * {@link Field}. If null, retrieves 4 fields namely entity id, - * entity type and entity created time. All fields will be returned if - * {@link Field#ALL} is specified. 
- * @return a {@link TimelineEntity} instance or null. The entity will - * contain the metadata plus the given fields to retrieve. + * <p>The API to fetch the single entity given the identifier(depending on + * the entity type) in the scope of the given context.</p> + * @param context Context which defines the scope in which query has to be + * made. Use getters of {@link TimelineReaderContext} to fetch context + * fields. Context contains the following :<br> + * <ul> + * <li><b>entityType</b> - Entity type(mandatory).</li> + * <li><b>clusterId</b> - Identifies the cluster(mandatory).</li> + * <li><b>userId</b> - Identifies the user.</li> + * <li><b>flowName</b> - Context flow name.</li> + * <li><b>flowRunId</b> - Context flow run id.</li> + * <li><b>appId</b> - Context app id.</li> + * <li><b>entityId</b> - Entity id.</li> + * </ul> + * Fields in context which are mandatory depend on entity type. Entity + * type is always mandatory. In addition to entity type, below is the list + * of context fields which are mandatory, based on entity type.<br> + * <ul> + * <li>If entity type is YARN_FLOW_RUN (i.e. query to fetch a specific flow + * run), clusterId, userId, flowName and flowRunId are mandatory.</li> + * <li>If entity type is YARN_APPLICATION (i.e. query to fetch a specific + * app), query is within the scope of clusterId, userId, flowName, + * flowRunId and appId. But out of this, only clusterId and appId are + * mandatory. If only clusterId and appId are supplied, backend storage + * must fetch the flow context information i.e. userId, flowName and + * flowRunId first and based on that, fetch the app. If flow context + * information is also given, app can be directly fetched. + * </li> + * <li>For other entity types (i.e. query to fetch generic entity), query + * is within the scope of clusterId, userId, flowName, flowRunId, appId, + * entityType and entityId. But out of this, only clusterId, appId, + * entityType and entityId are mandatory. 
If flow context information is + * not supplied, backend storage must fetch the flow context information + * i.e. userId, flowName and flowRunId first and based on that, fetch the + * entity. If flow context information is also given, entity can be + * directly queried. + * </li> + * </ul> + * @param dataToRetrieve Specifies which data to retrieve for the entity. Use + * getters of TimelineDataToRetrieve class to fetch dataToRetrieve + * fields. All the dataToRetrieve fields are optional. Refer to + * {@link TimelineDataToRetrieve} for details. + * @return A <cite>TimelineEntity</cite> instance or null. The entity will + * contain the metadata plus the given fields to retrieve.<br> * If entityType is YARN_FLOW_RUN, entity returned is of type - * {@link FlowRunEntity}. + * <cite>FlowRunEntity</cite>.<br> * For all other entity types, entity returned is of type - * {@link TimelineEntity}. + * <cite>TimelineEntity</cite>. * @throws IOException */ - TimelineEntity getEntity(String userId, String clusterId, String flowName, - Long flowRunId, String appId, String entityType, String entityId, - TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, - EnumSet<Field> fieldsToRetrieve) throws IOException; + TimelineEntity getEntity(TimelineReaderContext context, + TimelineDataToRetrieve dataToRetrieve) throws IOException; /** - * <p>The API to search for a set of entities of the given the entity type in + * <p>The API to search for a set of entities of the given entity type in * the scope of the given context which matches the given predicates. The * predicates include the created time window, limit to number of entities to * be returned, and the entities can be filtered by checking whether they @@ -115,84 +113,66 @@ public interface TimelineReader extends Service { * related to other entities. For those parameters which have multiple * entries, the qualified entity needs to meet all of them.</p> * - * @param userId - * Context user Id(optional). 
- * @param clusterId - * Context cluster Id(mandatory). - * @param flowName - * Context flow Id (optional). - * @param flowRunId - * Context flow run Id (optional). - * @param appId - * Context app Id (mandatory) - * @param entityType - * Entity type (mandatory) - * @param limit - * A limit on the number of entities to return (optional). If null or <=0, - * defaults to {@link #DEFAULT_LIMIT}. - * @param createdTimeBegin - * Matched entities should not be created before this timestamp (optional). - * If null or <=0, defaults to 0. - * @param createdTimeEnd - * Matched entities should not be created after this timestamp (optional). - * If null or <=0, defaults to {@link Long#MAX_VALUE}. - * @param relatesTo - * Matched entities should relate to given entities (optional). - * @param isRelatedTo - * Matched entities should be related to given entities (optional). - * @param infoFilters - * Matched entities should have exact matches to the given info represented - * as key-value pairs (optional). If null or empty, the filter is not - * applied. - * @param configFilters - * Matched entities should have exact matches to the given configs - * represented as key-value pairs (optional). If null or empty, the filter - * is not applied. - * @param metricFilters - * Matched entities should contain the given metrics (optional). If null - * or empty, the filter is not applied. - * @param eventFilters - * Matched entities should contain the given events (optional). If null - * or empty, the filter is not applied. - * @param confsToRetrieve - * Used for deciding which configs to return in response. This is - * represented as a {@link TimelineFilterList} object containing - * {@link TimelinePrefixFilter} objects. These can either be exact config - * keys' or prefixes which are then compared against config keys' to decide - * configs(inside entities) to return in response. This should not be - * confused with configFilters which is used to decide which entities to - * return instead. 
- * @param metricsToRetrieve - * Used for deciding which metrics to return in response. This is - * represented as a {@link TimelineFilterList} object containing - * {@link TimelinePrefixFilter} objects. These can either be exact metric - * ids' or prefixes which are then compared against metric ids' to decide - * metrics(inside entities) to return in response. This should not be - * confused with metricFilters which is used to decide which entities to - * return instead. - * @param fieldsToRetrieve - * Specifies which fields of the entity object to retrieve(optional), see - * {@link Field}. If null, retrieves 4 fields namely entity id, - * entity type and entity created time. All fields will be returned if - * {@link Field#ALL} is specified. - * @return A set of {@link TimelineEntity} instances of the given entity type - * in the given context scope which matches the given predicates + * @param context Context which defines the scope in which query has to be + * made. Use getters of {@link TimelineReaderContext} to fetch context + * fields. Context contains the following :<br> + * <ul> + * <li><b>entityType</b> - Entity type(mandatory).</li> + * <li><b>clusterId</b> - Identifies the cluster(mandatory).</li> + * <li><b>userId</b> - Identifies the user.</li> + * <li><b>flowName</b> - Context flow name.</li> + * <li><b>flowRunId</b> - Context flow run id.</li> + * <li><b>appId</b> - Context app id.</li> + * </ul> + * Although entityId is also part of context, it has no meaning for + * getEntities.<br> + * Fields in context which are mandatory depend on entity type. Entity + * type is always mandatory. In addition to entity type, below is the list + * of context fields which are mandatory, based on entity type.<br> + * <ul> + * <li>If entity type is YARN_FLOW_ACTIVITY (i.e. query to fetch flows), + * only clusterId is mandatory. + * </li> + * <li>If entity type is YARN_FLOW_RUN (i.e. 
query to fetch flow runs), + * clusterId, userId and flowName are mandatory.</li> + * <li>If entity type is YARN_APPLICATION (i.e. query to fetch apps), we + * can either get all apps within the context of flow name or within the + * context of flow run. If apps are queried within the scope of flow name, + * clusterId, userId and flowName are supplied. If they are queried within + * the scope of flow run, clusterId, userId, flowName and flowRunId are + * supplied.</li> + * <li>For other entity types (i.e. query to fetch generic entities), query + * is within the scope of clusterId, userId, flowName, flowRunId, appId and + * entityType. But out of this, only clusterId, appId and entityType are + * mandatory. If flow context information is not supplied, backend storage + * must fetch the flow context information i.e. userId, flowName and + * flowRunId first and based on that, fetch the entities. If flow context + * information is also given, entities can be directly queried. + * </li> + * </ul> + * @param filters Specifies filters which restrict the number of entities + * to return. Use getters of TimelineEntityFilters class to fetch + * various filters. All the filters are optional. Refer to + * {@link TimelineEntityFilters} for details. + * @param dataToRetrieve Specifies which data to retrieve for each entity. Use + * getters of TimelineDataToRetrieve class to fetch dataToRetrieve + * fields. All the dataToRetrieve fields are optional. Refer to + * {@link TimelineDataToRetrieve} for details. + * @return A set of <cite>TimelineEntity</cite> instances of the given entity + * type in the given context scope which matches the given predicates * ordered by created time, descending. Each entity will only contain the * metadata(id, type and created time) plus the given fields to retrieve. + * <br> * If entityType is YARN_FLOW_ACTIVITY, entities returned are of type - * {@link FlowActivityEntity}. 
+ * <cite>FlowActivityEntity</cite>.<br> * If entityType is YARN_FLOW_RUN, entities returned are of type - * {@link FlowRunEntity}. + * <cite>FlowRunEntity</cite>.<br> * For all other entity types, entities returned are of type - * {@link TimelineEntity}. + * <cite>TimelineEntity</cite>. * @throws IOException */ - Set<TimelineEntity> getEntities(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, - Map<String, Object> infoFilters, Map<String, String> configFilters, - Set<String> metricFilters, Set<String> eventFilters, - TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, - EnumSet<Field> fieldsToRetrieve) throws IOException; + Set<TimelineEntity> getEntities( + TimelineReaderContext context, + TimelineEntityFilters filters, + TimelineDataToRetrieve dataToRetrieve) throws IOException; } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/10a4f8ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java index 4d61076..387f7d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -19,8 +19,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; import java.util.EnumSet; -import java.util.Map; -import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; @@ -38,9 +36,10 @@ import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily; @@ -60,26 +59,14 @@ class ApplicationEntityReader extends GenericEntityReader { private static final ApplicationTable APPLICATION_TABLE = new ApplicationTable(); - public ApplicationEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, - Map<String, Object> infoFilters, Map<String, String> configFilters, - Set<String> metricFilters, Set<String> eventFilters, - TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, - EnumSet<Field> fieldsToRetrieve) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, - createdTimeBegin, createdTimeEnd, relatesTo, isRelatedTo, infoFilters, - configFilters, metricFilters, eventFilters, confsToRetrieve, - metricsToRetrieve, fieldsToRetrieve, true); + public ApplicationEntityReader(TimelineReaderContext ctxt, + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { + super(ctxt, entityFilters, toRetrieve, true); } - public ApplicationEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - String entityId, TimelineFilterList confsToRetrieve, - TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, - confsToRetrieve, 
metricsToRetrieve, fieldsToRetrieve); + public ApplicationEntityReader(TimelineReaderContext ctxt, + TimelineDataToRetrieve toRetrieve) { + super(ctxt, toRetrieve); } /** @@ -92,12 +79,13 @@ class ApplicationEntityReader extends GenericEntityReader { @Override protected FilterList constructFilterListBasedOnFields() { FilterList list = new FilterList(Operator.MUST_PASS_ONE); + TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); // Fetch all the columns. - if (fieldsToRetrieve.contains(Field.ALL) && - (confsToRetrieve == null || - confsToRetrieve.getFilterList().isEmpty()) && - (metricsToRetrieve == null || - metricsToRetrieve.getFilterList().isEmpty())) { + if (dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && + (dataToRetrieve.getConfsToRetrieve() == null || + dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) && + (dataToRetrieve.getMetricsToRetrieve() == null || + dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) { return list; } FilterList infoColFamilyList = new FilterList(); @@ -107,61 +95,70 @@ class ApplicationEntityReader extends GenericEntityReader { new BinaryComparator(ApplicationColumnFamily.INFO.getBytes())); infoColFamilyList.addFilter(infoColumnFamily); // Events not required. - if (!fieldsToRetrieve.contains(Field.EVENTS) && - !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) { + TimelineEntityFilters filters = getFilters(); + if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.EVENTS) && + !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && + (singleEntityRead || filters.getEventFilters() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( ApplicationColumnPrefix.EVENT.getColumnPrefixBytes("")))); } // info not required. 
- if (!fieldsToRetrieve.contains(Field.INFO) && - !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) { + if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.INFO) && + !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && + (singleEntityRead || filters.getInfoFilters() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( ApplicationColumnPrefix.INFO.getColumnPrefixBytes("")))); } // is related to not required. - if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) && - !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) { + if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.IS_RELATED_TO) && + !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && + (singleEntityRead || filters.getIsRelatedTo() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); } // relates to not required. 
- if (!fieldsToRetrieve.contains(Field.RELATES_TO) && - !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) { + if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.RELATES_TO) && + !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && + (singleEntityRead || filters.getRelatesTo() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); } list.addFilter(infoColFamilyList); - if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) || - (confsToRetrieve != null && - !confsToRetrieve.getFilterList().isEmpty())) { + if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS) || + (!singleEntityRead && filters.getConfigFilters() != null)) || + (dataToRetrieve.getConfsToRetrieve() != null && + !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty())) { FilterList filterCfg = new FilterList(new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes()))); - if (confsToRetrieve != null && - !confsToRetrieve.getFilterList().isEmpty()) { + if (dataToRetrieve.getConfsToRetrieve() != null && + !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) { filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList( - ApplicationColumnPrefix.CONFIG, confsToRetrieve)); + ApplicationColumnPrefix.CONFIG, + dataToRetrieve.getConfsToRetrieve())); } list.addFilter(filterCfg); } - if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) || - (metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty())) { + if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) || + (!singleEntityRead && filters.getMetricFilters() != null)) || + (dataToRetrieve.getMetricsToRetrieve() != null && + !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) { FilterList filterMetrics = new FilterList(new FamilyFilter(CompareOp.EQUAL, new 
BinaryComparator(ApplicationColumnFamily.METRICS.getBytes()))); - if (metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty()) { + if (dataToRetrieve.getMetricsToRetrieve() != null && + !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()) { filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList( - ApplicationColumnPrefix.METRIC, metricsToRetrieve)); + ApplicationColumnPrefix.METRIC, + dataToRetrieve.getMetricsToRetrieve())); } list.addFilter(filterMetrics); } @@ -171,9 +168,10 @@ class ApplicationEntityReader extends GenericEntityReader { @Override protected Result getResult(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { + TimelineReaderContext context = getContext(); byte[] rowKey = - ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId, - appId); + ApplicationRowKey.getRowKey(context.getClusterId(), context.getUserId(), + context.getFlowName(), context.getFlowRunId(), context.getAppId()); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); if (filterList != null && !filterList.getFilters().isEmpty()) { @@ -184,66 +182,54 @@ class ApplicationEntityReader extends GenericEntityReader { @Override protected void validateParams() { - Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); - Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); + Preconditions.checkNotNull(getContext().getClusterId(), + "clusterId shouldn't be null"); + Preconditions.checkNotNull(getContext().getEntityType(), + "entityType shouldn't be null"); if (singleEntityRead) { - Preconditions.checkNotNull(appId, "appId shouldn't be null"); + Preconditions.checkNotNull(getContext().getAppId(), + "appId shouldn't be null"); } else { - Preconditions.checkNotNull(userId, "userId shouldn't be null"); - Preconditions.checkNotNull(flowName, "flowName shouldn't be null"); + Preconditions.checkNotNull(getContext().getUserId(), + "userId shouldn't be null"); + 
Preconditions.checkNotNull(getContext().getFlowName(), + "flowName shouldn't be null"); } } @Override protected void augmentParams(Configuration hbaseConf, Connection conn) throws IOException { + TimelineReaderContext context = getContext(); if (singleEntityRead) { - if (flowName == null || flowRunId == null || userId == null) { - FlowContext context = - lookupFlowContext(clusterId, appId, hbaseConf, conn); - flowName = context.flowName; - flowRunId = context.flowRunId; - userId = context.userId; - } - } - if (fieldsToRetrieve == null) { - fieldsToRetrieve = EnumSet.noneOf(Field.class); - } - if (!fieldsToRetrieve.contains(Field.CONFIGS) && - confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) { - fieldsToRetrieve.add(Field.CONFIGS); - } - if (!fieldsToRetrieve.contains(Field.METRICS) && - metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty()) { - fieldsToRetrieve.add(Field.METRICS); - } - if (!singleEntityRead) { - if (limit == null || limit < 0) { - limit = TimelineReader.DEFAULT_LIMIT; - } - if (createdTimeBegin == null) { - createdTimeBegin = DEFAULT_BEGIN_TIME; - } - if (createdTimeEnd == null) { - createdTimeEnd = DEFAULT_END_TIME; + if (context.getFlowName() == null || context.getFlowRunId() == null || + context.getUserId() == null) { + FlowContext flowContext = lookupFlowContext( + context.getClusterId(), context.getAppId(), hbaseConf, conn); + context.setFlowName(flowContext.flowName); + context.setFlowRunId(flowContext.flowRunId); + context.setUserId(flowContext.userId); } } + getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); } @Override protected ResultScanner getResults(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); - if (flowRunId != null) { + TimelineReaderContext context = getContext(); + if (context.getFlowRunId() != null) { scan.setRowPrefixFilter(ApplicationRowKey. 
- getRowKeyPrefix(clusterId, userId, flowName, flowRunId)); + getRowKeyPrefix(context.getClusterId(), context.getUserId(), + context.getFlowName(), context.getFlowRunId())); } else { scan.setRowPrefixFilter(ApplicationRowKey. - getRowKeyPrefix(clusterId, userId, flowName)); + getRowKeyPrefix(context.getClusterId(), context.getUserId(), + context.getFlowName())); } FilterList newList = new FilterList(); - newList.addFilter(new PageFilter(limit)); + newList.addFilter(new PageFilter(getFilters().getLimit())); if (filterList != null && !filterList.getFilters().isEmpty()) { newList.addFilter(filterList); } @@ -261,23 +247,27 @@ class ApplicationEntityReader extends GenericEntityReader { String entityId = ApplicationColumn.ID.readResult(result).toString(); entity.setId(entityId); + TimelineEntityFilters filters = getFilters(); // fetch created time Number createdTime = (Number)ApplicationColumn.CREATED_TIME.readResult(result); entity.setCreatedTime(createdTime.longValue()); - if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin || - entity.getCreatedTime() > createdTimeEnd)) { + if (!singleEntityRead && + (entity.getCreatedTime() < filters.getCreatedTimeBegin() || + entity.getCreatedTime() > filters.getCreatedTimeEnd())) { return null; } - + EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); // fetch is related to entities - boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; + boolean checkIsRelatedTo = + filters != null && filters.getIsRelatedTo() != null && + filters.getIsRelatedTo().size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, true); if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( - entity.getIsRelatedToEntities(), isRelatedTo)) { + entity.getIsRelatedToEntities(), filters.getIsRelatedTo())) { return null; } if 
(!fieldsToRetrieve.contains(Field.ALL) && @@ -287,13 +277,15 @@ class ApplicationEntityReader extends GenericEntityReader { } // fetch relates to entities - boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; + boolean checkRelatesTo = + filters != null && filters.getRelatesTo() != null && + filters.getRelatesTo().size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, false); if (checkRelatesTo && !TimelineStorageUtils.matchRelations( - entity.getRelatesToEntities(), relatesTo)) { + entity.getRelatesToEntities(), filters.getRelatesTo())) { return null; } if (!fieldsToRetrieve.contains(Field.ALL) && @@ -303,12 +295,14 @@ class ApplicationEntityReader extends GenericEntityReader { } // fetch info - boolean checkInfo = infoFilters != null && infoFilters.size() > 0; + boolean checkInfo = filters != null && filters.getInfoFilters() != null && + filters.getInfoFilters().size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.INFO) || checkInfo) { readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); if (checkInfo && - !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) { + !TimelineStorageUtils.matchFilters( + entity.getInfo(), filters.getInfoFilters())) { return null; } if (!fieldsToRetrieve.contains(Field.ALL) && @@ -318,12 +312,14 @@ class ApplicationEntityReader extends GenericEntityReader { } // fetch configs - boolean checkConfigs = configFilters != null && configFilters.size() > 0; + boolean checkConfigs = + filters != null && filters.getConfigFilters() != null && + filters.getConfigFilters().size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); if (checkConfigs && !TimelineStorageUtils.matchFilters( - 
entity.getConfigs(), configFilters)) { + entity.getConfigs(), filters.getConfigFilters())) { return null; } if (!fieldsToRetrieve.contains(Field.ALL) && @@ -333,12 +329,14 @@ class ApplicationEntityReader extends GenericEntityReader { } // fetch events - boolean checkEvents = eventFilters != null && eventFilters.size() > 0; + boolean checkEvents = + filters != null && filters.getEventFilters() != null && + filters.getEventFilters().size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { readEvents(entity, result, true); if (checkEvents && !TimelineStorageUtils.matchEventFilters( - entity.getEvents(), eventFilters)) { + entity.getEvents(), filters.getEventFilters())) { return null; } if (!fieldsToRetrieve.contains(Field.ALL) && @@ -348,12 +346,14 @@ class ApplicationEntityReader extends GenericEntityReader { } // fetch metrics - boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; + boolean checkMetrics = + filters != null && filters.getMetricFilters() != null && + filters.getMetricFilters().size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { readMetrics(entity, result, ApplicationColumnPrefix.METRIC); if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( - entity.getMetrics(), metricFilters)) { + entity.getMetrics(), filters.getMetricFilters())) { return null; } if (!fieldsToRetrieve.contains(Field.ALL) && http://git-wip-us.apache.org/repos/asf/hadoop/blob/10a4f8ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index 048f608..96350da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -18,9 +18,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; -import java.util.EnumSet; import java.util.Map; -import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; @@ -32,8 +30,9 @@ import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; @@ -49,24 +48,14 @@ class FlowActivityEntityReader extends TimelineEntityReader { private static final FlowActivityTable FLOW_ACTIVITY_TABLE = new FlowActivityTable(); - public FlowActivityEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, - Map<String, Object> infoFilters, Map<String, String> configFilters, - Set<String> metricFilters, Set<String> eventFilters, - EnumSet<Field> fieldsToRetrieve) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, - createdTimeBegin, createdTimeEnd, relatesTo, isRelatedTo, infoFilters, - configFilters, metricFilters, eventFilters, null, null, - fieldsToRetrieve, true); + public FlowActivityEntityReader(TimelineReaderContext ctxt, + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { + super(ctxt, entityFilters, toRetrieve, true); } - public FlowActivityEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - String entityId, EnumSet<Field> fieldsToRetrieve) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, - null, null, fieldsToRetrieve); + public FlowActivityEntityReader(TimelineReaderContext ctxt, + TimelineDataToRetrieve toRetrieve) { + super(ctxt, toRetrieve); } /** @@ -79,21 +68,13 @@ class FlowActivityEntityReader extends TimelineEntityReader { @Override protected void validateParams() { - Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + Preconditions.checkNotNull(getContext().getClusterId(), + "clusterId shouldn't be null"); } @Override protected void augmentParams(Configuration hbaseConf, Connection conn) throws IOException { - if (limit == null || limit < 0) { - limit = 
TimelineReader.DEFAULT_LIMIT; - } - if (createdTimeBegin == null) { - createdTimeBegin = DEFAULT_BEGIN_TIME; - } - if (createdTimeEnd == null) { - createdTimeEnd = DEFAULT_END_TIME; - } } @Override @@ -112,20 +93,24 @@ class FlowActivityEntityReader extends TimelineEntityReader { protected ResultScanner getResults(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); - if (createdTimeBegin == DEFAULT_BEGIN_TIME && - createdTimeEnd == DEFAULT_END_TIME) { + String clusterId = getContext().getClusterId(); + if (getFilters().getCreatedTimeBegin() == 0L && + getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) { + // All records have to be chosen. scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId)); } else { scan.setStartRow( - FlowActivityRowKey.getRowKeyPrefix(clusterId, createdTimeEnd)); + FlowActivityRowKey.getRowKeyPrefix(clusterId, + getFilters().getCreatedTimeEnd())); scan.setStopRow( FlowActivityRowKey.getRowKeyPrefix(clusterId, - (createdTimeBegin <= 0 ? 0: (createdTimeBegin - 1)))); + (getFilters().getCreatedTimeBegin() <= 0 ? 
0 : + (getFilters().getCreatedTimeBegin() - 1)))); } // use the page filter to limit the result to the page size // the scanner may still return more than the limit; therefore we need to // read the right number as we iterate - scan.setFilter(new PageFilter(limit)); + scan.setFilter(new PageFilter(getFilters().getLimit())); return table.getResultScanner(hbaseConf, conn, scan); } @@ -137,8 +122,8 @@ class FlowActivityEntityReader extends TimelineEntityReader { String user = rowKey.getUserId(); String flowName = rowKey.getFlowName(); - FlowActivityEntity flowActivity = - new FlowActivityEntity(clusterId, time, user, flowName); + FlowActivityEntity flowActivity = new FlowActivityEntity( + getContext().getClusterId(), time, user, flowName); // set the id flowActivity.setId(flowActivity.getId()); // get the list of run ids along with the version that are associated with http://git-wip-us.apache.org/repos/asf/hadoop/blob/10a4f8ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java index 4f50b02..2d1c41c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ 
-18,9 +18,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; -import java.util.EnumSet; -import java.util.Map; -import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; @@ -38,9 +35,10 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; @@ -58,26 +56,14 @@ import com.google.common.base.Preconditions; class FlowRunEntityReader extends TimelineEntityReader { private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable(); - public FlowRunEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, - Map<String, Object> infoFilters, Map<String, String> configFilters, - Set<String> metricFilters, Set<String> eventFilters, - TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, - EnumSet<Field> 
fieldsToRetrieve) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, - createdTimeBegin, createdTimeEnd, relatesTo, isRelatedTo, infoFilters, - configFilters, metricFilters, eventFilters, null, metricsToRetrieve, - fieldsToRetrieve, true); + public FlowRunEntityReader(TimelineReaderContext ctxt, + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { + super(ctxt, entityFilters, toRetrieve, true); } - public FlowRunEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - String entityId, TimelineFilterList confsToRetrieve, - TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, - null, metricsToRetrieve, fieldsToRetrieve); + public FlowRunEntityReader(TimelineReaderContext ctxt, + TimelineDataToRetrieve toRetrieve) { + super(ctxt, toRetrieve); } /** @@ -90,35 +76,21 @@ class FlowRunEntityReader extends TimelineEntityReader { @Override protected void validateParams() { - Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); - Preconditions.checkNotNull(userId, "userId shouldn't be null"); - Preconditions.checkNotNull(flowName, "flowName shouldn't be null"); + Preconditions.checkNotNull(getContext().getClusterId(), + "clusterId shouldn't be null"); + Preconditions.checkNotNull(getContext().getUserId(), + "userId shouldn't be null"); + Preconditions.checkNotNull(getContext().getFlowName(), + "flowName shouldn't be null"); if (singleEntityRead) { - Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null"); + Preconditions.checkNotNull(getContext().getFlowRunId(), + "flowRunId shouldn't be null"); } } @Override protected void augmentParams(Configuration hbaseConf, Connection conn) { - if (!singleEntityRead) { - if (fieldsToRetrieve == null) { - fieldsToRetrieve = EnumSet.noneOf(Field.class); - } - if (limit == null || limit < 0) { - limit = 
TimelineReader.DEFAULT_LIMIT; - } - if (createdTimeBegin == null) { - createdTimeBegin = DEFAULT_BEGIN_TIME; - } - if (createdTimeEnd == null) { - createdTimeEnd = DEFAULT_END_TIME; - } - if (!fieldsToRetrieve.contains(Field.METRICS) && - metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty()) { - fieldsToRetrieve.add(Field.METRICS); - } - } + getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); } @Override @@ -129,9 +101,11 @@ class FlowRunEntityReader extends TimelineEntityReader { FamilyFilter infoColumnFamily = new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(FlowRunColumnFamily.INFO.getBytes())); + TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); // Metrics not required. - if (!singleEntityRead && !fieldsToRetrieve.contains(Field.METRICS) && - !fieldsToRetrieve.contains(Field.ALL)) { + if (!singleEntityRead && + !dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) && + !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL)) { FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); infoColFamilyList.addFilter(infoColumnFamily); infoColFamilyList.addFilter( @@ -140,12 +114,12 @@ class FlowRunEntityReader extends TimelineEntityReader { FlowRunColumnPrefix.METRIC.getColumnPrefixBytes("")))); list.addFilter(infoColFamilyList); } - if (metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty()) { + if (dataToRetrieve.getMetricsToRetrieve() != null && + !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()) { FilterList infoColFamilyList = new FilterList(); infoColFamilyList.addFilter(infoColumnFamily); infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList( - FlowRunColumnPrefix.METRIC, metricsToRetrieve)); + FlowRunColumnPrefix.METRIC, dataToRetrieve.getMetricsToRetrieve())); list.addFilter(infoColFamilyList); } return list; @@ -154,8 +128,10 @@ class FlowRunEntityReader extends TimelineEntityReader { @Override protected Result 
getResult(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { + TimelineReaderContext context = getContext(); byte[] rowKey = - FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId); + FlowRunRowKey.getRowKey(context.getClusterId(), context.getUserId(), + context.getFlowName(), context.getFlowRunId()); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); if (filterList != null && !filterList.getFilters().isEmpty()) { @@ -168,10 +144,12 @@ class FlowRunEntityReader extends TimelineEntityReader { protected ResultScanner getResults(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); + TimelineReaderContext context = getContext(); scan.setRowPrefixFilter( - FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowName)); + FlowRunRowKey.getRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName())); FilterList newList = new FilterList(); - newList.addFilter(new PageFilter(limit)); + newList.addFilter(new PageFilter(getFilters().getLimit())); if (filterList != null && !filterList.getFilters().isEmpty()) { newList.addFilter(filterList); } @@ -181,11 +159,12 @@ class FlowRunEntityReader extends TimelineEntityReader { @Override protected TimelineEntity parseEntity(Result result) throws IOException { + TimelineReaderContext context = getContext(); FlowRunEntity flowRun = new FlowRunEntity(); - flowRun.setUser(userId); - flowRun.setName(flowName); + flowRun.setUser(context.getUserId()); + flowRun.setName(context.getFlowName()); if (singleEntityRead) { - flowRun.setRunId(flowRunId); + flowRun.setRunId(context.getFlowRunId()); } else { FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow()); flowRun.setRunId(rowKey.getFlowRunId()); @@ -196,8 +175,9 @@ class FlowRunEntityReader extends TimelineEntityReader { if (startTime != null) { flowRun.setStartTime(startTime.longValue()); } - if (!singleEntityRead && (flowRun.getStartTime() < 
createdTimeBegin || - flowRun.getStartTime() > createdTimeEnd)) { + if (!singleEntityRead && + (flowRun.getStartTime() < getFilters().getCreatedTimeBegin() || + flowRun.getStartTime() > getFilters().getCreatedTimeEnd())) { return null; } @@ -214,7 +194,8 @@ class FlowRunEntityReader extends TimelineEntityReader { } // read metrics - if (singleEntityRead || fieldsToRetrieve.contains(Field.METRICS)) { + if (singleEntityRead || + getDataToRetrieve().getFieldsToRetrieve().contains(Field.METRICS)) { readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/10a4f8ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index 237b9ac..3bc2f3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -42,9 +42,10 @@ import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; @@ -77,26 +78,15 @@ class GenericEntityReader extends TimelineEntityReader { */ private final AppToFlowTable appToFlowTable = new AppToFlowTable(); - public GenericEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, - Map<String, Object> infoFilters, Map<String, String> configFilters, - Set<String> metricFilters, Set<String> eventFilters, - TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, - EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, - createdTimeBegin, createdTimeEnd, relatesTo, isRelatedTo, infoFilters, - configFilters, metricFilters, eventFilters, confsToRetrieve, - metricsToRetrieve, fieldsToRetrieve, sortedKeys); + public GenericEntityReader(TimelineReaderContext ctxt, + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve, + boolean sortedKeys) { + super(ctxt, entityFilters, toRetrieve, sortedKeys); } - public GenericEntityReader(String userId, String clusterId, - String flowName, Long 
flowRunId, String appId, String entityType, - String entityId, TimelineFilterList confsToRetrieve, - TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, - confsToRetrieve, metricsToRetrieve, fieldsToRetrieve); + public GenericEntityReader(TimelineReaderContext ctxt, + TimelineDataToRetrieve toRetrieve) { + super(ctxt, toRetrieve); } /** @@ -109,12 +99,13 @@ class GenericEntityReader extends TimelineEntityReader { @Override protected FilterList constructFilterListBasedOnFields() { FilterList list = new FilterList(Operator.MUST_PASS_ONE); + TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); // Fetch all the columns. - if (fieldsToRetrieve.contains(Field.ALL) && - (confsToRetrieve == null || - confsToRetrieve.getFilterList().isEmpty()) && - (metricsToRetrieve == null || - metricsToRetrieve.getFilterList().isEmpty())) { + if (dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && + (dataToRetrieve.getConfsToRetrieve() == null || + dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) && + (dataToRetrieve.getMetricsToRetrieve() == null || + dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) { return list; } FilterList infoColFamilyList = new FilterList(); @@ -123,62 +114,69 @@ class GenericEntityReader extends TimelineEntityReader { new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(EntityColumnFamily.INFO.getBytes())); infoColFamilyList.addFilter(infoColumnFamily); + TimelineEntityFilters filters = getFilters(); // Events not required. 
- if (!fieldsToRetrieve.contains(Field.EVENTS) && - !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) { + if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.EVENTS) && + !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && + (singleEntityRead || filters.getEventFilters() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( EntityColumnPrefix.EVENT.getColumnPrefixBytes("")))); } // info not required. - if (!fieldsToRetrieve.contains(Field.INFO) && - !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) { + if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.INFO) && + !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && + (singleEntityRead || filters.getInfoFilters() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( EntityColumnPrefix.INFO.getColumnPrefixBytes("")))); } // is related to not required. - if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) && - !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) { + if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.IS_RELATED_TO) && + !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && + (singleEntityRead || filters.getIsRelatedTo() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); } // relates to not required. 
- if (!fieldsToRetrieve.contains(Field.RELATES_TO) && - !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) { + if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.RELATES_TO) && + !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && + (singleEntityRead || filters.getRelatesTo() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); } list.addFilter(infoColFamilyList); - if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) || - (confsToRetrieve != null && - !confsToRetrieve.getFilterList().isEmpty())) { + if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS) || + (!singleEntityRead && filters.getConfigFilters() != null)) || + (dataToRetrieve.getConfsToRetrieve() != null && + !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty())) { FilterList filterCfg = new FilterList(new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes()))); - if (confsToRetrieve != null && - !confsToRetrieve.getFilterList().isEmpty()) { + if (dataToRetrieve.getConfsToRetrieve() != null && + !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) { filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.CONFIG, confsToRetrieve)); + EntityColumnPrefix.CONFIG, dataToRetrieve.getConfsToRetrieve())); } list.addFilter(filterCfg); } - if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) || - (metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty())) { + if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) || + (!singleEntityRead && filters.getMetricFilters() != null)) || + (dataToRetrieve.getMetricsToRetrieve() != null && + !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) { FilterList filterMetrics = new FilterList(new FamilyFilter(CompareOp.EQUAL, new 
BinaryComparator(EntityColumnFamily.METRICS.getBytes()))); - if (metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty()) { + if (dataToRetrieve.getMetricsToRetrieve() != null && + !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()) { filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.METRIC, metricsToRetrieve)); + EntityColumnPrefix.METRIC, dataToRetrieve.getMetricsToRetrieve())); } list.addFilter(filterMetrics); } @@ -215,56 +213,42 @@ class GenericEntityReader extends TimelineEntityReader { @Override protected void validateParams() { - Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); - Preconditions.checkNotNull(appId, "appId shouldn't be null"); - Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); + Preconditions.checkNotNull(getContext().getClusterId(), + "clusterId shouldn't be null"); + Preconditions.checkNotNull(getContext().getAppId(), + "appId shouldn't be null"); + Preconditions.checkNotNull(getContext().getEntityType(), + "entityType shouldn't be null"); if (singleEntityRead) { - Preconditions.checkNotNull(entityId, "entityId shouldn't be null"); + Preconditions.checkNotNull(getContext().getEntityId(), + "entityId shouldn't be null"); } } @Override protected void augmentParams(Configuration hbaseConf, Connection conn) throws IOException { + TimelineReaderContext context = getContext(); // In reality all three should be null or neither should be null - if (flowName == null || flowRunId == null || userId == null) { - FlowContext context = - lookupFlowContext(clusterId, appId, hbaseConf, conn); - flowName = context.flowName; - flowRunId = context.flowRunId; - userId = context.userId; - } - if (fieldsToRetrieve == null) { - fieldsToRetrieve = EnumSet.noneOf(Field.class); - } - if (!fieldsToRetrieve.contains(Field.CONFIGS) && - confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) { - fieldsToRetrieve.add(Field.CONFIGS); - } - if 
(!fieldsToRetrieve.contains(Field.METRICS) && - metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty()) { - fieldsToRetrieve.add(Field.METRICS); - } - if (!singleEntityRead) { - if (limit == null || limit < 0) { - limit = TimelineReader.DEFAULT_LIMIT; - } - if (createdTimeBegin == null) { - createdTimeBegin = DEFAULT_BEGIN_TIME; - } - if (createdTimeEnd == null) { - createdTimeEnd = DEFAULT_END_TIME; - } + if (context.getFlowName() == null || context.getFlowRunId() == null || + context.getUserId() == null) { + FlowContext flowContext = lookupFlowContext( + context.getClusterId(), context.getAppId(), hbaseConf, conn); + context.setFlowName(flowContext.flowName); + context.setFlowRunId(flowContext.flowRunId); + context.setUserId(flowContext.userId); } + getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); } @Override protected Result getResult(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { + TimelineReaderContext context = getContext(); byte[] rowKey = - EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId, - entityType, entityId); + EntityRowKey.getRowKey(context.getClusterId(), context.getUserId(), + context.getFlowName(), context.getFlowRunId(), context.getAppId(), + context.getEntityType(), context.getEntityId()); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); if (filterList != null && !filterList.getFilters().isEmpty()) { @@ -279,8 +263,10 @@ class GenericEntityReader extends TimelineEntityReader { // Scan through part of the table to find the entities belong to one app // and one type Scan scan = new Scan(); + TimelineReaderContext context = getContext(); scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix( - clusterId, userId, flowName, flowRunId, appId, entityType)); + context.getClusterId(), context.getUserId(), context.getFlowName(), + context.getFlowRunId(), context.getAppId(), context.getEntityType())); scan.setMaxVersions(Integer.MAX_VALUE); if 
(filterList != null && !filterList.getFilters().isEmpty()) { scan.setFilter(filterList); @@ -299,21 +285,25 @@ class GenericEntityReader extends TimelineEntityReader { String entityId = EntityColumn.ID.readResult(result).toString(); entity.setId(entityId); + TimelineEntityFilters filters = getFilters(); // fetch created time Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result); entity.setCreatedTime(createdTime.longValue()); - if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin || - entity.getCreatedTime() > createdTimeEnd)) { + if (!singleEntityRead && + (entity.getCreatedTime() < filters.getCreatedTimeBegin() || + entity.getCreatedTime() > filters.getCreatedTimeEnd())) { return null; } - + EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); // fetch is related to entities - boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; + boolean checkIsRelatedTo = + filters != null && filters.getIsRelatedTo() != null && + filters.getIsRelatedTo().size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true); if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( - entity.getIsRelatedToEntities(), isRelatedTo)) { + entity.getIsRelatedToEntities(), filters.getIsRelatedTo())) { return null; } if (!fieldsToRetrieve.contains(Field.ALL) && @@ -323,12 +313,14 @@ class GenericEntityReader extends TimelineEntityReader { } // fetch relates to entities - boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; + boolean checkRelatesTo = + filters != null && filters.getRelatesTo() != null && + filters.getRelatesTo().size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); if (checkRelatesTo && 
!TimelineStorageUtils.matchRelations( - entity.getRelatesToEntities(), relatesTo)) { + entity.getRelatesToEntities(), filters.getRelatesTo())) { return null; } if (!fieldsToRetrieve.contains(Field.ALL) && @@ -338,12 +330,14 @@ class GenericEntityReader extends TimelineEntityReader { } // fetch info - boolean checkInfo = infoFilters != null && infoFilters.size() > 0; + boolean checkInfo = filters != null && filters.getInfoFilters() != null && + filters.getInfoFilters().size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.INFO) || checkInfo) { readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); if (checkInfo && - !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) { + !TimelineStorageUtils.matchFilters( + entity.getInfo(), filters.getInfoFilters())) { return null; } if (!fieldsToRetrieve.contains(Field.ALL) && @@ -353,12 +347,14 @@ class GenericEntityReader extends TimelineEntityReader { } // fetch configs - boolean checkConfigs = configFilters != null && configFilters.size() > 0; + boolean checkConfigs = + filters != null && filters.getConfigFilters() != null && + filters.getConfigFilters().size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); if (checkConfigs && !TimelineStorageUtils.matchFilters( - entity.getConfigs(), configFilters)) { + entity.getConfigs(), filters.getConfigFilters())) { return null; } if (!fieldsToRetrieve.contains(Field.ALL) && @@ -368,12 +364,14 @@ class GenericEntityReader extends TimelineEntityReader { } // fetch events - boolean checkEvents = eventFilters != null && eventFilters.size() > 0; + boolean checkEvents = + filters != null && filters.getEventFilters() != null && + filters.getEventFilters().size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { readEvents(entity, result, false); if 
(checkEvents && !TimelineStorageUtils.matchEventFilters( - entity.getEvents(), eventFilters)) { + entity.getEvents(), filters.getEventFilters())) { return null; } if (!fieldsToRetrieve.contains(Field.ALL) && @@ -383,12 +381,14 @@ class GenericEntityReader extends TimelineEntityReader { } // fetch metrics - boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; + boolean checkMetrics = + filters != null && filters.getMetricFilters() != null && + filters.getMetricFilters().size() > 0; if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { readMetrics(entity, result, EntityColumnPrefix.METRIC); if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( - entity.getMetrics(), metricFilters)) { + entity.getMetrics(), filters.getMetricFilters())) { return null; } if (!fieldsToRetrieve.contains(Field.ALL) && http://git-wip-us.apache.org/repos/asf/hadoop/blob/10a4f8ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java index bc86b6d..454c179 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; -import java.util.EnumSet; import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; @@ -34,8 +33,9 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; @@ -46,32 +46,12 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix */ public abstract class TimelineEntityReader { private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class); - protected static final long DEFAULT_BEGIN_TIME = 0L; - protected static final long DEFAULT_END_TIME = Long.MAX_VALUE; protected final boolean singleEntityRead; - - protected String userId; - protected String clusterId; - protected String flowName; - protected Long flowRunId; - protected String appId; - protected String entityType; - protected EnumSet<Field> fieldsToRetrieve; - // used only for a single entity read mode - protected String entityId; + private TimelineReaderContext context; + 
private TimelineDataToRetrieve dataToRetrieve; // used only for multiple entity read mode - protected Long limit; - protected Long createdTimeBegin; - protected Long createdTimeEnd; - protected Map<String, Set<String>> relatesTo; - protected Map<String, Set<String>> isRelatedTo; - protected Map<String, Object> infoFilters; - protected Map<String, String> configFilters; - protected Set<String> metricFilters; - protected Set<String> eventFilters; - protected TimelineFilterList confsToRetrieve; - protected TimelineFilterList metricsToRetrieve; + private TimelineEntityFilters filters; /** * Main table the entity reader uses. @@ -89,34 +69,14 @@ public abstract class TimelineEntityReader { /** * Instantiates a reader for multiple-entity reads. */ - protected TimelineEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, - Map<String, Object> infoFilters, Map<String, String> configFilters, - Set<String> metricFilters, Set<String> eventFilters, - TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, - EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) { + protected TimelineEntityReader(TimelineReaderContext ctxt, + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve, + boolean sortedKeys) { this.singleEntityRead = false; this.sortedKeys = sortedKeys; - this.userId = userId; - this.clusterId = clusterId; - this.flowName = flowName; - this.flowRunId = flowRunId; - this.appId = appId; - this.entityType = entityType; - this.fieldsToRetrieve = fieldsToRetrieve; - this.limit = limit; - this.createdTimeBegin = createdTimeBegin; - this.createdTimeEnd = createdTimeEnd; - this.relatesTo = relatesTo; - this.isRelatedTo = isRelatedTo; - this.infoFilters = infoFilters; - this.configFilters = configFilters; - this.metricFilters = metricFilters; - 
this.eventFilters = eventFilters; - this.confsToRetrieve = confsToRetrieve; - this.metricsToRetrieve = metricsToRetrieve; + this.context = ctxt; + this.dataToRetrieve = toRetrieve; + this.filters = entityFilters; this.table = getTable(); } @@ -124,21 +84,11 @@ public abstract class TimelineEntityReader { /** * Instantiates a reader for single-entity reads. */ - protected TimelineEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - String entityId, TimelineFilterList confsToRetrieve, - TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) { + protected TimelineEntityReader(TimelineReaderContext ctxt, + TimelineDataToRetrieve toRetrieve) { this.singleEntityRead = true; - this.userId = userId; - this.clusterId = clusterId; - this.flowName = flowName; - this.flowRunId = flowRunId; - this.appId = appId; - this.entityType = entityType; - this.fieldsToRetrieve = fieldsToRetrieve; - this.entityId = entityId; - this.confsToRetrieve = confsToRetrieve; - this.metricsToRetrieve = metricsToRetrieve; + this.context = ctxt; + this.dataToRetrieve = toRetrieve; this.table = getTable(); } @@ -151,6 +101,18 @@ public abstract class TimelineEntityReader { */ protected abstract FilterList constructFilterListBasedOnFields(); + protected TimelineReaderContext getContext() { + return context; + } + + protected TimelineDataToRetrieve getDataToRetrieve() { + return dataToRetrieve; + } + + protected TimelineEntityFilters getFilters() { + return filters; + } + /** * Reads and deserializes a single timeline entity from the HBase storage. */ @@ -163,7 +125,8 @@ public abstract class TimelineEntityReader { Result result = getResult(hbaseConf, conn, filterList); if (result == null || result.isEmpty()) { // Could not find a matching row. 
- LOG.info("Cannot find matching entity of type " + entityType); + LOG.info("Cannot find matching entity of type " + + context.getEntityType()); return null; } return parseEntity(result); @@ -190,11 +153,11 @@ public abstract class TimelineEntityReader { } entities.add(entity); if (!sortedKeys) { - if (entities.size() > limit) { + if (entities.size() > filters.getLimit()) { entities.pollLast(); } } else { - if (entities.size() == limit) { + if (entities.size() == filters.getLimit()) { break; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/10a4f8ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java index 2e2c652..f2bdacd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java @@ -17,13 +17,10 @@ */ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; -import java.util.EnumSet; -import java.util.Map; -import java.util.Set; - import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; 
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; /** * Factory methods for instantiating a timeline entity reader. @@ -33,25 +30,21 @@ public class TimelineEntityReaderFactory { * Creates a timeline entity reader instance for reading a single entity with * the specified input. */ - public static TimelineEntityReader createSingleEntityReader(String userId, - String clusterId, String flowName, Long flowRunId, String appId, - String entityType, String entityId, TimelineFilterList confs, - TimelineFilterList metrics, EnumSet<Field> fieldsToRetrieve) { + public static TimelineEntityReader createSingleEntityReader( + TimelineReaderContext context, TimelineDataToRetrieve dataToRetrieve) { // currently the types that are handled separate from the generic entity // table are application, flow run, and flow activity entities - if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) { - return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, entityId, confs, metrics, fieldsToRetrieve); - } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) { - return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, entityId, confs, metrics, fieldsToRetrieve); - } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { - return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, entityId, fieldsToRetrieve); + if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) { + return new ApplicationEntityReader(context, dataToRetrieve); + } else if (TimelineEntityType. 
+ YARN_FLOW_RUN.matches(context.getEntityType())) { + return new FlowRunEntityReader(context, dataToRetrieve); + } else if (TimelineEntityType. + YARN_FLOW_ACTIVITY.matches(context.getEntityType())) { + return new FlowActivityEntityReader(context, dataToRetrieve); } else { // assume we're dealing with a generic entity read - return new GenericEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, entityId, confs, metrics, fieldsToRetrieve); + return new GenericEntityReader(context, dataToRetrieve); } } @@ -59,37 +52,22 @@ public class TimelineEntityReaderFactory { * Creates a timeline entity reader instance for reading set of entities with * the specified input and predicates. */ - public static TimelineEntityReader createMultipleEntitiesReader(String userId, - String clusterId, String flowName, Long flowRunId, String appId, - String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd, - Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, - Map<String, Object> infoFilters, Map<String, String> configFilters, - Set<String> metricFilters, Set<String> eventFilters, - TimelineFilterList confs, TimelineFilterList metrics, - EnumSet<Field> fieldsToRetrieve) { + public static TimelineEntityReader createMultipleEntitiesReader( + TimelineReaderContext context, TimelineEntityFilters filters, + TimelineDataToRetrieve dataToRetrieve) { // currently the types that are handled separate from the generic entity // table are application, flow run, and flow activity entities - if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) { - return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, limit, createdTimeBegin, createdTimeEnd, relatesTo, - isRelatedTo, infoFilters, configFilters, metricFilters, eventFilters, - confs, metrics, fieldsToRetrieve); - } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { - return new FlowActivityEntityReader(userId, clusterId, 
flowName, flowRunId, - appId, entityType, limit, createdTimeBegin, createdTimeEnd, relatesTo, - isRelatedTo, infoFilters, configFilters, metricFilters, eventFilters, - fieldsToRetrieve); - } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) { - return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, limit, createdTimeBegin, createdTimeEnd, relatesTo, - isRelatedTo, infoFilters, configFilters, metricFilters, eventFilters, - confs, metrics, fieldsToRetrieve); + if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) { + return new ApplicationEntityReader(context, filters, dataToRetrieve); + } else if (TimelineEntityType. + YARN_FLOW_ACTIVITY.matches(context.getEntityType())) { + return new FlowActivityEntityReader(context, filters, dataToRetrieve); + } else if (TimelineEntityType. + YARN_FLOW_RUN.matches(context.getEntityType())) { + return new FlowRunEntityReader(context, filters, dataToRetrieve); } else { // assume we're dealing with a generic entity read - return new GenericEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, limit, createdTimeBegin, createdTimeEnd, relatesTo, - isRelatedTo, infoFilters, configFilters, metricFilters, eventFilters, - confs, metrics, fieldsToRetrieve, false); + return new GenericEntityReader(context, filters, dataToRetrieve, false); } } }