Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 4c6e24528 -> 6e7c76a5b
HDFS-9106. Transfer failure during pipeline recovery causes permanent write failures. Contributed by Kihwal Lee.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6e7c76a5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6e7c76a5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6e7c76a5

Branch: refs/heads/branch-2.7
Commit: 6e7c76a5bd6e3376efad6763308932ae02917002
Parents: 4c6e245
Author: Kihwal Lee <kih...@apache.org>
Authored: Mon Sep 28 15:19:57 2015 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Mon Sep 28 15:19:57 2015 -0500

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 59 +++++++++++++-----
 2 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e7c76a5/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 771ffd0..acdb531 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -50,6 +50,9 @@ Release 2.7.2 - UNRELEASED
     HDFS-9043. Doc updation for commands in HDFS Federation
     (J.Andreina via vinayakumab)
 
+    HDFS-9106. Transfer failure during pipeline recovery causes permanent
+    write failures (kihwal)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e7c76a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index f105530..def829c 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -972,22 +972,46 @@ public class DFSOutputStream extends FSOutputSummer
         return;
       }
 
-      //get a new datanode
+      int tried = 0;
       final DatanodeInfo[] original = nodes;
-      final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
-          src, fileId, block, nodes, storageIDs,
-          failed.toArray(new DatanodeInfo[failed.size()]),
-          1, dfsClient.clientName);
-      setPipeline(lb);
+      final StorageType[] originalTypes = storageTypes;
+      final String[] originalIDs = storageIDs;
+      IOException caughtException = null;
+      ArrayList<DatanodeInfo> exclude = new ArrayList<DatanodeInfo>(failed);
+      while (tried < 3) {
+        LocatedBlock lb;
+        //get a new datanode
+        lb = dfsClient.namenode.getAdditionalDatanode(
+            src, fileId, block, nodes, storageIDs,
+            exclude.toArray(new DatanodeInfo[exclude.size()]),
+            1, dfsClient.clientName);
+        // a new node was allocated by the namenode. Update nodes.
+        setPipeline(lb);
+
+        //find the new datanode
+        final int d = findNewDatanode(original);
+        //transfer replica. pick a source from the original nodes
+        final DatanodeInfo src = original[tried % original.length];
+        final DatanodeInfo[] targets = {nodes[d]};
+        final StorageType[] targetStorageTypes = {storageTypes[d]};
 
-      //find the new datanode
-      final int d = findNewDatanode(original);
-
-      //transfer replica
-      final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
-      final DatanodeInfo[] targets = {nodes[d]};
-      final StorageType[] targetStorageTypes = {storageTypes[d]};
-      transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+        try {
+          transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+        } catch (IOException ioe) {
+          DFSClient.LOG.warn("Error transferring data from " + src + " to " +
+              nodes[d] + ": " + ioe.getMessage());
+          caughtException = ioe;
+          // add the allocated node to the exclude list.
+          exclude.add(nodes[d]);
+          setPipeline(original, originalTypes, originalIDs);
+          tried++;
+          continue;
+        }
+        return; // finished successfully
+      }
+      // All retries failed
+      throw (caughtException != null) ? caughtException :
+          new IOException("Failed to add a node");
     }
 
     private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
@@ -1001,8 +1025,13 @@ public class DFSOutputStream extends FSOutputSummer
         sock = createSocketForPipeline(src, 2, dfsClient);
         final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
 
+        // transfer timeout multiplier based on the transfer size
+        // One per 200 packets = 12.8MB. Minimum is 2.
+        int multi = 2 + (int)(bytesSent/dfsClient.getConf().writePacketSize)/200;
+        final long readTimeout = dfsClient.getDatanodeReadTimeout(multi);
+
         OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
-        InputStream unbufIn = NetUtils.getInputStream(sock);
+        InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
         IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
             unbufOut, unbufIn, dfsClient, blockToken, src);
         unbufOut = saslStreams.out;
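
For context: the core of the fix wraps the getAdditionalDatanode()/transfer()
sequence in a bounded retry loop. Each datanode whose transfer fails is added
to an exclude list before the namenode is asked for another replacement, the
original pipeline is restored between attempts, and the source for the copy
rotates across the surviving nodes. A minimal, self-contained sketch of that
retry-with-exclusion pattern follows; all names in it (Cluster, Node,
allocateReplacement, copyBlockTo, addReplacementNode) are hypothetical
stand-ins for illustration, not the actual HDFS APIs.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class PipelineRecoverySketch {
  static final int MAX_TRIES = 3; // mirrors the patch's "while (tried < 3)"

  /** Hypothetical allocator, standing in for namenode.getAdditionalDatanode(). */
  interface Cluster {
    Node allocateReplacement(List<Node> exclude) throws IOException;
  }

  /** Hypothetical datanode handle, standing in for DatanodeInfo + transfer(). */
  interface Node {
    void copyBlockTo(Node target) throws IOException;
  }

  // Try up to MAX_TRIES replacement nodes. A failed target goes on the
  // exclude list so the next allocation cannot hand it back, and the copy
  // source rotates, as in original[tried % original.length] in the patch.
  static Node addReplacementNode(Cluster cluster, List<Node> sources,
      List<Node> alreadyFailed) throws IOException {
    List<Node> exclude = new ArrayList<>(alreadyFailed);
    IOException caught = null;
    for (int tried = 0; tried < MAX_TRIES; tried++) {
      Node candidate = cluster.allocateReplacement(exclude);
      Node source = sources.get(tried % sources.size());
      try {
        source.copyBlockTo(candidate); // analogous to transfer(...)
        return candidate;              // finished successfully
      } catch (IOException ioe) {
        caught = ioe;                  // remember the last failure
        exclude.add(candidate);        // never retry the same bad target
      }
    }
    // All retries failed; surface the last transfer error if there was one.
    throw caught != null ? caught : new IOException("Failed to add a node");
  }
}

Before this change, a single failed transfer() aborted the whole write; with
the loop, only three consecutive bad replacements do.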
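The second hunk scales the transfer read timeout with the amount of data
already written, since re-replicating a nearly full block can legitimately
take far longer than the fixed default. To make the arithmetic concrete
(assuming the default 64KB write packet size; the value is configurable via
dfs.client-write-packet-size): one multiplier unit is added per 200 packets,
roughly 12.8MB, so if 128MB have been sent, multi = 2 + (128MB / 64KB) / 200
= 2 + 2048 / 200 = 2 + 10 = 12 timeout units, rather than the fixed default
timeout the old NetUtils.getInputStream(sock) call used.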