Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 4c6e24528 -> 6e7c76a5b


HDFS-9106. Transfer failure during pipeline recovery causes permanent write 
failures. Contributed by Kihwal Lee.
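
In short: instead of giving up after a single failed replica transfer during
pipeline recovery, the client now retries the "allocate a new datanode and
transfer the replica to it" step up to three times, excluding any node whose
transfer failed. A minimal standalone sketch of that retry-with-exclusion
pattern (hypothetical names, simplified; not the actual DFSOutputStream code):

  // Sketch only: bounded retry of an allocate-and-transfer step, excluding
  // nodes that already failed, instead of failing the write on the first error.
  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;

  class RetryWithExclusion {
    interface Allocator { String allocate(List<String> exclude) throws IOException; }
    interface Transfer { void copyTo(String node) throws IOException; }

    static void addNodeWithRetry(Allocator allocator, Transfer transfer,
        List<String> alreadyFailed) throws IOException {
      List<String> exclude = new ArrayList<>(alreadyFailed);
      IOException caught = null;
      for (int tried = 0; tried < 3; tried++) {
        String newNode = allocator.allocate(exclude); // ask for a replacement node
        try {
          transfer.copyTo(newNode);                   // copy the replica to it
          return;                                     // success, pipeline repaired
        } catch (IOException ioe) {
          caught = ioe;
          exclude.add(newNode);                       // never ask for this node again
        }
      }
      throw (caught != null) ? caught : new IOException("Failed to add a node");
    }
  }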


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6e7c76a5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6e7c76a5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6e7c76a5

Branch: refs/heads/branch-2.7
Commit: 6e7c76a5bd6e3376efad6763308932ae02917002
Parents: 4c6e245
Author: Kihwal Lee <kih...@apache.org>
Authored: Mon Sep 28 15:19:57 2015 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Mon Sep 28 15:19:57 2015 -0500

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 59 +++++++++++++++-----
 2 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e7c76a5/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 771ffd0..acdb531 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -50,6 +50,9 @@ Release 2.7.2 - UNRELEASED
     HDFS-9043. Doc updation for commands in HDFS Federation
     (J.Andreina via vinayakumab)
 
+    HDFS-9106. Transfer failure during pipeline recovery causes permanent
+    write failures (kihwal)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e7c76a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index f105530..def829c 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -972,22 +972,46 @@ public class DFSOutputStream extends FSOutputSummer
         return;
       }
 
-      //get a new datanode
+      int tried = 0;
       final DatanodeInfo[] original = nodes;
-      final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
-          src, fileId, block, nodes, storageIDs,
-          failed.toArray(new DatanodeInfo[failed.size()]),
-          1, dfsClient.clientName);
-      setPipeline(lb);
+      final StorageType[] originalTypes = storageTypes;
+      final String[] originalIDs = storageIDs;
+      IOException caughtException = null;
+      ArrayList<DatanodeInfo> exclude = new ArrayList<DatanodeInfo>(failed);
+      while (tried < 3) {
+        LocatedBlock lb;
+        //get a new datanode
+        lb = dfsClient.namenode.getAdditionalDatanode(
+            src, fileId, block, nodes, storageIDs,
+            exclude.toArray(new DatanodeInfo[exclude.size()]),
+            1, dfsClient.clientName);
+        // a new node was allocated by the namenode. Update nodes.
+        setPipeline(lb);
+
+        //find the new datanode
+        final int d = findNewDatanode(original);
+        //transfer replica. pick a source from the original nodes
+        final DatanodeInfo src = original[tried % original.length];
+        final DatanodeInfo[] targets = {nodes[d]};
+        final StorageType[] targetStorageTypes = {storageTypes[d]};
 
-      //find the new datanode
-      final int d = findNewDatanode(original);
-
-      //transfer replica
-      final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
-      final DatanodeInfo[] targets = {nodes[d]};
-      final StorageType[] targetStorageTypes = {storageTypes[d]};
-      transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+        try {
+          transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+        } catch (IOException ioe) {
+          DFSClient.LOG.warn("Error transferring data from " + src + " to " +
+              nodes[d] + ": " + ioe.getMessage());
+          caughtException = ioe;
+          // add the allocated node to the exclude list.
+          exclude.add(nodes[d]);
+          setPipeline(original, originalTypes, originalIDs);
+          tried++;
+          continue;
+        }
+        return; // finished successfully
+      }
+      // All retries failed
+      throw (caughtException != null) ? caughtException :
+         new IOException("Failed to add a node");
     }
 
     private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
@@ -1001,8 +1025,13 @@ public class DFSOutputStream extends FSOutputSummer
         sock = createSocketForPipeline(src, 2, dfsClient);
         final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
         
+        // transfer timeout multiplier based on the transfer size
+        // One per 200 packets = 12.8MB. Minimum is 2.
+        int multi = 2 + (int)(bytesSent/dfsClient.getConf().writePacketSize)/200;
+        final long readTimeout = dfsClient.getDatanodeReadTimeout(multi);
+
         OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
-        InputStream unbufIn = NetUtils.getInputStream(sock);
+        InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
         IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
           unbufOut, unbufIn, dfsClient, blockToken, src);
         unbufOut = saslStreams.out;

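The second hunk makes the socket read timeout used for the replica transfer
grow with the amount of data already written, since re-replicating a nearly
full block legitimately takes longer than a fresh one. A rough illustration of
the multiplier arithmetic (this snippet is not part of the patch; it assumes
the stock 64 KB client write packet size, so 200 packets is roughly 12.8 MB):

  // Sketch only: the timeout multiplier starts at 2 and gains 1 for every
  // 200 packets (~12.8 MB at the assumed 64 KB packet size) already sent.
  public class TimeoutMultiplier {
    public static void main(String[] args) {
      long packetSize = 64 * 1024;                     // assumed default packet size
      long[] bytesSentSamples = {0L, 13_107_200L, 131_072_000L};
      for (long bytesSent : bytesSentSamples) {
        int multi = 2 + (int) (bytesSent / packetSize) / 200;
        System.out.println(bytesSent + " bytes sent -> multiplier " + multi);
      }
      // Prints multipliers 2, 3 and 12 for 0 bytes, ~12.8 MB and ~128 MB.
    }
  }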