HDFS-8429. Avoid stuck threads if there is an error in DomainSocketWatcher that stops the thread. (zhouyingchao via cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3d5c0d4a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3d5c0d4a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3d5c0d4a Branch: refs/heads/YARN-2928 Commit: 3d5c0d4a2a8d733bcd6a95a85932a2a6898c20b4 Parents: 61510b9 Author: Colin Patrick Mccabe <cmcc...@cloudera.com> Authored: Thu May 28 11:52:28 2015 -0700 Committer: Zhijie Shen <zjs...@apache.org> Committed: Tue Jun 2 16:12:55 2015 -0700 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + .../hadoop/net/unix/DomainSocketWatcher.java | 21 +++++- .../hadoop/net/unix/DomainSocketWatcher.c | 2 +- .../net/unix/TestDomainSocketWatcher.java | 75 ++++++++++++++++++++ 4 files changed, 99 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d5c0d4a/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 69f5aa7..51eff78 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -782,6 +782,9 @@ Release 2.8.0 - UNRELEASED HADOOP-12035. shellcheck plugin displays a wrong version potentially (Kengo Seki via aw) + HDFS-8429. Avoid stuck threads if there is an error in DomainSocketWatcher + that stops the thread. (zhouyingchao via cmccabe) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d5c0d4a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java index 5648ae1..ad2fbfb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java @@ -470,6 +470,7 @@ public final class DomainSocketWatcher implements Closeable { // Handle pending additions (before pending removes). for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext(); ) { Entry entry = iter.next(); + iter.remove(); DomainSocket sock = entry.getDomainSocket(); Entry prevEntry = entries.put(sock.fd, entry); Preconditions.checkState(prevEntry == null, @@ -479,7 +480,6 @@ public final class DomainSocketWatcher implements Closeable { LOG.trace(this + ": adding fd " + sock.fd); } fdSet.add(sock.fd); - iter.remove(); } // Handle pending removals while (true) { @@ -525,6 +525,25 @@ public final class DomainSocketWatcher implements Closeable { } entries.clear(); fdSet.close(); + closed = true; + if (!(toAdd.isEmpty() && toRemove.isEmpty())) { + // Items in toAdd might not be added to entries, handle it here + for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext();) { + Entry entry = iter.next(); + entry.getDomainSocket().refCount.unreference(); + entry.getHandler().handle(entry.getDomainSocket()); + IOUtils.cleanup(LOG, entry.getDomainSocket()); + iter.remove(); + } + // Items in toRemove might not be really removed, handle it here + while (true) { + Map.Entry<Integer, DomainSocket> entry = toRemove.firstEntry(); + if (entry == null) + break; + sendCallback("close", entries, fdSet, entry.getValue().fd); + } + } + processedCond.signalAll(); } finally { lock.unlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d5c0d4a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c index 596601b..82e6af5 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c @@ -212,7 +212,7 @@ done: free(carr); if (jthr) { (*env)->DeleteLocalRef(env, jarr); - jarr = NULL; + (*env)->Throw(env, jthr); } return jarr; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d5c0d4a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java index 4b0e2a8..4cc86a7 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import java.util.ArrayList; import java.util.Random; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -181,6 +182,80 @@ public class TestDomainSocketWatcher { watcher.close(); } + @Test(timeout = 300000) + public void testStressInterruption() throws Exception { + final int SOCKET_NUM = 250; + final ReentrantLock lock = new ReentrantLock(); + final DomainSocketWatcher watcher = newDomainSocketWatcher(10); + final ArrayList<DomainSocket[]> pairs = new ArrayList<DomainSocket[]>(); + final AtomicInteger handled = new AtomicInteger(0); + + final Thread adderThread = new Thread(new Runnable() { + @Override + public void run() { + try { + for (int i = 0; i < SOCKET_NUM; i++) { + DomainSocket pair[] = DomainSocket.socketpair(); + watcher.add(pair[1], new DomainSocketWatcher.Handler() { + @Override + public boolean handle(DomainSocket sock) { + handled.incrementAndGet(); + return true; + } + }); + lock.lock(); + try { + pairs.add(pair); + } finally { + lock.unlock(); + } + TimeUnit.MILLISECONDS.sleep(1); + } + } catch (Throwable e) { + LOG.error(e); + throw new RuntimeException(e); + } + } + }); + + final Thread removerThread = new Thread(new Runnable() { + @Override + public void run() { + final Random random = new Random(); + try { + while (handled.get() != SOCKET_NUM) { + lock.lock(); + try { + if (!pairs.isEmpty()) { + int idx = random.nextInt(pairs.size()); + DomainSocket pair[] = pairs.remove(idx); + if (random.nextBoolean()) { + pair[0].close(); + } else { + watcher.remove(pair[1]); + } + TimeUnit.MILLISECONDS.sleep(1); + } + } finally { + lock.unlock(); + } + } + } catch (Throwable e) { + LOG.error(e); + throw new RuntimeException(e); + } + } + }); + + adderThread.start(); + removerThread.start(); + TimeUnit.MILLISECONDS.sleep(100); + watcher.watcherThread.interrupt(); + Uninterruptibles.joinUninterruptibly(adderThread); + Uninterruptibles.joinUninterruptibly(removerThread); + Uninterruptibles.joinUninterruptibly(watcher.watcherThread); + } + /** * Creates a new DomainSocketWatcher and tracks its thread for termination due * to an unexpected exception. At the end of each test, if there was an