This is an automated email from the ASF dual-hosted git repository.

shv pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 90ebbaa  HDFS-13174. hdfs mover -p /path times out after 20 min. Contributed by Istvan Fajth.
90ebbaa is described below

commit 90ebbaa3938a5c2908cb53fb9cd2d6630a949dc3
Author: Wei-Chiu Chuang <weic...@apache.org>
AuthorDate: Fri Jun 15 13:35:50 2018 -0700

    HDFS-13174. hdfs mover -p /path times out after 20 min. Contributed by Istvan Fajth.
    
    (cherry picked from commit c966a3837af1c1a1c4a441f491b0d76d5c9e5d78)
    (cherry picked from commit 975d4b3d603632a5edacb138cf4a1ce92ebed02e)
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  4 +-
 .../hadoop/hdfs/server/balancer/Balancer.java      |  6 +-
 .../hadoop/hdfs/server/balancer/Dispatcher.java    | 28 +++++---
 .../src/main/resources/hdfs-default.xml            | 10 +++
 .../hadoop/hdfs/server/balancer/TestBalancer.java  | 79 ++++++++++++++++++++++
 .../apache/hadoop/hdfs/server/mover/TestMover.java | 46 +++++++++++++
 6 files changed, 162 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index bde8677..1e53a2e 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -533,7 +533,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout";
   public static final int     DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
   public static final String  DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.balancer.max-no-move-interval";
-  public static final int    DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
+  public static final int     DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
+  public static final String  DFS_BALANCER_MAX_ITERATION_TIME_KEY = "dfs.balancer.max-iteration-time";
+  public static final long    DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT = 20 * 60 * 1000L; // 20 mins


   public static final String  DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 94f84a5..be72eb4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -286,13 +286,17 @@ public class Balancer {
     final int maxNoMoveInterval = conf.getInt(
         DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY,
         DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT);
+    final long maxIterationTime = conf.getLong(
+        DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY,
+        DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT);
 
     this.nnc = theblockpool;
     this.dispatcher =
         new Dispatcher(theblockpool, p.getIncludedNodes(),
             p.getExcludedNodes(), movedWinWidth, moverThreads,
             dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
-            getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval, conf);
+            getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval,
+            maxIterationTime, conf);
     this.threshold = p.getThreshold();
     this.policy = p.getBalancingPolicy();
     this.sourceNodes = p.getSourceNodes();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index d574750..84cc13b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -134,6 +134,8 @@ public class Dispatcher {
   private final boolean connectToDnViaHostname;
   private BlockPlacementPolicy placementPolicy;
 
+  private long maxIterationTime;
+
   static class Allocator {
     private final int max;
     private int count = 0;
@@ -335,12 +337,18 @@ public class Dispatcher {
 
     /** Dispatch the move to the proxy source & wait for the response. */
     private void dispatch() {
-      LOG.info("Start moving " + this);
-
       Socket sock = new Socket();
       DataOutputStream out = null;
       DataInputStream in = null;
       try {
+        if (source.isIterationOver()) {
+          LOG.info("Cancelling move " + this +
+              " as the iteration has already been cancelled because" +
+              " dfs.balancer.max-iteration-time has elapsed.");
+          throw new IOException("Block move cancelled.");
+        }
+        LOG.info("Start moving " + this);
+
         sock.connect(
             NetUtils.createSocketAddr(target.getDatanodeInfo().
                 getXferAddr(Dispatcher.this.connectToDnViaHostname)),
@@ -679,7 +687,10 @@ public class Dispatcher {
      * Check if the iteration is over
      */
     public boolean isIterationOver() {
-      return (Time.monotonicNow()-startTime > MAX_ITERATION_TIME);
+      if (maxIterationTime < 0) {
+        return false;
+      }
+      return (Time.monotonicNow()-startTime > maxIterationTime);
     }
 
     /** Add a task */
@@ -811,8 +822,6 @@ public class Dispatcher {
       return blocksToReceive > 0;
     }
 
-    private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
-
     /**
      * This method iteratively does the following: it first selects a block to
      * move, then sends a request to the proxy source to start the block move
@@ -881,7 +890,7 @@ public class Dispatcher {
       }
 
       if (isIterationOver()) {
-        LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000
+        LOG.info("The maximum iteration time (" + maxIterationTime/1000
             + " seconds) has been reached. Stopping " + this);
       }
     }
@@ -904,14 +913,14 @@ public class Dispatcher {
       int maxNoMoveInterval, Configuration conf) {
     this(nnc, includedNodes, excludedNodes, movedWinWidth,
         moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
-        0L, 0L, 0, maxNoMoveInterval, conf);
+        0L, 0L, 0, maxNoMoveInterval, -1, conf);
   }
 
   Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
       Set<String> excludedNodes, long movedWinWidth, int moverThreads,
       int dispatcherThreads, int maxConcurrentMovesPerNode,
-      long getBlocksSize, long getBlocksMinBlockSize,
-      int blockMoveTimeout, int maxNoMoveInterval, Configuration conf) {
+      long getBlocksSize, long getBlocksMinBlockSize, int blockMoveTimeout,
+      int maxNoMoveInterval, long maxIterationTime, Configuration conf) {
     this.nnc = nnc;
     this.excludedNodes = excludedNodes;
     this.includedNodes = includedNodes;
@@ -939,6 +948,7 @@ public class Dispatcher {
         HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
     this.placementPolicy =
         BlockPlacementPolicy.getInstance(conf, null, cluster, null);
+    this.maxIterationTime = maxIterationTime;
   }
 
   public DistributedFileSystem getDistributedFileSystem() {
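
Two behavioral notes on the hunks above: Source.isIterationOver() now
consults the configurable maxIterationTime instead of the removed
MAX_ITERATION_TIME constant, and a negative value disables the limit
entirely, which is exactly what the shorter constructor (the one the Mover
uses) passes as -1. A self-contained sketch of that pattern with hypothetical
names, substituting System.nanoTime() for Hadoop's Time.monotonicNow():

    // Hypothetical illustration of the timeout check; not the actual
    // Dispatcher code.
    class IterationTimer {
      private final long startTimeMs = System.nanoTime() / 1_000_000L;
      private final long maxIterationTimeMs;

      IterationTimer(long maxIterationTimeMs) {
        this.maxIterationTimeMs = maxIterationTimeMs;
      }

      boolean isIterationOver() {
        if (maxIterationTimeMs < 0) {
          return false; // negative means no limit: the Mover's code path
        }
        long elapsedMs = System.nanoTime() / 1_000_000L - startTimeMs;
        return elapsedMs > maxIterationTimeMs;
      }
    }
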
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 262d5be..ad9104e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3540,6 +3540,16 @@
 </property>
 
 <property>
+  <name>dfs.balancer.max-iteration-time</name>
+  <value>1200000</value>
+  <description>
+    Maximum amount of time for which the Balancer can run a single iteration.
+    After this time the Balancer stops the iteration and reevaluates the work
+    that still needs to be done to balance the cluster. The default value is
+    20 minutes.
+  </description>
+</property>
+
+<property>
   <name>dfs.block.invalidate.limit</name>
   <value>1000</value>
   <description>
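
Because Dispatcher treats a negative maxIterationTime as "no limit", an
operator who wants Balancer iterations to run unbounded (as the Mover now
does) should be able to set a negative value; a hedged sketch, worth
verifying on a real deployment:

    // Sketch: disable the iteration time limit. The same value can be set in
    // hdfs-site.xml; -1 trips the maxIterationTime < 0 check in Dispatcher.
    conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, -1L);
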
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 830dc0e..079cb4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -1554,6 +1554,85 @@ public class TestBalancer {
         CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
   }
 
+
+  @Test(timeout = 100000)
+  public void testMaxIterationTime() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    int blockSize = 10*1024*1024; // 10MB block size
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
+    // limit the worker thread count of Balancer to have only 1 queue per DN
+    conf.setInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, 1);
+    // limit the bandwidth to 1 packet per sec to emulate slow block moves
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
+        64 * 1024);
+    // set client socket timeout so that an IN_PROGRESS notification comes
+    // back from the DataNode about the copy every second.
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2000L);
+    // set max iteration time to 2 seconds to timeout before moving any block
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 2000L);
+    // setup the cluster
+    final long capacity = 10L * blockSize;
+    final long[] dnCapacities = new long[] {capacity, capacity};
+    final short rep = 1;
+    final long seed = 0xFAFAFA;
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(0)
+        .build();
+    try {
+      cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+      conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+      cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
+      cluster.waitClusterUp();
+      cluster.waitActive();
+      final Path path = new Path("/testMaxIterationTime.dat");
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // fill the DN to 40%
+      DFSTestUtil.createFile(fs, path, 4L * blockSize, rep, seed);
+      // start a new DN
+      cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
+      cluster.triggerHeartbeats();
+      // setup Balancer and run one iteration
+      List<NameNodeConnector> connectors = Collections.emptyList();
+      try {
+        BalancerParameters bParams = BalancerParameters.DEFAULT;
+        connectors = NameNodeConnector.newNameNodeConnectors(
+            DFSUtil.getInternalNsRpcUris(conf), Balancer.class.getSimpleName(),
+            Balancer.BALANCER_ID_PATH, conf, bParams.getMaxIdleIteration());
+        for (NameNodeConnector nnc : connectors) {
+          LOG.info("NNC to work on: " + nnc);
+          Balancer b = new Balancer(nnc, bParams, conf);
+          long startTime = Time.monotonicNow();
+          Result r = b.runOneIteration();
+          long runtime = Time.monotonicNow() - startTime;
+          assertEquals("We expect ExitStatus.IN_PROGRESS to be reported.",
+              ExitStatus.IN_PROGRESS, r.exitStatus);
+          // accept the runtime if it is under 3.5 seconds, as we need to
+          // wait for the IN_PROGRESS report from the DN, plus some headroom
+          // to finish. NOTE: this can be a source of flaky tests if the box
+          // is busy. The assertion is based on the following: the Balancer
+          // is already set up, the iteration gets the blocks from the NN and
+          // decides to move 2 blocks. After that the PendingMoves are
+          // scheduled, the DataNode heartbeats in to the Balancer every
+          // second, and the iteration is two seconds long. This means the
+          // test fails only if the setup and the heartbeat from the DataNode
+          // take more than 500ms, as the iteration should end by the 3rd
+          // second from start. As the number of operations is fairly low and
+          // all communication happens locally, the chance of a failure due
+          // to a busy node is low.
+          assertTrue("Unexpected iteration runtime: " + runtime + "ms > 3.5s",
+              runtime < 3500);
+        }
+      } finally {
+        for (NameNodeConnector nnc : connectors) {
+          IOUtils.cleanupWithLogger(null, nnc);
+        }
+      }
+    } finally {
+      cluster.shutdown(true, true);
+    }
+  }
+
   /*
    * Test Balancer with Ram_Disk configured
    * One DN has two files on RAM_DISK, other DN has no files on RAM_DISK.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index c7d04d5..b4db701 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -415,6 +415,52 @@ public class TestMover {
     }
   }
 
+  @Test(timeout=100000)
+  public void testBalancerMaxIterationTimeNotAffectMover() throws Exception {
+    long blockSize = 10*1024*1024;
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 1);
+    conf.setInt(
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 1);
+    // set a fairly large block size to run into the limitation
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
+    // set the max iteration time slightly greater than zero so that the
+    // move time surely exceeds it
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 200L);
+    conf.setInt(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, 1);
+    // set client socket timeout so that an IN_PROGRESS notification comes
+    // back from the DataNode about the copy every second.
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000L);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2)
+        .storageTypes(
+            new StorageType[][] {{StorageType.DISK, StorageType.DISK},
+                {StorageType.ARCHIVE, StorageType.ARCHIVE}})
+        .build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem fs = cluster.getFileSystem();
+      final String file = "/testMaxIterationTime.dat";
+      final Path path = new Path(file);
+      short rep_factor = 1;
+      int seed = 0xFAFAFA;
+      // write to DISK
+      DFSTestUtil.createFile(fs, path, 4L * blockSize, rep_factor, seed);
+
+      // move to ARCHIVE
+      fs.setStoragePolicy(new Path(file), "COLD");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] {"-p", file});
+      Assert.assertEquals("Retcode expected to be ExitStatus.SUCCESS (0).",
+          ExitStatus.SUCCESS.getExitCode(), rc);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test(timeout = 300000)
   public void testMoverWhenStoragePolicyUnset() throws Exception {
     final Configuration conf = new HdfsConfiguration();

