[05/52] [abbrv] hadoop git commit: HDFS-9106. Transfer failure during pipeline recovery causes permanent write failures. Contributed by Kihwal Lee.

2015-10-02 Thread eclark
HDFS-9106. Transfer failure during pipeline recovery causes permanent write 
failures. Contributed by Kihwal Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4c9497cb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4c9497cb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4c9497cb

Branch: refs/heads/HADOOP-11890
Commit: 4c9497cbf02ecc82532a4e79e18912d8e0eb4731
Parents: fb2e525
Author: Kihwal Lee 
Authored: Mon Sep 28 13:29:19 2015 -0500
Committer: Kihwal Lee 
Committed: Mon Sep 28 13:29:56 2015 -0500

--
 .../org/apache/hadoop/hdfs/DataStreamer.java| 60 ++--
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt |  3 +
 2 files changed, 47 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c9497cb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
--
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 6482966..d1d8d37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -1208,22 +1208,46 @@ class DataStreamer extends Daemon {
   return;
 }
 
-//get a new datanode
+int tried = 0;
 final DatanodeInfo[] original = nodes;
-final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
-src, stat.getFileId(), block, nodes, storageIDs,
-failed.toArray(new DatanodeInfo[failed.size()]),
-1, dfsClient.clientName);
-setPipeline(lb);
-
-//find the new datanode
-final int d = findNewDatanode(original);
-
-//transfer replica
-final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
-final DatanodeInfo[] targets = {nodes[d]};
-final StorageType[] targetStorageTypes = {storageTypes[d]};
-transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+final StorageType[] originalTypes = storageTypes;
+final String[] originalIDs = storageIDs;
+IOException caughtException = null;
+ArrayList<DatanodeInfo> exclude = new ArrayList<DatanodeInfo>(failed);
+while (tried < 3) {
+  LocatedBlock lb;
+  //get a new datanode
+  lb = dfsClient.namenode.getAdditionalDatanode(
+  src, stat.getFileId(), block, nodes, storageIDs,
+  exclude.toArray(new DatanodeInfo[exclude.size()]),
+  1, dfsClient.clientName);
+  // a new node was allocated by the namenode. Update nodes.
+  setPipeline(lb);
+
+  //find the new datanode
+  final int d = findNewDatanode(original);
+  //transfer replica. pick a source from the original nodes
+  final DatanodeInfo src = original[tried % original.length];
+  final DatanodeInfo[] targets = {nodes[d]};
+  final StorageType[] targetStorageTypes = {storageTypes[d]};
+
+  try {
+transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+  } catch (IOException ioe) {
+DFSClient.LOG.warn("Error transferring data from " + src + " to " +
+nodes[d] + ": " + ioe.getMessage());
+caughtException = ioe;
+// add the allocated node to the exclude list.
+exclude.add(nodes[d]);
+setPipeline(original, originalTypes, originalIDs);
+tried++;
+continue;
+  }
+  return; // finished successfully
+}
+// All retries failed
+throw (caughtException != null) ? caughtException :
+new IOException("Failed to add a node");
   }
 
   private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
@@ -1236,7 +1260,11 @@ class DataStreamer extends Daemon {
 try {
   sock = createSocketForPipeline(src, 2, dfsClient);
   final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
-  final long readTimeout = dfsClient.getDatanodeReadTimeout(2);
+
+  // transfer timeout multiplier based on the transfer size
+  // One per 200 packets = 12.8MB. Minimum is 2.
+  int multi = 2 + (int)(bytesSent/dfsClient.getConf().getWritePacketSize())/200;
+  final long readTimeout = dfsClient.getDatanodeReadTimeout(multi);
 
   OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
   InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
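
The new read timeout scales with how much data the replacement node has to receive: one extra multiplier unit per 200 packets (12.8 MB at the default 64 KB write packet size), with a floor of 2. A minimal worked sketch of that arithmetic, assuming the 64 KB default and a hypothetical 100 MB already written:

public class TransferTimeoutSketch {
  public static void main(String[] args) {
    final long packetSize = 64 * 1024;           // assumed dfs.client-write-packet-size default
    final long bytesSent = 100L * 1024 * 1024;   // hypothetical: 100 MB already written

    // Mirrors the patch: multi = 2 + (int)(bytesSent / writePacketSize) / 200
    // 100 MB / 64 KB = 1600 packets; 1600 / 200 = 8; 2 + 8 = 10
    int multi = 2 + (int) (bytesSent / packetSize) / 200;
    System.out.println("read-timeout multiplier = " + multi);  // prints 10
  }
}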

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c9497cb/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
--
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 3571e4a..1
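
For readers skimming the diff, the heart of the fix is the retry loop in addDatanode2ExistingPipeline(): up to three attempts, each asking the namenode for a fresh replacement node (excluding every node that has already failed), picking the copy source round-robin from the original pipeline instead of always using the node adjacent to the new one, and rolling the pipeline back before retrying. Below is a minimal, self-contained sketch of that pattern; the helpers allocateReplacementNode, transferReplica and restorePipeline are hypothetical stand-ins for the real namenode/datanode calls, not HDFS APIs:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class PipelineRecoverySketch {
  static final int MAX_TRIES = 3;

  void addDatanodeWithRetry(String[] original, List<String> alreadyFailed)
      throws IOException {
    IOException lastFailure = null;
    // Nodes the namenode must not hand back again.
    List<String> exclude = new ArrayList<>(alreadyFailed);

    for (int tried = 0; tried < MAX_TRIES; tried++) {
      // Ask for one replacement node, excluding everything that failed so far.
      String newNode = allocateReplacementNode(exclude);
      // Round-robin source selection over the original pipeline, so one bad
      // source does not doom every attempt.
      String source = original[tried % original.length];
      try {
        transferReplica(source, newNode);
        return;                       // finished successfully
      } catch (IOException ioe) {
        lastFailure = ioe;
        exclude.add(newNode);         // never pick this target again
        restorePipeline(original);    // roll back before the next attempt
      }
    }
    throw lastFailure != null ? lastFailure
        : new IOException("Failed to add a node");
  }

  // Hypothetical stand-ins for the namenode/datanode RPCs used by the patch.
  String allocateReplacementNode(List<String> exclude) throws IOException {
    return "replacement-datanode";
  }
  void transferReplica(String source, String target) throws IOException { }
  void restorePipeline(String[] original) { }
}

Restoring the original pipeline before each retry is what keeps a single failed transfer from permanently wedging the write, which is the failure mode HDFS-9106 describes.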

[22/58] [abbrv] hadoop git commit: HDFS-9106. Transfer failure during pipeline recovery causes permanent write failures. Contributed by Kihwal Lee.

2015-09-30 Thread zhz
HDFS-9106. Transfer failure during pipeline recovery causes permanent write 
failures. Contributed by Kihwal Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4c9497cb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4c9497cb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4c9497cb

Branch: refs/heads/HDFS-7285
Commit: 4c9497cbf02ecc82532a4e79e18912d8e0eb4731
Parents: fb2e525
Author: Kihwal Lee 
Authored: Mon Sep 28 13:29:19 2015 -0500
Committer: Kihwal Lee 
Committed: Mon Sep 28 13:29:56 2015 -0500

--
 .../org/apache/hadoop/hdfs/DataStreamer.java| 60 ++--
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt |  3 +
 2 files changed, 47 insertions(+), 16 deletions(-)
--



[34/50] [abbrv] hadoop git commit: HDFS-9106. Transfer failure during pipeline recovery causes permanent write failures. Contributed by Kihwal Lee.

2015-09-29 Thread aengineer
HDFS-9106. Transfer failure during pipeline recovery causes permanent write 
failures. Contributed by Kihwal Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4c9497cb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4c9497cb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4c9497cb

Branch: refs/heads/HDFS-7240
Commit: 4c9497cbf02ecc82532a4e79e18912d8e0eb4731
Parents: fb2e525
Author: Kihwal Lee 
Authored: Mon Sep 28 13:29:19 2015 -0500
Committer: Kihwal Lee 
Committed: Mon Sep 28 13:29:56 2015 -0500

--
 .../org/apache/hadoop/hdfs/DataStreamer.java| 60 ++--
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt |  3 +
 2 files changed, 47 insertions(+), 16 deletions(-)
--



hadoop git commit: HDFS-9106. Transfer failure during pipeline recovery causes permanent write failures. Contributed by Kihwal Lee.

2015-09-28 Thread kihwal
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 4c6e24528 -> 6e7c76a5b


HDFS-9106. Transfer failure during pipeline recovery causes permanent write 
failures. Contributed by Kihwal Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6e7c76a5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6e7c76a5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6e7c76a5

Branch: refs/heads/branch-2.7
Commit: 6e7c76a5bd6e3376efad6763308932ae02917002
Parents: 4c6e245
Author: Kihwal Lee 
Authored: Mon Sep 28 15:19:57 2015 -0500
Committer: Kihwal Lee 
Committed: Mon Sep 28 15:19:57 2015 -0500

--
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt |  3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 59 +++-
 2 files changed, 47 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e7c76a5/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
--
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 771ffd0..acdb531 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -50,6 +50,9 @@ Release 2.7.2 - UNRELEASED
 HDFS-9043. Doc updation for commands in HDFS Federation
 (J.Andreina via vinayakumab)
 
+HDFS-9106. Transfer failure during pipeline recovery causes permanent
+write failures (kihwal)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e7c76a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
--
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index f105530..def829c 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -972,22 +972,46 @@ public class DFSOutputStream extends FSOutputSummer
 return;
   }
 
-  //get a new datanode
+  int tried = 0;
   final DatanodeInfo[] original = nodes;
-  final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
-  src, fileId, block, nodes, storageIDs,
-  failed.toArray(new DatanodeInfo[failed.size()]),
-  1, dfsClient.clientName);
-  setPipeline(lb);
+  final StorageType[] originalTypes = storageTypes;
+  final String[] originalIDs = storageIDs;
+  IOException caughtException = null;
+  ArrayList<DatanodeInfo> exclude = new ArrayList<DatanodeInfo>(failed);
+  while (tried < 3) {
+LocatedBlock lb;
+//get a new datanode
+lb = dfsClient.namenode.getAdditionalDatanode(
+src, fileId, block, nodes, storageIDs,
+exclude.toArray(new DatanodeInfo[exclude.size()]),
+1, dfsClient.clientName);
+// a new node was allocated by the namenode. Update nodes.
+setPipeline(lb);
+
+//find the new datanode
+final int d = findNewDatanode(original);
+//transfer replica. pick a source from the original nodes
+final DatanodeInfo src = original[tried % original.length];
+final DatanodeInfo[] targets = {nodes[d]};
+final StorageType[] targetStorageTypes = {storageTypes[d]};
 
-  //find the new datanode
-  final int d = findNewDatanode(original);
-
-  //transfer replica
-  final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
-  final DatanodeInfo[] targets = {nodes[d]};
-  final StorageType[] targetStorageTypes = {storageTypes[d]};
-  transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+try {
+  transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+} catch (IOException ioe) {
+  DFSClient.LOG.warn("Error transferring data from " + src + " to " +
+  nodes[d] + ": " + ioe.getMessage());
+  caughtException = ioe;
+  // add the allocated node to the exclude list.
+  exclude.add(nodes[d]);
+  setPipeline(original, originalTypes, originalIDs);
+  tried++;
+  continue;
+}
+return; // finished successfully
+  }
+  // All retries failed
+  throw (caughtException != null) ? caughtException :
+ new IOException("Failed to add a node");
 }
 
 private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
@@ -1001,8 +1025,13 @@ public class DFSOutputStream extends FSOutputSummer
 sock = createSocketForPipeline(src, 

hadoop git commit: HDFS-9106. Transfer failure during pipeline recovery causes permanent write failures. Contributed by Kihwal Lee. (cherry picked from commit 4c9497cbf02ecc82532a4e79e18912d8e0eb4731)

2015-09-28 Thread kihwal
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 4d8b99423 -> 468b330fe


HDFS-9106. Transfer failure during pipeline recovery causes permanent write 
failures. Contributed by Kihwal Lee.
(cherry picked from commit 4c9497cbf02ecc82532a4e79e18912d8e0eb4731)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/468b330f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/468b330f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/468b330f

Branch: refs/heads/branch-2
Commit: 468b330feb007fd443f243add0e8f3f72a0db5af
Parents: 4d8b994
Author: Kihwal Lee 
Authored: Mon Sep 28 14:30:06 2015 -0500
Committer: Kihwal Lee 
Committed: Mon Sep 28 14:30:06 2015 -0500

--
 .../org/apache/hadoop/hdfs/DataStreamer.java| 60 ++--
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt |  3 +
 2 files changed, 47 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/468b330f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
--
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 6482966..d1d8d37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -1208,22 +1208,46 @@ class DataStreamer extends Daemon {
   return;
 }
 
-//get a new datanode
+int tried = 0;
 final DatanodeInfo[] original = nodes;
-final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
-src, stat.getFileId(), block, nodes, storageIDs,
-failed.toArray(new DatanodeInfo[failed.size()]),
-1, dfsClient.clientName);
-setPipeline(lb);
-
-//find the new datanode
-final int d = findNewDatanode(original);
-
-//transfer replica
-final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
-final DatanodeInfo[] targets = {nodes[d]};
-final StorageType[] targetStorageTypes = {storageTypes[d]};
-transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+final StorageType[] originalTypes = storageTypes;
+final String[] originalIDs = storageIDs;
+IOException caughtException = null;
+ArrayList<DatanodeInfo> exclude = new ArrayList<DatanodeInfo>(failed);
+while (tried < 3) {
+  LocatedBlock lb;
+  //get a new datanode
+  lb = dfsClient.namenode.getAdditionalDatanode(
+  src, stat.getFileId(), block, nodes, storageIDs,
+  exclude.toArray(new DatanodeInfo[exclude.size()]),
+  1, dfsClient.clientName);
+  // a new node was allocated by the namenode. Update nodes.
+  setPipeline(lb);
+
+  //find the new datanode
+  final int d = findNewDatanode(original);
+  //transfer replica. pick a source from the original nodes
+  final DatanodeInfo src = original[tried % original.length];
+  final DatanodeInfo[] targets = {nodes[d]};
+  final StorageType[] targetStorageTypes = {storageTypes[d]};
+
+  try {
+transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+  } catch (IOException ioe) {
+DFSClient.LOG.warn("Error transferring data from " + src + " to " +
+nodes[d] + ": " + ioe.getMessage());
+caughtException = ioe;
+// add the allocated node to the exclude list.
+exclude.add(nodes[d]);
+setPipeline(original, originalTypes, originalIDs);
+tried++;
+continue;
+  }
+  return; // finished successfully
+}
+// All retries failed
+throw (caughtException != null) ? caughtException :
+new IOException("Failed to add a node");
   }
 
   private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
@@ -1236,7 +1260,11 @@ class DataStreamer extends Daemon {
 try {
   sock = createSocketForPipeline(src, 2, dfsClient);
   final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
-  final long readTimeout = dfsClient.getDatanodeReadTimeout(2);
+
+  // transfer timeout multiplier based on the transfer size
+  // One per 200 packets = 12.8MB. Minimum is 2.
+  int multi = 2 + (int)(bytesSent/dfsClient.getConf().getWritePacketSize())/200;
+  final long readTimeout = dfsClient.getDatanodeReadTimeout(multi);
 
   OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
   InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/468b330f/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
--

hadoop git commit: HDFS-9106. Transfer failure during pipeline recovery causes permanent write failures. Contributed by Kihwal Lee.

2015-09-28 Thread kihwal
Repository: hadoop
Updated Branches:
  refs/heads/trunk fb2e525c0 -> 4c9497cbf


HDFS-9106. Transfer failure during pipeline recovery causes permanent write 
failures. Contributed by Kihwal Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4c9497cb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4c9497cb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4c9497cb

Branch: refs/heads/trunk
Commit: 4c9497cbf02ecc82532a4e79e18912d8e0eb4731
Parents: fb2e525
Author: Kihwal Lee 
Authored: Mon Sep 28 13:29:19 2015 -0500
Committer: Kihwal Lee 
Committed: Mon Sep 28 13:29:56 2015 -0500

--
 .../org/apache/hadoop/hdfs/DataStreamer.java| 60 ++--
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt |  3 +
 2 files changed, 47 insertions(+), 16 deletions(-)
--

