This is an automated email from the ASF dual-hosted git repository.

shv pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 14fd477  HDFS-15915. Race condition with async edits logging due to updating txId outside of the namesystem log. Contributed by Konstantin V Shvachko.
14fd477 is described below

commit 14fd4776ae034ba3b2d941f1db344548fbd1f42a
Author: Konstantin V Shvachko <s...@apache.org>
AuthorDate: Wed May 26 12:07:13 2021 -0700

    HDFS-15915. Race condition with async edits logging due to updating txId outside of the namesystem log. Contributed by Konstantin V Shvachko.

    (cherry picked from commit 1abd03d68f4f236674ce929164cc460037730abb)
---
 .../hdfs/server/namenode/EditLogOutputStream.java  |  11 ++
 .../hadoop/hdfs/server/namenode/FSEditLog.java     |  39 +++++--
 .../hdfs/server/namenode/FSEditLogAsync.java       |   7 +-
 .../hadoop/hdfs/server/namenode/JournalSet.java    |  19 +++-
 .../hdfs/server/namenode/NameNodeAdapter.java      |  36 ++++++-
 .../hdfs/server/namenode/TestEditLogRace.java      |  66 +++++++-----
 .../hadoop/hdfs/server/namenode/ha/HATestUtil.java |  12 +++
 .../hdfs/server/namenode/ha/TestObserverNode.java  | 118 +++++++++++++++++++++
 8 files changed, 270 insertions(+), 38 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
index 27733cf..6f43d73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 
 /**
  * A generic abstract class to support journaling of edits logs into
@@ -43,6 +44,16 @@ public abstract class EditLogOutputStream implements Closeable {
   }
 
   /**
+   * Get the last txId journalled in the stream.
+   * The txId is recorded when FSEditLogOp is written to the stream.
+   * The default implementation is dummy.
+   * JournalSet tracks the txId uniformly for all underlying streams.
+   */
+  public long getLastJournalledTxId() {
+    return HdfsServerConstants.INVALID_TXID;
+  };
+
+  /**
    * Write edits log operation to the stream.
    *
    * @param op operation

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 7b3f6a0..a24e5dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -217,7 +217,10 @@ public class FSEditLog implements LogsPurgeable {
   private static final ThreadLocal<TransactionId> myTransactionId =
       new ThreadLocal<TransactionId>() {
     @Override
    protected synchronized TransactionId initialValue() {
-      return new TransactionId(Long.MAX_VALUE);
+      // If an RPC call did not generate any transactions,
+      // logSync() should exit without syncing
+      // Therefore the initial value of myTransactionId should be 0
+      return new TransactionId(0L);
     }
   };
 
@@ -462,6 +465,7 @@ public class FSEditLog implements LogsPurgeable {
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
 
+      beginTransaction(op);
       // check if it is time to schedule an automatic sync
       needsSync = doEditTransaction(op);
       if (needsSync) {
@@ -476,9 +480,11 @@
   }
 
   synchronized boolean doEditTransaction(final FSEditLogOp op) {
-    long start = beginTransaction();
-    op.setTransactionId(txid);
+    LOG.debug("doEditTx() op={} txid={}", op, txid);
+    assert op.hasTransactionId() :
+        "Transaction id is not set for " + op + " EditLog.txId=" + txid;
 
+    long start = monotonicNow();
     try {
       editLogStream.write(op);
     } catch (IOException ex) {
@@ -522,7 +528,7 @@
     return editLogStream.shouldForceSync();
   }
 
-  private long beginTransaction() {
+  protected void beginTransaction(final FSEditLogOp op) {
     assert Thread.holdsLock(this);
     // get a new transactionId
     txid++;
@@ -532,7 +538,9 @@
     //
     TransactionId id = myTransactionId.get();
     id.txid = txid;
-    return monotonicNow();
+    if(op != null) {
+      op.setTransactionId(txid);
+    }
   }
 
   private void endTransaction(long start) {
@@ -649,7 +657,7 @@
   }
 
   protected void logSync(long mytxid) {
-    long syncStart = 0;
+    long lastJournalledTxId = HdfsServerConstants.INVALID_TXID;
     boolean sync = false;
     long editsBatchedInSync = 0;
     try {
@@ -676,8 +684,16 @@
         // now, this thread will do the sync.  track if other edits were
         // included in the sync - ie. batched.  if this is the only edit
        // synced then the batched count is 0
-        editsBatchedInSync = txid - synctxid - 1;
-        syncStart = txid;
+        lastJournalledTxId = editLogStream.getLastJournalledTxId();
+        LOG.debug("logSync(tx) synctxid={} lastJournalledTxId={} mytxid={}",
+            synctxid, lastJournalledTxId, mytxid);
+        assert lastJournalledTxId <= txid : "lastJournalledTxId exceeds txid";
+        // The stream has already been flushed, or there are no active streams
+        // We still try to flush up to mytxid
+        if(lastJournalledTxId <= synctxid) {
+          lastJournalledTxId = mytxid;
+        }
+        editsBatchedInSync = lastJournalledTxId - synctxid - 1;
         isSyncRunning = true;
         sync = true;
 
@@ -737,7 +753,7 @@
       // Prevent RuntimeException from blocking other log edit sync
       synchronized (this) {
         if (sync) {
-          synctxid = syncStart;
+          synctxid = lastJournalledTxId;
           for (JournalManager jm : journalSet.getJournalManagers()) {
             /**
              * {@link FileJournalManager#lastReadableTxId} is only meaningful
@@ -745,7 +761,7 @@
              * other types of {@link JournalManager}.
              */
             if (jm instanceof FileJournalManager) {
-              ((FileJournalManager)jm).setLastReadableTxId(syncStart);
+              ((FileJournalManager)jm).setLastReadableTxId(synctxid);
             }
           }
           isSyncRunning = false;
@@ -1585,7 +1601,8 @@
    * store yet.
    */
   synchronized void logEdit(final int length, final byte[] data) {
-    long start = beginTransaction();
+    beginTransaction(null);
+    long start = monotonicNow();
 
     try {
       editLogStream.writeRaw(data, 0, length);

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
index f6ae5dc..349b1b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
@@ -119,9 +119,14 @@ class FSEditLogAsync extends FSEditLog implements Runnable {
 
   @Override
   void logEdit(final FSEditLogOp op) {
+    assert isOpenForWrite();
+
     Edit edit = getEditInstance(op);
     THREAD_EDIT.set(edit);
-    enqueueEdit(edit);
+    synchronized(this) {
+      enqueueEdit(edit);
+      beginTransaction(op);
+    }
   }
 
   @Override

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
index 51f8b4f..95d9996 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.INVALID_TXID;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import java.io.IOException;
@@ -180,9 +181,11 @@ public class JournalSet implements JournalManager {
   final int minimumRedundantJournals;
 
   private boolean closed;
-  
+  private long lastJournalledTxId;
+
   JournalSet(int minimumRedundantResources) {
     this.minimumRedundantJournals = minimumRedundantResources;
+    lastJournalledTxId = INVALID_TXID;
   }
 
   @Override
@@ -432,6 +435,16 @@
       super();
     }
 
+    /**
+     * Get the last txId journalled in the stream.
+     * The txId is recorded when FSEditLogOp is written to the journal.
+     * JournalSet tracks the txId uniformly for all underlying streams.
+     */
+    @Override
+    public long getLastJournalledTxId() {
+      return lastJournalledTxId;
+    }
+
     @Override
     public void write(final FSEditLogOp op) throws IOException {
@@ -443,6 +456,10 @@
           }
         }
       }, "write op");
+
+      assert lastJournalledTxId < op.txid : "TxId order violation for op="
+          + op + ", lastJournalledTxId=" + lastJournalledTxId;
+      lastJournalledTxId = op.txid;
     }
 
     @Override

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 733bcb8..200f93c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
+
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 
 import java.io.File;
@@ -55,7 +57,12 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.test.Whitebox;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
 import static org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer.FSIMAGE_ATTRIBUTE_KEY;
 
 /**
@@ -315,7 +322,34 @@
     }
     return spyEditLog;
   }
-  
+
+  /**
+   * Spy on EditLog to delay execution of doEditTransaction() for MkdirOp.
+   */
+  public static FSEditLog spyDelayMkDirTransaction(
+      final NameNode nn, final long delay) {
+    FSEditLog realEditLog = nn.getFSImage().getEditLog();
+    FSEditLogAsync spyEditLog = (FSEditLogAsync) spy(realEditLog);
+    DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog);
+    Answer<Boolean> ans = new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(delay);
+        return (Boolean) invocation.callRealMethod();
+      }
+    };
+    ArgumentMatcher<FSEditLogOp> am = new ArgumentMatcher<FSEditLogOp>() {
+      @Override
+      public boolean matches(Object argument) {
+        FSEditLogOp op = (FSEditLogOp) argument;
+        return op.opCode == FSEditLogOpCodes.OP_MKDIR;
+      }
+    };
+    doAnswer(ans).when(spyEditLog).doEditTransaction(
+        Matchers.argThat(am));
+    return spyEditLog;
+  }
+
   public static JournalSet spyOnJournalSet(NameNode nn) {
     FSEditLog editLog = nn.getFSImage().getEditLog();
     JournalSet js = Mockito.spy(editLog.getJournalSet());

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
index f844eb3..f8adff9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 
@@ -53,13 +55,15 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
-import org.mockito.Mockito;
+import org.mockito.ArgumentMatcher;
 import org.slf4j.event.Level;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -286,12 +290,12 @@
 
       File editFile = new File(sd.getCurrentDir(), logFileName);
 
-      System.out.println("Verifying file: " + editFile);
+      LOG.info("Verifying file: " + editFile);
       FSEditLogLoader loader = new FSEditLogLoader(namesystem, startTxId);
       long numEditsThisLog = loader.loadFSEdits(
           new EditLogFileInputStream(editFile), startTxId);
-      System.out.println("Number of edits: " + numEditsThisLog);
+      LOG.info("Number of edits: " + numEditsThisLog);
       assertTrue(numEdits == -1 || numEditsThisLog == numEdits);
       numEdits = numEditsThisLog;
     }
 
@@ -576,9 +580,29 @@
     }
   }
 
+  static SetOwnerOp getSetOwnerOp(OpInstanceCache cache, String group) {
+    return ((SetOwnerOp)cache.get(OP_SET_OWNER))
+        .setSource("/").setUser("u").setGroup(group);
+  }
+
+  static class BlockingOpMatcher extends ArgumentMatcher<FSEditLogOp> {
+    @Override
+    public boolean matches(Object o) {
+      if(o instanceof FSEditLogOp.SetOwnerOp) {
+        FSEditLogOp.SetOwnerOp op = (FSEditLogOp.SetOwnerOp)o;
+        if("b".equals(op.groupname)) {
+          LOG.info("Blocking op: " + op);
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
   @Test(timeout=180000)
   public void testDeadlock() throws Throwable {
-    GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.INFO);
+    GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(FSEditLogAsync.LOG, Level.DEBUG);
 
     Configuration conf = getConf();
     NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
@@ -591,21 +615,17 @@
     ExecutorService executor = Executors.newCachedThreadPool();
 
     try {
-      final FSEditLog editLog = namesystem.getEditLog();
+      final FSEditLog editLog = spy(namesystem.getEditLog());
+      DFSTestUtil.setEditLogForTesting(namesystem, editLog);
 
-      FSEditLogOp.OpInstanceCache cache = editLog.cache.get();
-      final FSEditLogOp op = FSEditLogOp.SetOwnerOp.getInstance(cache)
-          .setSource("/").setUser("u").setGroup("g");
-      // don't reset fields so instance can be reused.
-      final FSEditLogOp reuseOp = Mockito.spy(op);
-      Mockito.doNothing().when(reuseOp).reset();
+      final OpInstanceCache cache = editLog.cache.get();
 
       // only job is spam edits. it will fill the queue when the test
       // loop injects the blockingOp.
-      Future[] logSpammers = new Future[16];
+      Future<?>[] logSpammers = new Future<?>[16];
       for (int i=0; i < logSpammers.length; i++) {
         final int ii = i;
-        logSpammers[i] = executor.submit(new Callable() {
+        logSpammers[i] = executor.submit(new Callable<Void>() {
           @Override
           public Void call() throws Exception {
             Thread.currentThread().setName("Log spammer " + ii);
@@ -613,7 +633,7 @@
             startSpamLatch.await();
             for (int i = 0; !done.get() && i < 1000000; i++) {
               // do not logSync here because we need to congest the queue.
-              editLog.logEdit(reuseOp);
+              editLog.logEdit(getSetOwnerOp(cache, "g"));
               if (i % 2048 == 0) {
                 LOG.info("thread[" + ii +"] edits=" + i);
               }
@@ -624,10 +644,9 @@
         });
       }
 
-      // the tx id is set while the edit log monitor is held, so this will
-      // effectively stall the async processing thread which will cause the
-      // edit queue to fill up.
-      final FSEditLogOp blockingOp = Mockito.spy(op);
+      // doEditTransaction is set while the edit log monitor is held, so this
+      // will effectively stall the async processing thread which will cause
+      // the edit queue to fill up.
       doAnswer(
         new Answer<Void>() {
           @Override
@@ -641,9 +660,7 @@
             return null;
           }
         }
-      ).when(blockingOp).setTransactionId(Mockito.anyLong());
-      // don't reset fields so instance can be reused.
-      Mockito.doNothing().when(blockingOp).reset();
+      ).when(editLog).doEditTransaction(argThat(new BlockingOpMatcher()));
 
      // repeatedly overflow the queue and verify it doesn't deadlock.
      for (int i = 0; i < 8; i++) {
        // spammers to overflow the edit queue, then waits for a permit
        // from blockerSemaphore that will be released at the bottom of
        // this loop.
-        Future blockingEdit = executor.submit(new Callable() {
+        Future<Void> blockingEdit = executor.submit(new Callable<Void>() {
           @Override
           public Void call() throws Exception {
             Thread.currentThread().setName("Log blocker");
+            final FSEditLogOp blockingOp = getSetOwnerOp(cache, "b");
             editLog.logEdit(blockingOp);
             editLog.logSync();
             return null;
@@ -685,7 +703,7 @@
       // what log rolling does), unblock the op currently holding the
       // monitor, and ensure deadlock does not occur.
       CountDownLatch readyLatch = new CountDownLatch(1);
-      Future synchedEdits = executor.submit(new Callable() {
+      Future<Void> synchedEdits = executor.submit(new Callable<Void>() {
         @Override
         public Void call() throws Exception {
           Thread.currentThread().setName("Log synchronizer");
@@ -693,7 +711,7 @@
           // log rolling to deadlock when queue is full.
           readyLatch.countDown();
           synchronized (editLog) {
-            editLog.logEdit(reuseOp);
+            editLog.logEdit(getSetOwnerOp(cache, "g"));
             editLog.logSync();
           }
           return null;

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index 8378f3c..4abb2b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -372,4 +372,16 @@ public abstract class HATestUtil {
     lastSeenStateId.accumulate(stateId);
     return currentStateId;
   }
+
+  /**
+   * Get last seen stateId from the client AlignmentContext.
+   */
+  public static long getLastSeenStateId(DistributedFileSystem dfs)
+      throws Exception {
+    ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>)
+        ((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
+            dfs.getClient().getNamenode())).getProxyProvider();
+    ClientGSIContext ac = (ClientGSIContext)(provider.getAlignmentContext());
+    return ac.getLastSeenStateId();
+  }
 }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
index dbb5b0c..e2aa243 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
@@ -29,13 +29,18 @@ import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -53,12 +58,14 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.TestFsck;
 import org.apache.hadoop.hdfs.tools.GetGroups;
 import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -489,6 +496,117 @@
     assertTrue(result.contains("The filesystem under path '/' is CORRUPT"));
   }
 
+  /**
+   * The test models the race of two mkdirs RPC calls on the same path to
+   * Active NameNode. The first arrived call will journal a mkdirs transaction.
+   * The subsequent call hitting the NameNode before the mkdirs transaction is
+   * synced will see that the directory already exists, but will obtain
+   * lastSeenStateId smaller than the txId of the mkdirs transaction
+   * since the latter hasn't been synced yet.
+   * This causes stale read from Observer for the second client.
+   * See HDFS-15915.
+   */
+  @Test
+  public void testMkdirsRaceWithObserverRead() throws Exception {
+    dfs.mkdir(testPath, FsPermission.getDefault());
+    assertSentTo(0);
+    dfsCluster.rollEditLogAndTail(0);
+    dfs.getFileStatus(testPath);
+    assertSentTo(2);
+
+    // Create a spy on FSEditLog, which delays MkdirOp transaction by 100 msec
+    FSEditLog spyEditLog = NameNodeAdapter.spyDelayMkDirTransaction(
+        dfsCluster.getNameNode(0), 100);
+
+    final int numThreads = 4;
+    ClientState[] clientStates = new ClientState[numThreads];
+    final ExecutorService threadPool =
+        HadoopExecutors.newFixedThreadPool(numThreads);
+    final Future<?>[] futures = new Future<?>[numThreads];
+
+    Configuration conf2 = new Configuration(conf);
+    // Disable FS cache so that different DFS clients are used
+    conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
+
+    for (int i = 0; i < numThreads; i++) {
+      clientStates[i] = new ClientState();
+      futures[i] = threadPool.submit(new MkDirRunner(conf2, clientStates[i]));
+    }
+
+    Thread.sleep(150); // wait until mkdir is logged
+    long activStateId =
+        dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId();
+    dfsCluster.rollEditLogAndTail(0);
+    boolean finished = true;
+    // wait for all dispatcher threads to finish
+    for (Future<?> future : futures) {
+      try {
+        future.get();
+      } catch (ExecutionException e) {
+        finished = false;
+        LOG.warn("MkDirRunner thread failed", e.getCause());
+      }
+    }
+    assertTrue("Not all threads finished", finished);
+    threadPool.shutdown();
+
+    assertEquals("Active and Observer stateIds don't match",
+        dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId(),
+        dfsCluster.getNameNode(2).getFSImage().getLastAppliedOrWrittenTxId());
+    for (int i = 0; i < numThreads; i++) {
+      assertTrue("Client #" + i
+          + " lastSeenStateId=" + clientStates[i].lastSeenStateId
+          + " activStateId=" + activStateId
+          + "\n" + clientStates[i].fnfe,
+          clientStates[i].lastSeenStateId >= activStateId &&
+          clientStates[i].fnfe == null);
+    }
+
+    // Restore edit log
+    Mockito.reset(spyEditLog);
+  }
+
+  static class ClientState {
+    private long lastSeenStateId = -7;
+    private FileNotFoundException fnfe;
+  }
+
+  static class MkDirRunner implements Runnable {
+    private static final Path DIR_PATH =
+        new Path("/TestObserverNode/testMkdirsRaceWithObserverRead");
+
+    private DistributedFileSystem fs;
+    private ClientState clientState;
+
+    MkDirRunner(Configuration conf, ClientState cs) throws IOException {
+      super();
+      fs = (DistributedFileSystem) FileSystem.get(conf);
+      clientState = cs;
+    }
+
+    @Override
+    public void run() {
+      try {
+        fs.mkdirs(DIR_PATH);
+        clientState.lastSeenStateId = HATestUtil.getLastSeenStateId(fs);
+        assertSentTo(fs, 0);
+
+        FileStatus stat = fs.getFileStatus(DIR_PATH);
+        assertSentTo(fs, 2);
+        assertTrue("Should be a directory", stat.isDirectory());
+      } catch (FileNotFoundException ioe) {
+        clientState.fnfe = ioe;
+      } catch (Exception e) {
+        fail("Unexpected exception: " + e);
+      }
+    }
+  }
+
+  private static void assertSentTo(DistributedFileSystem fs, int nnIdx)
+      throws IOException {
+    assertTrue("Request was not sent to the expected namenode " + nnIdx,
+        HATestUtil.isSentToAnyOfNameNodes(fs, dfsCluster, nnIdx));
+  }
 
   private void assertSentTo(int nnIdx) throws IOException {
     assertTrue("Request was not sent to the expected namenode " + nnIdx,
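For readers tracing the fix itself: the patch moves txId assignment (beginTransaction) under the same monitor that enqueues the edit in FSEditLogAsync, so an edit's txId is already set before any other RPC can observe its effect and record a lastSeenStateId. The sketch below is a minimal standalone model of that invariant -- it is illustrative only, not Hadoop code, and the EditQueue/Edit names are invented for the example.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    /** Toy model of an async edit log. Not Hadoop code; names are invented. */
    public class EditQueue {

      static final class Edit {
        final String op;
        long txid;                  // assigned at enqueue time, under the monitor
        Edit(String op) { this.op = op; }
      }

      private final BlockingQueue<Edit> queue = new ArrayBlockingQueue<>(1024);
      private long txid = 0;        // guarded by "this"

      // Mirrors the shape of the fixed FSEditLogAsync.logEdit(): enqueueing the
      // edit and assigning its txid happen atomically under one monitor, so
      // queue order always matches txid order and a caller returns only after
      // its edit has an id. Before the fix the id was assigned later, on the
      // syncing thread, so a second RPC could observe the edit's effect while
      // still holding a smaller state id -- the stale-read race in HDFS-15915.
      public synchronized long logEdit(String op) throws InterruptedException {
        Edit e = new Edit(op);
        queue.put(e);               // simplified; real code avoids blocking here
        e.txid = ++txid;
        return e.txid;
      }

      public static void main(String[] args) throws Exception {
        EditQueue log = new EditQueue();
        long t1 = log.logEdit("mkdir /foo");
        long t2 = log.logEdit("setOwner /foo");
        System.out.println("txids are ordered: " + t1 + " < " + t2);
      }
    }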