This is an automated email from the ASF dual-hosted git repository. shv pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push: new 90ebbaa HDFS-13174. hdfs mover -p /path times out after 20 min. Contributed by Istvan Fajth. 90ebbaa is described below commit 90ebbaa3938a5c2908cb53fb9cd2d6630a949dc3 Author: Wei-Chiu Chuang <weic...@apache.org> AuthorDate: Fri Jun 15 13:35:50 2018 -0700 HDFS-13174. hdfs mover -p /path times out after 20 min. Contributed by Istvan Fajth. (cherry picked from commit c966a3837af1c1a1c4a441f491b0d76d5c9e5d78) (cherry picked from commit 975d4b3d603632a5edacb138cf4a1ce92ebed02e) --- .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +- .../hadoop/hdfs/server/balancer/Balancer.java | 6 +- .../hadoop/hdfs/server/balancer/Dispatcher.java | 28 +++++--- .../src/main/resources/hdfs-default.xml | 10 +++ .../hadoop/hdfs/server/balancer/TestBalancer.java | 79 ++++++++++++++++++++++ .../apache/hadoop/hdfs/server/mover/TestMover.java | 46 +++++++++++++ 6 files changed, 162 insertions(+), 11 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index bde8677..1e53a2e 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -533,7 +533,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout"; public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0; public static final String DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.balancer.max-no-move-interval"; - public static final int DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute + public static final int DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute + public static final String DFS_BALANCER_MAX_ITERATION_TIME_KEY = 
"dfs.balancer.max-iteration-time"; + public static final long DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT = 20 * 60 * 1000L; // 20 mins public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 94f84a5..be72eb4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -286,13 +286,17 @@ public class Balancer { final int maxNoMoveInterval = conf.getInt( DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT); + final long maxIterationTime = conf.getLong( + DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, + DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT); this.nnc = theblockpool; this.dispatcher = new Dispatcher(theblockpool, p.getIncludedNodes(), p.getExcludedNodes(), movedWinWidth, moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize, - getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval, conf); + getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval, + maxIterationTime, conf); this.threshold = p.getThreshold(); this.policy = p.getBalancingPolicy(); this.sourceNodes = p.getSourceNodes(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index d574750..84cc13b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -134,6 +134,8 @@ public class 
Dispatcher { private final boolean connectToDnViaHostname; private BlockPlacementPolicy placementPolicy; + private long maxIterationTime; + static class Allocator { private final int max; private int count = 0; @@ -335,12 +337,18 @@ public class Dispatcher { /** Dispatch the move to the proxy source & wait for the response. */ private void dispatch() { - LOG.info("Start moving " + this); - Socket sock = new Socket(); DataOutputStream out = null; DataInputStream in = null; try { + if (source.isIterationOver()){ + LOG.info("Cancel moving " + this + + " as iteration is already cancelled due to" + + " dfs.balancer.max-iteration-time is passed."); + throw new IOException("Block move cancelled."); + } + LOG.info("Start moving " + this); + sock.connect( NetUtils.createSocketAddr(target.getDatanodeInfo(). getXferAddr(Dispatcher.this.connectToDnViaHostname)), @@ -679,7 +687,10 @@ public class Dispatcher { * Check if the iteration is over */ public boolean isIterationOver() { - return (Time.monotonicNow()-startTime > MAX_ITERATION_TIME); + if (maxIterationTime < 0){ + return false; + } + return (Time.monotonicNow()-startTime > maxIterationTime); } /** Add a task */ @@ -811,8 +822,6 @@ public class Dispatcher { return blocksToReceive > 0; } - private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins - /** * This method iteratively does the following: it first selects a block to * move, then sends a request to the proxy source to start the block move @@ -881,7 +890,7 @@ public class Dispatcher { } if (isIterationOver()) { - LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000 + LOG.info("The maximum iteration time (" + maxIterationTime/1000 + " seconds) has been reached. 
Stopping " + this); } } @@ -904,14 +913,14 @@ public class Dispatcher { int maxNoMoveInterval, Configuration conf) { this(nnc, includedNodes, excludedNodes, movedWinWidth, moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, - 0L, 0L, 0, maxNoMoveInterval, conf); + 0L, 0L, 0, maxNoMoveInterval, -1, conf); } Dispatcher(NameNodeConnector nnc, Set<String> includedNodes, Set<String> excludedNodes, long movedWinWidth, int moverThreads, int dispatcherThreads, int maxConcurrentMovesPerNode, - long getBlocksSize, long getBlocksMinBlockSize, - int blockMoveTimeout, int maxNoMoveInterval, Configuration conf) { + long getBlocksSize, long getBlocksMinBlockSize, int blockMoveTimeout, + int maxNoMoveInterval, long maxIterationTime, Configuration conf) { this.nnc = nnc; this.excludedNodes = excludedNodes; this.includedNodes = includedNodes; @@ -939,6 +948,7 @@ public class Dispatcher { HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); this.placementPolicy = BlockPlacementPolicy.getInstance(conf, null, cluster, null); + this.maxIterationTime = maxIterationTime; } public DistributedFileSystem getDistributedFileSystem() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 262d5be..ad9104e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3540,6 +3540,16 @@ </property> <property> + <name>dfs.balancer.max-iteration-time</name> + <value>1200000</value> + <description> + Maximum amount of time an iteration can be run by the Balancer. After + this time the Balancer will stop the iteration, and re-evaluate the work + that needs to be done to balance the cluster. The default value is 20 minutes.
+ </description> +</property> + +<property> <name>dfs.block.invalidate.limit</name> <value>1000</value> <description> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 830dc0e..079cb4a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -1554,6 +1554,85 @@ public class TestBalancer { CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true); } + + @Test(timeout = 100000) + public void testMaxIterationTime() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + int blockSize = 10*1024*1024; // 10MB block size + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize); + // limit the worker thread count of Balancer to have only 1 queue per DN + conf.setInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, 1); + // limit the bandwidth to 1 packet per sec to emulate slow block moves + conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, + 64 * 1024); + // set client socket timeout to have an IN_PROGRESS notification back from + // the DataNode about the copy every second.
+ conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2000L); + // set max iteration time to 2 seconds to timeout before moving any block + conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 2000L); + // setup the cluster + final long capacity = 10L * blockSize; + final long[] dnCapacities = new long[] {capacity, capacity}; + final short rep = 1; + final long seed = 0xFAFAFA; + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(0) + .build(); + try { + cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); + cluster.startDataNodes(conf, 1, true, null, null, dnCapacities); + cluster.waitClusterUp(); + cluster.waitActive(); + final Path path = new Path("/testMaxIterationTime.dat"); + DistributedFileSystem fs = cluster.getFileSystem(); + // fill the DN to 40% + DFSTestUtil.createFile(fs, path, 4L * blockSize, rep, seed); + // start a new DN + cluster.startDataNodes(conf, 1, true, null, null, dnCapacities); + cluster.triggerHeartbeats(); + // setup Balancer and run one iteration + List<NameNodeConnector> connectors = Collections.emptyList(); + try { + BalancerParameters bParams = BalancerParameters.DEFAULT; + connectors = NameNodeConnector.newNameNodeConnectors( + DFSUtil.getInternalNsRpcUris(conf), Balancer.class.getSimpleName(), + Balancer.BALANCER_ID_PATH, conf, bParams.getMaxIdleIteration()); + for (NameNodeConnector nnc : connectors) { + LOG.info("NNC to work on: " + nnc); + Balancer b = new Balancer(nnc, bParams, conf); + long startTime = Time.monotonicNow(); + Result r = b.runOneIteration(); + long runtime = Time.monotonicNow() - startTime; + assertEquals("We expect ExitStatus.IN_PROGRESS to be reported.", + ExitStatus.IN_PROGRESS, r.exitStatus); + // accept runtime if it is under 3.5 seconds, as we need to wait for + // IN_PROGRESS report from DN, and some spare to be able to finish. 
+ // NOTE: This can be a source of flaky tests, if the box is busy, + // assertion here is based on the following: Balancer is already set + // up, iteration gets the blocks from the NN, and makes the decision + // to move 2 blocks. After that the PendingMoves are scheduled, and + // DataNode heartbeats in for the Balancer every second, iteration is + // two seconds long. This means that it will fail if the setup and the + // heartbeat from the DataNode takes more than 500ms, as the iteration + // should end at the 3rd second from start. As the number of + // operations seems to be pretty low, and all comm happens locally, I + // think the possibility of a failure due to node busyness is low. + assertTrue("Unexpected iteration runtime: " + runtime + "ms > 3.5s", + runtime < 3500); + } + } finally { + for (NameNodeConnector nnc : connectors) { + IOUtils.cleanupWithLogger(null, nnc); + } + } + } finally { + cluster.shutdown(true, true); + } + } + /* * Test Balancer with Ram_Disk configured * One DN has two files on RAM_DISK, other DN has no files on RAM_DISK. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java index c7d04d5..b4db701 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java @@ -415,6 +415,52 @@ public class TestMover { } } + @Test(timeout=100000) + public void testBalancerMaxIterationTimeNotAffectMover() throws Exception { + long blockSize = 10*1024*1024; + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + conf.setInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 1); + conf.setInt( + DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 1); + // set a fairly large block size to run into the limitation + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize); + // set a max iteration time somewhat greater than zero so that the move + // time surely exceeds it + conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 200L); + conf.setInt(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, 1); + // set client socket timeout to have an IN_PROGRESS notification back from + // the DataNode about the copy in every second.
+ conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000L); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(2) + .storageTypes( + new StorageType[][] {{StorageType.DISK, StorageType.DISK}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}}) + .build(); + try { + cluster.waitActive(); + final DistributedFileSystem fs = cluster.getFileSystem(); + final String file = "/testMaxIterationTime.dat"; + final Path path = new Path(file); + short rep_factor = 1; + int seed = 0xFAFAFA; + // write to DISK + DFSTestUtil.createFile(fs, path, 4L * blockSize, rep_factor, seed); + + // move to ARCHIVE + fs.setStoragePolicy(new Path(file), "COLD"); + int rc = ToolRunner.run(conf, new Mover.Cli(), + new String[] {"-p", file}); + Assert.assertEquals("Retcode expected to be ExitStatus.SUCCESS (0).", + ExitStatus.SUCCESS.getExitCode(), rc); + } finally { + cluster.shutdown(); + } + } + @Test(timeout = 300000) public void testMoverWhenStoragePolicyUnset() throws Exception { final Configuration conf = new HdfsConfiguration(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org