Author: vinayakumarb Date: Wed Aug 13 18:43:05 2014 New Revision: 1617799 URL: http://svn.apache.org/r1617799 Log: HDFS-6247. Avoid timeouts for replaceBlock() call by sending intermediate responses to Balancer (vinayakumarb)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1617799&r1=1617798&r2=1617799&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Aug 13 18:43:05 2014 @@ -500,6 +500,9 @@ Release 2.6.0 - UNRELEASED HDFS-6830. BlockInfo.addStorage fails when DN changes the storage for a block replica (Arpit Agarwal) + HDFS-6247. Avoid timeouts for replaceBlock() call by sending intermediate + responses to Balancer (vinayakumarb) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java?rev=1617799&r1=1617798&r2=1617799&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java Wed Aug 13 18:43:05 2014 @@ -87,8 +87,6 @@ public class Dispatcher { private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5; private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds - private static final int BLOCK_MOVE_READ_TIMEOUT = 20 * 60 * 1000; // 20 - // minutes private final NameNodeConnector nnc; private final SaslDataTransferClient saslClient; @@ -278,13 +276,6 @@ public class Dispatcher { sock.connect( NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()), HdfsServerConstants.READ_TIMEOUT); - /* - * Unfortunately we don't have a good way to know if the Datanode is - * taking a really long time to move a block, OR something has gone - * wrong and it's never going to finish. To deal with this scenario, we - * set a long timeout (20 minutes) to avoid hanging indefinitely. - */ - sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT); sock.setKeepAlive(true); @@ -341,8 +332,12 @@ public class Dispatcher { /** Receive a block copy response from the input stream */ private void receiveResponse(DataInputStream in) throws IOException { - BlockOpResponseProto response = BlockOpResponseProto - .parseFrom(vintPrefixed(in)); + BlockOpResponseProto response = + BlockOpResponseProto.parseFrom(vintPrefixed(in)); + while (response.getStatus() == Status.IN_PROGRESS) { + // read intermediate responses + response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); + } if (response.getStatus() != Status.SUCCESS) { if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new IOException("block move failed due to access token error"); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1617799&r1=1617798&r2=1617799&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Aug 13 18:43:05 2014 @@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.d import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; @@ -123,6 +124,14 @@ class BlockReceiver implements Closeable private boolean syncOnClose; private long restartBudget; + /** + * for replaceBlock response + */ + private final long responseInterval; + private long lastResponseTime = 0; + private boolean isReplaceBlock = false; + private DataOutputStream replyOut = null; + BlockReceiver(final ExtendedBlock block, final StorageType storageType, final DataInputStream in, final String inAddr, final String myAddr, @@ -144,6 +153,9 @@ class BlockReceiver implements Closeable this.isClient = !this.isDatanode; this.restartBudget = datanode.getDnConf().restartReplicaExpiry; this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs; + // For replaceBlock() calls response should be sent to avoid socketTimeout + // at clients. So sending with the interval of 0.5 * socketTimeout + this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5); //for datanode, we have //1: clientName.length() == 0, and //2: stage == null or PIPELINE_SETUP_CREATE @@ -651,6 +663,20 @@ class BlockReceiver implements Closeable lastPacketInBlock, offsetInBlock, Status.SUCCESS); } + /* + * Send in-progress responses for the replaceBlock() calls back to caller to + * avoid timeouts due to balancer throttling. HDFS-6247 + */ + if (isReplaceBlock + && (Time.monotonicNow() - lastResponseTime > responseInterval)) { + BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder() + .setStatus(Status.IN_PROGRESS); + response.build().writeDelimitedTo(replyOut); + replyOut.flush(); + + lastResponseTime = Time.monotonicNow(); + } + if (throttler != null) { // throttle I/O throttler.throttle(len); } @@ -718,7 +744,8 @@ class BlockReceiver implements Closeable DataInputStream mirrIn, // input from next datanode DataOutputStream replyOut, // output to previous datanode String mirrAddr, DataTransferThrottler throttlerArg, - DatanodeInfo[] downstreams) throws IOException { + DatanodeInfo[] downstreams, + boolean isReplaceBlock) throws IOException { syncOnClose = datanode.getDnConf().syncOnClose; boolean responderClosed = false; @@ -726,6 +753,9 @@ class BlockReceiver implements Closeable mirrorAddr = mirrAddr; throttler = throttlerArg; + this.replyOut = replyOut; + this.isReplaceBlock = isReplaceBlock; + try { if (isClient && !isTransfer) { responder = new Daemon(datanode.threadGroup, Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1617799&r1=1617798&r2=1617799&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Aug 13 18:43:05 2014 @@ -708,7 +708,7 @@ class DataXceiver extends Receiver imple if (blockReceiver != null) { String mirrorAddr = (mirrorSock == null) ? null : mirrorNode; blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, - mirrorAddr, null, targets); + mirrorAddr, null, targets, false); // send close-ack for transfer-RBW/Finalized if (isTransfer) { @@ -983,7 +983,7 @@ class DataXceiver extends Receiver imple String errMsg = null; BlockReceiver blockReceiver = null; DataInputStream proxyReply = null; - + DataOutputStream replyOut = new DataOutputStream(getOutputStream()); try { // get the output stream to the proxy final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname); @@ -1040,8 +1040,8 @@ class DataXceiver extends Receiver imple CachingStrategy.newDropBehind()); // receive a block - blockReceiver.receiveBlock(null, null, null, null, - dataXceiverServer.balanceThrottler, null); + blockReceiver.receiveBlock(null, null, replyOut, null, + dataXceiverServer.balanceThrottler, null, true); // notify name node datanode.notifyNamenodeReceivedBlock( @@ -1076,6 +1076,7 @@ class DataXceiver extends Receiver imple IOUtils.closeStream(proxyOut); IOUtils.closeStream(blockReceiver); IOUtils.closeStream(proxyReply); + IOUtils.closeStream(replyOut); } //update metrics Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1617799&r1=1617798&r2=1617799&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Wed Aug 13 18:43:05 2014 @@ -207,6 +207,7 @@ enum Status { OOB_RESERVED1 = 9; // Reserved OOB_RESERVED2 = 10; // Reserved OOB_RESERVED3 = 11; // Reserved + IN_PROGRESS = 12; } message PipelineAckProto { Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1617799&r1=1617798&r2=1617799&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Wed Aug 13 18:43:05 2014 @@ -272,8 +272,10 @@ public class TestBlockReplacement { // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); - BlockOpResponseProto proto = - BlockOpResponseProto.parseDelimitedFrom(reply); + BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply); + while (proto.getStatus() == Status.IN_PROGRESS) { + proto = BlockOpResponseProto.parseDelimitedFrom(reply); + } return proto.getStatus() == Status.SUCCESS; }