http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index 3ef6390..cbbafc3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -17,23 +17,20 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.util.LightWeightResizableGSet;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class is used by datanodes to maintain meta data of its replicas.
  * It provides a general interface for meta information of a replica.
@@ -42,81 +39,26 @@ import com.google.common.annotations.VisibleForTesting;
 abstract public class ReplicaInfo extends Block
     implements Replica, LightWeightResizableGSet.LinkedElement {
 
-  /** For implementing {@link LightWeightResizableGSet.LinkedElement} interface */
+  /** For implementing {@link LightWeightResizableGSet.LinkedElement}. */
   private LightWeightResizableGSet.LinkedElement next;
 
-  /** volume where the replica belongs */
+  /** volume where the replica belongs. */
   private FsVolumeSpi volume;
-
-  /** directory where block & meta files belong */
-
-  /**
-   * Base directory containing numerically-identified sub directories and
-   * possibly blocks.
-   */
-  private File baseDir;
-
-  /**
-   * Whether or not this replica's parent directory includes subdirs, in which
-   * case we can generate them based on the replica's block ID
-   */
-  private boolean hasSubdirs;
-
-  private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
 
   /**
-   * Constructor
-   * @param block a block
-   * @param vol volume where replica is located
-   * @param dir directory path where block and meta files are located
-   */
-  ReplicaInfo(Block block, FsVolumeSpi vol, File dir) {
-    this(block.getBlockId(), block.getNumBytes(),
-        block.getGenerationStamp(), vol, dir);
-  }
-
-  /**
-   * Constructor
-   * @param blockId block id
-   * @param len replica length
-   * @param genStamp replica generation stamp
-   * @param vol volume where replica is located
-   * @param dir directory path where block and meta files are located
-   */
-  ReplicaInfo(long blockId, long len, long genStamp,
-      FsVolumeSpi vol, File dir) {
+   * Constructor
+   * @param vol volume where replica is located
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   */
+  ReplicaInfo(FsVolumeSpi vol, long blockId, long len, long genStamp) {
     super(blockId, len, genStamp);
     this.volume = vol;
-    setDirInternal(dir);
-  }
-
-  /**
-   * Copy constructor.
-   * @param from where to copy from
-   */
-  ReplicaInfo(ReplicaInfo from) {
-    this(from, from.getVolume(), from.getDir());
-  }
-
-  /**
-   * Get the full path of this replica's data file
-   * @return the full path of this replica's data file
-   */
-  public File getBlockFile() {
-    return new File(getDir(), getBlockName());
   }
 
   /**
-   * Get the full path of this replica's meta file
-   * @return the full path of this replica's meta file
-   */
-  public File getMetaFile() {
-    return new File(getDir(),
-        DatanodeUtil.getMetaName(getBlockName(), getGenerationStamp()));
-  }
-
-  /**
-   * Get the volume where this replica is located on disk
+   * Get the volume where this replica is located on disk.
    * @return the volume where this replica is located on disk
    */
   public FsVolumeSpi getVolume() {
@@ -124,7 +66,7 @@ abstract public class ReplicaInfo extends Block
   }
 
   /**
-   * Set the volume where this replica is located on disk
+   * Set the volume where this replica is located on disk.
    */
   void setVolume(FsVolumeSpi vol) {
     this.volume = vol;
@@ -137,156 +79,182 @@ abstract public class ReplicaInfo extends Block
   public String getStorageUuid() {
     return volume.getStorageID();
   }
-
+
   /**
-   * Return the parent directory path where this replica is located
-   * @return the parent directory path where this replica is located
+   * Number of bytes reserved for this replica on disk.
    */
-  File getDir() {
-    return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
-        getBlockId()) : baseDir;
+  public long getBytesReserved() {
+    return 0;
   }
 
   /**
-   * Set the parent directory where this replica is located
-   * @param dir the parent directory where the replica is located
+   * Get the {@code URI} for where the data of this replica is stored.
+   * @return {@code URI} for the location of replica data.
    */
-  public void setDir(File dir) {
-    setDirInternal(dir);
-  }
+  abstract public URI getBlockURI();
 
-  private void setDirInternal(File dir) {
-    if (dir == null) {
-      baseDir = null;
-      return;
-    }
-
-    ReplicaDirInfo dirInfo = parseBaseDir(dir);
-    this.hasSubdirs = dirInfo.hasSubidrs;
-
-    synchronized (internedBaseDirs) {
-      if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
-        // Create a new String path of this file and make a brand new File object
-        // to guarantee we drop the reference to the underlying char[] storage.
-        File baseDir = new File(dirInfo.baseDirPath);
-        internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
-      }
-      this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
-    }
-  }
+  /**
+   * Returns an {@link InputStream} to the replica's data.
+   * @param seekOffset the offset at which the read is started from.
+   * @return the {@link InputStream} to read the replica data.
+   * @throws IOException if an error occurs in opening a stream to the data.
+   */
+  abstract public InputStream getDataInputStream(long seekOffset)
+      throws IOException;
 
-  @VisibleForTesting
-  public static class ReplicaDirInfo {
-    public String baseDirPath;
-    public boolean hasSubidrs;
+  /**
+   * Returns an {@link OutputStream} to the replica's data.
+   * @param append indicates if the block should be opened for append.
+   * @return the {@link OutputStream} to write to the replica.
+   * @throws IOException if an error occurs in creating an {@link OutputStream}.
+   */
+  abstract public OutputStream getDataOutputStream(boolean append)
+      throws IOException;
 
-    public ReplicaDirInfo (String baseDirPath, boolean hasSubidrs) {
-      this.baseDirPath = baseDirPath;
-      this.hasSubidrs = hasSubidrs;
-    }
-  }
-
-  @VisibleForTesting
-  public static ReplicaDirInfo parseBaseDir(File dir) {
-
-    File currentDir = dir;
-    boolean hasSubdirs = false;
-    while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
-      hasSubdirs = true;
-      currentDir = currentDir.getParentFile();
-    }
-
-    return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
-  }
+  /**
+   * @return true if the replica's data exists.
+   */
+  abstract public boolean blockDataExists();
 
   /**
-   * Number of bytes reserved for this replica on disk.
+   * Used to deletes the replica's block data.
+   *
+   * @return true if the replica's data is successfully deleted.
    */
-  public long getBytesReserved() {
-    return 0;
-  }
+  abstract public boolean deleteBlockData();
 
   /**
-   * Number of bytes originally reserved for this replica. The actual
-   * reservation is adjusted as data is written to disk.
+   * @return the length of the block on storage.
+   */
+  abstract public long getBlockDataLength();
+
+  /**
+   * Get the {@code URI} for where the metadata of this replica is stored.
    *
-   * @return the number of bytes originally reserved for this replica.
+   * @return {@code URI} for the location of replica metadata.
    */
-  public long getOriginalBytesReserved() {
-    return 0;
-  }
+  abstract public URI getMetadataURI();
 
   /**
-   * Copy specified file into a temporary file. Then rename the
-   * temporary file to the original name. This will cause any
-   * hardlinks to the original file to be removed. The temporary
-   * files are created in the same directory. The temporary files will
-   * be recovered (especially on Windows) on datanode restart.
+   * Returns an {@link InputStream} to the replica's metadata.
+   * @param offset the offset at which the read is started from.
+   * @return the {@link LengthInputStream} to read the replica metadata.
+   * @throws IOException
    */
-  private void breakHardlinks(File file, Block b) throws IOException {
-    File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
-    try {
-      FileInputStream in = new FileInputStream(file);
-      try {
-        FileOutputStream out = new FileOutputStream(tmpFile);
-        try {
-          IOUtils.copyBytes(in, out, 16 * 1024);
-        } finally {
-          out.close();
-        }
-      } finally {
-        in.close();
-      }
-      if (file.length() != tmpFile.length()) {
-        throw new IOException("Copy of file " + file + " size " + file.length()+
-            " into file " + tmpFile +
-            " resulted in a size of " + tmpFile.length());
-      }
-      FileUtil.replaceFile(tmpFile, file);
-    } catch (IOException e) {
-      boolean done = tmpFile.delete();
-      if (!done) {
-        DataNode.LOG.info("detachFile failed to delete temporary file " +
-            tmpFile);
-      }
-      throw e;
-    }
-  }
+  abstract public LengthInputStream getMetadataInputStream(long offset)
+      throws IOException;
+
+  /**
+   * Returns an {@link OutputStream} to the replica's metadata.
+   * @param append indicates if the block metadata should be opened for append.
+   * @return the {@link OutputStream} to write to the replica's metadata.
+   * @throws IOException if an error occurs in creating an {@link OutputStream}.
+   */
+  abstract public OutputStream getMetadataOutputStream(boolean append)
+      throws IOException;
+
+  /**
+   * @return true if the replica's metadata exists.
+   */
+  abstract public boolean metadataExists();
 
   /**
-   * This function "breaks hardlinks" to the current replica file.
+   * Used to deletes the replica's metadata.
    *
-   * When doing a DataNode upgrade, we create a bunch of hardlinks to each block
-   * file. This cleverly ensures that both the old and the new storage
-   * directories can contain the same block file, without using additional space
-   * for the data.
+   * @return true if the replica's metadata is successfully deleted.
+   */
+  abstract public boolean deleteMetadata();
+
+  /**
+   * @return the length of the metadata on storage.
+   */
+  abstract public long getMetadataLength();
+
+  /**
+   * Rename the metadata {@link URI} to that referenced by {@code destURI}.
    *
-   * However, when we want to append to the replica file, we need to "break" the
-   * hardlink to ensure that the old snapshot continues to contain the old data
-   * length. If we failed to do that, we could roll back to the previous/
-   * directory during a downgrade, and find that the block contents were longer
-   * than they were at the time of upgrade.
+   * @param destURI the target {@link URI}.
+   * @return true if the rename is successful.
+   * @throws IOException if an exception occurs in the rename.
+   */
+  abstract public boolean renameMeta(URI destURI) throws IOException;
+
+  /**
+   * Rename the data {@link URI} to that referenced by {@code destURI}.
    *
-   * @return true only if data was copied.
+   * @param destURI the target {@link URI}.
+   * @return true if the rename is successful.
+   * @throws IOException if an exception occurs in the rename.
+   */
+  abstract public boolean renameData(URI destURI) throws IOException;
+
+  /**
+   * Update this replica with the {@link StorageLocation} found.
+   * @param replicaLocation the {@link StorageLocation} found for this replica.
+   */
+  abstract public void updateWithReplica(StorageLocation replicaLocation);
+
+  /**
+   * Check whether the block was pinned.
+   * @param localFS the local filesystem to use.
+   * @return true if the block is pinned.
    * @throws IOException
    */
-  public boolean breakHardLinksIfNeeded() throws IOException {
-    File file = getBlockFile();
-    if (file == null || getVolume() == null) {
-      throw new IOException("detachBlock:Block not found. " + this);
-    }
-    File meta = getMetaFile();
-
-    int linkCount = HardLink.getLinkCount(file);
-    if (linkCount > 1) {
-      DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
-          "block " + this);
-      breakHardlinks(file, this);
-    }
-    if (HardLink.getLinkCount(meta) > 1) {
-      breakHardlinks(meta, this);
-    }
-    return true;
+  abstract public boolean getPinning(LocalFileSystem localFS)
+      throws IOException;
+
+  /**
+   * Set a block to be pinned on this datanode so that it cannot be moved
+   * by Balancer/Mover.
+   *
+   * @param localFS the local filesystem to use.
+   * @throws IOException if there is an exception in the pinning.
+   */
+  abstract public void setPinning(LocalFileSystem localFS) throws IOException;
+
+  /**
+   * Bump a replica's generation stamp to a new one.
+   * Its on-disk meta file name is renamed to be the new one too.
+   *
+   * @param newGS new generation stamp
+   * @throws IOException if the change fails
+   */
+  abstract public void bumpReplicaGS(long newGS) throws IOException;
+
+  abstract public ReplicaInfo getOriginalReplica();
+
+  /**
+   * Get the recovery id.
+   * @return the generation stamp that the replica will be bumped to
+   */
+  abstract public long getRecoveryID();
+
+  /**
+   * Set the recovery id.
+   * @param recoveryId the new recoveryId
+   */
+  abstract public void setRecoveryID(long recoveryId);
+
+  abstract public boolean breakHardLinksIfNeeded() throws IOException;
+
+  abstract public ReplicaRecoveryInfo createInfo();
+
+  abstract public int compareWith(ScanInfo info);
+
+  abstract public void truncateBlock(long newLength) throws IOException;
+
+  abstract public void copyMetadata(URI destination) throws IOException;
+
+  abstract public void copyBlockdata(URI destination) throws IOException;
+
+  /**
+   * Number of bytes originally reserved for this replica. The actual
+   * reservation is adjusted as data is written to disk.
+   *
+   * @return the number of bytes originally reserved for this replica.
+   */
+  public long getOriginalBytesReserved() {
+    return 0;
   }
 
   @Override  //Object
@@ -298,7 +266,7 @@ abstract public class ReplicaInfo extends Block
       + "\n getBytesOnDisk() = " + getBytesOnDisk()
       + "\n getVisibleLength()= " + getVisibleLength()
      + "\n getVolume() = " + getVolume()
-      + "\n getBlockFile() = " + getBlockFile();
+      + "\n getBlockURI() = " + getBlockURI();
   }
 
   @Override
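The patch above replaces ReplicaInfo's File-based accessors (getBlockFile/getMetaFile) with URI- and stream-based abstract methods, so callers no longer assume replica data lives on the local filesystem. Below is a minimal sketch of a consumer written against only the new accessors; ReplicaCopier and copyData are illustrative names and are not part of this patch. The 16 KB buffer simply mirrors the copy loop removed from breakHardlinks().

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;

    // Hypothetical helper: copies one replica's block data to another
    // through the storage-agnostic stream accessors declared above.
    public class ReplicaCopier {
      public static void copyData(ReplicaInfo src, ReplicaInfo dst)
          throws IOException {
        try (InputStream in = src.getDataInputStream(0);          // read from offset 0
             OutputStream out = dst.getDataOutputStream(false)) { // overwrite, no append
          byte[] buf = new byte[16 * 1024];
          int n;
          while ((n = in.read(buf)) > 0) {
            out.write(buf, 0, n);
          }
        }
      }
    }
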
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
index 558ee21..09140e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.File;
-
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
@@ -31,19 +29,19 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
  * A recovery with higher recovery id preempts recoveries with a lower id.
  *
  */
-public class ReplicaUnderRecovery extends ReplicaInfo {
-  private ReplicaInfo original; // the original replica that needs to be recovered
+public class ReplicaUnderRecovery extends LocalReplica {
+  private LocalReplica original; // original replica to be recovered
   private long recoveryId; // recovery id; it is also the generation stamp
                            // that the replica will be bumped to after recovery
 
   public ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) {
-    super(replica, replica.getVolume(), replica.getDir());
+    super(replica, replica.getVolume(), ((LocalReplica)replica).getDir());
     if ( replica.getState() != ReplicaState.FINALIZED &&
          replica.getState() != ReplicaState.RBW &&
          replica.getState() != ReplicaState.RWR ) {
       throw new IllegalArgumentException("Cannot recover replica: " + replica);
     }
-    this.original = replica;
+    this.original = (LocalReplica) replica;
     this.recoveryId = recoveryId;
   }
 
@@ -53,22 +51,16 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
    */
   public ReplicaUnderRecovery(ReplicaUnderRecovery from) {
     super(from);
-    this.original = from.getOriginalReplica();
+    this.original = (LocalReplica) from.getOriginalReplica();
     this.recoveryId = from.getRecoveryID();
   }
 
-  /**
-   * Get the recovery id
-   * @return the generation stamp that the replica will be bumped to
-   */
+  @Override
   public long getRecoveryID() {
     return recoveryId;
   }
 
-  /**
-   * Set the recovery id
-   * @param recoveryId the new recoveryId
-   */
+  @Override
   public void setRecoveryID(long recoveryId) {
     if (recoveryId > this.recoveryId) {
       this.recoveryId = recoveryId;
@@ -82,6 +74,7 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
    * Get the original replica that's under recovery
    * @return the original replica under recovery
    */
+  @Override
   public ReplicaInfo getOriginalReplica() {
     return original;
   }
@@ -120,9 +113,9 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
   }
 
   @Override //ReplicaInfo
-  public void setDir(File dir) {
-    super.setDir(dir);
-    original.setDir(dir);
+  public void updateWithReplica(StorageLocation replicaLocation) {
+    super.updateWithReplica(replicaLocation);
+    original.updateWithReplica(replicaLocation);
   }
 
   @Override //ReplicaInfo
@@ -148,6 +141,7 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
       + "\n original=" + original;
   }
 
+  @Override
   public ReplicaRecoveryInfo createInfo() {
     return new ReplicaRecoveryInfo(original.getBlockId(),
         original.getBytesOnDisk(), original.getGenerationStamp(),
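ReplicaUnderRecovery now extends LocalReplica, and the recovery-id accessors become @Override implementations of the new ReplicaInfo contract. A sketch of the intended call pattern follows; RecoveryExample and initRecovery are illustrative names, and it assumes the wrapped replica is a LocalReplica in FINALIZED, RBW, or RWR state, as the constructor's cast and state check require.

    import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
    import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;

    class RecoveryExample {
      static ReplicaRecoveryInfo initRecovery(LocalReplica replica, long recoveryId) {
        // Throws IllegalArgumentException unless 'replica' is FINALIZED,
        // RBW, or RWR.
        ReplicaUnderRecovery rur = new ReplicaUnderRecovery(replica, recoveryId);
        // setRecoveryID only moves forward: a higher id preempts this one.
        rur.setRecoveryID(recoveryId + 1);
        return rur.createInfo();
      }
    }

Note that the constructor still accepts the ReplicaInfo base type but immediately casts it to LocalReplica, so a non-local replica would fail with a ClassCastException.
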
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
index 220649d..38ef286 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
@@ -22,6 +22,7 @@ import java.io.File;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 
 /**
  * This class represents a replica that is waiting to be recovered.
@@ -32,7 +33,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
  * client continues to write or be recovered as a result of
  * lease recovery.
  */
-public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
+public class ReplicaWaitingToBeRecovered extends LocalReplica {
 
   /**
    * Constructor
@@ -94,4 +95,28 @@ public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
   public String toString() {
     return super.toString();
   }
+
+  @Override
+  public ReplicaInfo getOriginalReplica() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getOriginalReplica");
+  }
+
+  @Override
+  public long getRecoveryID() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getRecoveryID");
+  }
+
+  @Override
+  public void setRecoveryID(long recoveryId) {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getRecoveryID");
+  }
+
+  @Override
+  public ReplicaRecoveryInfo createInfo() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support createInfo");
+  }
 }
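All four recovery-related overrides added above throw UnsupportedOperationException unconditionally, so code holding a ReplicaInfo has to check the replica state before asking for recovery details. A small sketch of such a guard; RecoveryIdUtil and recoveryIdOrDefault are illustrative names, not part of the patch.

    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;

    class RecoveryIdUtil {
      // Only a replica actually under recovery (RUR) can answer
      // getRecoveryID(); an RWR replica now throws instead.
      static long recoveryIdOrDefault(ReplicaInfo replica, long fallback) {
        return replica.getState() == ReplicaState.RUR
            ? replica.getRecoveryID()
            : fallback;
      }
    }
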
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index acc269a..b75ed5b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -44,9 +44,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
-import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
@@ -230,10 +229,10 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   VolumeFailureSummary getVolumeFailureSummary();
 
   /** @return a list of finalized blocks for the given block pool. */
-  List<FinalizedReplica> getFinalizedBlocks(String bpid);
+  List<ReplicaInfo> getFinalizedBlocks(String bpid);
 
   /** @return a list of finalized blocks for the given block pool. */
-  List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid);
+  List<ReplicaInfo> getFinalizedBlocksOnPersistentStorage(String bpid);
 
   /**
    * Check whether the in-memory block record matches the block on the disk,
@@ -337,7 +336,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @param temporary the temporary replica being converted
    * @return the result RBW
    */
-  ReplicaInPipelineInterface convertTemporaryToRbw(
+  ReplicaInPipeline convertTemporaryToRbw(
       ExtendedBlock temporary) throws IOException;
 
   /**
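With getFinalizedBlocks() now returning the ReplicaInfo base type, dataset consumers lose the File getters that FinalizedReplica used to provide and work through the URI accessors instead. A sketch of a caller against the updated interface; FinalizedBlockLister and logFinalized are illustrative names, not part of the patch.

    import java.util.List;

    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    class FinalizedBlockLister {
      static void logFinalized(FsDatasetSpi<? extends FsVolumeSpi> dataset,
                               String bpid) {
        // ReplicaInfo, not FinalizedReplica: only the generic accessors
        // (getBlockId, getBlockURI, ...) are available here.
        List<ReplicaInfo> finalized = dataset.getFinalizedBlocks(bpid);
        for (ReplicaInfo r : finalized) {
          System.out.println(r.getBlockId() + " -> " + r.getBlockURI());
        }
      }
    }
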
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 1e4e37a..b4384b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -45,13 +45,13 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.DataChecksum;
@@ -309,14 +309,14 @@ class BlockPoolSlice {
     return rbwFile;
   }
 
-  File addFinalizedBlock(Block b, File f) throws IOException {
+  File addFinalizedBlock(Block b, ReplicaInfo replicaInfo) throws IOException {
     File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
     if (!blockDir.exists()) {
       if (!blockDir.mkdirs()) {
         throw new IOException("Failed to mkdirs " + blockDir);
       }
     }
-    File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
+    File blockFile = FsDatasetImpl.moveBlockFiles(b, replicaInfo, blockDir);
     File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
     if (dfsUsage instanceof CachingGetSpaceUsed) {
       ((CachingGetSpaceUsed) dfsUsage).incDfsUsed(
@@ -329,16 +329,28 @@ class BlockPoolSlice {
    * Move a persisted replica from lazypersist directory to a subdirectory
    * under finalized.
    */
-  File activateSavedReplica(Block b, File metaFile, File blockFile)
-      throws IOException {
-    final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
+  ReplicaInfo activateSavedReplica(ReplicaInfo replicaInfo,
+      RamDiskReplica replicaState) throws IOException {
+    File metaFile = replicaState.getSavedMetaFile();
+    File blockFile = replicaState.getSavedBlockFile();
+    final long blockId = replicaInfo.getBlockId();
+    final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId);
     final File targetBlockFile = new File(blockDir, blockFile.getName());
     final File targetMetaFile = new File(blockDir, metaFile.getName());
     FileUtils.moveFile(blockFile, targetBlockFile);
     FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile);
     FileUtils.moveFile(metaFile, targetMetaFile);
     FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile);
-    return targetBlockFile;
+
+    ReplicaInfo newReplicaInfo =
+        new ReplicaBuilder(ReplicaState.FINALIZED)
+        .setBlockId(blockId)
+        .setLength(replicaInfo.getBytesOnDisk())
+        .setGenerationStamp(replicaInfo.getGenerationStamp())
+        .setFsVolume(replicaState.getLazyPersistVolume())
+        .setDirectoryToUse(targetBlockFile.getParentFile())
+        .build();
+    return newReplicaInfo;
   }
 
   void checkDirs() throws DiskErrorException {
@@ -461,9 +473,13 @@ class BlockPoolSlice {
     long blockId = block.getBlockId();
     long genStamp = block.getGenerationStamp();
     if (isFinalized) {
-      newReplica = new FinalizedReplica(blockId,
-          block.getNumBytes(), genStamp, volume, DatanodeUtil
-          .idToBlockDir(finalizedDir, blockId));
+      newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
+          .setBlockId(blockId)
+          .setLength(block.getNumBytes())
+          .setGenerationStamp(genStamp)
+          .setFsVolume(volume)
+          .setDirectoryToUse(DatanodeUtil.idToBlockDir(finalizedDir, blockId))
+          .build();
     } else {
       File file = new File(rbwDir, block.getBlockName());
       boolean loadRwr = true;
@@ -477,9 +493,15 @@ class BlockPoolSlice {
           // It didn't expire. Load the replica as a RBW.
          // We don't know the expected block length, so just use 0
          // and don't reserve any more space for writes.
-          newReplica = new ReplicaBeingWritten(blockId,
-              validateIntegrityAndSetLength(file, genStamp),
-              genStamp, volume, file.getParentFile(), null, 0);
+          newReplica = new ReplicaBuilder(ReplicaState.RBW)
+              .setBlockId(blockId)
+              .setLength(validateIntegrityAndSetLength(file, genStamp))
+              .setGenerationStamp(genStamp)
+              .setFsVolume(volume)
+              .setDirectoryToUse(file.getParentFile())
+              .setWriterThread(null)
+              .setBytesToReserve(0)
+              .build();
           loadRwr = false;
         }
         sc.close();
@@ -496,9 +518,13 @@ class BlockPoolSlice {
       }
       // Restart meta doesn't exist or expired.
       if (loadRwr) {
-        newReplica = new ReplicaWaitingToBeRecovered(blockId,
-            validateIntegrityAndSetLength(file, genStamp),
-            genStamp, volume, file.getParentFile());
+        ReplicaBuilder builder = new ReplicaBuilder(ReplicaState.RWR)
+            .setBlockId(blockId)
+            .setLength(validateIntegrityAndSetLength(file, genStamp))
+            .setGenerationStamp(genStamp)
+            .setFsVolume(volume)
+            .setDirectoryToUse(file.getParentFile());
+        newReplica = builder.build();
       }
     }
 
@@ -614,7 +640,7 @@ class BlockPoolSlice {
     // it's the same block so don't ever delete it, even if GS or size
     // differs. caller should keep the one it just discovered on disk
-    if (replica1.getBlockFile().equals(replica2.getBlockFile())) {
+    if (replica1.getBlockURI().equals(replica2.getBlockURI())) {
       return null;
     }
     if (replica1.getGenerationStamp() != replica2.getGenerationStamp()) {
@@ -641,13 +667,11 @@ class BlockPoolSlice {
 
   private void deleteReplica(final ReplicaInfo replicaToDelete) {
     // Delete the files on disk. Failure here is okay.
-    final File blockFile = replicaToDelete.getBlockFile();
-    if (!blockFile.delete()) {
-      LOG.warn("Failed to delete block file " + blockFile);
+    if (!replicaToDelete.deleteBlockData()) {
+      LOG.warn("Failed to delete block file for replica " + replicaToDelete);
     }
-    final File metaFile = replicaToDelete.getMetaFile();
-    if (!metaFile.delete()) {
-      LOG.warn("Failed to delete meta file " + metaFile);
+    if (!replicaToDelete.deleteMetadata()) {
+      LOG.warn("Failed to delete meta file for replica " + replicaToDelete);
     }
   }
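BlockPoolSlice now builds replicas through ReplicaBuilder rather than instantiating FinalizedReplica, ReplicaBeingWritten, or ReplicaWaitingToBeRecovered directly; the requested ReplicaState selects the concrete class. A condensed sketch of the pattern as used in the hunks above; ReplicaBuilderExample, the method name, and its parameters are illustrative, not part of the patch.

    import java.io.File;

    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    class ReplicaBuilderExample {
      static ReplicaInfo finalizedReplica(long blockId, long length,
          long genStamp, FsVolumeSpi volume, File dir) {
        return new ReplicaBuilder(ReplicaState.FINALIZED)
            .setBlockId(blockId)
            .setLength(length)
            .setGenerationStamp(genStamp)
            .setFsVolume(volume)
            .setDirectoryToUse(dir)   // parent directory of the block file
            .build();
      }
    }

One builder entry point for every state keeps the loading code uniform; as the RBW hunk above shows, writing states additionally take setWriterThread and setBytesToReserve.
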
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index fdc9f83..c9160cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
 import java.io.FileDescriptor;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -34,6 +35,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.io.IOUtils;
@@ -211,12 +214,12 @@ class FsDatasetAsyncDiskService {
    * Delete the block file and meta file from the disk asynchronously, adjust
    * dfsUsed statistics accordingly.
    */
-  void deleteAsync(FsVolumeReference volumeRef, File blockFile, File metaFile,
+  void deleteAsync(FsVolumeReference volumeRef, ReplicaInfo replicaToDelete,
       ExtendedBlock block, String trashDirectory) {
     LOG.info("Scheduling " + block.getLocalBlock()
-        + " file " + blockFile + " for deletion");
+        + " replica " + replicaToDelete + " for deletion");
     ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
-        volumeRef, blockFile, metaFile, block, trashDirectory);
+        volumeRef, replicaToDelete, block, trashDirectory);
     execute(((FsVolumeImpl) volumeRef.getVolume()).getCurrentDir(), deletionTask);
   }
 
@@ -227,19 +230,18 @@ class FsDatasetAsyncDiskService {
    *  files are deleted immediately.
    */
   class ReplicaFileDeleteTask implements Runnable {
-    final FsVolumeReference volumeRef;
-    final FsVolumeImpl volume;
-    final File blockFile;
-    final File metaFile;
-    final ExtendedBlock block;
-    final String trashDirectory;
-
-    ReplicaFileDeleteTask(FsVolumeReference volumeRef, File blockFile,
-        File metaFile, ExtendedBlock block, String trashDirectory) {
+    private final FsVolumeReference volumeRef;
+    private final FsVolumeImpl volume;
+    private final ReplicaInfo replicaToDelete;
+    private final ExtendedBlock block;
+    private final String trashDirectory;
+
+    ReplicaFileDeleteTask(FsVolumeReference volumeRef,
+        ReplicaInfo replicaToDelete, ExtendedBlock block,
+        String trashDirectory) {
       this.volumeRef = volumeRef;
       this.volume = (FsVolumeImpl) volumeRef.getVolume();
-      this.blockFile = blockFile;
-      this.metaFile = metaFile;
+      this.replicaToDelete = replicaToDelete;
      this.block = block;
       this.trashDirectory = trashDirectory;
     }
 
@@ -248,15 +250,22 @@ class FsDatasetAsyncDiskService {
     public String toString() {
       // Called in AsyncDiskService.execute for displaying error messages.
return "deletion of block " + block.getBlockPoolId() + " " - + block.getLocalBlock() + " with block file " + blockFile - + " and meta file " + metaFile + " from volume " + volume; + + block.getLocalBlock() + " with block file " + + replicaToDelete.getBlockURI() + " and meta file " + + replicaToDelete.getMetadataURI() + " from volume " + volume; } private boolean deleteFiles() { - return blockFile.delete() && (metaFile.delete() || !metaFile.exists()); + return replicaToDelete.deleteBlockData() && + (replicaToDelete.deleteMetadata() || !replicaToDelete.metadataExists()); } private boolean moveFiles() { + if (trashDirectory == null) { + LOG.error("Trash dir for replica " + replicaToDelete + " is null"); + return false; + } + File trashDirFile = new File(trashDirectory); if (!trashDirFile.exists() && !trashDirFile.mkdirs()) { LOG.error("Failed to create trash directory " + trashDirectory); @@ -264,20 +273,28 @@ class FsDatasetAsyncDiskService { } if (LOG.isDebugEnabled()) { - LOG.debug("Moving files " + blockFile.getName() + " and " + - metaFile.getName() + " to trash."); + LOG.debug("Moving files " + replicaToDelete.getBlockURI() + " and " + + replicaToDelete.getMetadataURI() + " to trash."); } - File newBlockFile = new File(trashDirectory, blockFile.getName()); - File newMetaFile = new File(trashDirectory, metaFile.getName()); - return (blockFile.renameTo(newBlockFile) && - metaFile.renameTo(newMetaFile)); + final String blockName = replicaToDelete.getBlockName(); + final long genstamp = replicaToDelete.getGenerationStamp(); + File newBlockFile = new File(trashDirectory, blockName); + File newMetaFile = new File(trashDirectory, + DatanodeUtil.getMetaName(blockName, genstamp)); + try { + return (replicaToDelete.renameData(newBlockFile.toURI()) && + replicaToDelete.renameMeta(newMetaFile.toURI())); + } catch (IOException e) { + LOG.error("Error moving files to trash: " + replicaToDelete, e); + } + return false; } @Override public void run() { - final long blockLength = blockFile.length(); - final long metaLength = metaFile.length(); + final long blockLength = replicaToDelete.getBlockDataLength(); + final long metaLength = replicaToDelete.getMetadataLength(); boolean result; result = (trashDirectory == null) ? deleteFiles() : moveFiles(); @@ -286,7 +303,7 @@ class FsDatasetAsyncDiskService { LOG.warn("Unexpected error trying to " + (trashDirectory == null ? "delete" : "move") + " block " + block.getBlockPoolId() + " " + block.getLocalBlock() - + " at file " + blockFile + ". Ignored."); + + " at file " + replicaToDelete.getBlockURI() + ". Ignored."); } else { if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){ datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID()); @@ -294,7 +311,7 @@ class FsDatasetAsyncDiskService { volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength); volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength); LOG.info("Deleted " + block.getBlockPoolId() + " " - + block.getLocalBlock() + " file " + blockFile); + + block.getLocalBlock() + " URI " + replicaToDelete.getBlockURI()); } updateDeletedBlockId(block); IOUtils.cleanup(null, volumeRef); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org