HDFS-8188. Erasure coding: refactor client-related code to sync with HDFS-8082 and HDFS-8169. Contributed by Zhe Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/166e565b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/166e565b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/166e565b

Branch: refs/heads/HDFS-7285
Commit: 166e565becc08feeee2806617e69ce100a85b156
Parents: c3a866e
Author: Zhe Zhang <z...@apache.org>
Authored: Mon Apr 20 14:19:12 2015 -0700
Committer: Zhe Zhang <z...@apache.org>
Committed: Mon May 11 11:36:17 2015 -0700

----------------------------------------------------------------------
 .../hdfs/client/HdfsClientConfigKeys.java       | 12 ++++
 .../hdfs/protocol/LocatedStripedBlock.java      | 64 +++++++++++++++++
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 21 ++----
 .../hadoop/hdfs/client/impl/DfsClientConf.java  | 21 +++++-
 .../hdfs/protocol/LocatedStripedBlock.java      | 73 --------------------
 .../server/blockmanagement/BlockManager.java    | 25 ++++---
 .../hdfs/server/namenode/FSNamesystem.java      |  2 +-
 .../server/namenode/TestStripedINodeFile.java   |  3 +-
 8 files changed, 120 insertions(+), 101 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/166e565b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 26283aa..6006d71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -177,6 +177,18 @@ public interface HdfsClientConfigKeys {
 int THREADPOOL_SIZE_DEFAULT =
0; } + /** dfs.client.read.striped configuration properties */ + interface StripedRead { + String PREFIX = Read.PREFIX + "striped."; + + String THREADPOOL_SIZE_KEY = PREFIX + "threadpool.size"; + /** + * With default 6+3 schema, each normal read could span 6 DNs. So this + * default value accommodates 3 read streams + */ + int THREADPOOL_SIZE_DEFAULT = 18; + } + /** dfs.http.client configuration properties */ interface HttpClient { String PREFIX = "dfs.http.client."; http://git-wip-us.apache.org/repos/asf/hadoop/blob/166e565b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java new file mode 100644 index 0000000..93a5948 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StorageType; + +import java.util.Arrays; + +/** + * {@link LocatedBlock} with striped block support. For a striped block, each + * datanode storage is associated with a block in the block group. We need to + * record the index (in the striped block group) for each of them. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class LocatedStripedBlock extends LocatedBlock { + private int[] blockIndices; + + public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs, + String[] storageIDs, StorageType[] storageTypes, int[] indices, + long startOffset, boolean corrupt, DatanodeInfo[] cachedLocs) { + super(b, locs, storageIDs, storageTypes, startOffset, corrupt, cachedLocs); + assert indices != null && indices.length == locs.length; + this.blockIndices = new int[indices.length]; + System.arraycopy(indices, 0, blockIndices, 0, indices.length); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{" + getBlock() + + "; getBlockSize()=" + getBlockSize() + + "; corrupt=" + isCorrupt() + + "; offset=" + getStartOffset() + + "; locs=" + Arrays.asList(getLocations()) + + "; indices=" + Arrays.asList(blockIndices) + + "}"; + } + + public int[] getBlockIndices() { + return this.blockIndices; + } + + @Override + public boolean isStriped() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/166e565b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index b1dab46..111398f 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -377,21 +377,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, dfsClientConf); if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) { - this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize()); - } - numThreads = conf.getInt( - DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE, - DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE); - if (numThreads <= 0) { - LOG.warn("The value of " - + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE - + " must be greater than 0. The current setting is " + numThreads - + ". Reset it to the default value " - + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE); - numThreads = - DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE; - } - this.initThreadsNumForStripedReads(numThreads); + this.initThreadsNumForHedgedReads(dfsClientConf. + getHedgedReadThreadpoolSize()); + } + + this.initThreadsNumForStripedReads(dfsClientConf. 
+ getStripedReadThreadpoolSize()); this.saslClient = new SaslDataTransferClient( conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); http://git-wip-us.apache.org/repos/asf/hadoop/blob/166e565b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index a257e32..32a3da0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -38,6 +38,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIM import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; +import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -101,6 +102,8 @@ public class DfsClientConf { private final long hedgedReadThresholdMillis; private final int hedgedReadThreadpoolSize; + private final int stripedReadThreadpoolSize; + public DfsClientConf(Configuration conf) { // The hdfsTimeout is currently the same as the ipc timeout hdfsTimeout = Client.getTimeout(conf); @@ -191,7 +194,7 @@ public class DfsClientConf { connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); hdfsBlocksMetadataEnabled = conf.getBoolean( - DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, + DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); fileBlockStorageLocationsNumThreads = conf.getInt( DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS, @@ -215,6 +218,13 @@ public class DfsClientConf { hedgedReadThreadpoolSize = conf.getInt( HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT); + + stripedReadThreadpoolSize = conf.getInt( + HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY, + HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_DEFAULT); + Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " + + HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY + + " must be greater than 0."); } private DataChecksum.Type getChecksumType(Configuration conf) { @@ -492,6 +502,13 @@ public class DfsClientConf { } /** + * @return the stripedReadThreadpoolSize + */ + public int getStripedReadThreadpoolSize() { + return stripedReadThreadpoolSize; + } + + /** * @return the shortCircuitConf */ public ShortCircuitConf getShortCircuitConf() { @@ -744,4 +761,4 @@ public class DfsClientConf { return builder.toString(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/166e565b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java deleted file mode 100644 index 98614db..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; - -import java.util.Arrays; - -/** - * {@link LocatedBlock} with striped block support. For a striped block, each - * datanode storage is associated with a block in the block group. We need to - * record the index (in the striped block group) for each of them. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class LocatedStripedBlock extends LocatedBlock { - private int[] blockIndices; - - public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs, - String[] storageIDs, StorageType[] storageTypes, int[] indices, - long startOffset, boolean corrupt, DatanodeInfo[] cachedLocs) { - super(b, locs, storageIDs, storageTypes, startOffset, corrupt, cachedLocs); - assert indices != null && indices.length == locs.length; - this.blockIndices = new int[indices.length]; - System.arraycopy(indices, 0, blockIndices, 0, indices.length); - } - - public LocatedStripedBlock(ExtendedBlock b, DatanodeStorageInfo[] storages, - int[] indices, long startOffset, boolean corrupt) { - this(b, DatanodeStorageInfo.toDatanodeInfos(storages), - DatanodeStorageInfo.toStorageIDs(storages), - DatanodeStorageInfo.toStorageTypes(storages), indices, - startOffset, corrupt, EMPTY_LOCS); - } - - @Override - public String toString() { - return getClass().getSimpleName() + "{" + getBlock() - + "; getBlockSize()=" + getBlockSize() - + "; corrupt=" + isCorrupt() - + "; offset=" + getStartOffset() - + "; locs=" + Arrays.asList(getLocations()) - + "; indices=" + Arrays.asList(blockIndices) - + "}"; - } - - public int[] getBlockIndices() { - return this.blockIndices; - } - - @Override - public boolean isStriped() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/166e565b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 712c665..5d998c8 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -873,7 +873,7 @@ public class BlockManager { final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk); - return new LocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos, + return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos, false); } else { assert blk instanceof BlockInfoContiguousUnderConstruction; @@ -882,13 +882,8 @@ public class BlockManager { final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk); - return new LocatedBlock(eb, storages, pos, false); + return newLocatedBlock(eb, storages, pos, false); } - final BlockInfoContiguousUnderConstruction uc = - (BlockInfoContiguousUnderConstruction) blk; - final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); - final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk); - return newLocatedBlock(eb, storages, pos, false); } // get block locations @@ -931,7 +926,7 @@ public class BlockManager { final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk); return blockIndices == null ? newLocatedBlock(eb, machines, pos, isCorrupt) : - new LocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt); + newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt); } /** Create a LocatedBlocks. */ @@ -3499,7 +3494,7 @@ public class BlockManager { if (pendingReplicationBlocksCount == 0 && underReplicatedBlocksCount == 0) { LOG.info("Node {} is dead and there are no under-replicated" + - " blocks or blocks pending replication. Safe to decommission.", + " blocks or blocks pending replication. 
Safe to decommission.", node); return true; } @@ -3905,6 +3900,18 @@ public class BlockManager { null); } + public static LocatedStripedBlock newLocatedStripedBlock( + ExtendedBlock b, DatanodeStorageInfo[] storages, + int[] indices, long startOffset, boolean corrupt) { + // startOffset is unknown + return new LocatedStripedBlock( + b, DatanodeStorageInfo.toDatanodeInfos(storages), + DatanodeStorageInfo.toStorageIDs(storages), + DatanodeStorageInfo.toStorageTypes(storages), + indices, startOffset, corrupt, + null); + } + /** * This class is used internally by {@link this#computeRecoveryWorkForBlocks} * to represent a task to recover a block through replication or erasure http://git-wip-us.apache.org/repos/asf/hadoop/blob/166e565b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index f176128..c77b193 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3359,7 +3359,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, final LocatedBlock lBlk; if (blk.isStriped()) { assert blk instanceof BlockInfoStripedUnderConstruction; - lBlk = new LocatedStripedBlock(getExtendedBlock(blk), locs, + lBlk = BlockManager.newLocatedStripedBlock(getExtendedBlock(blk), locs, ((BlockInfoStripedUnderConstruction) blk).getBlockIndices(), offset, false); } else { http://git-wip-us.apache.org/repos/asf/hadoop/blob/166e565b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStripedINodeFile.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStripedINodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStripedINodeFile.java index d251c30..4a6d6cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStripedINodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStripedINodeFile.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsConstantsClient; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; @@ -45,7 +46,7 @@ public class TestStripedINodeFile { "userName", null, FsPermission.getDefault()); private static INodeFile createStripedINodeFile() { - return new INodeFile(INodeId.GRANDFATHER_INODE_ID, null, perm, 0L, 0L, + return new INodeFile(HdfsConstantsClient.GRANDFATHER_INODE_ID, null, perm, 0L, 0L, null, (short)0, 1024L, HdfsConstants.COLD_STORAGE_POLICY_ID); }