HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring. (Arpit Agarwal)
Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d973656e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d973656e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d973656e

Branch: refs/heads/branch-2.6
Commit: d973656e5d20351a65ab78871c4c77e14d150bd6
Parents: 2079d4b
Author: arp <a...@apache.org>
Authored: Thu Aug 28 23:05:32 2014 -0700
Committer: Jitendra Pandey <Jitendra@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 16:00:50 2014 -0700

----------------------------------------------------------------------
 .../datanode/fsdataset/impl/BlockPoolSlice.java |  11 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 129 +++++++++----------
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  16 ---
 .../fsdataset/impl/LazyWriteReplicaTracker.java |  76 +++++++++--
 .../fsdataset/impl/TestLazyPersistFiles.java    |  10 +-
 5 files changed, 138 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d973656e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 179f3ea..63cac55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -277,10 +277,13 @@ class BlockPoolSlice {
     return blockFile;
   }
 
-  File lazyPersistReplica(Block b, File f) throws IOException {
-    File blockFile = FsDatasetImpl.copyBlockFiles(b, f, lazypersistDir);
-    File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
-    dfsUsage.incDfsUsed(b.getNumBytes() + metaFile.length());
+  File lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException {
+    if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
+      FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
+    }
+    File metaFile = FsDatasetImpl.copyBlockFiles(replicaInfo, lazypersistDir);
+    File blockFile = Block.metaToBlockFile(metaFile);
+    dfsUsage.incDfsUsed(replicaInfo.getNumBytes() + metaFile.length());
     return blockFile;
   }
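The reworked lazyPersistReplica() above takes the ReplicaInfo directly: copyBlockFiles() now returns the meta file, and the block file is derived from it via Block.metaToBlockFile(). Below is a minimal standalone sketch of that copy-then-derive pattern; both helpers are simplified stand-ins for the real HDFS methods, and the per-block-id subdirectory bucketing (DatanodeUtil.idToBlockDir) is omitted for brevity.

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class LazyPersistSketch {

  // Simplified stand-in for Block.metaToBlockFile():
  // "blk_123_1001.meta" -> "blk_123". The real method also validates the name.
  static File metaToBlockFile(File metaFile) {
    String name = metaFile.getName();
    return new File(metaFile.getParent(), name.substring(0, name.lastIndexOf('_')));
  }

  // Copies the meta file first, then the block file, and returns the new
  // meta file, mirroring the contract of the new copyBlockFiles().
  static File copyBlockFiles(File srcBlock, File srcMeta, File destDir)
      throws IOException {
    File dstMeta = new File(destDir, srcMeta.getName());
    File dstBlock = new File(destDir, srcBlock.getName());
    Files.copy(srcMeta.toPath(), dstMeta.toPath());
    Files.copy(srcBlock.toPath(), dstBlock.toPath());
    return dstMeta;
  }

  static File lazyPersistReplica(File srcBlock, File srcMeta, File lazypersistDir)
      throws IOException {
    // Create the destination lazily, warning rather than failing, as the hunk does.
    if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
      System.err.println("Failed to create " + lazypersistDir);
    }
    File metaFile = copyBlockFiles(srcBlock, srcMeta, lazypersistDir);
    return metaToBlockFile(metaFile); // block file derived from the meta file
  }
}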
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d973656e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 1abc6f9..5c822e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
@@ -704,28 +705,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return dstfile;
   }
 
-  static File copyBlockFiles(Block b, File srcfile, File destdir)
+  /**
+   * Copy the block and meta files for the given block from the given replica to the destination root directory.
+   * @return the new meta file.
+   * @throws IOException
+   */
+  static File copyBlockFiles(ReplicaInfo replicaInfo, File destRoot)
       throws IOException {
-    final File dstfile = new File(destdir, b.getBlockName());
-    final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
-    final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
+    final File destDir = DatanodeUtil.idToBlockDir(destRoot, replicaInfo.getBlockId());
+    final File dstFile = new File(destDir, replicaInfo.getBlockName());
+    final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, replicaInfo.getGenerationStamp());
+    final File srcMeta = replicaInfo.getMetaFile();
+    final File srcFile = replicaInfo.getBlockFile();
     try {
-      FileUtils.copyFile(srcmeta, dstmeta);
+      FileUtils.copyFile(srcMeta, dstMeta);
     } catch (IOException e) {
-      throw new IOException("Failed to copy meta file for " + b
-          + " from " + srcmeta + " to " + dstmeta, e);
+      throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
     }
     try {
-      FileUtils.copyFile(srcfile, dstfile);
+      FileUtils.copyFile(srcFile, dstFile);
    } catch (IOException e) {
-      throw new IOException("Failed to copy block file for " + b
-          + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
+      throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta
-          + " and " + srcfile + " to " + dstfile);
+      LOG.debug("addBlock: Moved " + srcMeta + " to " + dstMeta);
+      LOG.debug("addBlock: Moved " + srcFile + " to " + dstFile);
     }
-    return dstfile;
+    return dstMeta;
   }
 
   static private void truncateBlock(File blockFile, File metaFile,
@@ -1322,10 +1328,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       if (v.isTransientStorage()) {
         lazyWriteReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
-
-        // Schedule a checkpoint.
-        ((LazyWriter) lazyWriter.getRunnable())
-            .addReplicaToLazyWriteQueue(bpid, replicaInfo.getBlockId());
       }
     }
     volumeMap.add(bpid, newReplicaInfo);
@@ -2371,32 +2373,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         nbytes, flags);
   }
 
-  private static class BlockIdPair {
-    final String bpid;
-    final long blockId;
-
-    BlockIdPair(final String bpid, final long blockId) {
-      this.bpid = bpid;
-      this.blockId = blockId;
-    }
-  }
-
-  private class LazyWriter implements Runnable {
+  class LazyWriter implements Runnable {
     private volatile boolean shouldRun = true;
     final int checkpointerInterval;
-    final private Queue<BlockIdPair> blocksPendingCheckpoint;
-
     public LazyWriter(final int checkpointerInterval) {
       this.checkpointerInterval = checkpointerInterval;
-      blocksPendingCheckpoint = new LinkedList<BlockIdPair>();
-    }
-
-    // Schedule a replica for writing to persistent storage.
-    public synchronized void addReplicaToLazyWriteQueue(
-        String bpid, long blockId) {
-      LOG.info("Block with blockId=" + blockId + "; bpid=" + bpid + " added to lazy writer queue");
-      blocksPendingCheckpoint.add(new BlockIdPair(bpid, blockId));
     }
 
     private void moveReplicaToNewVolume(String bpid, long blockId)
@@ -2404,76 +2386,85 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       LOG.info("LazyWriter invoked to save blockId=" + blockId + "; bpid=" + bpid);
 
-      FsVolumeImpl targetVolume = null;
-      Block block = null;
-      File blockFile = null;
+      FsVolumeImpl targetVolume;
+      ReplicaInfo replicaInfo;
 
       synchronized (this) {
-        block = getStoredBlock(bpid, blockId);
-        blockFile = getFile(bpid, blockId);
+        replicaInfo = volumeMap.get(bpid, blockId);
 
-        if (block == null) {
-          // The block was deleted before it could be checkpointed.
+        if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
+          // The block was either deleted before it could be checkpointed or
+          // it is already on persistent storage. This can occur if a second
+          // replica on persistent storage was found after the lazy write was
+          // scheduled.
           return;
         }
 
         // Pick a target volume for the block.
         targetVolume = volumes.getNextVolume(
-            StorageType.DEFAULT, block.getNumBytes());
+            StorageType.DEFAULT, replicaInfo.getNumBytes());
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
       }
 
-      LOG.info("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
       lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
       File savedBlockFile = targetVolume.getBlockPoolSlice(bpid)
-                                        .lazyPersistReplica(block, blockFile);
+                                        .lazyPersistReplica(replicaInfo);
       lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile);
-      LOG.info("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
-          " to file " + savedBlockFile);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
+            " to file " + savedBlockFile);
+      }
     }
 
     /**
      * Checkpoint a pending replica to persistent storage now.
+     * If we fail then move the replica to the end of the queue.
      * @return true if there is more work to be done, false otherwise.
      */
     private boolean saveNextReplica() {
-      BlockIdPair blockIdPair = null;
-      int moreWorkThreshold = 0;
+      LazyWriteReplicaTracker.ReplicaState replicaState = null;
+      boolean succeeded = false;
 
       try {
         synchronized (this) {
-          // Dequeue the next replica waiting to be checkpointed.
-          blockIdPair = blocksPendingCheckpoint.poll();
-          if (blockIdPair == null) {
-            LOG.info("LazyWriter has no blocks to persist. " +
-                "Thread going to sleep.");
+          replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
+          if (replicaState == null) {
            return false;
          }
        }

        // Move the replica outside the lock.
-        moveReplicaToNewVolume(blockIdPair.bpid, blockIdPair.blockId);
-
+        moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
+        succeeded = true;
       } catch(IOException ioe) {
-        // If we failed, put the block on the queue and let a retry
-        // interval elapse before we try again so we don't try to keep
-        // checkpointing the same block in a tight loop.
-        synchronized (this) {
-          blocksPendingCheckpoint.add(blockIdPair);
-          ++moreWorkThreshold;
+        LOG.warn("Exception saving replica " + replicaState, ioe);
+      } finally {
+        if (!succeeded && replicaState != null) {
+          lazyWriteReplicaTracker.reenqueueReplica(replicaState);
        }
      }

-      synchronized (this) {
-        return blocksPendingCheckpoint.size() > moreWorkThreshold;
-      }
+      return succeeded;
    }

    @Override
    public void run() {
+      int numSuccessiveFailures = 0;
+
      while (fsRunning && shouldRun) {
        try {
-          if (!saveNextReplica()) {
+          numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
+
+          // Sleep if we have no more work to do or if it looks like we are not
+          // making any forward progress. This is to ensure that if all persist
+          // operations are failing we don't keep retrying them in a tight loop.
+          if (numSuccessiveFailures == lazyWriteReplicaTracker.numReplicasNotPersisted()) {
            Thread.sleep(checkpointerInterval * 1000);
+            numSuccessiveFailures = 0;
          }
        } catch (InterruptedException e) {
          LOG.info("LazyWriter was interrupted, exiting");


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d973656e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 994344e..93b5895 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -307,22 +307,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
     getBlockPoolSlice(bpid).getVolumeMap(volumeMap);
   }
 
-  /**
-   * Add replicas under the given directory to the volume map
-   * @param volumeMap the replicas map
-   * @param dir an input directory
-   * @param isFinalized true if the directory has finalized replicas;
-   *                    false if the directory has rbw replicas
-   * @throws IOException
-   */
-  void addToReplicasMap(String bpid, ReplicaMap volumeMap,
-      File dir, boolean isFinalized) throws IOException {
-    BlockPoolSlice bp = getBlockPoolSlice(bpid);
-    // TODO move this up
-    // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
-    bp.addToReplicasMap(volumeMap, dir, isFinalized);
-  }
-
   @Override
   public String toString() {
     return currentDir.getAbsolutePath();
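The LazyWriter changes above move queue management into the shared replica tracker and rework run() so the thread backs off only when it is making no progress: a failed save re-enqueues the replica at the tail, and the thread sleeps once the count of successive failures covers the whole pending queue. A standalone sketch of that discipline, using a stand-in Task type and an in-memory queue in place of the tracker (the >= comparison here also covers the idle, empty-queue case):

import java.util.ArrayDeque;
import java.util.Queue;

public class BackoffLoopSketch {
  interface Task {
    boolean save(); // returns true on success
  }

  private final Queue<Task> pending = new ArrayDeque<Task>();
  private volatile boolean shouldRun = true;
  private static final long INTERVAL_MS = 1000;

  public void run() throws InterruptedException {
    int numSuccessiveFailures = 0;

    while (shouldRun) {
      Task task = pending.poll();
      boolean succeeded = task != null && task.save();
      if (task != null && !succeeded) {
        pending.add(task); // failed: retry later, after the rest of the queue
      }
      numSuccessiveFailures = succeeded ? 0 : numSuccessiveFailures + 1;

      // Sleep when idle or when a full pass over the queue has failed, so
      // persistently failing work is not retried in a tight loop.
      if (numSuccessiveFailures >= pending.size()) {
        Thread.sleep(INTERVAL_MS);
        numSuccessiveFailures = 0;
      }
    }
  }
}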
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d973656e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
index ae28f09..222b63a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
@@ -19,12 +19,11 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import com.google.common.collect.Multimap;
 import com.google.common.collect.TreeMultimap;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 
 import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 class LazyWriteReplicaTracker {
 
@@ -43,7 +42,7 @@ class LazyWriteReplicaTracker {
     /**
      * transient storage volume that holds the original replica.
      */
-    final FsVolumeImpl transientVolume;
+    final FsVolumeSpi transientVolume;
 
     /**
      * Persistent volume that holds or will hold the saved replica.
@@ -51,7 +50,7 @@ class LazyWriteReplicaTracker {
     FsVolumeImpl lazyPersistVolume;
     File savedBlockFile;
 
-    ReplicaState(final String bpid, final long blockId, FsVolumeImpl transientVolume) {
+    ReplicaState(final String bpid, final long blockId, FsVolumeSpi transientVolume) {
       this.bpid = bpid;
       this.blockId = blockId;
       this.transientVolume = transientVolume;
@@ -61,6 +60,11 @@
     }
 
     @Override
+    public String toString() {
+      return "[Bpid=" + bpid + ";blockId=" + blockId + "]";
+    }
+
+    @Override
     public int hashCode() {
       return bpid.hashCode() ^ (int) blockId;
     }
@@ -99,35 +103,43 @@ class LazyWriteReplicaTracker {
   final Map<String, Map<Long, ReplicaState>> replicaMaps;
 
   /**
+   * Queue of replicas that need to be written to disk.
+   */
+  final Queue<ReplicaState> replicasNotPersisted;
+
+  /**
    * A map of blockId to persist complete time for transient blocks. This allows
    * us to evict LRU blocks from transient storage. Protected by 'this'
    * Object lock.
    */
-  final Map<ReplicaState, Long> persistTimeMap;
+  final Map<ReplicaState, Long> replicasPersisted;
 
   LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
     this.fsDataset = fsDataset;
     replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
-    persistTimeMap = new HashMap<ReplicaState, Long>();
+    replicasNotPersisted = new LinkedList<ReplicaState>();
+    replicasPersisted = new HashMap<ReplicaState, Long>();
   }
 
   TreeMultimap<Long, ReplicaState> getLruMap() {
     // TODO: This can be made more efficient.
     TreeMultimap<Long, ReplicaState> reversedMap = TreeMultimap.create();
-    for (Map.Entry<ReplicaState, Long> entry : persistTimeMap.entrySet()) {
+    for (Map.Entry<ReplicaState, Long> entry : replicasPersisted.entrySet()) {
       reversedMap.put(entry.getValue(), entry.getKey());
     }
     return reversedMap;
   }
 
   synchronized void addReplica(String bpid, long blockId,
-      final FsVolumeImpl transientVolume) {
+      final FsVolumeSpi transientVolume) {
     Map<Long, ReplicaState> map = replicaMaps.get(bpid);
     if (map == null) {
       map = new HashMap<Long, ReplicaState>();
       replicaMaps.put(bpid, map);
     }
-    map.put(blockId, new ReplicaState(bpid, blockId, transientVolume));
+    ReplicaState replicaState = new ReplicaState(bpid, blockId, transientVolume);
+    map.put(blockId, replicaState);
+    replicasNotPersisted.add(replicaState);
   }
 
   synchronized void recordStartLazyPersist(
@@ -149,12 +161,49 @@
     }
     replicaState.state = State.LAZY_PERSIST_COMPLETE;
     replicaState.savedBlockFile = savedBlockFile;
-    persistTimeMap.put(replicaState, System.currentTimeMillis() / 1000);
+
+    if (replicasNotPersisted.peek() == replicaState) {
+      // Common case.
+      replicasNotPersisted.remove();
+    } else {
+      // Should never occur in practice as lazy writer always persists
+      // the replica at the head of the queue before moving to the next
+      // one.
+      replicasNotPersisted.remove(replicaState);
+    }
+    replicasPersisted.put(replicaState, System.currentTimeMillis() / 1000);
+  }
+
+  synchronized ReplicaState dequeueNextReplicaToPersist() {
+    while (replicasNotPersisted.size() != 0) {
+      ReplicaState replicaState = replicasNotPersisted.remove();
+      Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
+
+      if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
+        return replicaState;
+      }
+
+      // The replica no longer exists, look for the next one.
+    }
+    return null;
+  }
+
+  synchronized void reenqueueReplica(final ReplicaState replicaState) {
+    replicasNotPersisted.add(replicaState);
+  }
+
+  synchronized int numReplicasNotPersisted() {
+    return replicasNotPersisted.size();
   }
 
   synchronized void discardReplica(
       final String bpid, final long blockId, boolean force) {
     Map<Long, ReplicaState> map = replicaMaps.get(bpid);
+
+    if (map == null) {
+      return;
+    }
+
     ReplicaState replicaState = map.get(blockId);
 
     if (replicaState == null) {
@@ -172,6 +221,9 @@
     }
 
     map.remove(blockId);
-    persistTimeMap.remove(replicaState);
+    replicasPersisted.remove(replicaState);
+
+    // Leave the replica in replicasNotPersisted if it's present.
+    // dequeueNextReplicaToPersist will GC it eventually.
   }
 }
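Note the queue discipline the tracker adopts above: discardReplica() removes a replica only from the per-block-pool map, and any stale entry left in replicasNotPersisted is skipped (garbage-collected) the next time dequeueNextReplicaToPersist() walks the queue. A simplified sketch of that interplay; the Replica type and the maps are stand-ins for the real ReplicaState bookkeeping:

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class TrackerQueueSketch {
  static final class Replica {
    final String bpid;
    final long blockId;
    Replica(String bpid, long blockId) { this.bpid = bpid; this.blockId = blockId; }
  }

  private final Map<String, Map<Long, Replica>> replicaMaps =
      new HashMap<String, Map<Long, Replica>>();
  private final Queue<Replica> notPersisted = new ArrayDeque<Replica>();

  synchronized void addReplica(String bpid, long blockId) {
    Map<Long, Replica> map = replicaMaps.get(bpid);
    if (map == null) {
      map = new HashMap<Long, Replica>();
      replicaMaps.put(bpid, map);
    }
    Replica replica = new Replica(bpid, blockId);
    map.put(blockId, replica);
    notPersisted.add(replica);
  }

  synchronized void discardReplica(String bpid, long blockId) {
    Map<Long, Replica> map = replicaMaps.get(bpid);
    if (map != null) {
      map.remove(blockId); // the stale queue entry is cleaned up by dequeue below
    }
  }

  synchronized Replica dequeueNextReplicaToPersist() {
    while (!notPersisted.isEmpty()) {
      Replica replica = notPersisted.remove();
      Map<Long, Replica> map = replicaMaps.get(replica.bpid);
      if (map != null && map.get(replica.blockId) != null) {
        return replica; // still live: persist this one
      }
      // Discarded after it was queued; skip it and keep looking.
    }
    return null;
  }
}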
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d973656e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index ddd71b1..af0e8ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -40,7 +40,9 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -61,6 +63,7 @@ public class TestLazyPersistFiles {
   static {
     ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
     ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
   }
 
   private static short REPL_FACTOR = 1;
@@ -68,7 +71,7 @@ public class TestLazyPersistFiles {
   private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
   private static final long HEARTBEAT_INTERVAL_SEC = 1;
   private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
-  private static final int LAZY_WRITER_INTERVAL_SEC = 3;
+  private static final int LAZY_WRITER_INTERVAL_SEC = 1;
   private static final int BUFFER_LENGTH = 4096;
 
   private MiniDFSCluster cluster;
@@ -283,8 +286,9 @@ public class TestLazyPersistFiles {
       File lazyPersistDir =
          volume.getBlockPoolSlice(bpid).getLazypersistDir();
 
       for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
-        File persistedBlockFile = new File(lazyPersistDir, "blk_" + lb.getBlock().getBlockId());
-        if (persistedBlockFile.exists()) {
+        File targetDir = DatanodeUtil.idToBlockDir(lazyPersistDir, lb.getBlock().getBlockId());
+        File blockFile = new File(targetDir, lb.getBlock().getBlockName());
+        if (blockFile.exists()) {
           // Found a persisted copy for this block!
           boolean added = persistedBlockIds.add(lb.getBlock().getBlockId());
           assertThat(added, is(true));
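The test fix above follows the block file into its new location: copyBlockFiles() now places persisted replicas under DatanodeUtil.idToBlockDir(lazyPersistDir, blockId) rather than directly in the lazypersist directory, so the old flat "blk_<id>" lookup could no longer find them. The helper below is a hypothetical stand-in for idToBlockDir(), shown only to illustrate the id-based directory bucketing; the real implementation may differ:

import java.io.File;

public class BlockDirSketch {
  // Hypothetical bucketing: spread blocks over nested subdirectories derived
  // from bits of the block id, so no single directory grows without bound.
  static File idToBlockDir(File root, long blockId) {
    int d1 = (int) ((blockId >> 16) & 0x1F);
    int d2 = (int) ((blockId >> 8) & 0x1F);
    return new File(root, "subdir" + d1 + File.separator + "subdir" + d2);
  }

  static boolean isPersisted(File lazyPersistDir, long blockId, String blockName) {
    File targetDir = idToBlockDir(lazyPersistDir, blockId);
    return new File(targetDir, blockName).exists();
  }
}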