HDFS-11856. Ability to re-add Upgrading Nodes to pipeline for future pipeline updates. Contributed by Vinayakumar B.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/29b7df96 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/29b7df96 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/29b7df96 Branch: refs/heads/YARN-1011 Commit: 29b7df960fc3d0a7d1416225c3106c7d4222f0ca Parents: 4fb41b3 Author: Kihwal Lee <kih...@apache.org> Authored: Thu May 25 13:04:09 2017 -0500 Committer: Kihwal Lee <kih...@apache.org> Committed: Thu May 25 13:05:23 2017 -0500 ---------------------------------------------------------------------- .../hadoop/hdfs/DFSClientFaultInjector.java | 4 + .../org/apache/hadoop/hdfs/DataStreamer.java | 70 +++++++++++---- .../hdfs/server/datanode/BlockReceiver.java | 6 +- .../server/datanode/fsdataset/FsDatasetSpi.java | 2 +- .../impl/FsDatasetAsyncDiskService.java | 14 ++- .../datanode/fsdataset/impl/FsDatasetImpl.java | 85 ++++++++++++------ .../TestClientProtocolForPipelineRecovery.java | 92 ++++++++++++++++++++ .../server/datanode/SimulatedFSDataset.java | 6 +- .../server/datanode/TestSimulatedFSDataset.java | 2 +- .../extdataset/ExternalDatasetImpl.java | 3 +- .../fsdataset/impl/TestWriteToReplica.java | 20 +++-- 11 files changed, 241 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java index 4eb4c52..748edcd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java @@ -57,4 +57,8 @@ public class DFSClientFaultInjector { public void fetchFromDatanodeException() {} public void readFromDatanodeDelay() {} + + public boolean skipRollingRestartWait() { + return false; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 49c17b9..f5ce0ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -327,6 +327,7 @@ class DataStreamer extends Daemon { static class ErrorState { ErrorType error = ErrorType.NONE; private int badNodeIndex = -1; + private boolean waitForRestart = true; private int restartingNodeIndex = -1; private long restartingNodeDeadline = 0; private final long datanodeRestartTimeout; @@ -342,6 +343,7 @@ class DataStreamer extends Daemon { badNodeIndex = -1; restartingNodeIndex = -1; restartingNodeDeadline = 0; + waitForRestart = true; } synchronized void reset() { @@ -349,6 +351,7 @@ class DataStreamer extends Daemon { badNodeIndex = -1; restartingNodeIndex = -1; restartingNodeDeadline = 0; + waitForRestart = true; } synchronized boolean hasInternalError() { @@ -389,14 +392,19 @@ class DataStreamer extends Daemon { return restartingNodeIndex; } - synchronized void initRestartingNode(int i, String message) { + synchronized void initRestartingNode(int i, String message, + boolean shouldWait) { restartingNodeIndex = i; - restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout; - // If the data streamer has already set the primary node - // bad, clear it. It is likely that the write failed due to - // the DN shutdown. Even if it was a real failure, the pipeline - // recovery will take care of it. - badNodeIndex = -1; + if (shouldWait) { + restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout; + // If the data streamer has already set the primary node + // bad, clear it. It is likely that the write failed due to + // the DN shutdown. Even if it was a real failure, the pipeline + // recovery will take care of it. + badNodeIndex = -1; + } else { + this.waitForRestart = false; + } LOG.info(message); } @@ -405,7 +413,7 @@ class DataStreamer extends Daemon { } synchronized boolean isNodeMarked() { - return badNodeIndex >= 0 || isRestartingNode(); + return badNodeIndex >= 0 || (isRestartingNode() && doWaitForRestart()); } /** @@ -430,7 +438,7 @@ class DataStreamer extends Daemon { } else if (badNodeIndex < restartingNodeIndex) { // the node index has shifted. restartingNodeIndex--; - } else { + } else if (waitForRestart) { throw new IllegalStateException("badNodeIndex = " + badNodeIndex + " = restartingNodeIndex = " + restartingNodeIndex); } @@ -472,6 +480,10 @@ class DataStreamer extends Daemon { } } } + + boolean doWaitForRestart() { + return waitForRestart; + } } private volatile boolean streamerClosed = false; @@ -491,6 +503,8 @@ class DataStreamer extends Daemon { /** Nodes have been used in the pipeline before and have failed. */ private final List<DatanodeInfo> failed = new ArrayList<>(); + /** Restarting Nodes */ + private List<DatanodeInfo> restartingNodes = new ArrayList<>(); /** The times have retried to recover pipeline, for the same packet. */ private volatile int pipelineRecoveryCount = 0; /** Has the current block been hflushed? */ @@ -1043,6 +1057,13 @@ class DataStreamer extends Daemon { return true; } + /* + * Treat all nodes as remote for test when skip enabled. + */ + if (DFSClientFaultInjector.get().skipRollingRestartWait()) { + return false; + } + // Is it a local node? InetAddress addr = null; try { @@ -1110,11 +1131,11 @@ class DataStreamer extends Daemon { } // Restart will not be treated differently unless it is // the local node or the only one in the pipeline. - if (PipelineAck.isRestartOOBStatus(reply) && - shouldWaitForRestart(i)) { + if (PipelineAck.isRestartOOBStatus(reply)) { final String message = "Datanode " + i + " is restarting: " + targets[i]; - errorState.initRestartingNode(i, message); + errorState.initRestartingNode(i, message, + shouldWaitForRestart(i)); throw new IOException(message); } // node error @@ -1492,6 +1513,14 @@ class DataStreamer extends Daemon { */ boolean handleRestartingDatanode() { if (errorState.isRestartingNode()) { + if (!errorState.doWaitForRestart()) { + // If node is restarting and not worth to wait for restart then can go + // ahead with error recovery considering it as bad node for now. Later + // it should be able to re-consider the same node for future pipeline + // updates. + errorState.setBadNodeIndex(errorState.getRestartingNodeIndex()); + return true; + } // 4 seconds or the configured deadline period, whichever is shorter. // This is the retry interval and recovery will be retried in this // interval until timeout or success. @@ -1523,9 +1552,14 @@ class DataStreamer extends Daemon { return false; } + String reason = "bad."; + if (errorState.getRestartingNodeIndex() == badNodeIndex) { + reason = "restarting."; + restartingNodes.add(nodes[badNodeIndex]); + } LOG.warn("Error Recovery for " + block + " in pipeline " + Arrays.toString(nodes) + ": datanode " + badNodeIndex - + "("+ nodes[badNodeIndex] + ") is bad."); + + "("+ nodes[badNodeIndex] + ") is " + reason); failed.add(nodes[badNodeIndex]); DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1]; @@ -1735,6 +1769,9 @@ class DataStreamer extends Daemon { blockStream = out; result = true; // success errorState.resetInternalError(); + // remove all restarting nodes from failed nodes list + failed.removeAll(restartingNodes); + restartingNodes.clear(); } catch (IOException ie) { if (!errorState.isRestartingNode()) { LOG.info("Exception in createBlockOutputStream " + this, ie); @@ -1768,9 +1805,10 @@ class DataStreamer extends Daemon { final int i = errorState.getBadNodeIndex(); // Check whether there is a restart worth waiting for. - if (checkRestart && shouldWaitForRestart(i)) { - errorState.initRestartingNode(i, "Datanode " + i + " is restarting: " - + nodes[i]); + if (checkRestart) { + errorState.initRestartingNode(i, + "Datanode " + i + " is restarting: " + nodes[i], + shouldWaitForRestart(i)); } errorState.setInternalError(); lastException.set(ie); http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 2ab4067..c5462a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -207,7 +207,7 @@ class BlockReceiver implements Closeable { // if (isDatanode) { //replication or move replicaHandler = - datanode.data.createTemporary(storageType, storageId, block); + datanode.data.createTemporary(storageType, storageId, block, false); } else { switch (stage) { case PIPELINE_SETUP_CREATE: @@ -236,8 +236,8 @@ class BlockReceiver implements Closeable { case TRANSFER_RBW: case TRANSFER_FINALIZED: // this is a transfer destination - replicaHandler = - datanode.data.createTemporary(storageType, storageId, block); + replicaHandler = datanode.data.createTemporary(storageType, storageId, + block, isTransfer); break; default: throw new IOException("Unsupported stage " + stage + " while receiving block " + block + " from " + inAddr); http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index d7e29cf..fd3af5d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -319,7 +319,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean { * @throws IOException if an error occurs */ ReplicaHandler createTemporary(StorageType storageType, String storageId, - ExtendedBlock b) throws IOException; + ExtendedBlock b, boolean isTransfer) throws IOException; /** * Creates a RBW replica and returns the meta info of the replica http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java index 416609d..9174cb0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java @@ -227,7 +227,19 @@ class FsDatasetAsyncDiskService { volumeRef, replicaToDelete, block, trashDirectory); execute(((FsVolumeImpl) volumeRef.getVolume()), deletionTask); } - + + /** + * Delete the block file and meta file from the disk synchronously, adjust + * dfsUsed statistics accordingly. + */ + void deleteSync(FsVolumeReference volumeRef, ReplicaInfo replicaToDelete, + ExtendedBlock block, String trashDirectory) { + LOG.info("Deleting " + block.getLocalBlock() + " replica " + replicaToDelete); + ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(volumeRef, + replicaToDelete, block, trashDirectory); + deletionTask.run(); + } + /** A task for deleting a block file and its associated meta file, as well * as decrement the dfs usage of the volume. * Optionally accepts a trash directory. If one is specified then the files http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index e7d4d25..eb4455b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1504,37 +1504,29 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } @Override // FsDatasetSpi - public ReplicaHandler createTemporary( - StorageType storageType, String storageId, ExtendedBlock b) + public ReplicaHandler createTemporary(StorageType storageType, + String storageId, ExtendedBlock b, boolean isTransfer) throws IOException { long startTimeMs = Time.monotonicNow(); long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout(); ReplicaInfo lastFoundReplicaInfo = null; + boolean isInPipeline = false; do { try (AutoCloseableLock lock = datasetLock.acquire()) { ReplicaInfo currentReplicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (currentReplicaInfo == lastFoundReplicaInfo) { - if (lastFoundReplicaInfo != null) { - invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo }); - } - FsVolumeReference ref = - volumes.getNextVolume(storageType, storageId, b.getNumBytes()); - FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); - ReplicaInPipeline newReplicaInfo; - try { - newReplicaInfo = v.createTemporary(b); - } catch (IOException e) { - IOUtils.cleanup(null, ref); - throw e; - } - - volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo()); - return new ReplicaHandler(newReplicaInfo, ref); + break; } else { - if (!(currentReplicaInfo.getGenerationStamp() < b.getGenerationStamp() - && (currentReplicaInfo.getState() == ReplicaState.TEMPORARY - || currentReplicaInfo.getState() == ReplicaState.RBW))) { + isInPipeline = currentReplicaInfo.getState() == ReplicaState.TEMPORARY + || currentReplicaInfo.getState() == ReplicaState.RBW; + /* + * If the current block is old, reject. + * else If transfer request, then accept it. + * else if state is not RBW/Temporary, then reject + */ + if ((currentReplicaInfo.getGenerationStamp() >= b.getGenerationStamp()) + || (!isTransfer && !isInPipeline)) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + currentReplicaInfo.getState() + " and thus cannot be created."); @@ -1542,7 +1534,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { lastFoundReplicaInfo = currentReplicaInfo; } } - + if (!isInPipeline) { + continue; + } // Hang too long, just bail out. This is not supposed to happen. long writerStopMs = Time.monotonicNow() - startTimeMs; if (writerStopMs > writerStopTimeoutMs) { @@ -1555,6 +1549,29 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // Stop the previous writer ((ReplicaInPipeline)lastFoundReplicaInfo).stopWriter(writerStopTimeoutMs); } while (true); + + if (lastFoundReplicaInfo != null) { + // Old blockfile should be deleted synchronously as it might collide + // with the new block if allocated in same volume. + // Do the deletion outside of lock as its DISK IO. + invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo }, + false); + } + try (AutoCloseableLock lock = datasetLock.acquire()) { + FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b + .getNumBytes()); + FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); + ReplicaInPipeline newReplicaInfo; + try { + newReplicaInfo = v.createTemporary(b); + } catch (IOException e) { + IOUtils.cleanup(null, ref); + throw e; + } + + volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo()); + return new ReplicaHandler(newReplicaInfo, ref); + } } /** @@ -1877,6 +1894,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { */ @Override // FsDatasetSpi public void invalidate(String bpid, Block invalidBlks[]) throws IOException { + invalidate(bpid, invalidBlks, true); + } + + private void invalidate(String bpid, Block[] invalidBlks, boolean async) + throws IOException { final List<String> errors = new ArrayList<String>(); for (int i = 0; i < invalidBlks.length; i++) { final ReplicaInfo removing; @@ -1947,13 +1969,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // If the block is cached, start uncaching it. cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId()); - // Delete the block asynchronously to make sure we can do it fast enough. - // It's ok to unlink the block file before the uncache operation - // finishes. try { - asyncDiskService.deleteAsync(v.obtainReference(), removing, - new ExtendedBlock(bpid, invalidBlks[i]), - dataStorage.getTrashDirectoryForReplica(bpid, removing)); + if (async) { + // Delete the block asynchronously to make sure we can do it fast + // enough. + // It's ok to unlink the block file before the uncache operation + // finishes. + asyncDiskService.deleteAsync(v.obtainReference(), removing, + new ExtendedBlock(bpid, invalidBlks[i]), + dataStorage.getTrashDirectoryForReplica(bpid, removing)); + } else { + asyncDiskService.deleteSync(v.obtainReference(), removing, + new ExtendedBlock(bpid, invalidBlks[i]), + dataStorage.getTrashDirectoryForReplica(bpid, removing)); + } } catch (ClosedChannelException e) { LOG.warn("Volume " + v + " is closed, ignore the deletion task for " + "block " + invalidBlks[i]); http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java index 1a640b4..0212c4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java @@ -17,9 +17,11 @@ */ package org.apache.hadoop.hdfs; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; @@ -33,6 +35,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; @@ -437,6 +440,95 @@ public class TestClientProtocolForPipelineRecovery { } } + @Test + public void testPipelineRecoveryOnRemoteDatanodeUpgrade() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setBoolean(BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true); + MiniDFSCluster cluster = null; + DFSClientFaultInjector old = DFSClientFaultInjector.get(); + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + cluster.waitActive(); + FileSystem fileSys = cluster.getFileSystem(); + + Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade"); + DFSTestUtil.createFile(fileSys, file, 10240L, (short) 3, 0L); + // treat all restarting nodes as remote for test. + DFSClientFaultInjector.set(new DFSClientFaultInjector() { + public boolean skipRollingRestartWait() { + return true; + } + }); + + final DFSOutputStream out = (DFSOutputStream) fileSys.append(file) + .getWrappedStream(); + final AtomicBoolean running = new AtomicBoolean(true); + final AtomicBoolean failed = new AtomicBoolean(false); + Thread t = new Thread() { + public void run() { + while (running.get()) { + try { + out.write("test".getBytes()); + out.hflush(); + // Keep writing data every one second + Thread.sleep(1000); + } catch (IOException | InterruptedException e) { + LOG.error("Exception during write", e); + failed.set(true); + break; + } + } + running.set(false); + } + }; + t.start(); + // Let write start + Thread.sleep(1000); + DatanodeInfo[] pipeline = out.getPipeline(); + for (DatanodeInfo node : pipeline) { + assertFalse("Write should be going on", failed.get()); + ArrayList<DataNode> dataNodes = cluster.getDataNodes(); + int indexToShutdown = 0; + for (int i = 0; i < dataNodes.size(); i++) { + if (dataNodes.get(i).getIpcPort() == node.getIpcPort()) { + indexToShutdown = i; + break; + } + } + + // Note old genstamp to findout pipeline recovery + final long oldGs = out.getBlock().getGenerationStamp(); + MiniDFSCluster.DataNodeProperties dnProps = cluster + .stopDataNodeForUpgrade(indexToShutdown); + GenericTestUtils.waitForThreadTermination( + "Async datanode shutdown thread", 100, 10000); + cluster.restartDataNode(dnProps, true); + cluster.waitActive(); + // wait pipeline to be recovered + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return out.getBlock().getGenerationStamp() > oldGs; + } + }, 100, 10000); + Assert.assertEquals("The pipeline recovery count shouldn't increase", 0, + out.getStreamer().getPipelineRecoveryCount()); + } + assertFalse("Write should be going on", failed.get()); + running.set(false); + t.join(); + out.write("testagain".getBytes()); + assertTrue("There should be atleast 2 nodes in pipeline still", out + .getPipeline().length >= 2); + out.close(); + } finally { + DFSClientFaultInjector.set(old); + if (cluster != null) { + cluster.shutdown(); + } + } + } + /** * Test to make sure the checksum is set correctly after pipeline * recovery transfers 0 byte partial block. If fails the test case http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 18b4922..afa7a82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1025,12 +1025,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { public synchronized ReplicaHandler createRbw( StorageType storageType, String storageId, ExtendedBlock b, boolean allowLazyPersist) throws IOException { - return createTemporary(storageType, storageId, b); + return createTemporary(storageType, storageId, b, false); } @Override // FsDatasetSpi - public synchronized ReplicaHandler createTemporary( - StorageType storageType, String storageId, ExtendedBlock b) + public synchronized ReplicaHandler createTemporary(StorageType storageType, + String storageId, ExtendedBlock b, boolean isTransfer) throws IOException { if (isValidBlock(b)) { throw new ReplicaAlreadyExistsException("Block " + b + http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index 2e69595..469e249b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -368,7 +368,7 @@ public class TestSimulatedFSDataset { ExtendedBlock block = new ExtendedBlock(newbpid,1); try { // it will throw an exception if the block pool is not found - fsdataset.createTemporary(StorageType.DEFAULT, null, block); + fsdataset.createTemporary(StorageType.DEFAULT, null, block, false); } catch (IOException ioe) { // JUnit does not capture exception in non-main thread, // so cache it and then let main thread throw later. http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 2e439d6..d14bd72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -139,8 +139,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> { @Override public ReplicaHandler createTemporary(StorageType t, String i, - ExtendedBlock b) - throws IOException { + ExtendedBlock b, boolean isTransfer) throws IOException { return new ReplicaHandler(new ExternalReplicaInPipeline(), null); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/29b7df96/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java index 11525ed..657e618 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java @@ -435,44 +435,48 @@ public class TestWriteToReplica { private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException { try { - dataSet.createTemporary(StorageType.DEFAULT, null, blocks[FINALIZED]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[FINALIZED], + false); Assert.fail("Should not have created a temporary replica that was " + "finalized " + blocks[FINALIZED]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(StorageType.DEFAULT, null, blocks[TEMPORARY]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[TEMPORARY], + false); Assert.fail("Should not have created a replica that had created as" + "temporary " + blocks[TEMPORARY]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RBW]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RBW], false); Assert.fail("Should not have created a replica that had created as RBW " + blocks[RBW]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RWR]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RWR], false); Assert.fail("Should not have created a replica that was waiting to be " + "recovered " + blocks[RWR]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RUR]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RUR], false); Assert.fail("Should not have created a replica that was under recovery " + blocks[RUR]); } catch (ReplicaAlreadyExistsException e) { } - dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT], + false); try { - dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT], + false); Assert.fail("Should not have created a replica that had already been " + "created " + blocks[NON_EXISTENT]); } catch (Exception e) { @@ -486,7 +490,7 @@ public class TestWriteToReplica { try { ReplicaInPipeline replicaInfo = dataSet.createTemporary(StorageType.DEFAULT, null, - blocks[NON_EXISTENT]).getReplica(); + blocks[NON_EXISTENT], false).getReplica(); Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp); Assert.assertTrue( replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId()); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org