YARN-4237 Support additional queries for ATSv2 Web UI. Contributed by Varun Saxena.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f10622e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f10622e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f10622e Branch: refs/heads/YARN-2928-rebase Commit: 7f10622e75a15be63826a6e991ff1dfa8f9b3e2f Parents: f1d0e93 Author: Li Lu <gtcarre...@apache.org> Authored: Thu Oct 15 10:49:36 2015 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Mon Nov 9 16:13:15 2015 -0800 ---------------------------------------------------------------------- .../reader/TimelineReaderWebServices.java | 64 ++++++++++ .../storage/FlowRunEntityReader.java | 45 ++++++- .../storage/GenericEntityReader.java | 3 - .../storage/TimelineEntityReader.java | 3 + .../storage/flow/FlowRunRowKey.java | 15 +++ ...stTimelineReaderWebServicesHBaseStorage.java | 117 ++++++++++++++++++- 6 files changed, 236 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f10622e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index 610f74c..83062f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -450,6 +450,70 @@ public class TimelineReaderWebServices { } /** + * Return a set of flow runs for the given flow id. + * Cluster ID is not provided by client so default cluster ID has to be taken. + */ + @GET + @Path("/flowruns/{flowid}/") + @Produces(MediaType.APPLICATION_JSON) + public Set<TimelineEntity> getFlowRuns( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam("flowid") String flowId, + @QueryParam("userid") String userId, + @QueryParam("limit") String limit, + @QueryParam("createdtimestart") String createdTimeStart, + @QueryParam("createdtimeend") String createdTimeEnd, + @QueryParam("fields") String fields) { + return getFlowRuns(req, res, null, flowId, userId, limit, createdTimeStart, + createdTimeEnd, fields); + } + + /** + * Return a set of flow runs for the given cluster and flow id. + */ + @GET + @Path("/flowruns/{clusterid}/{flowid}/") + @Produces(MediaType.APPLICATION_JSON) + public Set<TimelineEntity> getFlowRuns( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam("clusterid") String clusterId, + @PathParam("flowid") String flowId, + @QueryParam("userid") String userId, + @QueryParam("limit") String limit, + @QueryParam("createdtimestart") String createdTimeStart, + @QueryParam("createdtimeend") String createdTimeEnd, + @QueryParam("fields") String fields) { + String url = req.getRequestURI() + + (req.getQueryString() == null ? 
"" : + QUERY_STRING_SEP + req.getQueryString()); + UserGroupInformation callerUGI = getUser(req); + LOG.info("Received URL " + url + " from user " + getUserName(callerUGI)); + long startTime = Time.monotonicNow(); + init(res); + TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); + Set<TimelineEntity> entities = null; + try { + entities = timelineReaderManager.getEntities( + parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId), + null, null, TimelineEntityType.YARN_FLOW_RUN.toString(), + parseLongStr(limit), parseLongStr(createdTimeStart), + parseLongStr(createdTimeEnd), null, null, null, null, null, null, + null, null, parseFieldsStr(fields, COMMA_DELIMITER)); + } catch (Exception e) { + handleException(e, url, startTime, "createdTime start/end or limit"); + } + long endTime = Time.monotonicNow(); + if (entities == null) { + entities = Collections.emptySet(); + } + LOG.info("Processed URL " + url + + " (Took " + (endTime - startTime) + " ms.)"); + return entities; + } + + /** * Return a list of flows for a given cluster id. Cluster ID is not * provided by client so default cluster ID has to be taken. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f10622e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java index 90ce28f..c4b4e91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; @@ -56,7 +58,7 @@ class FlowRunEntityReader extends TimelineEntityReader { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, false); + eventFilters, fieldsToRetrieve, true); } public 
FlowRunEntityReader(String userId, String clusterId, @@ -79,11 +81,27 @@ class FlowRunEntityReader extends TimelineEntityReader { Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); Preconditions.checkNotNull(userId, "userId shouldn't be null"); Preconditions.checkNotNull(flowId, "flowId shouldn't be null"); - Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null"); + if (singleEntityRead) { + Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null"); + } } @Override protected void augmentParams(Configuration hbaseConf, Connection conn) { + if (!singleEntityRead) { + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.noneOf(Field.class); + } + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + if (createdTimeBegin == null) { + createdTimeBegin = DEFAULT_BEGIN_TIME; + } + if (createdTimeEnd == null) { + createdTimeEnd = DEFAULT_END_TIME; + } + } } @Override @@ -99,8 +117,11 @@ class FlowRunEntityReader extends TimelineEntityReader { @Override protected ResultScanner getResults(Configuration hbaseConf, Connection conn) throws IOException { - throw new UnsupportedOperationException( - "multiple entity query is not supported"); + Scan scan = new Scan(); + scan.setRowPrefixFilter( + FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowId)); + scan.setFilter(new PageFilter(limit)); + return table.getResultScanner(hbaseConf, conn, scan); } @Override @@ -108,13 +129,23 @@ class FlowRunEntityReader extends TimelineEntityReader { FlowRunEntity flowRun = new FlowRunEntity(); flowRun.setUser(userId); flowRun.setName(flowId); - flowRun.setRunId(flowRunId); + if (singleEntityRead) { + flowRun.setRunId(flowRunId); + } else { + FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow()); + flowRun.setRunId(rowKey.getFlowRunId()); + } // read the start time Number startTime = (Number)FlowRunColumn.MIN_START_TIME.readResult(result); if (startTime != null) { 
flowRun.setStartTime(startTime.longValue()); } + if (!singleEntityRead && (flowRun.getStartTime() < createdTimeBegin || + flowRun.getStartTime() > createdTimeEnd)) { + return null; + } + // read the end time if available Number endTime = (Number)FlowRunColumn.MAX_END_TIME.readResult(result); if (endTime != null) { @@ -128,7 +159,9 @@ class FlowRunEntityReader extends TimelineEntityReader { } // read metrics - readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); + if (singleEntityRead || fieldsToRetrieve.contains(Field.METRICS)) { + readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); + } // set the id flowRun.setId(flowRun.getId()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f10622e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java index c18966f..bbca209 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java @@ -60,9 +60,6 @@ class GenericEntityReader extends TimelineEntityReader { private static final EntityTable ENTITY_TABLE = new EntityTable(); private static final Log LOG = LogFactory.getLog(GenericEntityReader.class); - protected static final long 
DEFAULT_BEGIN_TIME = 0L; - protected static final long DEFAULT_END_TIME = Long.MAX_VALUE; - /** * Used to look up the flow context. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f10622e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java index d4a659c..adaf42e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java @@ -44,6 +44,9 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix */ abstract class TimelineEntityReader { private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class); + protected static final long DEFAULT_BEGIN_TIME = 0L; + protected static final long DEFAULT_END_TIME = Long.MAX_VALUE; + protected final boolean singleEntityRead; protected String userId; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f10622e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java index 7ed3651..a14d2bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java @@ -55,6 +55,21 @@ public class FlowRunRowKey { } /** + * Constructs a row key prefix for the flow run table as follows: { + * clusterId!userId!flowId!} + * + * @param clusterId + * @param userId + * @param flowId + * @return byte array with the row key prefix + */ + public static byte[] getRowKeyPrefix(String clusterId, String userId, + String flowId) { + return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId, + flowId, "")); + } + + /** * Constructs a row key for the entity table as follows: { * clusterId!userI!flowId!Inverted Flow Run Id} * http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f10622e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index a89d2fc..f6a5090 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -172,11 +172,11 @@ public class TestTimelineReaderWebServicesHBaseStorage { id = "application_11111111111111_2223"; entity3.setId(id); entity3.setType(type); - cTime = 1425016501030L; + cTime = 1425016501037L; entity3.setCreatedTime(cTime); TimelineEvent event2 = new TimelineEvent(); event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - event2.setTimestamp(1436512802030L); + event2.setTimestamp(1436512802037L); event2.addInfo("foo_event", "test"); entity3.addEvent(event2); te3.addEntity(entity3); @@ -364,6 +364,119 @@ public class TestTimelineReaderWebServicesHBaseStorage { } } + + @Test + public void testGetFlowRuns() throws Exception { + Client client = createClient(); + try { + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flowruns/cluster1/flow_name?userid=user1"); + ClientResponse resp = getResponse(client, uri); + Set<FlowRunEntity> entities = + resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); + assertNotNull(entities); + assertEquals(2, entities.size()); + for (FlowRunEntity entity : entities) { + assertTrue("Id, run id or start time does not match.", + ((entity.getId().equals("user1@flow_name/1002345678919")) && + (entity.getRunId() == 1002345678919L) && + (entity.getStartTime() == 1425016501000L)) || + 
((entity.getId().equals("user1@flow_name/1002345678920")) && + (entity.getRunId() == 1002345678920L) && + (entity.getStartTime() == 1425016501034L))); + assertEquals(0, entity.getMetrics().size()); + } + + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flowruns/cluster1/flow_name?userid=user1&limit=1"); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); + assertNotNull(entities); + assertEquals(1, entities.size()); + for (FlowRunEntity entity : entities) { + assertTrue("Id, run id or start time does not match.", + entity.getId().equals("user1@flow_name/1002345678920") && + entity.getRunId() == 1002345678920L && + entity.getStartTime() == 1425016501034L); + assertEquals(0, entity.getMetrics().size()); + } + + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flowruns/cluster1/flow_name?userid=user1&" + + "createdtimestart=1425016501030"); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); + assertNotNull(entities); + assertEquals(1, entities.size()); + for (FlowRunEntity entity : entities) { + assertTrue("Id, run id or start time does not match.", + entity.getId().equals("user1@flow_name/1002345678920") && + entity.getRunId() == 1002345678920L && + entity.getStartTime() == 1425016501034L); + assertEquals(0, entity.getMetrics().size()); + } + + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flowruns/cluster1/flow_name?userid=user1&" + + "createdtimestart=1425016500999&createdtimeend=1425016501035"); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); + assertNotNull(entities); + assertEquals(2, entities.size()); + for (FlowRunEntity 
entity : entities) { + assertTrue("Id, run id or start time does not match.", + ((entity.getId().equals("user1@flow_name/1002345678919")) && + (entity.getRunId() == 1002345678919L) && + (entity.getStartTime() == 1425016501000L)) || + ((entity.getId().equals("user1@flow_name/1002345678920")) && + (entity.getRunId() == 1002345678920L) && + (entity.getStartTime() == 1425016501034L))); + assertEquals(0, entity.getMetrics().size()); + } + + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flowruns/cluster1/flow_name?userid=user1&" + + "createdtimeend=1425016501030"); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); + assertNotNull(entities); + assertEquals(1, entities.size()); + for (FlowRunEntity entity : entities) { + assertTrue("Id, run id or start time does not match.", + entity.getId().equals("user1@flow_name/1002345678919") && + entity.getRunId() == 1002345678919L && + entity.getStartTime() == 1425016501000L); + assertEquals(0, entity.getMetrics().size()); + } + + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/flowruns/cluster1/flow_name?userid=user1&fields=metrics"); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); + assertNotNull(entities); + assertEquals(2, entities.size()); + for (FlowRunEntity entity : entities) { + assertTrue("Id, run id or start time does not match.", + ((entity.getId().equals("user1@flow_name/1002345678919")) && + (entity.getRunId() == 1002345678919L) && + (entity.getStartTime() == 1425016501000L) && + (entity.getMetrics().size() == 2)) || + ((entity.getId().equals("user1@flow_name/1002345678920")) && + (entity.getRunId() == 1002345678920L) && + (entity.getStartTime() == 1425016501034L) && + (entity.getMetrics().size() == 0))); + } + } finally { + 
client.destroy(); + } + } + @Test public void testGetFlows() throws Exception { Client client = createClient();