Repository: hadoop
Updated Branches:
  refs/heads/branch-2 f448ce2a8 -> bb6d86620


HDFS-9781. FsDatasetImpl#getBlockReports can occasionally throw NullPointerException. Contributed by Manoj Govindassamy.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bb6d8662
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bb6d8662
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bb6d8662

Branch: refs/heads/branch-2
Commit: bb6d8662070984c7a3be6d1b8f3e1cfdc787b60f
Parents: f448ce2
Author: Xiao Chen <x...@apache.org>
Authored: Fri Sep 9 18:25:35 2016 -0700
Committer: Xiao Chen <x...@apache.org>
Committed: Fri Sep 9 18:25:35 2016 -0700

----------------------------------------------------------------------
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 21 +++--
 .../fsdataset/impl/TestFsDatasetImpl.java       | 89 +++++++++++++++-----
 2 files changed, 86 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
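
For readers skimming the patch, here is a minimal, self-contained sketch of
the pattern the FsDatasetImpl hunk below adopts (hypothetical Volume/Replica
stand-ins, not the real Hadoop classes): build the per-volume builder map and
scan the replicas under the same lock, and skip any replica whose volume has
vanished instead of dereferencing a null builder. Before the fix, the volume
list was snapshotted outside the lock, so a volume removed before the replica
scan left no builder entry behind, and the later lookup produced the
NullPointerException reported in HDFS-9781.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for FsVolumeImpl/ReplicaInfo; not Hadoop code.
class BlockReportRaceSketch {
  interface Volume { String getStorageID(); }
  interface Replica { Volume getVolume(); }

  private final List<Volume> volumes = new ArrayList<>();    // mutated by volume removal
  private final List<Replica> replicas = new ArrayList<>();  // the replica map

  Map<String, StringBuilder> getBlockReports() {
    Map<String, StringBuilder> builders = new HashMap<>();
    synchronized (this) {              // snapshot AND scan under one lock
      for (Volume v : volumes) {
        builders.put(v.getStorageID(), new StringBuilder());
      }
      Set<String> missingReported = new HashSet<>();
      for (Replica r : replicas) {
        String id = r.getVolume().getStorageID();
        StringBuilder b = builders.get(id);
        if (b == null) {               // volume removed concurrently: skip it,
          if (missingReported.add(id)) {   // warning once per volume
            System.err.println("Volume " + id + " missing; probably removed");
          }
          continue;
        }
        b.append(r).append('\n');      // stand-in for the per-state handling
      }
    }
    return builders;
  }
}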


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6d8662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 388256f..04f887f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1738,13 +1738,24 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     Map<String, BlockListAsLongs.Builder> builders =
         new HashMap<String, BlockListAsLongs.Builder>();
 
-    List<FsVolumeImpl> curVolumes = volumes.getVolumes();
-    for (FsVolumeSpi v : curVolumes) {
-      builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
-    }
-
+    List<FsVolumeImpl> curVolumes = null;
     synchronized(this) {
+      curVolumes = volumes.getVolumes();
+      for (FsVolumeSpi v : curVolumes) {
+        builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
+      }
+
+      Set<String> missingVolumesReported = new HashSet<>();
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+        String volStorageID = b.getVolume().getStorageID();
+        if (!builders.containsKey(volStorageID)) {
+          if (!missingVolumesReported.contains(volStorageID)) {
+            LOG.warn("Storage volume: " + volStorageID + " missing for the"
+                + " replica block: " + b + ". Probably being removed!");
+            missingVolumesReported.add(volStorageID);
+          }
+          continue;
+        }
         switch(b.getState()) {
           case FINALIZED:
           case RBW:
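
A side note on the hunk above: the patch pairs contains() and add() on
missingVolumesReported so that each missing volume is warned about only once
per report. Since Set#add returns true only on first insertion, the same
warn-once idiom can be written with a single call; a standalone sketch of
that alternative (plain System.out standing in for the real logger):

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class WarnOnceDemo {
  public static void main(String[] args) {
    List<String> missingVolumeIds =
        Arrays.asList("vol-1", "vol-2", "vol-1", "vol-1");
    Set<String> reported = new HashSet<>();
    for (String id : missingVolumeIds) {
      if (reported.add(id)) {          // true only on first insertion
        System.out.println("WARN: storage volume " + id
            + " missing for a replica. Probably being removed!");
      }
    }
    // Prints one warning each for vol-1 and vol-2, despite the repeats.
  }
}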

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6d8662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index e73a612..69349fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -589,47 +589,98 @@ public class TestFsDatasetImpl {
     // Will write and remove on dn0.
     final ExtendedBlock eb = new ExtendedBlock(BLOCK_POOL_IDS[0], 0);
     final CountDownLatch startFinalizeLatch = new CountDownLatch(1);
-    final CountDownLatch brReceivedLatch = new CountDownLatch(1);
+    final CountDownLatch blockReportReceivedLatch = new CountDownLatch(1);
+    final CountDownLatch volRemoveStartedLatch = new CountDownLatch(1);
+    final CountDownLatch volRemoveCompletedLatch = new CountDownLatch(1);
     class BlockReportThread extends Thread {
       public void run() {
+        // Let's wait for the volume remove process to start.
+        try {
+          volRemoveStartedLatch.await();
+        } catch (Exception e) {
+          LOG.info("Unexpected exception when waiting for vol removal:", e);
+        }
         LOG.info("Getting block report");
         dataset.getBlockReports(eb.getBlockPoolId());
         LOG.info("Successfully received block report");
-        brReceivedLatch.countDown();
+        blockReportReceivedLatch.countDown();
       }
     }
 
-    final BlockReportThread brt = new BlockReportThread();
     class ResponderThread extends Thread {
       public void run() {
         try (ReplicaHandler replica = dataset
-            .createRbw(StorageType.DEFAULT, eb, false)) {
-          LOG.info("createRbw finished");
+                .createRbw(StorageType.DEFAULT, eb, false)) {
+          LOG.info("CreateRbw finished");
           startFinalizeLatch.countDown();
 
-          // Slow down while we're holding the reference to the volume
-          Thread.sleep(1000);
+          // Slow down while we're holding the reference to the volume.
+          // As we finalize a block, the volume is removed in parallel.
+          // Ignore any interrupts coming out of volume shutdown.
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ie) {
+            LOG.info("Ignoring ", ie);
+          }
+
+          // Let's wait for the other thread to finish getting the block report.
+          blockReportReceivedLatch.await();
+
           dataset.finalizeBlock(eb);
-          LOG.info("finalizeBlock finished");
+          LOG.info("FinalizeBlock finished");
         } catch (Exception e) {
           LOG.warn("Exception caught. This should not affect the test", e);
         }
       }
     }
 
-    ResponderThread res = new ResponderThread();
-    res.start();
+    class VolRemoveThread extends Thread {
+      public void run() {
+        Set<File> volumesToRemove = new HashSet<>();
+        try {
+          volumesToRemove.add(StorageLocation.parse(
+                  dataset.getVolume(eb).getBasePath()).getFile());
+        } catch (Exception e) {
+          LOG.info("Problem preparing volumes to remove: " + e);
+          Assert.fail("Exception in remove volume thread, check log for " +
+                  "details.");
+        }
+        LOG.info("Removing volume " + volumesToRemove);
+        dataset.removeVolumes(volumesToRemove, true);
+        volRemoveCompletedLatch.countDown();
+        LOG.info("Removed volume " + volumesToRemove);
+      }
+    }
+
+    // Start the volume write operation
+    ResponderThread responderThread = new ResponderThread();
+    responderThread.start();
     startFinalizeLatch.await();
 
-    Set<File> volumesToRemove = new HashSet<>();
-    volumesToRemove.add(
-        StorageLocation.parse(dataset.getVolume(eb).getBasePath()).getFile());
-    LOG.info("Removing volume " + volumesToRemove);
-    // Verify block report can be received during this
-    brt.start();
-    dataset.removeVolumes(volumesToRemove, true);
-    LOG.info("Volumes removed");
-    brReceivedLatch.await();
+    // Start the block report get operation
+    final BlockReportThread blockReportThread = new BlockReportThread();
+    blockReportThread.start();
+
+    // Start the volume remove operation
+    VolRemoveThread volRemoveThread = new VolRemoveThread();
+    volRemoveThread.start();
+
+    // Let the volume write and remove operations be
+    // blocked for a few seconds.
+    Thread.sleep(2000);
+
+    // Signal the block report receiver and volume writer
+    // threads to complete their operations so that the
+    // volume remove can proceed.
+    volRemoveStartedLatch.countDown();
+
+    // Verify that the block report can be received
+    // even while the volume is in use and being removed.
+    blockReportReceivedLatch.await();
+
+    // Verify that the volume can be removed safely while
+    // read/write operations are in progress.
+    volRemoveCompletedLatch.await();
   }
 
   /**


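
The reworked test above coordinates three threads (block report getter,
responder, volume remover) with CountDownLatches so that the block report is
requested while the removal is in flight. A self-contained sketch of that
latch handshake (generic names, not the test's classes): the worker parks
until the main thread signals "go", and the main thread then waits for the
worker to finish before proceeding.

import java.util.concurrent.CountDownLatch;

public class LatchHandshakeDemo {
  public static void main(String[] args) throws InterruptedException {
    final CountDownLatch go = new CountDownLatch(1);
    final CountDownLatch done = new CountDownLatch(1);

    Thread worker = new Thread() {
      public void run() {
        try {
          go.await();                  // park until the main thread signals
          System.out.println("worker: proceeding after signal");
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        } finally {
          done.countDown();            // always release the waiter
        }
      }
    };
    worker.start();

    Thread.sleep(100);                 // stand-in for other work in flight
    go.countDown();                    // signal: worker may proceed
    done.await();                      // wait until the worker finishes
    System.out.println("main: worker completed");
  }
}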