Repository: hadoop Updated Branches: refs/heads/YARN-5355_branch2 d57d8f10e -> 9aee24df3 (forced update)
YARN-6357. Implement putEntitiesAsync API in TimelineCollector (Haibo Chen via Varun Saxena) (cherry picked from commit 063b513b1c10987461caab3d26c8543c6e657bf7) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/021c3b89 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/021c3b89 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/021c3b89 Branch: refs/heads/YARN-5355_branch2 Commit: 021c3b89ccdf4d56e11b4c8c6b84f6e4fc76b886 Parents: a022500 Author: Varun Saxena <varunsax...@apache.org> Authored: Wed Mar 29 03:48:03 2017 +0530 Committer: Varun Saxena <varunsax...@apache.org> Committed: Thu Aug 31 01:10:03 2017 +0530 ---------------------------------------------------------------------- .../collector/TimelineCollector.java | 31 ++++++++-- .../collector/TimelineCollectorWebService.java | 12 ++-- .../collector/TestTimelineCollector.java | 63 ++++++++++++++++++++ 3 files changed, 96 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/021c3b89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 2fc3033..353066b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -133,19 +133,35 @@ public abstract class TimelineCollector extends CompositeService { public TimelineWriteResponse putEntities(TimelineEntities entities, UserGroupInformation callerUgi) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE"); LOG.debug("putEntities(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } - TimelineCollectorContext context = getTimelineEntityContext(); + TimelineWriteResponse response = writeTimelineEntities(entities); + flushBufferedTimelineEntities(); + + return response; + } + + private TimelineWriteResponse writeTimelineEntities( + TimelineEntities entities) throws IOException { // Update application metrics for aggregation updateAggregateStatus(entities, aggregationGroups, getEntityTypesSkipAggregation()); + final TimelineCollectorContext context = getTimelineEntityContext(); return writer.write(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowVersion(), context.getFlowRunId(), - context.getAppId(), entities); + context.getFlowName(), context.getFlowVersion(), + context.getFlowRunId(), context.getAppId(), entities); + } + + /** + * Flush buffered timeline entities, if any. + * @throws IOException if there is any exception encountered while + * flushing buffered entities. + */ + private void flushBufferedTimelineEntities() throws IOException { + writer.flush(); } /** @@ -158,14 +174,17 @@ public abstract class TimelineCollector extends CompositeService { * * @param entities entities to post * @param callerUgi the caller UGI + * @throws IOException if there is any exception encountered while putting + * entities. 
*/ public void putEntitiesAsync(TimelineEntities entities, - UserGroupInformation callerUgi) { - // TODO implement + UserGroupInformation callerUgi) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } + + writeTimelineEntities(entities); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/021c3b89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java index 29ef1f8..806a85b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -150,9 +150,6 @@ public class TimelineCollectorWebService { throw new ForbiddenException(msg); } - // TODO how to express async posts and handle them - boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); - try { ApplicationId appID = parseApplicationId(appId); if (appID == null) { @@ -167,7 +164,14 @@ public class TimelineCollectorWebService { throw new NotFoundException(); // different exception? 
} - collector.putEntities(processTimelineEntities(entities), callerUgi); + boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); + if (isAsync) { + collector.putEntitiesAsync( + processTimelineEntities(entities), callerUgi); + } else { + collector.putEntities(processTimelineEntities(entities), callerUgi); + } + return Response.ok().build(); } catch (Exception e) { LOG.error("Error putting entities", e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/021c3b89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java index 5b4dc50..a55f227 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java @@ -18,17 +18,27 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.junit.Test; +import java.io.IOException; import java.util.HashSet; import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class TestTimelineCollector { @@ -124,4 +134,57 @@ public class TestTimelineCollector { } } + + /** + * Test TimelineCollector's interaction with TimelineWriter upon + * putEntities() calls. + */ + @Test + public void testPutEntity() throws IOException { + TimelineWriter writer = mock(TimelineWriter.class); + TimelineCollector collector = new TimelineCollectorForTest(writer); + + TimelineEntities entities = generateTestEntities(1, 1); + collector.putEntities( + entities, UserGroupInformation.createRemoteUser("test-user")); + + verify(writer, times(1)).write( + anyString(), anyString(), anyString(), anyString(), anyLong(), + anyString(), any(TimelineEntities.class)); + verify(writer, times(1)).flush(); + } + + /** + * Test TimelineCollector's interaction with TimelineWriter upon + * putEntitiesAsync() calls. 
+ */ + @Test + public void testPutEntityAsync() throws IOException { + TimelineWriter writer = mock(TimelineWriter.class); + TimelineCollector collector = new TimelineCollectorForTest(writer); + + TimelineEntities entities = generateTestEntities(1, 1); + collector.putEntitiesAsync( + entities, UserGroupInformation.createRemoteUser("test-user")); + + verify(writer, times(1)).write( + anyString(), anyString(), anyString(), anyString(), anyLong(), + anyString(), any(TimelineEntities.class)); + verify(writer, never()).flush(); + } + + private static class TimelineCollectorForTest extends TimelineCollector { + private final TimelineCollectorContext context = + new TimelineCollectorContext(); + + TimelineCollectorForTest(TimelineWriter writer) { + super("TimelineCollectorForTest"); + setWriter(writer); + } + + @Override + public TimelineCollectorContext getTimelineEntityContext() { + return context; + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org