http://git-wip-us.apache.org/repos/asf/hadoop/blob/441dfa48/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
index d07bb45..a8f7990 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
@@ -30,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -39,6 +42,10 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
@@ -58,6 +65,7 @@ public class TestDataNodeVolumeFailureReporting {
   private MiniDFSCluster cluster;
   private Configuration conf;
   private String dataDir;
+  private long volumeCapacity;
 
   // Sleep at least 3 seconds (a 1s heartbeat plus padding) to allow
   // for heartbeats to propagate from the datanodes to the namenode.
@@ -69,29 +77,29 @@ public class TestDataNodeVolumeFailureReporting {
 
   @Before
   public void setUp() throws Exception {
-    conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L);
-    /*
-     * Lower the DN heartbeat, DF rate, and recheck interval to one second
-     * so state about failures and datanode death propagates faster.
-     */
-    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
-    conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
+    // These tests simulate volume failures by denying execute permission on the
+    // volume's path.  On Windows, the owner of an object is always allowed
+    // access, so we can't run these tests on Windows.
+    assumeTrue(!Path.WINDOWS);
     // Allow a single volume failure (there are two volumes)
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
-    fs = cluster.getFileSystem();
-    dataDir = cluster.getDataDirectory();
+    initCluster(1, 2, 1);
   }
 
   @After
   public void tearDown() throws Exception {
-    for (int i = 0; i < 3; i++) {
-      FileUtil.setExecutable(new File(dataDir, "data"+(2*i+1)), true);
-      FileUtil.setExecutable(new File(dataDir, "data"+(2*i+2)), true);
+    // Restore executable permission on all directories where a failure may have
+    // been simulated by denying execute access.  This is based on the maximum
+    // number of datanodes and the maximum number of storages per data node used
+    // throughout the tests in this suite.
+    int maxDataNodes = 3;
+    int maxStoragesPerDataNode = 4;
+    for (int i = 0; i < maxDataNodes; i++) {
+      for (int j = 1; j <= maxStoragesPerDataNode; j++) {
+        String subDir = "data" + ((i * maxStoragesPerDataNode) + j);
+        FileUtil.setExecutable(new File(dataDir, subDir), true);
+      }
     }
+    IOUtils.cleanup(LOG, fs);
     cluster.shutdown();
   }
 
@@ -102,8 +110,6 @@ public class TestDataNodeVolumeFailureReporting {
    */
   @Test
   public void testSuccessiveVolumeFailures() throws Exception {
-    assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
-
     // Bring up two more datanodes
     cluster.startDataNodes(conf, 2, true, null, null);
     cluster.waitActive();
@@ -151,12 +157,9 @@ public class TestDataNodeVolumeFailureReporting {
     /*
      * The metrics should confirm the volume failures.
      */
-    assertCounter("VolumeFailures", 1L, 
-        getMetrics(dns.get(0).getMetrics().name()));
-    assertCounter("VolumeFailures", 1L, 
-        getMetrics(dns.get(1).getMetrics().name()));
-    assertCounter("VolumeFailures", 0L, 
-        getMetrics(dns.get(2).getMetrics().name()));
+    checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(2), 0, true);
 
     // Ensure we wait a sufficient amount of time
     assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
@@ -164,6 +167,10 @@ public class TestDataNodeVolumeFailureReporting {
     // Eventually the NN should report two volume failures
     DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2, 
         origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
+    checkAggregateFailuresAtNameNode(true, 2);
+    checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(2), true);
 
     /*
      * Now fail a volume on the third datanode. We should be able to get
@@ -174,17 +181,10 @@ public class TestDataNodeVolumeFailureReporting {
     DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
     DFSTestUtil.waitReplication(fs, file2, (short)3);
     assertTrue("DN3 should still be up", dns.get(2).isDatanodeUp());
-    assertCounter("VolumeFailures", 1L, 
-        getMetrics(dns.get(2).getMetrics().name()));
-
-    ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-    dm.fetchDatanodes(live, dead, false);
-    live.clear();
-    dead.clear();
-    dm.fetchDatanodes(live, dead, false);
-    assertEquals("DN3 should have 1 failed volume",
-        1, live.get(2).getVolumeFailures());
+    checkFailuresAtDataNode(dns.get(2), 1, true, dn3Vol1.getAbsolutePath());
+
+    DataNodeTestUtils.triggerHeartbeat(dns.get(2));
+    checkFailuresAtNameNode(dm, dns.get(2), true, dn3Vol1.getAbsolutePath());
 
     /*
      * Once the datanodes have a chance to heartbeat their new capacity the
@@ -194,6 +194,10 @@ public class TestDataNodeVolumeFailureReporting {
     dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
     DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 3, 
         origCapacity - (3*dnCapacity), WAIT_FOR_HEARTBEATS);
+    checkAggregateFailuresAtNameNode(true, 3);
+    checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(2), true, dn3Vol1.getAbsolutePath());
 
     /*
      * Now fail the 2nd volume on the 3rd datanode. All its volumes
@@ -210,12 +214,15 @@ public class TestDataNodeVolumeFailureReporting {
     DFSTestUtil.waitForDatanodeDeath(dns.get(2));
 
     // And report two failed volumes
-    assertCounter("VolumeFailures", 2L, 
-        getMetrics(dns.get(2).getMetrics().name()));
+    checkFailuresAtDataNode(dns.get(2), 2, true, dn3Vol1.getAbsolutePath(),
+        dn3Vol2.getAbsolutePath());
 
     // The NN considers the DN dead
     DFSTestUtil.waitForDatanodeStatus(dm, 2, 1, 2, 
         origCapacity - (4*dnCapacity), WAIT_FOR_HEARTBEATS);
+    checkAggregateFailuresAtNameNode(true, 2);
+    checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
 
     /*
      * The datanode never tries to restore the failed volume, even if
@@ -240,6 +247,11 @@ public class TestDataNodeVolumeFailureReporting {
      */
     DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0, origCapacity, 
         WAIT_FOR_HEARTBEATS);
+    checkAggregateFailuresAtNameNode(true, 0);
+    dns = cluster.getDataNodes();
+    checkFailuresAtNameNode(dm, dns.get(0), true);
+    checkFailuresAtNameNode(dm, dns.get(1), true);
+    checkFailuresAtNameNode(dm, dns.get(2), true);
   }
 
   /**
@@ -247,8 +259,6 @@ public class TestDataNodeVolumeFailureReporting {
    */
   @Test
   public void testVolFailureStatsPreservedOnNNRestart() throws Exception {
-    assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
-
     // Bring up two more datanodes that can tolerate 1 failure
     cluster.startDataNodes(conf, 2, true, null, null);
     cluster.waitActive();
@@ -268,15 +278,346 @@ public class TestDataNodeVolumeFailureReporting {
     Path file1 = new Path("/test1");
     DFSTestUtil.createFile(fs, file1, 1024, (short)2, 1L);
     DFSTestUtil.waitReplication(fs, file1, (short)2);
+    ArrayList<DataNode> dns = cluster.getDataNodes();
 
     // The NN reports two volumes failures
     DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2, 
         origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
+    checkAggregateFailuresAtNameNode(true, 2);
+    checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
 
     // After restarting the NN it still see the two failures
     cluster.restartNameNode(0);
     cluster.waitActive();
     DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
         origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
+    checkAggregateFailuresAtNameNode(true, 2);
+    checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
+  }
+
+  @Test
+  public void testMultipleVolFailuresOnNode() throws Exception {
+    // Reinitialize the cluster, configured with 4 storage locations per DataNode
+    // and tolerating up to 2 failures.
+    tearDown();
+    initCluster(3, 4, 2);
+
+    // Calculate the total capacity of all the datanodes. Sleep for three seconds
+    // to be sure the datanodes have had a chance to heartbeat their capacities.
+    Thread.sleep(WAIT_FOR_HEARTBEATS);
+    DatanodeManager dm = cluster.getNamesystem().getBlockManager()
+        .getDatanodeManager();
+
+    long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(dm);
+    long dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
+
+    File dn1Vol1 = new File(dataDir, "data"+(4*0+1));
+    File dn1Vol2 = new File(dataDir, "data"+(4*0+2));
+    File dn2Vol1 = new File(dataDir, "data"+(4*1+1));
+    File dn2Vol2 = new File(dataDir, "data"+(4*1+2));
+
+    // Make the first two volume directories on the first two datanodes
+    // non-accessible.
+    assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn1Vol1,
+        false));
+    assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn1Vol2,
+        false));
+    assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn2Vol1,
+        false));
+    assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn2Vol2,
+        false));
+
+    // Create file1 and wait for 3 replicas (ie all DNs can still store a block).
+    // Then assert that all DNs are up, despite the volume failures.
+    Path file1 = new Path("/test1");
+    DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
+    DFSTestUtil.waitReplication(fs, file1, (short)3);
+
+    ArrayList<DataNode> dns = cluster.getDataNodes();
+    assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
+    assertTrue("DN2 should be up", dns.get(1).isDatanodeUp());
+    assertTrue("DN3 should be up", dns.get(2).isDatanodeUp());
+
+    checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath(),
+        dn1Vol2.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath(),
+        dn2Vol2.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(2), 0, true);
+
+    // Ensure we wait a sufficient amount of time
+    assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
+
+    // Eventually the NN should report four volume failures
+    DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 4,
+        origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
+    checkAggregateFailuresAtNameNode(true, 4);
+    checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath(),
+        dn1Vol2.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath(),
+        dn2Vol2.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(2), true);
+  }
+
+  @Test
+  public void testDataNodeReconfigureWithVolumeFailures() throws Exception {
+    // Bring up two more datanodes
+    cluster.startDataNodes(conf, 2, true, null, null);
+    cluster.waitActive();
+
+    final DatanodeManager dm = cluster.getNamesystem().getBlockManager(
+        ).getDatanodeManager();
+    long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(dm);
+    long dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
+
+    // Fail the first volume on both datanodes (we have to keep the
+    // third healthy so one node in the pipeline will not fail).
+    File dn1Vol1 = new File(dataDir, "data"+(2*0+1));
+    File dn1Vol2 = new File(dataDir, "data"+(2*0+2));
+    File dn2Vol1 = new File(dataDir, "data"+(2*1+1));
+    File dn2Vol2 = new File(dataDir, "data"+(2*1+2));
+    assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn1Vol1, 
false));
+    assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn2Vol1, 
false));
+
+    Path file1 = new Path("/test1");
+    DFSTestUtil.createFile(fs, file1, 1024, (short)2, 1L);
+    DFSTestUtil.waitReplication(fs, file1, (short)2);
+
+    ArrayList<DataNode> dns = cluster.getDataNodes();
+    assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
+    assertTrue("DN2 should be up", dns.get(1).isDatanodeUp());
+    assertTrue("DN3 should be up", dns.get(2).isDatanodeUp());
+
+    checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(2), 0, true);
+
+    // Ensure we wait a sufficient amount of time
+    assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
+
+    // The NN reports two volume failures
+    DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
+        origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
+    checkAggregateFailuresAtNameNode(true, 2);
+    checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
+
+    // Reconfigure each DataNode to remove its failed volumes.
+    reconfigureDataNode(dns.get(0), dn1Vol2);
+    reconfigureDataNode(dns.get(1), dn2Vol2);
+
+    DataNodeTestUtils.triggerHeartbeat(dns.get(0));
+    DataNodeTestUtils.triggerHeartbeat(dns.get(1));
+
+    checkFailuresAtDataNode(dns.get(0), 1, true);
+    checkFailuresAtDataNode(dns.get(1), 1, true);
+
+    // NN sees reduced capacity, but no volume failures.
+    DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0,
+        origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
+    checkAggregateFailuresAtNameNode(true, 0);
+    checkFailuresAtNameNode(dm, dns.get(0), true);
+    checkFailuresAtNameNode(dm, dns.get(1), true);
+
+    // Reconfigure again to try to add back the failed volumes.
+    reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
+    reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
+
+    DataNodeTestUtils.triggerHeartbeat(dns.get(0));
+    DataNodeTestUtils.triggerHeartbeat(dns.get(1));
+
+    checkFailuresAtDataNode(dns.get(0), 1, false, dn1Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(1), 1, false, dn2Vol1.getAbsolutePath());
+
+    // Ensure we wait a sufficient amount of time.
+    assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
+
+    // The NN reports two volume failures again.
+    DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
+        origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
+    checkAggregateFailuresAtNameNode(false, 2);
+    checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
+
+    // Reconfigure a third time with the failed volumes.  Afterwards, we expect
+    // the same volume failures to be reported.  (No double-counting.)
+    reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
+    reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
+
+    DataNodeTestUtils.triggerHeartbeat(dns.get(0));
+    DataNodeTestUtils.triggerHeartbeat(dns.get(1));
+
+    checkFailuresAtDataNode(dns.get(0), 1, false, dn1Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(1), 1, false, dn2Vol1.getAbsolutePath());
+
+    // Ensure we wait a sufficient amount of time.
+    assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
+
+    // The NN reports two volume failures again.
+    DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
+        origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
+    checkAggregateFailuresAtNameNode(false, 2);
+    checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
+  }
+
+  /**
+   * Checks the NameNode for correct values of aggregate counters tracking failed
+   * volumes across all DataNodes.
+   *
+   * @param expectCapacityKnown if true, then expect that the capacities of the
+   *     volumes were known before the failures, and therefore the lost capacity
+   *     can be reported
+   * @param expectedVolumeFailuresTotal expected number of failed volumes
+   */
+  private void checkAggregateFailuresAtNameNode(boolean expectCapacityKnown,
+      int expectedVolumeFailuresTotal) {
+    FSNamesystem ns = cluster.getNamesystem();
+    assertEquals(expectedVolumeFailuresTotal, ns.getVolumeFailuresTotal());
+    long expectedCapacityLost = getExpectedCapacityLost(expectCapacityKnown,
+        expectedVolumeFailuresTotal);
+    assertEquals(expectedCapacityLost, ns.getEstimatedCapacityLostTotal());
+  }
+
+  /**
+   * Checks a DataNode for correct reporting of failed volumes.
+   *
+   * @param dn DataNode to check
+   * @param expectedVolumeFailuresCounter metric counter value for
+   *     VolumeFailures.  The current implementation actually counts the number
+   *     of failed disk checker cycles, which may be different from the length of
+   *     expectedFailedVolumes if multiple disks fail in the same disk checker
+   *     cycle
+   * @param expectCapacityKnown if true, then expect that the capacities of the
+   *     volumes were known before the failures, and therefore the lost capacity
+   *     can be reported
+   * @param expectedFailedVolumes expected locations of failed volumes
+   * @throws Exception if there is any failure
+   */
+  private void checkFailuresAtDataNode(DataNode dn,
+      long expectedVolumeFailuresCounter, boolean expectCapacityKnown,
+      String... expectedFailedVolumes) throws Exception {
+    assertCounter("VolumeFailures", expectedVolumeFailuresCounter,
+        getMetrics(dn.getMetrics().name()));
+    FsDatasetSpi<?> fsd = dn.getFSDataset();
+    assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes());
+    assertArrayEquals(expectedFailedVolumes, fsd.getFailedStorageLocations());
+    if (expectedFailedVolumes.length > 0) {
+      assertTrue(fsd.getLastVolumeFailureDate() > 0);
+      long expectedCapacityLost = getExpectedCapacityLost(expectCapacityKnown,
+          expectedFailedVolumes.length);
+      assertEquals(expectedCapacityLost, fsd.getEstimatedCapacityLostTotal());
+    } else {
+      assertEquals(0, fsd.getLastVolumeFailureDate());
+      assertEquals(0, fsd.getEstimatedCapacityLostTotal());
+    }
+  }
+
+  /**
+   * Checks NameNode tracking of a particular DataNode for correct reporting of
+   * failed volumes.
+   *
+   * @param dm DatanodeManager to check
+   * @param dn DataNode to check
+   * @param expectCapacityKnown if true, then expect that the capacities of the
+   *     volumes were known before the failures, and therefore the lost capacity
+   *     can be reported
+   * @param expectedFailedVolumes expected locations of failed volumes
+   * @throws Exception if there is any failure
+   */
+  private void checkFailuresAtNameNode(DatanodeManager dm, DataNode dn,
+      boolean expectCapacityKnown, String... expectedFailedVolumes)
+      throws Exception {
+    DatanodeDescriptor dd = cluster.getNamesystem().getBlockManager()
+        .getDatanodeManager().getDatanode(dn.getDatanodeId());
+    assertEquals(expectedFailedVolumes.length, dd.getVolumeFailures());
+    VolumeFailureSummary volumeFailureSummary = dd.getVolumeFailureSummary();
+    if (expectedFailedVolumes.length > 0) {
+      assertArrayEquals(expectedFailedVolumes, volumeFailureSummary
+          .getFailedStorageLocations());
+      assertTrue(volumeFailureSummary.getLastVolumeFailureDate() > 0);
+      long expectedCapacityLost = getExpectedCapacityLost(expectCapacityKnown,
+          expectedFailedVolumes.length);
+      assertEquals(expectedCapacityLost,
+          volumeFailureSummary.getEstimatedCapacityLostTotal());
+    } else {
+      assertNull(volumeFailureSummary);
+    }
+  }
+
+  /**
+   * Returns expected capacity lost for use in assertions.  The return value is
+   * dependent on whether or not it is expected that the volume capacities were
+   * known prior to the failures.
+   *
+   * @param expectCapacityKnown if true, then expect that the capacities of the
+   *     volumes were known before the failures, and therefore the lost capacity
+   *     can be reported
+   * @param expectedVolumeFailuresTotal expected number of failed volumes
+   * @return estimated capacity lost in bytes
+   */
+  private long getExpectedCapacityLost(boolean expectCapacityKnown,
+      int expectedVolumeFailuresTotal) {
+    return expectCapacityKnown ? expectedVolumeFailuresTotal * volumeCapacity :
+        0;
+  }
+
+  /**
+   * Initializes the cluster.
+   *
+   * @param numDataNodes number of datanodes
+   * @param storagesPerDatanode number of storage locations on each datanode
+   * @param failedVolumesTolerated number of acceptable volume failures
+   * @throws Exception if there is any failure
+   */
+  private void initCluster(int numDataNodes, int storagesPerDatanode,
+      int failedVolumesTolerated) throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L);
+    /*
+     * Lower the DN heartbeat, DF rate, and recheck interval to one second
+     * so state about failures and datanode death propagates faster.
+     */
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
+        failedVolumesTolerated);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes)
+        .storagesPerDatanode(storagesPerDatanode).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    dataDir = cluster.getDataDirectory();
+    long dnCapacity = DFSTestUtil.getDatanodeCapacity(
+        cluster.getNamesystem().getBlockManager().getDatanodeManager(), 0);
+    volumeCapacity = dnCapacity / cluster.getStoragesPerDatanode();
+  }
+
+  /**
+   * Reconfigure a DataNode by setting a new list of volumes.
+   *
+   * @param dn DataNode to reconfigure
+   * @param newVols new volumes to configure
+   * @throws Exception if there is any failure
+   */
+  private static void reconfigureDataNode(DataNode dn, File... newVols)
+      throws Exception {
+    StringBuilder dnNewDataDirs = new StringBuilder();
+    for (File newVol: newVols) {
+      if (dnNewDataDirs.length() > 0) {
+        dnNewDataDirs.append(',');
+      }
+      dnNewDataDirs.append(newVol.getAbsolutePath());
+    }
+    try {
+      dn.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+          dnNewDataDirs.toString());
+    } catch (ReconfigurationException e) {
+      // This can be thrown if reconfiguration tries to use a failed volume.
+      // We need to swallow the exception, because some of our tests want to
+      // cover this case.
+      LOG.warn("Could not reconfigure DataNode.", e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/441dfa48/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index c049d81..7a09630 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
@@ -162,7 +163,7 @@ public class TestFsDatasetCache {
     doReturn(response).when(spyNN).sendHeartbeat(
         (DatanodeRegistration) any(),
         (StorageReport[]) any(), anyLong(), anyLong(),
-        anyInt(), anyInt(), anyInt());
+        anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any());
   }
 
   private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/441dfa48/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index b6b3fe6..ec39892 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -101,7 +102,8 @@ public class TestStorageReport {
     Mockito.verify(nnSpy).sendHeartbeat(
         any(DatanodeRegistration.class),
         captor.capture(),
-        anyLong(), anyLong(), anyInt(), anyInt(), anyInt());
+        anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
+        Mockito.any(VolumeFailureSummary.class));
 
     StorageReport[] reports = captor.getValue();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/441dfa48/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index cff8ca8..69285ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
@@ -376,6 +377,26 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
+  public String[] getFailedStorageLocations() {
+    return null;
+  }
+
+  @Override
+  public long getLastVolumeFailureDate() {
+    return 0;
+  }
+
+  @Override
+  public long getEstimatedCapacityLostTotal() {
+    return 0;
+  }
+
+  @Override
+  public VolumeFailureSummary getVolumeFailureSummary() {
+    return null;
+  }
+
+  @Override
   public long getCacheUsed() {
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/441dfa48/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 8f87f57..c3b871c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -47,6 +47,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -102,6 +103,7 @@ public class TestFsDatasetImpl {
 
     String dataDir = StringUtils.join(",", dirStrings);
     conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir);
+    when(storage.dirIterator()).thenReturn(dirs.iterator());
     when(storage.getNumStorageDirs()).thenReturn(numDirs);
   }
 
@@ -240,8 +242,8 @@ public class TestFsDatasetImpl {
     RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
         new RoundRobinVolumeChoosingPolicy<>();
     final BlockScanner blockScanner = new BlockScanner(datanode, conf);
-    final FsVolumeList volumeList =
-        new FsVolumeList(0, blockScanner, blockChooser);
+    final FsVolumeList volumeList = new FsVolumeList(
+        Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
     final List<FsVolumeImpl> oldVolumes = new ArrayList<>();
 
     // Initialize FsVolumeList with 5 mock volumes.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/441dfa48/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index d477e5b..f87c404 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -31,6 +31,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertNotEquals;
@@ -57,7 +58,8 @@ public class TestFsVolumeList {
 
   @Test
   public void testGetNextVolumeWithClosedVolume() throws IOException {
-    FsVolumeList volumeList = new FsVolumeList(0, blockScanner, blockChooser);
+    FsVolumeList volumeList = new FsVolumeList(
+        Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
     List<FsVolumeImpl> volumes = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       File curDir = new File(baseDir, "nextvolume-" + i);
@@ -82,7 +84,8 @@ public class TestFsVolumeList {
 
   @Test
   public void testCheckDirsWithClosedVolume() throws IOException {
-    FsVolumeList volumeList = new FsVolumeList(0, blockScanner, blockChooser);
+    FsVolumeList volumeList = new FsVolumeList(
+        Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
     List<FsVolumeImpl> volumes = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       File curDir = new File(baseDir, "volume-" + i);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/441dfa48/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index c8def37..c11abfc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -951,7 +951,7 @@ public class NNThroughputBenchmark implements Tool {
       StorageReport[] rep = { new StorageReport(storage, false,
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
       DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep,
-          0L, 0L, 0, 0, 0).getCommands();
+          0L, 0L, 0, 0, 0, null).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -998,7 +998,7 @@ public class NNThroughputBenchmark implements Tool {
       StorageReport[] rep = { new StorageReport(storage,
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
       DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
-          rep, 0L, 0L, 0, 0, 0).getCommands();
+          rep, 0L, 0L, 0, 0, 0, null).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/441dfa48/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 7aad378..fa23fbf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -117,7 +117,7 @@ public class NameNodeAdapter {
       DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
     return namesystem.handleHeartbeat(nodeReg,
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
-        dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0);
+        dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null);
   }
 
   public static boolean setReplication(final FSNamesystem ns,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/441dfa48/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 4ba3d59..fb1418a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -117,8 +117,8 @@ public class TestDeadDatanode {
     StorageReport[] rep = { new StorageReport(
         new DatanodeStorage(reg.getDatanodeUuid()),
         false, 0, 0, 0, 0) };
-    DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0)
-      .getCommands();
+    DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null)
+        .getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());
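
For context on the heartbeat call sites updated above (NNThroughputBenchmark, NameNodeAdapter, TestDeadDatanode): sendHeartbeat gains one trailing argument, the DataNode's VolumeFailureSummary, and a caller with no failed volumes simply passes null. The sketch below is not part of the patch; it reuses the nameNodeProto, dnRegistration, rep, and DF_CAPACITY names from the NNThroughputBenchmark hunk and assumes a three-argument VolumeFailureSummary constructor inferred from the getters exercised in TestDataNodeVolumeFailureReporting.

    // Hedged sketch, not in the patch: reporting one failed volume on heartbeat.
    // Assumed constructor: VolumeFailureSummary(String[] failedStorageLocations,
    //     long lastVolumeFailureDate, long estimatedCapacityLostTotal).
    VolumeFailureSummary summary = new VolumeFailureSummary(
        new String[] { "/data/dn1/current" },  // failed storage location (illustrative path)
        System.currentTimeMillis(),            // last volume failure date, ms since epoch
        DF_CAPACITY);                          // estimated capacity lost, in bytes
    DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep,
        0L, 0L, 0, 0, 0, summary).getCommands();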
