HDFS-1172. Blocks in newly completed files are considered under-replicated too quickly. Contributed by Masatake Iwasaki.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2a987243
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2a987243
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2a987243

Branch: refs/heads/HDFS-7966
Commit: 2a987243423eb5c7e191de2ba969b7591a441c70
Parents: 40cac59
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Oct 13 23:00:18 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Oct 13 23:00:18 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/blockmanagement/BlockManager.java    |  32 ++-
 .../org/apache/hadoop/hdfs/TestReplication.java | 194 ++++++++++++++++++-
 3 files changed, 224 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a987243/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index de7f349..e6cd98f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1521,6 +1521,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9006. Provide BlockPlacementPolicy that supports upgrade domain.
     (Ming Ma via lei)
 
+    HDFS-1172. Blocks in newly completed files are considered under-replicated
+    too quickly. (Masatake Iwasaki via jing9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a987243/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 8a64b74..cdf43fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -676,13 +676,38 @@ public class BlockManager implements BlockStatsMXBean {
       return false; // already completed (e.g. by syncBlock)
     
     final boolean b = commitBlock(lastBlock, commitBlock);
-      if (hasMinStorage(lastBlock)) {
+    if (hasMinStorage(lastBlock)) {
+      if (b && !bc.isStriped()) {
+        addExpectedReplicasToPending(lastBlock);
+      }
       completeBlock(lastBlock, false);
     }
     return b;
   }
 
   /**
+   * If IBRs are not yet sent from the expected locations, add the datanodes
+   * to pendingReplications in order to keep ReplicationMonitor from
+   * scheduling the block.
+   */
+  private void addExpectedReplicasToPending(BlockInfo lastBlock) {
+    DatanodeStorageInfo[] expectedStorages =
+        lastBlock.getUnderConstructionFeature().getExpectedStorageLocations();
+    if (expectedStorages.length - lastBlock.numNodes() > 0) {
+      ArrayList<DatanodeDescriptor> pendingNodes =
+          new ArrayList<DatanodeDescriptor>();
+      for (DatanodeStorageInfo storage : expectedStorages) {
+        DatanodeDescriptor dnd = storage.getDatanodeDescriptor();
+        if (lastBlock.findStorageInfo(dnd) == null) {
+          pendingNodes.add(dnd);
+        }
+      }
+      pendingReplications.increment(lastBlock,
+          pendingNodes.toArray(new DatanodeDescriptor[pendingNodes.size()]));
+    }
+  }
+
+  /**
    * Convert a specified block of the file to a complete block.
    * @throws IOException if the block does not have at least a minimal number
    * of replicas reported from data-nodes.
@@ -3764,8 +3789,9 @@ public class BlockManager implements BlockStatsMXBean {
     for (BlockInfo block : bc.getBlocks()) {
       short expected = getExpectedReplicaNum(block);
       final NumberReplicas n = countNodes(block);
-      if (isNeededReplication(block, n.liveReplicas())) {
-        neededReplications.add(block, n.liveReplicas(),
+      final int pending = pendingReplications.getNumReplicas(block);
+      if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
+        neededReplications.add(block, n.liveReplicas() + pending,
             n.decommissionedAndDecommissioning(), expected);
       } else if (n.liveReplicas() > expected) {
         processOverReplicatedBlock(block, expected, null, null);

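Note (illustration only, not part of the patch): the BlockManager hunk above treats replicas whose incremental block reports (IBRs) are still in flight as pending, so a block in a freshly completed file is queued for re-replication only when live plus pending replicas fall short of the expected count. The following self-contained Java sketch models that check; EffectiveReplicaCheck, Replicas, addPending and needsReplication are hypothetical stand-ins, not the actual BlockManager API.

import java.util.HashMap;
import java.util.Map;

public class EffectiveReplicaCheck {

  /** Minimal stand-in for the NameNode's live-replica count of a block. */
  static final class Replicas {
    final int live;
    Replicas(int live) { this.live = live; }
  }

  /** Hypothetical pending-replication ledger keyed by block id. */
  private final Map<Long, Integer> pending = new HashMap<>();

  /** Record replicas that are expected but not yet reported (late IBRs). */
  void addPending(long blockId, int count) {
    pending.merge(blockId, count, Integer::sum);
  }

  int getPending(long blockId) {
    return pending.getOrDefault(blockId, 0);
  }

  /**
   * Mirrors the new check: schedule re-replication only if live plus
   * pending replicas are still below the expected replication factor.
   */
  boolean needsReplication(long blockId, Replicas n, int expected) {
    return n.live + getPending(blockId) < expected;
  }

  public static void main(String[] args) {
    EffectiveReplicaCheck bm = new EffectiveReplicaCheck();
    long blockId = 1L;
    // Two of three expected IBRs arrived before completeFile(); the third
    // DataNode is late, so its replica was recorded as pending at commit time.
    bm.addPending(blockId, 1);
    // Counting only live replicas (2 < 3) would schedule a spurious copy;
    // counting live + pending replicas (3) does not.
    System.out.println(bm.needsReplication(blockId, new Replicas(2), 3)); // false
  }
}

In the patch itself this bookkeeping is split in two: addExpectedReplicasToPending() records the not-yet-reported DataNodes when the last block is committed, and the second hunk consults hasEnoughEffectiveReplicas() (live plus pending replicas) before adding the block to neededReplications.
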
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a987243/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
index 2139df9..6424bc3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
@@ -17,10 +17,15 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.base.Supplier;
 
 import java.io.File;
 import java.io.IOException;
@@ -40,20 +45,34 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
 import org.apache.hadoop.util.Time;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * This class tests the replication of a DFS file.
@@ -278,6 +297,14 @@ public class TestReplication {
                                        ClientProtocol namenode,
                                        int expected, long maxWaitSec) 
                                        throws IOException {
+    waitForBlockReplication(filename, namenode, expected, maxWaitSec, false, false);
+  }
+
+  private void waitForBlockReplication(String filename,
+      ClientProtocol namenode,
+      int expected, long maxWaitSec,
+      boolean isUnderConstruction, boolean noOverReplication)
+      throws IOException {
     long start = Time.monotonicNow();
     
     //wait for all the blocks to be replicated;
@@ -290,7 +317,13 @@ public class TestReplication {
       for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
            iter.hasNext();) {
         LocatedBlock block = iter.next();
+        if (isUnderConstruction && !iter.hasNext()) {
+          break; // do not check the last block
+        }
         int actual = block.getLocations().length;
+        if (noOverReplication) {
+          assertTrue(actual <= expected);
+        }
         if ( actual < expected ) {
           LOG.info("Not enough replicas for " + block.getBlock()
               + " yet. Expecting " + expected + ", got " + actual + ".");
@@ -560,4 +593,161 @@ public class TestReplication {
       }
     }
   }
+
+
+  /**
+   * This test makes sure that, when a file is closed before all
+   * of the datanodes in the pipeline have reported their replicas,
+   * the NameNode doesn't consider the block under-replicated too
+   * aggressively. It is a regression test for HDFS-1172.
+   */
+  @Test(timeout=60000)
+  public void testNoExtraReplicationWhenBlockReceivedIsLate()
+      throws Exception {
+    LOG.info("Test block replication when blockReceived is late" );
+    final short numDataNodes = 3;
+    final short replication = 3;
+    final Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDataNodes).build();
+    final String testFile = "/replication-test-file";
+    final Path testPath = new Path(testFile);
+    final BlockManager bm =
+        cluster.getNameNode().getNamesystem().getBlockManager();
+
+    try {
+      cluster.waitActive();
+
+      // Artificially delay the IBR from one DataNode. This ensures that the
+      // client's completeFile() RPC will get to the NN before some of the
+      // replicas are reported.
+      NameNode nn = cluster.getNameNode();
+      DataNode dn = cluster.getDataNodes().get(0);
+      DatanodeProtocolClientSideTranslatorPB spy =
+          DataNodeTestUtils.spyOnBposToNN(dn, nn);
+      DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
+      Mockito.doAnswer(delayer).when(spy).blockReceivedAndDeleted(
+          Mockito.<DatanodeRegistration>anyObject(),
+          Mockito.anyString(),
+          Mockito.<StorageReceivedDeletedBlocks[]>anyObject());
+
+      FileSystem fs = cluster.getFileSystem();
+      // Create and close a small file with two blocks
+      DFSTestUtil.createFile(fs, testPath, 1500, replication, 0);
+
+      // schedule replication via BlockManager#computeReplicationWork
+      BlockManagerTestUtil.computeAllPendingWork(bm);
+
+      // Initially, should have some pending replication since the close()
+      // is earlier than at least one of the reportReceivedDeletedBlocks calls
+      assertTrue(pendingReplicationCount(bm) > 0);
+
+      // release pending IBR.
+      delayer.waitForCall();
+      delayer.proceed();
+      delayer.waitForResult();
+
+      // make sure the DataNodes perform any replication work that exists
+      for (DataNode d : cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerHeartbeat(d);
+      }
+
+      // Wait until there is nothing pending
+      try {
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return pendingReplicationCount(bm) == 0;
+          }
+        }, 100, 3000);
+      } catch (TimeoutException e) {
+        fail("timed out while waiting for no pending replication.");
+      }
+
+      // Check that none of the datanodes have serviced a replication request.
+      // i.e. that the NameNode didn't schedule any spurious replication.
+      assertNoReplicationWasPerformed(cluster);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * This test makes sure that, if a file is under construction, blocks
+   * in the middle of that file are properly re-replicated if they
+   * become corrupt.
+   */
+  @Test(timeout=60000)
+  public void testReplicationWhileUnderConstruction()
+      throws Exception {
+    LOG.info("Test block replication in under construction" );
+    MiniDFSCluster cluster = null;
+    final short numDataNodes = 6;
+    final short replication = 3;
+    String testFile = "/replication-test-file";
+    Path testPath = new Path(testFile);
+    FSDataOutputStream stm = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
+      cluster.waitActive();
+
+      FileSystem fs = cluster.getFileSystem();
+
+      stm = AppendTestUtil.createFile(fs, testPath, replication);
+
+      // Write two full blocks and start a third (the buffer is one block long)
+      byte[] buffer = AppendTestUtil.initBuffer(AppendTestUtil.BLOCK_SIZE);
+      stm.write(buffer); // block 1
+      stm.write(buffer); // block 2
+      stm.write(buffer, 0, 1); // start block 3
+      stm.hflush(); // make sure blocks are persisted, etc
+
+      // Everything should be fully replicated
+      waitForBlockReplication(testFile, cluster.getNameNodeRpc(), replication, 30000, true, true);
+
+      // Check that none of the datanodes have serviced a replication request.
+      // i.e. that the NameNode didn't schedule any spurious replication.
+      assertNoReplicationWasPerformed(cluster);
+
+      // Mark one of the blocks corrupt
+      List<LocatedBlock> blocks;
+      FSDataInputStream in = fs.open(testPath);
+      try {
+        blocks = DFSTestUtil.getAllBlocks(in);
+      } finally {
+        in.close();
+      }
+      LocatedBlock lb = blocks.get(0);
+      LocatedBlock lbOneReplica = new LocatedBlock(lb.getBlock(),
+          new DatanodeInfo[] { lb.getLocations()[0] });
+      cluster.getNameNodeRpc().reportBadBlocks(
+          new LocatedBlock[] { lbOneReplica });
+
+      // Everything should be fully replicated
+      waitForBlockReplication(testFile, cluster.getNameNodeRpc(), replication, 30000, true, true);
+    } finally {
+      if (stm != null) {
+        IOUtils.closeStream(stm);
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private long pendingReplicationCount(BlockManager bm) {
+    BlockManagerTestUtil.updateState(bm);
+    return bm.getPendingReplicationBlocksCount();
+  }
+
+  private void assertNoReplicationWasPerformed(MiniDFSCluster cluster) {
+    for (DataNode dn : cluster.getDataNodes()) {
+      MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
+      assertCounter("BlocksReplicated", 0L, rb);
+    }
+  }
 }
