YARN-3051. Created a storage-oriented reader interface for fetching raw entity data and added a filesystem-based implementation. Contributed by Varun Saxena.
(cherry picked from commit 499ce52c7b645ec0b1cc8ac62dc9a3127b987a20) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/89e6c693 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/89e6c693 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/89e6c693 Branch: refs/heads/YARN-2928 Commit: 89e6c6932a57cf69b1d97f8e6ac6ccd464143327 Parents: 5751abd Author: Zhijie Shen <zjs...@apache.org> Authored: Mon Jul 6 18:11:27 2015 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Tue Aug 25 10:47:13 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../records/timelineservice/TimelineEntity.java | 5 + .../storage/FileSystemTimelineReaderImpl.java | 490 ++++++++++++++++ .../timelineservice/storage/TimelineReader.java | 162 ++++++ .../TestFileSystemTimelineReaderImpl.java | 556 +++++++++++++++++++ 5 files changed, 1216 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/89e6c693/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a294fc5..58232b7 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -73,6 +73,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3801. [JDK-8] Exclude jdk.tools from hbase-client and hbase-testing-util (Tsuyoshi Ozawa via sjlee) + YARN-3051. Created storage oriented reader interface for fetching raw entity + data and made the filesystem based implementation. (Varun Saxena via zjshen) + IMPROVEMENTS YARN-3276. Code cleanup for timeline service API records. 
(Junping Du via http://git-wip-us.apache.org/repos/asf/hadoop/blob/89e6c693/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java index a641f32..60fba85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.records.timelineservice; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.util.TimelineServiceHelper; +import org.codehaus.jackson.annotate.JsonSetter; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; @@ -335,6 +336,7 @@ public class TimelineEntity { } } + @JsonSetter("isrelatedto") public void setIsRelatedToEntities( Map<String, Set<String>> isRelatedToEntities) { if (real == null) { @@ -423,6 +425,7 @@ public class TimelineEntity { } } + @JsonSetter("relatesto") public void setRelatesToEntities(Map<String, Set<String>> relatesToEntities) { if (real == null) { this.relatesToEntities = @@ -441,6 +444,7 @@ public class TimelineEntity { } } + @JsonSetter("createdtime") public void setCreatedTime(long createdTime) { if (real == null) { this.createdTime = createdTime; @@ -458,6 +462,7 @@ public class TimelineEntity { } } + @JsonSetter("modifiedtime") public void setModifiedTime(long modifiedTime) { if (real == null) { 
this.modifiedTime = modifiedTime; http://git-wip-us.apache.org/repos/asf/hadoop/blob/89e6c693/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java new file mode 100644 index 0000000..f9f1d1d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java @@ -0,0 +1,490 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + +import com.google.common.annotations.VisibleForTesting; + +/** + * File System based implementation for TimelineReader. + */ +public class FileSystemTimelineReaderImpl extends AbstractService + implements TimelineReader { + + private static final Log LOG = + LogFactory.getLog(FileSystemTimelineReaderImpl.class); + + private String rootPath; + private static final String ENTITIES_DIR = "entities"; + + /** Default extension for output files. */ + private static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist"; + + @VisibleForTesting + /** Default extension for output files. 
*/ + static final String APP_FLOW_MAPPING_FILE = "app_flow_mapping.csv"; + + @VisibleForTesting + /** Config param for timeline service file system storage root. */ + static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir"; + + @VisibleForTesting + /** Default value for storage location on local disk. */ + static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT = + "/tmp/timeline_service_data"; + + private final CSVFormat csvFormat = + CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN"); + + public FileSystemTimelineReaderImpl() { + super(FileSystemTimelineReaderImpl.class.getName()); + } + + @VisibleForTesting + String getRootPath() { + return rootPath; + } + + private static ObjectMapper mapper; + + static { + mapper = new ObjectMapper(); + YarnJacksonJaxbJsonProvider.configObjectMapper(mapper); + } + + /** + * Deserialize a POJO object from a JSON string. + * @param clazz + * class to be desirialized + * + * @param jsonString + * json string to deserialize + * @return TimelineEntity object + * @throws IOException + * @throws JsonMappingException + * @throws JsonGenerationException + */ + public static <T> T getTimelineRecordFromJSON( + String jsonString, Class<T> clazz) + throws JsonGenerationException, JsonMappingException, IOException { + return mapper.readValue(jsonString, clazz); + } + + private static void fillFields(TimelineEntity finalEntity, + TimelineEntity real, EnumSet<Field> fields) { + if (fields.contains(Field.ALL)) { + finalEntity.setConfigs(real.getConfigs()); + finalEntity.setMetrics(real.getMetrics()); + finalEntity.setInfo(real.getInfo()); + finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities()); + finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities()); + finalEntity.setEvents(real.getEvents()); + return; + } + for (Field field : fields) { + switch(field) { + case CONFIGS: + finalEntity.setConfigs(real.getConfigs()); + break; + case 
METRICS: + finalEntity.setMetrics(real.getMetrics()); + break; + case INFO: + finalEntity.setInfo(real.getInfo()); + break; + case IS_RELATED_TO: + finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities()); + break; + case RELATES_TO: + finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities()); + break; + case EVENTS: + finalEntity.setEvents(real.getEvents()); + break; + default: + continue; + } + } + } + + private static boolean matchFilter(Object infoValue, Object filterValue) { + return infoValue.equals(filterValue); + } + + private static boolean matchFilters(Map<String, ? extends Object> entityInfo, + Map<String, ? extends Object> filters) { + if (entityInfo == null || entityInfo.isEmpty()) { + return false; + } + for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) { + Object infoValue = entityInfo.get(filter.getKey()); + if (infoValue == null) { + return false; + } + if (!matchFilter(infoValue, filter.getValue())) { + return false; + } + } + return true; + } + + private String getFlowRunPath(String userId, String clusterId, String flowId, + Long flowRunId, String appId) + throws IOException { + if (userId != null && flowId != null && flowRunId != null) { + return userId + "/" + flowId + "/" + flowRunId; + } + if (clusterId == null || appId == null) { + throw new IOException("Unable to get flow info"); + } + String appFlowMappingFile = rootPath + "/" + ENTITIES_DIR + "/" + + clusterId + "/" + APP_FLOW_MAPPING_FILE; + try (BufferedReader reader = + new BufferedReader(new InputStreamReader( + new FileInputStream( + appFlowMappingFile), Charset.forName("UTF-8"))); + CSVParser parser = new CSVParser(reader, csvFormat)) { + for (CSVRecord record : parser.getRecords()) { + if (record.size() < 4) { + continue; + } + String applicationId = record.get("APP"); + if (applicationId != null && !applicationId.trim().isEmpty() && + !applicationId.trim().equals(appId)) { + continue; + } + return record.get(1).trim() + "/" + 
record.get(2).trim() + "/" + + record.get(3).trim(); + } + parser.close(); + } + throw new IOException("Unable to get flow info"); + } + + private static boolean matchMetricFilters(Set<TimelineMetric> metrics, + Set<String> metricFilters) { + Set<String> tempMetrics = new HashSet<String>(); + for (TimelineMetric metric : metrics) { + tempMetrics.add(metric.getId()); + } + + for (String metricFilter : metricFilters) { + if (!tempMetrics.contains(metricFilter)) { + return false; + } + } + return true; + } + + private static boolean matchEventFilters(Set<TimelineEvent> entityEvents, + Set<String> eventFilters) { + Set<String> tempEvents = new HashSet<String>(); + for (TimelineEvent event : entityEvents) { + tempEvents.add(event.getId()); + } + + for (String eventFilter : eventFilters) { + if (!tempEvents.contains(eventFilter)) { + return false; + } + } + return true; + } + + private static TimelineEntity createEntityToBeReturned(TimelineEntity entity, + EnumSet<Field> fieldsToRetrieve) { + TimelineEntity entityToBeReturned = new TimelineEntity(); + entityToBeReturned.setIdentifier(entity.getIdentifier()); + entityToBeReturned.setCreatedTime(entity.getCreatedTime()); + entityToBeReturned.setModifiedTime(entity.getModifiedTime()); + if (fieldsToRetrieve != null) { + fillFields(entityToBeReturned, entity, fieldsToRetrieve); + } + return entityToBeReturned; + } + + private static boolean isTimeInRange(Long time, Long timeBegin, + Long timeEnd) { + return (time >= timeBegin) && (time <= timeEnd); + } + + private static boolean matchRelations( + Map<String, Set<String>> entityRelations, + Map<String, Set<String>> relations) { + for (Map.Entry<String, Set<String>> relation : relations.entrySet()) { + Set<String> ids = entityRelations.get(relation.getKey()); + if (ids == null) { + return false; + } + for (String id : relation.getValue()) { + if (!ids.contains(id)) { + return false; + } + } + } + return true; + } + + private static void mergeEntities(TimelineEntity entity1, + 
TimelineEntity entity2) { + // Ideally created time wont change except in the case of issue from client. + if (entity2.getCreatedTime() > 0) { + entity1.setCreatedTime(entity2.getCreatedTime()); + } + if (entity2.getModifiedTime() > 0) { + entity1.setModifiedTime(entity2.getModifiedTime()); + } + for (Entry<String, String> configEntry : entity2.getConfigs().entrySet()) { + entity1.addConfig(configEntry.getKey(), configEntry.getValue()); + } + for (Entry<String, Object> infoEntry : entity2.getInfo().entrySet()) { + entity1.addInfo(infoEntry.getKey(), infoEntry.getValue()); + } + for (Entry<String, Set<String>> isRelatedToEntry : + entity2.getIsRelatedToEntities().entrySet()) { + String type = isRelatedToEntry.getKey(); + for (String entityId : isRelatedToEntry.getValue()) { + entity1.addIsRelatedToEntity(type, entityId); + } + } + for (Entry<String, Set<String>> relatesToEntry : + entity2.getRelatesToEntities().entrySet()) { + String type = relatesToEntry.getKey(); + for (String entityId : relatesToEntry.getValue()) { + entity1.addRelatesToEntity(type, entityId); + } + } + for (TimelineEvent event : entity2.getEvents()) { + entity1.addEvent(event); + } + for (TimelineMetric metric2 : entity2.getMetrics()) { + boolean found = false; + for (TimelineMetric metric1 : entity1.getMetrics()) { + if (metric1.getId().equals(metric2.getId())) { + metric1.addValues(metric2.getValues()); + found = true; + break; + } + } + if (!found) { + entity1.addMetric(metric2); + } + } + } + + private static TimelineEntity readEntityFromFile(BufferedReader reader) + throws IOException { + TimelineEntity entity = + getTimelineRecordFromJSON(reader.readLine(), TimelineEntity.class); + String entityStr = ""; + while ((entityStr = reader.readLine()) != null) { + if (entityStr.trim().isEmpty()) { + continue; + } + TimelineEntity anotherEntity = + getTimelineRecordFromJSON(entityStr, TimelineEntity.class); + if (!entity.getId().equals(anotherEntity.getId()) || + 
!entity.getType().equals(anotherEntity.getType())) { + continue; + } + mergeEntities(entity, anotherEntity); + } + return entity; + } + + private Set<TimelineEntity> getEntities(File dir, String entityType, + Long limit, Long createdTimeBegin, + Long createdTimeEnd, Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + EnumSet<Field> fieldsToRetrieve) throws IOException { + if (limit == null || limit <= 0) { + limit = DEFAULT_LIMIT; + } + if (createdTimeBegin == null || createdTimeBegin <= 0) { + createdTimeBegin = 0L; + } + if (createdTimeEnd == null || createdTimeEnd <= 0) { + createdTimeEnd = Long.MAX_VALUE; + } + if (modifiedTimeBegin == null || modifiedTimeBegin <= 0) { + modifiedTimeBegin = 0L; + } + if (modifiedTimeEnd == null || modifiedTimeEnd <= 0) { + modifiedTimeEnd = Long.MAX_VALUE; + } + + // First sort the selected entities based on created/start time. 
+ Map<Long, Set<TimelineEntity>> sortedEntities = + new TreeMap<>( + new Comparator<Long>() { + @Override + public int compare(Long l1, Long l2) { + return l2.compareTo(l1); + } + } + ); + for (File entityFile : dir.listFiles()) { + if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) { + continue; + } + try (BufferedReader reader = + new BufferedReader( + new InputStreamReader( + new FileInputStream( + entityFile), Charset.forName("UTF-8")))) { + TimelineEntity entity = readEntityFromFile(reader); + if (!entity.getType().equals(entityType)) { + continue; + } + if (!isTimeInRange(entity.getCreatedTime(), createdTimeBegin, + createdTimeEnd)) { + continue; + } + if (!isTimeInRange(entity.getModifiedTime(), modifiedTimeBegin, + modifiedTimeEnd)) { + continue; + } + if (relatesTo != null && !relatesTo.isEmpty() && + !matchRelations(entity.getRelatesToEntities(), relatesTo)) { + continue; + } + if (isRelatedTo != null && !isRelatedTo.isEmpty() && + !matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) { + continue; + } + if (infoFilters != null && !infoFilters.isEmpty() && + !matchFilters(entity.getInfo(), infoFilters)) { + continue; + } + if (configFilters != null && !configFilters.isEmpty() && + !matchFilters(entity.getConfigs(), configFilters)) { + continue; + } + if (metricFilters != null && !metricFilters.isEmpty() && + !matchMetricFilters(entity.getMetrics(), metricFilters)) { + continue; + } + if (eventFilters != null && !eventFilters.isEmpty() && + !matchEventFilters(entity.getEvents(), eventFilters)) { + continue; + } + TimelineEntity entityToBeReturned = + createEntityToBeReturned(entity, fieldsToRetrieve); + Set<TimelineEntity> entitiesCreatedAtSameTime = + sortedEntities.get(entityToBeReturned.getCreatedTime()); + if (entitiesCreatedAtSameTime == null) { + entitiesCreatedAtSameTime = new HashSet<TimelineEntity>(); + } + entitiesCreatedAtSameTime.add(entityToBeReturned); + sortedEntities.put( + entityToBeReturned.getCreatedTime(), 
entitiesCreatedAtSameTime); + } + } + + Set<TimelineEntity> entities = new HashSet<TimelineEntity>(); + long entitiesAdded = 0; + for (Set<TimelineEntity> entitySet : sortedEntities.values()) { + for (TimelineEntity entity : entitySet) { + entities.add(entity); + ++entitiesAdded; + if (entitiesAdded >= limit) { + return entities; + } + } + } + return entities; + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT, + DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT); + super.serviceInit(conf); + } + + @Override + public TimelineEntity getEntity(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + String entityId, EnumSet<Field> fieldsToRetrieve) throws IOException { + String flowRunPath = getFlowRunPath(userId, clusterId, flowId, + flowRunId, appId); + File dir = new File(new File(rootPath, ENTITIES_DIR), + clusterId + "/" + flowRunPath + "/" + appId + "/" + entityType); + File entityFile = + new File(dir, entityId + TIMELINE_SERVICE_STORAGE_EXTENSION); + try (BufferedReader reader = + new BufferedReader(new InputStreamReader( + new FileInputStream(entityFile), Charset.forName("UTF-8")))) { + TimelineEntity entity = readEntityFromFile(reader); + return createEntityToBeReturned(entity, fieldsToRetrieve); + } + } + + @Override + public Set<TimelineEntity> getEntities(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + EnumSet<Field> fieldsToRetrieve) throws IOException { + String flowRunPath = + getFlowRunPath(userId, clusterId, flowId, flowRunId, appId); + File dir = + new File(new 
File(rootPath, ENTITIES_DIR), + clusterId + "/" + flowRunPath + "/" + appId + "/" + entityType); + return getEntities(dir, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, fieldsToRetrieve); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/89e6c693/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java new file mode 100644 index 0000000..e4e305e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; + +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; + +/** ATSv2 reader interface. */ +@Private +@Unstable +public interface TimelineReader extends Service { + + /** + * Default limit for {@link #getEntities}. + */ + long DEFAULT_LIMIT = 100; + + /** + * Possible fields to retrieve for {@link #getEntities} and + * {@link #getEntity}. + */ + public enum Field { + ALL, + EVENTS, + INFO, + METRICS, + CONFIGS, + RELATES_TO, + IS_RELATED_TO + } + + /** + * <p>The API to fetch the single entity given the entity identifier in the + * scope of the given context.</p> + * + * @param userId + * Context user Id(optional). + * @param clusterId + * Context cluster Id(mandatory). + * @param flowId + * Context flow Id (optional). + * @param flowRunId + * Context flow run Id (optional). + * @param appId + * Context app Id (mandatory) + * @param entityType + * Entity type (mandatory) + * @param entityId + * Entity Id (mandatory) + * @param fieldsToRetrieve + * Specifies which fields of the entity object to retrieve(optional), see + * {@link Field}. If null, retrieves 4 fields namely entity id, + * entity type, entity created time and entity modified time. All + * entities will be returned if {@link Field#ALL} is specified. + * @return a {@link TimelineEntity} instance or null. The entity will + * contain the metadata plus the given fields to retrieve. 
+ * @throws IOException + */ + TimelineEntity getEntity(String userId, String clusterId, String flowId, + Long flowRunId, String appId, String entityType, String entityId, + EnumSet<Field> fieldsToRetrieve) throws IOException; + + /** + * <p>The API to search for a set of entities of the given the entity type in + * the scope of the given context which matches the given predicates. The + * predicates include the created/modified time window, limit to number of + * entities to be returned, and the entities can be filtered by checking + * whether they contain the given info/configs entries in the form of + * key/value pairs, given metrics in the form of metricsIds and its relation + * with metric values given events in the form of the Ids, and whether they + * relate to/are related to other entities. For those parameters which have + * multiple entries, the qualified entity needs to meet all or them.</p> + * + * @param userId + * Context user Id(optional). + * @param clusterId + * Context cluster Id(mandatory). + * @param flowId + * Context flow Id (optional). + * @param flowRunId + * Context flow run Id (optional). + * @param appId + * Context app Id (mandatory) + * @param entityType + * Entity type (mandatory) + * @param limit + * A limit on the number of entities to return (optional). If null or <=0, + * defaults to {@link #DEFAULT_LIMIT}. + * @param createdTimeBegin + * Matched entities should not be created before this timestamp (optional). + * If null or <=0, defaults to 0. + * @param createdTimeEnd + * Matched entities should not be created after this timestamp (optional). + * If null or <=0, defaults to {@link Long#MAX_VALUE}. + * @param modifiedTimeBegin + * Matched entities should not be modified before this timestamp + * (optional). If null or <=0, defaults to 0. + * @param modifiedTimeEnd + * Matched entities should not be modified after this timestamp (optional). + * If null or <=0, defaults to {@link Long#MAX_VALUE}. 
+ * @param relatesTo + * Matched entities should relate to given entities (optional). + * @param isRelatedTo + * Matched entities should be related to given entities (optional). + * @param infoFilters + * Matched entities should have exact matches to the given info represented + * as key-value pairs (optional). If null or empty, the filter is not + * applied. + * @param configFilters + * Matched entities should have exact matches to the given configs + * represented as key-value pairs (optional). If null or empty, the filter + * is not applied. + * @param metricFilters + * Matched entities should contain the given metrics (optional). If null + * or empty, the filter is not applied. + * @param eventFilters + * Matched entities should contain the given events (optional). If null + * or empty, the filter is not applied. + * @param fieldsToRetrieve + * Specifies which fields of the entity object to retrieve(optional), see + * {@link Field}. If null, retrieves 4 fields namely entity id, + * entity type, entity created time and entity modified time. All + * entities will be returned if {@link Field#ALL} is specified. + * @return A set of {@link TimelineEntity} instances of the given entity type + * in the given context scope which matches the given predicates + * ordered by created time, descending. Each entity will only contain the + * metadata(id, type, created and modified times) plus the given fields to + * retrieve. 
+ * @throws IOException + */ + Set<TimelineEntity> getEntities(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + EnumSet<Field> fieldsToRetrieve) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/89e6c693/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java new file mode 100644 index 0000000..4e23e49 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java @@ -0,0 +1,556 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestFileSystemTimelineReaderImpl { + + private static final String rootDir = + FileSystemTimelineReaderImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT; + FileSystemTimelineReaderImpl reader; + + @BeforeClass + public static void setup() throws Exception { + loadEntityData(); + // Create app flow mapping file. 
+ CSVFormat format = + CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN"); + String appFlowMappingFile = rootDir + "/entities/cluster1/" + + FileSystemTimelineReaderImpl.APP_FLOW_MAPPING_FILE; + try (PrintWriter out = + new PrintWriter(new BufferedWriter( + new FileWriter(appFlowMappingFile, true))); + CSVPrinter printer = new CSVPrinter(out, format)){ + printer.printRecord("app1", "user1", "flow1", 1); + printer.printRecord("app2","user1","flow1,flow",1); + printer.close(); + } + (new File(rootDir)).deleteOnExit(); + } + + @AfterClass + public static void tearDown() throws Exception { + FileUtils.deleteDirectory(new File(rootDir)); + } + + @Before + public void init() throws Exception { + reader = new FileSystemTimelineReaderImpl(); + Configuration conf = new YarnConfiguration(); + conf.set(FileSystemTimelineReaderImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT, + rootDir); + reader.init(conf); + } + + private static void writeEntityFile(TimelineEntity entity, File dir) + throws Exception { + if (!dir.exists()) { + if (!dir.mkdirs()) { + throw new IOException("Could not create directories for " + dir); + } + } + String fileName = dir.getAbsolutePath() + "/" + entity.getId() + ".thist"; + try (PrintWriter out = + new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)))){ + out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity)); + out.write("\n"); + out.close(); + } + } + + private static void loadEntityData() throws Exception { + File appDir = new File(rootDir + + "/entities/cluster1/user1/flow1/1/app1/app/"); + TimelineEntity entity11 = new TimelineEntity(); + entity11.setId("id_1"); + entity11.setType("app"); + entity11.setCreatedTime(1425016502000L); + entity11.setModifiedTime(1425016502050L); + Map<String, Object> info1 = new HashMap<String, Object>(); + info1.put("info1", "val1"); + entity11.addInfo(info1); + TimelineEvent event = new TimelineEvent(); + event.setId("event_1"); + event.setTimestamp(1425016502003L); + entity11.addEvent(event); 
+ Set<TimelineMetric> metrics = new HashSet<TimelineMetric>(); + TimelineMetric metric1 = new TimelineMetric(); + metric1.setId("metric1"); + metric1.setType(TimelineMetric.Type.SINGLE_VALUE); + metric1.addValue(1425016502006L, 113.2F); + metrics.add(metric1); + TimelineMetric metric2 = new TimelineMetric(); + metric2.setId("metric2"); + metric2.setType(TimelineMetric.Type.TIME_SERIES); + metric2.addValue(1425016502016L, 34); + metrics.add(metric2); + entity11.setMetrics(metrics); + Map<String,String> configs = new HashMap<String, String>(); + configs.put("config_1", "123"); + entity11.setConfigs(configs); + entity11.addRelatesToEntity("flow", "flow1"); + entity11.addIsRelatedToEntity("type1", "tid1_1"); + writeEntityFile(entity11, appDir); + TimelineEntity entity12 = new TimelineEntity(); + entity12.setId("id_1"); + entity12.setType("app"); + entity12.setModifiedTime(1425016503000L); + configs.clear(); + configs.put("config_2", "23"); + configs.put("config_3", "abc"); + entity12.addConfigs(configs); + metrics.clear(); + TimelineMetric metric12 = new TimelineMetric(); + metric12.setId("metric2"); + metric12.setType(TimelineMetric.Type.TIME_SERIES); + metric12.addValue(1425016502032L, 48); + metric12.addValue(1425016502054L, 51); + metrics.add(metric12); + TimelineMetric metric3 = new TimelineMetric(); + metric3.setId("metric3"); + metric3.setType(TimelineMetric.Type.SINGLE_VALUE); + metric3.addValue(1425016502060L, 23L); + metrics.add(metric3); + entity12.setMetrics(metrics); + entity12.addIsRelatedToEntity("type1", "tid1_2"); + entity12.addIsRelatedToEntity("type2", "tid2_1`"); + TimelineEvent event15 = new TimelineEvent(); + event15.setId("event_5"); + event15.setTimestamp(1425016502017L); + entity12.addEvent(event15); + writeEntityFile(entity12, appDir); + + TimelineEntity entity2 = new TimelineEntity(); + entity2.setId("id_2"); + entity2.setType("app"); + entity2.setCreatedTime(1425016501050L); + entity2.setModifiedTime(1425016502010L); + Map<String, Object> 
info2 = new HashMap<String, Object>(); + info1.put("info2", 4); + entity2.addInfo(info2); + Map<String,String> configs2 = new HashMap<String, String>(); + configs2.put("config_1", "123"); + configs2.put("config_3", "def"); + entity2.setConfigs(configs2); + TimelineEvent event2 = new TimelineEvent(); + event2.setId("event_2"); + event2.setTimestamp(1425016501003L); + entity2.addEvent(event2); + Set<TimelineMetric> metrics2 = new HashSet<TimelineMetric>(); + TimelineMetric metric21 = new TimelineMetric(); + metric21.setId("metric1"); + metric21.setType(TimelineMetric.Type.SINGLE_VALUE); + metric21.addValue(1425016501006L, 123.2F); + metrics2.add(metric21); + TimelineMetric metric22 = new TimelineMetric(); + metric22.setId("metric2"); + metric22.setType(TimelineMetric.Type.TIME_SERIES); + metric22.addValue(1425016501056L, 31); + metric22.addValue(1425016501084L, 70); + metrics2.add(metric22); + TimelineMetric metric23 = new TimelineMetric(); + metric23.setId("metric3"); + metric23.setType(TimelineMetric.Type.SINGLE_VALUE); + metric23.addValue(1425016502060L, 23L); + metrics2.add(metric23); + entity2.setMetrics(metrics2); + entity2.addRelatesToEntity("flow", "flow2"); + writeEntityFile(entity2, appDir); + + TimelineEntity entity3 = new TimelineEntity(); + entity3.setId("id_3"); + entity3.setType("app"); + entity3.setCreatedTime(1425016501050L); + entity3.setModifiedTime(1425016502010L); + Map<String, Object> info3 = new HashMap<String, Object>(); + info3.put("info2", 3.5); + entity3.addInfo(info3); + Map<String,String> configs3 = new HashMap<String, String>(); + configs3.put("config_1", "123"); + configs3.put("config_3", "abc"); + entity3.setConfigs(configs3); + TimelineEvent event3 = new TimelineEvent(); + event3.setId("event_2"); + event3.setTimestamp(1425016501003L); + entity3.addEvent(event3); + TimelineEvent event4 = new TimelineEvent(); + event4.setId("event_4"); + event4.setTimestamp(1425016502006L); + entity3.addEvent(event4); + Set<TimelineMetric> metrics3 = 
new HashSet<TimelineMetric>(); + TimelineMetric metric31 = new TimelineMetric(); + metric31.setId("metric1"); + metric31.setType(TimelineMetric.Type.SINGLE_VALUE); + metric31.addValue(1425016501006L, 124.8F); + metrics3.add(metric31); + TimelineMetric metric32 = new TimelineMetric(); + metric32.setId("metric2"); + metric32.setType(TimelineMetric.Type.TIME_SERIES); + metric32.addValue(1425016501056L, 31); + metric32.addValue(1425016501084L, 74); + metrics3.add(metric32); + entity3.setMetrics(metrics3); + entity3.addIsRelatedToEntity("type1", "tid1_2"); + writeEntityFile(entity3, appDir); + + TimelineEntity entity4 = new TimelineEntity(); + entity4.setId("id_4"); + entity4.setType("app"); + entity4.setCreatedTime(1425016502050L); + entity4.setModifiedTime(1425016503010L); + TimelineEvent event44 = new TimelineEvent(); + event44.setId("event_4"); + event44.setTimestamp(1425016502003L); + entity4.addEvent(event44); + writeEntityFile(entity4, appDir); + + File appDir2 = new File(rootDir + + "/entities/cluster1/user1/flow1,flow/1/app2/app/"); + TimelineEntity entity5 = new TimelineEntity(); + entity5.setId("id_5"); + entity5.setType("app"); + entity5.setCreatedTime(1425016502050L); + entity5.setModifiedTime(1425016503010L); + writeEntityFile(entity5, appDir2); + } + + public TimelineReader getTimelineReader() { + return reader; + } + + @Test + public void testGetEntityDefaultView() throws Exception { + // If no fields are specified, entity is returned with default view i.e. 
+ // only the id, created and modified time + TimelineEntity result = + reader.getEntity("user1", "cluster1", "flow1", 1L, "app1", + "app", "id_1", null); + Assert.assertEquals( + (new TimelineEntity.Identifier("app", "id_1")).toString(), + result.getIdentifier().toString()); + Assert.assertEquals(1425016502000L, result.getCreatedTime()); + Assert.assertEquals(1425016503000L, result.getModifiedTime()); + Assert.assertEquals(0, result.getConfigs().size()); + Assert.assertEquals(0, result.getMetrics().size()); + } + + @Test + public void testGetEntityByClusterAndApp() throws Exception { + // Cluster and AppId should be enough to get an entity. + TimelineEntity result = + reader.getEntity(null, "cluster1", null, null, "app1", + "app", "id_1", null); + Assert.assertEquals( + (new TimelineEntity.Identifier("app", "id_1")).toString(), + result.getIdentifier().toString()); + Assert.assertEquals(1425016502000L, result.getCreatedTime()); + Assert.assertEquals(1425016503000L, result.getModifiedTime()); + Assert.assertEquals(0, result.getConfigs().size()); + Assert.assertEquals(0, result.getMetrics().size()); + } + + /** This test checks whether we can handle commas in app flow mapping csv */ + @Test + public void testAppFlowMappingCsv() throws Exception { + // Test getting an entity by cluster and app where flow entry + // in app flow mapping csv has commas. + TimelineEntity result = + reader.getEntity(null, "cluster1", null, null, "app2", + "app", "id_5", null); + Assert.assertEquals( + (new TimelineEntity.Identifier("app", "id_5")).toString(), + result.getIdentifier().toString()); + Assert.assertEquals(1425016502050L, result.getCreatedTime()); + Assert.assertEquals(1425016503010L, result.getModifiedTime()); + } + + @Test + public void testGetEntityCustomFields() throws Exception { + // Specified fields in addition to default view will be returned. 
+ TimelineEntity result = + reader.getEntity("user1", "cluster1", "flow1", 1L, + "app1", "app", "id_1", + EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS)); + Assert.assertEquals( + (new TimelineEntity.Identifier("app", "id_1")).toString(), + result.getIdentifier().toString()); + Assert.assertEquals(1425016502000L, result.getCreatedTime()); + Assert.assertEquals(1425016503000L, result.getModifiedTime()); + Assert.assertEquals(3, result.getConfigs().size()); + Assert.assertEquals(3, result.getMetrics().size()); + Assert.assertEquals(1, result.getInfo().size()); + // No events will be returned + Assert.assertEquals(0, result.getEvents().size()); + } + + @Test + public void testGetEntityAllFields() throws Exception { + // All fields of TimelineEntity will be returned. + TimelineEntity result = + reader.getEntity("user1", "cluster1", "flow1", 1L, + "app1", "app", "id_1", EnumSet.of(Field.ALL)); + Assert.assertEquals( + (new TimelineEntity.Identifier("app", "id_1")).toString(), + result.getIdentifier().toString()); + Assert.assertEquals(1425016502000L, result.getCreatedTime()); + Assert.assertEquals(1425016503000L, result.getModifiedTime()); + Assert.assertEquals(3, result.getConfigs().size()); + Assert.assertEquals(3, result.getMetrics().size()); + // All fields including events will be returned. 
+ Assert.assertEquals(2, result.getEvents().size()); + } + + @Test + public void testGetAllEntities() throws Exception { + Set<TimelineEntity> result = + reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", + null, null, null, null, null, null, null, null, null, null, + null, null); + // All 3 entities will be returned + Assert.assertEquals(4, result.size()); + } + + @Test + public void testGetEntitiesWithLimit() throws Exception { + Set<TimelineEntity> result = + reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", + 2L, null, null, null, null, null, null, null, null, null, + null, null); + Assert.assertEquals(2, result.size()); + // Needs to be rewritten once hashcode and equals for + // TimelineEntity is implemented + // Entities with id_1 and id_4 should be returned, + // based on created time, descending. + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1") && !entity.getId().equals("id_4")) { + Assert.fail("Entity not sorted by created time"); + } + } + result = + reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", + 3L, null, null, null, null, null, null, null, null, null, + null, null); + // Even though 2 entities out of 4 have same created time, one entity + // is left out due to limit + Assert.assertEquals(3, result.size()); + } + + @Test + public void testGetEntitiesByTimeWindows() throws Exception { + // Get entities based on created time start and end time range. + Set<TimelineEntity> result = + reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", + null, 1425016502030L, 1425016502060L, null, null, null, null, null, + null, null, null, null); + Assert.assertEquals(1, result.size()); + // Only one entity with ID id_4 should be returned. + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_4")) { + Assert.fail("Incorrect filtering based on created time range"); + } + } + + // Get entities if only created time end is specified. 
+ result = + reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", + null, null, 1425016502010L, null, null, null, null, null, null, + null, null, null); + Assert.assertEquals(3, result.size()); + for (TimelineEntity entity : result) { + if (entity.getId().equals("id_4")) { + Assert.fail("Incorrect filtering based on created time range"); + } + } + + // Get entities if only created time start is specified. + result = + reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", + null, 1425016502010L, null, null, null, null, null, null, null, + null, null, null); + Assert.assertEquals(1, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_4")) { + Assert.fail("Incorrect filtering based on created time range"); + } + } + + // Get entities based on modified time start and end time range. + result = + reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", + null, null, null, 1425016502090L, 1425016503020L, null, null, null, + null, null, null, null); + Assert.assertEquals(2, result.size()); + // Two entities with IDs' id_1 and id_4 should be returned. + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1") && !entity.getId().equals("id_4")) { + Assert.fail("Incorrect filtering based on modified time range"); + } + } + + // Get entities if only modified time end is specified. + result = + reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", + null, null, null, null, 1425016502090L, null, null, null, null, + null, null, null); + Assert.assertEquals(2, result.size()); + for (TimelineEntity entity : result) { + if (entity.getId().equals("id_1") || entity.getId().equals("id_4")) { + Assert.fail("Incorrect filtering based on modified time range"); + } + } + + // Get entities if only modified time start is specified. 
+ result = + reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", + null, null, null, 1425016503005L, null, null, null, null, null, + null, null, null); + Assert.assertEquals(1, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_4")) { + Assert.fail("Incorrect filtering based on modified time range"); + } + } + } + + @Test + public void testGetFilteredEntities() throws Exception { + // Get entities based on info filters. + Map<String, Object> infoFilters = new HashMap<String, Object>(); + infoFilters.put("info2", 3.5); + Set<TimelineEntity> result = + reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", + null, null, null, null, null, null, null, infoFilters, null, null, + null, null); + Assert.assertEquals(1, result.size()); + // Only one entity with ID id_3 should be returned. + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_3")) { + Assert.fail("Incorrect filtering based on info filters"); + } + } + + // Get entities based on config filters. + Map<String, String> configFilters = new HashMap<String, String>(); + configFilters.put("config_1", "123"); + configFilters.put("config_3", "abc"); + result = + reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", + null, null, null, null, null, null, null, null, configFilters, null, + null, null); + Assert.assertEquals(2, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) { + Assert.fail("Incorrect filtering based on config filters"); + } + } + + // Get entities based on event filters. 
+ Set<String> eventFilters = new HashSet<String>(); + eventFilters.add("event_2"); + eventFilters.add("event_4"); + result = + reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", + null, null, null, null, null, null, null, null, null, null, + eventFilters, null); + Assert.assertEquals(1, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_3")) { + Assert.fail("Incorrect filtering based on event filters"); + } + } + + // Get entities based on metric filters. + Set<String> metricFilters = new HashSet<String>(); + metricFilters.add("metric3"); + result = + reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", + null, null, null, null, null, null, null, null, null, metricFilters, + null, null); + Assert.assertEquals(2, result.size()); + // Two entities with IDs' id_1 and id_2 should be returned. + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) { + Assert.fail("Incorrect filtering based on metric filters"); + } + } + } + + @Test + public void testGetEntitiesByRelations() throws Exception { + // Get entities based on relatesTo. + Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>(); + Set<String> relatesToIds = new HashSet<String>(); + relatesToIds.add("flow1"); + relatesTo.put("flow", relatesToIds); + Set<TimelineEntity> result = + reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", + null, null, null, null, null, relatesTo, null, null, null, null, + null, null); + Assert.assertEquals(1, result.size()); + // Only one entity with ID id_1 should be returned. + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1")) { + Assert.fail("Incorrect filtering based on relatesTo"); + } + } + + // Get entities based on isRelatedTo. 
+ Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>(); + Set<String> isRelatedToIds = new HashSet<String>(); + isRelatedToIds.add("tid1_2"); + isRelatedTo.put("type1", isRelatedToIds); + result = + reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", + null, null, null, null, null, null, isRelatedTo, null, null, null, + null, null); + Assert.assertEquals(2, result.size()); + // Two entities with IDs' id_1 and id_3 should be returned. + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) { + Assert.fail("Incorrect filtering based on isRelatedTo"); + } + } + } +}