Repository: hadoop Updated Branches: refs/heads/YARN-2928 a95b8f5a0 -> 09c357695
YARN-3864. Implement support for querying single app and all apps for a flow run (Varun Saxena via sjlee) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/09c35769 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/09c35769 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/09c35769 Branch: refs/heads/YARN-2928 Commit: 09c35769513250b4f89dbc429474532adecb665b Parents: a95b8f5 Author: Sangjin Lee <sj...@apache.org> Authored: Mon Oct 5 13:14:11 2015 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Mon Oct 5 13:14:11 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../reader/TimelineReaderWebServices.java | 206 +++++- .../storage/ApplicationEntityReader.java | 64 +- .../storage/FlowActivityEntityReader.java | 33 +- .../storage/FlowRunEntityReader.java | 2 +- .../storage/GenericEntityReader.java | 16 +- .../storage/TimelineEntityReader.java | 21 +- .../storage/TimelineEntityReaderFactory.java | 2 +- .../storage/application/ApplicationRowKey.java | 34 + .../TestTimelineReaderWebServicesFlowRun.java | 405 ----------- ...stTimelineReaderWebServicesHBaseStorage.java | 673 +++++++++++++++++++ 11 files changed, 1000 insertions(+), 459 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/09c35769/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e2ef463..96e9c5c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -118,6 +118,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-4210. HBase reader throws NPE if Get returns no rows (Varun Saxena via vrushali) + YARN-3864. 
Implement support for querying single app and all apps for a + flow run (Varun Saxena via sjlee) + IMPROVEMENTS YARN-3276. Code cleanup for timeline service API records. (Junping Du via http://git-wip-us.apache.org/repos/asf/hadoop/blob/09c35769/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index a327099..610f74c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -286,7 +286,7 @@ public class TimelineReaderWebServices { @QueryParam("eventfilters") String eventfilters, @QueryParam("fields") String fields) { String url = req.getRequestURI() + - (null == req.getQueryString() ? "" : + (req.getQueryString() == null ? 
"" : QUERY_STRING_SEP + req.getQueryString()); UserGroupInformation callerUGI = getUser(req); LOG.info("Received URL " + url + " from user " + getUserName(callerUGI)); @@ -310,7 +310,7 @@ public class TimelineReaderWebServices { parseFieldsStr(fields, COMMA_DELIMITER)); } catch (Exception e) { handleException(e, url, startTime, - "createdTime or modifiedTime start/end or limit or flowId"); + "createdTime or modifiedTime start/end or limit or flowrunid"); } long endTime = Time.monotonicNow(); if (entities == null) { @@ -360,7 +360,7 @@ public class TimelineReaderWebServices { @QueryParam("flowrunid") String flowRunId, @QueryParam("fields") String fields) { String url = req.getRequestURI() + - (null == req.getQueryString() ? "" : + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); UserGroupInformation callerUGI = getUser(req); LOG.info("Received URL " + url + " from user " + getUserName(callerUGI)); @@ -420,7 +420,7 @@ public class TimelineReaderWebServices { @QueryParam("userid") String userId, @QueryParam("fields") String fields) { String url = req.getRequestURI() + - (null == req.getQueryString() ? "" : + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); UserGroupInformation callerUGI = getUser(req); LOG.info("Received URL " + url + " from user " + getUserName(callerUGI)); @@ -477,7 +477,7 @@ public class TimelineReaderWebServices { @QueryParam("limit") String limit, @QueryParam("fields") String fields) { String url = req.getRequestURI() + - (null == req.getQueryString() ? "" : + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); UserGroupInformation callerUGI = getUser(req); LOG.info("Received URL " + url + " from user " + getUserName(callerUGI)); @@ -502,4 +502,200 @@ public class TimelineReaderWebServices { " (Took " + (endTime - startTime) + " ms.)"); return entities; } + + /** + * Return a single app for given app id. 
Cluster ID is not provided by + * client so default cluster ID has to be taken. + */ + @GET + @Path("/app/{appid}/") + @Produces(MediaType.APPLICATION_JSON) + public TimelineEntity getApp( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam("appid") String appId, + @QueryParam("flowid") String flowId, + @QueryParam("flowrunid") String flowRunId, + @QueryParam("userid") String userId, + @QueryParam("fields") String fields) { + return getApp(req, res, null, appId, flowId, flowRunId, userId, fields); + } + + /** + * Return a single app for given cluster id and app id. + */ + @GET + @Path("/app/{clusterid}/{appid}/") + @Produces(MediaType.APPLICATION_JSON) + public TimelineEntity getApp( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam("clusterid") String clusterId, + @PathParam("appid") String appId, + @QueryParam("flowid") String flowId, + @QueryParam("flowrunid") String flowRunId, + @QueryParam("userid") String userId, + @QueryParam("fields") String fields) { + String url = req.getRequestURI() + + (req.getQueryString() == null ? 
"" : + QUERY_STRING_SEP + req.getQueryString()); + UserGroupInformation callerUGI = getUser(req); + LOG.info("Received URL " + url + " from user " + getUserName(callerUGI)); + long startTime = Time.monotonicNow(); + init(res); + TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); + TimelineEntity entity = null; + try { + entity = timelineReaderManager.getEntity( + parseUser(callerUGI, userId), parseStr(clusterId), + parseStr(flowId), parseLongStr(flowRunId), parseStr(appId), + TimelineEntityType.YARN_APPLICATION.toString(), null, + parseFieldsStr(fields, COMMA_DELIMITER)); + } catch (Exception e) { + handleException(e, url, startTime, "flowrunid"); + } + long endTime = Time.monotonicNow(); + if (entity == null) { + LOG.info("Processed URL " + url + " but app not found" + " (Took " + + (endTime - startTime) + " ms.)"); + throw new NotFoundException("App " + appId + " not found"); + } + LOG.info("Processed URL " + url + + " (Took " + (endTime - startTime) + " ms.)"); + return entity; + } + + /** + * Return a list of apps for given flow id and flow run id. Cluster ID is not + * provided by client so default cluster ID has to be taken. If number of + * matching apps are more than the limit, most recent apps till the limit is + * reached, will be returned. 
+ */ + @GET + @Path("/flowrunapps/{flowid}/{flowrunid}/") + @Produces(MediaType.APPLICATION_JSON) + public Set<TimelineEntity> getFlowRunApps( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam("flowid") String flowId, + @PathParam("flowrunid") String flowRunId, + @QueryParam("userid") String userId, + @QueryParam("limit") String limit, + @QueryParam("createdtimestart") String createdTimeStart, + @QueryParam("createdtimeend") String createdTimeEnd, + @QueryParam("modifiedtimestart") String modifiedTimeStart, + @QueryParam("modifiedtimeend") String modifiedTimeEnd, + @QueryParam("relatesto") String relatesTo, + @QueryParam("isrelatedto") String isRelatedTo, + @QueryParam("infofilters") String infofilters, + @QueryParam("conffilters") String conffilters, + @QueryParam("metricfilters") String metricfilters, + @QueryParam("eventfilters") String eventfilters, + @QueryParam("fields") String fields) { + return getEntities(req, res, null, null, + TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId, + flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart, + modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, + metricfilters, eventfilters, fields); + } + + /** + * Return a list of apps for a given cluster id, flow id and flow run id. If + * the number of matching apps exceeds the limit, the most recent apps, up to + * the limit, will be returned. 
+ */ + @GET + @Path("/flowrunapps/{clusterid}/{flowid}/{flowrunid}/") + @Produces(MediaType.APPLICATION_JSON) + public Set<TimelineEntity> getFlowRunApps( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam("clusterid") String clusterId, + @PathParam("flowid") String flowId, + @PathParam("flowrunid") String flowRunId, + @QueryParam("userid") String userId, + @QueryParam("limit") String limit, + @QueryParam("createdtimestart") String createdTimeStart, + @QueryParam("createdtimeend") String createdTimeEnd, + @QueryParam("modifiedtimestart") String modifiedTimeStart, + @QueryParam("modifiedtimeend") String modifiedTimeEnd, + @QueryParam("relatesto") String relatesTo, + @QueryParam("isrelatedto") String isRelatedTo, + @QueryParam("infofilters") String infofilters, + @QueryParam("conffilters") String conffilters, + @QueryParam("metricfilters") String metricfilters, + @QueryParam("eventfilters") String eventfilters, + @QueryParam("fields") String fields) { + return getEntities(req, res, clusterId, null, + TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId, + flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart, + modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, + metricfilters, eventfilters, fields); + } + + /** + * Return a list of apps for given flow id. Cluster ID is not provided by + * client so default cluster ID has to be taken. If the number of matching + * apps exceeds the limit, the most recent apps, up to the limit, will be + * returned. 
+ */ + @GET + @Path("/flowapps/{flowid}/") + @Produces(MediaType.APPLICATION_JSON) + public Set<TimelineEntity> getFlowApps( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam("flowid") String flowId, + @QueryParam("userid") String userId, + @QueryParam("limit") String limit, + @QueryParam("createdtimestart") String createdTimeStart, + @QueryParam("createdtimeend") String createdTimeEnd, + @QueryParam("modifiedtimestart") String modifiedTimeStart, + @QueryParam("modifiedtimeend") String modifiedTimeEnd, + @QueryParam("relatesto") String relatesTo, + @QueryParam("isrelatedto") String isRelatedTo, + @QueryParam("infofilters") String infofilters, + @QueryParam("conffilters") String conffilters, + @QueryParam("metricfilters") String metricfilters, + @QueryParam("eventfilters") String eventfilters, + @QueryParam("fields") String fields) { + return getEntities(req, res, null, null, + TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId, + null, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart, + modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, + metricfilters, eventfilters, fields); + } + + /** + * Return a list of apps for a given cluster id and flow id. If the number of + * matching apps exceeds the limit, the most recent apps, up to the limit, + * will be returned. 
+ */ + @GET + @Path("/flowapps/{clusterid}/{flowid}/") + @Produces(MediaType.APPLICATION_JSON) + public Set<TimelineEntity> getFlowApps( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam("clusterid") String clusterId, + @PathParam("flowid") String flowId, + @QueryParam("userid") String userId, + @QueryParam("limit") String limit, + @QueryParam("createdtimestart") String createdTimeStart, + @QueryParam("createdtimeend") String createdTimeEnd, + @QueryParam("modifiedtimestart") String modifiedTimeStart, + @QueryParam("modifiedtimeend") String modifiedTimeEnd, + @QueryParam("relatesto") String relatesTo, + @QueryParam("isrelatedto") String isRelatedTo, + @QueryParam("infofilters") String infofilters, + @QueryParam("conffilters") String conffilters, + @QueryParam("metricfilters") String metricfilters, + @QueryParam("eventfilters") String eventfilters, + @QueryParam("fields") String fields) { + return getEntities(req, res, clusterId, null, + TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId, + null, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart, + modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, + metricfilters, eventfilters, fields); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/09c35769/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java index d5b5d63..61954e1 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage; import java.io.IOException; -import java.util.Collections; import java.util.EnumSet; import java.util.Map; import java.util.Set; @@ -28,6 +27,8 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; @@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils; +import com.google.common.base.Preconditions; + /** * Timeline entity reader for application entities that are stored in the * application table. 
@@ -57,7 +60,7 @@ class ApplicationEntityReader extends GenericEntityReader { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve); + eventFilters, fieldsToRetrieve, true); } public ApplicationEntityReader(String userId, String clusterId, @@ -86,10 +89,63 @@ class ApplicationEntityReader extends GenericEntityReader { } @Override + protected void validateParams() { + Preconditions.checkNotNull(userId, "userId shouldn't be null"); + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); + if (singleEntityRead) { + Preconditions.checkNotNull(appId, "appId shouldn't be null"); + } else { + Preconditions.checkNotNull(flowId, "flowId shouldn't be null"); + } + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + if (singleEntityRead) { + if (flowId == null || flowRunId == null) { + FlowContext context = + lookupFlowContext(clusterId, appId, hbaseConf, conn); + flowId = context.flowId; + flowRunId = context.flowRunId; + } + } + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.noneOf(Field.class); + } + if (!singleEntityRead) { + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + if (createdTimeBegin == null) { + createdTimeBegin = DEFAULT_BEGIN_TIME; + } + if (createdTimeEnd == null) { + createdTimeEnd = DEFAULT_END_TIME; + } + if (modifiedTimeBegin == null) { + modifiedTimeBegin = DEFAULT_BEGIN_TIME; + } + if (modifiedTimeEnd == null) { + modifiedTimeEnd = DEFAULT_END_TIME; + } + } + } + + @Override protected ResultScanner getResults(Configuration hbaseConf, Connection conn) throws IOException { - throw new UnsupportedOperationException( - "we don't support multiple apps query"); + Scan scan = new 
Scan(); + if (flowRunId != null) { + scan.setRowPrefixFilter(ApplicationRowKey. + getRowKeyPrefix(clusterId, userId, flowId, flowRunId)); + } else { + scan.setRowPrefixFilter(ApplicationRowKey. + getRowKeyPrefix(clusterId, userId, flowId)); + } + scan.setFilter(new PageFilter(limit)); + return table.getResultScanner(hbaseConf, conn, scan); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/09c35769/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java index e68ca17..70a0915 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java @@ -20,9 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage; import java.io.IOException; import java.util.EnumSet; import java.util.Map; -import java.util.NavigableSet; import java.util.Set; -import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; @@ -60,7 +58,7 @@ class FlowActivityEntityReader extends TimelineEntityReader { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, 
modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve); + eventFilters, fieldsToRetrieve, true); } public FlowActivityEntityReader(String userId, String clusterId, @@ -78,35 +76,6 @@ class FlowActivityEntityReader extends TimelineEntityReader { return FLOW_ACTIVITY_TABLE; } - /** - * Since this is strictly sorted by the row key, it is sufficient to collect - * the first results as specified by the limit. - */ - @Override - public Set<TimelineEntity> readEntities(Configuration hbaseConf, - Connection conn) throws IOException { - validateParams(); - augmentParams(hbaseConf, conn); - - NavigableSet<TimelineEntity> entities = new TreeSet<>(); - ResultScanner results = getResults(hbaseConf, conn); - try { - for (Result result : results) { - TimelineEntity entity = parseEntity(result); - if (entity == null) { - continue; - } - entities.add(entity); - if (entities.size() == limit) { - break; - } - } - return entities; - } finally { - results.close(); - } - } - @Override protected void validateParams() { Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/09c35769/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java index b5d7ae5..90ce28f 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java @@ -56,7 +56,7 @@ class FlowRunEntityReader extends TimelineEntityReader { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve); + eventFilters, fieldsToRetrieve, false); } public FlowRunEntityReader(String userId, String clusterId, http://git-wip-us.apache.org/repos/asf/hadoop/blob/09c35769/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java index 396a02b..42079d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java @@ -61,8 +61,8 @@ class GenericEntityReader extends TimelineEntityReader { private static final EntityTable ENTITY_TABLE = new 
EntityTable(); private static final Log LOG = LogFactory.getLog(GenericEntityReader.class); - private static final long DEFAULT_BEGIN_TIME = 0L; - private static final long DEFAULT_END_TIME = Long.MAX_VALUE; + protected static final long DEFAULT_BEGIN_TIME = 0L; + protected static final long DEFAULT_END_TIME = Long.MAX_VALUE; /** * Used to look up the flow context. @@ -76,11 +76,11 @@ class GenericEntityReader extends TimelineEntityReader { Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, Map<String, Object> infoFilters, Map<String, String> configFilters, Set<String> metricFilters, Set<String> eventFilters, - EnumSet<Field> fieldsToRetrieve) { + EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve); + eventFilters, fieldsToRetrieve, sortedKeys); } public GenericEntityReader(String userId, String clusterId, @@ -97,7 +97,7 @@ class GenericEntityReader extends TimelineEntityReader { return ENTITY_TABLE; } - private FlowContext lookupFlowContext(String clusterId, String appId, + protected FlowContext lookupFlowContext(String clusterId, String appId, Configuration hbaseConf, Connection conn) throws IOException { byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); Get get = new Get(rowKey); @@ -113,9 +113,9 @@ class GenericEntityReader extends TimelineEntityReader { } } - private static class FlowContext { - private final String flowId; - private final Long flowRunId; + protected static class FlowContext { + protected final String flowId; + protected final Long flowRunId; public FlowContext(String flowId, Long flowRunId) { this.flowId = flowId; this.flowRunId = flowRunId; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/09c35769/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java index 93be2db..d4a659c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java @@ -74,6 +74,14 @@ abstract class TimelineEntityReader { protected BaseTable<?> table; /** + * Specifies whether keys for this table are sorted in a manner where entities + * can be retrieved by created time. If true, it will be sufficient to collect + * the first results as specified by the limit. Otherwise all matched entities + * will be fetched and then limit applied. + */ + private boolean sortedKeys = false; + + /** * Instantiates a reader for multiple-entity reads. 
*/ protected TimelineEntityReader(String userId, String clusterId, @@ -83,8 +91,9 @@ abstract class TimelineEntityReader { Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, Map<String, Object> infoFilters, Map<String, String> configFilters, Set<String> metricFilters, Set<String> eventFilters, - EnumSet<Field> fieldsToRetrieve) { + EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) { this.singleEntityRead = false; + this.sortedKeys = sortedKeys; this.userId = userId; this.clusterId = clusterId; this.flowId = flowId; @@ -162,8 +171,14 @@ abstract class TimelineEntityReader { continue; } entities.add(entity); - if (entities.size() > limit) { - entities.pollLast(); + if (!sortedKeys) { + if (entities.size() > limit) { + entities.pollLast(); + } + } else { + if (entities.size() == limit) { + break; + } } } return entities; http://git-wip-us.apache.org/repos/asf/hadoop/blob/09c35769/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java index 4fdef40..f5341c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java @@ -91,7 +91,7 @@ 
class TimelineEntityReaderFactory { appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, eventFilters, - fieldsToRetrieve); + fieldsToRetrieve, false); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/09c35769/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java index e3b5a87..10e3c2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java @@ -61,6 +61,40 @@ public class ApplicationRowKey { } /** + * Constructs a row key prefix for the application table as follows: + * {@code clusterId!userName!flowId!} + * + * @param clusterId + * @param userId + * @param flowId + * @return byte array with the row key prefix + */ + public static byte[] getRowKeyPrefix(String clusterId, String userId, + String flowId) { + byte[] first = Bytes.toBytes( + Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowId)); + return Separator.QUALIFIERS.join(first, new byte[0]); + } + + /** + * Constructs a row key prefix for the application table as 
follows: + * {@code clusterId!userName!flowId!flowRunId!} + * + * @param clusterId + * @param userId + * @param flowId + * @param flowRunId + * @return byte array with the row key prefix + */ + public static byte[] getRowKeyPrefix(String clusterId, String userId, + String flowId, Long flowRunId) { + byte[] first = Bytes.toBytes( + Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowId)); + byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId)); + return Separator.QUALIFIERS.join(first, second, new byte[0]); + } + + /** * Constructs a row key for the application table as follows: * {@code clusterId!userName!flowId!flowRunId!AppId} * http://git-wip-us.apache.org/repos/asf/hadoop/blob/09c35769/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java deleted file mode 100644 index e359f78..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java +++ /dev/null @@ -1,405 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.reader; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.HttpURLConnection; -import java.net.URI; -import java.net.URL; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import javax.ws.rs.core.MediaType; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; -import 
org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; -import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.GenericType; -import com.sun.jersey.api.client.ClientResponse.Status; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; -import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; - -public class TestTimelineReaderWebServicesFlowRun { - private int serverPort; - private TimelineReaderServer server; - private static HBaseTestingUtility util; - private static long ts = System.currentTimeMillis(); - - @BeforeClass - public static void setup() throws Exception { - util = new HBaseTestingUtility(); - Configuration conf = util.getConfiguration(); - conf.setInt("hfile.format.version", 3); - util.startMiniCluster(); - TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); - loadData(); - } - - private static void loadData() throws Exception { - String cluster = "cluster1"; - String user = "user1"; - String flow = "flow_name"; - String flowVersion = "CF7022C10F1354"; - Long runid = 1002345678919L; - Long runid1 = 1002345678920L; - - TimelineEntities te = new TimelineEntities(); - TimelineEntity entity = new TimelineEntity(); - String id = "flowRunMetrics_test"; - String type = TimelineEntityType.YARN_APPLICATION.toString(); - entity.setId(id); - entity.setType(type); - Long cTime = 1425016501000L; - entity.setCreatedTime(cTime); - - // add metrics - Set<TimelineMetric> metrics = new HashSet<>(); - 
TimelineMetric m1 = new TimelineMetric(); - m1.setId("MAP_SLOT_MILLIS"); - Map<Long, Number> metricValues = new HashMap<Long, Number>(); - metricValues.put(ts - 100000, 2); - metricValues.put(ts - 80000, 40); - m1.setType(Type.TIME_SERIES); - m1.setValues(metricValues); - metrics.add(m1); - - m1 = new TimelineMetric(); - m1.setId("HDFS_BYTES_READ"); - metricValues = new HashMap<Long, Number>(); - metricValues.put(ts - 100000, 31); - metricValues.put(ts - 80000, 57); - m1.setType(Type.TIME_SERIES); - m1.setValues(metricValues); - metrics.add(m1); - entity.addMetrics(metrics); - - TimelineEvent event = new TimelineEvent(); - event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - Long expTs = 1436512802000L; - event.setTimestamp(expTs); - String expKey = "foo_event"; - Object expVal = "test"; - event.addInfo(expKey, expVal); - entity.addEvent(event); - te.addEntity(entity); - - // write another application with same metric to this flow - TimelineEntities te1 = new TimelineEntities(); - TimelineEntity entity1 = new TimelineEntity(); - id = "flowRunMetrics_test"; - type = TimelineEntityType.YARN_APPLICATION.toString(); - entity1.setId(id); - entity1.setType(type); - cTime = 1425016501000L; - entity1.setCreatedTime(cTime); - // add metrics - metrics.clear(); - TimelineMetric m2 = new TimelineMetric(); - m2.setId("MAP_SLOT_MILLIS"); - metricValues = new HashMap<Long, Number>(); - metricValues.put(ts - 100000, 5L); - metricValues.put(ts - 80000, 101L); - m2.setType(Type.TIME_SERIES); - m2.setValues(metricValues); - metrics.add(m2); - entity1.addMetrics(metrics); - te1.addEntity(entity1); - - String flow2 = "flow_name2"; - String flowVersion2 = "CF7022C10F1454"; - Long runid2 = 2102356789046L; - TimelineEntities te3 = new TimelineEntities(); - TimelineEntity entity3 = new TimelineEntity(); - id = "flowRunMetrics_test1"; - entity3.setId(id); - entity3.setType(type); - cTime = 1425016501030L; - entity3.setCreatedTime(cTime); - TimelineEvent event2 = new 
TimelineEvent(); - event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - event2.setTimestamp(1436512802030L); - event2.addInfo("foo_event", "test"); - entity3.addEvent(event2); - te3.addEntity(entity3); - - HBaseTimelineWriterImpl hbi = null; - Configuration c1 = util.getConfiguration(); - try { - hbi = new HBaseTimelineWriterImpl(c1); - hbi.init(c1); - String appName = "application_11111111111111_1111"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); - appName = "application_11111111111111_2222"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); - hbi.write(cluster, user, flow, flowVersion, runid1, appName, te); - appName = "application_11111111111111_2223"; - hbi.write(cluster, user, flow2, flowVersion2, runid2, appName, te3); - hbi.flush(); - } finally { - hbi.close(); - } - } - - @AfterClass - public static void tearDown() throws Exception { - util.shutdownMiniCluster(); - } - - @Before - public void init() throws Exception { - try { - Configuration config = util.getConfiguration(); - config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - "localhost:0"); - config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); - config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, - "org.apache.hadoop.yarn.server.timelineservice.storage." 
+ - "HBaseTimelineReaderImpl"); - config.setInt("hfile.format.version", 3); - server = new TimelineReaderServer(); - server.init(config); - server.start(); - serverPort = server.getWebServerPort(); - } catch (Exception e) { - Assert.fail("Web server failed to start"); - } - } - - private static Client createClient() { - ClientConfig cfg = new DefaultClientConfig(); - cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class); - return new Client(new URLConnectionClientHandler( - new DummyURLConnectionFactory()), cfg); - } - - private static ClientResponse getResponse(Client client, URI uri) - throws Exception { - ClientResponse resp = - client.resource(uri).accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); - if (resp == null || - resp.getClientResponseStatus() != ClientResponse.Status.OK) { - String msg = new String(); - if (resp != null) { - msg = resp.getClientResponseStatus().toString(); - } - throw new IOException("Incorrect response from timeline reader. " + - "Status=" + msg); - } - return resp; - } - - private static class DummyURLConnectionFactory - implements HttpURLConnectionFactory { - - @Override - public HttpURLConnection getHttpURLConnection(final URL url) throws IOException { - try { - return (HttpURLConnection)url.openConnection(); - } catch (UndeclaredThrowableException e) { - throw new IOException(e.getCause()); - } - } - } - - private static TimelineMetric newMetric(String id, long ts, Number value) { - TimelineMetric metric = new TimelineMetric(); - metric.setId(id); - metric.addValue(ts, value); - return metric; - } - - private static boolean verifyMetricValues(Map<Long, Number> m1, - Map<Long, Number> m2) { - for (Map.Entry<Long, Number> entry : m1.entrySet()) { - if (!m2.containsKey(entry.getKey())) { - return false; - } - if (m2.get(entry.getKey()).equals(entry.getValue())) { - return false; - } - } - return true; - } - - private static boolean verifyMetrics( - TimelineMetric m, TimelineMetric... 
metrics) { - for (TimelineMetric metric : metrics) { - if (!metric.equals(m)) { - continue; - } - if (!verifyMetricValues(metric.getValues(), m.getValues())) { - continue; - } - return true; - } - return false; - } - - private static void verifyHttpResponse(Client client, URI uri, - Status status) { - ClientResponse resp = - client.resource(uri).accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertNotNull(resp); - assertTrue("Response from server should have been " + status, - resp.getClientResponseStatus().equals(status)); - } - - @Test - public void testGetFlowRun() throws Exception { - Client client = createClient(); - try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowrun/cluster1/flow_name/1002345678919?userid=user1"); - ClientResponse resp = getResponse(client, uri); - FlowRunEntity entity = resp.getEntity(FlowRunEntity.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); - assertNotNull(entity); - assertEquals("user1@flow_name/1002345678919", entity.getId()); - assertEquals(2, entity.getMetrics().size()); - TimelineMetric m1 = newMetric("HDFS_BYTES_READ", ts - 80000, 57L); - TimelineMetric m2 = newMetric("MAP_SLOT_MILLIS", ts - 80000, 141L); - for (TimelineMetric metric : entity.getMetrics()) { - assertTrue(verifyMetrics(metric, m1, m2)); - } - - // Query without specifying cluster ID. 
- uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowrun/flow_name/1002345678919?userid=user1"); - resp = getResponse(client, uri); - entity = resp.getEntity(FlowRunEntity.class); - assertNotNull(entity); - assertEquals("user1@flow_name/1002345678919", entity.getId()); - assertEquals(2, entity.getMetrics().size()); - m1 = newMetric("HDFS_BYTES_READ", ts - 80000, 57L); - m2 = newMetric("MAP_SLOT_MILLIS", ts - 80000, 141L); - for (TimelineMetric metric : entity.getMetrics()) { - assertTrue(verifyMetrics(metric, m1, m2)); - } - } finally { - client.destroy(); - } - } - - @Test - public void testGetFlows() throws Exception { - Client client = createClient(); - try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flows/cluster1"); - ClientResponse resp = getResponse(client, uri); - Set<FlowActivityEntity> entities = - resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); - assertNotNull(entities); - assertEquals(2, entities.size()); - for (FlowActivityEntity entity : entities) { - assertTrue((entity.getId().endsWith("@flow_name") && - entity.getFlowRuns().size() == 2) || - (entity.getId().endsWith("@flow_name2") && - entity.getFlowRuns().size() == 1)); - } - - // Query without specifying cluster ID. 
- uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flows/"); - resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); - assertNotNull(entities); - assertEquals(2, entities.size()); - - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flows/cluster1?limit=1"); - resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); - assertNotNull(entities); - assertEquals(1, entities.size()); - } finally { - client.destroy(); - } - } - - @Test - public void testGetFlowRunNotPresent() throws Exception { - Client client = createClient(); - try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowrun/cluster1/flow_name/1002345678929?userid=user1"); - verifyHttpResponse(client, uri, Status.NOT_FOUND); - } finally { - client.destroy(); - } - } - - @Test - public void testGetFlowsNotPresent() throws Exception { - Client client = createClient(); - try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flows/cluster2"); - ClientResponse resp = getResponse(client, uri); - Set<FlowActivityEntity> entities = - resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); - assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); - assertNotNull(entities); - assertEquals(0, entities.size()); - } finally { - client.destroy(); - } - } - - @After - public void stop() throws Exception { - if (server != null) { - server.stop(); - server = null; - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/09c35769/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java new file mode 100644 index 0000000..a89d2fc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -0,0 +1,673 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import javax.ws.rs.core.MediaType; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.GenericType; +import com.sun.jersey.api.client.ClientResponse.Status; +import 
com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; +import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; + +public class TestTimelineReaderWebServicesHBaseStorage { + private int serverPort; + private TimelineReaderServer server; + private static HBaseTestingUtility util; + private static long ts = System.currentTimeMillis(); + + @BeforeClass + public static void setup() throws Exception { + util = new HBaseTestingUtility(); + Configuration conf = util.getConfiguration(); + conf.setInt("hfile.format.version", 3); + util.startMiniCluster(); + TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); + loadData(); + } + + private static void loadData() throws Exception { + String cluster = "cluster1"; + String user = "user1"; + String flow = "flow_name"; + String flowVersion = "CF7022C10F1354"; + Long runid = 1002345678919L; + Long runid1 = 1002345678920L; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entity = new TimelineEntity(); + String id = "application_1111111111_1111"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + Long cTime = 1425016501000L; + entity.setCreatedTime(cTime); + entity.addConfig("cfg2", "value1"); + + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId("MAP_SLOT_MILLIS"); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + metricValues.put(ts - 100000, 2); + metricValues.put(ts - 80000, 40); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + + m1 = new TimelineMetric(); + m1.setId("HDFS_BYTES_READ"); + metricValues = new HashMap<Long, Number>(); + metricValues.put(ts - 100000, 31); + metricValues.put(ts - 80000, 57); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); 
+ entity.addMetrics(metrics); + + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + Long expTs = 1436512802000L; + event.setTimestamp(expTs); + String expKey = "foo_event"; + Object expVal = "test"; + event.addInfo(expKey, expVal); + entity.addEvent(event); + TimelineEvent event11 = new TimelineEvent(); + event11.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + expTs = 1436512802010L; + event11.setTimestamp(expTs); + entity.addEvent(event11); + + te.addEntity(entity); + + // write another application with same metric to this flow + TimelineEntities te1 = new TimelineEntities(); + TimelineEntity entity1 = new TimelineEntity(); + id = "application_1111111111_2222"; + type = TimelineEntityType.YARN_APPLICATION.toString(); + entity1.setId(id); + entity1.setType(type); + cTime = 1425016501000L; + entity1.setCreatedTime(cTime); + entity1.addConfig("cfg1", "value1"); + // add metrics + metrics.clear(); + TimelineMetric m2 = new TimelineMetric(); + m2.setId("MAP_SLOT_MILLIS"); + metricValues = new HashMap<Long, Number>(); + metricValues.put(ts - 100000, 5L); + metricValues.put(ts - 80000, 101L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues); + metrics.add(m2); + entity1.addMetrics(metrics); + TimelineEvent event1 = new TimelineEvent(); + event1.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + event1.setTimestamp(expTs); + event1.addInfo(expKey, expVal); + entity1.addEvent(event1); + te1.addEntity(entity1); + + String flow2 = "flow_name2"; + String flowVersion2 = "CF7022C10F1454"; + Long runid2 = 2102356789046L; + TimelineEntities te3 = new TimelineEntities(); + TimelineEntity entity3 = new TimelineEntity(); + id = "application_11111111111111_2223"; + entity3.setId(id); + entity3.setType(type); + cTime = 1425016501030L; + entity3.setCreatedTime(cTime); + TimelineEvent event2 = new TimelineEvent(); + event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + 
event2.setTimestamp(1436512802030L); + event2.addInfo("foo_event", "test"); + entity3.addEvent(event2); + te3.addEntity(entity3); + + TimelineEntities te4 = new TimelineEntities(); + TimelineEntity entity4 = new TimelineEntity(); + id = "application_1111111111_2224"; + entity4.setId(id); + entity4.setType(type); + cTime = 1425016501034L; + entity4.setCreatedTime(cTime); + TimelineEvent event4 = new TimelineEvent(); + event4.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + event4.setTimestamp(1436512802037L); + event4.addInfo("foo_event", "test"); + entity4.addEvent(event4); + te4.addEntity(entity4); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + hbi.write(cluster, user, flow, flowVersion, runid, entity.getId(), te); + hbi.write(cluster, user, flow, flowVersion, runid, entity1.getId(), te1); + hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4); + hbi.write(cluster, user, flow2, + flowVersion2, runid2, entity3.getId(), te3); + hbi.flush(); + } finally { + hbi.close(); + } + } + + @AfterClass + public static void tearDown() throws Exception { + util.shutdownMiniCluster(); + } + + @Before + public void init() throws Exception { + try { + Configuration config = util.getConfiguration(); + config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + "localhost:0"); + config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); + config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, + "org.apache.hadoop.yarn.server.timelineservice.storage." 
+ + "HBaseTimelineReaderImpl"); + config.setInt("hfile.format.version", 3); + server = new TimelineReaderServer(); + server.init(config); + server.start(); + serverPort = server.getWebServerPort(); + } catch (Exception e) { + Assert.fail("Web server failed to start"); + } + } + + private static Client createClient() { + ClientConfig cfg = new DefaultClientConfig(); + cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class); + return new Client(new URLConnectionClientHandler( + new DummyURLConnectionFactory()), cfg); + } + + private static ClientResponse getResponse(Client client, URI uri) + throws Exception { + ClientResponse resp = + client.resource(uri).accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); + if (resp == null || + resp.getClientResponseStatus() != ClientResponse.Status.OK) { + String msg = new String(); + if (resp != null) { + msg = resp.getClientResponseStatus().toString(); + } + throw new IOException("Incorrect response from timeline reader. 
" + + "Status=" + msg); + } + return resp; + } + + private static class DummyURLConnectionFactory + implements HttpURLConnectionFactory { + + @Override + public HttpURLConnection getHttpURLConnection(final URL url) throws IOException { + try { + return (HttpURLConnection)url.openConnection(); + } catch (UndeclaredThrowableException e) { + throw new IOException(e.getCause()); + } + } + } + + private static TimelineEntity newEntity(String type, String id) { + TimelineEntity entity = new TimelineEntity(); + entity.setIdentifier(new TimelineEntity.Identifier(type, id)); + return entity; + } + + private static TimelineMetric newMetric(TimelineMetric.Type type, + String id, long ts, Number value) { + TimelineMetric metric = new TimelineMetric(type); + metric.setId(id); + metric.addValue(ts, value); + return metric; + } + + private static boolean verifyMetricValues(Map<Long, Number> m1, + Map<Long, Number> m2) { + for (Map.Entry<Long, Number> entry : m1.entrySet()) { + if (!m2.containsKey(entry.getKey())) { + return false; + } + if (m2.get(entry.getKey()).equals(entry.getValue())) { + return false; + } + } + return true; + } + + private static boolean verifyMetrics( + TimelineMetric m, TimelineMetric... 
metrics) { + for (TimelineMetric metric : metrics) { + if (!metric.equals(m)) { + continue; + } + if (!verifyMetricValues(metric.getValues(), m.getValues())) { + continue; + } + return true; + } + return false; + } + + private static void verifyHttpResponse(Client client, URI uri, + Status status) { + ClientResponse resp = + client.resource(uri).accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertNotNull(resp); + assertTrue("Response from server should have been " + status, + resp.getClientResponseStatus().equals(status)); + } + + @Test + public void testGetFlowRun() throws Exception { + Client client = createClient(); + try { + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flowrun/cluster1/flow_name/1002345678919?userid=user1"); + ClientResponse resp = getResponse(client, uri); + FlowRunEntity entity = resp.getEntity(FlowRunEntity.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); + assertNotNull(entity); + assertEquals("user1@flow_name/1002345678919", entity.getId()); + assertEquals(2, entity.getMetrics().size()); + TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE, + "HDFS_BYTES_READ", ts - 80000, 57L); + TimelineMetric m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE, + "MAP_SLOT_MILLIS", ts - 80000, 141L); + for (TimelineMetric metric : entity.getMetrics()) { + assertTrue(verifyMetrics(metric, m1, m2)); + } + + // Query without specifying cluster ID. 
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flowrun/flow_name/1002345678919?userid=user1"); + resp = getResponse(client, uri); + entity = resp.getEntity(FlowRunEntity.class); + assertNotNull(entity); + assertEquals("user1@flow_name/1002345678919", entity.getId()); + assertEquals(2, entity.getMetrics().size()); + m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE, + "HDFS_BYTES_READ", ts - 80000, 57L); + m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE, + "MAP_SLOT_MILLIS", ts - 80000, 141L); + for (TimelineMetric metric : entity.getMetrics()) { + assertTrue(verifyMetrics(metric, m1, m2)); + } + } finally { + client.destroy(); + } + } + + @Test + public void testGetFlows() throws Exception { + Client client = createClient(); + try { + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flows/cluster1"); + ClientResponse resp = getResponse(client, uri); + Set<FlowActivityEntity> entities = + resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); + assertNotNull(entities); + assertEquals(2, entities.size()); + for (FlowActivityEntity entity : entities) { + assertTrue((entity.getId().endsWith("@flow_name") && + entity.getFlowRuns().size() == 2) || + (entity.getId().endsWith("@flow_name2") && + entity.getFlowRuns().size() == 1)); + } + + // Query without specifying cluster ID. 
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flows/");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flows/cluster1?limit=1");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  /**
+   * Queries a single application through the {@code timeline/app} endpoint,
+   * once with the cluster id in the path and once without it, and checks the
+   * returned entity id, the number of metrics and the metric values.
+   */
+  @Test
+  public void testGetApp() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/app/cluster1/application_1111111111_1111?" +
+          "userid=user1&fields=ALL&flowid=flow_name&flowrunid=1002345678919");
+      ClientResponse resp = getResponse(client, uri);
+      TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+      assertNotNull(entity);
+      assertEquals("application_1111111111_1111", entity.getId());
+      assertEquals(2, entity.getMetrics().size());
+      TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
+          "HDFS_BYTES_READ", ts - 100000, 31L);
+      m1.addValue(ts - 80000, 57L);
+      TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES,
+          "MAP_SLOT_MILLIS", ts - 100000, 2L);
+      m2.addValue(ts - 80000, 40L);
+      for (TimelineMetric metric : entity.getMetrics()) {
+        assertTrue(verifyMetrics(metric, m1, m2));
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/app/application_1111111111_2222?userid=user1" +
+          "&fields=metrics&flowid=flow_name&flowrunid=1002345678919");
+      resp = getResponse(client, uri);
+      entity = resp.getEntity(TimelineEntity.class);
+      assertNotNull(entity);
+      assertEquals("application_1111111111_2222", entity.getId());
+      assertEquals(1, entity.getMetrics().size());
+      TimelineMetric m3 = newMetric(TimelineMetric.Type.TIME_SERIES,
+          "MAP_SLOT_MILLIS", ts - 100000, 5L);
+      // The second data point belongs to m3, the expectation for this query;
+      // m1 and m2 describe the first application only.
+      m3.addValue(ts - 80000, 101L);
+      for (TimelineMetric metric : entity.getMetrics()) {
+        assertTrue(verifyMetrics(metric, m3));
+      }
+    } finally {
+      client.destroy();
+    }
+  }
+
+  /**
+   * Queries a single application without passing {@code flowid} or
+   * {@code flowrunid} and checks the returned entity id and metrics.
+   */
+  @Test
+  public void testGetAppWithoutFlowInfo() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/app/cluster1/application_1111111111_1111?" +
+          "userid=user1&fields=ALL");
+      ClientResponse resp = getResponse(client, uri);
+      TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+      assertNotNull(entity);
+      assertEquals("application_1111111111_1111", entity.getId());
+      assertEquals(2, entity.getMetrics().size());
+      TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
+          "HDFS_BYTES_READ", ts - 100000, 31L);
+      m1.addValue(ts - 80000, 57L);
+      TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES,
+          "MAP_SLOT_MILLIS", ts - 100000, 2L);
+      m2.addValue(ts - 80000, 40L);
+      for (TimelineMetric metric : entity.getMetrics()) {
+        assertTrue(verifyMetrics(metric, m1, m2));
+      }
+    } finally {
+      client.destroy();
+    }
+  }
+
+  /**
+   * Queries the applications of one flow run through the
+   * {@code timeline/flowrunapps} endpoint: with the cluster id, without it,
+   * and with {@code limit=1}, checking the number of entities returned.
+   */
+  @Test
+  public void testGetFlowRunApps() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrunapps/cluster1/flow_name/1002345678919?" +
+          "userid=user1&fields=ALL");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      for (TimelineEntity entity : entities) {
+        assertTrue("Unexpected app in result",
+            (entity.getId().equals("application_1111111111_1111") &&
+            entity.getMetrics().size() == 2) ||
+            (entity.getId().equals("application_1111111111_2222") &&
+            entity.getMetrics().size() == 1));
+      }
+
+      // Query without specifying cluster ID.
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrunapps/flow_name/1002345678919?userid=user1");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrunapps/flow_name/1002345678919?userid=user1&limit=1");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  /**
+   * Queries the applications of a flow through the {@code timeline/flowapps}
+   * endpoint: with the cluster id, without it, and with {@code limit=1},
+   * checking the number of entities returned.
+   */
+  @Test
+  public void testGetFlowApps() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowapps/cluster1/flow_name?userid=user1&fields=ALL");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(3, entities.size());
+      for (TimelineEntity entity : entities) {
+        assertTrue("Unexpected app in result",
+            (entity.getId().equals("application_1111111111_1111") &&
+            entity.getMetrics().size() == 2) ||
+            (entity.getId().equals("application_1111111111_2222") &&
+            entity.getMetrics().size() == 1) ||
+            (entity.getId().equals("application_1111111111_2224") &&
+            entity.getMetrics().size() == 0));
+      }
+
+      // Query without specifying cluster ID.
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowapps/flow_name?userid=user1");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(3, entities.size());
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowapps/flow_name?userid=user1&limit=1");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  /**
+   * Queries {@code timeline/flowapps} with an {@code eventfilters}, a
+   * {@code metricfilters} and a {@code conffilters} parameter in turn and
+   * checks that each query returns exactly the one expected application.
+   */
+  @Test
+  public void testGetFlowAppsFilters() throws Exception {
+    Client client = createClient();
+    try {
+      String entityType = TimelineEntityType.YARN_APPLICATION.toString();
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowapps/cluster1/flow_name?userid=user1&eventfilters=" +
+          ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      assertTrue("Unexpected app in result", entities.contains(
+          newEntity(entityType, "application_1111111111_1111")));
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowapps/cluster1/flow_name?userid=user1&metricfilters=" +
+          "HDFS_BYTES_READ");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      assertTrue("Unexpected app in result", entities.contains(
+          newEntity(entityType, "application_1111111111_1111")));
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowapps/cluster1/flow_name?userid=user1&conffilters=" +
+          "cfg1:value1");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      assertTrue("Unexpected app in result", entities.contains(
+          newEntity(entityType, "application_1111111111_2222")));
+    } finally {
+      client.destroy();
+    }
+  }
+
+  /**
+   * Passes a {@code timeline/flowrun} URI with run id 1002345678929 to
+   * {@code verifyHttpResponse} together with {@code Status.NOT_FOUND}.
+   */
+  @Test
+  public void testGetFlowRunNotPresent() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrun/cluster1/flow_name/1002345678929?userid=user1");
+      verifyHttpResponse(client, uri, Status.NOT_FOUND);
+    } finally {
+      client.destroy();
+    }
+  }
+
+  /**
+   * Queries {@code timeline/flows} for {@code cluster2} and expects a JSON
+   * response carrying an empty, non-null set of flow activity entities.
+   */
+  @Test
+  public void testGetFlowsNotPresent() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flows/cluster2");
+      ClientResponse resp = getResponse(client, uri);
+      Set<FlowActivityEntity> entities =
+          resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(0, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  /**
+   * Passes a {@code timeline/app} URI ending in
+   * {@code application_1111111111_1378} to {@code verifyHttpResponse}
+   * together with {@code Status.NOT_FOUND}.
+   */
+  @Test
+  public void testGetAppNotPresent() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/app/cluster1/flow_name/1002345678919/" +
+          "application_1111111111_1378?userid=user1");
+      verifyHttpResponse(client, uri, Status.NOT_FOUND);
+    } finally {
+      client.destroy();
+    }
+  }
+
+  /**
+   * Queries {@code timeline/flowrunapps} for {@code cluster2} and expects a
+   * JSON response carrying an empty, non-null set of entities.
+   */
+  @Test
+  public void testGetFlowRunAppsNotPresent() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrunapps/cluster2/flow_name/1002345678919");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(0, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  /**
+   * Queries {@code timeline/flowapps} for {@code cluster2/flow_name55} and
+   * expects a JSON response carrying an empty, non-null set of entities.
+   */
+  @Test
+  public void testGetFlowAppsNotPresent() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowapps/cluster2/flow_name55");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(0, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  /**
+   * Stops the server if the {@code server} field is set and then clears the
+   * field; runs after each test.
+   */
+  @After
+  public void stop() throws Exception {
+    if (server != null) {
+      server.stop();
+      server = null;
+    }
+  }
+}