Repository: hadoop
Updated Branches:
refs/heads/branch-3.0 48dc8de28 -> a5da4df73
HDFS-13051. Fix deadlock during async editlog rolling if edit queue is full.
Contributed by Daryn Sharp.
(cherry picked from commit 8e54da1511e78477c1d4655d5ff0a69d0330869f)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
(cherry picked from commit 2dd27c999b22c550058de0e6eca7209b346cd143)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a5da4df7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a5da4df7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a5da4df7
Branch: refs/heads/branch-3.0
Commit: a5da4df73c525d68c76487509660a6b13f7fe99e
Parents: 48dc8de
Author: Xiao Chen <[email protected]>
Authored: Mon Sep 10 22:14:02 2018 -0700
Committer: Xiao Chen <[email protected]>
Committed: Mon Sep 10 23:00:41 2018 -0700
----------------------------------------------------------------------
.../hdfs/server/namenode/FSEditLogAsync.java | 61 ++++++-
.../hdfs/server/namenode/TestEditLogRace.java | 158 ++++++++++++++++++-
2 files changed, 215 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5da4df7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
index 5990c22..1604872 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
@@ -24,7 +24,9 @@ import java.util.Deque;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -145,15 +147,68 @@ class FSEditLogAsync extends FSEditLog implements Runnable {
edit.logSyncWait();
}
+ // draining permits is intended to provide a high priority reservation.
+ // however, release of outstanding permits must be postponed until
+ // drained permits are restored to avoid starvation. logic has some races
+ // but is good enough to serve its purpose.
+ private Semaphore overflowMutex = new Semaphore(8){
+ private AtomicBoolean draining = new AtomicBoolean();
+ private AtomicInteger pendingReleases = new AtomicInteger();
+ @Override
+ public int drainPermits() {
+ draining.set(true);
+ return super.drainPermits();
+ }
+ // while draining, count the releases until release(int)
+ private void tryRelease(int permits) {
+ pendingReleases.getAndAdd(permits);
+ if (!draining.get()) {
+ super.release(pendingReleases.getAndSet(0));
+ }
+ }
+ @Override
+ public void release() {
+ tryRelease(1);
+ }
+ @Override
+ public void release(int permits) {
+ draining.set(false);
+ tryRelease(permits);
+ }
+ };
+
private void enqueueEdit(Edit edit) {
if (LOG.isDebugEnabled()) {
LOG.debug("logEdit " + edit);
}
try {
- if (!editPendingQ.offer(edit, 1, TimeUnit.SECONDS)) {
+ // not checking for overflow yet to avoid penalizing performance of
+ // the common case. if there is persistent overflow, a mutex will be
+ // used to throttle contention on the queue.
+ if (!editPendingQ.offer(edit)) {
Preconditions.checkState(
isSyncThreadAlive(), "sync thread is not alive");
- editPendingQ.put(edit);
+ if (Thread.holdsLock(this)) {
+ // if queue is full, synchronized caller must immediately relinquish
+ // the monitor before re-offering to avoid deadlock with sync thread
+ // which needs the monitor to write transactions.
+ int permits = overflowMutex.drainPermits();
+ try {
+ do {
+ this.wait(1000); // will be notified by next logSync.
+ } while (!editPendingQ.offer(edit));
+ } finally {
+ overflowMutex.release(permits);
+ }
+ } else {
+ // mutex will throttle contention during persistent overflow.
+ overflowMutex.acquire();
+ try {
+ editPendingQ.put(edit);
+ } finally {
+ overflowMutex.release();
+ }
+ }
}
} catch (Throwable t) {
// should never happen! failure to enqueue an edit is fatal
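
For illustration, here is a self-contained sketch of the overridden Semaphore
above. The class name PrioritySemaphore and the main() driver are assumptions
for the example only, not part of the patch; the override bodies mirror the
anonymous subclass in FSEditLogAsync. drainPermits() reserves every free
permit for a high-priority caller, and ordinary releases that arrive while
the drain is in effect are deferred until the drainer restores its permits
via release(int):

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class PrioritySemaphore extends Semaphore {
  // true while a high-priority caller holds the drained permits.
  private final AtomicBoolean draining = new AtomicBoolean();
  // releases deferred while draining, flushed once draining ends.
  private final AtomicInteger pendingReleases = new AtomicInteger();

  public PrioritySemaphore(int permits) {
    super(permits);
  }

  @Override
  public int drainPermits() {
    draining.set(true);
    return super.drainPermits();
  }

  // while draining, accumulate releases instead of restoring permits.
  private void tryRelease(int permits) {
    pendingReleases.getAndAdd(permits);
    if (!draining.get()) {
      super.release(pendingReleases.getAndSet(0));
    }
  }

  @Override
  public void release() {
    tryRelease(1);
  }

  @Override
  public void release(int permits) {
    draining.set(false); // drainer is done; deferred releases flush too
    tryRelease(permits);
  }

  public static void main(String[] args) throws InterruptedException {
    PrioritySemaphore sem = new PrioritySemaphore(2);
    sem.acquire();                    // ordinary caller takes one permit
    int drained = sem.drainPermits(); // priority caller reserves the rest
    sem.release();                    // deferred: still 0 available
    System.out.println("while draining: " + sem.availablePermits()); // 0
    sem.release(drained);             // restores drained + deferred permits
    System.out.println("after restore: " + sem.availablePermits());  // 2
  }
}

As the patch's own comment concedes, the draining/pendingReleases handoff has
benign races; it only needs to approximate a priority reservation, not
guarantee one.
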
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5da4df7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
index 42aa6f8..a43a834 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
@@ -28,12 +28,19 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -53,6 +60,7 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
+import org.mockito.Mockito;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -565,5 +573,153 @@ public class TestEditLogRace {
LOG.info("Closing nn");
if(namesystem != null) namesystem.close();
}
- }
+ }
+
+ @Test(timeout=180000)
+ public void testDeadlock() throws Throwable {
+ GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.INFO);
+
+ Configuration conf = getConf();
+ NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
+ DFSTestUtil.formatNameNode(conf);
+ final FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf);
+
+ final AtomicBoolean done = new AtomicBoolean(false);
+ final Semaphore blockerSemaphore = new Semaphore(0);
+ final CountDownLatch startSpamLatch = new CountDownLatch(1);
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ try {
+ final FSEditLog editLog = namesystem.getEditLog();
+
+ FSEditLogOp.OpInstanceCache cache = editLog.cache.get();
+ final FSEditLogOp op = FSEditLogOp.SetOwnerOp.getInstance(cache)
+ .setSource("/").setUser("u").setGroup("g");
+ // don't reset fields so instance can be reused.
+ final FSEditLogOp reuseOp = Mockito.spy(op);
+ Mockito.doNothing().when(reuseOp).reset();
+
+ // only job is spam edits. it will fill the queue when the test
+ // loop injects the blockingOp.
+ Future[] logSpammers = new Future[16];
+ for (int i=0; i < logSpammers.length; i++) {
+ final int ii = i;
+ logSpammers[i] = executor.submit(new Callable() {
+ @Override
+ public Void call() throws Exception {
+ Thread.currentThread().setName("Log spammer " + ii);
+ // wait until a blocking edit op notifies us to go.
+ startSpamLatch.await();
+ for (int i = 0; !done.get() && i < 1000000; i++) {
+ // do not logSync here because we need to congest the queue.
+ editLog.logEdit(reuseOp);
+ if (i % 2048 == 0) {
+ LOG.info("thread[" + ii +"] edits=" + i);
+ }
+ }
+ assertTrue("too many edits", done.get());
+ return null;
+ }
+ });
+ }
+
+ // the tx id is set while the edit log monitor is held, so this will
+ // effectively stall the async processing thread which will cause the
+ // edit queue to fill up.
+ final FSEditLogOp blockingOp = Mockito.spy(op);
+ doAnswer(
+ new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ // flip the latch to unleash the spamming threads to congest
+ // the queue.
+ startSpamLatch.countDown();
+ // wait until unblocked after a synchronized thread is started.
+ blockerSemaphore.acquire();
+ invocation.callRealMethod();
+ return null;
+ }
+ }
+ ).when(blockingOp).setTransactionId(Mockito.anyLong());
+ // don't reset fields so instance can be reused.
+ Mockito.doNothing().when(blockingOp).reset();
+
+ // repeatedly overflow the queue and verify it doesn't deadlock.
+ for (int i = 0; i < 8; i++) {
+ // when the blockingOp is logged, it triggers the latch to unleash the
+ // spammers to overflow the edit queue, then waits for a permit
+ // from blockerSemaphore that will be released at the bottom of
+ // this loop.
+ Future blockingEdit = executor.submit(new Callable() {
+ @Override
+ public Void call() throws Exception {
+ Thread.currentThread().setName("Log blocker");
+ editLog.logEdit(blockingOp);
+ editLog.logSync();
+ return null;
+ }
+ });
+
+ // wait for spammers to seize up the edit log.
+ long startTxId = editLog.getLastWrittenTxIdWithoutLock();
+ final long[] txIds = { startTxId, startTxId, startTxId };
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ txIds[0] = txIds[1];
+ txIds[1] = txIds[2];
+ txIds[2] = editLog.getLastWrittenTxIdWithoutLock();
+ return (txIds[0] == txIds[1] &&
+ txIds[1] == txIds[2] &&
+ txIds[2] > startTxId);
+ }
+ }, 100, 10000);
+
+ // callers that synchronize on the edit log while the queue is full
+ // are prone to deadlock if the locking is incorrect. at this point:
+ // 1. the blocking edit is holding the log's monitor.
+ // 2. the spammers have filled the queue.
+ // 3. the spammers are blocked waiting to queue another edit.
+ // Now we'll start another thread to synchronize on the log (simulates
+ // what log rolling does), unblock the op currently holding the
+ // monitor, and ensure deadlock does not occur.
+ CountDownLatch readyLatch = new CountDownLatch(1);
+ Future synchedEdits = executor.submit(new Callable() {
+ @Override
+ public Void call() throws Exception {
+ Thread.currentThread().setName("Log synchronizer");
+ // the sync is CRUCIAL for this test. it's what causes edit
+ // log rolling to deadlock when queue is full.
+ readyLatch.countDown();
+ synchronized (editLog) {
+ editLog.logEdit(reuseOp);
+ editLog.logSync();
+ }
+ return null;
+ }
+ });
+ // unblock the edit jammed in setting its txid. queued edits should
+ // start flowing and the synced edits should complete.
+ readyLatch.await();
+ blockerSemaphore.release();
+ blockingEdit.get();
+ synchedEdits.get();
+ }
+
+ // tell spammers to stop.
+ done.set(true);
+ for (int i=0; i < logSpammers.length; i++) {
+ logSpammers[i].get();
+ }
+ // just make sure everything can be synced.
+ editLog.logSyncAll();
+ } finally {
+ LOG.info("Closing nn");
+ executor.shutdownNow();
+ if (namesystem != null) {
+ namesystem.getFSImage().getStorage().close();
+ namesystem.close();
+ }
+ }
+ }
}
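
The scenario this test reproduces is the deadlock the patch fixes: a caller
that already holds the edit log's monitor fills the queue and blocks, while
the sync thread needs that same monitor to drain the queue. Below is a
minimal standalone sketch of the avoidance pattern in enqueueEdit(); the
name MonitorSafeQueue and the Runnable payload are assumptions for the
example, not Hadoop API:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MonitorSafeQueue {
  // tiny capacity so overflow is easy to hit in experiments.
  private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);

  public void enqueue(Runnable task) throws InterruptedException {
    if (queue.offer(task)) {
      return; // fast path: queue had room
    }
    if (Thread.holdsLock(this)) {
      // We hold the monitor the consumer needs, so blocking on put()
      // would deadlock. wait() is legal here (holdsLock confirmed it)
      // and releases the monitor while parked; the timeout covers the
      // race where the consumer notifies before we start waiting.
      while (!queue.offer(task)) {
        wait(1000);
      }
    } else {
      queue.put(task); // safe to block: we hold no lock the consumer needs
    }
  }

  public void consumeLoop() throws InterruptedException {
    while (!Thread.currentThread().isInterrupted()) {
      synchronized (this) {
        // The monitor is required to process an item, just as the async
        // edit log's sync thread needs the FSEditLog monitor to write a
        // transaction. If a producer were parked in put() while holding
        // the monitor, this block would never be entered: the deadlock.
        Runnable task = queue.poll();
        if (task != null) {
          task.run();
        }
        notifyAll(); // wake any producer parked in wait()
      }
      Thread.sleep(1); // sketch-only backoff to avoid busy-spinning
    }
  }
}

The branches map onto the patch: the unconditional offer() is the common
case, holdsLock() selects the wait-and-retry loop for monitor-holding
callers (log rolling runs synchronized on the log), and all other callers
throttle through the overflow semaphore before a blocking put().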