HDFS-10675. Datanode support to read from external stores.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/970028f0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/970028f0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/970028f0 Branch: refs/heads/HDFS-9806 Commit: 970028f04bafd9b2aac52ee8969c42a8fb6f6b25 Parents: 60f95fb Author: Virajith Jalaparti <viraj...@apache.org> Authored: Wed Mar 29 14:29:28 2017 -0700 Committer: Virajith Jalaparti <viraj...@apache.org> Committed: Fri Dec 1 18:16:57 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/hadoop/fs/StorageType.java | 3 +- .../org/apache/hadoop/fs/shell/TestCount.java | 3 +- .../hadoop/hdfs/protocol/HdfsConstants.java | 4 + .../hadoop/hdfs/protocolPB/PBHelperClient.java | 4 + .../src/main/proto/hdfs.proto | 1 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 15 + .../hadoop/hdfs/server/common/BlockAlias.java | 29 + .../hadoop/hdfs/server/common/BlockFormat.java | 82 +++ .../hadoop/hdfs/server/common/FileRegion.java | 121 +++++ .../hdfs/server/common/FileRegionProvider.java | 37 ++ .../hadoop/hdfs/server/common/Storage.java | 71 ++- .../hadoop/hdfs/server/common/StorageInfo.java | 6 + .../server/common/TextFileRegionFormat.java | 442 ++++++++++++++++ .../server/common/TextFileRegionProvider.java | 88 ++++ .../server/datanode/BlockPoolSliceStorage.java | 21 +- .../hdfs/server/datanode/DataStorage.java | 44 +- .../hdfs/server/datanode/DirectoryScanner.java | 19 +- .../datanode/FinalizedProvidedReplica.java | 91 ++++ .../hdfs/server/datanode/ProvidedReplica.java | 248 +++++++++ .../hdfs/server/datanode/ReplicaBuilder.java | 100 +++- .../hdfs/server/datanode/ReplicaInfo.java | 20 +- .../hdfs/server/datanode/StorageLocation.java | 26 +- .../server/datanode/fsdataset/FsDatasetSpi.java | 4 +- .../server/datanode/fsdataset/FsVolumeSpi.java | 32 +- .../fsdataset/impl/DefaultProvidedVolumeDF.java | 58 ++ 
.../datanode/fsdataset/impl/FsDatasetImpl.java | 40 +- .../datanode/fsdataset/impl/FsDatasetUtil.java | 25 +- .../datanode/fsdataset/impl/FsVolumeImpl.java | 19 +- .../fsdataset/impl/FsVolumeImplBuilder.java | 6 + .../fsdataset/impl/ProvidedVolumeDF.java | 34 ++ .../fsdataset/impl/ProvidedVolumeImpl.java | 526 +++++++++++++++++++ .../apache/hadoop/hdfs/server/mover/Mover.java | 2 +- .../server/namenode/FSImageCompression.java | 2 +- .../hadoop/hdfs/server/namenode/NNStorage.java | 10 +- .../src/main/resources/hdfs-default.xml | 78 +++ .../org/apache/hadoop/hdfs/TestDFSRollback.java | 6 +- .../hadoop/hdfs/TestDFSStartupVersions.java | 2 +- .../org/apache/hadoop/hdfs/TestDFSUpgrade.java | 4 +- .../apache/hadoop/hdfs/UpgradeUtilities.java | 16 +- .../hdfs/server/common/TestTextBlockFormat.java | 160 ++++++ .../server/datanode/SimulatedFSDataset.java | 6 +- .../extdataset/ExternalDatasetImpl.java | 5 +- .../fsdataset/impl/TestFsDatasetImpl.java | 17 +- .../fsdataset/impl/TestProvidedImpl.java | 426 +++++++++++++++ .../hdfs/server/namenode/TestClusterId.java | 5 +- 45 files changed, 2873 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java index 0948801..2ecd206 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java @@ -37,7 +37,8 @@ public enum StorageType { RAM_DISK(true), SSD(false), DISK(false), - ARCHIVE(false); + ARCHIVE(false), + PROVIDED(false); private final boolean 
isTransient; http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java index a782958..b5adfcf 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java @@ -285,7 +285,7 @@ public class TestCount { // <----13---> <-------17------> <----13-----> <------17-------> " SSD_QUOTA REM_SSD_QUOTA DISK_QUOTA REM_DISK_QUOTA " + // <----13---> <-------17------> - "ARCHIVE_QUOTA REM_ARCHIVE_QUOTA " + + "ARCHIVE_QUOTA REM_ARCHIVE_QUOTA PROVIDED_QUOTA REM_PROVIDED_QUOTA " + "PATHNAME"; verify(out).println(withStorageTypeHeader); verifyNoMoreInteractions(out); @@ -340,6 +340,7 @@ public class TestCount { " SSD_QUOTA REM_SSD_QUOTA " + " DISK_QUOTA REM_DISK_QUOTA " + "ARCHIVE_QUOTA REM_ARCHIVE_QUOTA " + + "PROVIDED_QUOTA REM_PROVIDED_QUOTA " + "PATHNAME"; verify(out).println(withStorageTypeHeader); verifyNoMoreInteractions(out); http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java index 8245d1b..e9e6103 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java @@ -47,6 +47,10 @@ public final class HdfsConstants { public static final String WARM_STORAGE_POLICY_NAME = "WARM"; public static final byte COLD_STORAGE_POLICY_ID = 2; public static final String COLD_STORAGE_POLICY_NAME = "COLD"; + // branch HDFS-9806 XXX temporary until HDFS-7076 + public static final byte PROVIDED_STORAGE_POLICY_ID = 1; + public static final String PROVIDED_STORAGE_POLICY_NAME = "PROVIDED"; + public static final int DEFAULT_DATA_SOCKET_SIZE = 0; http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index fbc6dbf..460112e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -405,6 +405,8 @@ public class PBHelperClient { return StorageTypeProto.ARCHIVE; case RAM_DISK: return StorageTypeProto.RAM_DISK; + case PROVIDED: + return StorageTypeProto.PROVIDED; default: throw new IllegalStateException( "BUG: StorageType not found, type=" + type); @@ -421,6 +423,8 @@ public class PBHelperClient { return StorageType.ARCHIVE; case RAM_DISK: return StorageType.RAM_DISK; + case PROVIDED: + return StorageType.PROVIDED; default: throw new IllegalStateException( "BUG: StorageTypeProto not found, type=" + type); http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto index a423a4b..06578ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto @@ -205,6 +205,7 @@ enum StorageTypeProto { SSD = 2; ARCHIVE = 3; RAM_DISK = 4; + PROVIDED = 5; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 97b8b1a..ca753ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -328,6 +328,21 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.namenode.edits.asynclogging"; public static final boolean DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT = true; + public static final String DFS_PROVIDER_CLASS = "dfs.provider.class"; + public static final String DFS_PROVIDER_DF_CLASS = "dfs.provided.df.class"; + public static final String DFS_PROVIDER_STORAGEUUID = "dfs.provided.storage.id"; + public static final String DFS_PROVIDER_STORAGEUUID_DEFAULT = "DS-PROVIDED"; + public static final String DFS_PROVIDER_BLK_FORMAT_CLASS = "dfs.provided.blockformat.class"; + + public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER = "dfs.provided.textprovider.delimiter"; + public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT = ","; + + public static final String DFS_PROVIDED_BLOCK_MAP_READ_PATH = 
"dfs.provided.textprovider.read.path"; + public static final String DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT = "file:///tmp/blocks.csv"; + + public static final String DFS_PROVIDED_BLOCK_MAP_CODEC = "dfs.provided.textprovider.read.codec"; + public static final String DFS_PROVIDED_BLOCK_MAP_WRITE_PATH = "dfs.provided.textprovider.write.path"; + public static final String DFS_LIST_LIMIT = "dfs.ls.limit"; public static final int DFS_LIST_LIMIT_DEFAULT = 1000; public static final String DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockAlias.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockAlias.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockAlias.java new file mode 100644 index 0000000..b2fac97 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockAlias.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Interface used to load provided blocks.
 */
public interface BlockAlias {

  /**
   * Returns the {@link Block} associated with this alias.
   *
   * @return the block for this alias.
   */
  Block getBlock();

}
+ */ +public abstract class BlockFormat<T extends BlockAlias> { + + /** + * An abstract class that is used to read {@link BlockAlias}es + * for provided blocks. + */ + public static abstract class Reader<U extends BlockAlias> + implements Iterable<U>, Closeable { + + /** + * reader options. + */ + public interface Options { } + + public abstract U resolve(Block ident) throws IOException; + + } + + /** + * Returns the reader for the provided block map. + * @param opts reader options + * @return {@link Reader} to the block map. + * @throws IOException + */ + public abstract Reader<T> getReader(Reader.Options opts) throws IOException; + + /** + * An abstract class used as a writer for the provided block map. + */ + public static abstract class Writer<U extends BlockAlias> + implements Closeable { + /** + * writer options. + */ + public interface Options { } + + public abstract void store(U token) throws IOException; + + } + + /** + * Returns the writer for the provided block map. + * @param opts writer options. + * @return {@link Writer} to the block map. + * @throws IOException + */ + public abstract Writer<T> getWriter(Writer.Options opts) throws IOException; + + /** + * Refresh based on the underlying block map. 
+ * @throws IOException + */ + public abstract void refresh() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java new file mode 100644 index 0000000..c568b90 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.common; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; + +/** + * This class is used to represent provided blocks that are file regions, + * i.e., can be described using (path, offset, length). 
+ */ +public class FileRegion implements BlockAlias { + + private final Path path; + private final long offset; + private final long length; + private final long blockId; + private final String bpid; + private final long genStamp; + + public FileRegion(long blockId, Path path, long offset, + long length, String bpid, long genStamp) { + this.path = path; + this.offset = offset; + this.length = length; + this.blockId = blockId; + this.bpid = bpid; + this.genStamp = genStamp; + } + + public FileRegion(long blockId, Path path, long offset, + long length, String bpid) { + this(blockId, path, offset, length, bpid, + HdfsConstants.GRANDFATHER_GENERATION_STAMP); + + } + + public FileRegion(long blockId, Path path, long offset, + long length, long genStamp) { + this(blockId, path, offset, length, null, genStamp); + + } + + public FileRegion(long blockId, Path path, long offset, long length) { + this(blockId, path, offset, length, null); + } + + @Override + public Block getBlock() { + return new Block(blockId, length, genStamp); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof FileRegion)) { + return false; + } + FileRegion o = (FileRegion) other; + return blockId == o.blockId + && offset == o.offset + && length == o.length + && genStamp == o.genStamp + && path.equals(o.path); + } + + @Override + public int hashCode() { + return (int)(blockId & Integer.MIN_VALUE); + } + + public Path getPath() { + return path; + } + + public long getOffset() { + return offset; + } + + public long getLength() { + return length; + } + + public long getGenerationStamp() { + return genStamp; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{ block=\"").append(getBlock()).append("\""); + sb.append(", path=\"").append(getPath()).append("\""); + sb.append(", off=\"").append(getOffset()).append("\""); + sb.append(", len=\"").append(getBlock().getNumBytes()).append("\""); + sb.append(", 
genStamp=\"").append(getBlock() + .getGenerationStamp()).append("\""); + sb.append(", bpid=\"").append(bpid).append("\""); + sb.append(" }"); + return sb.toString(); + } + + public String getBlockPoolId() { + return this.bpid; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java new file mode 100644 index 0000000..2e94239 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.common; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; + +/** + * This class is a stub for reading file regions from the block map. 
+ */ +public class FileRegionProvider implements Iterable<FileRegion> { + @Override + public Iterator<FileRegion> iterator() { + return Collections.emptyListIterator(); + } + + public void refresh() throws IOException { + return; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java index 414d3a7..9ad61d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java @@ -40,6 +40,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; @@ -196,7 +197,10 @@ public abstract class Storage extends StorageInfo { Iterator<StorageDirectory> it = (dirType == null) ? 
dirIterator() : dirIterator(dirType); for ( ;it.hasNext(); ) { - list.add(new File(it.next().getCurrentDir(), fileName)); + File currentDir = it.next().getCurrentDir(); + if (currentDir != null) { + list.add(new File(currentDir, fileName)); + } } return list; } @@ -328,10 +332,20 @@ public abstract class Storage extends StorageInfo { */ public StorageDirectory(String bpid, StorageDirType dirType, boolean isShared, StorageLocation location) { - this(new File(location.getBpURI(bpid, STORAGE_DIR_CURRENT)), dirType, + this(getBlockPoolCurrentDir(bpid, location), dirType, isShared, location); } + private static File getBlockPoolCurrentDir(String bpid, + StorageLocation location) { + if (location == null || + location.getStorageType() == StorageType.PROVIDED) { + return null; + } else { + return new File(location.getBpURI(bpid, STORAGE_DIR_CURRENT)); + } + } + private StorageDirectory(File dir, StorageDirType dirType, boolean isShared, StorageLocation location) { this.root = dir; @@ -347,7 +361,8 @@ public abstract class Storage extends StorageInfo { } private static File getStorageLocationFile(StorageLocation location) { - if (location == null) { + if (location == null || + location.getStorageType() == StorageType.PROVIDED) { return null; } try { @@ -406,6 +421,10 @@ public abstract class Storage extends StorageInfo { */ public void clearDirectory() throws IOException { File curDir = this.getCurrentDir(); + if (curDir == null) { + //if the directory is null, there is nothing to do. 
+ return; + } if (curDir.exists()) { File[] files = FileUtil.listFiles(curDir); LOG.info("Will remove files: " + Arrays.toString(files)); @@ -423,6 +442,9 @@ public abstract class Storage extends StorageInfo { * @return the directory path */ public File getCurrentDir() { + if (root == null) { + return null; + } return new File(root, STORAGE_DIR_CURRENT); } @@ -443,6 +465,9 @@ public abstract class Storage extends StorageInfo { * @return the version file path */ public File getVersionFile() { + if (root == null) { + return null; + } return new File(new File(root, STORAGE_DIR_CURRENT), STORAGE_FILE_VERSION); } @@ -452,6 +477,9 @@ public abstract class Storage extends StorageInfo { * @return the previous version file path */ public File getPreviousVersionFile() { + if (root == null) { + return null; + } return new File(new File(root, STORAGE_DIR_PREVIOUS), STORAGE_FILE_VERSION); } @@ -462,6 +490,9 @@ public abstract class Storage extends StorageInfo { * @return the directory path */ public File getPreviousDir() { + if (root == null) { + return null; + } return new File(root, STORAGE_DIR_PREVIOUS); } @@ -476,6 +507,9 @@ public abstract class Storage extends StorageInfo { * @return the directory path */ public File getPreviousTmp() { + if (root == null) { + return null; + } return new File(root, STORAGE_TMP_PREVIOUS); } @@ -490,6 +524,9 @@ public abstract class Storage extends StorageInfo { * @return the directory path */ public File getRemovedTmp() { + if (root == null) { + return null; + } return new File(root, STORAGE_TMP_REMOVED); } @@ -503,6 +540,9 @@ public abstract class Storage extends StorageInfo { * @return the directory path */ public File getFinalizedTmp() { + if (root == null) { + return null; + } return new File(root, STORAGE_TMP_FINALIZED); } @@ -517,6 +557,9 @@ public abstract class Storage extends StorageInfo { * @return the directory path */ public File getLastCheckpointTmp() { + if (root == null) { + return null; + } return new File(root, 
STORAGE_TMP_LAST_CKPT); } @@ -530,6 +573,9 @@ public abstract class Storage extends StorageInfo { * @return the directory path */ public File getPreviousCheckpoint() { + if (root == null) { + return null; + } return new File(root, STORAGE_PREVIOUS_CKPT); } @@ -543,7 +589,7 @@ public abstract class Storage extends StorageInfo { private void checkEmptyCurrent() throws InconsistentFSStateException, IOException { File currentDir = getCurrentDir(); - if(!currentDir.exists()) { + if(currentDir == null || !currentDir.exists()) { // if current/ does not exist, it's safe to format it. return; } @@ -589,6 +635,13 @@ public abstract class Storage extends StorageInfo { public StorageState analyzeStorage(StartupOption startOpt, Storage storage, boolean checkCurrentIsEmpty) throws IOException { + + if (location != null && + location.getStorageType() == StorageType.PROVIDED) { + //currently we assume that PROVIDED storages are always NORMAL + return StorageState.NORMAL; + } + assert root != null : "root is null"; boolean hadMkdirs = false; String rootPath = root.getCanonicalPath(); @@ -710,6 +763,10 @@ public abstract class Storage extends StorageInfo { */ public void doRecover(StorageState curState) throws IOException { File curDir = getCurrentDir(); + if (curDir == null || root == null) { + //at this point, we do not support recovery on PROVIDED storages + return; + } String rootPath = root.getCanonicalPath(); switch(curState) { case COMPLETE_UPGRADE: // mv previous.tmp -> previous @@ -883,7 +940,8 @@ public abstract class Storage extends StorageInfo { @Override public String toString() { - return "Storage Directory " + this.root; + return "Storage Directory root= " + this.root + + "; location= " + this.location; } /** @@ -1153,6 +1211,9 @@ public abstract class Storage extends StorageInfo { } public void writeProperties(File to, StorageDirectory sd) throws IOException { + if (to == null) { + return; + } Properties props = new Properties(); setPropertiesFromFields(props, sd); 
writeProperties(to, props); http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java index 50363c9..28871e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java @@ -152,6 +152,9 @@ public class StorageInfo { */ protected void setFieldsFromProperties( Properties props, StorageDirectory sd) throws IOException { + if (props == null) { + return; + } setLayoutVersion(props, sd); setNamespaceID(props, sd); setcTime(props, sd); @@ -241,6 +244,9 @@ public class StorageInfo { } public static Properties readPropertiesFile(File from) throws IOException { + if (from == null) { + return null; + } RandomAccessFile file = new RandomAccessFile(from, "rws"); FileInputStream in = null; Properties props = new Properties(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java new file mode 100644 index 0000000..eacd08f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java @@ -0,0 +1,442 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.common; + +import java.io.File; +import java.io.IOException; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.NoSuchElementException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This class is used for block maps stored as text files, + * with a specified delimiter. 
+ */ +public class TextFileRegionFormat + extends BlockFormat<FileRegion> implements Configurable { + + private Configuration conf; + private ReaderOptions readerOpts = TextReader.defaults(); + private WriterOptions writerOpts = TextWriter.defaults(); + + public static final Logger LOG = + LoggerFactory.getLogger(TextFileRegionFormat.class); + @Override + public void setConf(Configuration conf) { + readerOpts.setConf(conf); + writerOpts.setConf(conf); + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public Reader<FileRegion> getReader(Reader.Options opts) + throws IOException { + if (null == opts) { + opts = readerOpts; + } + if (!(opts instanceof ReaderOptions)) { + throw new IllegalArgumentException("Invalid options " + opts.getClass()); + } + ReaderOptions o = (ReaderOptions) opts; + Configuration readerConf = (null == o.getConf()) + ? new Configuration() + : o.getConf(); + return createReader(o.file, o.delim, readerConf); + } + + @VisibleForTesting + TextReader createReader(Path file, String delim, Configuration cfg) + throws IOException { + FileSystem fs = file.getFileSystem(cfg); + if (fs instanceof LocalFileSystem) { + fs = ((LocalFileSystem)fs).getRaw(); + } + CompressionCodecFactory factory = new CompressionCodecFactory(cfg); + CompressionCodec codec = factory.getCodec(file); + return new TextReader(fs, file, codec, delim); + } + + @Override + public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException { + if (null == opts) { + opts = writerOpts; + } + if (!(opts instanceof WriterOptions)) { + throw new IllegalArgumentException("Invalid options " + opts.getClass()); + } + WriterOptions o = (WriterOptions) opts; + Configuration cfg = (null == o.getConf()) + ? 
new Configuration() + : o.getConf(); + if (o.codec != null) { + CompressionCodecFactory factory = new CompressionCodecFactory(cfg); + CompressionCodec codec = factory.getCodecByName(o.codec); + String name = o.file.getName() + codec.getDefaultExtension(); + o.filename(new Path(o.file.getParent(), name)); + return createWriter(o.file, codec, o.delim, cfg); + } + return createWriter(o.file, null, o.delim, conf); + } + + @VisibleForTesting + TextWriter createWriter(Path file, CompressionCodec codec, String delim, + Configuration cfg) throws IOException { + FileSystem fs = file.getFileSystem(cfg); + if (fs instanceof LocalFileSystem) { + fs = ((LocalFileSystem)fs).getRaw(); + } + OutputStream tmp = fs.create(file); + java.io.Writer out = new BufferedWriter(new OutputStreamWriter( + (null == codec) ? tmp : codec.createOutputStream(tmp), "UTF-8")); + return new TextWriter(out, delim); + } + + /** + * Class specifying reader options for the {@link TextFileRegionFormat}. + */ + public static class ReaderOptions + implements TextReader.Options, Configurable { + + private Configuration conf; + private String delim = + DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT; + private Path file = new Path( + new File(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT) + .toURI().toString()); + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + String tmpfile = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH, + DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT); + file = new Path(tmpfile); + delim = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER, + DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT); + LOG.info("TextFileRegionFormat: read path " + tmpfile.toString()); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public ReaderOptions filename(Path file) { + this.file = file; + return this; + } + + @Override + public ReaderOptions delimiter(String delim) { + this.delim = delim; + return 
this; + } + } + + /** + * Class specifying writer options for the {@link TextFileRegionFormat}. + */ + public static class WriterOptions + implements TextWriter.Options, Configurable { + + private Configuration conf; + private String codec = null; + private Path file = + new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT); + private String delim = + DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + String tmpfile = conf.get( + DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_WRITE_PATH, file.toString()); + file = new Path(tmpfile); + codec = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_CODEC); + delim = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER, + DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public WriterOptions filename(Path file) { + this.file = file; + return this; + } + + public String getCodec() { + return codec; + } + + public Path getFile() { + return file; + } + + @Override + public WriterOptions codec(String codec) { + this.codec = codec; + return this; + } + + @Override + public WriterOptions delimiter(String delim) { + this.delim = delim; + return this; + } + + } + + /** + * This class is used as a reader for block maps which + * are stored as delimited text files. + */ + public static class TextReader extends Reader<FileRegion> { + + /** + * Options for {@link TextReader}. 
+ */ + public interface Options extends Reader.Options { + Options filename(Path file); + Options delimiter(String delim); + } + + static ReaderOptions defaults() { + return new ReaderOptions(); + } + + private final Path file; + private final String delim; + private final FileSystem fs; + private final CompressionCodec codec; + private final Map<FRIterator, BufferedReader> iterators; + + protected TextReader(FileSystem fs, Path file, CompressionCodec codec, + String delim) { + this(fs, file, codec, delim, + new IdentityHashMap<FRIterator, BufferedReader>()); + } + + TextReader(FileSystem fs, Path file, CompressionCodec codec, String delim, + Map<FRIterator, BufferedReader> iterators) { + this.fs = fs; + this.file = file; + this.codec = codec; + this.delim = delim; + this.iterators = Collections.synchronizedMap(iterators); + } + + @Override + public FileRegion resolve(Block ident) throws IOException { + // consider layering index w/ composable format + Iterator<FileRegion> i = iterator(); + try { + while (i.hasNext()) { + FileRegion f = i.next(); + if (f.getBlock().equals(ident)) { + return f; + } + } + } finally { + BufferedReader r = iterators.remove(i); + if (r != null) { + // null on last element + r.close(); + } + } + return null; + } + + class FRIterator implements Iterator<FileRegion> { + + private FileRegion pending; + + @Override + public boolean hasNext() { + return pending != null; + } + + @Override + public FileRegion next() { + if (null == pending) { + throw new NoSuchElementException(); + } + FileRegion ret = pending; + try { + pending = nextInternal(this); + } catch (IOException e) { + throw new RuntimeException(e); + } + return ret; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + private FileRegion nextInternal(Iterator<FileRegion> i) throws IOException { + BufferedReader r = iterators.get(i); + if (null == r) { + throw new IllegalStateException(); + } + String line = r.readLine(); + if (null == 
line) { + iterators.remove(i); + return null; + } + String[] f = line.split(delim); + if (f.length != 6) { + throw new IOException("Invalid line: " + line); + } + return new FileRegion(Long.parseLong(f[0]), new Path(f[1]), + Long.parseLong(f[2]), Long.parseLong(f[3]), f[5], + Long.parseLong(f[4])); + } + + public InputStream createStream() throws IOException { + InputStream i = fs.open(file); + if (codec != null) { + i = codec.createInputStream(i); + } + return i; + } + + @Override + public Iterator<FileRegion> iterator() { + FRIterator i = new FRIterator(); + try { + BufferedReader r = + new BufferedReader(new InputStreamReader(createStream(), "UTF-8")); + iterators.put(i, r); + i.pending = nextInternal(i); + } catch (IOException e) { + iterators.remove(i); + throw new RuntimeException(e); + } + return i; + } + + @Override + public void close() throws IOException { + ArrayList<IOException> ex = new ArrayList<>(); + synchronized (iterators) { + for (Iterator<BufferedReader> i = iterators.values().iterator(); + i.hasNext();) { + try { + BufferedReader r = i.next(); + r.close(); + } catch (IOException e) { + ex.add(e); + } finally { + i.remove(); + } + } + iterators.clear(); + } + if (!ex.isEmpty()) { + throw MultipleIOException.createIOException(ex); + } + } + + } + + /** + * This class is used as a writer for block maps which + * are stored as delimited text files. + */ + public static class TextWriter extends Writer<FileRegion> { + + /** + * Interface for Writer options. 
+ */ + public interface Options extends Writer.Options { + Options codec(String codec); + Options filename(Path file); + Options delimiter(String delim); + } + + public static WriterOptions defaults() { + return new WriterOptions(); + } + + private final String delim; + private final java.io.Writer out; + + public TextWriter(java.io.Writer out, String delim) { + this.out = out; + this.delim = delim; + } + + @Override + public void store(FileRegion token) throws IOException { + out.append(String.valueOf(token.getBlock().getBlockId())).append(delim); + out.append(token.getPath().toString()).append(delim); + out.append(Long.toString(token.getOffset())).append(delim); + out.append(Long.toString(token.getLength())).append(delim); + out.append(Long.toString(token.getGenerationStamp())).append(delim); + out.append(token.getBlockPoolId()).append("\n"); + } + + @Override + public void close() throws IOException { + out.close(); + } + + } + + @Override + public void refresh() throws IOException { + //nothing to do; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java new file mode 100644 index 0000000..0fa667e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.hadoop.hdfs.server.common;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * This class is used to read file regions from block maps
 * specified using delimited text.
 */
public class TextFileRegionProvider
    extends FileRegionProvider implements Configurable {

  private Configuration conf;
  private BlockFormat<FileRegion> fmt;

  /**
   * Instantiates the block format class named by
   * {@link DFSConfigKeys#DFS_PROVIDER_BLK_FORMAT_CLASS} (defaulting to
   * {@link TextFileRegionFormat}) and stores the configuration.
   */
  @SuppressWarnings("unchecked")
  @Override
  public void setConf(Configuration conf) {
    fmt = ReflectionUtils.newInstance(
        conf.getClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
            TextFileRegionFormat.class,
            BlockFormat.class),
        conf);
    // The configured class only has to be a BlockFormat, so it may not be
    // Configurable; guard the cast instead of failing with a
    // ClassCastException. NOTE(review): ReflectionUtils.newInstance appears
    // to configure Configurable instances already, which would make this
    // call redundant -- confirm before removing it.
    if (fmt instanceof Configurable) {
      ((Configurable) fmt).setConf(conf);
    }
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Opens a reader with the format's default options and returns a
   * read-only view of its iterator.
   *
   * <p>NOTE(review): the reader obtained here is not closed by this method;
   * confirm who is expected to release it.
   *
   * @throws RuntimeException if the reader cannot be created.
   */
  @Override
  public Iterator<FileRegion> iterator() {
    try {
      final BlockFormat.Reader<FileRegion> r = fmt.getReader(null);
      return new Iterator<FileRegion>() {

        private final Iterator<FileRegion> inner = r.iterator();

        @Override
        public boolean hasNext() {
          return inner.hasNext();
        }

        @Override
        public FileRegion next() {
          return inner.next();
        }

        @Override
        public void remove() {
          throw new UnsupportedOperationException();
        }
      };
    } catch (IOException e) {
      throw new RuntimeException("Failed to read provided blocks", e);
    }
  }

  /** Delegates to the underlying block format. */
  @Override
  public void refresh() throws IOException {
    fmt.refresh();
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java index bc41715..012d1f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java @@ -36,6 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.HardLink; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; @@ -360,6 +361,9 @@ public class BlockPoolSliceStorage extends Storage {
private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo, StartupOption startOpt, List<Callable<StorageDirectory>> callables, Configuration conf) throws IOException { + if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) { + return false; // regular startup for PROVIDED storage directories + } if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) { Preconditions.checkState(!getTrashRootDir(sd).exists(), sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " + @@ -439,6 +443,10 @@ public class BlockPoolSliceStorage extends Storage { LayoutVersion.Feature.FEDERATION, layoutVersion)) { return; } + //no upgrades for storage directories that are PROVIDED + if (bpSd.getRoot() == null) { + return; + } final int oldLV = getLayoutVersion(); LOG.info("Upgrading block pool storage directory " + bpSd.getRoot() + ".\n old LV = " + oldLV @@ -589,8 +597,9 @@ public class BlockPoolSliceStorage extends Storage { throws IOException { File prevDir = bpSd.getPreviousDir(); // regular startup if previous dir does not exist - if (!prevDir.exists()) + if (prevDir == null || !prevDir.exists()) { return; + } // read attributes out of the VERSION file of previous directory BlockPoolSliceStorage prevInfo = new BlockPoolSliceStorage(); prevInfo.readPreviousVersionProperties(bpSd); @@ -631,6 +640,10 @@ public class BlockPoolSliceStorage extends Storage { * that holds the snapshot. 
*/ void doFinalize(File dnCurDir) throws IOException { + LOG.info("doFinalize: " + dnCurDir); + if (dnCurDir == null) { + return; //we do nothing if the directory is null + } File bpRoot = getBpRoot(blockpoolID, dnCurDir); StorageDirectory bpSd = new StorageDirectory(bpRoot); // block pool level previous directory @@ -841,6 +854,9 @@ public class BlockPoolSliceStorage extends Storage { public void setRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs) throws IOException { for (StorageDirectory sd : dnStorageDirs) { + if (sd.getCurrentDir() == null) { + return; + } File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir()); File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE); if (!storagesWithRollingUpgradeMarker.contains(bpRoot.toString())) { @@ -863,6 +879,9 @@ public class BlockPoolSliceStorage extends Storage { public void clearRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs) throws IOException { for (StorageDirectory sd : dnStorageDirs) { + if (sd.getCurrentDir() == null) { + continue; + } File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir()); File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE); if (!storagesWithoutRollingUpgradeMarker.contains(bpRoot.toString())) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index 6d6e96a..a1bde31 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -48,6 +48,7 @@ import 
org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.HardLink; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.protocol.Block; @@ -129,22 +130,31 @@ public class DataStorage extends Storage { this.datanodeUuid = newDatanodeUuid; } - private static boolean createStorageID(StorageDirectory sd, int lv) { + private static boolean createStorageID(StorageDirectory sd, int lv, + Configuration conf) { // Clusters previously upgraded from layout versions earlier than // ADD_DATANODE_AND_STORAGE_UUIDS failed to correctly generate a // new storage ID. We check for that and fix it now. final boolean haveValidStorageId = DataNodeLayoutVersion.supports( LayoutVersion.Feature.ADD_DATANODE_AND_STORAGE_UUIDS, lv) && DatanodeStorage.isValidStorageId(sd.getStorageUuid()); - return createStorageID(sd, !haveValidStorageId); + return createStorageID(sd, !haveValidStorageId, conf); } /** Create an ID for this storage. * @return true if a new storage ID was generated. * */ public static boolean createStorageID( - StorageDirectory sd, boolean regenerateStorageIds) { + StorageDirectory sd, boolean regenerateStorageIds, Configuration conf) { final String oldStorageID = sd.getStorageUuid(); + if (sd.getStorageLocation() != null && + sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) { + // We only support one provided storage per datanode for now. + // TODO support multiple provided storage ids per datanode. 
+ sd.setStorageUuid(conf.get(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID, + DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT)); + return false; + } if (oldStorageID == null || regenerateStorageIds) { sd.setStorageUuid(DatanodeStorage.generateUuid()); LOG.info("Generated new storageID " + sd.getStorageUuid() + @@ -273,7 +283,7 @@ public class DataStorage extends Storage { LOG.info("Storage directory with location " + location + " is not formatted for namespace " + nsInfo.getNamespaceID() + ". Formatting..."); - format(sd, nsInfo, datanode.getDatanodeUuid()); + format(sd, nsInfo, datanode.getDatanodeUuid(), datanode.getConf()); break; default: // recovery part is common sd.doRecover(curState); @@ -547,15 +557,15 @@ public class DataStorage extends Storage { } void format(StorageDirectory sd, NamespaceInfo nsInfo, - String datanodeUuid) throws IOException { + String newDatanodeUuid, Configuration conf) throws IOException { sd.clearDirectory(); // create directory this.layoutVersion = HdfsServerConstants.DATANODE_LAYOUT_VERSION; this.clusterID = nsInfo.getClusterID(); this.namespaceID = nsInfo.getNamespaceID(); this.cTime = 0; - setDatanodeUuid(datanodeUuid); + setDatanodeUuid(newDatanodeUuid); - createStorageID(sd, false); + createStorageID(sd, false, conf); writeProperties(sd); } @@ -600,6 +610,9 @@ public class DataStorage extends Storage { private void setFieldsFromProperties(Properties props, StorageDirectory sd, boolean overrideLayoutVersion, int toLayoutVersion) throws IOException { + if (props == null) { + return; + } if (overrideLayoutVersion) { this.layoutVersion = toLayoutVersion; } else { @@ -694,6 +707,10 @@ public class DataStorage extends Storage { private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo, StartupOption startOpt, List<Callable<StorageDirectory>> callables, Configuration conf) throws IOException { + if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) { + createStorageID(sd, layoutVersion, conf); + return false; // 
regular start up for PROVIDED storage directories + } if (startOpt == StartupOption.ROLLBACK) { doRollback(sd, nsInfo); // rollback if applicable } @@ -724,7 +741,7 @@ public class DataStorage extends Storage { // regular start up. if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION) { - createStorageID(sd, layoutVersion); + createStorageID(sd, layoutVersion, conf); return false; // need to write properties } @@ -733,7 +750,7 @@ public class DataStorage extends Storage { if (federationSupported) { // If the existing on-disk layout version supports federation, // simply update the properties. - upgradeProperties(sd); + upgradeProperties(sd, conf); } else { doUpgradePreFederation(sd, nsInfo, callables, conf); } @@ -829,15 +846,16 @@ public class DataStorage extends Storage { // 4. Write version file under <SD>/current clusterID = nsInfo.getClusterID(); - upgradeProperties(sd); + upgradeProperties(sd, conf); // 5. Rename <SD>/previous.tmp to <SD>/previous rename(tmpDir, prevDir); LOG.info("Upgrade of " + sd.getRoot()+ " is complete"); } - void upgradeProperties(StorageDirectory sd) throws IOException { - createStorageID(sd, layoutVersion); + void upgradeProperties(StorageDirectory sd, Configuration conf) + throws IOException { + createStorageID(sd, layoutVersion, conf); LOG.info("Updating layout version from " + layoutVersion + " to " + HdfsServerConstants.DATANODE_LAYOUT_VERSION + " for storage " + sd.getRoot()); @@ -989,7 +1007,7 @@ public class DataStorage extends Storage { // then finalize it. Else finalize the corresponding BP. 
for (StorageDirectory sd : getStorageDirs()) { File prevDir = sd.getPreviousDir(); - if (prevDir.exists()) { + if (prevDir != null && prevDir.exists()) { // data node level storage finalize doFinalize(sd); } else { http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index 966bcb0..3b6d06c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.AutoCloseableLock; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; @@ -105,7 +106,7 @@ public class DirectoryScanner implements Runnable { * @param b whether to retain diffs */ @VisibleForTesting - void setRetainDiffs(boolean b) { + public void setRetainDiffs(boolean b) { retainDiffs = b; } @@ -215,7 +216,8 @@ public class DirectoryScanner implements Runnable { * @param dataset the dataset to scan * @param conf the Configuration object */ - DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) { + public DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, + Configuration conf) { this.datanode = datanode; this.dataset = dataset; int interval = (int) 
conf.getTimeDuration( @@ -369,15 +371,14 @@ public class DirectoryScanner implements Runnable { * Reconcile differences between disk and in-memory blocks */ @VisibleForTesting - void reconcile() throws IOException { + public void reconcile() throws IOException { scan(); for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) { String bpid = entry.getKey(); LinkedList<ScanInfo> diff = entry.getValue(); for (ScanInfo info : diff) { - dataset.checkAndUpdate(bpid, info.getBlockId(), info.getBlockFile(), - info.getMetaFile(), info.getVolume()); + dataset.checkAndUpdate(bpid, info); } } if (!retainDiffs) clear(); @@ -429,11 +430,12 @@ public class DirectoryScanner implements Runnable { } // Block file and/or metadata file exists on the disk // Block exists in memory - if (info.getBlockFile() == null) { + if (info.getVolume().getStorageType() != StorageType.PROVIDED && + info.getBlockFile() == null) { // Block metadata file exits and block file is missing addDifference(diffRecord, statsRecord, info); } else if (info.getGenStamp() != memBlock.getGenerationStamp() - || info.getBlockFileLength() != memBlock.getNumBytes()) { + || info.getBlockLength() != memBlock.getNumBytes()) { // Block metadata file is missing or has wrong generation stamp, // or block file length is different than expected statsRecord.mismatchBlocks++; @@ -611,6 +613,9 @@ public class DirectoryScanner implements Runnable { for (String bpid : bpList) { LinkedList<ScanInfo> report = new LinkedList<>(); + perfTimer.reset().start(); + throttleTimer.reset().start(); + try { result.put(bpid, volume.compileReport(bpid, report, this)); } catch (InterruptedException ex) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java new file mode 100644 index 0000000..722d573 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; + +/** + * This class is used for provided replicas that are finalized. 
+ */ +public class FinalizedProvidedReplica extends ProvidedReplica { + + public FinalizedProvidedReplica(long blockId, URI fileURI, + long fileOffset, long blockLen, long genStamp, + FsVolumeSpi volume, Configuration conf) { + super(blockId, fileURI, fileOffset, blockLen, genStamp, volume, conf); + } + + @Override + public ReplicaState getState() { + return ReplicaState.FINALIZED; + } + + @Override + public long getBytesOnDisk() { + return getNumBytes(); + } + + @Override + public long getVisibleLength() { + return getNumBytes(); //all bytes are visible + } + + @Override // Object + public boolean equals(Object o) { + return super.equals(o); + } + + @Override // Object + public int hashCode() { + return super.hashCode(); + } + + @Override + public String toString() { + return super.toString(); + } + + @Override + public ReplicaInfo getOriginalReplica() { + throw new UnsupportedOperationException("Replica of type " + getState() + + " does not support getOriginalReplica"); + } + + @Override + public long getRecoveryID() { + throw new UnsupportedOperationException("Replica of type " + getState() + + " does not support getRecoveryID"); + } + + @Override + public void setRecoveryID(long recoveryId) { + throw new UnsupportedOperationException("Replica of type " + getState() + + " does not support setRecoveryID"); + } + + @Override + public ReplicaRecoveryInfo createInfo() { + throw new UnsupportedOperationException("Replica of type " + getState() + + " does not support createInfo"); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java new file mode 100644 
index 0000000..b021ea2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.common.FileRegion; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This abstract class is used as a base class for provided replicas. 
+ */ +public abstract class ProvidedReplica extends ReplicaInfo { + + public static final Logger LOG = + LoggerFactory.getLogger(ProvidedReplica.class); + + // Null checksum information for provided replicas. + // Shared across all replicas. + static final byte[] NULL_CHECKSUM_ARRAY = + FsDatasetUtil.createNullChecksumByteArray(); + private URI fileURI; + private long fileOffset; + private Configuration conf; + private FileSystem remoteFS; + + /** + * Constructor. + * @param blockId block id + * @param fileURI remote URI this block is to be read from + * @param fileOffset the offset in the remote URI + * @param blockLen the length of the block + * @param genStamp the generation stamp of the block + * @param volume the volume this block belongs to + */ + public ProvidedReplica(long blockId, URI fileURI, long fileOffset, + long blockLen, long genStamp, FsVolumeSpi volume, Configuration conf) { + super(volume, blockId, blockLen, genStamp); + this.fileURI = fileURI; + this.fileOffset = fileOffset; + this.conf = conf; + try { + this.remoteFS = FileSystem.get(fileURI, this.conf); + } catch (IOException e) { + LOG.warn("Failed to obtain filesystem for " + fileURI); + this.remoteFS = null; + } + } + + public ProvidedReplica(ProvidedReplica r) { + super(r); + this.fileURI = r.fileURI; + this.fileOffset = r.fileOffset; + this.conf = r.conf; + try { + this.remoteFS = FileSystem.newInstance(fileURI, this.conf); + } catch (IOException e) { + this.remoteFS = null; + } + } + + @Override + public URI getBlockURI() { + return this.fileURI; + } + + @Override + public InputStream getDataInputStream(long seekOffset) throws IOException { + if (remoteFS != null) { + FSDataInputStream ins = remoteFS.open(new Path(fileURI)); + ins.seek(fileOffset + seekOffset); + return new FSDataInputStream(ins); + } else { + throw new IOException("Remote filesystem for provided replica " + this + + " does not exist"); + } + } + + @Override + public OutputStream getDataOutputStream(boolean append) throws 
IOException { + throw new UnsupportedOperationException( + "OutputDataStream is not implemented for ProvidedReplica"); + } + + @Override + public URI getMetadataURI() { + return null; + } + + @Override + public OutputStream getMetadataOutputStream(boolean append) + throws IOException { + return null; + } + + @Override + public boolean blockDataExists() { + if(remoteFS != null) { + try { + return remoteFS.exists(new Path(fileURI)); + } catch (IOException e) { + return false; + } + } else { + return false; + } + } + + @Override + public boolean deleteBlockData() { + throw new UnsupportedOperationException( + "ProvidedReplica does not support deleting block data"); + } + + @Override + public long getBlockDataLength() { + return this.getNumBytes(); + } + + @Override + public LengthInputStream getMetadataInputStream(long offset) + throws IOException { + return new LengthInputStream(new ByteArrayInputStream(NULL_CHECKSUM_ARRAY), + NULL_CHECKSUM_ARRAY.length); + } + + @Override + public boolean metadataExists() { + return NULL_CHECKSUM_ARRAY == null ? false : true; + } + + @Override + public boolean deleteMetadata() { + throw new UnsupportedOperationException( + "ProvidedReplica does not support deleting metadata"); + } + + @Override + public long getMetadataLength() { + return NULL_CHECKSUM_ARRAY == null ? 
0 : NULL_CHECKSUM_ARRAY.length; + } + + @Override + public boolean renameMeta(URI destURI) throws IOException { + throw new UnsupportedOperationException( + "ProvidedReplica does not support renaming metadata"); + } + + @Override + public boolean renameData(URI destURI) throws IOException { + throw new UnsupportedOperationException( + "ProvidedReplica does not support renaming data"); + } + + @Override + public boolean getPinning(LocalFileSystem localFS) throws IOException { + return false; + } + + @Override + public void setPinning(LocalFileSystem localFS) throws IOException { + throw new UnsupportedOperationException( + "ProvidedReplica does not support pinning"); + } + + @Override + public void bumpReplicaGS(long newGS) throws IOException { + throw new UnsupportedOperationException( + "ProvidedReplica does not yet support writes"); + } + + @Override + public boolean breakHardLinksIfNeeded() throws IOException { + return false; + } + + @Override + public ReplicaRecoveryInfo createInfo() + throws UnsupportedOperationException { + throw new UnsupportedOperationException( + "ProvidedReplica does not yet support writes"); + } + + @Override + public int compareWith(ScanInfo info) { + //local scanning cannot find any provided blocks. 
+ if (info.getFileRegion().equals( + new FileRegion(this.getBlockId(), new Path(fileURI), + fileOffset, this.getNumBytes(), this.getGenerationStamp()))) { + return 0; + } else { + return (int) (info.getBlockLength() - getNumBytes()); + } + } + + @Override + public void truncateBlock(long newLength) throws IOException { + throw new UnsupportedOperationException( + "ProvidedReplica does not yet support truncate"); + } + + @Override + public void updateWithReplica(StorageLocation replicaLocation) { + throw new UnsupportedOperationException( + "ProvidedReplica does not yet support update"); + } + + @Override + public void copyMetadata(URI destination) throws IOException { + throw new UnsupportedOperationException( + "ProvidedReplica does not yet support copy metadata"); + } + + @Override + public void copyBlockdata(URI destination) throws IOException { + throw new UnsupportedOperationException( + "ProvidedReplica does not yet support copy data"); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java index 280aaa0..639467f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java @@ -18,9 +18,13 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.File; +import java.net.URI; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; +import 
org.apache.hadoop.hdfs.server.common.FileRegion; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; /** @@ -42,11 +46,20 @@ public class ReplicaBuilder { private ReplicaInfo fromReplica; + private URI uri; + private long offset; + private Configuration conf; + private FileRegion fileRegion; + public ReplicaBuilder(ReplicaState state) { volume = null; writer = null; block = null; length = -1; + fileRegion = null; + conf = null; + fromReplica = null; + uri = null; this.state = state; } @@ -105,6 +118,26 @@ public class ReplicaBuilder { return this; } + public ReplicaBuilder setURI(URI uri) { + this.uri = uri; + return this; + } + + public ReplicaBuilder setConf(Configuration conf) { + this.conf = conf; + return this; + } + + public ReplicaBuilder setOffset(long offset) { + this.offset = offset; + return this; + } + + public ReplicaBuilder setFileRegion(FileRegion fileRegion) { + this.fileRegion = fileRegion; + return this; + } + public LocalReplicaInPipeline buildLocalReplicaInPipeline() throws IllegalArgumentException { LocalReplicaInPipeline info = null; @@ -176,7 +209,7 @@ public class ReplicaBuilder { } } - private ReplicaInfo buildFinalizedReplica() throws IllegalArgumentException { + private LocalReplica buildFinalizedReplica() throws IllegalArgumentException { if (null != fromReplica && fromReplica.getState() == ReplicaState.FINALIZED) { return new FinalizedReplica((FinalizedReplica)fromReplica); @@ -193,7 +226,7 @@ public class ReplicaBuilder { } } - private ReplicaInfo buildRWR() throws IllegalArgumentException { + private LocalReplica buildRWR() throws IllegalArgumentException { if (null != fromReplica && fromReplica.getState() == ReplicaState.RWR) { return new ReplicaWaitingToBeRecovered( @@ -211,7 +244,7 @@ public class ReplicaBuilder { } } - private ReplicaInfo buildRUR() throws IllegalArgumentException { + private LocalReplica buildRUR() throws IllegalArgumentException { if (null == fromReplica) { throw new 
IllegalArgumentException( "Missing a valid replica to recover from"); @@ -228,8 +261,53 @@ public class ReplicaBuilder { } } - public ReplicaInfo build() throws IllegalArgumentException { - ReplicaInfo info = null; + private ProvidedReplica buildProvidedFinalizedReplica() + throws IllegalArgumentException { + ProvidedReplica info = null; + if (fromReplica != null) { + throw new IllegalArgumentException("Finalized PROVIDED replica " + + "cannot be constructed from another replica"); + } + if (fileRegion == null && uri == null) { + throw new IllegalArgumentException( + "Trying to construct a provided replica on " + volume + + " without enough information"); + } + if (fileRegion == null) { + info = new FinalizedProvidedReplica(blockId, uri, offset, + length, genStamp, volume, conf); + } else { + info = new FinalizedProvidedReplica(fileRegion.getBlock().getBlockId(), + fileRegion.getPath().toUri(), + fileRegion.getOffset(), + fileRegion.getBlock().getNumBytes(), + fileRegion.getBlock().getGenerationStamp(), + volume, conf); + } + return info; + } + + private ProvidedReplica buildProvidedReplica() + throws IllegalArgumentException { + ProvidedReplica info = null; + switch(this.state) { + case FINALIZED: + info = buildProvidedFinalizedReplica(); + break; + case RWR: + case RUR: + case RBW: + case TEMPORARY: + default: + throw new IllegalArgumentException("Unknown replica state " + + state + " for PROVIDED replica"); + } + return info; + } + + private LocalReplica buildLocalReplica() + throws IllegalArgumentException { + LocalReplica info = null; switch(this.state) { case FINALIZED: info = buildFinalizedReplica(); @@ -249,4 +327,16 @@ public class ReplicaBuilder { } return info; } + + public ReplicaInfo build() throws IllegalArgumentException { + + ReplicaInfo info = null; + if(volume != null && volume.getStorageType() == StorageType.PROVIDED) { + info = buildProvidedReplica(); + } else { + info = buildLocalReplica(); + } + + return info; + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java index 65e9ba7..3718799 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java @@ -50,6 +50,17 @@ abstract public class ReplicaInfo extends Block new FileIoProvider(null, null); /** + * Constructor. + * @param block a block + * @param vol volume where replica is located + */ + ReplicaInfo(Block block, FsVolumeSpi vol) { + this(vol, block.getBlockId(), block.getNumBytes(), + block.getGenerationStamp()); + } + + /** * Constructor * @param vol volume where replica is located * @param blockId block id @@ -62,7 +73,14 @@ abstract public class ReplicaInfo extends Block } /** - * Get the volume where this replica is located on disk. + * Copy constructor. 
+ * @param from where to copy from + */ + ReplicaInfo(ReplicaInfo from) { + this(from, from.getVolume()); + } + + /** * @return the volume where this replica is located on disk */ public FsVolumeSpi getVolume() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java index b4d5794..fb7acfd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java @@ -98,6 +98,16 @@ public class StorageLocation public boolean matchesStorageDirectory(StorageDirectory sd, String bpid) throws IOException { + if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED && + storageType == StorageType.PROVIDED) { + return matchesStorageDirectory(sd); + } + if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED || + storageType == StorageType.PROVIDED) { + //only one of these is PROVIDED; so it cannot be a match! 
+ return false; + } + //both storage directories are local return this.getBpURI(bpid, Storage.STORAGE_DIR_CURRENT).normalize() .equals(sd.getRoot().toURI().normalize()); } @@ -197,6 +207,10 @@ public class StorageLocation if (conf == null) { conf = new HdfsConfiguration(); } + if (storageType == StorageType.PROVIDED) { + //skip creation if the storage type is PROVIDED + return; + } LocalFileSystem localFS = FileSystem.getLocal(conf); FsPermission permission = new FsPermission(conf.get( @@ -213,10 +227,14 @@ public class StorageLocation @Override // Checkable public VolumeCheckResult check(CheckContext context) throws IOException { - DiskChecker.checkDir( - context.localFileSystem, - new Path(baseURI), - context.expectedPermission); + //we assume provided storage locations are always healthy, + //and check only for local storages. + if (storageType != StorageType.PROVIDED) { + DiskChecker.checkDir( + context.localFileSystem, + new Path(baseURI), + context.expectedPermission); + } return VolumeCheckResult.HEALTHY; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 7be42e8..f4bf839 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; 
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; @@ -252,8 +253,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean { * and, in case that they are not matched, update the record or mark it * as corrupted. */ - void checkAndUpdate(String bpid, long blockId, File diskFile, - File diskMetaFile, FsVolumeSpi vol) throws IOException; + void checkAndUpdate(String bpid, ScanInfo info) throws IOException; /** * @param b - the block --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org