YARN-4221. Store user in app to flow table (Varun Saxena via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/28fc7b14 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/28fc7b14 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/28fc7b14 Branch: refs/heads/YARN-2928-rebase Commit: 28fc7b14040f7d7584495e3f9b157d1e647876f7 Parents: aaf7454 Author: Sangjin Lee <sj...@apache.org> Authored: Fri Oct 23 22:07:00 2015 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Mon Nov 9 16:13:16 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 2 + .../reader/TimelineReaderWebServices.java | 103 +++++++++---------- .../storage/ApplicationEntityReader.java | 5 +- .../storage/GenericEntityReader.java | 11 +- .../storage/HBaseTimelineWriterImpl.java | 8 +- .../storage/apptoflow/AppToFlowColumn.java | 8 +- .../storage/apptoflow/AppToFlowTable.java | 3 + ...stTimelineReaderWebServicesHBaseStorage.java | 98 +++++++++++++----- 8 files changed, 147 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/28fc7b14/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7c40f8c..252d7b3 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -138,6 +138,8 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-4129. Refactor the SystemMetricPublisher in RM to better support newer events (Naganarasimha G R via sjlee) + YARN-4221. Store user in app to flow table (Varun Saxena via sjlee) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/28fc7b14/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index d82a402..d3ff8b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -265,11 +265,6 @@ public class TimelineReaderWebServices { return str == null ? null : str.trim(); } - private static String parseUser(UserGroupInformation callerUGI, String user) { - return (callerUGI != null && (user == null || user.isEmpty()) ? - callerUGI.getUserName().trim() : parseStr(user)); - } - private static UserGroupInformation getUser(HttpServletRequest req) { String remoteUser = req.getRemoteUser(); UserGroupInformation callerUGI = null; @@ -389,7 +384,7 @@ public class TimelineReaderWebServices { Set<TimelineEntity> entities = null; try { entities = timelineReaderManager.getEntities( - parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId), + parseStr(userId), parseStr(clusterId), parseStr(flowId), parseLongStr(flowRunId), parseStr(appId), parseStr(entityType), parseLongStr(limit), parseLongStr(createdTimeStart), parseLongStr(createdTimeEnd), parseLongStr(modifiedTimeStart), @@ -463,7 +458,7 @@ public class TimelineReaderWebServices { TimelineEntity entity = null; try { entity = timelineReaderManager.getEntity( - parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId), + parseStr(userId), parseStr(clusterId), parseStr(flowId), parseLongStr(flowRunId), parseStr(appId), parseStr(entityType), parseStr(entityId), parseFieldsStr(fields, COMMA_DELIMITER)); } catch (Exception e) { @@ -482,35 +477,35 @@ public class TimelineReaderWebServices { } /** - * Return a single flow run for the given cluster, flow id and run id. + * Return a single flow run for the given user, flow id and run id. * Cluster ID is not provided by client so default cluster ID has to be taken. */ @GET - @Path("/flowrun/{flowid}/{flowrunid}/") + @Path("/flowrun/{userid}/{flowid}/{flowrunid}/") @Produces(MediaType.APPLICATION_JSON) public TimelineEntity getFlowRun( @Context HttpServletRequest req, @Context HttpServletResponse res, + @PathParam("userid") String userId, @PathParam("flowid") String flowId, @PathParam("flowrunid") String flowRunId, - @QueryParam("userid") String userId, @QueryParam("fields") String fields) { - return getFlowRun(req, res, null, flowId, flowRunId, userId, fields); + return getFlowRun(req, res, userId, null, flowId, flowRunId, fields); } /** - * Return a single flow run for the given cluster, flow id and run id. + * Return a single flow run for the given user, cluster, flow id and run id. */ @GET - @Path("/flowrun/{clusterid}/{flowid}/{flowrunid}/") + @Path("/flowrun/{userid}/{clusterid}/{flowid}/{flowrunid}/") @Produces(MediaType.APPLICATION_JSON) public TimelineEntity getFlowRun( @Context HttpServletRequest req, @Context HttpServletResponse res, + @PathParam("userid") String userId, @PathParam("clusterid") String clusterId, @PathParam("flowid") String flowId, @PathParam("flowrunid") String flowRunId, - @QueryParam("userid") String userId, @QueryParam("fields") String fields) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : @@ -522,9 +517,8 @@ public class TimelineReaderWebServices { TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); TimelineEntity entity = null; try { - entity = timelineReaderManager.getEntity( - parseUser(callerUGI, userId), parseStr(clusterId), - parseStr(flowId), parseLongStr(flowRunId), null, + entity = timelineReaderManager.getEntity(parseStr(userId), + parseStr(clusterId), parseStr(flowId), parseLongStr(flowRunId), null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, parseFieldsStr(fields, COMMA_DELIMITER)); } catch (Exception e) { @@ -543,37 +537,37 @@ public class TimelineReaderWebServices { } /** - * Return a set of flows runs for the given flow id. + * Return a set of flows runs for the given user and flow id. * Cluster ID is not provided by client so default cluster ID has to be taken. */ @GET - @Path("/flowruns/{flowid}/") + @Path("/flowruns/{userid}/{flowid}/") @Produces(MediaType.APPLICATION_JSON) public Set<TimelineEntity> getFlowRuns( @Context HttpServletRequest req, @Context HttpServletResponse res, + @PathParam("userid") String userId, @PathParam("flowid") String flowId, - @QueryParam("userid") String userId, @QueryParam("limit") String limit, @QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimeend") String createdTimeEnd, @QueryParam("fields") String fields) { - return getFlowRuns(req, res, null, flowId, userId, limit, createdTimeStart, + return getFlowRuns(req, res, userId, null, flowId, limit, createdTimeStart, createdTimeEnd, fields); } /** - * Return a set of flow runs for the given cluster and flow id. + * Return a set of flow runs for the given user, cluster and flow id. */ @GET - @Path("/flowruns/{clusterid}/{flowid}/") + @Path("/flowruns/{userid}/{clusterid}/{flowid}/") @Produces(MediaType.APPLICATION_JSON) public Set<TimelineEntity> getFlowRuns( @Context HttpServletRequest req, @Context HttpServletResponse res, + @PathParam("userid") String userId, @PathParam("clusterid") String clusterId, @PathParam("flowid") String flowId, - @QueryParam("userid") String userId, @QueryParam("limit") String limit, @QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimeend") String createdTimeEnd, @@ -589,11 +583,11 @@ public class TimelineReaderWebServices { Set<TimelineEntity> entities = null; try { entities = timelineReaderManager.getEntities( - parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId), - null, null, TimelineEntityType.YARN_FLOW_RUN.toString(), - parseLongStr(limit), parseLongStr(createdTimeStart), - parseLongStr(createdTimeEnd), null, null, null, null, null, null, - null, null, parseFieldsStr(fields, COMMA_DELIMITER)); + parseStr(userId), parseStr(clusterId), parseStr(flowId), null, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), parseLongStr(limit), + parseLongStr(createdTimeStart), parseLongStr(createdTimeEnd), null, + null, null, null, null, null, null, null, + parseFieldsStr(fields, COMMA_DELIMITER)); } catch (Exception e) { handleException(e, url, startTime, "createdTime start/end or limit"); } @@ -730,10 +724,9 @@ public class TimelineReaderWebServices { TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); TimelineEntity entity = null; try { - entity = timelineReaderManager.getEntity( - parseUser(callerUGI, userId), parseStr(clusterId), - parseStr(flowId), parseLongStr(flowRunId), parseStr(appId), - TimelineEntityType.YARN_APPLICATION.toString(), null, + entity = timelineReaderManager.getEntity(parseStr(userId), + parseStr(clusterId), parseStr(flowId), parseLongStr(flowRunId), + parseStr(appId), TimelineEntityType.YARN_APPLICATION.toString(), null, parseFieldsStr(fields, COMMA_DELIMITER)); } catch (Exception e) { handleException(e, url, startTime, "flowrunid"); @@ -750,20 +743,20 @@ public class TimelineReaderWebServices { } /** - * Return a list of apps for given flow id and flow run id. Cluster ID is not - * provided by client so default cluster ID has to be taken. If number of - * matching apps are more than the limit, most recent apps till the limit is - * reached, will be returned. + * Return a list of apps for given user, flow id and flow run id. Cluster ID + * is not provided by client so default cluster ID has to be taken. If number + * of matching apps are more than the limit, most recent apps till the limit + * is reached, will be returned. */ @GET - @Path("/flowrunapps/{flowid}/{flowrunid}/") + @Path("/flowrunapps/{userid}/{flowid}/{flowrunid}/") @Produces(MediaType.APPLICATION_JSON) public Set<TimelineEntity> getFlowRunApps( @Context HttpServletRequest req, @Context HttpServletResponse res, + @PathParam("userid") String userId, @PathParam("flowid") String flowId, @PathParam("flowrunid") String flowRunId, - @QueryParam("userid") String userId, @QueryParam("limit") String limit, @QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimeend") String createdTimeEnd, @@ -784,20 +777,20 @@ public class TimelineReaderWebServices { } /** - * Return a list of apps for a given cluster id, flow id and flow run id. If - * number of matching apps are more than the limit, most recent apps till the - * limit is reached, will be returned. + * Return a list of apps for a given user, cluster id, flow id and flow run + * id. If number of matching apps are more than the limit, most recent apps + * till the limit is reached, will be returned. */ @GET - @Path("/flowrunapps/{clusterid}/{flowid}/{flowrunid}/") + @Path("/flowrunapps/{userid}/{clusterid}/{flowid}/{flowrunid}/") @Produces(MediaType.APPLICATION_JSON) public Set<TimelineEntity> getFlowRunApps( @Context HttpServletRequest req, @Context HttpServletResponse res, + @PathParam("userid") String userId, @PathParam("clusterid") String clusterId, @PathParam("flowid") String flowId, @PathParam("flowrunid") String flowRunId, - @QueryParam("userid") String userId, @QueryParam("limit") String limit, @QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimeend") String createdTimeEnd, @@ -818,19 +811,19 @@ public class TimelineReaderWebServices { } /** - * Return a list of apps for given flow id. Cluster ID is not provided by - * client so default cluster ID has to be taken. If number of matching apps - * are more than the limit, most recent apps till the limit is reached, will - * be returned. + * Return a list of apps for given user and flow id. Cluster ID is not + * provided by client so default cluster ID has to be taken. If number of + * matching apps are more than the limit, most recent apps till the limit is + * reached, will be returned. */ @GET - @Path("/flowapps/{flowid}/") + @Path("/flowapps/{userid}/{flowid}/") @Produces(MediaType.APPLICATION_JSON) public Set<TimelineEntity> getFlowApps( @Context HttpServletRequest req, @Context HttpServletResponse res, + @PathParam("userid") String userId, @PathParam("flowid") String flowId, - @QueryParam("userid") String userId, @QueryParam("limit") String limit, @QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimeend") String createdTimeEnd, @@ -851,19 +844,19 @@ public class TimelineReaderWebServices { } /** - * Return a list of apps for a given cluster id and flow id. If number of - * matching apps are more than the limit, most recent apps till the limit is - * reached, will be returned. + * Return a list of apps for a given user, cluster id and flow id. If number + * of matching apps are more than the limit, most recent apps till the limit + * is reached, will be returned. */ @GET - @Path("/flowapps/{clusterid}/{flowid}/") + @Path("/flowapps/{userid}/{clusterid}/{flowid}/") @Produces(MediaType.APPLICATION_JSON) public Set<TimelineEntity> getFlowApps( @Context HttpServletRequest req, @Context HttpServletResponse res, + @PathParam("userid") String userId, @PathParam("clusterid") String clusterId, @PathParam("flowid") String flowId, - @QueryParam("userid") String userId, @QueryParam("limit") String limit, @QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimeend") String createdTimeEnd, http://git-wip-us.apache.org/repos/asf/hadoop/blob/28fc7b14/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java index 6d1a2ff..8324afd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java @@ -90,12 +90,12 @@ class ApplicationEntityReader extends GenericEntityReader { @Override protected void validateParams() { - Preconditions.checkNotNull(userId, "userId shouldn't be null"); Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); if (singleEntityRead) { Preconditions.checkNotNull(appId, "appId shouldn't be null"); } else { + Preconditions.checkNotNull(userId, "userId shouldn't be null"); Preconditions.checkNotNull(flowId, "flowId shouldn't be null"); } } @@ -104,11 +104,12 @@ class ApplicationEntityReader extends GenericEntityReader { protected void augmentParams(Configuration hbaseConf, Connection conn) throws IOException { if (singleEntityRead) { - if (flowId == null || flowRunId == null) { + if (flowId == null || flowRunId == null || userId == null) { FlowContext context = lookupFlowContext(clusterId, appId, hbaseConf, conn); flowId = context.flowId; flowRunId = context.flowRunId; + userId = context.userId; } } if (fieldsToRetrieve == null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/28fc7b14/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java index bbca209..04fc8ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java @@ -100,6 +100,7 @@ class GenericEntityReader extends TimelineEntityReader { Result result = appToFlowTable.getResult(hbaseConf, conn, get); if (result != null && !result.isEmpty()) { return new FlowContext( + AppToFlowColumn.USER_ID.readResult(result).toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(), ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue()); } else { @@ -110,9 +111,11 @@ class GenericEntityReader extends TimelineEntityReader { } protected static class FlowContext { + protected final String userId; protected final String flowId; protected final Long flowRunId; - public FlowContext(String flowId, Long flowRunId) { + public FlowContext(String user, String flowId, Long flowRunId) { + this.userId = user; this.flowId = flowId; this.flowRunId = flowRunId; } @@ -120,7 +123,6 @@ class GenericEntityReader extends TimelineEntityReader { @Override protected void validateParams() { - Preconditions.checkNotNull(userId, "userId shouldn't be null"); Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); Preconditions.checkNotNull(appId, "appId shouldn't be null"); Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); @@ -132,12 +134,13 @@ class GenericEntityReader extends TimelineEntityReader { @Override protected void augmentParams(Configuration hbaseConf, Connection conn) throws IOException { - // In reality both should be null or neither should be null - if (flowId == null || flowRunId == null) { + // In reality all three should be null or neither should be null + if (flowId == null || flowRunId == null || userId == null) { FlowContext context = lookupFlowContext(clusterId, appId, hbaseConf, conn); flowId = context.flowId; flowRunId = context.flowRunId; + userId = context.userId; } if (fieldsToRetrieve == null) { fieldsToRetrieve = EnumSet.noneOf(Field.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/28fc7b14/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 3649865..a57be55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -162,8 +162,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements String flowName, String flowVersion, long flowRunId, String appId, TimelineEntity te) throws IOException { // store in App to flow table - storeInAppToFlowTable(clusterId, userId, flowName, flowVersion, flowRunId, - appId, te); + storeInAppToFlowTable(clusterId, userId, flowName, flowRunId, appId, te); // store in flow run table storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion, flowRunId, appId, te); @@ -200,11 +199,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements } private void storeInAppToFlowTable(String clusterId, String userId, - String flowName, String flowVersion, long flowRunId, String appId, - TimelineEntity te) throws IOException { + String flowName, long flowRunId, String appId, TimelineEntity te) + throws IOException { byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName); AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId); + AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId); } /* http://git-wip-us.apache.org/repos/asf/hadoop/blob/28fc7b14/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java index 859fdca..7f1ecaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java @@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBuffere import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; import java.io.IOException; -import java.util.Map; /** * Identifies fully qualified columns for the {@link AppToFlowTable}. @@ -43,7 +42,12 @@ public enum AppToFlowColumn implements Column<AppToFlowTable> { /** * The flow run ID */ - FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id"); + FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id"), + + /** + * The user + */ + USER_ID(AppToFlowColumnFamily.MAPPING, "user_id"); private final ColumnHelper<AppToFlowTable> column; private final ColumnFamily<AppToFlowTable> columnFamily; http://git-wip-us.apache.org/repos/asf/hadoop/blob/28fc7b14/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java index 2467856..b30f253 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java @@ -49,6 +49,9 @@ import java.io.IOException; * | | flowRunId: | * | | 1452828720457 | * | | | + * | | user_id: | + * | | admin | + * | | | * | | | * | | | * |--------------------------------------| http://git-wip-us.apache.org/repos/asf/hadoop/blob/28fc7b14/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index 4f53fe2..3b285aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -199,6 +199,18 @@ public class TestTimelineReaderWebServicesHBaseStorage { entity4.addEvent(event4); te4.addEntity(entity4); + TimelineEntities te5 = new TimelineEntities(); + TimelineEntity entity5 = new TimelineEntity(); + entity5.setId("entity1"); + entity5.setType("type1"); + entity5.setCreatedTime(1425016501034L); + te5.addEntity(entity5); + TimelineEntity entity6 = new TimelineEntity(); + entity6.setId("entity2"); + entity6.setType("type1"); + entity6.setCreatedTime(1425016501034L); + te5.addEntity(entity6); + HBaseTimelineWriterImpl hbi = null; Configuration c1 = util.getConfiguration(); try { @@ -209,6 +221,8 @@ public class TestTimelineReaderWebServicesHBaseStorage { hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4); hbi.write(cluster, user, flow2, flowVersion2, runid2, entity3.getId(), te3); + hbi.write(cluster, user, flow, flowVersion, runid, + "application_1111111111_1111", te5); hbi.flush(); } finally { hbi.close(); @@ -333,7 +347,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { Client client = createClient(); try { URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowrun/cluster1/flow_name/1002345678919?userid=user1"); + "timeline/flowrun/user1/cluster1/flow_name/1002345678919"); ClientResponse resp = getResponse(client, uri); FlowRunEntity entity = resp.getEntity(FlowRunEntity.class); assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); @@ -350,7 +364,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // Query without specifying cluster ID. uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowrun/flow_name/1002345678919?userid=user1"); + "timeline/flowrun/user1/flow_name/1002345678919"); resp = getResponse(client, uri); entity = resp.getEntity(FlowRunEntity.class); assertNotNull(entity); @@ -374,7 +388,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { Client client = createClient(); try { URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowruns/cluster1/flow_name?userid=user1"); + "timeline/flowruns/user1/cluster1/flow_name"); ClientResponse resp = getResponse(client, uri); Set<FlowRunEntity> entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); @@ -393,7 +407,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowruns/cluster1/flow_name?userid=user1&limit=1"); + "timeline/flowruns/user1/cluster1/flow_name?limit=1"); resp = getResponse(client, uri); entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); @@ -408,7 +422,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowruns/cluster1/flow_name?userid=user1&" + + "timeline/flowruns/user1/cluster1/flow_name?" + "createdtimestart=1425016501030"); resp = getResponse(client, uri); entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); @@ -424,7 +438,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowruns/cluster1/flow_name?userid=user1&" + + "timeline/flowruns/user1/cluster1/flow_name?" + "createdtimestart=1425016500999&createdtimeend=1425016501035"); resp = getResponse(client, uri); entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); @@ -443,7 +457,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowruns/cluster1/flow_name?userid=user1&" + + "timeline/flowruns/user1/cluster1/flow_name?" + "createdtimeend=1425016501030"); resp = getResponse(client, uri); entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); @@ -459,7 +473,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowruns/cluster1/flow_name?userid=user1&fields=metrics"); + "timeline/flowruns/user1/cluster1/flow_name?fields=metrics"); resp = getResponse(client, uri); entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); @@ -620,7 +634,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { try { URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + "timeline/app/cluster1/application_1111111111_1111?" + - "userid=user1&fields=ALL"); + "fields=ALL"); ClientResponse resp = getResponse(client, uri); TimelineEntity entity = resp.getEntity(TimelineEntity.class); assertNotNull(entity); @@ -641,12 +655,48 @@ public class TestTimelineReaderWebServicesHBaseStorage { } @Test + public void testGetEntityWithoutFlowInfo() throws Exception { + Client client = createClient(); + try { + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/entity/cluster1/application_1111111111_1111/type1/entity1"); + ClientResponse resp = getResponse(client, uri); + TimelineEntity entity = resp.getEntity(TimelineEntity.class); + assertNotNull(entity); + assertEquals("entity1", entity.getId()); + assertEquals("type1", entity.getType()); + } finally { + client.destroy(); + } + } + + @Test + public void testGetEntitiesWithoutFlowInfo() throws Exception { + Client client = createClient(); + try { + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + "timeline/entities/cluster1/application_1111111111_1111/type1"); + ClientResponse resp = getResponse(client, uri); + Set<TimelineEntity> entities = + resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + assertNotNull(entities); + assertEquals(2, entities.size()); + for (TimelineEntity entity : entities) { + assertTrue(entity.getId().equals("entity1") || + entity.getId().equals("entity2")); + } + } finally { + client.destroy(); + } + } + + @Test public void testGetFlowRunApps() throws Exception { Client client = createClient(); try { URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowrunapps/cluster1/flow_name/1002345678919?" + - "userid=user1&fields=ALL"); + "timeline/flowrunapps/user1/cluster1/flow_name/1002345678919?" + + "fields=ALL"); ClientResponse resp = getResponse(client, uri); Set<TimelineEntity> entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); @@ -662,14 +712,14 @@ public class TestTimelineReaderWebServicesHBaseStorage { // Query without specifying cluster ID. uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowrunapps/flow_name/1002345678919?userid=user1"); + "timeline/flowrunapps/user1/flow_name/1002345678919"); resp = getResponse(client, uri); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowrunapps/flow_name/1002345678919?userid=user1&limit=1"); + "timeline/flowrunapps/user1/flow_name/1002345678919?limit=1"); resp = getResponse(client, uri); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); @@ -684,7 +734,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { Client client = createClient(); try { URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowapps/cluster1/flow_name?userid=user1&fields=ALL"); + "timeline/flowapps/user1/cluster1/flow_name?fields=ALL"); ClientResponse resp = getResponse(client, uri); Set<TimelineEntity> entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); @@ -702,14 +752,14 @@ public class TestTimelineReaderWebServicesHBaseStorage { // Query without specifying cluster ID. uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowapps/flow_name?userid=user1"); + "timeline/flowapps/user1/flow_name"); resp = getResponse(client, uri); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(3, entities.size()); uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowapps/flow_name?userid=user1&limit=1"); + "timeline/flowapps/user1/flow_name?limit=1"); resp = getResponse(client, uri); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); @@ -725,7 +775,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { try { String entityType = TimelineEntityType.YARN_APPLICATION.toString(); URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowapps/cluster1/flow_name?userid=user1&eventfilters=" + + "timeline/flowapps/user1/cluster1/flow_name?eventfilters=" + ApplicationMetricsConstants.FINISHED_EVENT_TYPE); ClientResponse resp = getResponse(client, uri); Set<TimelineEntity> entities = @@ -736,7 +786,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { newEntity(entityType, "application_1111111111_1111"))); uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowapps/cluster1/flow_name?userid=user1&metricfilters=" + + "timeline/flowapps/user1/cluster1/flow_name?metricfilters=" + "HDFS_BYTES_READ"); resp = getResponse(client, uri); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); @@ -746,7 +796,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { newEntity(entityType, "application_1111111111_1111"))); uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowapps/cluster1/flow_name?userid=user1&conffilters=" + + "timeline/flowapps/user1/cluster1/flow_name?conffilters=" + "cfg1:value1"); resp = getResponse(client, uri); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); @@ -764,7 +814,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { Client client = createClient(); try { URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowrun/cluster1/flow_name/1002345678929?userid=user1"); + "timeline/flowrun/user1/cluster1/flow_name/1002345678929"); verifyHttpResponse(client, uri, Status.NOT_FOUND); } finally { client.destroy(); @@ -793,8 +843,8 @@ public class TestTimelineReaderWebServicesHBaseStorage { Client client = createClient(); try { URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/app/cluster1/flow_name/1002345678919/" + - "application_1111111111_1378?userid=user1"); + "timeline/app/user1/cluster1/flow_name/1002345678919/" + + "application_1111111111_1378"); verifyHttpResponse(client, uri, Status.NOT_FOUND); } finally { client.destroy(); @@ -806,7 +856,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { Client client = createClient(); try { URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowrunapps/cluster2/flow_name/1002345678919"); + "timeline/flowrunapps/user1/cluster2/flow_name/1002345678919"); ClientResponse resp = getResponse(client, uri); Set<TimelineEntity> entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); @@ -823,7 +873,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { Client client = createClient(); try { URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + - "timeline/flowapps/cluster2/flow_name55"); + "timeline/flowapps/user1/cluster2/flow_name55"); ClientResponse resp = getResponse(client, uri); Set<TimelineEntity> entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});