Repository: hadoop Updated Branches: refs/heads/trunk c7101fe21 -> 2021f4bdc
HDFS-11187. Optimize disk access for last partial chunk checksum of Finalized replica. Contributed by Wei-Chiu Chuang. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2021f4bd Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2021f4bd Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2021f4bd Branch: refs/heads/trunk Commit: 2021f4bdce3b27c46edaad198f0007a26a8a1391 Parents: c7101fe Author: Wei-Chiu Chuang <weic...@apache.org> Authored: Fri Feb 2 17:15:26 2018 -0800 Committer: Wei-Chiu Chuang <weic...@apache.org> Committed: Fri Feb 2 17:18:42 2018 -0800 ---------------------------------------------------------------------- .../hdfs/server/datanode/BlockSender.java | 56 +++++++++++---- .../hdfs/server/datanode/FinalizedReplica.java | 74 ++++++++++++-------- .../hdfs/server/datanode/ReplicaBuilder.java | 11 ++- .../datanode/fsdataset/impl/FsDatasetImpl.java | 1 + .../datanode/fsdataset/impl/FsVolumeImpl.java | 21 ++++-- .../org/apache/hadoop/hdfs/MiniDFSCluster.java | 23 ++++++ .../namenode/TestListCorruptFileBlocks.java | 4 +- 7 files changed, 140 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/2021f4bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 3ff5c75..268007f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -175,8 +175,13 @@ class BlockSender implements java.io.Closeable { * See {{@link BlockSender#isLongRead()} */ private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024; - + // The number of bytes per checksum here determines the alignment + // of reads: we always start reading at a checksum chunk boundary, + // even if the checksum type is NULL. So, choosing too big of a value + // would risk sending too much unnecessary data. 512 (1 disk sector) + // is likely to result in minimal extra IO. + private static final long CHUNK_SIZE = 512; /** * Constructor * @@ -250,18 +255,16 @@ class BlockSender implements java.io.Closeable { try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) { replica = getReplica(block, datanode); replicaVisibleLength = replica.getVisibleLength(); - if (replica instanceof FinalizedReplica) { - // Load last checksum in case the replica is being written - // concurrently - final FinalizedReplica frep = (FinalizedReplica) replica; - chunkChecksum = frep.getLastChecksumAndDataLen(); - } } if (replica.getState() == ReplicaState.RBW) { final ReplicaInPipeline rbw = (ReplicaInPipeline) replica; waitForMinLength(rbw, startOffset + length); chunkChecksum = rbw.getLastChecksumAndDataLen(); } + if (replica instanceof FinalizedReplica) { + chunkChecksum = getPartialChunkChecksumForFinalized( + (FinalizedReplica)replica); + } if (replica.getGenerationStamp() < block.getGenerationStamp()) { throw new IOException("Replica gen stamp < block genstamp, block=" @@ -348,12 +351,8 @@ class BlockSender implements java.io.Closeable { } } if (csum == null) { - // The number of bytes per checksum here determines the alignment - // of reads: we always start reading at a checksum chunk boundary, - // even if the checksum type is NULL. So, choosing too big of a value - // would risk sending too much unnecessary data. 512 (1 disk sector) - // is likely to result in minimal extra IO. - csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512); + csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, + (int)CHUNK_SIZE); } /* @@ -427,6 +426,37 @@ class BlockSender implements java.io.Closeable { } } + private ChunkChecksum getPartialChunkChecksumForFinalized( + FinalizedReplica finalized) throws IOException { + // There are a number of places in the code base where a finalized replica + // object is created. If last partial checksum is loaded whenever a + // finalized replica is created, it would increase latency in DataNode + // initialization. Therefore, the last partial chunk checksum is loaded + // lazily. + + // Load last checksum in case the replica is being written concurrently + final long replicaVisibleLength = replica.getVisibleLength(); + if (replicaVisibleLength % CHUNK_SIZE != 0 && + finalized.getLastPartialChunkChecksum() == null) { + // the finalized replica does not have precomputed last partial + // chunk checksum. Recompute now. + try { + finalized.loadLastPartialChunkChecksum(); + return new ChunkChecksum(finalized.getVisibleLength(), + finalized.getLastPartialChunkChecksum()); + } catch (FileNotFoundException e) { + // meta file is lost. Continue anyway to preserve existing behavior. + DataNode.LOG.warn( + "meta file " + finalized.getMetaFile() + " is missing!"); + return null; + } + } else { + // If the checksum is null, BlockSender will use on-disk checksum. + return new ChunkChecksum(finalized.getVisibleLength(), + finalized.getLastPartialChunkChecksum()); + } + } + /** * close opened files. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/2021f4bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java index e3e0450..b6212be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import org.apache.hadoop.hdfs.protocol.Block; @@ -30,9 +29,9 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; * This class describes a replica that has been finalized. */ public class FinalizedReplica extends LocalReplica { - + private byte[] lastPartialChunkChecksum; /** - * Constructor + * Constructor. * @param blockId block id * @param len replica length * @param genStamp replica generation stamp @@ -41,9 +40,24 @@ public class FinalizedReplica extends LocalReplica { */ public FinalizedReplica(long blockId, long len, long genStamp, FsVolumeSpi vol, File dir) { + this(blockId, len, genStamp, vol, dir, null); + } + + /** + * Constructor. + * @param blockId block id + * @param len replica length + * @param genStamp replica generation stamp + * @param vol volume where replica is located + * @param dir directory path where block and meta files are located + * @param checksum the last partial chunk checksum + */ + public FinalizedReplica(long blockId, long len, long genStamp, + FsVolumeSpi vol, File dir, byte[] checksum) { super(blockId, len, genStamp, vol, dir); + this.setLastPartialChunkChecksum(checksum); } - + /** * Constructor * @param block a block @@ -51,7 +65,20 @@ public class FinalizedReplica extends LocalReplica { * @param dir directory path where block and meta files are located */ public FinalizedReplica(Block block, FsVolumeSpi vol, File dir) { + this(block, vol, dir, null); + } + + /** + * Constructor + * @param block a block + * @param vol volume where replica is located + * @param dir directory path where block and meta files are located + * @param checksum the last partial chunk checksum + */ + public FinalizedReplica(Block block, FsVolumeSpi vol, File dir, + byte[] checksum) { super(block, vol, dir); + this.setLastPartialChunkChecksum(checksum); } /** @@ -60,6 +87,7 @@ public class FinalizedReplica extends LocalReplica { */ public FinalizedReplica(FinalizedReplica from) { super(from); + this.setLastPartialChunkChecksum(from.getLastPartialChunkChecksum()); } @Override // ReplicaInfo @@ -116,30 +144,18 @@ public class FinalizedReplica extends LocalReplica { " does not support createInfo"); } - /** - * gets the last chunk checksum and the length of the block corresponding - * to that checksum. - * Note, need to be called with the FsDataset lock acquired. May improve to - * lock only the FsVolume in the future. - * @throws IOException - */ - public ChunkChecksum getLastChecksumAndDataLen() throws IOException { - ChunkChecksum chunkChecksum = null; - try { - byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum( - getBlockFile(), getMetaFile()); - if (lastChecksum != null) { - chunkChecksum = - new ChunkChecksum(getVisibleLength(), lastChecksum); - } - } catch (FileNotFoundException e) { - // meta file is lost. Try to continue anyway. - DataNode.LOG.warn("meta file " + getMetaFile() + - " is missing!"); - } catch (IOException ioe) { - DataNode.LOG.warn("Unable to read checksum from meta file " + - getMetaFile(), ioe); - } - return chunkChecksum; + public byte[] getLastPartialChunkChecksum() { + return lastPartialChunkChecksum; + } + + public void setLastPartialChunkChecksum(byte[] checksum) { + lastPartialChunkChecksum = checksum; + } + + public void loadLastPartialChunkChecksum() + throws IOException { + byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum( + getBlockFile(), getMetaFile()); + setLastPartialChunkChecksum(lastChecksum); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2021f4bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java index 2c55e73..d198197 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java @@ -46,6 +46,7 @@ public class ReplicaBuilder { private Thread writer; private long recoveryId; private Block block; + private byte[] lastPartialChunkChecksum; private ReplicaInfo fromReplica; @@ -178,6 +179,11 @@ public class ReplicaBuilder { return this; } + public ReplicaBuilder setLastPartialChunkChecksum(byte[] checksum) { + this.lastPartialChunkChecksum = checksum; + return this; + } + public LocalReplicaInPipeline buildLocalReplicaInPipeline() throws IllegalArgumentException { LocalReplicaInPipeline info = null; @@ -258,10 +264,11 @@ public class ReplicaBuilder { + "state: " + fromReplica.getState()); } else { if (null != block) { - return new FinalizedReplica(block, volume, directoryUsed); + return new FinalizedReplica(block, volume, directoryUsed, + lastPartialChunkChecksum); } else { return new FinalizedReplica(blockId, length, genStamp, volume, - directoryUsed); + directoryUsed, lastPartialChunkChecksum); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2021f4bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 8e7884d..c141293 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1724,6 +1724,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { replicaInfo.getOriginalReplica().getState() == ReplicaState.FINALIZED) { newReplicaInfo = replicaInfo.getOriginalReplica(); + ((FinalizedReplica)newReplicaInfo).loadLastPartialChunkChecksum(); } else { FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume(); if (v == null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/2021f4bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 319bc0e..b8c95a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -51,6 +51,8 @@ import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; +import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.protocol.Block; @@ -950,10 +952,22 @@ public class FsVolumeImpl implements FsVolumeSpi { long bytesReserved) throws IOException { releaseReservedSpace(bytesReserved); File dest = getBlockPoolSlice(bpid).addFinalizedBlock(b, replicaInfo); + byte[] checksum = null; + // copy the last partial checksum if the replica is originally + // in finalized or rbw state. + if (replicaInfo.getState() == ReplicaState.FINALIZED) { + FinalizedReplica finalized = (FinalizedReplica)replicaInfo; + checksum = finalized.getLastPartialChunkChecksum(); + } else if (replicaInfo.getState() == ReplicaState.RBW) { + ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo; + checksum = rbw.getLastChecksumAndDataLen().getChecksum(); + } + return new ReplicaBuilder(ReplicaState.FINALIZED) .setBlock(replicaInfo) .setFsVolume(this) .setDirectoryToUse(dest.getParentFile()) + .setLastPartialChunkChecksum(checksum) .build(); } @@ -1183,12 +1197,11 @@ public class FsVolumeImpl implements FsVolumeSpi { .setBytesToReserve(bytesReserved) .buildLocalReplicaInPipeline(); + // Only a finalized replica can be appended. + FinalizedReplica finalized = (FinalizedReplica)replicaInfo; // load last checksum and datalen - LocalReplica localReplica = (LocalReplica)replicaInfo; - byte[] lastChunkChecksum = loadLastPartialChunkChecksum( - localReplica.getBlockFile(), localReplica.getMetaFile()); newReplicaInfo.setLastChecksumAndDataLen( - replicaInfo.getNumBytes(), lastChunkChecksum); + finalized.getVisibleLength(), finalized.getLastPartialChunkChecksum()); // rename meta file to rbw directory // rename block file to rbw directory http://git-wip-us.apache.org/repos/asf/hadoop/blob/2021f4bd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index c694854..107decf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -2987,6 +2987,29 @@ public class MiniDFSCluster implements AutoCloseable { } /** + * Return all block files in given directory (recursive search). + */ + public static List<File> getAllBlockFiles(File storageDir) { + List<File> results = new ArrayList<File>(); + File[] files = storageDir.listFiles(); + if (files == null) { + return null; + } + for (File f : files) { + if (f.getName().startsWith(Block.BLOCK_FILE_PREFIX) && + !f.getName().endsWith(Block.METADATA_EXTENSION)) { + results.add(f); + } else if (f.isDirectory()) { + List<File> subdirResults = getAllBlockFiles(f); + if (subdirResults != null) { + results.addAll(subdirResults); + } + } + } + return results; + } + + /** * Get the latest metadata file correpsonding to a block * @param storageDir storage directory * @param blk the block http://git-wip-us.apache.org/repos/asf/hadoop/blob/2021f4bd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java index 0b273df..1f31bdc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java @@ -92,7 +92,7 @@ public class TestListCorruptFileBlocks { File storageDir = cluster.getInstanceStorageDir(0, 1); File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid); assertTrue("data directory does not exist", data_dir.exists()); - List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir); + List<File> metaFiles = MiniDFSCluster.getAllBlockFiles(data_dir); assertTrue("Data directory does not contain any blocks or there was an " + "IO error", metaFiles != null && !metaFiles.isEmpty()); File metaFile = metaFiles.get(0); @@ -172,7 +172,7 @@ public class TestListCorruptFileBlocks { File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, cluster.getNamesystem().getBlockPoolId()); assertTrue("data directory does not exist", data_dir.exists()); - List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir); + List<File> metaFiles = MiniDFSCluster.getAllBlockFiles(data_dir); assertTrue("Data directory does not contain any blocks or there was an " + "IO error", metaFiles != null && !metaFiles.isEmpty()); File metaFile = metaFiles.get(0); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org