This is an automated email from the ASF dual-hosted git repository. vrushali pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 22362c8 YARN-9335 [atsv2] Restrict the number of elements held in timeline collector when backend is unreachable for async calls. Contributed by Abhishek Modi. 22362c8 is described below commit 22362c876d28c081c37dd74f6f1ae8139695e254 Author: Vrushali C <vrush...@apache.org> AuthorDate: Fri Apr 5 12:06:51 2019 -0700 YARN-9335 [atsv2] Restrict the number of elements held in timeline collector when backend is unreachable for async calls. Contributed by Abhishek Modi. --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 9 +++++ .../src/main/resources/yarn-default.xml | 7 ++++ .../collector/TimelineCollector.java | 24 +++++++++++- .../collector/TestTimelineCollector.java | 43 ++++++++++++++++++++-- 4 files changed, 79 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index fa75eb4..34f1e93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2767,6 +2767,15 @@ public class YarnConfiguration extends Configuration { public static final int DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60; + /** The setting that controls the capacity of the queue for async writes + * to timeline collector. 
+ */ + public static final String TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY = + TIMELINE_SERVICE_PREFIX + "writer.async.queue.capacity"; + + public static final int + DEFAULT_TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY = 100; + /** * The name for setting that controls how long the final value of * a metric of a completed app is retained before merging diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index dfbffd4..004af7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2591,6 +2591,13 @@ </property> <property> + <description>The setting that decides the capacity of the queue to hold + asynchronous timeline entities.</description> + <name>yarn.timeline-service.writer.async.queue.capacity</name> + <value>100</value> + </property> + + <property> <description>Time period till which the application collector will be alive in NM, after the application master container finishes.</description> <name>yarn.timeline-service.app-collector.linger-period.ms</name> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 6c83665..0c54ed0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -23,8 +23,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -37,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +65,7 @@ public abstract class TimelineCollector extends CompositeService { = new ConcurrentHashMap<>(); private static Set<String> entityTypesSkipAggregation = new HashSet<>(); + private ThreadPoolExecutor pool; private volatile boolean readyToAggregate = false; @@ -73,6 +78,14 @@ public abstract class TimelineCollector extends CompositeService { @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); + int capacity = conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY + ); + pool = new ThreadPoolExecutor(1, 1, 3, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(capacity)); + pool.setRejectedExecutionHandler( + new ThreadPoolExecutor.DiscardOldestPolicy()); } @Override 
@@ -83,6 +96,7 @@ public abstract class TimelineCollector extends CompositeService { @Override protected void serviceStop() throws Exception { isStopped = true; + pool.shutdownNow(); super.serviceStop(); } @@ -213,7 +227,15 @@ public abstract class TimelineCollector extends CompositeService { LOG.debug("putEntitiesAsync(entities={}, callerUgi={})", entities, callerUgi); - writeTimelineEntities(entities, callerUgi); + pool.execute(new Runnable() { + @Override public void run() { + try { + writeTimelineEntities(entities, callerUgi); + } catch (IOException ie) { + LOG.error("Got an exception while writing entity", ie); + } + } + }); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java index f951540..8051792 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java @@ -27,11 +27,15 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector.AggregationStatusTable; 
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.junit.Test; import com.google.common.collect.Sets; +import org.mockito.internal.stubbing.answers.AnswersWithDelay; +import org.mockito.internal.stubbing.answers.Returns; import java.io.IOException; import java.util.HashSet; @@ -46,6 +50,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestTimelineCollector { @@ -165,17 +170,49 @@ public class TestTimelineCollector { * putEntityAsync() calls. */ @Test - public void testPutEntityAsync() throws IOException { + public void testPutEntityAsync() throws Exception { TimelineWriter writer = mock(TimelineWriter.class); TimelineCollector collector = new TimelineCollectorForTest(writer); - + collector.init(new Configuration()); + collector.start(); TimelineEntities entities = generateTestEntities(1, 1); collector.putEntitiesAsync( entities, UserGroupInformation.createRemoteUser("test-user")); - + Thread.sleep(1000); verify(writer, times(1)).write(any(TimelineCollectorContext.class), any(TimelineEntities.class), any(UserGroupInformation.class)); verify(writer, never()).flush(); + collector.stop(); + } + + /** + * Test TimelineCollector's discarding entities in case of async writes if + * write is taking too much time. 
+ */ + @Test + public void testAsyncEntityDiscard() throws Exception { + TimelineWriter writer = mock(TimelineWriter.class); + + when(writer.write(any(), any(), any())).thenAnswer( + new AnswersWithDelay(500, new Returns(new TimelineWriteResponse()))); + TimelineCollector collector = new TimelineCollectorForTest(writer); + Configuration config = new Configuration(); + config + .setInt(YarnConfiguration.TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY, + 3); + collector.init(config); + collector.start(); + for (int i = 0; i < 10; ++i) { + TimelineEntities entities = generateTestEntities(i + 1, 1); + collector.putEntitiesAsync(entities, + UserGroupInformation.createRemoteUser("test-user")); + } + Thread.sleep(3000); + verify(writer, times(4)) + .write(any(TimelineCollectorContext.class), any(TimelineEntities.class), + any(UserGroupInformation.class)); + verify(writer, never()).flush(); + collector.stop(); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org