HDFS-11856. Ability to re-add Upgrading Nodes to pipeline for future pipeline 
updates. Contributed by Vinayakumar B.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/29b7df96
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/29b7df96
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/29b7df96

Branch: refs/heads/YARN-1011
Commit: 29b7df960fc3d0a7d1416225c3106c7d4222f0ca
Parents: 4fb41b3
Author: Kihwal Lee <kih...@apache.org>
Authored: Thu May 25 13:04:09 2017 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Thu May 25 13:05:23 2017 -0500

----------------------------------------------------------------------
 .../hadoop/hdfs/DFSClientFaultInjector.java     |  4 +
 .../org/apache/hadoop/hdfs/DataStreamer.java    | 70 +++++++++++----
 .../hdfs/server/datanode/BlockReceiver.java     |  6 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java |  2 +-
 .../impl/FsDatasetAsyncDiskService.java         | 14 ++-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 85 ++++++++++++------
 .../TestClientProtocolForPipelineRecovery.java  | 92 ++++++++++++++++++++
 .../server/datanode/SimulatedFSDataset.java     |  6 +-
 .../server/datanode/TestSimulatedFSDataset.java |  2 +-
 .../extdataset/ExternalDatasetImpl.java         |  3 +-
 .../fsdataset/impl/TestWriteToReplica.java      | 20 +++--
 11 files changed, 241 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index 4eb4c52..748edcd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -57,4 +57,8 @@ public class DFSClientFaultInjector {
   public void fetchFromDatanodeException() {}
 
   public void readFromDatanodeDelay() {}
+
+  public boolean skipRollingRestartWait() {
+    return false;
+  }
 }
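
The hook above is meant to be overridden from tests (the new test in
TestClientProtocolForPipelineRecovery further down does exactly this). A
minimal sketch of that pattern, assuming the usual save-and-restore around the
scenario under test:

  // Make the client treat every restarting datanode as remote, so the
  // streamer does not sit out the rolling-restart deadline.
  DFSClientFaultInjector previous = DFSClientFaultInjector.get();
  DFSClientFaultInjector.set(new DFSClientFaultInjector() {
    @Override
    public boolean skipRollingRestartWait() {
      return true;
    }
  });
  try {
    // ... exercise the write / pipeline-recovery path ...
  } finally {
    // Restore the default injector so other tests are unaffected.
    DFSClientFaultInjector.set(previous);
  }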

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 49c17b9..f5ce0ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -327,6 +327,7 @@ class DataStreamer extends Daemon {
   static class ErrorState {
     ErrorType error = ErrorType.NONE;
     private int badNodeIndex = -1;
+    private boolean waitForRestart = true;
     private int restartingNodeIndex = -1;
     private long restartingNodeDeadline = 0;
     private final long datanodeRestartTimeout;
@@ -342,6 +343,7 @@ class DataStreamer extends Daemon {
       badNodeIndex = -1;
       restartingNodeIndex = -1;
       restartingNodeDeadline = 0;
+      waitForRestart = true;
     }
 
     synchronized void reset() {
@@ -349,6 +351,7 @@ class DataStreamer extends Daemon {
       badNodeIndex = -1;
       restartingNodeIndex = -1;
       restartingNodeDeadline = 0;
+      waitForRestart = true;
     }
 
     synchronized boolean hasInternalError() {
@@ -389,14 +392,19 @@ class DataStreamer extends Daemon {
       return restartingNodeIndex;
     }
 
-    synchronized void initRestartingNode(int i, String message) {
+    synchronized void initRestartingNode(int i, String message,
+        boolean shouldWait) {
       restartingNodeIndex = i;
-      restartingNodeDeadline =  Time.monotonicNow() + datanodeRestartTimeout;
-      // If the data streamer has already set the primary node
-      // bad, clear it. It is likely that the write failed due to
-      // the DN shutdown. Even if it was a real failure, the pipeline
-      // recovery will take care of it.
-      badNodeIndex = -1;
+      if (shouldWait) {
+        restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout;
+        // If the data streamer has already set the primary node
+        // bad, clear it. It is likely that the write failed due to
+        // the DN shutdown. Even if it was a real failure, the pipeline
+        // recovery will take care of it.
+        badNodeIndex = -1;
+      } else {
+        this.waitForRestart = false;
+      }
       LOG.info(message);
     }
 
@@ -405,7 +413,7 @@ class DataStreamer extends Daemon {
     }
 
     synchronized boolean isNodeMarked() {
-      return badNodeIndex >= 0 || isRestartingNode();
+      return badNodeIndex >= 0 || (isRestartingNode() && doWaitForRestart());
     }
 
     /**
@@ -430,7 +438,7 @@ class DataStreamer extends Daemon {
         } else if (badNodeIndex < restartingNodeIndex) {
           // the node index has shifted.
           restartingNodeIndex--;
-        } else {
+        } else if (waitForRestart) {
           throw new IllegalStateException("badNodeIndex = " + badNodeIndex
               + " = restartingNodeIndex = " + restartingNodeIndex);
         }
@@ -472,6 +480,10 @@ class DataStreamer extends Daemon {
         }
       }
     }
+
+    boolean doWaitForRestart() {
+      return waitForRestart;
+    }
   }
 
   private volatile boolean streamerClosed = false;
@@ -491,6 +503,8 @@ class DataStreamer extends Daemon {
 
   /** Nodes have been used in the pipeline before and have failed. */
   private final List<DatanodeInfo> failed = new ArrayList<>();
+  /** Restarting Nodes */
+  private List<DatanodeInfo> restartingNodes = new ArrayList<>();
   /** The times have retried to recover pipeline, for the same packet. */
   private volatile int pipelineRecoveryCount = 0;
   /** Has the current block been hflushed? */
@@ -1043,6 +1057,13 @@ class DataStreamer extends Daemon {
       return true;
     }
 
+    /*
+     * Treat all nodes as remote for test when skip enabled.
+     */
+    if (DFSClientFaultInjector.get().skipRollingRestartWait()) {
+      return false;
+    }
+
     // Is it a local node?
     InetAddress addr = null;
     try {
@@ -1110,11 +1131,11 @@ class DataStreamer extends Daemon {
             }
             // Restart will not be treated differently unless it is
             // the local node or the only one in the pipeline.
-            if (PipelineAck.isRestartOOBStatus(reply) &&
-                shouldWaitForRestart(i)) {
+            if (PipelineAck.isRestartOOBStatus(reply)) {
               final String message = "Datanode " + i + " is restarting: "
                   + targets[i];
-              errorState.initRestartingNode(i, message);
+              errorState.initRestartingNode(i, message,
+                  shouldWaitForRestart(i));
               throw new IOException(message);
             }
             // node error
@@ -1492,6 +1513,14 @@ class DataStreamer extends Daemon {
    */
   boolean handleRestartingDatanode() {
     if (errorState.isRestartingNode()) {
+      if (!errorState.doWaitForRestart()) {
+        // If node is restarting and not worth to wait for restart then can go
+        // ahead with error recovery considering it as bad node for now. Later
+        // it should be able to re-consider the same node for future pipeline
+        // updates.
+        errorState.setBadNodeIndex(errorState.getRestartingNodeIndex());
+        return true;
+      }
       // 4 seconds or the configured deadline period, whichever is shorter.
       // This is the retry interval and recovery will be retried in this
       // interval until timeout or success.
@@ -1523,9 +1552,14 @@ class DataStreamer extends Daemon {
         return false;
       }
 
+      String reason = "bad.";
+      if (errorState.getRestartingNodeIndex() == badNodeIndex) {
+        reason = "restarting.";
+        restartingNodes.add(nodes[badNodeIndex]);
+      }
       LOG.warn("Error Recovery for " + block + " in pipeline "
           + Arrays.toString(nodes) + ": datanode " + badNodeIndex
-          + "("+ nodes[badNodeIndex] + ") is bad.");
+          + "("+ nodes[badNodeIndex] + ") is " + reason);
       failed.add(nodes[badNodeIndex]);
 
       DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
@@ -1735,6 +1769,9 @@ class DataStreamer extends Daemon {
         blockStream = out;
         result =  true; // success
         errorState.resetInternalError();
+        // remove all restarting nodes from failed nodes list
+        failed.removeAll(restartingNodes);
+        restartingNodes.clear();
       } catch (IOException ie) {
         if (!errorState.isRestartingNode()) {
           LOG.info("Exception in createBlockOutputStream " + this, ie);
@@ -1768,9 +1805,10 @@ class DataStreamer extends Daemon {
 
         final int i = errorState.getBadNodeIndex();
         // Check whether there is a restart worth waiting for.
-        if (checkRestart && shouldWaitForRestart(i)) {
-          errorState.initRestartingNode(i, "Datanode " + i + " is restarting: "
-              + nodes[i]);
+        if (checkRestart) {
+          errorState.initRestartingNode(i,
+              "Datanode " + i + " is restarting: " + nodes[i],
+              shouldWaitForRestart(i));
         }
         errorState.setInternalError();
         lastException.set(ie);
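
Condensed, the DataStreamer changes above amount to the following control flow
(a paraphrase of the patched methods, not literal code from this commit):

  // On a restart OOB ack, always record the restarting node; only wait for it
  // when it is the local node (or a test injector forces the non-wait path).
  if (PipelineAck.isRestartOOBStatus(reply)) {
    errorState.initRestartingNode(i, message, shouldWaitForRestart(i));
    throw new IOException(message);
  }

  // In handleRestartingDatanode(): a restarting node that is not worth waiting
  // for is treated as a bad node for this recovery round only.
  if (errorState.isRestartingNode() && !errorState.doWaitForRestart()) {
    errorState.setBadNodeIndex(errorState.getRestartingNodeIndex());
  }

  // In handleBadDatanode() / createBlockOutputStream(): such a node is tracked
  // in restartingNodes and dropped from the failed list once the new pipeline
  // is up, so it can be re-added in future pipeline updates.
  failed.removeAll(restartingNodes);
  restartingNodes.clear();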

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 2ab4067..c5462a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -207,7 +207,7 @@ class BlockReceiver implements Closeable {
       //
       if (isDatanode) { //replication or move
         replicaHandler =
-            datanode.data.createTemporary(storageType, storageId, block);
+            datanode.data.createTemporary(storageType, storageId, block, false);
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
@@ -236,8 +236,8 @@ class BlockReceiver implements Closeable {
         case TRANSFER_RBW:
         case TRANSFER_FINALIZED:
           // this is a transfer destination
-          replicaHandler =
-              datanode.data.createTemporary(storageType, storageId, block);
+          replicaHandler = datanode.data.createTemporary(storageType, storageId,
+              block, isTransfer);
           break;
         default: throw new IOException("Unsupported stage " + stage + 
               " while receiving block " + block + " from " + inAddr);

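The boolean added to createTemporary() distinguishes the two BlockReceiver call
sites shown above; a condensed view, paraphrasing the patched code:

  // Replication or move from another datanode: not a recovery transfer; only a
  // TEMPORARY/RBW replica with an older generation stamp may be replaced.
  replicaHandler = datanode.data.createTemporary(storageType, storageId, block, false);

  // TRANSFER_RBW / TRANSFER_FINALIZED destination: pass isTransfer so that even
  // a finalized replica with an older generation stamp (for example one left on
  // a datanode that was restarted for upgrade) can be replaced.
  replicaHandler = datanode.data.createTemporary(storageType, storageId, block, isTransfer);
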
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index d7e29cf..fd3af5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -319,7 +319,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @throws IOException if an error occurs
    */
   ReplicaHandler createTemporary(StorageType storageType, String storageId,
-      ExtendedBlock b) throws IOException;
+      ExtendedBlock b, boolean isTransfer) throws IOException;
 
   /**
    * Creates a RBW replica and returns the meta info of the replica

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 416609d..9174cb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -227,7 +227,19 @@ class FsDatasetAsyncDiskService {
         volumeRef, replicaToDelete, block, trashDirectory);
     execute(((FsVolumeImpl) volumeRef.getVolume()), deletionTask);
   }
-  
+
+  /**
+   * Delete the block file and meta file from the disk synchronously, adjust
+   * dfsUsed statistics accordingly.
+   */
+  void deleteSync(FsVolumeReference volumeRef, ReplicaInfo replicaToDelete,
+      ExtendedBlock block, String trashDirectory) {
+    LOG.info("Deleting " + block.getLocalBlock() + " replica " + replicaToDelete);
+    ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(volumeRef,
+        replicaToDelete, block, trashDirectory);
+    deletionTask.run();
+  }
+
   /** A task for deleting a block file and its associated meta file, as well
    *  as decrement the dfs usage of the volume.
    *  Optionally accepts a trash directory. If one is specified then the files
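
deleteAsync() and the new deleteSync() build the same ReplicaFileDeleteTask; the
only difference is that deleteSync() runs it on the calling thread instead of
queueing it on the volume's deletion executor. A sketch of how
FsDatasetImpl.invalidate() (changed below) picks between them:

  if (async) {
    // Usual path: queue the deletion so invalidate() returns quickly.
    asyncDiskService.deleteAsync(v.obtainReference(), removing,
        new ExtendedBlock(bpid, invalidBlks[i]),
        dataStorage.getTrashDirectoryForReplica(bpid, removing));
  } else {
    // New path: delete inline, used when the old block file must be gone
    // before a replacement replica may be allocated on the same volume.
    asyncDiskService.deleteSync(v.obtainReference(), removing,
        new ExtendedBlock(bpid, invalidBlks[i]),
        dataStorage.getTrashDirectoryForReplica(bpid, removing));
  }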

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index e7d4d25..eb4455b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1504,37 +1504,29 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public ReplicaHandler createTemporary(
-      StorageType storageType, String storageId, ExtendedBlock b)
+  public ReplicaHandler createTemporary(StorageType storageType,
+      String storageId, ExtendedBlock b, boolean isTransfer)
       throws IOException {
     long startTimeMs = Time.monotonicNow();
     long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
     ReplicaInfo lastFoundReplicaInfo = null;
+    boolean isInPipeline = false;
     do {
       try (AutoCloseableLock lock = datasetLock.acquire()) {
         ReplicaInfo currentReplicaInfo =
             volumeMap.get(b.getBlockPoolId(), b.getBlockId());
         if (currentReplicaInfo == lastFoundReplicaInfo) {
-          if (lastFoundReplicaInfo != null) {
-            invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
-          }
-          FsVolumeReference ref =
-              volumes.getNextVolume(storageType, storageId, b.getNumBytes());
-          FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
-          ReplicaInPipeline newReplicaInfo;
-          try {
-            newReplicaInfo = v.createTemporary(b);
-          } catch (IOException e) {
-            IOUtils.cleanup(null, ref);
-            throw e;
-          }
-
-          volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
-          return new ReplicaHandler(newReplicaInfo, ref);
+          break;
         } else {
-          if (!(currentReplicaInfo.getGenerationStamp() < b.getGenerationStamp()
-                && (currentReplicaInfo.getState() == ReplicaState.TEMPORARY
-                    || currentReplicaInfo.getState() == ReplicaState.RBW))) {
+          isInPipeline = currentReplicaInfo.getState() == ReplicaState.TEMPORARY
+              || currentReplicaInfo.getState() == ReplicaState.RBW;
+          /*
+           * If the current block is old, reject.
+           * else If transfer request, then accept it.
+           * else if state is not RBW/Temporary, then reject
+           */
+          if ((currentReplicaInfo.getGenerationStamp() >= b.getGenerationStamp())
+              || (!isTransfer && !isInPipeline)) {
             throw new ReplicaAlreadyExistsException("Block " + b
                 + " already exists in state " + currentReplicaInfo.getState()
                 + " and thus cannot be created.");
@@ -1542,7 +1534,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           lastFoundReplicaInfo = currentReplicaInfo;
         }
       }
-
+      if (!isInPipeline) {
+        continue;
+      }
       // Hang too long, just bail out. This is not supposed to happen.
       long writerStopMs = Time.monotonicNow() - startTimeMs;
       if (writerStopMs > writerStopTimeoutMs) {
@@ -1555,6 +1549,29 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
      // Stop the previous writer
      ((ReplicaInPipeline)lastFoundReplicaInfo).stopWriter(writerStopTimeoutMs);
     } while (true);
+
+    if (lastFoundReplicaInfo != null) {
+      // Old blockfile should be deleted synchronously as it might collide
+      // with the new block if allocated in same volume.
+      // Do the deletion outside of lock as its DISK IO.
+      invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
+          false);
+    }
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
+          .getNumBytes());
+      FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
+      ReplicaInPipeline newReplicaInfo;
+      try {
+        newReplicaInfo = v.createTemporary(b);
+      } catch (IOException e) {
+        IOUtils.cleanup(null, ref);
+        throw e;
+      }
+
+      volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
+      return new ReplicaHandler(newReplicaInfo, ref);
+    }
   }
 
   /**
@@ -1877,6 +1894,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override // FsDatasetSpi
   public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
+    invalidate(bpid, invalidBlks, true);
+  }
+
+  private void invalidate(String bpid, Block[] invalidBlks, boolean async)
+      throws IOException {
     final List<String> errors = new ArrayList<String>();
     for (int i = 0; i < invalidBlks.length; i++) {
       final ReplicaInfo removing;
@@ -1947,13 +1969,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       // If the block is cached, start uncaching it.
       cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
 
-      // Delete the block asynchronously to make sure we can do it fast enough.
-      // It's ok to unlink the block file before the uncache operation
-      // finishes.
       try {
-        asyncDiskService.deleteAsync(v.obtainReference(), removing,
-            new ExtendedBlock(bpid, invalidBlks[i]),
-            dataStorage.getTrashDirectoryForReplica(bpid, removing));
+        if (async) {
+          // Delete the block asynchronously to make sure we can do it fast
+          // enough.
+          // It's ok to unlink the block file before the uncache operation
+          // finishes.
+          asyncDiskService.deleteAsync(v.obtainReference(), removing,
+              new ExtendedBlock(bpid, invalidBlks[i]),
+              dataStorage.getTrashDirectoryForReplica(bpid, removing));
+        } else {
+          asyncDiskService.deleteSync(v.obtainReference(), removing,
+              new ExtendedBlock(bpid, invalidBlks[i]),
+              dataStorage.getTrashDirectoryForReplica(bpid, removing));
+        }
       } catch (ClosedChannelException e) {
         LOG.warn("Volume " + v + " is closed, ignore the deletion task for " +
             "block " + invalidBlks[i]);

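Taken together, the rewritten createTemporary() decides what to do with an
existing replica of the same block roughly as follows (a condensed paraphrase
of the logic above; `current` stands for the ReplicaInfo already found in the
volume map):

  boolean isInPipeline = current.getState() == ReplicaState.TEMPORARY
      || current.getState() == ReplicaState.RBW;
  if (current.getGenerationStamp() >= b.getGenerationStamp() // incoming block is not newer
      || (!isTransfer && !isInPipeline)) {                   // not a transfer and replica is not TEMPORARY/RBW
    throw new ReplicaAlreadyExistsException("Block " + b + " already exists");
  }
  // Otherwise: stop the previous writer if the replica is still in the pipeline,
  // delete the stale replica synchronously (outside the dataset lock, since the
  // new replica may land on the same volume), then create the new temporary
  // replica and register it in the volume map.
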
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 1a640b4..0212c4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
@@ -33,6 +35,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@@ -437,6 +440,95 @@ public class TestClientProtocolForPipelineRecovery {
     }
   }
 
+  @Test
+  public void testPipelineRecoveryOnRemoteDatanodeUpgrade() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true);
+    MiniDFSCluster cluster = null;
+    DFSClientFaultInjector old = DFSClientFaultInjector.get();
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short) 3, 0L);
+      // treat all restarting nodes as remote for test.
+      DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+        public boolean skipRollingRestartWait() {
+          return true;
+        }
+      });
+
+      final DFSOutputStream out = (DFSOutputStream) fileSys.append(file)
+          .getWrappedStream();
+      final AtomicBoolean running = new AtomicBoolean(true);
+      final AtomicBoolean failed = new AtomicBoolean(false);
+      Thread t = new Thread() {
+        public void run() {
+          while (running.get()) {
+            try {
+              out.write("test".getBytes());
+              out.hflush();
+              // Keep writing data every one second
+              Thread.sleep(1000);
+            } catch (IOException | InterruptedException e) {
+              LOG.error("Exception during write", e);
+              failed.set(true);
+              break;
+            }
+          }
+          running.set(false);
+        }
+      };
+      t.start();
+      // Let write start
+      Thread.sleep(1000);
+      DatanodeInfo[] pipeline = out.getPipeline();
+      for (DatanodeInfo node : pipeline) {
+        assertFalse("Write should be going on", failed.get());
+        ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+        int indexToShutdown = 0;
+        for (int i = 0; i < dataNodes.size(); i++) {
+          if (dataNodes.get(i).getIpcPort() == node.getIpcPort()) {
+            indexToShutdown = i;
+            break;
+          }
+        }
+
+        // Note old genstamp to findout pipeline recovery
+        final long oldGs = out.getBlock().getGenerationStamp();
+        MiniDFSCluster.DataNodeProperties dnProps = cluster
+            .stopDataNodeForUpgrade(indexToShutdown);
+        GenericTestUtils.waitForThreadTermination(
+            "Async datanode shutdown thread", 100, 10000);
+        cluster.restartDataNode(dnProps, true);
+        cluster.waitActive();
+        // wait pipeline to be recovered
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return out.getBlock().getGenerationStamp() > oldGs;
+          }
+        }, 100, 10000);
+        Assert.assertEquals("The pipeline recovery count shouldn't increase", 0,
+            out.getStreamer().getPipelineRecoveryCount());
+      }
+      assertFalse("Write should be going on", failed.get());
+      running.set(false);
+      t.join();
+      out.write("testagain".getBytes());
+      assertTrue("There should be atleast 2 nodes in pipeline still", out
+          .getPipeline().length >= 2);
+      out.close();
+    } finally {
+      DFSClientFaultInjector.set(old);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   /**
    * Test to make sure the checksum is set correctly after pipeline
    * recovery transfers 0 byte partial block. If fails the test case

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 18b4922..afa7a82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1025,12 +1025,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public synchronized ReplicaHandler createRbw(
       StorageType storageType, String storageId, ExtendedBlock b,
       boolean allowLazyPersist) throws IOException {
-    return createTemporary(storageType, storageId, b);
+    return createTemporary(storageType, storageId, b, false);
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaHandler createTemporary(
-      StorageType storageType, String storageId, ExtendedBlock b)
+  public synchronized ReplicaHandler createTemporary(StorageType storageType,
+      String storageId, ExtendedBlock b, boolean isTransfer)
       throws IOException {
     if (isValidBlock(b)) {
       throw new ReplicaAlreadyExistsException("Block " + b +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index 2e69595..469e249b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -368,7 +368,7 @@ public class TestSimulatedFSDataset {
           ExtendedBlock block = new ExtendedBlock(newbpid,1);
           try {
             // it will throw an exception if the block pool is not found
-            fsdataset.createTemporary(StorageType.DEFAULT, null, block);
+            fsdataset.createTemporary(StorageType.DEFAULT, null, block, false);
           } catch (IOException ioe) {
             // JUnit does not capture exception in non-main thread,
             // so cache it and then let main thread throw later.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 2e439d6..d14bd72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -139,8 +139,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
 
   @Override
   public ReplicaHandler createTemporary(StorageType t, String i,
-      ExtendedBlock b)
-      throws IOException {
+      ExtendedBlock b, boolean isTransfer) throws IOException {
     return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index 11525ed..657e618 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -435,44 +435,48 @@ public class TestWriteToReplica {
   
    private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[FINALIZED]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[FINALIZED],
+          false);
       Assert.fail("Should not have created a temporary replica that was " +
                "finalized " + blocks[FINALIZED]);
     } catch (ReplicaAlreadyExistsException e) {
     }
  
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[TEMPORARY]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[TEMPORARY],
+          false);
       Assert.fail("Should not have created a replica that had created as" +
                "temporary " + blocks[TEMPORARY]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RBW]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RBW], false);
      Assert.fail("Should not have created a replica that had created as RBW " +
           blocks[RBW]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RWR]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RWR], false);
       Assert.fail("Should not have created a replica that was waiting to be " +
                "recovered " + blocks[RWR]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RUR]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RUR], false);
      Assert.fail("Should not have created a replica that was under recovery " +
           blocks[RUR]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
-    dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
+    dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT],
+        false);
 
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT],
+          false);
       Assert.fail("Should not have created a replica that had already been "
           + "created " + blocks[NON_EXISTENT]);
     } catch (Exception e) {
@@ -486,7 +490,7 @@ public class TestWriteToReplica {
     try {
       ReplicaInPipeline replicaInfo =
           dataSet.createTemporary(StorageType.DEFAULT, null,
-              blocks[NON_EXISTENT]).getReplica();
+              blocks[NON_EXISTENT], false).getReplica();
       Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
       Assert.assertTrue(
           replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());

