Repository: hadoop Updated Branches: refs/heads/HADOOP-13345 86a67ffac -> 5e93093e6
YARN-5911. DrainDispatcher does not drain all events on stop even if setDrainEventsOnStop is true. Contributed by Varun Saxena. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/46675641 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/46675641 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/46675641 Branch: refs/heads/HADOOP-13345 Commit: 466756416214a4bbc77af8a29da1a33e01106864 Parents: 2bf9a15 Author: Naganarasimha <[email protected]> Authored: Wed Nov 23 08:44:58 2016 +0530 Committer: Naganarasimha <[email protected]> Committed: Wed Nov 23 08:49:48 2016 +0530 ---------------------------------------------------------------------- .../hadoop/yarn/event/AsyncDispatcher.java | 6 ++- .../hadoop/yarn/event/DrainDispatcher.java | 9 +---- .../hadoop/yarn/event/TestAsyncDispatcher.java | 42 ++++++++++++++++++++ 3 files changed, 48 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/46675641/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 42a6819..94bfab6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -151,7 +151,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { while (!isDrained() && eventHandlingThread != null && eventHandlingThread.isAlive() && System.currentTimeMillis() < endTime) { - waitForDrained.wait(1000); + waitForDrained.wait(100); LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" + eventHandlingThread.getState()); } @@ -308,4 +308,8 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { protected boolean isDrained() { return drained; } + + protected boolean isStopped() { + return stopped; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/46675641/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index 1369465..c5ba072 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -25,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue; @SuppressWarnings("rawtypes") public class DrainDispatcher extends AsyncDispatcher { private volatile boolean drained = false; - private volatile boolean stopped = false; private final BlockingQueue<Event> queue; private final Object mutex; @@ -69,7 +68,7 @@ public class DrainDispatcher extends AsyncDispatcher { return new Runnable() { @Override public void run() { - while (!stopped && !Thread.currentThread().isInterrupted()) { + while (!isStopped() && !Thread.currentThread().isInterrupted()) { synchronized (mutex) { // !drained if dispatch queued new events on this dispatcher drained = queue.isEmpty(); @@ -109,10 +108,4 @@ public class DrainDispatcher extends AsyncDispatcher { return drained; } } - - @Override - protected void serviceStop() throws Exception { - stopped = true; - super.serviceStop(); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/46675641/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java index 018096b..2b9d745 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.junit.Assert; import org.junit.Test; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; public class TestAsyncDispatcher { @@ -77,5 +78,46 @@ public class TestAsyncDispatcher { disp.waitForEventThreadToWait(); disp.close(); } + + @SuppressWarnings("rawtypes") + private static class DummyHandler implements EventHandler<Event> { + @Override + public void handle(Event event) { + try { + Thread.sleep(500); + } catch (InterruptedException e) {} + } + } + + private enum DummyType { + DUMMY + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void dispatchDummyEvents(Dispatcher disp, int count) { + for (int i = 0; i < count; i++) { + Event event = mock(Event.class); + when(event.getType()).thenReturn(DummyType.DUMMY); + disp.getEventHandler().handle(event); + } + } + + // Test if drain dispatcher drains events on stop. + @SuppressWarnings({ "rawtypes" }) + @Test(timeout=10000) + public void testDrainDispatcherDrainEventsOnStop() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 2000); + BlockingQueue<Event> queue = new LinkedBlockingQueue<Event>(); + DrainDispatcher disp = new DrainDispatcher(queue); + disp.init(conf); + disp.register(DummyType.class, new DummyHandler()); + disp.setDrainEventsOnStop(); + disp.start(); + disp.waitForEventThreadToWait(); + dispatchDummyEvents(disp, 2); + disp.close(); + assertEquals(0, queue.size()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
