HDFS-12777. [READ] Reduce memory and CPU footprint for PROVIDED volumes.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4310e059
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4310e059
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4310e059

Branch: refs/heads/HDFS-9806
Commit: 4310e059d02a28dc14b9d0a19612873569a01e7e
Parents: ec6f48f
Author: Virajith Jalaparti <viraj...@apache.org>
Authored: Fri Nov 10 10:19:33 2017 -0800
Committer: Virajith Jalaparti <viraj...@apache.org>
Committed: Fri Dec 1 18:16:59 2017 -0800

----------------------------------------------------------------------
 .../hdfs/server/datanode/DirectoryScanner.java  |  4 +
 .../datanode/FinalizedProvidedReplica.java      |  8 ++
 .../hdfs/server/datanode/ProvidedReplica.java   | 77 +++++++++++++++++++-
 .../hdfs/server/datanode/ReplicaBuilder.java    | 37 +++++++++-
 .../fsdataset/impl/ProvidedVolumeImpl.java      | 30 +++++++-
 .../fsdataset/impl/TestProvidedImpl.java        | 76 ++++++++++++-------
 6 files changed, 196 insertions(+), 36 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4310e059/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 3b6d06c..8fb8551 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -530,6 +530,10 @@ public class DirectoryScanner implements Runnable {
         new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
 
     for (int i = 0; i < volumes.size(); i++) {
+      if (volumes.get(i).getStorageType() == StorageType.PROVIDED) {
+        // Disable scanning PROVIDED volumes to keep overhead low
+        continue;
+      }
       ReportCompiler reportCompiler =
           new ReportCompiler(datanode, volumes.get(i));
       Future<ScanInfoPerBlockPool> result =
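
The DirectoryScanner change above boils down to a storage-type guard at the
top of the per-volume loop: blocks on PROVIDED volumes live on remote
storage, so a local directory walk can never find them and only wastes CPU
and I/O. A minimal standalone sketch of the same pattern (the Volume
interface and compileReport() here are simplified stand-ins, not the HDFS
types):

    import java.util.ArrayList;
    import java.util.List;

    enum StorageType { DISK, SSD, PROVIDED }

    interface Volume {
      StorageType getStorageType();
      String compileReport(); // stand-in for DirectoryScanner's ReportCompiler
    }

    class ScanSketch {
      // Walk every volume except PROVIDED ones; their block data is remote,
      // so local scanning finds nothing.
      static List<String> scanAll(List<Volume> volumes) {
        List<String> reports = new ArrayList<>();
        for (Volume v : volumes) {
          if (v.getStorageType() == StorageType.PROVIDED) {
            continue;
          }
          reports.add(v.compileReport());
        }
        return reports;
      }
    }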

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4310e059/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
index e23d6be..bcc9a38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
@@ -21,6 +21,7 @@ import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
@@ -37,6 +38,13 @@ public class FinalizedProvidedReplica extends ProvidedReplica {
         remoteFS);
   }
 
+  public FinalizedProvidedReplica(long blockId, Path pathPrefix,
+      String pathSuffix, long fileOffset, long blockLen, long genStamp,
+      FsVolumeSpi volume, Configuration conf, FileSystem remoteFS) {
+    super(blockId, pathPrefix, pathSuffix, fileOffset, blockLen,
+        genStamp, volume, conf, remoteFS);
+  }
+
   @Override
   public ReplicaState getState() {
     return ReplicaState.FINALIZED;
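
The new constructor above is where the footprint reduction comes from:
instead of each replica holding a full URI (scheme, authority, and the
entire path), replicas on a volume share a single Path prefix and store only
a short per-block suffix, materializing the URI on demand. A rough sketch of
the idea (the names and the s3a prefix are illustrative, not the actual HDFS
implementation):

    import java.net.URI;

    class SlimReplicaSketch {
      // One prefix shared by every replica on the volume.
      private static final URI VOLUME_PREFIX = URI.create("s3a://bucket/data/");

      private final long blockId;
      private final String pathSuffix; // e.g. "dir1/blk_1001"

      SlimReplicaSketch(long blockId, String pathSuffix) {
        this.blockId = blockId;
        this.pathSuffix = pathSuffix;
      }

      // Resolve the suffix against the shared prefix only when a caller
      // actually needs the remote location.
      URI remoteURI() {
        return VOLUME_PREFIX.resolve(pathSuffix);
      }
    }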

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4310e059/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
index 2b3bd13..8681421 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -51,18 +52,23 @@ public abstract class ProvidedReplica extends ReplicaInfo {
   static final byte[] NULL_CHECKSUM_ARRAY =
       FsDatasetUtil.createNullChecksumByteArray();
   private URI fileURI;
+  private Path pathPrefix;
+  private String pathSuffix;
   private long fileOffset;
   private Configuration conf;
   private FileSystem remoteFS;
 
   /**
    * Constructor.
+   *
    * @param blockId block id
    * @param fileURI remote URI this block is to be read from
    * @param fileOffset the offset in the remote URI
    * @param blockLen the length of the block
    * @param genStamp the generation stamp of the block
    * @param volume the volume this block belongs to
+   * @param conf the configuration
+   * @param remoteFS reference to the remote filesystem to use for this replica.
    */
   public ProvidedReplica(long blockId, URI fileURI, long fileOffset,
       long blockLen, long genStamp, FsVolumeSpi volume, Configuration conf,
@@ -85,23 +91,86 @@ public abstract class ProvidedReplica extends ReplicaInfo {
     }
   }
 
+  /**
+   * Constructor.
+   *
+   * @param blockId block id
+   * @param pathPrefix A prefix of the {@link Path} associated with this replica
+   *          on the remote {@link FileSystem}.
+   * @param pathSuffix A suffix of the {@link Path} associated with this replica
+   *          on the remote {@link FileSystem}. Resolving the {@code pathSuffix}
+   *          against the {@code pathPrefix} should provide the exact
+   *          {@link Path} of the data associated with this replica on the
+   *          remote {@link FileSystem}.
+   * @param fileOffset the offset in the remote URI
+   * @param blockLen the length of the block
+   * @param genStamp the generation stamp of the block
+   * @param volume the volume this block belongs to
+   * @param conf the configuration
+   * @param remoteFS reference to the remote filesystem to use for this replica.
+   */
+  public ProvidedReplica(long blockId, Path pathPrefix, String pathSuffix,
+      long fileOffset, long blockLen, long genStamp, FsVolumeSpi volume,
+      Configuration conf, FileSystem remoteFS) {
+    super(volume, blockId, blockLen, genStamp);
+    this.fileURI = null;
+    this.pathPrefix = pathPrefix;
+    this.pathSuffix = pathSuffix;
+    this.fileOffset = fileOffset;
+    this.conf = conf;
+    if (remoteFS != null) {
+      this.remoteFS = remoteFS;
+    } else {
+      LOG.warn(
+          "Creating a reference to the remote FS for provided block " + this);
+      try {
+        this.remoteFS = FileSystem.get(pathPrefix.toUri(), this.conf);
+      } catch (IOException e) {
+        LOG.warn("Failed to obtain filesystem for " + pathPrefix);
+        this.remoteFS = null;
+      }
+    }
+  }
+
   public ProvidedReplica(ProvidedReplica r) {
     super(r);
     this.fileURI = r.fileURI;
     this.fileOffset = r.fileOffset;
     this.conf = r.conf;
     this.remoteFS = r.remoteFS;
+    this.pathPrefix = r.pathPrefix;
+    this.pathSuffix = r.pathSuffix;
   }
 
   @Override
   public URI getBlockURI() {
-    return this.fileURI;
+    return getRemoteURI();
+  }
+
+  @VisibleForTesting
+  public String getPathSuffix() {
+    return pathSuffix;
+  }
+
+  @VisibleForTesting
+  public Path getPathPrefix() {
+    return pathPrefix;
+  }
+
+  private URI getRemoteURI() {
+    if (fileURI != null) {
+      return fileURI;
+    } else if (pathPrefix == null) {
+      return new Path(pathSuffix).toUri();
+    } else {
+      return new Path(pathPrefix, pathSuffix).toUri();
+    }
   }
 
   @Override
   public InputStream getDataInputStream(long seekOffset) throws IOException {
     if (remoteFS != null) {
-      FSDataInputStream ins = remoteFS.open(new Path(fileURI));
+      FSDataInputStream ins = remoteFS.open(new Path(getRemoteURI()));
       ins.seek(fileOffset + seekOffset);
       return new BoundedInputStream(
           new FSDataInputStream(ins), getBlockDataLength());
@@ -132,7 +201,7 @@ public abstract class ProvidedReplica extends ReplicaInfo {
   public boolean blockDataExists() {
     if(remoteFS != null) {
       try {
-        return remoteFS.exists(new Path(fileURI));
+        return remoteFS.exists(new Path(getRemoteURI()));
       } catch (IOException e) {
         return false;
       }
@@ -220,7 +289,7 @@ public abstract class ProvidedReplica extends ReplicaInfo {
   public int compareWith(ScanInfo info) {
     //local scanning cannot find any provided blocks.
     if (info.getFileRegion().equals(
-        new FileRegion(this.getBlockId(), new Path(fileURI),
+        new FileRegion(this.getBlockId(), new Path(getRemoteURI()),
             fileOffset, this.getNumBytes(), this.getGenerationStamp()))) {
       return 0;
     } else {
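
Note that getRemoteURI() has to handle three cases: a replica built from a
full URI (the old code path), one built from a suffix alone, and one built
from a prefix plus suffix. The prefix/suffix case leans on the two-argument
Path constructor, which resolves the child against the parent; a quick
self-contained illustration (assumes hadoop-common on the classpath):

    import java.net.URI;
    import org.apache.hadoop.fs.Path;

    public class PathResolutionDemo {
      public static void main(String[] args) {
        Path prefix = new Path("s3a://bucket/data");
        String suffix = "dir1/blk_1001";
        // Path(parent, child) resolves the child against the parent, which is
        // how a prefix/suffix pair is turned back into the remote location.
        URI full = new Path(prefix, suffix).toUri();
        System.out.println(full); // s3a://bucket/data/dir1/blk_1001
      }
    }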

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4310e059/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
index c5cb6a5..de68e2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.net.URI;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
@@ -52,6 +53,8 @@ public class ReplicaBuilder {
   private Configuration conf;
   private FileRegion fileRegion;
   private FileSystem remoteFS;
+  private String pathSuffix;
+  private Path pathPrefix;
 
   public ReplicaBuilder(ReplicaState state) {
     volume = null;
@@ -145,6 +148,28 @@ public class ReplicaBuilder {
     return this;
   }
 
+  /**
+   * Set the suffix of the {@link Path} associated with the replica.
+   * Intended to be used only for {@link ProvidedReplica}s.
+   * @param suffix the path suffix.
+   * @return the builder with the path suffix set.
+   */
+  public ReplicaBuilder setPathSuffix(String suffix) {
+    this.pathSuffix = suffix;
+    return this;
+  }
+
+  /**
+   * Set the prefix of the {@link Path} associated with the replica.
+   * Intended to be used only for {@link ProvidedReplica}s.
+   * @param prefix the path prefix.
+   * @return the builder with the path prefix set.
+   */
+  public ReplicaBuilder setPathPrefix(Path prefix) {
+    this.pathPrefix = prefix;
+    return this;
+  }
+
   public LocalReplicaInPipeline buildLocalReplicaInPipeline()
       throws IllegalArgumentException {
     LocalReplicaInPipeline info = null;
@@ -275,14 +300,20 @@ public class ReplicaBuilder {
       throw new IllegalArgumentException("Finalized PROVIDED replica " +
           "cannot be constructed from another replica");
     }
-    if (fileRegion == null && uri == null) {
+    if (fileRegion == null && uri == null &&
+        (pathPrefix == null || pathSuffix == null)) {
       throw new IllegalArgumentException(
           "Trying to construct a provided replica on " + volume +
           " without enough information");
     }
     if (fileRegion == null) {
-      info = new FinalizedProvidedReplica(blockId, uri, offset,
-          length, genStamp, volume, conf, remoteFS);
+      if (uri != null) {
+        info = new FinalizedProvidedReplica(blockId, uri, offset,
+            length, genStamp, volume, conf, remoteFS);
+      } else {
+        info = new FinalizedProvidedReplica(blockId, pathPrefix, pathSuffix,
+            offset, length, genStamp, volume, conf, remoteFS);
+      }
     } else {
       info = new FinalizedProvidedReplica(fileRegion.getBlock().getBlockId(),
           fileRegion.getPath().toUri(),
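
With the two new setters, a caller can build a finalized PROVIDED replica
without ever constructing the per-block URI. A hypothetical usage sketch
(setFsVolume(), setConf(), and the terminal build() call are not shown in the
hunk above, so those names are assumptions; volume and conf would come from
the surrounding datanode code, and the literal values are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    class BuilderUsageSketch {
      static ReplicaInfo example(FsVolumeSpi volume, Configuration conf) {
        return new ReplicaBuilder(ReplicaState.FINALIZED)
            .setBlockId(1001L)
            .setPathPrefix(new Path("s3a://bucket/data"))
            .setPathSuffix("dir1/blk_1001")
            .setOffset(0)
            .setLength(4096)
            .setGenerationStamp(1001)
            .setFsVolume(volume) // assumed setter name
            .setConf(conf)       // assumed setter name
            .build();            // assumed terminal call
      }
    }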

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4310e059/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
index 092672d..d103b64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -65,6 +66,29 @@ import org.apache.hadoop.util.Time;
  */
 public class ProvidedVolumeImpl extends FsVolumeImpl {
 
+  /**
+   * Get a suffix of the full path, excluding the given prefix.
+   *
+   * @param prefix a prefix of the path.
+   * @param fullPath the full path whose suffix is needed.
+   * @return the suffix of the path, which when resolved against {@code prefix}
+   *         gets back the {@code fullPath}.
+   */
+  @VisibleForTesting
+  protected static String getSuffix(final Path prefix, final Path fullPath) {
+    String prefixStr = prefix.toString();
+    String pathStr = fullPath.toString();
+    if (!pathStr.startsWith(prefixStr)) {
+      LOG.debug("Path {} is not a prefix of the path {}", prefix, fullPath);
+      return pathStr;
+    }
+    String suffix = pathStr.replaceFirst("^" + prefixStr, "");
+    if (suffix.startsWith("/")) {
+      suffix = suffix.substring(1);
+    }
+    return suffix;
+  }
+
   static class ProvidedBlockPoolSlice {
     private ProvidedVolumeImpl providedVolume;
 
@@ -106,15 +130,19 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
         return;
       }
       Iterator<FileRegion> iter = reader.iterator();
+      Path blockPrefixPath = new Path(providedVolume.getBaseURI());
       while (iter.hasNext()) {
         FileRegion region = iter.next();
         if (region.getBlockPoolId() != null
             && region.getBlockPoolId().equals(bpid)
             && containsBlock(providedVolume.baseURI,
                 region.getPath().toUri())) {
+          String blockSuffix =
+              getSuffix(blockPrefixPath, new Path(region.getPath().toUri()));
           ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
               .setBlockId(region.getBlock().getBlockId())
-              .setURI(region.getPath().toUri())
+              .setPathPrefix(blockPrefixPath)
+              .setPathSuffix(blockSuffix)
               .setOffset(region.getOffset())
               .setLength(region.getBlock().getNumBytes())
               .setGenerationStamp(region.getBlock().getGenerationStamp())
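
getSuffix() is plain string manipulation over Path.toString(); since the
replaceFirst() call is guarded by startsWith(), a substring-based
re-implementation behaves the same way. A standalone sketch, with two cases
mirrored from testProvidedReplicaSuffixExtraction below (plain strings here,
so no Path normalization of file:/// to file:/):

    public class SuffixSketch {
      // Same contract as ProvidedVolumeImpl.getSuffix: strip the prefix when
      // it really is one, drop a leading '/', else return the full path.
      static String getSuffix(String prefix, String fullPath) {
        if (!fullPath.startsWith(prefix)) {
          return fullPath;
        }
        String suffix = fullPath.substring(prefix.length());
        return suffix.startsWith("/") ? suffix.substring(1) : suffix;
      }

      public static void main(String[] args) {
        System.out.println(getSuffix("/A/B/C", "/A/B/C/D.txt")); // D.txt
        System.out.println(getSuffix("/X/B/C", "/A/B/C/D.txt")); // /A/B/C/D.txt
      }
    }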

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4310e059/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
index 40d77f7a..ecab06b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
@@ -62,7 +62,7 @@ import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
-import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
+import org.apache.hadoop.hdfs.server.datanode.ProvidedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -509,33 +509,6 @@ public class TestProvidedImpl {
     }
   }
 
-  @Test
-  public void testRefresh() throws IOException {
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
-    for (int i = 0; i < providedVolumes.size(); i++) {
-      ProvidedVolumeImpl vol = (ProvidedVolumeImpl) providedVolumes.get(i);
-      TestFileRegionBlockAliasMap testBlockFormat =
-          (TestFileRegionBlockAliasMap) vol
-              .getBlockFormat(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
-      //equivalent to two new blocks appearing
-      testBlockFormat.setBlockCount(NUM_PROVIDED_BLKS + 2);
-      //equivalent to deleting the first block
-      testBlockFormat.setMinBlkId(MIN_BLK_ID + 1);
-
-      DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf);
-      scanner.reconcile();
-      ReplicaInfo info = dataset.getBlockReplica(
-          BLOCK_POOL_IDS[CHOSEN_BP_ID], NUM_PROVIDED_BLKS + 1);
-      //new replica should be added to the dataset
-      assertTrue(info != null);
-      try {
-        info = dataset.getBlockReplica(BLOCK_POOL_IDS[CHOSEN_BP_ID], 0);
-      } catch(Exception ex) {
-        LOG.info("Exception expected: " + ex);
-      }
-    }
-  }
-
   private int getBlocksInProvidedVolumes(String basePath, int numBlocks,
       int minBlockId) throws IOException {
     TestFileRegionIterator fileRegionIterator =
@@ -621,4 +594,51 @@ public class TestProvidedImpl {
         ProvidedVolumeImpl.containsBlock(new URI("/bucket1/dir1/"),
             new URI("s3a:/bucket1/dir1/temp.txt")));
   }
+
+  @Test
+  public void testProvidedReplicaSuffixExtraction() {
+    assertEquals("B.txt", ProvidedVolumeImpl.getSuffix(
+        new Path("file:///A/"), new Path("file:///A/B.txt")));
+    assertEquals("B/C.txt", ProvidedVolumeImpl.getSuffix(
+        new Path("file:///A/"), new Path("file:///A/B/C.txt")));
+    assertEquals("B/C/D.txt", ProvidedVolumeImpl.getSuffix(
+        new Path("file:///A/"), new Path("file:///A/B/C/D.txt")));
+    assertEquals("D.txt", ProvidedVolumeImpl.getSuffix(
+        new Path("file:///A/B/C/"), new Path("file:///A/B/C/D.txt")));
+    assertEquals("file:/A/B/C/D.txt", ProvidedVolumeImpl.getSuffix(
+        new Path("file:///X/B/C/"), new Path("file:///A/B/C/D.txt")));
+    assertEquals("D.txt", ProvidedVolumeImpl.getSuffix(
+        new Path("/A/B/C"), new Path("/A/B/C/D.txt")));
+    assertEquals("D.txt", ProvidedVolumeImpl.getSuffix(
+        new Path("/A/B/C/"), new Path("/A/B/C/D.txt")));
+
+    assertEquals("data/current.csv", ProvidedVolumeImpl.getSuffix(
+        new Path("wasb:///users/alice/"),
+        new Path("wasb:///users/alice/data/current.csv")));
+    assertEquals("current.csv", ProvidedVolumeImpl.getSuffix(
+        new Path("wasb:///users/alice/data"),
+        new Path("wasb:///users/alice/data/current.csv")));
+
+    assertEquals("wasb:/users/alice/data/current.csv",
+        ProvidedVolumeImpl.getSuffix(
+            new Path("wasb:///users/bob/"),
+            new Path("wasb:///users/alice/data/current.csv")));
+  }
+
+  @Test
+  public void testProvidedReplicaPrefix() throws Exception {
+    for (int i = 0; i < providedVolumes.size(); i++) {
+      FsVolumeImpl vol = providedVolumes.get(i);
+      ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+      vol.getVolumeMap(volumeMap, null);
+
+      Path expectedPrefix = new Path(
+          StorageLocation.normalizeFileURI(new File(providedBasePath).toURI()));
+      for (ReplicaInfo info : volumeMap
+          .replicas(BLOCK_POOL_IDS[CHOSEN_BP_ID])) {
+        ProvidedReplica pInfo = (ProvidedReplica) info;
+        assertEquals(expectedPrefix, pInfo.getPathPrefix());
+      }
+    }
+  }
 }