YARN-3878. AsyncDispatcher can hang while stopping if it is configured for draining events on stop. (Varun Saxena via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aa067c6a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aa067c6a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aa067c6a Branch: refs/heads/HDFS-7240 Commit: aa067c6aa47b4c79577096817acc00ad6421180c Parents: 527c40e Author: Karthik Kambatla <ka...@apache.org> Authored: Thu Jul 9 09:48:29 2015 -0700 Committer: Karthik Kambatla <ka...@apache.org> Committed: Thu Jul 9 09:48:29 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/event/AsyncDispatcher.java | 24 ++++---- .../hadoop/yarn/event/DrainDispatcher.java | 13 ++++- .../hadoop/yarn/event/TestAsyncDispatcher.java | 61 ++++++++++++++++++++ 4 files changed, 87 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa067c6a/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 19f0854..3c232eb 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -624,6 +624,9 @@ Release 2.7.2 - UNRELEASED YARN-3690. [JDK8] 'mvn site' fails. (Brahma Reddy Battula via aajisaka) + YARN-3878. AsyncDispatcher can hang while stopping if it is configured for + draining events on stop. (Varun Saxena via kasha) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa067c6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index c54b9c7..646611f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -55,9 +55,6 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { // stop functionality. private volatile boolean drainEventsOnStop = false; - // Indicates all the remaining dispatcher's events on stop have been drained - // and processed. - private volatile boolean drained = true; private Object waitForDrained = new Object(); // For drainEventsOnStop enabled only, block newly coming events into the @@ -84,13 +81,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { @Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { - drained = eventQueue.isEmpty(); // blockNewEvents is only set when dispatcher is draining to stop, // adding this check is to avoid the overhead of acquiring the lock // and calling notify every time in the normal run of the loop. if (blockNewEvents) { synchronized (waitForDrained) { - if (drained) { + if (eventQueue.isEmpty()) { waitForDrained.notify(); } } @@ -139,7 +135,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { blockNewEvents = true; LOG.info("AsyncDispatcher is draining to stop, igonring any new events."); synchronized (waitForDrained) { - while (!drained && eventHandlingThread.isAlive()) { + while (!eventQueue.isEmpty() && eventHandlingThread.isAlive()) { waitForDrained.wait(1000); LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" + eventHandlingThread.getState()); @@ -223,12 +219,21 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { return handlerInstance; } + @VisibleForTesting + protected boolean hasPendingEvents() { + return !eventQueue.isEmpty(); + } + + @VisibleForTesting + protected boolean isEventThreadWaiting() { + return eventHandlingThread.getState() == Thread.State.WAITING; + } + class GenericEventHandler implements EventHandler<Event> { public void handle(Event event) { if (blockNewEvents) { return; } - drained = false; /* all this method does is enqueue all the events onto the queue */ int qSize = eventQueue.size(); @@ -285,9 +290,4 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { } }; } - - @VisibleForTesting - protected boolean isDrained() { - return this.drained; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa067c6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index da5ae44..d1f4fe9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -27,15 +27,24 @@ public class DrainDispatcher extends AsyncDispatcher { this(new LinkedBlockingQueue<Event>()); } - private DrainDispatcher(BlockingQueue<Event> eventQueue) { + public DrainDispatcher(BlockingQueue<Event> eventQueue) { super(eventQueue); } /** + * Wait till event thread enters WAITING state (i.e. waiting for new events). + */ + public void waitForEventThreadToWait() { + while (!isEventThreadWaiting()) { + Thread.yield(); + } + } + + /** * Busy loop waiting for all queued events to drain. */ public void await() { - while (!isDrained()) { + while (hasPendingEvents()) { Thread.yield(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa067c6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java new file mode 100644 index 0000000..ee17ddd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.junit.Assert; +import org.junit.Test; + +public class TestAsyncDispatcher { + + /* This test checks whether dispatcher hangs on close if following two things + * happen : + * 1. A thread which was putting event to event queue is interrupted. + * 2. Event queue is empty on close. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(timeout=10000) + public void testDispatcherOnCloseIfQueueEmpty() throws Exception { + BlockingQueue<Event> eventQueue = spy(new LinkedBlockingQueue<Event>()); + Event event = mock(Event.class); + doThrow(new InterruptedException()).when(eventQueue).put(event); + DrainDispatcher disp = new DrainDispatcher(eventQueue); + disp.init(new Configuration()); + disp.setDrainEventsOnStop(); + disp.start(); + // Wait for event handler thread to start and begin waiting for events. + disp.waitForEventThreadToWait(); + try { + disp.getEventHandler().handle(event); + } catch (YarnRuntimeException e) { + } + // Queue should be empty and dispatcher should not hang on close + Assert.assertTrue("Event Queue should have been empty", + eventQueue.isEmpty()); + disp.close(); + } +}