http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index e9f1dc1..54b2ce8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -22,12 +22,10 @@ import java.io.DataOutputStream; import java.io.EOFException; import java.io.File; import java.io.FileDescriptor; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.RandomAccessFile; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.ArrayList; @@ -53,10 +51,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -77,17 +73,13 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetricHelper; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; -import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.Replica; import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException; -import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; +import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder; import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler; import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; -import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; -import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery; -import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; @@ -192,13 +184,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { public Block getStoredBlock(String bpid, long blkid) throws IOException { try (AutoCloseableLock lock = datasetLock.acquire()) { - File blockfile = getFile(bpid, blkid, false); - if (blockfile == null) { + ReplicaInfo r = volumeMap.get(bpid, blkid); + if (r == null) { return null; } - final File metafile = FsDatasetUtil.findMetaFile(blockfile); - final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile); - return new 
Block(blkid, blockfile.length(), gs); + return new Block(blkid, r.getBytesOnDisk(), r.getGenerationStamp()); } } @@ -209,19 +199,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { */ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { ReplicaInfo r = volumeMap.get(bpid, blockId); - if(r == null) + if (r == null) { return null; + } switch(r.getState()) { case FINALIZED: - return new FinalizedReplica((FinalizedReplica)r); case RBW: - return new ReplicaBeingWritten((ReplicaBeingWritten)r); case RWR: - return new ReplicaWaitingToBeRecovered((ReplicaWaitingToBeRecovered)r); case RUR: - return new ReplicaUnderRecovery((ReplicaUnderRecovery)r); case TEMPORARY: - return new ReplicaInPipeline((ReplicaInPipeline)r); + return new ReplicaBuilder(r.getState()).from(r).build(); } return null; } @@ -229,16 +216,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { @Override // FsDatasetSpi public LengthInputStream getMetaDataInputStream(ExtendedBlock b) throws IOException { - File meta = FsDatasetUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp()); - if (meta == null || !meta.exists()) { + ReplicaInfo info = getBlockReplica(b); + if (info == null || !info.metadataExists()) { return null; } - if (isNativeIOAvailable) { - return new LengthInputStream( - NativeIO.getShareDeleteFileInputStream(meta), - meta.length()); - } - return new LengthInputStream(new FileInputStream(meta), meta.length()); + return info.getMetadataInputStream(0); } final DataNode datanode; @@ -738,62 +720,45 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { */ @Override // FsDatasetSpi public long getLength(ExtendedBlock b) throws IOException { - return getBlockFile(b).length(); + return getBlockReplica(b).getBlockDataLength(); } /** * Get File name for a given block. */ - private File getBlockFile(ExtendedBlock b) throws IOException { - return getBlockFile(b.getBlockPoolId(), b.getBlockId()); + private ReplicaInfo getBlockReplica(ExtendedBlock b) throws IOException { + return getBlockReplica(b.getBlockPoolId(), b.getBlockId()); } /** * Get File name for a given block. */ - File getBlockFile(String bpid, long blockId) throws IOException { - File f = validateBlockFile(bpid, blockId); - if(f == null) { + ReplicaInfo getBlockReplica(String bpid, long blockId) throws IOException { + ReplicaInfo r = validateBlockFile(bpid, blockId); + if (r == null) { throw new IOException("BlockId " + blockId + " is not valid."); } - return f; - } - - /** - * Return the File associated with a block, without first - * checking that it exists. This should be used when the - * next operation is going to open the file for read anyway, - * and thus the exists check is redundant. - * - * @param touch if true then update the last access timestamp of the - * block. Currently used for blocks on transient storage. 
- */ - private File getBlockFileNoExistsCheck(ExtendedBlock b, - boolean touch) - throws IOException { - final File f; - try (AutoCloseableLock lock = datasetLock.acquire()) { - f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch); - } - if (f == null) { - throw new IOException("Block " + b + " is not valid"); - } - return f; + return r; } @Override // FsDatasetSpi public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) throws IOException { - File blockFile = getBlockFileNoExistsCheck(b, true); - if (isNativeIOAvailable) { - return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset); + + ReplicaInfo info; + synchronized(this) { + info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); + } + + if (info != null && info.getVolume().isTransientStorage()) { + ramDiskReplicaTracker.touch(b.getBlockPoolId(), b.getBlockId()); + datanode.getMetrics().incrRamDiskBlocksReadHits(); + } + + if(info != null && info.blockDataExists()) { + return info.getDataInputStream(seekOffset); } else { - try { - return openAndSeek(blockFile, seekOffset); - } catch (FileNotFoundException fnfe) { - throw new IOException("Block " + b + " is not valid. " + - "Expected block file at " + blockFile + " does not exist."); - } + throw new IOException("No data exists for block " + b); } } @@ -814,7 +779,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } return info; } - + /** * Get the meta info of a block stored in volumeMap. Block is looked up * without matching the generation stamp. @@ -824,7 +789,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { * @throws ReplicaNotFoundException if no entry is in the map or * there is a generation stamp mismatch */ - private ReplicaInfo getReplicaInfo(String bpid, long blkid) + @VisibleForTesting + ReplicaInfo getReplicaInfo(String bpid, long blkid) throws ReplicaNotFoundException { ReplicaInfo info = volumeMap.get(bpid, blkid); if (info == null) { @@ -833,7 +799,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } return info; } - + /** * Returns handles to the block file and its metadata file */ @@ -844,10 +810,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { ReplicaInfo info = getReplicaInfo(b); FsVolumeReference ref = info.getVolume().obtainReference(); try { - InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset); + InputStream blockInStream = info.getDataInputStream(blkOffset); try { - InputStream metaInStream = - openAndSeek(info.getMetaFile(), metaOffset); + InputStream metaInStream = info.getMetadataInputStream(metaOffset); return new ReplicaInputStreams(blockInStream, metaInStream, ref); } catch (IOException e) { IOUtils.cleanup(null, blockInStream); @@ -860,41 +825,27 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } } - private static FileInputStream openAndSeek(File file, long offset) - throws IOException { - RandomAccessFile raf = null; - try { - raf = new RandomAccessFile(file, "r"); - if (offset > 0) { - raf.seek(offset); - } - return new FileInputStream(raf.getFD()); - } catch(IOException ioe) { - IOUtils.cleanup(null, raf); - throw ioe; - } - } - - static File moveBlockFiles(Block b, File srcfile, File destdir) + static File moveBlockFiles(Block b, ReplicaInfo replicaInfo, File destdir) throws IOException { final File dstfile = new File(destdir, b.getBlockName()); - final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp()); final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp()); try 
{ - NativeIO.renameTo(srcmeta, dstmeta); + replicaInfo.renameMeta(dstmeta.toURI()); } catch (IOException e) { throw new IOException("Failed to move meta file for " + b - + " from " + srcmeta + " to " + dstmeta, e); + + " from " + replicaInfo.getMetadataURI() + " to " + dstmeta, e); } try { - NativeIO.renameTo(srcfile, dstfile); + replicaInfo.renameData(dstfile.toURI()); } catch (IOException e) { throw new IOException("Failed to move block file for " + b - + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e); + + " from " + replicaInfo.getBlockURI() + " to " + + dstfile.getAbsolutePath(), e); } if (LOG.isDebugEnabled()) { - LOG.debug("addFinalizedBlock: Moved " + srcmeta + " to " + dstmeta - + " and " + srcfile + " to " + dstfile); + LOG.debug("addFinalizedBlock: Moved " + replicaInfo.getMetadataURI() + + " to " + dstmeta + " and " + replicaInfo.getBlockURI() + + " to " + dstfile); } return dstfile; } @@ -904,41 +855,44 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { * @return the new meta and block files. * @throws IOException */ - static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta, - File srcFile, File destRoot, boolean calculateChecksum, + static File[] copyBlockFiles(long blockId, long genStamp, + ReplicaInfo srcReplica, File destRoot, boolean calculateChecksum, int smallBufferSize, final Configuration conf) throws IOException { final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId); - final File dstFile = new File(destDir, srcFile.getName()); + // blockName is same as the filename for the block + final File dstFile = new File(destDir, srcReplica.getBlockName()); final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp); - return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum, + return copyBlockFiles(srcReplica, dstMeta, dstFile, calculateChecksum, smallBufferSize, conf); } - static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta, + static File[] copyBlockFiles(ReplicaInfo srcReplica, File dstMeta, File dstFile, boolean calculateChecksum, int smallBufferSize, final Configuration conf) throws IOException { + if (calculateChecksum) { - computeChecksum(srcMeta, dstMeta, srcFile, smallBufferSize, conf); + computeChecksum(srcReplica, dstMeta, smallBufferSize, conf); } else { try { - Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true); + srcReplica.copyMetadata(dstMeta.toURI()); } catch (IOException e) { - throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e); + throw new IOException("Failed to copy " + srcReplica + " metadata to " + + dstMeta, e); } } - try { - Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true); + srcReplica.copyBlockdata(dstFile.toURI()); } catch (IOException e) { - throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e); + throw new IOException("Failed to copy " + srcReplica + " block file to " + + dstFile, e); } if (LOG.isDebugEnabled()) { if (calculateChecksum) { - LOG.debug("Copied " + srcMeta + " to " + dstMeta - + " and calculated checksum"); + LOG.debug("Copied " + srcReplica.getMetadataURI() + " meta to " + + dstMeta + " and calculated checksum"); } else { - LOG.debug("Copied " + srcFile + " to " + dstFile); + LOG.debug("Copied " + srcReplica.getBlockURI() + " to " + dstFile); } } return new File[] {dstMeta, dstFile}; @@ -1002,18 +956,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { private ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo, FsVolumeReference volumeRef) throws IOException 
{ - File oldBlockFile = replicaInfo.getBlockFile(); - File oldMetaFile = replicaInfo.getMetaFile(); + FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume(); // Copy files to temp dir first File[] blockFiles = copyBlockFiles(block.getBlockId(), - block.getGenerationStamp(), oldMetaFile, oldBlockFile, + block.getGenerationStamp(), replicaInfo, targetVolume.getTmpDir(block.getBlockPoolId()), replicaInfo.isOnTransientStorage(), smallBufferSize, conf); - ReplicaInfo newReplicaInfo = new ReplicaInPipeline( - replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(), - targetVolume, blockFiles[0].getParentFile(), 0); + ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY) + .setBlockId(replicaInfo.getBlockId()) + .setGenerationStamp(replicaInfo.getGenerationStamp()) + .setFsVolume(targetVolume) + .setDirectoryToUse(blockFiles[0].getParentFile()) + .setBytesToReserve(0) + .build(); newReplicaInfo.setNumBytes(blockFiles[1].length()); // Finalize the copied files newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo); @@ -1023,8 +980,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks(); } - removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile, - oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId()); + removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId()); return newReplicaInfo; } @@ -1065,16 +1021,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { * Compute and store the checksum for a block file that does not already have * its checksum computed. * - * @param srcMeta source meta file, containing only the checksum header, not a - * calculated checksum + * @param srcReplica source {@link ReplicaInfo}, containing only the checksum + * header, not a calculated checksum * @param dstMeta destination meta file, into which this method will write a * full computed checksum - * @param blockFile block file for which the checksum will be computed + * @param smallBufferSize buffer size to use + * @param conf the {@link Configuration} * @throws IOException */ - private static void computeChecksum(File srcMeta, File dstMeta, - File blockFile, int smallBufferSize, final Configuration conf) + private static void computeChecksum(ReplicaInfo srcReplica, File dstMeta, + int smallBufferSize, final Configuration conf) throws IOException { + File srcMeta = new File(srcReplica.getMetadataURI()); final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta, DFSUtilClient.getIoFileBufferSize(conf)); final byte[] data = new byte[1 << 16]; @@ -1094,9 +1052,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { BlockMetadataHeader.writeHeader(metaOut, checksum); int offset = 0; - try (InputStream dataIn = isNativeIOAvailable ? 
- NativeIO.getShareDeleteFileInputStream(blockFile) : - new FileInputStream(blockFile)) { + try (InputStream dataIn = srcReplica.getDataInputStream(0)) { for (int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) { if (n > 0) { @@ -1118,58 +1074,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { checksum.calculateChunkedSums(data, 0, offset, crcs, 0); metaOut.write(crcs, 0, 4); } finally { - IOUtils.cleanup(LOG, metaOut); - } - } - - static private void truncateBlock(File blockFile, File metaFile, - long oldlen, long newlen) throws IOException { - LOG.info("truncateBlock: blockFile=" + blockFile - + ", metaFile=" + metaFile - + ", oldlen=" + oldlen - + ", newlen=" + newlen); - - if (newlen == oldlen) { - return; - } - if (newlen > oldlen) { - throw new IOException("Cannot truncate block to from oldlen (=" + oldlen - + ") to newlen (=" + newlen + ")"); - } - - DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum(); - int checksumsize = dcs.getChecksumSize(); - int bpc = dcs.getBytesPerChecksum(); - long n = (newlen - 1)/bpc + 1; - long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize; - long lastchunkoffset = (n - 1)*bpc; - int lastchunksize = (int)(newlen - lastchunkoffset); - byte[] b = new byte[Math.max(lastchunksize, checksumsize)]; - - RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw"); - try { - //truncate blockFile - blockRAF.setLength(newlen); - - //read last chunk - blockRAF.seek(lastchunkoffset); - blockRAF.readFully(b, 0, lastchunksize); - } finally { - blockRAF.close(); - } - - //compute checksum - dcs.update(b, 0, lastchunksize); - dcs.writeValue(b, 0, false); - - //update metaFile - RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw"); - try { - metaRAF.setLength(newmetalen); - metaRAF.seek(newmetalen - checksumsize); - metaRAF.write(b, 0, checksumsize); - } finally { - metaRAF.close(); + IOUtils.cleanup(null, metaOut); } } @@ -1202,10 +1107,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); - ReplicaBeingWritten replica = null; + ReplicaInPipeline replica = null; try { - replica = append(b.getBlockPoolId(), - (FinalizedReplica) replicaInfo, newGS, + replica = append(b.getBlockPoolId(), replicaInfo, newGS, b.getNumBytes()); } catch (IOException e) { IOUtils.cleanup(null, ref); @@ -1227,70 +1131,38 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { * @throws IOException if moving the replica from finalized directory * to rbw directory fails */ - private ReplicaBeingWritten append(String bpid, - FinalizedReplica replicaInfo, long newGS, long estimateBlockLen) + private ReplicaInPipeline append(String bpid, + ReplicaInfo replicaInfo, long newGS, long estimateBlockLen) throws IOException { try (AutoCloseableLock lock = datasetLock.acquire()) { // If the block is cached, start uncaching it. + if (replicaInfo.getState() != ReplicaState.FINALIZED) { + throw new IOException("Only a Finalized replica can be appended to; " + + "Replica with blk id " + replicaInfo.getBlockId() + " has state " + + replicaInfo.getState()); + } + // If the block is cached, start uncaching it. cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId()); - // If there are any hardlinks to the block, break them. This ensures we - // are not appending to a file that is part of a previous/ directory. + // If there are any hardlinks to the block, break them. 
This ensures + // we are not appending to a file that is part of a previous/ directory. replicaInfo.breakHardLinksIfNeeded(); - // construct a RBW replica with the new GS - File blkfile = replicaInfo.getBlockFile(); - FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume(); - long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes(); - if (v.getAvailable() < bytesReserved) { - throw new DiskOutOfSpaceException("Insufficient space for appending to " - + replicaInfo); - } - File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName()); - File oldmeta = replicaInfo.getMetaFile(); - ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten( - replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS, - v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved); - File newmeta = newReplicaInfo.getMetaFile(); - - // rename meta file to rbw directory - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming " + oldmeta + " to " + newmeta); - } - try { - NativeIO.renameTo(oldmeta, newmeta); - } catch (IOException e) { - throw new IOException("Block " + replicaInfo + " reopen failed. " + - " Unable to move meta file " + oldmeta + - " to rbw dir " + newmeta, e); - } - - // rename block file to rbw directory - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming " + blkfile + " to " + newBlkFile - + ", file length=" + blkfile.length()); - } - try { - NativeIO.renameTo(blkfile, newBlkFile); - } catch (IOException e) { - try { - NativeIO.renameTo(newmeta, oldmeta); - } catch (IOException ex) { - LOG.warn("Cannot move meta file " + newmeta + - "back to the finalized directory " + oldmeta, ex); - } - throw new IOException("Block " + replicaInfo + " reopen failed. " + - " Unable to move block file " + blkfile + - " to rbw dir " + newBlkFile, e); + FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume(); + ReplicaInPipeline rip = v.append(bpid, replicaInfo, + newGS, estimateBlockLen); + if (rip.getReplicaInfo().getState() != ReplicaState.RBW) { + throw new IOException("Append on block " + replicaInfo.getBlockId() + + " returned a replica of state " + rip.getReplicaInfo().getState() + + "; expected RBW"); } - // Replace finalized replica by a RBW replica in replicas map - volumeMap.add(bpid, newReplicaInfo); - v.reserveSpaceForReplica(bytesReserved); - return newReplicaInfo; + volumeMap.add(bpid, rip.getReplicaInfo()); + return rip; } } + @SuppressWarnings("serial") private static class MustStopExistingWriter extends Exception { private final ReplicaInPipeline rip; @@ -1298,7 +1170,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { this.rip = rip; } - ReplicaInPipeline getReplica() { + ReplicaInPipeline getReplicaInPipeline() { return rip; } } @@ -1327,7 +1199,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // stop the previous writer before check a replica's length long replicaLen = replicaInfo.getNumBytes(); if (replicaInfo.getState() == ReplicaState.RBW) { - ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo; + ReplicaInPipeline rbw = (ReplicaInPipeline) replicaInfo; if (!rbw.attemptToSetWriter(null, Thread.currentThread())) { throw new MustStopExistingWriter(rbw); } @@ -1360,17 +1232,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { try { try (AutoCloseableLock lock = datasetLock.acquire()) { ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); - FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); - ReplicaBeingWritten replica; + ReplicaInPipeline replica; try { // change the replica's state/gs 
etc. if (replicaInfo.getState() == ReplicaState.FINALIZED) { - replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo, + replica = append(b.getBlockPoolId(), replicaInfo, newGS, b.getNumBytes()); } else { //RBW - bumpReplicaGS(replicaInfo, newGS); - replica = (ReplicaBeingWritten) replicaInfo; + replicaInfo.bumpReplicaGS(newGS); + replica = (ReplicaInPipeline) replicaInfo; } } catch (IOException e) { IOUtils.cleanup(null, ref); @@ -1379,7 +1250,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { return new ReplicaHandler(replica, ref); } } catch (MustStopExistingWriter e) { - e.getReplica().stopWriter(datanode.getDnConf().getXceiverStopTimeout()); + e.getReplicaInPipeline() + .stopWriter(datanode.getDnConf().getXceiverStopTimeout()); } } } @@ -1394,7 +1266,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // check replica's state ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); // bump the replica's GS - bumpReplicaGS(replicaInfo, newGS); + replicaInfo.bumpReplicaGS(newGS); // finalize the replica if RBW if (replicaInfo.getState() == ReplicaState.RBW) { finalizeReplica(b.getBlockPoolId(), replicaInfo); @@ -1402,40 +1274,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { return replicaInfo; } } catch (MustStopExistingWriter e) { - e.getReplica().stopWriter(datanode.getDnConf().getXceiverStopTimeout()); + e.getReplicaInPipeline() + .stopWriter(datanode.getDnConf().getXceiverStopTimeout()); } } } - /** - * Bump a replica's generation stamp to a new one. - * Its on-disk meta file name is renamed to be the new one too. - * - * @param replicaInfo a replica - * @param newGS new generation stamp - * @throws IOException if rename fails - */ - private void bumpReplicaGS(ReplicaInfo replicaInfo, - long newGS) throws IOException { - long oldGS = replicaInfo.getGenerationStamp(); - File oldmeta = replicaInfo.getMetaFile(); - replicaInfo.setGenerationStamp(newGS); - File newmeta = replicaInfo.getMetaFile(); - - // rename meta file to new GS - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming " + oldmeta + " to " + newmeta); - } - try { - NativeIO.renameTo(oldmeta, newmeta); - } catch (IOException e) { - replicaInfo.setGenerationStamp(oldGS); // restore old GS - throw new IOException("Block " + replicaInfo + " reopen failed. 
" + - " Unable to move meta file " + oldmeta + - " to " + newmeta, e); - } - } - @Override // FsDatasetSpi public ReplicaHandler createRbw( StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) @@ -1482,18 +1326,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { datanode.getMetrics().incrRamDiskBlocksWriteFallback(); } - File f; + ReplicaInPipeline newReplicaInfo; try { - f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); + newReplicaInfo = v.createRbw(b); + if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) { + throw new IOException("CreateRBW returned a replica of state " + + newReplicaInfo.getReplicaInfo().getState() + + " for block " + b.getBlockId()); + } } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } - ReplicaBeingWritten newReplicaInfo = - new ReplicaBeingWritten(b.getBlockId(), - b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes()); - volumeMap.add(b.getBlockPoolId(), newReplicaInfo); + volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo()); return new ReplicaHandler(newReplicaInfo, ref); } } @@ -1507,14 +1353,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { while (true) { try { try (AutoCloseableLock lock = datasetLock.acquire()) { - ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); - + ReplicaInfo replicaInfo = + getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); // check the replica's state if (replicaInfo.getState() != ReplicaState.RBW) { throw new ReplicaNotFoundException( ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo); } - ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo; + ReplicaInPipeline rbw = (ReplicaInPipeline)replicaInfo; if (!rbw.attemptToSetWriter(null, Thread.currentThread())) { throw new MustStopExistingWriter(rbw); } @@ -1522,12 +1368,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd); } } catch (MustStopExistingWriter e) { - e.getReplica().stopWriter(datanode.getDnConf().getXceiverStopTimeout()); + e.getReplicaInPipeline().stopWriter( + datanode.getDnConf().getXceiverStopTimeout()); } } } - private ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw, + private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw, ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { try (AutoCloseableLock lock = datasetLock.acquire()) { @@ -1551,20 +1398,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { minBytesRcvd + ", " + maxBytesRcvd + "]."); } - FsVolumeReference ref = rbw.getVolume().obtainReference(); + FsVolumeReference ref = rbw.getReplicaInfo() + .getVolume().obtainReference(); try { // Truncate the potentially corrupt portion. // If the source was client and the last node in the pipeline was lost, // any corrupt data written after the acked length can go unnoticed. 
if (numBytes > bytesAcked) { - final File replicafile = rbw.getBlockFile(); - truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked); + rbw.getReplicaInfo().truncateBlock(bytesAcked); rbw.setNumBytes(bytesAcked); rbw.setLastChecksumAndDataLen(bytesAcked, null); } // bump the replica's generation stamp to newGS - bumpReplicaGS(rbw, newGS); + rbw.getReplicaInfo().bumpReplicaGS(newGS); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; @@ -1576,6 +1423,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { @Override // FsDatasetSpi public ReplicaInPipeline convertTemporaryToRbw( final ExtendedBlock b) throws IOException { + try (AutoCloseableLock lock = datasetLock.acquire()) { final long blockId = b.getBlockId(); final long expectedGs = b.getGenerationStamp(); @@ -1583,21 +1431,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { LOG.info("Convert " + b + " from Temporary to RBW, visible length=" + visible); - final ReplicaInPipeline temp; - - // get replica - final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId); - if (r == null) { - throw new ReplicaNotFoundException( - ReplicaNotFoundException.NON_EXISTENT_REPLICA + b); - } - // check the replica's state - if (r.getState() != ReplicaState.TEMPORARY) { - throw new ReplicaAlreadyExistsException( - "r.getState() != ReplicaState.TEMPORARY, r=" + r); + final ReplicaInfo temp; + { + // get replica + final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId); + if (r == null) { + throw new ReplicaNotFoundException( + ReplicaNotFoundException.NON_EXISTENT_REPLICA + b); + } + // check the replica's state + if (r.getState() != ReplicaState.TEMPORARY) { + throw new ReplicaAlreadyExistsException( + "r.getState() != ReplicaState.TEMPORARY, r=" + r); + } + temp = r; } - temp = (ReplicaInPipeline) r; - // check generation stamp if (temp.getGenerationStamp() != expectedGs) { throw new ReplicaAlreadyExistsException( @@ -1621,17 +1469,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { throw new IOException("r.getVolume() = null, temp=" + temp); } - // move block files to the rbw directory - BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId()); - final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(), - bpslice.getRbwDir()); - // create RBW - final ReplicaBeingWritten rbw = new ReplicaBeingWritten( - blockId, numBytes, expectedGs, - v, dest.getParentFile(), Thread.currentThread(), 0); - rbw.setBytesAcked(visible); + final ReplicaInPipeline rbw = v.convertTemporaryToRbw(b, temp); + + if(rbw.getState() != ReplicaState.RBW) { + throw new IOException("Expected replica state: " + ReplicaState.RBW + + " obtained " + rbw.getState() + " for converting block " + + b.getBlockId()); + } // overwrite the RBW in the volume map - volumeMap.add(b.getBlockPoolId(), rbw); + volumeMap.add(b.getBlockPoolId(), rbw.getReplicaInfo()); return rbw; } } @@ -1653,22 +1499,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { FsVolumeReference ref = volumes.getNextVolume(storageType, b.getNumBytes()); FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); - // create a temporary file to hold block in the designated volume - File f; + ReplicaInPipeline newReplicaInfo; try { - f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); + newReplicaInfo = v.createTemporary(b); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } - ReplicaInPipeline newReplicaInfo = - new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, - f.getParentFile(), 
b.getLocalBlock().getNumBytes()); - volumeMap.add(b.getBlockPoolId(), newReplicaInfo); + + volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo()); return new ReplicaHandler(newReplicaInfo, ref); } else { - if (!(currentReplicaInfo.getGenerationStamp() < b - .getGenerationStamp() && currentReplicaInfo instanceof ReplicaInPipeline)) { + if (!(currentReplicaInfo.getGenerationStamp() < b.getGenerationStamp() + && (currentReplicaInfo.getState() == ReplicaState.TEMPORARY + || currentReplicaInfo.getState() == ReplicaState.RBW))) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + currentReplicaInfo.getState() + " and thus cannot be created."); @@ -1687,8 +1531,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } // Stop the previous writer - ((ReplicaInPipeline) lastFoundReplicaInfo) - .stopWriter(writerStopTimeoutMs); + ((ReplicaInPipeline)lastFoundReplicaInfo).stopWriter(writerStopTimeoutMs); } while (true); } @@ -1737,29 +1580,23 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { finalizeReplica(b.getBlockPoolId(), replicaInfo); } } - - private FinalizedReplica finalizeReplica(String bpid, + + private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo) throws IOException { try (AutoCloseableLock lock = datasetLock.acquire()) { - FinalizedReplica newReplicaInfo = null; + ReplicaInfo newReplicaInfo = null; if (replicaInfo.getState() == ReplicaState.RUR && - ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica().getState() - == ReplicaState.FINALIZED) { - newReplicaInfo = (FinalizedReplica) - ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica(); + replicaInfo.getOriginalReplica().getState() + == ReplicaState.FINALIZED) { + newReplicaInfo = replicaInfo.getOriginalReplica(); } else { - FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume(); - File f = replicaInfo.getBlockFile(); + FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume(); if (v == null) { - throw new IOException("No volume for temporary file " + f + - " for block " + replicaInfo); + throw new IOException("No volume for block " + replicaInfo); } - File dest = v.addFinalizedBlock( - bpid, replicaInfo, f, replicaInfo.getBytesReserved()); - newReplicaInfo = - new FinalizedReplica(replicaInfo, v, dest.getParentFile()); - + newReplicaInfo = v.addFinalizedBlock( + bpid, replicaInfo, replicaInfo, replicaInfo.getBytesReserved()); if (v.isTransientStorage()) { releaseLockedMemory( replicaInfo.getOriginalBytesReserved() @@ -1770,8 +1607,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes()); } } + assert newReplicaInfo.getState() == ReplicaState.FINALIZED + : "Replica should be finalized"; volumeMap.add(bpid, newReplicaInfo); - return newReplicaInfo; } } @@ -1784,14 +1622,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { try (AutoCloseableLock lock = datasetLock.acquire()) { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); - if (replicaInfo != null - && replicaInfo.getState() == ReplicaState.TEMPORARY) { + if (replicaInfo != null && + replicaInfo.getState() == ReplicaState.TEMPORARY) { // remove from volumeMap volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock()); // delete the on-disk temp file - if (delBlockFromDisk(replicaInfo.getBlockFile(), - replicaInfo.getMetaFile(), b.getLocalBlock())) { + if (delBlockFromDisk(replicaInfo)) { LOG.warn("Block " + b + " unfinalized and removed. 
"); } if (replicaInfo.getVolume().isTransientStorage()) { @@ -1804,23 +1641,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { /** * Remove a block from disk - * @param blockFile block file - * @param metaFile block meta file - * @param b a block - * @return true if on-disk files are deleted; false otherwise + * @param info the replica that needs to be deleted + * @return true if data for the replica are deleted; false otherwise */ - private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) { - if (blockFile == null) { - LOG.warn("No file exists for block: " + b); - return true; - } + private boolean delBlockFromDisk(ReplicaInfo info) { - if (!blockFile.delete()) { - LOG.warn("Not able to delete the block file: " + blockFile); + if (!info.deleteBlockData()) { + LOG.warn("Not able to delete the block data for replica " + info); return false; } else { // remove the meta file - if (metaFile != null && !metaFile.delete()) { - LOG.warn("Not able to delete the meta block file: " + metaFile); + if (!info.deleteMetadata()) { + LOG.warn("Not able to delete the meta data for replica " + info); return false; } } @@ -1859,20 +1690,19 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { continue; } switch(b.getState()) { - case FINALIZED: - case RBW: - case RWR: - builders.get(b.getVolume().getStorageID()).add(b); - break; - case RUR: - ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b; - builders.get(rur.getVolume().getStorageID()) - .add(rur.getOriginalReplica()); - break; - case TEMPORARY: - break; - default: - assert false : "Illegal ReplicaInfo state."; + case FINALIZED: + case RBW: + case RWR: + builders.get(b.getVolume().getStorageID()).add(b); + break; + case RUR: + ReplicaInfo orig = b.getOriginalReplica(); + builders.get(b.getVolume().getStorageID()).add(orig); + break; + case TEMPORARY: + break; + default: + assert false : "Illegal ReplicaInfo state."; } } } @@ -1889,13 +1719,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { * Get the list of finalized blocks from in-memory blockmap for a block pool. */ @Override - public List<FinalizedReplica> getFinalizedBlocks(String bpid) { + public List<ReplicaInfo> getFinalizedBlocks(String bpid) { try (AutoCloseableLock lock = datasetLock.acquire()) { - ArrayList<FinalizedReplica> finalized = - new ArrayList<FinalizedReplica>(volumeMap.size(bpid)); + ArrayList<ReplicaInfo> finalized = + new ArrayList<ReplicaInfo>(volumeMap.size(bpid)); for (ReplicaInfo b : volumeMap.replicas(bpid)) { if (b.getState() == ReplicaState.FINALIZED) { - finalized.add(new FinalizedReplica((FinalizedReplica) b)); + finalized.add(new ReplicaBuilder(ReplicaState.FINALIZED) + .from(b).build()); } } return finalized; @@ -1906,15 +1737,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { * Get the list of finalized blocks from in-memory blockmap for a block pool. 
*/ @Override - public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage( + public List<ReplicaInfo> getFinalizedBlocksOnPersistentStorage( String bpid) { try (AutoCloseableLock lock = datasetLock.acquire()) { - ArrayList<FinalizedReplica> finalized = - new ArrayList<FinalizedReplica>(volumeMap.size(bpid)); + ArrayList<ReplicaInfo> finalized = + new ArrayList<ReplicaInfo>(volumeMap.size(bpid)); for (ReplicaInfo b : volumeMap.replicas(bpid)) { - if (!b.getVolume().isTransientStorage() && + if(!b.getVolume().isTransientStorage() && b.getState() == ReplicaState.FINALIZED) { - finalized.add(new FinalizedReplica((FinalizedReplica) b)); + finalized.add(new ReplicaBuilder(ReplicaState.FINALIZED) + .from(b).build()); } } return finalized; @@ -1951,8 +1783,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { if (replicaInfo.getState() != state) { throw new UnexpectedReplicaStateException(b,state); } - if (!replicaInfo.getBlockFile().exists()) { - throw new FileNotFoundException(replicaInfo.getBlockFile().getPath()); + if (!replicaInfo.blockDataExists()) { + throw new FileNotFoundException(replicaInfo.getBlockURI().toString()); } long onDiskLength = getLength(b); if (onDiskLength < minLength) { @@ -1991,46 +1823,44 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { /** * Find the file corresponding to the block and return it if it exists. */ - File validateBlockFile(String bpid, long blockId) { + ReplicaInfo validateBlockFile(String bpid, long blockId) { //Should we check for metadata file too? - final File f; + final ReplicaInfo r; try (AutoCloseableLock lock = datasetLock.acquire()) { - f = getFile(bpid, blockId, false); + r = volumeMap.get(bpid, blockId); } - - if(f != null ) { - if(f.exists()) - return f; - + + if (r != null) { + if (r.blockDataExists()) { + return r; + } // if file is not null, but doesn't exist - possibly disk failed datanode.checkDiskErrorAsync(); } - + if (LOG.isDebugEnabled()) { - LOG.debug("blockId=" + blockId + ", f=" + f); + LOG.debug("blockId=" + blockId + ", replica=" + r); } return null; } /** Check the files of a replica. */ static void checkReplicaFiles(final ReplicaInfo r) throws IOException { - //check replica's file - final File f = r.getBlockFile(); - if (!f.exists()) { - throw new FileNotFoundException("File " + f + " not found, r=" + r); + //check replica's data exists + if (!r.blockDataExists()) { + throw new FileNotFoundException("Block data not found, r=" + r); } - if (r.getBytesOnDisk() != f.length()) { - throw new IOException("File length mismatched. 
The length of " - + f + " is " + f.length() + " but r=" + r); + if (r.getBytesOnDisk() != r.getBlockDataLength()) { + throw new IOException("Block length mismatch, len=" + + r.getBlockDataLength() + " but r=" + r); } //check replica's meta file - final File metafile = FsDatasetUtil.getMetaFile(f, r.getGenerationStamp()); - if (!metafile.exists()) { - throw new IOException("Metafile " + metafile + " does not exist, r=" + r); + if (!r.metadataExists()) { + throw new IOException(r.getMetadataURI() + " does not exist, r=" + r); } - if (metafile.length() == 0) { - throw new IOException("Metafile " + metafile + " is empty, r=" + r); + if (r.getMetadataLength() == 0) { + throw new IOException("Metafile is empty, r=" + r); } } @@ -2041,7 +1871,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { public void invalidate(String bpid, Block invalidBlks[]) throws IOException { final List<String> errors = new ArrayList<String>(); for (int i = 0; i < invalidBlks.length; i++) { - final File f; + final ReplicaInfo removing; final FsVolumeImpl v; try (AutoCloseableLock lock = datasetLock.acquire()) { final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]); @@ -2056,27 +1886,31 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { + ": GenerationStamp not matched, info=" + info); continue; } - f = info.getBlockFile(); v = (FsVolumeImpl)info.getVolume(); if (v == null) { errors.add("Failed to delete replica " + invalidBlks[i] - + ". No volume for this replica, file=" + f); + + ". No volume for replica " + info); continue; } - File parent = f.getParentFile(); - if (parent == null) { - errors.add("Failed to delete replica " + invalidBlks[i] - + ". Parent not found for file " + f); - continue; + try { + File blockFile = new File(info.getBlockURI()); + if (blockFile != null && blockFile.getParentFile() == null) { + errors.add("Failed to delete replica " + invalidBlks[i] + + ". Parent not found for block file: " + blockFile); + continue; + } + } catch(IllegalArgumentException e) { + LOG.warn("Parent directory check failed; replica " + info + + " is not backed by a local file"); } - ReplicaInfo removing = volumeMap.remove(bpid, invalidBlks[i]); + removing = volumeMap.remove(bpid, invalidBlks[i]); addDeletingBlock(bpid, removing.getBlockId()); if (LOG.isDebugEnabled()) { - LOG.debug("Block file " + removing.getBlockFile().getName() + LOG.debug("Block file " + removing.getBlockURI() + " is to be deleted"); } - if (removing instanceof ReplicaInPipelineInterface) { - ((ReplicaInPipelineInterface) removing).releaseAllBytesReserved(); + if (removing instanceof ReplicaInPipeline) { + ((ReplicaInPipeline) removing).releaseAllBytesReserved(); } } @@ -2104,10 +1938,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // It's ok to unlink the block file before the uncache operation // finishes. 
try { - asyncDiskService.deleteAsync(v.obtainReference(), f, - FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()), + asyncDiskService.deleteAsync(v.obtainReference(), removing, new ExtendedBlock(bpid, invalidBlks[i]), - dataStorage.getTrashDirectoryForBlockFile(bpid, f)); + dataStorage.getTrashDirectoryForReplica(bpid, removing)); } catch (ClosedChannelException e) { LOG.warn("Volume " + v + " is closed, ignore the deletion task for " + "block " + invalidBlks[i]); @@ -2192,7 +2025,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { cacheManager.numBlocksFailedToCache.incrementAndGet(); } } - blockFileName = info.getBlockFile().getAbsolutePath(); + blockFileName = info.getBlockURI().toString(); length = info.getVisibleLength(); genstamp = info.getGenerationStamp(); volumeExecutor = volume.getCacheExecutor(); @@ -2224,26 +2057,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { public boolean contains(final ExtendedBlock block) { try (AutoCloseableLock lock = datasetLock.acquire()) { final long blockId = block.getLocalBlock().getBlockId(); - return getFile(block.getBlockPoolId(), blockId, false) != null; - } - } - - /** - * Turn the block identifier into a filename - * @param bpid Block pool Id - * @param blockId a block's id - * @return on disk data file path; null if the replica does not exist - */ - File getFile(final String bpid, final long blockId, boolean touch) { - ReplicaInfo info = volumeMap.get(bpid, blockId); - if (info != null) { - if (touch && info.getVolume().isTransientStorage()) { - ramDiskReplicaTracker.touch(bpid, blockId); - datanode.getMetrics().incrRamDiskBlocksReadHits(); - } - return info.getBlockFile(); + final String bpid = block.getBlockPoolId(); + final ReplicaInfo r = volumeMap.get(bpid, blockId); + return (r != null && r.blockDataExists()); } - return null; } /** @@ -2373,7 +2190,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } return; } - if (!memBlockInfo.getBlockFile().exists()) { + if (!memBlockInfo.blockDataExists()) { // Block is in memory and not on the disk // Remove the block from volumeMap volumeMap.remove(bpid, blockId); @@ -2396,8 +2213,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { */ if (memBlockInfo == null) { // Block is missing in memory - add the block to volumeMap - ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId, - diskFile.length(), diskGS, vol, diskFile.getParentFile()); + ReplicaInfo diskBlockInfo = new ReplicaBuilder(ReplicaState.FINALIZED) + .setBlockId(blockId) + .setLength(diskFile.length()) + .setGenerationStamp(diskGS) + .setFsVolume(vol) + .setDirectoryToUse(diskFile.getParentFile()) + .build(); volumeMap.add(bpid, diskBlockInfo); if (vol.isTransientStorage()) { long lockedBytesReserved = @@ -2413,21 +2235,27 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { * Block exists in volumeMap and the block file exists on the disk */ // Compare block files - File memFile = memBlockInfo.getBlockFile(); - if (memFile.exists()) { - if (memFile.compareTo(diskFile) != 0) { + if (memBlockInfo.blockDataExists()) { + if (memBlockInfo.getBlockURI().compareTo(diskFile.toURI()) != 0) { if (diskMetaFile.exists()) { - if (memBlockInfo.getMetaFile().exists()) { + if (memBlockInfo.metadataExists()) { // We have two sets of block+meta files. Decide which one to // keep. 
- ReplicaInfo diskBlockInfo = new FinalizedReplica( - blockId, diskFile.length(), diskGS, vol, diskFile.getParentFile()); - ((FsVolumeImpl) vol).getBlockPoolSlice(bpid).resolveDuplicateReplicas( - memBlockInfo, diskBlockInfo, volumeMap); + ReplicaInfo diskBlockInfo = + new ReplicaBuilder(ReplicaState.FINALIZED) + .setBlockId(blockId) + .setLength(diskFile.length()) + .setGenerationStamp(diskGS) + .setFsVolume(vol) + .setDirectoryToUse(diskFile.getParentFile()) + .build(); + ((FsVolumeImpl) vol).getBlockPoolSlice(bpid) + .resolveDuplicateReplicas( + memBlockInfo, diskBlockInfo, volumeMap); } } else { if (!diskFile.delete()) { - LOG.warn("Failed to delete " + diskFile + ". Will retry on next scan"); + LOG.warn("Failed to delete " + diskFile); } } } @@ -2436,12 +2264,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // Update the block with the file found on the disk. Since the block // file and metadata file are found as a pair on the disk, update // the block based on the metadata file found on the disk - LOG.warn("Block file in volumeMap " - + memFile.getAbsolutePath() + LOG.warn("Block file in replica " + + memBlockInfo.getBlockURI() + " does not exist. Updating it to the file found during scan " + diskFile.getAbsolutePath()); - memBlockInfo.setDir(diskFile.getParentFile()); - memFile = diskFile; + memBlockInfo.updateWithReplica( + StorageLocation.parse(diskFile.toString())); LOG.warn("Updating generation stamp for block " + blockId + " from " + memBlockInfo.getGenerationStamp() + " to " + diskGS); @@ -2463,24 +2291,31 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // Metadata file corresponding to block in memory is missing // If metadata file found during the scan is on the same directory // as the block file, then use the generation stamp from it - long gs = diskMetaFile != null && diskMetaFile.exists() - && diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS - : HdfsConstants.GRANDFATHER_GENERATION_STAMP; - - LOG.warn("Updating generation stamp for block " + blockId - + " from " + memBlockInfo.getGenerationStamp() + " to " + gs); - - memBlockInfo.setGenerationStamp(gs); + try { + File memFile = new File(memBlockInfo.getBlockURI()); + long gs = diskMetaFile != null && diskMetaFile.exists() + && diskMetaFile.getParent().equals(memFile.getParent()) ? 
diskGS + : HdfsConstants.GRANDFATHER_GENERATION_STAMP; + + LOG.warn("Updating generation stamp for block " + blockId + + " from " + memBlockInfo.getGenerationStamp() + " to " + gs); + + memBlockInfo.setGenerationStamp(gs); + } catch (IllegalArgumentException e) { + //exception arises because the URI cannot be converted to a file + LOG.warn("Block URI could not be resolved to a file", e); + } } } // Compare block size - if (memBlockInfo.getNumBytes() != memFile.length()) { + if (memBlockInfo.getNumBytes() != memBlockInfo.getBlockDataLength()) { // Update the length based on the block file corruptBlock = new Block(memBlockInfo); LOG.warn("Updating size of block " + blockId + " from " - + memBlockInfo.getNumBytes() + " to " + memFile.length()); - memBlockInfo.setNumBytes(memFile.length()); + + memBlockInfo.getNumBytes() + " to " + + memBlockInfo.getBlockDataLength()); + memBlockInfo.setNumBytes(memBlockInfo.getBlockDataLength()); } } @@ -2531,7 +2366,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { return initReplicaRecoveryImpl(bpid, map, block, recoveryId); } } catch (MustStopExistingWriter e) { - e.getReplica().stopWriter(xceiverStopTimeout); + e.getReplicaInPipeline().stopWriter(xceiverStopTimeout); } } } @@ -2549,20 +2384,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } //stop writer if there is any - if (replica instanceof ReplicaInPipeline) { + if (replica.getState() == ReplicaState.TEMPORARY || + replica.getState() == ReplicaState.RBW) { final ReplicaInPipeline rip = (ReplicaInPipeline)replica; if (!rip.attemptToSetWriter(null, Thread.currentThread())) { throw new MustStopExistingWriter(rip); } //check replica bytes on disk. - if (rip.getBytesOnDisk() < rip.getVisibleLength()) { + if (replica.getBytesOnDisk() < replica.getVisibleLength()) { throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:" - + " getBytesOnDisk() < getVisibleLength(), rip=" + rip); + + " getBytesOnDisk() < getVisibleLength(), rip=" + replica); } //check the replica's files - checkReplicaFiles(rip); + checkReplicaFiles(replica); } //check generation stamp @@ -2580,9 +2416,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } //check RUR - final ReplicaUnderRecovery rur; + final ReplicaInfo rur; if (replica.getState() == ReplicaState.RUR) { - rur = (ReplicaUnderRecovery)replica; + rur = replica; if (rur.getRecoveryID() >= recoveryId) { throw new RecoveryInProgressException( "rur.getRecoveryID() >= recoveryId = " + recoveryId @@ -2594,7 +2430,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { + " from " + oldRecoveryID + " to " + recoveryId); } else { - rur = new ReplicaUnderRecovery(replica, recoveryId); + rur = new ReplicaBuilder(ReplicaState.RUR) + .from(replica).setRecoveryId(recoveryId).build(); map.add(bpid, rur); LOG.info("initReplicaRecovery: changing replica state for " + block + " from " + replica.getState() @@ -2640,8 +2477,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { checkReplicaFiles(replica); //update replica - final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock - .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId, + final ReplicaInfo finalized = updateReplicaUnderRecovery(oldBlock + .getBlockPoolId(), replica, recoveryId, newBlockId, newlength); boolean copyTruncate = newBlockId != oldBlock.getBlockId(); @@ -2661,7 +2498,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { + ", len=" + oldBlock.getNumBytes() + ", finalized=" + finalized; } - //check replica files after update 
     checkReplicaFiles(finalized);
@@ -2669,9 +2505,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
-  private FinalizedReplica updateReplicaUnderRecovery(
+  private ReplicaInfo updateReplicaUnderRecovery(
       String bpid,
-      ReplicaUnderRecovery rur,
+      ReplicaInfo rur,
       long recoveryId,
       long newBlockId,
       long newlength) throws IOException {
@@ -2682,18 +2518,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
     boolean copyOnTruncate = newBlockId > 0L && rur.getBlockId() != newBlockId;
-    File blockFile;
-    File metaFile;
     // bump rur's GS to be recovery id
     if(!copyOnTruncate) {
-      bumpReplicaGS(rur, recoveryId);
-      blockFile = rur.getBlockFile();
-      metaFile = rur.getMetaFile();
-    } else {
-      File[] copiedReplicaFiles =
-          copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId);
-      blockFile = copiedReplicaFiles[1];
-      metaFile = copiedReplicaFiles[0];
+      rur.bumpReplicaGS(recoveryId);
     }
 
     //update length
@@ -2701,48 +2528,34 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throw new IOException("rur.getNumBytes() < newlength = " + newlength
          + ", rur=" + rur);
     }
+
     if (rur.getNumBytes() > newlength) {
-      rur.breakHardLinksIfNeeded();
-      truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
       if(!copyOnTruncate) {
+        rur.breakHardLinksIfNeeded();
+        rur.truncateBlock(newlength);
         // update RUR with the new length
         rur.setNumBytes(newlength);
       } else {
         // Copying block to a new block with new blockId.
         // Not truncating original block.
-        FsVolumeSpi volume = rur.getVolume();
-        String blockPath = blockFile.getAbsolutePath();
-        String volumePath = volume.getBasePath();
-        assert blockPath.startsWith(volumePath) :
-            "New block file: " + blockPath + " must be on " +
-            "same volume as recovery replica: " + volumePath;
-        ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
-            newBlockId, recoveryId, volume, blockFile.getParentFile(),
-            newlength);
+        FsVolumeImpl volume = (FsVolumeImpl) rur.getVolume();
+        ReplicaInPipeline newReplicaInfo = volume.updateRURCopyOnTruncate(
+            rur, bpid, newBlockId, recoveryId, newlength);
+        if (newReplicaInfo.getState() != ReplicaState.RBW) {
+          throw new IOException("Append on block " + rur.getBlockId() +
+              " returned a replica of state " + newReplicaInfo.getState() +
+              "; expected RBW");
+        }
+
         newReplicaInfo.setNumBytes(newlength);
-        volumeMap.add(bpid, newReplicaInfo);
-        finalizeReplica(bpid, newReplicaInfo);
+        volumeMap.add(bpid, newReplicaInfo.getReplicaInfo());
+        finalizeReplica(bpid, newReplicaInfo.getReplicaInfo());
       }
-    }
-
+    }
     // finalize the block
     return finalizeReplica(bpid, rur);
   }
 
-  private File[] copyReplicaWithNewBlockIdAndGS(
-      ReplicaUnderRecovery replicaInfo, String bpid, long newBlkId, long newGS)
-      throws IOException {
-    String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId;
-    FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
-    final File tmpDir = v.getBlockPoolSlice(bpid).getTmpDir();
-    final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId);
-    final File dstBlockFile = new File(destDir, blockFileName);
-    final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
-    return copyBlockFiles(replicaInfo.getMetaFile(),
-        replicaInfo.getBlockFile(),
-        dstMetaFile, dstBlockFile, true, smallBufferSize, conf);
-  }
-
   @Override // FsDatasetSpi
   public long getReplicaVisibleLength(final ExtendedBlock block)
       throws IOException {
@@ -2886,10 +2699,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
 
-    File datafile = getBlockFile(block);
-    File metafile = FsDatasetUtil.getMetaFile(datafile, block.getGenerationStamp());
+    ReplicaInfo r = getBlockReplica(block);
+    File blockFile = new File(r.getBlockURI());
+    File metaFile = new File(r.getMetadataURI());
     BlockLocalPathInfo info = new BlockLocalPathInfo(block,
-        datafile.getAbsolutePath(), metafile.getAbsolutePath());
+        blockFile.getAbsolutePath(), metaFile.toString());
     return info;
   }
 
@@ -3001,8 +2815,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   private void removeOldReplica(ReplicaInfo replicaInfo,
-      ReplicaInfo newReplicaInfo, File blockFile, File metaFile,
-      long blockFileUsed, long metaFileUsed, final String bpid) {
+      ReplicaInfo newReplicaInfo, final String bpid) {
     // Before deleting the files from old storage we must notify the
     // NN that the files are on the new storage. Else a blockReport from
     // the transient storage might cause the NN to think the blocks are lost.
@@ -3019,11 +2832,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         newReplicaInfo.isOnTransientStorage());
 
     // Remove the old replicas
-    if (blockFile.delete() || !blockFile.exists()) {
+    if (replicaInfo.deleteBlockData() || !replicaInfo.blockDataExists()) {
       FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume();
-      volume.onBlockFileDeletion(bpid, blockFileUsed);
-      if (metaFile.delete() || !metaFile.exists()) {
-        volume.onMetaFileDeletion(bpid, metaFileUsed);
+      volume.onBlockFileDeletion(bpid, replicaInfo.getBytesOnDisk());
+      if (replicaInfo.deleteMetadata() || !replicaInfo.metadataExists()) {
+        volume.onMetaFileDeletion(bpid, replicaInfo.getMetadataLength());
       }
     }
 
@@ -3083,8 +2896,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           asyncLazyPersistService.submitLazyPersistTask(
              block.getBlockPoolId(), block.getBlockId(),
              replicaInfo.getGenerationStamp(), block.getCreationTime(),
-              replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
-              targetReference);
+              replicaInfo, targetReference);
         }
       }
     }
@@ -3122,18 +2934,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
      }
 
      ReplicaInfo replicaInfo, newReplicaInfo;
-      File blockFile, metaFile;
-      long blockFileUsed, metaFileUsed;
      final String bpid = replicaState.getBlockPoolId();
 
      try (AutoCloseableLock lock = datasetLock.acquire()) {
        replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
            replicaState.getBlockId());
        Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
-        blockFile = replicaInfo.getBlockFile();
-        metaFile = replicaInfo.getMetaFile();
-        blockFileUsed = blockFile.length();
-        metaFileUsed = metaFile.length();
        ramDiskReplicaTracker.discardReplica(replicaState.getBlockPoolId(),
            replicaState.getBlockId(), false);
 
@@ -3141,16 +2947,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
        // the target volume
        BlockPoolSlice bpSlice =
            replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
-        File newBlockFile = bpSlice.activateSavedReplica(
-            replicaInfo, replicaState.getSavedMetaFile(),
-            replicaState.getSavedBlockFile());
 
        newReplicaInfo =
-            new FinalizedReplica(replicaInfo.getBlockId(),
-                                 replicaInfo.getBytesOnDisk(),
-                                 replicaInfo.getGenerationStamp(),
-                                 replicaState.getLazyPersistVolume(),
-                                 newBlockFile.getParentFile());
+            bpSlice.activateSavedReplica(replicaInfo, replicaState);
 
        // Update the volumeMap entry.
        volumeMap.add(bpid, newReplicaInfo);
@@ -3165,8 +2964,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
      // Delete the block+meta files from RAM disk and release locked
      // memory.
-      removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
-          blockFileUsed, metaFileUsed, bpid);
+      removeOldReplica(replicaInfo, newReplicaInfo, bpid);
      }
    }
  }
@@ -3205,16 +3003,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (!blockPinningEnabled) {
       return;
     }
-
-    File f = getBlockFile(block);
-    Path p = new Path(f.getAbsolutePath());
-    FsPermission oldPermission = localFS.getFileStatus(
-        new Path(f.getAbsolutePath())).getPermission();
-    //sticky bit is used for pinning purpose
-    FsPermission permission = new FsPermission(oldPermission.getUserAction(),
-        oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
-    localFS.setPermission(p, permission);
+    ReplicaInfo r = getBlockReplica(block);
+    r.setPinning(localFS);
   }
 
   @Override
@@ -3222,10 +3013,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (!blockPinningEnabled) {
       return false;
     }
-    File f = getBlockFile(block);
-
-    FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
-    return fss.getPermission().getStickyBit();
+    ReplicaInfo r = getBlockReplica(block);
+    return r.getPinning(localFS);
   }
 
   @Override
@@ -3308,10 +3097,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     for (String blockPoolId : volumeMap.getBlockPoolList()) {
       Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
       for (ReplicaInfo replicaInfo : replicas) {
-        if (replicaInfo instanceof ReplicaInPipeline
+        if ((replicaInfo.getState() == ReplicaState.TEMPORARY
+            || replicaInfo.getState() == ReplicaState.RBW)
            && replicaInfo.getVolume().equals(volume)) {
-          ReplicaInPipeline replicaInPipeline
-              = (ReplicaInPipeline) replicaInfo;
+          ReplicaInPipeline replicaInPipeline =
+              (ReplicaInPipeline) replicaInfo;
           replicaInPipeline.interruptThread();
         }
       }
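[Reviewer note] The common thread in the FsDatasetImpl hunks above is that file-level operations (delete, exists, truncate, pinning) move behind the ReplicaInfo API, so the dataset code no longer assumes a replica is a pair of plain local files. A minimal sketch of the resulting call pattern, using only the ReplicaInfo methods that appear in this diff; the wrapper class itself is hypothetical:

    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;

    class ReplicaCleanupSketch {
      // Mirrors the ordering in removeOldReplica() above: the metadata is
      // only removed once the block data is known to be gone, so the
      // volume's space accounting is decremented in the same order.
      static boolean removeReplicaData(ReplicaInfo replica) {
        if (replica.deleteBlockData() || !replica.blockDataExists()) {
          return replica.deleteMetadata() || !replica.metadataExists();
        }
        return false;
      }
    }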
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index f695c8c..a4d433d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -18,14 +18,17 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.io.IOUtils;
 
 /** Utility methods. */
 @InterfaceAudience.Private
@@ -71,6 +74,21 @@ public class FsDatasetUtil {
     return matches[0];
   }
 
+  public static FileInputStream openAndSeek(File file, long offset)
+      throws IOException {
+    RandomAccessFile raf = null;
+    try {
+      raf = new RandomAccessFile(file, "r");
+      if (offset > 0) {
+        raf.seek(offset);
+      }
+      return new FileInputStream(raf.getFD());
+    } catch(IOException ioe) {
+      IOUtils.cleanup(null, raf);
+      throw ioe;
+    }
+  }
+
   /**
    * Find the meta-file for the specified block file
    * and then return the generation stamp from the name of the meta-file.
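[Reviewer note] The new FsDatasetUtil.openAndSeek helper returns a FileInputStream whose descriptor is already positioned at the requested offset, so callers can stream block data from mid-file without handling RandomAccessFile themselves. A usage sketch, assuming a hypothetical block file path (a real caller derives it from a replica):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;

    class OpenAndSeekSketch {
      public static void main(String[] args) throws IOException {
        // Hypothetical path; not a real layout guarantee.
        File blockFile = new File("/data/dn/current/blk_1073741825");
        // The returned stream starts at byte 4096 of the block file.
        try (FileInputStream in = FsDatasetUtil.openAndSeek(blockFile, 4096L)) {
          byte[] buf = new byte[1024];
          int n = in.read(buf);
          System.out.println("read " + n + " bytes from offset 4096");
        }
      }
    }

Note the helper closes the RandomAccessFile only on failure; on success the returned stream shares its file descriptor, so closing the stream releases it.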
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index afcc5dd..57fab66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -47,11 +47,19 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
+import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -102,7 +110,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   // Disk space reserved for blocks (RBW or Re-replicating) open for write.
   private AtomicLong reservedForReplicas;
   private long recentReserved = 0;
-
+  private final Configuration conf;
   // Capacity configured. This is useful when we want to
   // limit the visible capacity for tests. If negative, then we just
   // query from the filesystem.
@@ -130,6 +138,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
     this.usage = new DF(parent, conf);
     this.storageType = storageType;
     this.configuredCapacity = -1;
+    this.conf = conf;
     cacheExecutor = initializeCacheExecutor(parent);
   }
 
@@ -896,10 +905,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
    * @return
    * @throws IOException
    */
-  File addFinalizedBlock(String bpid, Block b, File f, long bytesReserved)
-      throws IOException {
+  ReplicaInfo addFinalizedBlock(String bpid, Block b, ReplicaInfo replicaInfo,
+      long bytesReserved) throws IOException {
     releaseReservedSpace(bytesReserved);
-    return getBlockPoolSlice(bpid).addFinalizedBlock(b, f);
+    File dest = getBlockPoolSlice(bpid).addFinalizedBlock(b, replicaInfo);
+    return new ReplicaBuilder(ReplicaState.FINALIZED)
+        .setBlock(replicaInfo)
+        .setFsVolume(this)
+        .setDirectoryToUse(dest.getParentFile())
+        .build();
   }
 
   Executor getCacheExecutor() {
@@ -950,18 +964,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
   }
 
-  void addBlockPool(String bpid, Configuration conf) throws IOException {
-    addBlockPool(bpid, conf, null);
+  void addBlockPool(String bpid, Configuration c) throws IOException {
+    addBlockPool(bpid, c, null);
   }
 
-  void addBlockPool(String bpid, Configuration conf, Timer timer)
+  void addBlockPool(String bpid, Configuration c, Timer timer)
       throws IOException {
     File bpdir = new File(currentDir, bpid);
     BlockPoolSlice bp;
     if (timer == null) {
-      bp = new BlockPoolSlice(bpid, this, bpdir, conf, new Timer());
+      bp = new BlockPoolSlice(bpid, this, bpdir, c, new Timer());
     } else {
-      bp = new BlockPoolSlice(bpid, this, bpdir, conf, timer);
+      bp = new BlockPoolSlice(bpid, this, bpdir, c, timer);
     }
     bpSlices.put(bpid, bp);
   }
@@ -1053,5 +1067,127 @@ public class FsVolumeImpl implements FsVolumeSpi {
   DatanodeStorage toDatanodeStorage() {
     return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
   }
+
+
+  public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo,
+      long newGS, long estimateBlockLen) throws IOException {
+
+    long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
+    if (getAvailable() < bytesReserved) {
+      throw new DiskOutOfSpaceException("Insufficient space for appending to "
+          + replicaInfo);
+    }
+
+    assert replicaInfo.getVolume() == this:
+      "The volume of the replica should be the same as this volume";
+
+    // construct a RBW replica with the new GS
+    File newBlkFile = new File(getRbwDir(bpid), replicaInfo.getBlockName());
+    LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
+        .setBlockId(replicaInfo.getBlockId())
+        .setLength(replicaInfo.getNumBytes())
+        .setGenerationStamp(newGS)
+        .setFsVolume(this)
+        .setDirectoryToUse(newBlkFile.getParentFile())
+        .setWriterThread(Thread.currentThread())
+        .setBytesToReserve(bytesReserved)
+        .buildLocalReplicaInPipeline();
+
+    // rename meta file to rbw directory
+    // rename block file to rbw directory
+    newReplicaInfo.moveReplicaFrom(replicaInfo, newBlkFile);
+
+    reserveSpaceForReplica(bytesReserved);
+    return newReplicaInfo;
+  }
+
+  public ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException {
+
+    File f = createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
+    LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
+        .setBlockId(b.getBlockId())
+        .setGenerationStamp(b.getGenerationStamp())
+        .setFsVolume(this)
+        .setDirectoryToUse(f.getParentFile())
+        .setBytesToReserve(b.getNumBytes())
+        .buildLocalReplicaInPipeline();
+    return newReplicaInfo;
+  }
+
+  public ReplicaInPipeline convertTemporaryToRbw(ExtendedBlock b,
+      ReplicaInfo temp) throws IOException {
+
+    final long blockId = b.getBlockId();
+    final long expectedGs = b.getGenerationStamp();
+    final long visible = b.getNumBytes();
+    final long numBytes = temp.getNumBytes();
+
+    // move block files to the rbw directory
+    BlockPoolSlice bpslice = getBlockPoolSlice(b.getBlockPoolId());
+    final File dest = FsDatasetImpl.moveBlockFiles(b.getLocalBlock(), temp,
+        bpslice.getRbwDir());
+    // create RBW
+    final LocalReplicaInPipeline rbw = new ReplicaBuilder(ReplicaState.RBW)
+        .setBlockId(blockId)
+        .setLength(numBytes)
+        .setGenerationStamp(expectedGs)
+        .setFsVolume(this)
+        .setDirectoryToUse(dest.getParentFile())
+        .setWriterThread(Thread.currentThread())
+        .setBytesToReserve(0)
+        .buildLocalReplicaInPipeline();
+    rbw.setBytesAcked(visible);
+    return rbw;
+  }
+
+  public ReplicaInPipeline createTemporary(ExtendedBlock b) throws IOException {
+    // create a temporary file to hold block in the designated volume
+    File f = createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
+    LocalReplicaInPipeline newReplicaInfo =
+        new ReplicaBuilder(ReplicaState.TEMPORARY)
+          .setBlockId(b.getBlockId())
+          .setGenerationStamp(b.getGenerationStamp())
+          .setDirectoryToUse(f.getParentFile())
+          .setBytesToReserve(b.getLocalBlock().getNumBytes())
+          .setFsVolume(this)
+          .buildLocalReplicaInPipeline();
+    return newReplicaInfo;
+  }
+
+  public ReplicaInPipeline updateRURCopyOnTruncate(ReplicaInfo rur,
+      String bpid, long newBlockId, long recoveryId, long newlength)
+      throws IOException {
+
+    rur.breakHardLinksIfNeeded();
+    File[] copiedReplicaFiles =
+        copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId);
+    File blockFile = copiedReplicaFiles[1];
+    File metaFile = copiedReplicaFiles[0];
+    LocalReplica.truncateBlock(blockFile, metaFile,
+        rur.getNumBytes(), newlength);
+
+    LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
+        .setBlockId(newBlockId)
+        .setGenerationStamp(recoveryId)
+        .setFsVolume(this)
+        .setDirectoryToUse(blockFile.getParentFile())
+        .setBytesToReserve(newlength)
+        .buildLocalReplicaInPipeline();
+    return newReplicaInfo;
+  }
+
+  private File[] copyReplicaWithNewBlockIdAndGS(
+      ReplicaInfo replicaInfo, String bpid, long newBlkId, long newGS)
+      throws IOException {
+    String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId;
+    FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
+    final File tmpDir = v.getBlockPoolSlice(bpid).getTmpDir();
+    final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId);
+    final File dstBlockFile = new File(destDir, blockFileName);
+    final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
+    return FsDatasetImpl.copyBlockFiles(replicaInfo, dstMetaFile,
+        dstBlockFile, true, DFSUtilClient.getSmallBufferSize(conf), conf);
+  }
+
 }
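[Reviewer note] Every replica FsVolumeImpl now creates (append, createRbw, convertTemporaryToRbw, createTemporary, copy-on-truncate recovery) goes through the same ReplicaBuilder fluent API instead of instantiating concrete classes such as ReplicaBeingWritten. A condensed sketch of the pattern, assuming the volume and the target directory are already in hand; the helper class and parameter names are illustrative, and setFsVolume is assumed to accept an FsVolumeSpi, as the diff only shows it called with `this`:

    import java.io.File;

    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
    import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    class RbwBuilderSketch {
      // Shapes an RBW (replica-being-written) descriptor the same way
      // createRbw() above does; the caller supplies identity, location
      // and the disk space to reserve for the in-progress write.
      static LocalReplicaInPipeline newRbw(long blockId, long genStamp,
          long bytesToReserve, FsVolumeSpi volume, File blockDir)
          throws java.io.IOException {
        return new ReplicaBuilder(ReplicaState.RBW)
            .setBlockId(blockId)
            .setGenerationStamp(genStamp)
            .setFsVolume(volume)
            .setDirectoryToUse(blockDir)
            .setBytesToReserve(bytesToReserve)
            .buildLocalReplicaInPipeline();
      }
    }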
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 634ad42..80d3736 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -311,7 +311,7 @@
     } else {
       // If the volume is not put into a volume scanner, it does not need to
       // hold the reference.
-      IOUtils.cleanup(FsDatasetImpl.LOG, ref);
+      IOUtils.cleanup(null, ref);
     }
     // If the volume is used to replace a failed volume, it needs to reset the
     // volume failure info for this volume.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index 9b467ea..9e549f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 
 import java.io.File;
@@ -182,8 +183,7 @@ class RamDiskAsyncLazyPersistService {
    */
   void submitLazyPersistTask(String bpId, long blockId,
       long genStamp, long creationTime,
-      File metaFile, File blockFile,
-      FsVolumeReference target) throws IOException {
+      ReplicaInfo replica, FsVolumeReference target) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: "
           + bpId + " block id: " + blockId);
@@ -198,31 +198,29 @@ class RamDiskAsyncLazyPersistService {
     }
 
     ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
-        bpId, blockId, genStamp, creationTime, blockFile, metaFile,
+        bpId, blockId, genStamp, creationTime, replica,
         target, lazyPersistDir);
     execute(volume.getCurrentDir(), lazyPersistTask);
   }
 
   class ReplicaLazyPersistTask implements Runnable {
-    final String bpId;
-    final long blockId;
-    final long genStamp;
-    final long creationTime;
-    final File blockFile;
-    final File metaFile;
-    final FsVolumeReference targetVolume;
-    final File lazyPersistDir;
+    private final String bpId;
+    private final long blockId;
+    private final long genStamp;
+    private final long creationTime;
+    private final ReplicaInfo replicaInfo;
+    private final FsVolumeReference targetVolume;
+    private final File lazyPersistDir;
 
     ReplicaLazyPersistTask(String bpId, long blockId,
         long genStamp, long creationTime,
-        File blockFile, File metaFile,
+        ReplicaInfo replicaInfo,
        FsVolumeReference targetVolume, File lazyPersistDir) {
       this.bpId = bpId;
       this.blockId = blockId;
       this.genStamp = genStamp;
       this.creationTime = creationTime;
-      this.blockFile = blockFile;
-      this.metaFile = metaFile;
+      this.replicaInfo = replicaInfo;
       this.targetVolume = targetVolume;
       this.lazyPersistDir = lazyPersistDir;
     }
 
@@ -232,8 +230,10 @@ class RamDiskAsyncLazyPersistService {
       // Called in AsyncLazyPersistService.execute for displaying error messages.
       return "LazyWriter async task of persist RamDisk block pool id:"
           + bpId + " block pool id: "
-          + blockId + " with block file " + blockFile
-          + " and meta file " + metaFile + " to target volume " + targetVolume;}
+          + blockId + " with block file " + replicaInfo.getBlockURI()
+          + " and meta file " + replicaInfo.getMetadataURI()
+          + " to target volume " + targetVolume;
+    }
 
     @Override
     public void run() {
@@ -243,7 +243,7 @@ class RamDiskAsyncLazyPersistService {
       int smallBufferSize = DFSUtilClient.getSmallBufferSize(EMPTY_HDFS_CONF);
       // No FsDatasetImpl lock for the file copy
       File targetFiles[] = FsDatasetImpl.copyBlockFiles(
-          blockId, genStamp, metaFile, blockFile, lazyPersistDir, true,
+          blockId, genStamp, replicaInfo, lazyPersistDir, true,
           smallBufferSize, conf);
 
       // Lock FsDataSetImpl during onCompleteLazyPersist callback
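[Reviewer note] With the lazy-persist task now holding a ReplicaInfo instead of a blockFile/metaFile pair, block and meta locations are derived from the replica's URIs on demand, as the updated toString() shows. A small sketch of that accessor pattern; the helper class is hypothetical, while getBlockURI and getMetadataURI are the methods this diff uses:

    import java.net.URI;

    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;

    class LazyPersistDescribeSketch {
      // Derives block and meta locations from the replica, mirroring the
      // updated ReplicaLazyPersistTask.toString() above.
      static String describe(String bpId, long blockId, ReplicaInfo replica) {
        URI blockUri = replica.getBlockURI();
        URI metaUri = replica.getMetadataURI();
        return "persist task for pool " + bpId + " block " + blockId
            + " block file " + blockUri + " meta file " + metaUri;
      }
    }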