Repository: hadoop
Updated Branches:
  refs/heads/branch-2 c112bf683 -> 212a56608


HDFS-11856. Ability to re-add Upgrading nodes to pipeline for future pipeline 
updates. Contributed by Vinayakumar B.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/212a5660
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/212a5660
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/212a5660

Branch: refs/heads/branch-2
Commit: 212a566088bab20c47234c435ae784c4ee84469b
Parents: c112bf6
Author: Kihwal Lee <kih...@apache.org>
Authored: Wed May 31 12:42:37 2017 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Wed May 31 12:45:33 2017 -0500

----------------------------------------------------------------------
 .../hadoop/hdfs/DFSClientFaultInjector.java     |  4 +
 .../org/apache/hadoop/hdfs/DataStreamer.java    | 69 +++++++++++----
 .../hdfs/server/datanode/BlockReceiver.java     |  4 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java |  5 +-
 .../impl/FsDatasetAsyncDiskService.java         | 14 ++-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 93 +++++++++++++-------
 .../TestClientProtocolForPipelineRecovery.java  | 92 +++++++++++++++++++
 .../server/datanode/SimulatedFSDataset.java     |  6 +-
 .../server/datanode/TestSimulatedFSDataset.java |  2 +-
 .../extdataset/ExternalDatasetImpl.java         |  4 +-
 .../fsdataset/impl/TestWriteToReplica.java      | 17 ++--
 11 files changed, 244 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
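
In brief: when a remote datanode in the write pipeline announces a rolling
restart, the client still runs pipeline recovery immediately (it only waits out
the restart when the node is local or is the only node in the pipeline), but the
node is now remembered as restarting rather than permanently failed, so a later
pipeline update can add it back once it is up again. On the datanode side,
createTemporary() gains an isTransfer flag and an old colliding replica is
deleted synchronously, so a block transfer to a re-added node does not fail with
ReplicaAlreadyExistsException. Below is a minimal standalone sketch of the
client-side bookkeeping; it is a simplified model with illustrative names, not
code from this patch.

  import java.util.ArrayList;
  import java.util.List;

  // Toy model of the failed / restartingNodes lists kept by DataStreamer.
  public class RestartingNodeSketch {
    public static void main(String[] args) {
      List<String> failed = new ArrayList<>();          // excluded from new pipelines
      List<String> restartingNodes = new ArrayList<>(); // excluded only until recovery succeeds

      // A remote datanode sends a restart OOB ack: do not wait for it, treat it
      // as bad for this recovery, but note that it was merely restarting.
      String dn = "dn2";
      failed.add(dn);
      restartingNodes.add(dn);

      // ... pipeline recovery succeeds with the remaining nodes ...

      // On success the restarting nodes are dropped from the failed list, so a
      // future pipeline update may pick the upgraded node again.
      failed.removeAll(restartingNodes);
      restartingNodes.clear();
      System.out.println(failed); // prints []
    }
  }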


http://git-wip-us.apache.org/repos/asf/hadoop/blob/212a5660/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index 4eb4c52..748edcd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -57,4 +57,8 @@ public class DFSClientFaultInjector {
   public void fetchFromDatanodeException() {}
 
   public void readFromDatanodeDelay() {}
+
+  public boolean skipRollingRestartWait() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212a5660/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index bcf740f..3279590 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -328,6 +328,7 @@ class DataStreamer extends Daemon {
   static class ErrorState {
     private boolean error = false;
     private int badNodeIndex = -1;
+    private boolean waitForRestart = true;
     private int restartingNodeIndex = -1;
     private long restartingNodeDeadline = 0;
     private final long datanodeRestartTimeout;
@@ -341,6 +342,7 @@ class DataStreamer extends Daemon {
       badNodeIndex = -1;
       restartingNodeIndex = -1;
       restartingNodeDeadline = 0;
+      waitForRestart = true;
     }
 
     synchronized boolean hasError() {
@@ -367,14 +369,19 @@ class DataStreamer extends Daemon {
       return restartingNodeIndex;
     }
 
-    synchronized void initRestartingNode(int i, String message) {
+    synchronized void initRestartingNode(int i, String message,
+        boolean shouldWait) {
       restartingNodeIndex = i;
-      restartingNodeDeadline =  Time.monotonicNow() + datanodeRestartTimeout;
-      // If the data streamer has already set the primary node
-      // bad, clear it. It is likely that the write failed due to
-      // the DN shutdown. Even if it was a real failure, the pipeline
-      // recovery will take care of it.
-      badNodeIndex = -1;
+      if (shouldWait) {
+        restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout;
+        // If the data streamer has already set the primary node
+        // bad, clear it. It is likely that the write failed due to
+        // the DN shutdown. Even if it was a real failure, the pipeline
+        // recovery will take care of it.
+        badNodeIndex = -1;
+      } else {
+        this.waitForRestart = false;
+      }
       LOG.info(message);
     }
 
@@ -383,7 +390,7 @@ class DataStreamer extends Daemon {
     }
 
     synchronized boolean isNodeMarked() {
-      return badNodeIndex >= 0 || isRestartingNode();
+      return badNodeIndex >= 0 || (isRestartingNode() && doWaitForRestart());
     }
 
     /**
@@ -408,7 +415,7 @@ class DataStreamer extends Daemon {
         } else if (badNodeIndex < restartingNodeIndex) {
           // the node index has shifted.
           restartingNodeIndex--;
-        } else {
+        } else if (waitForRestart) {
           throw new IllegalStateException("badNodeIndex = " + badNodeIndex
               + " = restartingNodeIndex = " + restartingNodeIndex);
         }
@@ -450,6 +457,10 @@ class DataStreamer extends Daemon {
         }
       }
     }
+
+    boolean doWaitForRestart() {
+      return waitForRestart;
+    }
   }
 
   private volatile boolean streamerClosed = false;
@@ -469,6 +480,8 @@ class DataStreamer extends Daemon {
 
   /** Nodes have been used in the pipeline before and have failed. */
   private final List<DatanodeInfo> failed = new ArrayList<>();
+  /** Restarting Nodes */
+  private List<DatanodeInfo> restartingNodes = new ArrayList<>();
   /** The times have retried to recover pipeline, for the same packet. */
   private volatile int pipelineRecoveryCount = 0;
   /** Has the current block been hflushed? */
@@ -1020,6 +1033,13 @@ class DataStreamer extends Daemon {
       return true;
     }
 
+    /*
+     * Treat all nodes as remote for test when skip enabled.
+     */
+    if (DFSClientFaultInjector.get().skipRollingRestartWait()) {
+      return false;
+    }
+
     // Is it a local node?
     InetAddress addr = null;
     try {
@@ -1087,11 +1107,11 @@ class DataStreamer extends Daemon {
             }
             // Restart will not be treated differently unless it is
             // the local node or the only one in the pipeline.
-            if (PipelineAck.isRestartOOBStatus(reply) &&
-                shouldWaitForRestart(i)) {
+            if (PipelineAck.isRestartOOBStatus(reply)) {
               final String message = "Datanode " + i + " is restarting: "
                   + targets[i];
-              errorState.initRestartingNode(i, message);
+              errorState.initRestartingNode(i, message,
+                  shouldWaitForRestart(i));
               throw new IOException(message);
             }
             // node error
@@ -1452,6 +1472,14 @@ class DataStreamer extends Daemon {
    */
   private boolean handleRestartingDatanode() {
     if (errorState.isRestartingNode()) {
+      if (!errorState.doWaitForRestart()) {
+        // If node is restarting and not worth to wait for restart then can go
+        // ahead with error recovery considering it as bad node for now. Later
+        // it should be able to re-consider the same node for future pipeline
+        // updates.
+        errorState.setBadNodeIndex(errorState.getRestartingNodeIndex());
+        return true;
+      }
       // 4 seconds or the configured deadline period, whichever is shorter.
       // This is the retry interval and recovery will be retried in this
       // interval until timeout or success.
@@ -1483,9 +1511,14 @@ class DataStreamer extends Daemon {
         return false;
       }
 
+      String reason = "bad.";
+      if (errorState.getRestartingNodeIndex() == badNodeIndex) {
+        reason = "restarting.";
+        restartingNodes.add(nodes[badNodeIndex]);
+      }
       LOG.warn("Error Recovery for " + block + " in pipeline "
           + Arrays.toString(nodes) + ": datanode " + badNodeIndex
-          + "("+ nodes[badNodeIndex] + ") is bad.");
+          + "("+ nodes[badNodeIndex] + ") is " + reason);
       failed.add(nodes[badNodeIndex]);
 
       DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
@@ -1691,6 +1724,9 @@ class DataStreamer extends Daemon {
         blockStream = out;
         result =  true; // success
         errorState.reset();
+        // remove all restarting nodes from failed nodes list
+        failed.removeAll(restartingNodes);
+        restartingNodes.clear();
       } catch (IOException ie) {
         if (!errorState.isRestartingNode()) {
           LOG.info("Exception in createBlockOutputStream", ie);
@@ -1724,9 +1760,10 @@ class DataStreamer extends Daemon {
 
         final int i = errorState.getBadNodeIndex();
         // Check whether there is a restart worth waiting for.
-        if (checkRestart && shouldWaitForRestart(i)) {
-          errorState.initRestartingNode(i, "Datanode " + i + " is restarting: "
-              + nodes[i]);
+        if (checkRestart) {
+          errorState.initRestartingNode(i,
+              "Datanode " + i + " is restarting: " + nodes[i],
+              shouldWaitForRestart(i));
         }
         errorState.setError(true);
         lastException.set(ie);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212a5660/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 9ed1766..03cf3a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -206,7 +206,7 @@ class BlockReceiver implements Closeable {
       // Open local disk out
       //
       if (isDatanode) { //replication or move
-        replicaHandler = datanode.data.createTemporary(storageType, block);
+        replicaHandler = datanode.data.createTemporary(storageType, block, false);
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
@@ -235,7 +235,7 @@ class BlockReceiver implements Closeable {
         case TRANSFER_FINALIZED:
           // this is a transfer destination
           replicaHandler =
-              datanode.data.createTemporary(storageType, block);
+              datanode.data.createTemporary(storageType, block, isTransfer);
           break;
         default: throw new IOException("Unsupported stage " + stage + 
               " while receiving block " + block + " from " + inAddr);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212a5660/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 4d08e17..5074bfa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -314,13 +314,14 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   /**
    * Creates a temporary replica and returns the meta information of the replica
    * .
-   *
    * @param b block
+   * @param isTransfer whether for transfer
+   *
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
   ReplicaHandler createTemporary(StorageType storageType,
-      ExtendedBlock b) throws IOException;
+      ExtendedBlock b, boolean isTransfer) throws IOException;
 
   /**
    * Creates a RBW replica and returns the meta info of the replica

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212a5660/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 564e1a8..10ee9cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -220,7 +220,19 @@ class FsDatasetAsyncDiskService {
         volumeRef, blockFile, metaFile, block, trashDirectory);
     execute(((FsVolumeImpl) volumeRef.getVolume()).getCurrentDir(), deletionTask);
   }
-  
+
+  /**
+   * Delete the block file and meta file from the disk synchronously, adjust
+   * dfsUsed statistics accordingly.
+   */
+  void deleteSync(FsVolumeReference volumeRef, File blockFile, File metaFile,
+      ExtendedBlock block, String trashDirectory) {
+    LOG.info("Deleting " + block.getLocalBlock() + " file " + blockFile);
+    ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
+        volumeRef, blockFile, metaFile, block, trashDirectory);
+    deletionTask.run();
+  }
+
   /** A task for deleting a block file and its associated meta file, as well
    *  as decrement the dfs usage of the volume.
    *  Optionally accepts a trash directory. If one is specified then the files

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212a5660/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index ef54491..ac0e046 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1656,38 +1656,28 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public ReplicaHandler createTemporary(
-      StorageType storageType, ExtendedBlock b) throws IOException {
+  public ReplicaHandler createTemporary(StorageType storageType,
+      ExtendedBlock b, boolean isTransfer) throws IOException {
     long startTimeMs = Time.monotonicNow();
     long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
     ReplicaInfo lastFoundReplicaInfo = null;
+    boolean isInPipeline = false;
     do {
       try(AutoCloseableLock lock = datasetLock.acquire()) {
         ReplicaInfo currentReplicaInfo =
             volumeMap.get(b.getBlockPoolId(), b.getBlockId());
         if (currentReplicaInfo == lastFoundReplicaInfo) {
-          if (lastFoundReplicaInfo != null) {
-            invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
-          }
-          FsVolumeReference ref =
-              volumes.getNextVolume(storageType, b.getNumBytes());
-          FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
-          // create a temporary file to hold block in the designated volume
-          File f;
-          try {
-            f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
-          } catch (IOException e) {
-            IOUtils.cleanup(null, ref);
-            throw e;
-          }
-          ReplicaInPipeline newReplicaInfo =
-              new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
-                  f.getParentFile(), b.getLocalBlock().getNumBytes());
-          volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-          return new ReplicaHandler(newReplicaInfo, ref);
+          break;
         } else {
-          if (!(currentReplicaInfo.getGenerationStamp() < b
-              .getGenerationStamp() && currentReplicaInfo instanceof ReplicaInPipeline)) {
+          isInPipeline = currentReplicaInfo.getState() == ReplicaState.TEMPORARY
+              || currentReplicaInfo.getState() == ReplicaState.RBW;
+          /*
+           * If the current block is old, reject.
+           * else If transfer request, then accept it.
+           * else if state is not RBW/Temporary, then reject
+           */
+          if ((currentReplicaInfo.getGenerationStamp() >= b.getGenerationStamp())
+              || (!isTransfer && !isInPipeline)) {
             throw new ReplicaAlreadyExistsException("Block " + b
                 + " already exists in state " + currentReplicaInfo.getState()
                 + " and thus cannot be created.");
@@ -1695,7 +1685,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           lastFoundReplicaInfo = currentReplicaInfo;
         }
       }
-
+      if (!isInPipeline) {
+        continue;
+      }
       // Hang too long, just bail out. This is not supposed to happen.
       long writerStopMs = Time.monotonicNow() - startTimeMs;
       if (writerStopMs > writerStopTimeoutMs) {
@@ -1709,6 +1701,32 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       ((ReplicaInPipeline) lastFoundReplicaInfo)
           .stopWriter(writerStopTimeoutMs);
     } while (true);
+
+    if (lastFoundReplicaInfo != null) {
+      // Old blockfile should be deleted synchronously as it might collide
+      // with the new block if allocated in same volume.
+      // Do the deletion outside of lock as its DISK IO.
+      invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
+          false);
+    }
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      FsVolumeReference ref = volumes.getNextVolume(storageType, b
+          .getNumBytes());
+      FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
+      // create a temporary file to hold block in the designated volume
+      File f;
+      try {
+        f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
+      } catch (IOException e) {
+        IOUtils.cleanup(null, ref);
+        throw e;
+      }
+      ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b
+          .getGenerationStamp(), v, f.getParentFile(), b.getLocalBlock()
+              .getNumBytes());
+      volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+      return new ReplicaHandler(newReplicaInfo, ref);
+    }
   }
 
   /**
@@ -2060,6 +2078,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override // FsDatasetSpi
   public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
+    invalidate(bpid, invalidBlks, true);
+  }
+
+  private void invalidate(String bpid, Block[] invalidBlks, boolean async)
+      throws IOException {
     final List<String> errors = new ArrayList<String>();
     for (int i = 0; i < invalidBlks.length; i++) {
       final File f;
@@ -2125,14 +2148,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       // If the block is cached, start uncaching it.
       cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
 
-      // Delete the block asynchronously to make sure we can do it fast enough.
-      // It's ok to unlink the block file before the uncache operation
-      // finishes.
       try {
-        asyncDiskService.deleteAsync(v.obtainReference(), f,
-            FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
-            new ExtendedBlock(bpid, invalidBlks[i]),
-            dataStorage.getTrashDirectoryForBlockFile(bpid, f));
+        // Delete the block asynchronously to make sure we can do it fast
+        // enough.
+        // It's ok to unlink the block file before the uncache operation
+        // finishes.
+        if (async) {
+          asyncDiskService.deleteAsync(v.obtainReference(), f,
+              FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
+              new ExtendedBlock(bpid, invalidBlks[i]),
+              dataStorage.getTrashDirectoryForBlockFile(bpid, f));
+        } else {
+          asyncDiskService.deleteSync(v.obtainReference(), f,
+              FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
+              new ExtendedBlock(bpid, invalidBlks[i]),
+              dataStorage.getTrashDirectoryForBlockFile(bpid, f));
+        }
       } catch (ClosedChannelException e) {
         LOG.warn("Volume " + v + " is closed, ignore the deletion task for " +
             "block " + invalidBlks[i]);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212a5660/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 564a611..1865102 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
@@ -33,6 +35,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@@ -437,6 +440,95 @@ public class TestClientProtocolForPipelineRecovery {
     }
   }
 
+  @Test
+  public void testPipelineRecoveryOnRemoteDatanodeUpgrade() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true);
+    MiniDFSCluster cluster = null;
+    DFSClientFaultInjector old = DFSClientFaultInjector.get();
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short) 3, 0L);
+      // treat all restarting nodes as remote for test.
+      DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+        public boolean skipRollingRestartWait() {
+          return true;
+        }
+      });
+
+      final DFSOutputStream out = (DFSOutputStream) fileSys.append(file)
+          .getWrappedStream();
+      final AtomicBoolean running = new AtomicBoolean(true);
+      final AtomicBoolean failed = new AtomicBoolean(false);
+      Thread t = new Thread() {
+        public void run() {
+          while (running.get()) {
+            try {
+              out.write("test".getBytes());
+              out.hflush();
+              // Keep writing data every one second
+              Thread.sleep(1000);
+            } catch (IOException | InterruptedException e) {
+              LOG.error("Exception during write", e);
+              failed.set(true);
+              break;
+            }
+          }
+          running.set(false);
+        }
+      };
+      t.start();
+      // Let write start
+      Thread.sleep(1000);
+      DatanodeInfo[] pipeline = out.getPipeline();
+      for (DatanodeInfo node : pipeline) {
+        assertFalse("Write should be going on", failed.get());
+        ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+        int indexToShutdown = 0;
+        for (int i = 0; i < dataNodes.size(); i++) {
+          if (dataNodes.get(i).getIpcPort() == node.getIpcPort()) {
+            indexToShutdown = i;
+            break;
+          }
+        }
+
+        // Note old genstamp to findout pipeline recovery
+        final long oldGs = out.getBlock().getGenerationStamp();
+        MiniDFSCluster.DataNodeProperties dnProps = cluster
+            .stopDataNodeForUpgrade(indexToShutdown);
+        GenericTestUtils.waitForThreadTermination(
+            "Async datanode shutdown thread", 100, 10000);
+        cluster.restartDataNode(dnProps, true);
+        cluster.waitActive();
+        // wait pipeline to be recovered
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return out.getBlock().getGenerationStamp() > oldGs;
+          }
+        }, 100, 10000);
+        Assert.assertEquals("The pipeline recovery count shouldn't increase", 0,
+            out.getStreamer().getPipelineRecoveryCount());
+      }
+      assertFalse("Write should be going on", failed.get());
+      running.set(false);
+      t.join();
+      out.write("testagain".getBytes());
+      assertTrue("There should be atleast 2 nodes in pipeline still", out
+          .getPipeline().length >= 2);
+      out.close();
+    } finally {
+      DFSClientFaultInjector.set(old);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   /**
    * Test to make sure the checksum is set correctly after pipeline
    * recovery transfers 0 byte partial block. If fails the test case

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212a5660/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 82d49f2..94b5b72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -991,12 +991,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public synchronized ReplicaHandler createRbw(
       StorageType storageType, ExtendedBlock b,
       boolean allowLazyPersist) throws IOException {
-    return createTemporary(storageType, b);
+    return createTemporary(storageType, b, false);
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaHandler createTemporary(
-      StorageType storageType, ExtendedBlock b) throws IOException {
+  public synchronized ReplicaHandler createTemporary(StorageType storageType,
+      ExtendedBlock b, boolean isTransfer) throws IOException {
     if (isValidBlock(b)) {
           throw new ReplicaAlreadyExistsException("Block " + b +
               " is valid, and cannot be written to.");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212a5660/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index 385d910..9a77350 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -368,7 +368,7 @@ public class TestSimulatedFSDataset {
           ExtendedBlock block = new ExtendedBlock(newbpid,1);
           try {
             // it will throw an exception if the block pool is not found
-            fsdataset.createTemporary(StorageType.DEFAULT, block);
+            fsdataset.createTemporary(StorageType.DEFAULT, block, false);
           } catch (IOException ioe) {
             // JUnit does not capture exception in non-main thread,
             // so cache it and then let main thread throw later.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212a5660/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index a106f7a..b7d43ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -137,8 +137,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b)
-      throws IOException {
+  public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b,
+      boolean isTransfer) throws IOException {
     return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212a5660/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index 45fcbf2..56f1d67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -435,44 +435,44 @@ public class TestWriteToReplica {
   
   private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]);
+      dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED], false);
       Assert.fail("Should not have created a temporary replica that was " +
                "finalized " + blocks[FINALIZED]);
     } catch (ReplicaAlreadyExistsException e) {
     }
  
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]);
+      dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY], false);
       Assert.fail("Should not have created a replica that had created as" +
                "temporary " + blocks[TEMPORARY]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]);
+      dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW], false);
       Assert.fail("Should not have created a replica that had created as RBW " +
           blocks[RBW]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]);
+      dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR], false);
       Assert.fail("Should not have created a replica that was waiting to be " +
                "recovered " + blocks[RWR]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]);
+      dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR], false);
       Assert.fail("Should not have created a replica that was under recovery " +
           blocks[RUR]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
-    dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+    dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
 
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+      dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
       Assert.fail("Should not have created a replica that had already been "
           + "created " + blocks[NON_EXISTENT]);
     } catch (Exception e) {
@@ -485,7 +485,8 @@ public class TestWriteToReplica {
     blocks[NON_EXISTENT].setGenerationStamp(newGenStamp);
     try {
       ReplicaInPipelineInterface replicaInfo =
-          dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]).getReplica();
+          dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT],
+              false).getReplica();
       Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
       Assert.assertTrue(
           replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());

