YARN-4200. Refactor reader classes in storage to nest under hbase specific package name. Contributed by Li Lu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/61737325 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/61737325 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/61737325 Branch: refs/heads/YARN-2928 Commit: 61737325ccaaea48e20d2f288c0c9d1b43a8bccb Parents: ed5f7db Author: Li Lu <gtcarre...@apache.org> Authored: Mon Jan 11 18:05:36 2016 -0800 Committer: Li Lu <gtcarre...@apache.org> Committed: Wed May 4 16:22:12 2016 -0700 ---------------------------------------------------------------------- .../storage/ApplicationEntityReader.java | 382 -------------- .../storage/FlowActivityEntityReader.java | 163 ------ .../storage/FlowRunEntityReader.java | 225 --------- .../storage/GenericEntityReader.java | 496 ------------------ .../storage/HBaseTimelineReaderImpl.java | 2 + .../storage/TimelineEntityReader.java | 274 ---------- .../storage/TimelineEntityReaderFactory.java | 100 ---- .../storage/reader/ApplicationEntityReader.java | 383 ++++++++++++++ .../reader/FlowActivityEntityReader.java | 164 ++++++ .../storage/reader/FlowRunEntityReader.java | 226 +++++++++ .../storage/reader/GenericEntityReader.java | 497 +++++++++++++++++++ .../storage/reader/TimelineEntityReader.java | 274 ++++++++++ .../reader/TimelineEntityReaderFactory.java | 100 ++++ .../storage/reader/package-info.java | 23 + 14 files changed, 1669 insertions(+), 1640 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/61737325/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java deleted file mode 100644 index d812a6c..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java +++ /dev/null @@ -1,382 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.BinaryComparator; -import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.FamilyFilter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.FilterList.Operator; -import org.apache.hadoop.hbase.filter.PageFilter; -import org.apache.hadoop.hbase.filter.QualifierFilter; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; - -import com.google.common.base.Preconditions; - -/** - * Timeline entity reader for application entities that are stored in the - * application table. - */ -class ApplicationEntityReader extends GenericEntityReader { - private static final ApplicationTable APPLICATION_TABLE = - new ApplicationTable(); - - public ApplicationEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Long modifiedTimeBegin, Long modifiedTimeEnd, - Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, - Map<String, Object> infoFilters, Map<String, String> configFilters, - Set<String> metricFilters, Set<String> eventFilters, - TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, - EnumSet<Field> fieldsToRetrieve) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, - createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, - relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve, - true); - } - - public ApplicationEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - String entityId, TimelineFilterList confsToRetrieve, - TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, - confsToRetrieve, metricsToRetrieve, fieldsToRetrieve); - } - - /** - * Uses the {@link ApplicationTable}. - */ - protected BaseTable<?> getTable() { - return APPLICATION_TABLE; - } - - @Override - protected FilterList constructFilterListBasedOnFields() { - FilterList list = new FilterList(Operator.MUST_PASS_ONE); - // Fetch all the columns. - if (fieldsToRetrieve.contains(Field.ALL) && - (confsToRetrieve == null || - confsToRetrieve.getFilterList().isEmpty()) && - (metricsToRetrieve == null || - metricsToRetrieve.getFilterList().isEmpty())) { - return list; - } - FilterList infoColFamilyList = new FilterList(); - // By default fetch everything in INFO column family. - FamilyFilter infoColumnFamily = - new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(ApplicationColumnFamily.INFO.getBytes())); - infoColFamilyList.addFilter(infoColumnFamily); - // Events not required. - if (!fieldsToRetrieve.contains(Field.EVENTS) && - !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) { - infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - ApplicationColumnPrefix.EVENT.getColumnPrefixBytes("")))); - } - // info not required. - if (!fieldsToRetrieve.contains(Field.INFO) && - !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) { - infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - ApplicationColumnPrefix.INFO.getColumnPrefixBytes("")))); - } - // is releated to not required. - if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) && - !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) { - infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); - } - // relates to not required. - if (!fieldsToRetrieve.contains(Field.RELATES_TO) && - !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) { - infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); - } - list.addFilter(infoColFamilyList); - if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) || - (confsToRetrieve != null && - !confsToRetrieve.getFilterList().isEmpty())) { - FilterList filterCfg = - new FilterList(new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes()))); - if (confsToRetrieve != null && - !confsToRetrieve.getFilterList().isEmpty()) { - filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList( - ApplicationColumnPrefix.CONFIG, confsToRetrieve)); - } - list.addFilter(filterCfg); - } - if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) || - (metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty())) { - FilterList filterMetrics = - new FilterList(new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes()))); - if (metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty()) { - filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList( - ApplicationColumnPrefix.METRIC, metricsToRetrieve)); - } - list.addFilter(filterMetrics); - } - return list; - } - - @Override - protected Result getResult(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException { - byte[] rowKey = - ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId, - appId); - Get get = new Get(rowKey); - get.setMaxVersions(Integer.MAX_VALUE); - if (filterList != null && !filterList.getFilters().isEmpty()) { - get.setFilter(filterList); - } - return table.getResult(hbaseConf, conn, get); - } - - @Override - protected void validateParams() { - Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); - Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); - if (singleEntityRead) { - Preconditions.checkNotNull(appId, "appId shouldn't be null"); - } else { - Preconditions.checkNotNull(userId, "userId shouldn't be null"); - Preconditions.checkNotNull(flowName, "flowName shouldn't be null"); - } - } - - @Override - protected void augmentParams(Configuration hbaseConf, Connection conn) - throws IOException { - if (singleEntityRead) { - if (flowName == null || flowRunId == null || userId == null) { - FlowContext context = - lookupFlowContext(clusterId, appId, hbaseConf, conn); - flowName = context.flowName; - flowRunId = context.flowRunId; - userId = context.userId; - } - } - if (fieldsToRetrieve == null) { - fieldsToRetrieve = EnumSet.noneOf(Field.class); - } - if (!fieldsToRetrieve.contains(Field.CONFIGS) && - confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) { - fieldsToRetrieve.add(Field.CONFIGS); - } - if (!fieldsToRetrieve.contains(Field.METRICS) && - metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty()) { - fieldsToRetrieve.add(Field.METRICS); - } - if (!singleEntityRead) { - if (limit == null || limit < 0) { - limit = TimelineReader.DEFAULT_LIMIT; - } - if (createdTimeBegin == null) { - createdTimeBegin = DEFAULT_BEGIN_TIME; - } - if (createdTimeEnd == null) { - createdTimeEnd = DEFAULT_END_TIME; - } - if (modifiedTimeBegin == null) { - modifiedTimeBegin = DEFAULT_BEGIN_TIME; - } - if (modifiedTimeEnd == null) { - modifiedTimeEnd = DEFAULT_END_TIME; - } - } - } - - @Override - protected ResultScanner getResults(Configuration hbaseConf, - Connection conn, FilterList filterList) throws IOException { - Scan scan = new Scan(); - if (flowRunId != null) { - scan.setRowPrefixFilter(ApplicationRowKey. - getRowKeyPrefix(clusterId, userId, flowName, flowRunId)); - } else { - scan.setRowPrefixFilter(ApplicationRowKey. - getRowKeyPrefix(clusterId, userId, flowName)); - } - FilterList newList = new FilterList(); - newList.addFilter(new PageFilter(limit)); - if (filterList != null && !filterList.getFilters().isEmpty()) { - newList.addFilter(filterList); - } - scan.setFilter(newList); - return table.getResultScanner(hbaseConf, conn, scan); - } - - @Override - protected TimelineEntity parseEntity(Result result) throws IOException { - if (result == null || result.isEmpty()) { - return null; - } - TimelineEntity entity = new TimelineEntity(); - entity.setType(TimelineEntityType.YARN_APPLICATION.toString()); - String entityId = ApplicationColumn.ID.readResult(result).toString(); - entity.setId(entityId); - - // fetch created time - Number createdTime = - (Number)ApplicationColumn.CREATED_TIME.readResult(result); - entity.setCreatedTime(createdTime.longValue()); - if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin || - entity.getCreatedTime() > createdTimeEnd)) { - return null; - } - - // fetch modified time - Number modifiedTime = - (Number)ApplicationColumn.MODIFIED_TIME.readResult(result); - entity.setModifiedTime(modifiedTime.longValue()); - if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin || - entity.getModifiedTime() > modifiedTimeEnd)) { - return null; - } - - // fetch is related to entities - boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { - readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, - true); - if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( - entity.getIsRelatedToEntities(), isRelatedTo)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { - entity.getIsRelatedToEntities().clear(); - } - } - - // fetch relates to entities - boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { - readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, - false); - if (checkRelatesTo && !TimelineStorageUtils.matchRelations( - entity.getRelatesToEntities(), relatesTo)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.RELATES_TO)) { - entity.getRelatesToEntities().clear(); - } - } - - // fetch info - boolean checkInfo = infoFilters != null && infoFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.INFO) || checkInfo) { - readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); - if (checkInfo && - !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.INFO)) { - entity.getInfo().clear(); - } - } - - // fetch configs - boolean checkConfigs = configFilters != null && configFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { - readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); - if (checkConfigs && !TimelineStorageUtils.matchFilters( - entity.getConfigs(), configFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.CONFIGS)) { - entity.getConfigs().clear(); - } - } - - // fetch events - boolean checkEvents = eventFilters != null && eventFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { - readEvents(entity, result, true); - if (checkEvents && !TimelineStorageUtils.matchEventFilters( - entity.getEvents(), eventFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.EVENTS)) { - entity.getEvents().clear(); - } - } - - // fetch metrics - boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { - readMetrics(entity, result, ApplicationColumnPrefix.METRIC); - if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( - entity.getMetrics(), metricFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.METRICS)) { - entity.getMetrics().clear(); - } - } - return entity; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/61737325/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java deleted file mode 100644 index 7e8d4ba..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.PageFilter; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; - -import com.google.common.base.Preconditions; - -/** - * Timeline entity reader for flow activity entities that are stored in the - * flow activity table. - */ -class FlowActivityEntityReader extends TimelineEntityReader { - private static final FlowActivityTable FLOW_ACTIVITY_TABLE = - new FlowActivityTable(); - - public FlowActivityEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Long modifiedTimeBegin, Long modifiedTimeEnd, - Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, - Map<String, Object> infoFilters, Map<String, String> configFilters, - Set<String> metricFilters, Set<String> eventFilters, - EnumSet<Field> fieldsToRetrieve) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, - createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, - relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, null, null, fieldsToRetrieve, true); - } - - public FlowActivityEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - String entityId, EnumSet<Field> fieldsToRetrieve) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, - null, null, fieldsToRetrieve); - } - - /** - * Uses the {@link FlowActivityTable}. - */ - @Override - protected BaseTable<?> getTable() { - return FLOW_ACTIVITY_TABLE; - } - - @Override - protected void validateParams() { - Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); - } - - @Override - protected void augmentParams(Configuration hbaseConf, Connection conn) - throws IOException { - if (limit == null || limit < 0) { - limit = TimelineReader.DEFAULT_LIMIT; - } - if (createdTimeBegin == null) { - createdTimeBegin = DEFAULT_BEGIN_TIME; - } - if (createdTimeEnd == null) { - createdTimeEnd = DEFAULT_END_TIME; - } - } - - @Override - protected FilterList constructFilterListBasedOnFields() { - return null; - } - - @Override - protected Result getResult(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException { - throw new UnsupportedOperationException( - "we don't support a single entity query"); - } - - @Override - protected ResultScanner getResults(Configuration hbaseConf, - Connection conn, FilterList filterList) throws IOException { - Scan scan = new Scan(); - if (createdTimeBegin == DEFAULT_BEGIN_TIME && - createdTimeEnd == DEFAULT_END_TIME) { - scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId)); - } else { - scan.setStartRow( - FlowActivityRowKey.getRowKeyPrefix(clusterId, createdTimeEnd)); - scan.setStopRow( - FlowActivityRowKey.getRowKeyPrefix(clusterId, - (createdTimeBegin <= 0 ? 0: (createdTimeBegin - 1)))); - } - // use the page filter to limit the result to the page size - // the scanner may still return more than the limit; therefore we need to - // read the right number as we iterate - scan.setFilter(new PageFilter(limit)); - return table.getResultScanner(hbaseConf, conn, scan); - } - - @Override - protected TimelineEntity parseEntity(Result result) throws IOException { - FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow()); - - long time = rowKey.getDayTimestamp(); - String user = rowKey.getUserId(); - String flowName = rowKey.getFlowName(); - - FlowActivityEntity flowActivity = - new FlowActivityEntity(clusterId, time, user, flowName); - // set the id - flowActivity.setId(flowActivity.getId()); - // get the list of run ids along with the version that are associated with - // this flow on this day - Map<String, Object> runIdsMap = - FlowActivityColumnPrefix.RUN_ID.readResults(result); - for (Map.Entry<String, Object> e : runIdsMap.entrySet()) { - Long runId = Long.valueOf(e.getKey()); - String version = (String)e.getValue(); - FlowRunEntity flowRun = new FlowRunEntity(); - flowRun.setUser(user); - flowRun.setName(flowName); - flowRun.setRunId(runId); - flowRun.setVersion(version); - // set the id - flowRun.setId(flowRun.getId()); - flowActivity.addFlowRun(flowRun); - } - - return flowActivity; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/61737325/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java deleted file mode 100644 index c9076ee..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java +++ /dev/null @@ -1,225 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.BinaryComparator; -import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; -import org.apache.hadoop.hbase.filter.FamilyFilter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.PageFilter; -import org.apache.hadoop.hbase.filter.QualifierFilter; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.FilterList.Operator; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; - -import com.google.common.base.Preconditions; - -/** - * Timeline entity reader for flow run entities that are stored in the flow run - * table. - */ -class FlowRunEntityReader extends TimelineEntityReader { - private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable(); - - public FlowRunEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Long modifiedTimeBegin, Long modifiedTimeEnd, - Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, - Map<String, Object> infoFilters, Map<String, String> configFilters, - Set<String> metricFilters, Set<String> eventFilters, - TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, - EnumSet<Field> fieldsToRetrieve) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, - createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, - relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, null, metricsToRetrieve, fieldsToRetrieve, true); - } - - public FlowRunEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - String entityId, TimelineFilterList confsToRetrieve, - TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, - null, metricsToRetrieve, fieldsToRetrieve); - } - - /** - * Uses the {@link FlowRunTable}. - */ - @Override - protected BaseTable<?> getTable() { - return FLOW_RUN_TABLE; - } - - @Override - protected void validateParams() { - Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); - Preconditions.checkNotNull(userId, "userId shouldn't be null"); - Preconditions.checkNotNull(flowName, "flowName shouldn't be null"); - if (singleEntityRead) { - Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null"); - } - } - - @Override - protected void augmentParams(Configuration hbaseConf, Connection conn) { - if (!singleEntityRead) { - if (fieldsToRetrieve == null) { - fieldsToRetrieve = EnumSet.noneOf(Field.class); - } - if (limit == null || limit < 0) { - limit = TimelineReader.DEFAULT_LIMIT; - } - if (createdTimeBegin == null) { - createdTimeBegin = DEFAULT_BEGIN_TIME; - } - if (createdTimeEnd == null) { - createdTimeEnd = DEFAULT_END_TIME; - } - if (!fieldsToRetrieve.contains(Field.METRICS) && - metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty()) { - fieldsToRetrieve.add(Field.METRICS); - } - } - } - - @Override - protected FilterList constructFilterListBasedOnFields() { - FilterList list = new FilterList(Operator.MUST_PASS_ONE); - - // By default fetch everything in INFO column family. - FamilyFilter infoColumnFamily = - new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(FlowRunColumnFamily.INFO.getBytes())); - // Metrics not required. - if (!singleEntityRead && !fieldsToRetrieve.contains(Field.METRICS) && - !fieldsToRetrieve.contains(Field.ALL)) { - FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); - infoColFamilyList.addFilter(infoColumnFamily); - infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - FlowRunColumnPrefix.METRIC.getColumnPrefixBytes("")))); - list.addFilter(infoColFamilyList); - } - if (metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty()) { - FilterList infoColFamilyList = new FilterList(); - infoColFamilyList.addFilter(infoColumnFamily); - infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList( - FlowRunColumnPrefix.METRIC, metricsToRetrieve)); - list.addFilter(infoColFamilyList); - } - return list; - } - - @Override - protected Result getResult(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException { - byte[] rowKey = - FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId); - Get get = new Get(rowKey); - get.setMaxVersions(Integer.MAX_VALUE); - if (filterList != null && !filterList.getFilters().isEmpty()) { - get.setFilter(filterList); - } - return table.getResult(hbaseConf, conn, get); - } - - @Override - protected ResultScanner getResults(Configuration hbaseConf, - Connection conn, FilterList filterList) throws IOException { - Scan scan = new Scan(); - scan.setRowPrefixFilter( - FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowName)); - FilterList newList = new FilterList(); - newList.addFilter(new PageFilter(limit)); - if (filterList != null && !filterList.getFilters().isEmpty()) { - newList.addFilter(filterList); - } - scan.setFilter(newList); - return table.getResultScanner(hbaseConf, conn, scan); - } - - @Override - protected TimelineEntity parseEntity(Result result) throws IOException { - FlowRunEntity flowRun = new FlowRunEntity(); - flowRun.setUser(userId); - flowRun.setName(flowName); - if (singleEntityRead) { - flowRun.setRunId(flowRunId); - } else { - FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow()); - flowRun.setRunId(rowKey.getFlowRunId()); - } - - // read the start time - Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result); - if (startTime != null) { - flowRun.setStartTime(startTime.longValue()); - } - if (!singleEntityRead && (flowRun.getStartTime() < createdTimeBegin || - flowRun.getStartTime() > createdTimeEnd)) { - return null; - } - - // read the end time if available - Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result); - if (endTime != null) { - flowRun.setMaxEndTime(endTime.longValue()); - } - - // read the flow version - String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result); - if (version != null) { - flowRun.setVersion(version); - } - - // read metrics - if (singleEntityRead || fieldsToRetrieve.contains(Field.METRICS)) { - readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); - } - - // set the id - flowRun.setId(flowRun.getId()); - return flowRun; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/61737325/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java deleted file mode 100644 index 784dfd5..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java +++ /dev/null @@ -1,496 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.BinaryComparator; -import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; -import org.apache.hadoop.hbase.filter.FamilyFilter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.QualifierFilter; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.FilterList.Operator; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; - -import com.google.common.base.Preconditions; - -/** - * Timeline entity reader for generic entities that are stored in the entity - * table. - */ -class GenericEntityReader extends TimelineEntityReader { - private static final EntityTable ENTITY_TABLE = new EntityTable(); - private static final Log LOG = LogFactory.getLog(GenericEntityReader.class); - - /** - * Used to look up the flow context. - */ - private final AppToFlowTable appToFlowTable = new AppToFlowTable(); - - public GenericEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Long modifiedTimeBegin, Long modifiedTimeEnd, - Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, - Map<String, Object> infoFilters, Map<String, String> configFilters, - Set<String> metricFilters, Set<String> eventFilters, - TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, - EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, - createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, - relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve, - sortedKeys); - } - - public GenericEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - String entityId, TimelineFilterList confsToRetrieve, - TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) { - super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, - confsToRetrieve, metricsToRetrieve, fieldsToRetrieve); - } - - /** - * Uses the {@link EntityTable}. - */ - protected BaseTable<?> getTable() { - return ENTITY_TABLE; - } - - @Override - protected FilterList constructFilterListBasedOnFields() { - FilterList list = new FilterList(Operator.MUST_PASS_ONE); - // Fetch all the columns. - if (fieldsToRetrieve.contains(Field.ALL) && - (confsToRetrieve == null || - confsToRetrieve.getFilterList().isEmpty()) && - (metricsToRetrieve == null || - metricsToRetrieve.getFilterList().isEmpty())) { - return list; - } - FilterList infoColFamilyList = new FilterList(); - // By default fetch everything in INFO column family. - FamilyFilter infoColumnFamily = - new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(EntityColumnFamily.INFO.getBytes())); - infoColFamilyList.addFilter(infoColumnFamily); - // Events not required. - if (!fieldsToRetrieve.contains(Field.EVENTS) && - !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) { - infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - EntityColumnPrefix.EVENT.getColumnPrefixBytes("")))); - } - // info not required. - if (!fieldsToRetrieve.contains(Field.INFO) && - !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) { - infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - EntityColumnPrefix.INFO.getColumnPrefixBytes("")))); - } - // is related to not required. - if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) && - !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) { - infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); - } - // relates to not required. - if (!fieldsToRetrieve.contains(Field.RELATES_TO) && - !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) { - infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); - } - list.addFilter(infoColFamilyList); - if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) || - (confsToRetrieve != null && - !confsToRetrieve.getFilterList().isEmpty())) { - FilterList filterCfg = - new FilterList(new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes()))); - if (confsToRetrieve != null && - !confsToRetrieve.getFilterList().isEmpty()) { - filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.CONFIG, confsToRetrieve)); - } - list.addFilter(filterCfg); - } - if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) || - (metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty())) { - FilterList filterMetrics = - new FilterList(new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(EntityColumnFamily.METRICS.getBytes()))); - if (metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty()) { - filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.METRIC, metricsToRetrieve)); - } - list.addFilter(filterMetrics); - } - return list; - } - - protected FlowContext lookupFlowContext(String clusterId, String appId, - Configuration hbaseConf, Connection conn) throws IOException { - byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); - Get get = new Get(rowKey); - Result result = appToFlowTable.getResult(hbaseConf, conn, get); - if (result != null && !result.isEmpty()) { - return new FlowContext( - AppToFlowColumn.USER_ID.readResult(result).toString(), - AppToFlowColumn.FLOW_ID.readResult(result).toString(), - ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue()); - } else { - throw new IOException( - "Unable to find the context flow ID and flow run ID for clusterId=" + - clusterId + ", appId=" + appId); - } - } - - protected static class FlowContext { - protected final String userId; - protected final String flowName; - protected final Long flowRunId; - public FlowContext(String user, String flowName, Long flowRunId) { - this.userId = user; - this.flowName = flowName; - this.flowRunId = flowRunId; - } - } - - @Override - protected void validateParams() { - Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); - Preconditions.checkNotNull(appId, "appId shouldn't be null"); - Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); - if (singleEntityRead) { - Preconditions.checkNotNull(entityId, "entityId shouldn't be null"); - } - } - - @Override - protected void augmentParams(Configuration hbaseConf, Connection conn) - throws IOException { - // In reality all three should be null or neither should be null - if (flowName == null || flowRunId == null || userId == null) { - FlowContext context = - lookupFlowContext(clusterId, appId, hbaseConf, conn); - flowName = context.flowName; - flowRunId = context.flowRunId; - userId = context.userId; - } - if (fieldsToRetrieve == null) { - fieldsToRetrieve = EnumSet.noneOf(Field.class); - } - if (!fieldsToRetrieve.contains(Field.CONFIGS) && - confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) { - fieldsToRetrieve.add(Field.CONFIGS); - } - if (!fieldsToRetrieve.contains(Field.METRICS) && - metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty()) { - fieldsToRetrieve.add(Field.METRICS); - } - if (!singleEntityRead) { - if (limit == null || limit < 0) { - limit = TimelineReader.DEFAULT_LIMIT; - } - if (createdTimeBegin == null) { - createdTimeBegin = DEFAULT_BEGIN_TIME; - } - if (createdTimeEnd == null) { - createdTimeEnd = DEFAULT_END_TIME; - } - if (modifiedTimeBegin == null) { - modifiedTimeBegin = DEFAULT_BEGIN_TIME; - } - if (modifiedTimeEnd == null) { - modifiedTimeEnd = DEFAULT_END_TIME; - } - } - } - - @Override - protected Result getResult(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException { - byte[] rowKey = - EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId, - entityType, entityId); - Get get = new Get(rowKey); - get.setMaxVersions(Integer.MAX_VALUE); - if (filterList != null && !filterList.getFilters().isEmpty()) { - get.setFilter(filterList); - } - return table.getResult(hbaseConf, conn, get); - } - - @Override - protected ResultScanner getResults(Configuration hbaseConf, - Connection conn, FilterList filterList) throws IOException { - // Scan through part of the table to find the entities belong to one app - // and one type - Scan scan = new Scan(); - scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix( - clusterId, userId, flowName, flowRunId, appId, entityType)); - scan.setMaxVersions(Integer.MAX_VALUE); - if (filterList != null && !filterList.getFilters().isEmpty()) { - scan.setFilter(filterList); - } - return table.getResultScanner(hbaseConf, conn, scan); - } - - @Override - protected TimelineEntity parseEntity(Result result) throws IOException { - if (result == null || result.isEmpty()) { - return null; - } - TimelineEntity entity = new TimelineEntity(); - String entityType = EntityColumn.TYPE.readResult(result).toString(); - entity.setType(entityType); - String entityId = EntityColumn.ID.readResult(result).toString(); - entity.setId(entityId); - - // fetch created time - Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result); - entity.setCreatedTime(createdTime.longValue()); - if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin || - entity.getCreatedTime() > createdTimeEnd)) { - return null; - } - - // fetch modified time - Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result); - entity.setModifiedTime(modifiedTime.longValue()); - if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin || - entity.getModifiedTime() > modifiedTimeEnd)) { - return null; - } - - // fetch is related to entities - boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { - readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true); - if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( - entity.getIsRelatedToEntities(), isRelatedTo)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { - entity.getIsRelatedToEntities().clear(); - } - } - - // fetch relates to entities - boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { - readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); - if (checkRelatesTo && !TimelineStorageUtils.matchRelations( - entity.getRelatesToEntities(), relatesTo)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.RELATES_TO)) { - entity.getRelatesToEntities().clear(); - } - } - - // fetch info - boolean checkInfo = infoFilters != null && infoFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.INFO) || checkInfo) { - readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); - if (checkInfo && - !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.INFO)) { - entity.getInfo().clear(); - } - } - - // fetch configs - boolean checkConfigs = configFilters != null && configFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { - readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); - if (checkConfigs && !TimelineStorageUtils.matchFilters( - entity.getConfigs(), configFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.CONFIGS)) { - entity.getConfigs().clear(); - } - } - - // fetch events - boolean checkEvents = eventFilters != null && eventFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { - readEvents(entity, result, false); - if (checkEvents && !TimelineStorageUtils.matchEventFilters( - entity.getEvents(), eventFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.EVENTS)) { - entity.getEvents().clear(); - } - } - - // fetch metrics - boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { - readMetrics(entity, result, EntityColumnPrefix.METRIC); - if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( - entity.getMetrics(), metricFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.METRICS)) { - entity.getMetrics().clear(); - } - } - return entity; - } - - /** - * Helper method for reading relationship. - */ - protected <T> void readRelationship( - TimelineEntity entity, Result result, ColumnPrefix<T> prefix, - boolean isRelatedTo) throws IOException { - // isRelatedTo and relatesTo are of type Map<String, Set<String>> - Map<String, Object> columns = prefix.readResults(result); - for (Map.Entry<String, Object> column : columns.entrySet()) { - for (String id : Separator.VALUES.splitEncoded( - column.getValue().toString())) { - if (isRelatedTo) { - entity.addIsRelatedToEntity(column.getKey(), id); - } else { - entity.addRelatesToEntity(column.getKey(), id); - } - } - } - } - - /** - * Helper method for reading key-value pairs for either info or config. - */ - protected <T> void readKeyValuePairs( - TimelineEntity entity, Result result, ColumnPrefix<T> prefix, - boolean isConfig) throws IOException { - // info and configuration are of type Map<String, Object or String> - Map<String, Object> columns = prefix.readResults(result); - if (isConfig) { - for (Map.Entry<String, Object> column : columns.entrySet()) { - entity.addConfig(column.getKey(), column.getValue().toString()); - } - } else { - entity.addInfo(columns); - } - } - - /** - * Read events from the entity table or the application table. The column name - * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted - * if there is no info associated with the event. - * - * See {@link EntityTable} and {@link ApplicationTable} for a more detailed - * schema description. - */ - protected void readEvents(TimelineEntity entity, Result result, - boolean isApplication) throws IOException { - Map<String, TimelineEvent> eventsMap = new HashMap<>(); - Map<?, Object> eventsResult = isApplication ? - ApplicationColumnPrefix.EVENT. - readResultsHavingCompoundColumnQualifiers(result) : - EntityColumnPrefix.EVENT. - readResultsHavingCompoundColumnQualifiers(result); - for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) { - byte[][] karr = (byte[][])eventResult.getKey(); - // the column name is of the form "eventId=timestamp=infoKey" - if (karr.length == 3) { - String id = Bytes.toString(karr[0]); - long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1])); - String key = Separator.VALUES.joinEncoded(id, Long.toString(ts)); - TimelineEvent event = eventsMap.get(key); - if (event == null) { - event = new TimelineEvent(); - event.setId(id); - event.setTimestamp(ts); - eventsMap.put(key, event); - } - // handle empty info - String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]); - if (infoKey != null) { - event.addInfo(infoKey, eventResult.getValue()); - } - } else { - LOG.warn("incorrectly formatted column name: it will be discarded"); - continue; - } - } - Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values()); - entity.addEvents(eventsSet); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/61737325/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index 96c5a19..bc48cbe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory; public class HBaseTimelineReaderImpl extends AbstractService implements TimelineReader { http://git-wip-us.apache.org/repos/asf/hadoop/blob/61737325/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java deleted file mode 100644 index a26c0c2..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java +++ /dev/null @@ -1,274 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.Map; -import java.util.NavigableMap; -import java.util.NavigableSet; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; - -/** - * The base class for reading and deserializing timeline entities from the - * HBase storage. Different types can be defined for different types of the - * entities that are being requested. - */ -abstract class TimelineEntityReader { - private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class); - protected static final long DEFAULT_BEGIN_TIME = 0L; - protected static final long DEFAULT_END_TIME = Long.MAX_VALUE; - - protected final boolean singleEntityRead; - - protected String userId; - protected String clusterId; - protected String flowName; - protected Long flowRunId; - protected String appId; - protected String entityType; - protected EnumSet<Field> fieldsToRetrieve; - // used only for a single entity read mode - protected String entityId; - // used only for multiple entity read mode - protected Long limit; - protected Long createdTimeBegin; - protected Long createdTimeEnd; - protected Long modifiedTimeBegin; - protected Long modifiedTimeEnd; - protected Map<String, Set<String>> relatesTo; - protected Map<String, Set<String>> isRelatedTo; - protected Map<String, Object> infoFilters; - protected Map<String, String> configFilters; - protected Set<String> metricFilters; - protected Set<String> eventFilters; - protected TimelineFilterList confsToRetrieve; - protected TimelineFilterList metricsToRetrieve; - - /** - * Main table the entity reader uses. - */ - protected BaseTable<?> table; - - /** - * Specifies whether keys for this table are sorted in a manner where entities - * can be retrieved by created time. If true, it will be sufficient to collect - * the first results as specified by the limit. Otherwise all matched entities - * will be fetched and then limit applied. - */ - private boolean sortedKeys = false; - - /** - * Instantiates a reader for multiple-entity reads. - */ - protected TimelineEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Long modifiedTimeBegin, Long modifiedTimeEnd, - Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, - Map<String, Object> infoFilters, Map<String, String> configFilters, - Set<String> metricFilters, Set<String> eventFilters, - TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, - EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) { - this.singleEntityRead = false; - this.sortedKeys = sortedKeys; - this.userId = userId; - this.clusterId = clusterId; - this.flowName = flowName; - this.flowRunId = flowRunId; - this.appId = appId; - this.entityType = entityType; - this.fieldsToRetrieve = fieldsToRetrieve; - this.limit = limit; - this.createdTimeBegin = createdTimeBegin; - this.createdTimeEnd = createdTimeEnd; - this.modifiedTimeBegin = modifiedTimeBegin; - this.modifiedTimeEnd = modifiedTimeEnd; - this.relatesTo = relatesTo; - this.isRelatedTo = isRelatedTo; - this.infoFilters = infoFilters; - this.configFilters = configFilters; - this.metricFilters = metricFilters; - this.eventFilters = eventFilters; - this.confsToRetrieve = confsToRetrieve; - this.metricsToRetrieve = metricsToRetrieve; - - this.table = getTable(); - } - - /** - * Instantiates a reader for single-entity reads. - */ - protected TimelineEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - String entityId, TimelineFilterList confsToRetrieve, - TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) { - this.singleEntityRead = true; - this.userId = userId; - this.clusterId = clusterId; - this.flowName = flowName; - this.flowRunId = flowRunId; - this.appId = appId; - this.entityType = entityType; - this.fieldsToRetrieve = fieldsToRetrieve; - this.entityId = entityId; - this.confsToRetrieve = confsToRetrieve; - this.metricsToRetrieve = metricsToRetrieve; - - this.table = getTable(); - } - - /** - * Creates a {@link FilterList} based on fields, confs and metrics to - * retrieve. This filter list will be set in Scan/Get objects to trim down - * results fetched from HBase back-end storage. - * @return a {@link FilterList} object. - */ - protected abstract FilterList constructFilterListBasedOnFields(); - - /** - * Reads and deserializes a single timeline entity from the HBase storage. - */ - public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) - throws IOException { - validateParams(); - augmentParams(hbaseConf, conn); - - FilterList filterList = constructFilterListBasedOnFields(); - Result result = getResult(hbaseConf, conn, filterList); - if (result == null || result.isEmpty()) { - // Could not find a matching row. - LOG.info("Cannot find matching entity of type " + entityType); - return null; - } - return parseEntity(result); - } - - /** - * Reads and deserializes a set of timeline entities from the HBase storage. - * It goes through all the results available, and returns the number of - * entries as specified in the limit in the entity's natural sort order. - */ - public Set<TimelineEntity> readEntities(Configuration hbaseConf, - Connection conn) throws IOException { - validateParams(); - augmentParams(hbaseConf, conn); - - NavigableSet<TimelineEntity> entities = new TreeSet<>(); - FilterList filterList = constructFilterListBasedOnFields(); - ResultScanner results = getResults(hbaseConf, conn, filterList); - try { - for (Result result : results) { - TimelineEntity entity = parseEntity(result); - if (entity == null) { - continue; - } - entities.add(entity); - if (!sortedKeys) { - if (entities.size() > limit) { - entities.pollLast(); - } - } else { - if (entities.size() == limit) { - break; - } - } - } - return entities; - } finally { - results.close(); - } - } - - /** - * Returns the main table to be used by the entity reader. - */ - protected abstract BaseTable<?> getTable(); - - /** - * Validates the required parameters to read the entities. - */ - protected abstract void validateParams(); - - /** - * Sets certain parameters to defaults if the values are not provided. - */ - protected abstract void augmentParams(Configuration hbaseConf, - Connection conn) throws IOException; - - /** - * Fetches a {@link Result} instance for a single-entity read. - * - * @return the {@link Result} instance or null if no such record is found. - */ - protected abstract Result getResult(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException; - - /** - * Fetches a {@link ResultScanner} for a multi-entity read. - */ - protected abstract ResultScanner getResults(Configuration hbaseConf, - Connection conn, FilterList filterList) throws IOException; - - /** - * Given a {@link Result} instance, deserializes and creates a - * {@link TimelineEntity}. - * - * @return the {@link TimelineEntity} instance, or null if the {@link Result} - * is null or empty. - */ - protected abstract TimelineEntity parseEntity(Result result) - throws IOException; - - /** - * Helper method for reading and deserializing {@link TimelineMetric} objects - * using the specified column prefix. The timeline metrics then are added to - * the given timeline entity. - */ - protected void readMetrics(TimelineEntity entity, Result result, - ColumnPrefix<?> columnPrefix) throws IOException { - NavigableMap<String, NavigableMap<Long, Number>> metricsResult = - columnPrefix.readResultsWithTimestamps(result); - for (Map.Entry<String, NavigableMap<Long, Number>> metricResult: - metricsResult.entrySet()) { - TimelineMetric metric = new TimelineMetric(); - metric.setId(metricResult.getKey()); - // Simply assume that if the value set contains more than 1 elements, the - // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric - metric.setType(metricResult.getValue().size() > 1 ? - TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE); - metric.addValues(metricResult.getValue()); - entity.addMetric(metric); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/61737325/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java deleted file mode 100644 index 36ed4ca..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.util.EnumSet; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; - -/** - * Factory methods for instantiating a timeline entity reader. - */ -class TimelineEntityReaderFactory { - /** - * Creates a timeline entity reader instance for reading a single entity with - * the specified input. - */ - public static TimelineEntityReader createSingleEntityReader(String userId, - String clusterId, String flowName, Long flowRunId, String appId, - String entityType, String entityId, TimelineFilterList confs, - TimelineFilterList metrics, EnumSet<Field> fieldsToRetrieve) { - // currently the types that are handled separate from the generic entity - // table are application, flow run, and flow activity entities - if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) { - return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, entityId, confs, metrics, fieldsToRetrieve); - } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) { - return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, entityId, confs, metrics, fieldsToRetrieve); - } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { - return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, entityId, fieldsToRetrieve); - } else { - // assume we're dealing with a generic entity read - return new GenericEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, entityId, confs, metrics, fieldsToRetrieve); - } - } - - /** - * Creates a timeline entity reader instance for reading set of entities with - * the specified input and predicates. - */ - public static TimelineEntityReader createMultipleEntitiesReader(String userId, - String clusterId, String flowName, Long flowRunId, String appId, - String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd, - Long modifiedTimeBegin, Long modifiedTimeEnd, - Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, - Map<String, Object> infoFilters, Map<String, String> configFilters, - Set<String> metricFilters, Set<String> eventFilters, - TimelineFilterList confs, TimelineFilterList metrics, - EnumSet<Field> fieldsToRetrieve) { - // currently the types that are handled separate from the generic entity - // table are application, flow run, and flow activity entities - if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) { - return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, limit, createdTimeBegin, createdTimeEnd, - modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, confs, - metrics, fieldsToRetrieve); - } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { - return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, limit, createdTimeBegin, createdTimeEnd, - modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, - fieldsToRetrieve); - } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) { - return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, limit, createdTimeBegin, createdTimeEnd, - modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, confs, - metrics, fieldsToRetrieve); - } else { - // assume we're dealing with a generic entity read - return new GenericEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, limit, createdTimeBegin, createdTimeEnd, - modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, confs, - metrics, fieldsToRetrieve, false); - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org