This is an automated email from the ASF dual-hosted git repository.

shv pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 14fd477  HDFS-15915. Race condition with async edits logging due to 
updating txId outside of the namesystem log. Contributed by Konstantin V 
Shvachko.
14fd477 is described below

commit 14fd4776ae034ba3b2d941f1db344548fbd1f42a
Author: Konstantin V Shvachko <s...@apache.org>
AuthorDate: Wed May 26 12:07:13 2021 -0700

    HDFS-15915. Race condition with async edits logging due to updating txId 
outside of the namesystem log. Contributed by Konstantin V Shvachko.
    
    (cherry picked from commit 1abd03d68f4f236674ce929164cc460037730abb)
---
 .../hdfs/server/namenode/EditLogOutputStream.java  |  11 ++
 .../hadoop/hdfs/server/namenode/FSEditLog.java     |  39 +++++--
 .../hdfs/server/namenode/FSEditLogAsync.java       |   7 +-
 .../hadoop/hdfs/server/namenode/JournalSet.java    |  19 +++-
 .../hdfs/server/namenode/NameNodeAdapter.java      |  36 ++++++-
 .../hdfs/server/namenode/TestEditLogRace.java      |  66 +++++++-----
 .../hadoop/hdfs/server/namenode/ha/HATestUtil.java |  12 +++
 .../hdfs/server/namenode/ha/TestObserverNode.java  | 118 +++++++++++++++++++++
 8 files changed, 270 insertions(+), 38 deletions(-)

diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
index 27733cf..6f43d73 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 
 /**
  * A generic abstract class to support journaling of edits logs into 
@@ -43,6 +44,16 @@ public abstract class EditLogOutputStream implements 
Closeable {
   }
 
   /**
+   * Get the last txId journalled in the stream.
+   * The txId is recorded when FSEditLogOp is written to the stream.
+   * The default implementation is dummy.
+   * JournalSet tracks the txId uniformly for all underlying streams.
+   */
+  public long getLastJournalledTxId() {
+    return HdfsServerConstants.INVALID_TXID;
+  };
+
+  /**
    * Write edits log operation to the stream.
    * 
    * @param op operation
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 7b3f6a0..a24e5dd 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -217,7 +217,10 @@ public class FSEditLog implements LogsPurgeable {
   private static final ThreadLocal<TransactionId> myTransactionId = new 
ThreadLocal<TransactionId>() {
     @Override
     protected synchronized TransactionId initialValue() {
-      return new TransactionId(Long.MAX_VALUE);
+      // If an RPC call did not generate any transactions,
+      // logSync() should exit without syncing
+      // Therefore the initial value of myTransactionId should be 0
+      return new TransactionId(0L);
     }
   };
 
@@ -462,6 +465,7 @@ public class FSEditLog implements LogsPurgeable {
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
 
+      beginTransaction(op);
       // check if it is time to schedule an automatic sync
       needsSync = doEditTransaction(op);
       if (needsSync) {
@@ -476,9 +480,11 @@ public class FSEditLog implements LogsPurgeable {
   }
 
   synchronized boolean doEditTransaction(final FSEditLogOp op) {
-    long start = beginTransaction();
-    op.setTransactionId(txid);
+    LOG.debug("doEditTx() op={} txid={}", op, txid);
+    assert op.hasTransactionId() :
+      "Transaction id is not set for " + op + " EditLog.txId=" + txid;
 
+    long start = monotonicNow();
     try {
       editLogStream.write(op);
     } catch (IOException ex) {
@@ -522,7 +528,7 @@ public class FSEditLog implements LogsPurgeable {
     return editLogStream.shouldForceSync();
   }
   
-  private long beginTransaction() {
+  protected void beginTransaction(final FSEditLogOp op) {
     assert Thread.holdsLock(this);
     // get a new transactionId
     txid++;
@@ -532,7 +538,9 @@ public class FSEditLog implements LogsPurgeable {
     //
     TransactionId id = myTransactionId.get();
     id.txid = txid;
-    return monotonicNow();
+    if(op != null) {
+      op.setTransactionId(txid);
+    }
   }
   
   private void endTransaction(long start) {
@@ -649,7 +657,7 @@ public class FSEditLog implements LogsPurgeable {
   }
 
   protected void logSync(long mytxid) {
-    long syncStart = 0;
+    long lastJournalledTxId = HdfsServerConstants.INVALID_TXID;
     boolean sync = false;
     long editsBatchedInSync = 0;
     try {
@@ -676,8 +684,16 @@ public class FSEditLog implements LogsPurgeable {
           // now, this thread will do the sync.  track if other edits were
           // included in the sync - ie. batched.  if this is the only edit
           // synced then the batched count is 0
-          editsBatchedInSync = txid - synctxid - 1;
-          syncStart = txid;
+          lastJournalledTxId = editLogStream.getLastJournalledTxId();
+          LOG.debug("logSync(tx) synctxid={} lastJournalledTxId={} mytxid={}",
+              synctxid, lastJournalledTxId, mytxid);
+          assert lastJournalledTxId <= txid : "lastJournalledTxId exceeds 
txid";
+          // The stream has already been flushed, or there are no active 
streams
+          // We still try to flush up to mytxid
+          if(lastJournalledTxId <= synctxid) {
+            lastJournalledTxId = mytxid;
+          }
+          editsBatchedInSync = lastJournalledTxId - synctxid - 1;
           isSyncRunning = true;
           sync = true;
 
@@ -737,7 +753,7 @@ public class FSEditLog implements LogsPurgeable {
       // Prevent RuntimeException from blocking other log edit sync 
       synchronized (this) {
         if (sync) {
-          synctxid = syncStart;
+          synctxid = lastJournalledTxId;
           for (JournalManager jm : journalSet.getJournalManagers()) {
             /**
              * {@link FileJournalManager#lastReadableTxId} is only meaningful
@@ -745,7 +761,7 @@ public class FSEditLog implements LogsPurgeable {
              * other types of {@link JournalManager}.
              */
             if (jm instanceof FileJournalManager) {
-              ((FileJournalManager)jm).setLastReadableTxId(syncStart);
+              ((FileJournalManager)jm).setLastReadableTxId(synctxid);
             }
           }
           isSyncRunning = false;
@@ -1585,7 +1601,8 @@ public class FSEditLog implements LogsPurgeable {
    * store yet.
    */   
   synchronized void logEdit(final int length, final byte[] data) {
-    long start = beginTransaction();
+    beginTransaction(null);
+    long start = monotonicNow();
 
     try {
       editLogStream.writeRaw(data, 0, length);
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
index f6ae5dc..349b1b3 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
@@ -119,9 +119,14 @@ class FSEditLogAsync extends FSEditLog implements Runnable 
{
 
   @Override
   void logEdit(final FSEditLogOp op) {
+    assert isOpenForWrite();
+
     Edit edit = getEditInstance(op);
     THREAD_EDIT.set(edit);
-    enqueueEdit(edit);
+    synchronized(this) {
+      enqueueEdit(edit);
+      beginTransaction(op);
+    }
   }
 
   @Override
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
index 51f8b4f..95d9996 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static 
org.apache.hadoop.hdfs.server.common.HdfsServerConstants.INVALID_TXID;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import java.io.IOException;
@@ -180,9 +181,11 @@ public class JournalSet implements JournalManager {
   final int minimumRedundantJournals;
 
   private boolean closed;
-  
+  private long lastJournalledTxId;
+
   JournalSet(int minimumRedundantResources) {
     this.minimumRedundantJournals = minimumRedundantResources;
+    lastJournalledTxId = INVALID_TXID;
   }
   
   @Override
@@ -432,6 +435,16 @@ public class JournalSet implements JournalManager {
       super();
     }
 
+    /**
+     * Get the last txId journalled in the stream.
+     * The txId is recorded when FSEditLogOp is written to the journal.
+     * JournalSet tracks the txId uniformly for all underlying streams.
+     */
+    @Override
+    public long getLastJournalledTxId() {
+      return lastJournalledTxId;
+    }
+
     @Override
     public void write(final FSEditLogOp op)
         throws IOException {
@@ -443,6 +456,10 @@ public class JournalSet implements JournalManager {
           }
         }
       }, "write op");
+
+      assert lastJournalledTxId < op.txid : "TxId order violation for op=" +
+        op + ", lastJournalledTxId=" + lastJournalledTxId;
+      lastJournalledTxId = op.txid;
     }
 
     @Override
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 733bcb8..200f93c 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
+
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 
 import java.io.File;
@@ -55,7 +57,12 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.test.Whitebox;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
 import static 
org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer.FSIMAGE_ATTRIBUTE_KEY;
 
 /**
@@ -315,7 +322,34 @@ public class NameNodeAdapter {
     }
     return spyEditLog;
   }
-  
+
+  /**
+   * Spy on EditLog to delay execution of doEditTransaction() for MkdirOp.
+   */
+  public static FSEditLog spyDelayMkDirTransaction(
+      final NameNode nn, final long delay) {
+    FSEditLog realEditLog = nn.getFSImage().getEditLog();
+    FSEditLogAsync spyEditLog = (FSEditLogAsync) spy(realEditLog);
+    DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog);
+    Answer<Boolean> ans = new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(delay);
+        return (Boolean) invocation.callRealMethod();
+      }
+    };
+    ArgumentMatcher<FSEditLogOp> am = new ArgumentMatcher<FSEditLogOp>() {
+      @Override
+      public boolean matches(Object argument) {
+        FSEditLogOp op = (FSEditLogOp) argument;
+        return op.opCode == FSEditLogOpCodes.OP_MKDIR;
+      }
+    };
+    doAnswer(ans).when(spyEditLog).doEditTransaction(
+        Matchers.argThat(am));
+    return spyEditLog;
+  }
+
   public static JournalSet spyOnJournalSet(NameNode nn) {
     FSEditLog editLog = nn.getFSImage().getEditLog();
     JournalSet js = Mockito.spy(editLog.getJournalSet());
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
index f844eb3..f8adff9 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static 
org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 
@@ -53,13 +55,15 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
-import org.mockito.Mockito;
+import org.mockito.ArgumentMatcher;
 import org.slf4j.event.Level;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -286,12 +290,12 @@ public class TestEditLogRace {
 
       File editFile = new File(sd.getCurrentDir(), logFileName);
         
-      System.out.println("Verifying file: " + editFile);
+      LOG.info("Verifying file: " + editFile);
       FSEditLogLoader loader = new FSEditLogLoader(namesystem, startTxId);
       long numEditsThisLog = loader.loadFSEdits(
           new EditLogFileInputStream(editFile), startTxId);
       
-      System.out.println("Number of edits: " + numEditsThisLog);
+      LOG.info("Number of edits: " + numEditsThisLog);
       assertTrue(numEdits == -1 || numEditsThisLog == numEdits);
       numEdits = numEditsThisLog;
     }
@@ -576,9 +580,29 @@ public class TestEditLogRace {
     }
   }
 
+  static SetOwnerOp getSetOwnerOp(OpInstanceCache cache, String group) {
+    return ((SetOwnerOp)cache.get(OP_SET_OWNER))
+        .setSource("/").setUser("u").setGroup(group);
+  }
+
+  static class BlockingOpMatcher extends ArgumentMatcher<FSEditLogOp> {
+    @Override
+    public boolean matches(Object o) {
+      if(o instanceof FSEditLogOp.SetOwnerOp) {
+        FSEditLogOp.SetOwnerOp op = (FSEditLogOp.SetOwnerOp)o;
+        if("b".equals(op.groupname)) {
+          LOG.info("Blocking op: " + op);
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
   @Test(timeout=180000)
   public void testDeadlock() throws Throwable {
-    GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.INFO);
+    GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(FSEditLogAsync.LOG, Level.DEBUG);
 
     Configuration conf = getConf();
     NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
@@ -591,21 +615,17 @@ public class TestEditLogRace {
 
     ExecutorService executor = Executors.newCachedThreadPool();
     try {
-      final FSEditLog editLog = namesystem.getEditLog();
+      final FSEditLog editLog = spy(namesystem.getEditLog());
+      DFSTestUtil.setEditLogForTesting(namesystem, editLog);
 
-      FSEditLogOp.OpInstanceCache cache = editLog.cache.get();
-      final FSEditLogOp op = FSEditLogOp.SetOwnerOp.getInstance(cache)
-        .setSource("/").setUser("u").setGroup("g");
-      // don't reset fields so instance can be reused.
-      final FSEditLogOp reuseOp = Mockito.spy(op);
-      Mockito.doNothing().when(reuseOp).reset();
+      final OpInstanceCache cache = editLog.cache.get();
 
       // only job is spam edits.  it will fill the queue when the test
       // loop injects the blockingOp.
-      Future[] logSpammers = new Future[16];
+      Future<?>[] logSpammers = new Future<?>[16];
       for (int i=0; i < logSpammers.length; i++) {
         final int ii = i;
-        logSpammers[i] = executor.submit(new Callable() {
+        logSpammers[i] = executor.submit(new Callable<Void>() {
           @Override
           public Void call() throws Exception {
             Thread.currentThread().setName("Log spammer " + ii);
@@ -613,7 +633,7 @@ public class TestEditLogRace {
             startSpamLatch.await();
             for (int i = 0; !done.get() && i < 1000000; i++) {
               // do not logSync here because we need to congest the queue.
-              editLog.logEdit(reuseOp);
+              editLog.logEdit(getSetOwnerOp(cache, "g"));
               if (i % 2048 == 0) {
                 LOG.info("thread[" + ii +"] edits=" + i);
               }
@@ -624,10 +644,9 @@ public class TestEditLogRace {
         });
       }
 
-      // the tx id is set while the edit log monitor is held, so this will
-      // effectively stall the async processing thread which will cause the
-      // edit queue to fill up.
-      final FSEditLogOp blockingOp = Mockito.spy(op);
+      // doEditTransaction is set while the edit log monitor is held, so this
+      // will effectively stall the async processing thread which will cause
+      // the edit queue to fill up.
       doAnswer(
         new Answer<Void>() {
           @Override
@@ -641,9 +660,7 @@ public class TestEditLogRace {
             return null;
           }
         }
-      ).when(blockingOp).setTransactionId(Mockito.anyLong());
-      // don't reset fields so instance can be reused.
-      Mockito.doNothing().when(blockingOp).reset();
+      ).when(editLog).doEditTransaction(argThat(new BlockingOpMatcher()));
 
       // repeatedly overflow the queue and verify it doesn't deadlock.
       for (int i = 0; i < 8; i++) {
@@ -651,10 +668,11 @@ public class TestEditLogRace {
         // spammers to overflow the edit queue, then waits for a permit
         // from blockerSemaphore that will be released at the bottom of
         // this loop.
-        Future blockingEdit = executor.submit(new Callable() {
+        Future<Void> blockingEdit = executor.submit(new Callable<Void>() {
           @Override
           public Void call() throws Exception {
             Thread.currentThread().setName("Log blocker");
+            final FSEditLogOp blockingOp = getSetOwnerOp(cache, "b");
             editLog.logEdit(blockingOp);
             editLog.logSync();
             return null;
@@ -685,7 +703,7 @@ public class TestEditLogRace {
         // what log rolling does), unblock the op currently holding the
         // monitor, and ensure deadlock does not occur.
         CountDownLatch readyLatch = new CountDownLatch(1);
-        Future synchedEdits = executor.submit(new Callable() {
+        Future<Void> synchedEdits = executor.submit(new Callable<Void>() {
           @Override
           public Void call() throws Exception {
             Thread.currentThread().setName("Log synchronizer");
@@ -693,7 +711,7 @@ public class TestEditLogRace {
             // log rolling to deadlock when queue is full.
             readyLatch.countDown();
             synchronized (editLog) {
-              editLog.logEdit(reuseOp);
+              editLog.logEdit(getSetOwnerOp(cache, "g"));
               editLog.logSync();
             }
             return null;
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index 8378f3c..4abb2b2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -372,4 +372,16 @@ public abstract class HATestUtil {
     lastSeenStateId.accumulate(stateId);
     return currentStateId;
   }
+
+  /**
+   * Get last seen stateId from the client AlignmentContext.
+   */
+  public static long getLastSeenStateId(DistributedFileSystem dfs)
+      throws Exception {
+    ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>)
+        ((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
+            dfs.getClient().getNamenode())).getProxyProvider();
+    ClientGSIContext ac = (ClientGSIContext)(provider.getAlignmentContext());
+    return ac.getLastSeenStateId();
+  }
 }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
index dbb5b0c..e2aa243 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
@@ -29,13 +29,18 @@ import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -53,12 +58,14 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.TestFsck;
 import org.apache.hadoop.hdfs.tools.GetGroups;
 import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -489,6 +496,117 @@ public class TestObserverNode {
     assertTrue(result.contains("The filesystem under path '/' is CORRUPT"));
   }
 
+  /**
+   * The test models the race of two mkdirs RPC calls on the same path to
+   * Active NameNode. The first arrived call will journal a mkdirs transaction.
+   * The subsequent call hitting the NameNode before the mkdirs transaction is
+   * synced will see that the directory already exists, but will obtain
+   * lastSeenStateId smaller than the txId of the mkdirs transaction
+   * since the latter hasn't been synced yet.
+   * This causes stale read from Observer for the second client.
+   * See HDFS-15915.
+   */
+  @Test
+  public void testMkdirsRaceWithObserverRead() throws Exception {
+    dfs.mkdir(testPath, FsPermission.getDefault());
+    assertSentTo(0);
+    dfsCluster.rollEditLogAndTail(0);
+    dfs.getFileStatus(testPath);
+    assertSentTo(2);
+
+    // Create a spy on FSEditLog, which delays MkdirOp transaction by 100 mec
+    FSEditLog spyEditLog = NameNodeAdapter.spyDelayMkDirTransaction(
+        dfsCluster.getNameNode(0), 100);
+
+    final int numThreads = 4;
+    ClientState[] clientStates = new ClientState[numThreads];
+    final ExecutorService threadPool =
+        HadoopExecutors.newFixedThreadPool(numThreads);
+    final Future<?>[] futures = new Future<?>[numThreads];
+
+    Configuration conf2 = new Configuration(conf);
+    // Disable FS cache so that different DFS clients are used
+    conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
+
+    for (int i = 0; i < numThreads; i++) {
+      clientStates[i] = new ClientState();
+      futures[i] = threadPool.submit(new MkDirRunner(conf2, clientStates[i]));
+    }
+
+    Thread.sleep(150); // wait until mkdir is logged
+    long activStateId =
+        dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId();
+    dfsCluster.rollEditLogAndTail(0);
+    boolean finished = true;
+    // wait for all dispatcher threads to finish
+    for (Future<?> future : futures) {
+      try {
+        future.get();
+      } catch (ExecutionException e) {
+        finished = false;
+        LOG.warn("MkDirRunner thread failed", e.getCause());
+      }
+    }
+    assertTrue("Not all threads finished", finished);
+    threadPool.shutdown();
+
+    assertEquals("Active and Observer stateIds don't match",
+        dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId(),
+        dfsCluster.getNameNode(2).getFSImage().getLastAppliedOrWrittenTxId());
+    for (int i = 0; i < numThreads; i++) {
+      assertTrue("Client #" + i
+          + " lastSeenStateId=" + clientStates[i].lastSeenStateId
+          + " activStateId=" + activStateId
+          + "\n" + clientStates[i].fnfe,
+          clientStates[i].lastSeenStateId >= activStateId &&
+          clientStates[i].fnfe == null);
+    }
+
+    // Restore edit log
+    Mockito.reset(spyEditLog);
+  }
+
+  static class ClientState {
+    private long lastSeenStateId = -7;
+    private FileNotFoundException fnfe;
+  }
+
+  static class MkDirRunner implements Runnable {
+    private static final Path DIR_PATH =
+        new Path("/TestObserverNode/testMkdirsRaceWithObserverRead");
+
+    private DistributedFileSystem fs;
+    private ClientState clientState;
+
+    MkDirRunner(Configuration conf, ClientState cs) throws IOException {
+      super();
+      fs = (DistributedFileSystem) FileSystem.get(conf);
+      clientState = cs;
+    }
+
+    @Override
+    public void run() {
+      try {
+        fs.mkdirs(DIR_PATH);
+        clientState.lastSeenStateId = HATestUtil.getLastSeenStateId(fs);
+        assertSentTo(fs, 0);
+
+        FileStatus stat = fs.getFileStatus(DIR_PATH);
+        assertSentTo(fs, 2);
+        assertTrue("Should be a directory", stat.isDirectory());
+      } catch (FileNotFoundException ioe) {
+        clientState.fnfe = ioe;
+      } catch (Exception e) {
+        fail("Unexpected exception: " + e);
+      }
+    }
+  }
+
+  private static void assertSentTo(DistributedFileSystem fs, int nnIdx)
+      throws IOException {
+    assertTrue("Request was not sent to the expected namenode " + nnIdx,
+        HATestUtil.isSentToAnyOfNameNodes(fs, dfsCluster, nnIdx));
+  }
 
   private void assertSentTo(int nnIdx) throws IOException {
     assertTrue("Request was not sent to the expected namenode " + nnIdx,

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to