HDFS-12482. Provide a configuration to adjust the weight of EC recovery tasks 
and thereby the speed of recovery. (lei)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9367c25d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9367c25d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9367c25d

Branch: refs/heads/HDFS-7240
Commit: 9367c25dbdfedf60cdbd65611281cf9c667829e6
Parents: ed24da3
Author: Lei Xu <l...@apache.org>
Authored: Tue Oct 31 21:58:14 2017 -0700
Committer: Lei Xu <l...@apache.org>
Committed: Tue Oct 31 21:58:14 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  4 ++
 .../server/datanode/DataNodeFaultInjector.java  |  6 ++
 .../erasurecode/ErasureCodingWorker.java        | 12 +++-
 .../erasurecode/StripedBlockReconstructor.java  |  2 +
 .../src/main/resources/hdfs-default.xml         | 13 ++++
 .../src/site/markdown/HDFSErasureCoding.md      |  6 ++
 .../hadoop/hdfs/TestReconstructStripedFile.java | 64 ++++++++++++++++++++
 7 files changed, 106 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9367c25d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 3d1f0b6..37071b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -596,6 +596,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT = 5000; //5s
   public static final String  DFS_DN_EC_RECONSTRUCTION_THREADS_KEY = "dfs.datanode.ec.reconstruction.threads";
   public static final int     DFS_DN_EC_RECONSTRUCTION_THREADS_DEFAULT = 8;
+  public static final String  DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY =
+      "dfs.datanode.ec.reconstruction.xmits.weight";
+  public static final float   DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_DEFAULT =
+      0.5f;
 
   public static final String
       DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9367c25d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 0a2a60b..1dd779e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -89,4 +89,10 @@ public class DataNodeFaultInjector {
 
   public void throwTooManyOpenFiles() throws FileNotFoundException {
   }
+
+  /**
+   * Used as a hook to inject failure in erasure coding reconstruction
+   * process.
+   */
+  public void stripedBlockReconstruction() throws IOException {}
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9367c25d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 63498bc..45e29ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.erasurecode;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -47,6 +48,7 @@ public final class ErasureCodingWorker {
 
   private final DataNode datanode;
   private final Configuration conf;
+  private final float xmitWeight;
 
   private ThreadPoolExecutor stripedReconstructionPool;
   private ThreadPoolExecutor stripedReadPool;
@@ -54,6 +56,14 @@ public final class ErasureCodingWorker {
   public ErasureCodingWorker(Configuration conf, DataNode datanode) {
     this.datanode = datanode;
     this.conf = conf;
+    this.xmitWeight = conf.getFloat(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY,
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_DEFAULT
+    );
+    Preconditions.checkArgument(this.xmitWeight >= 0,
+        "Invalid value configured for " +
+            DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY +
+            ", it can not be negative value (" + this.xmitWeight + ").");
 
     initializeStripedReadThreadPool();
     initializeStripedBlkReconstructionThreadPool(conf.getInt(
@@ -128,7 +138,7 @@ public final class ErasureCodingWorker {
           //   1) NN will not send more tasks than what DN can execute and
           //   2) DN will not throw away reconstruction tasks, and instead keeps
           //      an unbounded number of tasks in the executor's task queue.
-          xmitsSubmitted = task.getXmits();
+          xmitsSubmitted = Math.max((int)(task.getXmits() * xmitWeight), 1);
           getDatanode().incrementXmitsInProcess(xmitsSubmitted);
           stripedReconstructionPool.submit(task);
         } else {

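For a sense of how the new weight behaves, here is a standalone sketch (not part of the patch; the example reader and writer counts are illustrative only) that applies the same Math.max((int) (xmits * weight), 1) formula as the patched line above:

// Standalone sketch: how the submitted xmits change with the configured weight.
// A recovery task reading from 6 sources and writing 1 target has raw xmits of
// max(6, 1) = 6; the weight scales what is reported to the NameNode.
public class XmitsWeightExample {
  static int weightedXmits(int rawXmits, float xmitWeight) {
    // Same formula as the patched line: never report fewer than 1 xmit.
    return Math.max((int) (rawXmits * xmitWeight), 1);
  }

  public static void main(String[] args) {
    int rawXmits = Math.max(6, 1);
    System.out.println(weightedXmits(rawXmits, 0.5f)); // default weight -> 3
    System.out.println(weightedXmits(rawXmits, 0.0f)); // weight 0 -> always 1
    System.out.println(weightedXmits(rawXmits, 10f));  // weight 10 -> 60
  }
}
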
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9367c25d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
index 17b54e8..ce1254c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.util.Time;
 
@@ -80,6 +81,7 @@ class StripedBlockReconstructor extends StripedReconstructor
 
   void reconstruct() throws IOException {
     while (getPositionInBlock() < getMaxTargetLength()) {
+      DataNodeFaultInjector.get().stripedBlockReconstruction();
       long remaining = getMaxTargetLength() - getPositionInBlock();
       final int toReconstructLen =
           (int) Math.min(getStripedReader().getBufferSize(), remaining);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9367c25d/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2be7d0a..28621ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3073,6 +3073,19 @@
 </property>
 
 <property>
+  <name>dfs.datanode.ec.reconstruction.xmits.weight</name>
+  <value>0.5</value>
+  <description>
+    The DataNode uses this xmits weight to calculate the relative cost of an EC
+    recovery task compared to replicated block recovery, whose xmits is always 1.
+    The NameNode then uses the xmits reported by DataNodes to throttle recovery
+    tasks for both EC and replicated blocks.
+    The xmits of an erasure coding recovery task is calculated as the maximum of
+    the number of read streams and the number of write streams.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.quota.init-threads</name>
   <value>4</value>
   <description>

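For reference, a minimal sketch of overriding the new default programmatically, using the same Configuration API and key constant that the test below uses (the 1.0f value is only an example; in practice the property is normally set in hdfs-site.xml):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class XmitsWeightConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Count each EC recovery stream the same as a replication xmit instead of
    // the 0.5 default.
    conf.setFloat(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY, 1.0f);
    System.out.println(conf.getFloat(
        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY,
        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_DEFAULT));
  }
}
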
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9367c25d/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSErasureCoding.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSErasureCoding.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSErasureCoding.md
index 270201a..a884ed8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSErasureCoding.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSErasureCoding.md
@@ -136,6 +136,12 @@ Deployment
  1. `dfs.datanode.ec.reconstruction.stripedread.timeout.millis` - Timeout for striped reads. Default value is 5000 ms.
  1. `dfs.datanode.ec.reconstruction.stripedread.buffer.size` - Buffer size for reader service. Default value is 64KB.
  1. `dfs.datanode.ec.reconstruction.threads` - Number of threads used by the Datanode for background reconstruction work. Default value is 8 threads.
+  1. `dfs.datanode.ec.reconstruction.xmits.weight` - Relative weight of xmits used by an EC background recovery task compared to replicated block recovery. Default value is 0.5.
+  Setting it to `0` disables weight calculation for EC recovery tasks, that is, an EC task always has `1` xmits.
+  The xmits of an erasure coding recovery task is calculated as the maximum of the number of read streams and the number of write streams. For example, if an EC recovery
+  task needs to read from 6 nodes and write to 2 nodes, it has xmits of `max(6, 2) * 0.5 = 3`. A recovery task for a replicated file always counts
+  as `1` xmit. The NameNode uses `dfs.namenode.replication.max-streams` minus the total `xmitsInProgress` on the DataNode, which combines the xmits of
+  replicated and EC recovery tasks, to schedule recovery tasks on this DataNode.
 
 ### Enable Intel ISA-L
 

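To make the scheduling arithmetic described above concrete, here is a small illustrative sketch (not NameNode code; the max-streams value of 2 and the 6-read/2-write task are assumptions for the example):

// Illustrative only: the xmits budget described in the documentation above.
public class RecoveryBudgetExample {
  public static void main(String[] args) {
    int maxStreams = 2;  // dfs.namenode.replication.max-streams (example value)
    // An EC task reading from 6 nodes and writing to 2, with the default 0.5
    // weight, is reported as max(6, 2) * 0.5 = 3 xmits.
    int xmitsInProgress = Math.max((int) (Math.max(6, 2) * 0.5f), 1);
    // Remaining capacity the NameNode sees when scheduling more recovery work.
    int remaining = maxStreams - xmitsInProgress;  // 2 - 3 = -1: DN is saturated
    System.out.println("remaining xmits budget: " + remaining);
  }
}
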
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9367c25d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
index 713a10b..e3843a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
@@ -30,6 +30,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
@@ -53,6 +56,7 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
 import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
@@ -488,4 +492,64 @@ public class TestReconstructStripedFile {
         500, 30000
     );
   }
+
+  @Test(timeout = 180000)
+  public void testErasureCodingWorkerXmitsWeight() throws Exception {
+    testErasureCodingWorkerXmitsWeight(1f, ecPolicy.getNumDataUnits());
+    testErasureCodingWorkerXmitsWeight(0f, 1);
+    testErasureCodingWorkerXmitsWeight(10f, 10 * ecPolicy.getNumDataUnits());
+  }
+
+  private void testErasureCodingWorkerXmitsWeight(
+      float weight, int expectedWeight)
+      throws Exception {
+
+    // Reset cluster with customized xmits weight.
+    conf.setFloat(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY,
+        weight);
+    cluster.shutdown();
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fs.enableErasureCodingPolicy(
+        StripedFileTestUtil.getDefaultECPolicy().getName());
+    fs.getClient().setErasureCodingPolicy("/",
+        StripedFileTestUtil.getDefaultECPolicy().getName());
+
+    final int fileLen = cellSize * ecPolicy.getNumDataUnits() * 2;
+    writeFile(fs, "/ec-xmits-weight", fileLen);
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    int corruptBlocks = dn.getFSDataset().getFinalizedBlocks(
+        cluster.getNameNode().getNamesystem().getBlockPoolId()).size();
+    int expectedXmits = corruptBlocks * expectedWeight;
+
+    final CyclicBarrier barrier = new CyclicBarrier(corruptBlocks + 1);
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector delayInjector = new DataNodeFaultInjector() {
+      public void stripedBlockReconstruction() throws IOException {
+        try {
+          barrier.await();
+        } catch (InterruptedException | BrokenBarrierException e) {
+          throw new IOException(e);
+        }
+      }
+    };
+    DataNodeFaultInjector.set(delayInjector);
+
+    try {
+      shutdownDataNode(dn);
+      LambdaTestUtils.await(30 * 1000, 500,
+          () -> {
+            int totalXmits = cluster.getDataNodes().stream()
+                  .mapToInt(DataNode::getXmitsInProgress).sum();
+            return totalXmits == expectedXmits;
+          }
+      );
+    } finally {
+      barrier.await();
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
 }

