Repository: hadoop
Updated Branches:
  refs/heads/trunk 0b18e5e8c -> b64242c0d


HDFS-9236. Missing sanity check for block size during block recovery. (Tony Wu via Yongjun Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b64242c0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b64242c0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b64242c0

Branch: refs/heads/trunk
Commit: b64242c0d2cabd225a8fb7d25fed449d252e4fa1
Parents: 0b18e5e
Author: Yongjun Zhang <yzh...@cloudera.com>
Authored: Fri Nov 6 11:15:54 2015 -0800
Committer: Yongjun Zhang <yzh...@cloudera.com>
Committed: Fri Nov 6 11:15:54 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../server/datanode/BlockRecoveryWorker.java    | 66 +++++++++++++++++++-
 .../server/protocol/ReplicaRecoveryInfo.java    |  6 ++
 .../hdfs/server/datanode/TestBlockRecovery.java | 37 +++++++++++
 4 files changed, 110 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64242c0/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 63a99c4..a512da5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1629,6 +1629,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9377. Fix findbugs warnings in FSDirSnapshotOp.
     (Mingliang Liu via Yongjun Zhang)
 
+    HDFS-9236. Missing sanity check for block size during block recovery.
+    (Tony Wu via Yongjun Zhang)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64242c0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
index 42fcf48..9bd8703 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
@@ -103,8 +103,13 @@ public class BlockRecoveryWorker {
     protected void recover() throws IOException {
       List<BlockRecord> syncList = new ArrayList<>(locs.length);
       int errorCount = 0;
+      int candidateReplicaCnt = 0;
 
-      //check generation stamps
+      // Check generation stamps, replica size and state. Replica must satisfy
+      // the following criteria to be included in syncList for recovery:
+      // - Valid generation stamp
+      // - Non-zero length
+      // - Original state is RWR or better
       for(DatanodeID id : locs) {
         try {
           DatanodeID bpReg =datanode.getBPOfferService(bpid).bpRegistration;
@@ -115,7 +120,28 @@ public class BlockRecoveryWorker {
           if (info != null &&
               info.getGenerationStamp() >= block.getGenerationStamp() &&
               info.getNumBytes() > 0) {
-            syncList.add(new BlockRecord(id, proxyDN, info));
+            // Count the number of candidate replicas received.
+            ++candidateReplicaCnt;
+            if (info.getOriginalReplicaState().getValue() <=
+                ReplicaState.RWR.getValue()) {
+              syncList.add(new BlockRecord(id, proxyDN, info));
+            } else {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Block recovery: Ignored replica with invalid " +
+                    "original state: " + info + " from DataNode: " + id);
+              }
+            }
+          } else {
+            if (LOG.isDebugEnabled()) {
+              if (info == null) {
+                LOG.debug("Block recovery: DataNode: " + id + " does not have "
+                    + "replica for block: " + block);
+              } else {
+                LOG.debug("Block recovery: Ignored replica with invalid "
+                    + "generation stamp or length: " + info + " from " +
+                    "DataNode: " + id);
+              }
+            }
           }
         } catch (RecoveryInProgressException ripE) {
           InterDatanodeProtocol.LOG.warn(
@@ -136,6 +162,15 @@ public class BlockRecoveryWorker {
             + ", datanodeids=" + Arrays.asList(locs));
       }
 
+      // None of the replicas reported by DataNodes has the required original
+      // state, report the error.
+      if (candidateReplicaCnt > 0 && syncList.isEmpty()) {
+        throw new IOException("Found " + candidateReplicaCnt +
+            " replica(s) for block " + block + " but none is in " +
+            ReplicaState.RWR.name() + " or better state. datanodeids=" +
+            Arrays.asList(locs));
+      }
+
       syncBlock(syncList);
     }
 
@@ -157,6 +192,11 @@ public class BlockRecoveryWorker {
       // or their replicas have 0 length.
       // The block can be deleted.
       if (syncList.isEmpty()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("syncBlock for block " + block + ", all datanodes don't " +
+              "have the block or their replicas have 0 length. The block can " +
+              "be deleted.");
+        }
         nn.commitBlockSynchronization(block, recoveryId, 0,
             true, true, DatanodeID.EMPTY_ARRAY, null);
         return;
@@ -195,6 +235,12 @@ public class BlockRecoveryWorker {
                   r.rInfo.getNumBytes() == finalizedLength) {
             participatingList.add(r);
           }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("syncBlock replicaInfo: block=" + block +
+                ", from datanode " + r.id + ", receivedState=" + rState.name() +
+                ", receivedLength=" + r.rInfo.getNumBytes() +
+                ", bestState=FINALIZED, finalizedLength=" + finalizedLength);
+          }
         }
         newBlock.setNumBytes(finalizedLength);
         break;
@@ -207,7 +253,16 @@ public class BlockRecoveryWorker {
             minLength = Math.min(minLength, r.rInfo.getNumBytes());
             participatingList.add(r);
           }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("syncBlock replicaInfo: block=" + block +
+                ", from datanode " + r.id + ", receivedState=" + rState.name() +
+                ", receivedLength=" + r.rInfo.getNumBytes() + ", bestState=" +
+                bestState.name());
+          }
         }
+        // recover() guarantees syncList will have at least one replica with RWR
+        // or better state.
+        assert minLength != Long.MAX_VALUE : "wrong minLength";
         newBlock.setNumBytes(minLength);
         break;
       case RUR:
@@ -254,6 +309,13 @@ public class BlockRecoveryWorker {
         datanodes[i] = r.id;
         storages[i] = r.storageID;
       }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Datanode triggering commitBlockSynchronization, block=" +
+            block + ", newGs=" + newBlock.getGenerationStamp() +
+            ", newLength=" + newBlock.getNumBytes());
+      }
+
       nn.commitBlockSynchronization(block,
           newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
           datanodes, storages);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64242c0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java
index ee1fa1b..86bfc28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java
@@ -49,4 +49,10 @@ public class ReplicaRecoveryInfo extends Block {
   public int hashCode() {
     return super.hashCode();
   }
+
+  @Override
+  public String toString() {
+    return super.toString() + "[numBytes=" + this.getNumBytes() +
+        ",originalReplicaState=" + this.originalState.name() + "]";
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64242c0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index f60c973..31e8bcd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
@@ -680,4 +681,40 @@ public class TestBlockRecovery {
       }
     }
   }
+
+  /**
+   * DNs report RUR instead of RBW, RWR or FINALIZED. Primary DN expected to
+   * throw an exception.
+   * @throws Exception
+   */
+  @Test
+  public void testRURReplicas() throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Running " + GenericTestUtils.getMethodName());
+    }
+
+    doReturn(new ReplicaRecoveryInfo(block.getBlockId(), block.getNumBytes(),
+        block.getGenerationStamp(), ReplicaState.RUR)).when(spyDN).
+        initReplicaRecovery(any(RecoveringBlock.class));
+
+    boolean exceptionThrown = false;
+    try {
+      for (RecoveringBlock rBlock : initRecoveringBlocks()) {
+        BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
+            recoveryWorker.new RecoveryTaskContiguous(rBlock);
+        BlockRecoveryWorker.RecoveryTaskContiguous spyTask =
+            spy(RecoveryTaskContiguous);
+        spyTask.recover();
+      }
+    } catch (IOException e) {
+      // expect IOException to be thrown here
+      e.printStackTrace();
+      assertTrue("Wrong exception was thrown: " + e.getMessage(),
+          e.getMessage().contains("Found 1 replica(s) for block " + block +
+          " but none is in RWR or better state"));
+      exceptionThrown = true;
+    } finally {
+      assertTrue(exceptionThrown);
+    }
+  }
 }

Reply via email to