HDFS-13961. [SBN read] TestObserverNode refactoring. Contributed by Konstantin 
Shvachko.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dc76e0f4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dc76e0f4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dc76e0f4

Branch: refs/heads/HDFS-12943
Commit: dc76e0f4d6dd9689135d69b5d892d3c895bf7815
Parents: eb75861
Author: Konstantin V Shvachko <s...@apache.org>
Authored: Fri Oct 5 15:03:38 2018 -0700
Committer: Konstantin V Shvachko <s...@apache.org>
Committed: Fri Oct 5 15:03:38 2018 -0700

----------------------------------------------------------------------
 .../hdfs/server/namenode/ha/EditLogTailer.java  |   2 +-
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java  |  10 +-
 .../hdfs/TestStateAlignmentContextWithHA.java   |   6 +-
 .../hadoop/hdfs/qjournal/MiniQJMHACluster.java  |   4 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |   6 +
 .../hdfs/server/namenode/ha/HATestUtil.java     |  84 +++-
 .../ha/TestConsistentReadsObserver.java         | 182 +++++++++
 .../namenode/ha/TestMultiObserverNode.java      | 155 ++++++++
 .../server/namenode/ha/TestObserverNode.java    | 387 +++++--------------
 9 files changed, 517 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc76e0f4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
index 780a0f6..f490393 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
@@ -281,7 +281,7 @@ public class EditLogTailer {
   }
   
   @VisibleForTesting
-  void doTailEdits() throws IOException, InterruptedException {
+  public void doTailEdits() throws IOException, InterruptedException {
     // Write lock needs to be interruptible here because the 
     // transitionToActive RPC takes the write lock before calling
     // tailer.stop() -- so if we're not interruptible, it will

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc76e0f4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index f60546c..8d88361 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -2599,8 +2599,14 @@ public class MiniDFSCluster implements AutoCloseable {
     getNameNode(nnIndex).getRpcServer().transitionToObserver(
         new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER_FORCED));
   }
-  
-  
+
+  public void rollEditLogAndTail(int nnIndex) throws Exception {
+    getNameNode(nnIndex).getRpcServer().rollEditLog();
+    for (int i = 2; i < getNumNameNodes(); i++) {
+      getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits();
+    }
+  }
+
   public void triggerBlockReports()
       throws IOException {
     for (DataNode dn : getDataNodes()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc76e0f4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
index a494252..a642872 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
@@ -100,10 +100,8 @@ public class TestStateAlignmentContextWithHA {
     cluster.transitionToActive(0);
     cluster.transitionToObserver(2);
 
-    String nameservice = HATestUtil.getLogicalHostname(cluster);
-    HATestUtil.setFailoverConfigurations(cluster, CONF, nameservice, 0);
-    CONF.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
-        "." + nameservice, ORPPwithAlignmentContexts.class.getName());
+    HATestUtil.setupHAConfiguration(
+        cluster, CONF, 0, ORPPwithAlignmentContexts.class);
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc76e0f4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
index 6a68bd4..3ece3d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 
 import java.io.IOException;
@@ -171,7 +172,8 @@ public class MiniQJMHACluster {
     }
 
     // use standard failover configurations
-    HATestUtil.setFailoverConfigurations(conf, NAMESERVICE, nns);
+    HATestUtil.setFailoverConfigurations(conf, NAMESERVICE, nns,
+        ConfiguredFailoverProxyProvider.class);
     return conf;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc76e0f4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 9a94554..ebd5faf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.mockito.Mockito.spy;
@@ -176,6 +177,11 @@ public class NameNodeAdapter {
     return l == null ? -1 : l.getLastUpdate();
   }
 
+
+  public static HAServiceState getServiceState(NameNode nn) {
+    return nn.getServiceState();
+  }
+
   /**
    * Return the datanode descriptor for the given datanode.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc76e0f4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index cc5b3d4..f4a766d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSUtil.createUri;
 
 import java.io.IOException;
+import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -32,6 +34,7 @@ import java.util.concurrent.TimeoutException;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,10 +45,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryInvocationHandler;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 
@@ -158,17 +163,66 @@ public abstract class HATestUtil {
     FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
     return (DistributedFileSystem)fs;
   }
-  
+
   public static DistributedFileSystem configureObserverReadFs(
       MiniDFSCluster cluster, Configuration conf,
-      int nsIndex) throws IOException, URISyntaxException {
+      boolean isObserverReadEnabled)
+          throws IOException, URISyntaxException {
     conf = new Configuration(conf);
-    String logicalName = getLogicalHostname(cluster);
-    setFailoverConfigurations(cluster, conf, logicalName, nsIndex);
-    conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." +
-        logicalName, ObserverReadProxyProvider.class.getName());
-    FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
-    return (DistributedFileSystem) fs;
+    setupHAConfiguration(cluster, conf, 0, ObserverReadProxyProvider.class);
+    DistributedFileSystem dfs = (DistributedFileSystem)
+        FileSystem.get(getLogicalUri(cluster), conf);
+    ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>)
+        ((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
+            dfs.getClient().getNamenode())).getProxyProvider();
+    provider.setObserverReadEnabled(isObserverReadEnabled);
+    return dfs;
+  }
+
+  public static boolean isSentToAnyOfNameNodes(
+      DistributedFileSystem dfs,
+      MiniDFSCluster cluster, int... nnIndices) throws IOException {
+    ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>)
+        ((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
+            dfs.getClient().getNamenode())).getProxyProvider();
+    FailoverProxyProvider.ProxyInfo<?> pi = provider.getLastProxy();
+    for (int nnIdx : nnIndices) {
+      if (pi.proxyInfo.equals(
+          cluster.getNameNode(nnIdx).getNameNodeAddress().toString())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public static MiniQJMHACluster setUpObserverCluster(
+      Configuration conf, int numObservers) throws IOException {
+    MiniQJMHACluster qjmhaCluster = new MiniQJMHACluster.Builder(conf)
+        .setNumNameNodes(2 + numObservers)
+        .build();
+    MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster();
+
+    dfsCluster.transitionToActive(0);
+    dfsCluster.waitActive(0);
+
+    for (int i = 0; i < numObservers; i++) {
+      dfsCluster.transitionToObserver(2 + i);
+    }
+    return qjmhaCluster;
+  }
+
+  public static <P extends FailoverProxyProvider<?>>
+  void setupHAConfiguration(MiniDFSCluster cluster,
+      Configuration conf, int nsIndex, Class<P> classFPP) {
+    MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex);
+    List<String> nnAddresses = new ArrayList<String>();
+    for (MiniDFSCluster.NameNodeInfo nn : nns) {
+      InetSocketAddress addr = nn.nameNode.getNameNodeAddress();
+      nnAddresses.add(
+          createUri(HdfsConstants.HDFS_URI_SCHEME, addr).toString());
+    }
+    setFailoverConfigurations(
+        conf, getLogicalHostname(cluster), nnAddresses, classFPP);
   }
 
   public static void setFailoverConfigurations(MiniDFSCluster cluster,
@@ -211,11 +265,13 @@ public abstract class HATestUtil {
           public String apply(InetSocketAddress addr) {
             return "hdfs://" + addr.getHostName() + ":" + addr.getPort();
           }
-        }));
+        }), ConfiguredFailoverProxyProvider.class);
   }
 
-  public static void setFailoverConfigurations(Configuration conf, String logicalName,
-      Iterable<String> nnAddresses) {
+  public static <P extends FailoverProxyProvider<?>>
+  void setFailoverConfigurations(
+      Configuration conf, String logicalName,
+      Iterable<String> nnAddresses, Class<P> classFPP) {
     List<String> nnids = new ArrayList<String>();
     int i = 0;
     for (String address : nnAddresses) {
@@ -227,8 +283,8 @@ public abstract class HATestUtil {
     conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName);
     conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, logicalName),
         Joiner.on(',').join(nnids));
-    conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
-        ConfiguredFailoverProxyProvider.class.getName());
+    conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
+        + "." + logicalName, classFPP.getName());
     conf.set("fs.defaultFS", "hdfs://" + logicalName);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc76e0f4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
new file mode 100644
index 0000000..26ad3a2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test consistency of reads while accessing an ObserverNode.
+ * The tests are based on traditional (non fast path) edits tailing.
+ */
+public class TestConsistentReadsObserver {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestConsistentReadsObserver.class.getName());
+
+  private static Configuration conf;
+  private static MiniQJMHACluster qjmhaCluster;
+  private static MiniDFSCluster dfsCluster;
+  private static DistributedFileSystem dfs;
+
+  private final Path testPath= new Path("/TestConsistentReadsObserver");
+
+  @BeforeClass
+  public static void startUpCluster() throws Exception {
+    conf = new Configuration();
+    // disable block scanner
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+    // disable fast tailing here because this test's assertions are based on the
+    // timing of explicitly called rollEditLogAndTail. Although this means this
+    // test takes some time to run
+    // TODO: revisit if there is a better way.
+    conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
+
+    // use long log roll and edit tailing periods so that coordination takes time.
+    conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS);
+    conf.setTimeDuration(DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS);
+
+    qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1);
+    dfsCluster = qjmhaCluster.getDfsCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    setObserverRead(true);
+  }
+
+  @After
+  public void cleanUp() throws IOException {
+    dfs.delete(testPath, true);
+  }
+
+  @AfterClass
+  public static void shutDownCluster() throws IOException {
+    if (qjmhaCluster != null) {
+      qjmhaCluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testMsyncSimple() throws Exception {
+    // 0 == not completed, 1 == succeeded, -1 == failed
+    AtomicInteger readStatus = new AtomicInteger(0);
+
+    dfs.mkdir(testPath, FsPermission.getDefault());
+    assertSentTo(0);
+
+    Thread reader = new Thread(() -> {
+      try {
+        // this read will block until roll and tail edits happen.
+        dfs.getFileStatus(testPath);
+        readStatus.set(1);
+      } catch (IOException e) {
+        e.printStackTrace();
+        readStatus.set(-1);
+      }
+    });
+
+    reader.start();
+    // the reader is still blocking, not succeeded yet.
+    assertEquals(0, readStatus.get());
+    dfsCluster.rollEditLogAndTail(0);
+    // wait a while for all the change to be done
+    GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
+    // the reader should have succeeded.
+    assertEquals(1, readStatus.get());
+  }
+
+  // @Ignore("Move to another test file")
+  @Test
+  public void testUncoordinatedCall() throws Exception {
+    // make a write call so that client will be ahead of
+    // observer for now.
+    dfs.mkdir(testPath, FsPermission.getDefault());
+
+    // a status flag, initialized to 0, after reader finished, this will be
+    // updated to 1, -1 on error
+    AtomicInteger readStatus = new AtomicInteger(0);
+
+    // create a separate thread to make a blocking read.
+    Thread reader = new Thread(() -> {
+      try {
+        // this read call will block until server state catches up. But due to
+        // configuration, this will take a very long time.
+        dfs.getClient().getFileInfo("/");
+        readStatus.set(1);
+        fail("Should have been interrupted before getting here.");
+      } catch (IOException e) {
+        e.printStackTrace();
+        readStatus.set(-1);
+      }
+    });
+    reader.start();
+
+    long before = Time.now();
+    dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL);
+    long after = Time.now();
+
+    // should succeed immediately, because datanodeReport is marked an
+    // uncoordinated call, and will not be waiting for server to catch up.
+    assertTrue(after - before < 200);
+    // by this time, reader thread should still be blocking, so the status not
+    // updated
+    assertEquals(0, readStatus.get());
+    Thread.sleep(5000);
+    // reader thread status should still be unchanged after 5 sec...
+    assertEquals(0, readStatus.get());
+    // and the reader thread is not dead, so it must be still waiting
+    assertEquals(Thread.State.WAITING, reader.getState());
+    reader.interrupt();
+  }
+
+  private void assertSentTo(int nnIdx) throws IOException {
+    assertTrue("Request was not sent to the expected namenode " + nnIdx,
+        HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
+  }
+
+  private static void setObserverRead(boolean flag) throws Exception {
+    dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, flag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc76e0f4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java
new file mode 100644
index 0000000..ab1251e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests multiple ObserverNodes.
+ */
+public class TestMultiObserverNode {
+  private static Configuration conf;
+  private static MiniQJMHACluster qjmhaCluster;
+  private static MiniDFSCluster dfsCluster;
+  private static DistributedFileSystem dfs;
+
+  private final Path testPath= new Path("/TestMultiObserverNode");
+
+  @BeforeClass
+  public static void startUpCluster() throws Exception {
+    conf = new Configuration();
+    // disable block scanner
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+    conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+    conf.setTimeDuration(
+        DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS);
+
+    qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 2);
+    dfsCluster = qjmhaCluster.getDfsCluster();
+    dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, true);
+  }
+
+  @After
+  public void cleanUp() throws IOException {
+    dfs.delete(testPath, true);
+  }
+
+  @AfterClass
+  public static void shutDownCluster() throws IOException {
+    if (qjmhaCluster != null) {
+      qjmhaCluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testObserverFailover() throws Exception {
+    dfs.mkdir(testPath, FsPermission.getDefault());
+    dfsCluster.rollEditLogAndTail(0);
+    dfs.getFileStatus(testPath);
+    assertSentTo(2, 3);
+
+    // Transition observer #2 to standby, request should go to the #3.
+    dfsCluster.transitionToStandby(2);
+    dfs.getFileStatus(testPath);
+    assertSentTo(3);
+
+    // Transition observer #3 to standby, request should go to active
+    dfsCluster.transitionToStandby(3);
+    dfs.getFileStatus(testPath);
+    assertSentTo(0);
+
+    // Transition #2 back to observer, request should go to #2
+    dfsCluster.transitionToObserver(2);
+    dfs.getFileStatus(testPath);
+    assertSentTo(2);
+
+    // Transition #3 back to observer, request should go to either #2 or #3
+    dfsCluster.transitionToObserver(3);
+    dfs.getFileStatus(testPath);
+    assertSentTo(2, 3);
+  }
+
+  @Test
+  public void testMultiObserver() throws Exception {
+    Path testPath2 = new Path(testPath, "test2");
+    Path testPath3 = new Path(testPath, "test3");
+    dfs.mkdir(testPath, FsPermission.getDefault());
+    assertSentTo(0);
+
+    dfsCluster.rollEditLogAndTail(0);
+    dfs.getFileStatus(testPath);
+    assertSentTo(2, 3);
+
+    dfs.mkdir(testPath2, FsPermission.getDefault());
+    dfsCluster.rollEditLogAndTail(0);
+
+    // Shutdown first observer, request should go to the second one
+    dfsCluster.shutdownNameNode(2);
+    dfs.listStatus(testPath2);
+    assertSentTo(3);
+
+    // Restart the first observer
+    dfsCluster.restartNameNode(2);
+    dfs.listStatus(testPath);
+    assertSentTo(3);
+
+    dfsCluster.transitionToObserver(2);
+    dfs.listStatus(testPath);
+    assertSentTo(2, 3);
+
+    dfs.mkdir(testPath3, FsPermission.getDefault());
+    dfsCluster.rollEditLogAndTail(0);
+
+    // Now shutdown the second observer, request should go to the first one
+    dfsCluster.shutdownNameNode(3);
+    dfs.listStatus(testPath3);
+    assertSentTo(2);
+
+    // Shutdown both, request should go to active
+    dfsCluster.shutdownNameNode(2);
+    dfs.listStatus(testPath3);
+    assertSentTo(0);
+
+    dfsCluster.restartNameNode(2);
+    dfsCluster.transitionToObserver(2);
+    dfsCluster.restartNameNode(3);
+    dfsCluster.transitionToObserver(3);
+  }
+
+  private void assertSentTo(int... nnIndices) throws IOException {
+    assertTrue("Request was not sent to any of the expected namenodes.",
+        HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIndices));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc76e0f4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
index c9e79fa..2c826e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
@@ -17,83 +17,94 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
+import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.io.retry.RetryInvocationHandler;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Proxy;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+/**
+ * Test main functionality of ObserverNode.
+ */
+public class TestObserverNode {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestObserverNode.class.getName());
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyShort;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
+  private static Configuration conf;
+  private static MiniQJMHACluster qjmhaCluster;
+  private static MiniDFSCluster dfsCluster;
+  private static DistributedFileSystem dfs;
 
-// Main unit tests for ObserverNode
-public class TestObserverNode {
-  private Configuration conf;
-  private MiniQJMHACluster qjmhaCluster;
-  private MiniDFSCluster dfsCluster;
-  private NameNode[] namenodes;
-  private Path testPath;
-  private Path testPath2;
-  private Path testPath3;
-
-  /** These are set in each individual test case */
-  private DistributedFileSystem dfs;
-  private ObserverReadProxyProvider<?> provider;
+  private final Path testPath= new Path("/TestObserverNode");
 
-  @Before
-  public void setUp() throws Exception {
+  @BeforeClass
+  public static void startUpCluster() throws Exception {
     conf = new Configuration();
+    // disable block scanner
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
     conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
     conf.setTimeDuration(
         DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS);
 
-    testPath = new Path("/test");
-    testPath2 = new Path("/test2");
-    testPath3 = new Path("/test3");
+    qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1);
+    dfsCluster = qjmhaCluster.getDfsCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    setObserverRead(true);
   }
 
   @After
   public void cleanUp() throws IOException {
+    dfs.delete(testPath, true);
+    assertEquals("NN[0] should be active", HAServiceState.ACTIVE,
+        getServiceState(dfsCluster.getNameNode(0)));
+    assertEquals("NN[1] should be standby", HAServiceState.STANDBY,
+        getServiceState(dfsCluster.getNameNode(1)));
+    assertEquals("NN[2] should be observer", HAServiceState.OBSERVER,
+        getServiceState(dfsCluster.getNameNode(2)));
+  }
+
+  @AfterClass
+  public static void shutDownCluster() throws IOException {
     if (qjmhaCluster != null) {
       qjmhaCluster.shutdown();
     }
@@ -101,13 +112,12 @@ public class TestObserverNode {
 
   @Test
   public void testSimpleRead() throws Exception {
-    setUpCluster(1);
-    setObserverRead(true);
+    Path testPath2 = new Path(testPath, "test2");
 
     dfs.mkdir(testPath, FsPermission.getDefault());
     assertSentTo(0);
 
-    rollEditLogAndTail(0);
+    dfsCluster.rollEditLogAndTail(0);
     dfs.getFileStatus(testPath);
     assertSentTo(2);
 
@@ -117,7 +127,7 @@ public class TestObserverNode {
 
   @Test
   public void testFailover() throws Exception {
-    setUpCluster(1);
+    Path testPath2 = new Path(testPath, "test2");
     setObserverRead(false);
 
     dfs.mkdir(testPath, FsPermission.getDefault());
@@ -127,23 +137,26 @@ public class TestObserverNode {
 
     dfsCluster.transitionToStandby(0);
     dfsCluster.transitionToActive(1);
-    dfsCluster.waitActive();
+    dfsCluster.waitActive(1);
 
     dfs.mkdir(testPath2, FsPermission.getDefault());
     assertSentTo(1);
     dfs.getFileStatus(testPath);
     assertSentTo(1);
+
+    dfsCluster.transitionToStandby(1);
+    dfsCluster.transitionToActive(0);
+    dfsCluster.waitActive(0);
   }
 
   @Test
   public void testDoubleFailover() throws Exception {
-    setUpCluster(1);
-    setObserverRead(true);
-
+    Path testPath2 = new Path(testPath, "test2");
+    Path testPath3 = new Path(testPath, "test3");
     dfs.mkdir(testPath, FsPermission.getDefault());
     assertSentTo(0);
 
-    rollEditLogAndTail(0);
+    dfsCluster.rollEditLogAndTail(0);
     dfs.getFileStatus(testPath);
     assertSentTo(2);
     dfs.mkdir(testPath2, FsPermission.getDefault());
@@ -153,7 +166,7 @@ public class TestObserverNode {
     dfsCluster.transitionToActive(1);
     dfsCluster.waitActive(1);
 
-    rollEditLogAndTail(1);
+    dfsCluster.rollEditLogAndTail(1);
     dfs.getFileStatus(testPath2);
     assertSentTo(2);
     dfs.mkdir(testPath3, FsPermission.getDefault());
@@ -163,7 +176,7 @@ public class TestObserverNode {
     dfsCluster.transitionToActive(0);
     dfsCluster.waitActive(0);
 
-    rollEditLogAndTail(0);
+    dfsCluster.rollEditLogAndTail(0);
     dfs.getFileStatus(testPath3);
     assertSentTo(2);
     dfs.delete(testPath3, false);
@@ -171,43 +184,9 @@ public class TestObserverNode {
   }
 
   @Test
-  public void testObserverFailover() throws Exception {
-    setUpCluster(2);
-    setObserverRead(true);
-
-    dfs.mkdir(testPath, FsPermission.getDefault());
-    rollEditLogAndTail(0);
-    dfs.getFileStatus(testPath);
-    assertSentToAny(2, 3);
-
-    // Transition observer #2 to standby, request should go to the #3.
-    dfsCluster.transitionToStandby(2);
-    dfs.getFileStatus(testPath);
-    assertSentTo(3);
-
-    // Transition observer #3 to standby, request should go to active
-    dfsCluster.transitionToStandby(3);
-    dfs.getFileStatus(testPath);
-    assertSentTo(0);
-
-    // Transition #2 back to observer, request should go to #2
-    dfsCluster.transitionToObserver(2);
-    dfs.getFileStatus(testPath);
-    assertSentTo(2);
-
-    // Transition #3 back to observer, request should go to either #2 or #3
-    dfsCluster.transitionToObserver(3);
-    dfs.getFileStatus(testPath);
-    assertSentToAny(2, 3);
-  }
-
-  @Test
   public void testObserverShutdown() throws Exception {
-    setUpCluster(1);
-    setObserverRead(true);
-
     dfs.mkdir(testPath, FsPermission.getDefault());
-    rollEditLogAndTail(0);
+    dfsCluster.rollEditLogAndTail(0);
     dfs.getFileStatus(testPath);
     assertSentTo(2);
 
@@ -228,18 +207,14 @@ public class TestObserverNode {
 
   @Test
   public void testObserverFailOverAndShutdown() throws Exception {
-    setUpCluster(1);
-    // Test the case when there is a failover before ONN shutdown
-    setObserverRead(true);
-
     dfs.mkdir(testPath, FsPermission.getDefault());
-    rollEditLogAndTail(0);
+    dfsCluster.rollEditLogAndTail(0);
     dfs.getFileStatus(testPath);
     assertSentTo(2);
 
     dfsCluster.transitionToStandby(0);
     dfsCluster.transitionToActive(1);
-    dfsCluster.waitActive();
+    dfsCluster.waitActive(1);
 
     // Shutdown the observer - requests should go to active
     dfsCluster.shutdownNameNode(2);
@@ -257,54 +232,14 @@ public class TestObserverNode {
     // the second will properly go to the observer
     dfs.getFileStatus(testPath);
     assertSentTo(2);
-  }
-
-  @Test
-  public void testMultiObserver() throws Exception {
-    setUpCluster(2);
-    setObserverRead(true);
-
-    dfs.mkdir(testPath, FsPermission.getDefault());
-    assertSentTo(0);
-
-    rollEditLogAndTail(0);
-    dfs.getFileStatus(testPath);
-    assertSentToAny(2, 3);
-
-    dfs.mkdir(testPath2, FsPermission.getDefault());
-    rollEditLogAndTail(0);
 
-    // Shutdown first observer, request should go to the second one
-    dfsCluster.shutdownNameNode(2);
-    dfs.listStatus(testPath2);
-    assertSentTo(3);
-
-    // Restart the first observer
-    dfsCluster.restartNameNode(2);
-    dfs.listStatus(testPath);
-    assertSentTo(3);
-
-    dfsCluster.transitionToObserver(2);
-    dfs.listStatus(testPath);
-    assertSentToAny(2, 3);
-
-    dfs.mkdir(testPath3, FsPermission.getDefault());
-    rollEditLogAndTail(0);
-
-    // Now shutdown the second observer, request should go to the first one
-    dfsCluster.shutdownNameNode(3);
-    dfs.listStatus(testPath3);
-    assertSentTo(2);
-
-    // Shutdown both, request should go to active
-    dfsCluster.shutdownNameNode(2);
-    dfs.listStatus(testPath3);
-    assertSentTo(0);
+    dfsCluster.transitionToStandby(1);
+    dfsCluster.transitionToActive(0);
+    dfsCluster.waitActive(0);
   }
 
   @Test
   public void testBootstrap() throws Exception {
-    setUpCluster(1);
     for (URI u : dfsCluster.getNameDirs(2)) {
       File dir = new File(u.getPath());
       assertTrue(FileUtil.fullyDelete(dir));
@@ -323,20 +258,12 @@ public class TestObserverNode {
    */
   @Test
   public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
-    setUpCluster(1);
-    setObserverRead(true);
-
-    // Avoid starting DNs for the mini cluster.
-    BlockManager bmSpy = NameNodeAdapter.spyOnBlockManager(namenodes[0]);
-    doNothing().when(bmSpy)
-        .verifyReplication(anyString(), anyShort(), anyString());
-
     // Create a new file - the request should go to active.
-    dfs.createNewFile(testPath);
+    dfs.create(testPath, (short)1).close();
     assertSentTo(0);
 
-    rollEditLogAndTail(0);
-    dfs.open(testPath);
+    dfsCluster.rollEditLogAndTail(0);
+    dfs.open(testPath).close();
     assertSentTo(2);
 
     // Set observer to safe mode.
@@ -345,7 +272,8 @@ public class TestObserverNode {
     // Mock block manager for observer to generate some fake blocks which
     // will trigger the (retriable) safe mode exception.
     final DatanodeInfo[] empty = {};
-    bmSpy = NameNodeAdapter.spyOnBlockManager(namenodes[2]);
+    BlockManager bmSpy =
+        NameNodeAdapter.spyOnBlockManager(dfsCluster.getNameNode(2));
     doAnswer((invocation) -> {
       ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
       LocatedBlock fakeBlock = new LocatedBlock(b, empty);
@@ -357,158 +285,23 @@ public class TestObserverNode {
 
     // Open the file again - it should throw retriable exception and then
     // failover to active.
-    dfs.open(testPath);
+    dfs.open(testPath).close();
     assertSentTo(0);
 
     // Remove safe mode on observer, request should still go to it.
     dfsCluster.getFileSystem(2).setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
-    dfs.open(testPath);
+    dfs.open(testPath).close();
     assertSentTo(2);
-  }
-
-  // TODO this does not currently work because fetching the service state from
-  // e.g. the StandbyNameNode also waits for the transaction ID to catch up.
-  // This is disabled pending HDFS-13872 and HDFS-13749.
-  @Ignore("Disabled until HDFS-13872 and HDFS-13749 are committed")
-  @Test
-  public void testMsyncSimple() throws Exception {
-    // disable fast path here because this test's assertions are based on the
-    // timing of explicitly called rollEditLogAndTail. Although this means this
-    // test takes some time to run
-    // TODO: revisit if there is a better way.
-    conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
-    conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 60, TimeUnit.SECONDS);
-    conf.setTimeDuration(
-        DFS_HA_TAILEDITS_PERIOD_KEY, 30, TimeUnit.SECONDS);
-    setUpCluster(1);
-    setObserverRead(true);
-
-    // 0 == not completed, 1 == succeeded, -1 == failed
-    AtomicInteger readStatus = new AtomicInteger(0);
-
-    dfs.mkdir(testPath, FsPermission.getDefault());
-    assertSentTo(0);
-
-    Thread reader = new Thread(() -> {
-      try {
-        // this read will block until roll and tail edits happen.
-        dfs.getFileStatus(testPath);
-        readStatus.set(1);
-      } catch (IOException e) {
-        e.printStackTrace();
-        readStatus.set(-1);
-      }
-    });
-
-    reader.start();
-    // the reader is still blocking, not succeeded yet.
-    assertEquals(0, readStatus.get());
-    rollEditLogAndTail(0);
-    // wait a while for all the change to be done
-    GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
-    // the reader should have succeed.
-    assertEquals(1, readStatus.get());
-  }
-
-  @Test
-  public void testUncoordinatedCall() throws Exception {
-    // disable fast tailing so that coordination takes time.
-    conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
-    conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS);
-    conf.setTimeDuration(
-        DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS);
-    setUpCluster(1);
-    setObserverRead(true);
-
-    // make a write call so that client will be ahead of
-    // observer for now.
-    dfs.mkdir(testPath, FsPermission.getDefault());
 
-    // a status flag, initialized to 0, after reader finished, this will be
-    // updated to 1, -1 on error
-    AtomicInteger readStatus = new AtomicInteger(0);
-
-    // create a separate thread to make a blocking read.
-    Thread reader = new Thread(() -> {
-      try {
-        // this read call will block until server state catches up. But due to
-        // configuration, this will take a very long time.
-        dfs.getClient().getFileInfo("/");
-        readStatus.set(1);
-        fail("Should have been interrupted before getting here.");
-      } catch (IOException e) {
-        e.printStackTrace();
-        readStatus.set(-1);
-      }
-    });
-    reader.start();
-
-    long before = System.currentTimeMillis();
-    dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL);
-    long after = System.currentTimeMillis();
-
-    // should succeed immediately, because datanodeReport is marked an
-    // uncoordinated call, and will not be waiting for server to catch up.
-    assertTrue(after - before < 200);
-    // by this time, reader thread should still be blocking, so the status not
-    // updated
-    assertEquals(0, readStatus.get());
-    Thread.sleep(5000);
-    // reader thread status should still be unchanged after 5 sec...
-    assertEquals(0, readStatus.get());
-    // and the reader thread is not dead, so it must be still waiting
-    assertEquals(Thread.State.WAITING, reader.getState());
-    reader.interrupt();
+    Mockito.reset(bmSpy);
   }
 
-  private void setUpCluster(int numObservers) throws Exception {
-    qjmhaCluster = new MiniQJMHACluster.Builder(conf)
-        .setNumNameNodes(2 + numObservers)
-        .build();
-    dfsCluster = qjmhaCluster.getDfsCluster();
-
-    namenodes = new NameNode[2 + numObservers];
-    for (int i = 0; i < namenodes.length; i++) {
-      namenodes[i] = dfsCluster.getNameNode(i);
-    }
-
-    dfsCluster.transitionToActive(0);
-    dfsCluster.waitActive(0);
-
-    for (int i = 0; i < numObservers; i++) {
-      dfsCluster.transitionToObserver(2 + i);
-    }
+  private void assertSentTo(int nnIdx) throws IOException {
+    assertTrue("Request was not sent to the expected namenode " + nnIdx,
+        HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
   }
 
-  private void assertSentTo(int nnIdx) {
-    assertSentToAny(nnIdx);
-  }
-
-  private void assertSentToAny(int... nnIndices) {
-    FailoverProxyProvider.ProxyInfo<?> pi = provider.getLastProxy();
-    for (int nnIdx : nnIndices) {
-      if (pi.proxyInfo.equals(
-          dfsCluster.getNameNode(nnIdx).getNameNodeAddress().toString())) {
-        return;
-      }
-    }
-    fail("Request was not sent to any of the expected namenodes");
-  }
-
-  private void setObserverRead(boolean flag) throws Exception {
-    dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, 0);
-    RetryInvocationHandler<?> handler =
-        (RetryInvocationHandler<?>) Proxy.getInvocationHandler(
-            dfs.getClient().getNamenode());
-    provider = (ObserverReadProxyProvider<?>) handler.getProxyProvider();
-    provider.setObserverReadEnabled(flag);
-  }
-
-  private void rollEditLogAndTail(int indexForActiveNN) throws Exception {
-    dfsCluster.getNameNode(indexForActiveNN).getRpcServer().rollEditLog();
-    for (int i = 2; i < namenodes.length; i++) {
-      dfsCluster.getNameNode(i).getNamesystem().getEditLogTailer()
-          .doTailEdits();
-    }
+  private static void setObserverRead(boolean flag) throws Exception {
+    dfs = HATestUtil.configureObserverReadFs(dfsCluster, conf, flag);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to