Repository: hadoop
Updated Branches:
refs/heads/trunk 21daa6c68 -> 7f393a6f6
Revert "HDFS-8860. Remove unused Replica copyOnWrite code (Lei (Eddy) Xu via
Colin P. McCabe)"
This reverts commit a153b9601ad8628fdd608d8696310ca8c1f58ff0.
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f393a6f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f393a6f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f393a6f
Branch: refs/heads/trunk
Commit: 7f393a6f61f5a34a1de11481ad321c6a941d5d27
Parents: 21daa6c
Author: Lei Xu <[email protected]>
Authored: Thu Dec 10 10:57:33 2015 -1000
Committer: Lei Xu <[email protected]>
Committed: Thu Dec 10 10:57:33 2015 -1000
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 -
.../hdfs/server/datanode/FinalizedReplica.java | 15 +++-
.../hdfs/server/datanode/ReplicaInfo.java | 88 ++++++++++++++++++++
.../server/datanode/ReplicaUnderRecovery.java | 10 +++
.../datanode/ReplicaWaitingToBeRecovered.java | 15 +++-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 3 +
.../org/apache/hadoop/hdfs/TestFileAppend.java | 72 ++++++++++++++++
.../hdfs/server/datanode/DataNodeTestUtils.java | 15 ++++
.../fsdataset/impl/FsDatasetTestUtil.java | 6 ++
.../fsdataset/impl/TestDatanodeRestart.java | 72 ++++++++++++++++
10 files changed, 294 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f393a6f/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6755f00..7fe5850 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1448,8 +1448,6 @@ Release 2.8.0 - UNRELEASED
HDFS-9019. Adding informative message to sticky bit permission denied
exception. (xyao)
- HDFS-8860. Remove unused Replica copyOnWrite code (Lei (Eddy) Xu via Colin
- P. McCabe)
-
HDFS-8716. Introduce a new config specifically for safe mode block count
(Chang Li via kihwal)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f393a6f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
index 8daeb51..cc32874 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
* This class describes a replica that has been finalized.
*/
public class FinalizedReplica extends ReplicaInfo {
+ private boolean unlinked; // copy-on-write done for block
/**
* Constructor
@@ -57,6 +58,7 @@ public class FinalizedReplica extends ReplicaInfo {
*/
public FinalizedReplica(FinalizedReplica from) {
super(from);
+ this.unlinked = from.isUnlinked();
}
@Override // ReplicaInfo
@@ -64,6 +66,16 @@ public class FinalizedReplica extends ReplicaInfo {
return ReplicaState.FINALIZED;
}
+ @Override // ReplicaInfo
+ public boolean isUnlinked() {
+ return unlinked;
+ }
+
+ @Override // ReplicaInfo
+ public void setUnlinked() {
+ unlinked = true;
+ }
+
@Override
public long getVisibleLength() {
return getNumBytes(); // all bytes are visible
@@ -86,6 +98,7 @@ public class FinalizedReplica extends ReplicaInfo {
@Override
public String toString() {
- return super.toString();
+ return super.toString()
+ + "\n unlinked =" + unlinked;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f393a6f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index d19e656..e41cce0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -18,12 +18,18 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.LightWeightResizableGSet;
import com.google.common.annotations.VisibleForTesting;
@@ -194,6 +200,22 @@ abstract public class ReplicaInfo extends Block
}
/**
+ * check if this replica has already been unlinked.
+ * @return true if the replica has already been unlinked
+ * or no need to be detached; false otherwise
+ */
+ public boolean isUnlinked() {
+ return true; // no need to be unlinked
+ }
+
+ /**
+ * set that this replica is unlinked
+ */
+ public void setUnlinked() {
+ // no need to be unlinked
+ }
+
+ /**
* Number of bytes reserved for this replica on disk.
*/
public long getBytesReserved() {
@@ -210,6 +232,72 @@ abstract public class ReplicaInfo extends Block
return 0;
}
+ /**
+ * Copy specified file into a temporary file. Then rename the
+ * temporary file to the original name. This will cause any
+ * hardlinks to the original file to be removed. The temporary
+ * files are created in the same directory. The temporary files will
+ * be recovered (especially on Windows) on datanode restart.
+ */
+ private void unlinkFile(File file, Block b) throws IOException {
+ File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
+ try {
+ FileInputStream in = new FileInputStream(file);
+ try {
+ FileOutputStream out = new FileOutputStream(tmpFile);
+ try {
+ IOUtils.copyBytes(in, out, 16*1024);
+ } finally {
+ out.close();
+ }
+ } finally {
+ in.close();
+ }
+ if (file.length() != tmpFile.length()) {
+ throw new IOException("Copy of file " + file + " size " +
file.length()+
+ " into file " + tmpFile +
+ " resulted in a size of " + tmpFile.length());
+ }
+ FileUtil.replaceFile(tmpFile, file);
+ } catch (IOException e) {
+ boolean done = tmpFile.delete();
+ if (!done) {
+ DataNode.LOG.info("detachFile failed to delete temporary file " +
+ tmpFile);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Remove a hard link by copying the block to a temporary place and
+ * then moving it back
+ * @param numLinks number of hard links
+ * @return true if copy is successful;
+ * false if it is already detached or no need to be detached
+ * @throws IOException if there is any copy error
+ */
+ public boolean unlinkBlock(int numLinks) throws IOException {
+ if (isUnlinked()) {
+ return false;
+ }
+ File file = getBlockFile();
+ if (file == null || getVolume() == null) {
+ throw new IOException("detachBlock:Block not found. " + this);
+ }
+ File meta = getMetaFile();
+
+ if (HardLink.getLinkCount(file) > numLinks) {
+ DataNode.LOG.info("CopyOnWrite for block " + this);
+ unlinkFile(file, this);
+ }
+ if (HardLink.getLinkCount(meta) > numLinks) {
+ unlinkFile(meta, this);
+ }
+ setUnlinked();
+ return true;
+ }
+
@Override //Object
public String toString() {
return getClass().getSimpleName()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f393a6f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
index 558ee21..2cd8a01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
@@ -85,6 +85,16 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
public ReplicaInfo getOriginalReplica() {
return original;
}
+
+ @Override //ReplicaInfo
+ public boolean isUnlinked() {
+ return original.isUnlinked();
+ }
+
+ @Override //ReplicaInfo
+ public void setUnlinked() {
+ original.setUnlinked();
+ }
@Override //ReplicaInfo
public ReplicaState getState() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f393a6f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
index 220649d..26ab3db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
* lease recovery.
*/
public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
+ private boolean unlinked; // copy-on-write done for block
/**
* Constructor
@@ -63,6 +64,7 @@ public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
*/
public ReplicaWaitingToBeRecovered(ReplicaWaitingToBeRecovered from) {
super(from);
+ this.unlinked = from.isUnlinked();
}
@Override //ReplicaInfo
@@ -71,6 +73,16 @@ public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
}
@Override //ReplicaInfo
+ public boolean isUnlinked() {
+ return unlinked;
+ }
+
+ @Override //ReplicaInfo
+ public void setUnlinked() {
+ unlinked = true;
+ }
+
+ @Override //ReplicaInfo
public long getVisibleLength() {
return -1; //no bytes are visible
}
@@ -92,6 +104,7 @@ public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
@Override
public String toString() {
- return super.toString();
+ return super.toString()
+ + "\n unlinked=" + unlinked;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f393a6f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 466c7e9..1d8c705 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1109,6 +1109,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throws IOException {
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
+ // unlink the finalized replica
+ replicaInfo.unlinkBlock(1);
// construct a RBW replica with the new GS
File blkfile = replicaInfo.getBlockFile();
@@ -2478,6 +2480,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
+ ", rur=" + rur);
}
if (rur.getNumBytes() > newlength) {
+ rur.unlinkBlock(1);
truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
if(!copyOnTruncate) {
// update RUR with the new length
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f393a6f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index 7b7f415..85d92c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -110,6 +110,78 @@ public class TestFileAppend{
}
/**
+ * Test that copy on write for blocks works correctly
+ * @throws IOException an exception might be thrown
+ */
+ @Test
+ public void testCopyOnWrite() throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ if (simulatedStorage) {
+ SimulatedFSDataset.setFactory(conf);
+ }
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ FileSystem fs = cluster.getFileSystem();
+ InetSocketAddress addr = new InetSocketAddress("localhost",
+ cluster.getNameNodePort());
+ DFSClient client = new DFSClient(addr, conf);
+ try {
+
+ // create a new file, write to it and close it.
+ //
+ Path file1 = new Path("/filestatus.dat");
+ FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
+ writeFile(stm);
+ stm.close();
+
+ // Get a handle to the datanode
+ DataNode[] dn = cluster.listDataNodes();
+ assertTrue("There should be only one datanode but found " + dn.length,
+ dn.length == 1);
+
+ LocatedBlocks locations = client.getNamenode().getBlockLocations(
+ file1.toString(), 0, Long.MAX_VALUE);
+ List<LocatedBlock> blocks = locations.getLocatedBlocks();
+
+ //
+ // Create hard links for a few of the blocks
+ //
+ for (int i = 0; i < blocks.size(); i = i + 2) {
+ ExtendedBlock b = blocks.get(i).getBlock();
+ final File f = DataNodeTestUtils.getFile(dn[0],
+ b.getBlockPoolId(), b.getLocalBlock().getBlockId());
+ File link = new File(f.toString() + ".link");
+ System.out.println("Creating hardlink for File " + f + " to " + link);
+ HardLink.createHardLink(f, link);
+ }
+
+ //
+ // Detach all blocks. This should remove hardlinks (if any)
+ //
+ for (int i = 0; i < blocks.size(); i++) {
+ ExtendedBlock b = blocks.get(i).getBlock();
+ System.out.println("testCopyOnWrite detaching block " + b);
+ assertTrue("Detaching block " + b + " should have returned true",
+ DataNodeTestUtils.unlinkBlock(dn[0], b, 1));
+ }
+
+ // Since the blocks were already detached earlier, these calls should
+ // return false
+ //
+ for (int i = 0; i < blocks.size(); i++) {
+ ExtendedBlock b = blocks.get(i).getBlock();
+ System.out.println("testCopyOnWrite detaching block " + b);
+ assertTrue("Detaching block " + b + " should have returned false",
+ !DataNodeTestUtils.unlinkBlock(dn[0], b, 1));
+ }
+
+ } finally {
+ client.close();
+ fs.close();
+ cluster.shutdown();
+ }
+ }
+
+ /**
* Test a simple flush on a simple HDFS file.
* @throws IOException an exception might be thrown
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f393a6f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
index 1d47192..6bcbb1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
@@ -158,6 +159,20 @@ public class DataNodeTestUtils {
return dn.getFSDataset();
}
+ public static File getFile(DataNode dn, String bpid, long bid) {
+ return FsDatasetTestUtil.getFile(dn.getFSDataset(), bpid, bid);
+ }
+
+ public static File getBlockFile(DataNode dn, String bpid, Block b
+ ) throws IOException {
+ return FsDatasetTestUtil.getBlockFile(dn.getFSDataset(), bpid, b);
+ }
+
+ public static boolean unlinkBlock(DataNode dn, ExtendedBlock bk, int numLinks
+ ) throws IOException {
+ return FsDatasetTestUtil.unlinkBlock(dn.getFSDataset(), bk, numLinks);
+ }
+
/**
* Fetch a copy of ReplicaInfo from a datanode by block id
* @param dn datanode to retrieve a replicainfo object from
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f393a6f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
index f4480a1..9c297e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
@@ -54,6 +54,12 @@ public class FsDatasetTestUtil {
return FsDatasetUtil.getMetaFile(getBlockFile(fsd, bpid, b), b
.getGenerationStamp());
}
+
+ public static boolean unlinkBlock(FsDatasetSpi<?> fsd,
+ ExtendedBlock block, int numLinks) throws IOException {
+ final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block);
+ return info.unlinkBlock(numLinks);
+ }
public static ReplicaInfo fetchReplicaInfo (final FsDatasetSpi<?> fsd,
final String bpid, final long blockId) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f393a6f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java
index 8bbac9f..4516696 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java
@@ -143,7 +143,79 @@ public class TestDatanodeRestart {
}
}
+ // test recovering unlinked tmp replicas
+ @Test public void testRecoverReplicas() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024L);
+ conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 512);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ cluster.waitActive();
+ try {
+ FileSystem fs = cluster.getFileSystem();
+ for (int i=0; i<4; i++) {
+ Path fileName = new Path("/test"+i);
+ DFSTestUtil.createFile(fs, fileName, 1, (short)1, 0L);
+ DFSTestUtil.waitReplication(fs, fileName, (short)1);
+ }
+ String bpid = cluster.getNamesystem().getBlockPoolId();
+ DataNode dn = cluster.getDataNodes().get(0);
+ Iterator<ReplicaInfo> replicasItor =
+ dataset(dn).volumeMap.replicas(bpid).iterator();
+ ReplicaInfo replica = replicasItor.next();
+ createUnlinkTmpFile(replica, true, true); // rename block file
+ createUnlinkTmpFile(replica, false, true); // rename meta file
+ replica = replicasItor.next();
+ createUnlinkTmpFile(replica, true, false); // copy block file
+ createUnlinkTmpFile(replica, false, false); // copy meta file
+ replica = replicasItor.next();
+ createUnlinkTmpFile(replica, true, true); // rename block file
+ createUnlinkTmpFile(replica, false, false); // copy meta file
+
+ cluster.restartDataNodes();
+ cluster.waitActive();
+ dn = cluster.getDataNodes().get(0);
+
+ // check volumeMap: 4 finalized replica
+ Collection<ReplicaInfo> replicas = dataset(dn).volumeMap.replicas(bpid);
+ Assert.assertEquals(4, replicas.size());
+ replicasItor = replicas.iterator();
+ while (replicasItor.hasNext()) {
+ Assert.assertEquals(ReplicaState.FINALIZED,
+ replicasItor.next().getState());
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
private static FsDatasetImpl dataset(DataNode dn) {
return (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
}
+
+ private static void createUnlinkTmpFile(ReplicaInfo replicaInfo,
+ boolean changeBlockFile,
+ boolean isRename) throws IOException {
+ File src;
+ if (changeBlockFile) {
+ src = replicaInfo.getBlockFile();
+ } else {
+ src = replicaInfo.getMetaFile();
+ }
+ File dst = DatanodeUtil.getUnlinkTmpFile(src);
+ if (isRename) {
+ src.renameTo(dst);
+ } else {
+ FileInputStream in = new FileInputStream(src);
+ try {
+ FileOutputStream out = new FileOutputStream(dst);
+ try {
+ IOUtils.copyBytes(in, out, 1);
+ } finally {
+ out.close();
+ }
+ } finally {
+ in.close();
+ }
+ }
+ }
}