HDFS-8137. Send the EC schema to DataNode via EC encoding/recovering command. Contributed by Uma Maheswara Rao G

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0c8de162
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0c8de162
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0c8de162

Branch: refs/heads/HDFS-7285
Commit: 0c8de162622c14982722c431cec4fd8e62f34d09
Parents: 67a22d2
Author: Uma Maheswara Rao G <umamah...@apache.org>
Authored: Tue May 5 11:22:52 2015 +0530
Committer: Jing Zhao <ji...@apache.org>
Committed: Sat May 16 15:16:05 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |  2 +
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  6 ++-
 .../server/blockmanagement/BlockManager.java    | 22 +++++++++-
 .../blockmanagement/DatanodeDescriptor.java     | 16 ++++----
 .../hdfs/server/namenode/FSNamesystem.java      | 43 +++++++++++---------
 .../hadoop/hdfs/server/namenode/Namesystem.java | 14 ++++++-
 .../server/protocol/BlockECRecoveryCommand.java | 14 ++++++-
 .../src/main/proto/erasurecoding.proto          |  1 +
 .../hadoop/hdfs/protocolPB/TestPBHelper.java    | 21 ++++++++--
 9 files changed, 102 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c8de162/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 77272e7..faec023 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -164,3 +164,5 @@
 
     HDFS-8281. Erasure Coding: implement parallel stateful reading for striped
     layout. (jing9)
+
+    HDFS-8137. Send the EC schema to DataNode via EC encoding/recovering command(umamahesh)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c8de162/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 3cd3e03..e230232 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -3191,8 +3191,10 @@ public class PBHelper {
       liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue();
     }
+    ECSchema ecSchema = convertECSchema(blockEcRecoveryInfoProto.getEcSchema());
+
     return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
-        targetStorageUuids, convertStorageTypes, liveBlkIndices);
+        targetStorageUuids, convertStorageTypes, liveBlkIndices, ecSchema);
   }
 
   public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
@@ -3217,6 +3219,8 @@ public class PBHelper {
     short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
     builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
+    builder.setEcSchema(convertECSchema(blockEcRecoveryInfo.getECSchema()));
+
     return builder.build();
   }
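
For orientation, the round-trip these two converter changes enable looks roughly as follows. This is a sketch, not code from the commit: the proto-to-info converter is visible above only through its parameter name, so its exact signature is an assumption, and the setup objects (block, sources, targetStorageInfos, liveBlockIndices) are placeholders.

    // Sketch: the EC schema now survives NameNode -> DataNode serialization.
    ECSchema schema = ...;                          // schema of the file's EC zone
    BlockECRecoveryInfo info = new BlockECRecoveryInfo(
        block, sources, targetStorageInfos, liveBlockIndices, schema);
    BlockECRecoveryInfoProto proto =
        PBHelper.convertBlockECRecoveryInfo(info);  // now calls setEcSchema(...)
    BlockECRecoveryInfo received =
        PBHelper.convertBlockECRecoveryInfo(proto); // reverse converter; name assumed
    // received.getECSchema() describes the same schema the NameNode attached.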

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c8de162/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1e50348..b55c654 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -83,7 +82,10 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
+
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
@@ -93,6 +95,7 @@ import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1552,10 +1555,25 @@ public class BlockManager {
       if (block.isStriped()) {
         assert rw instanceof ErasureCodingWork;
         assert rw.targets.length > 0;
+        String src = block.getBlockCollection().getName();
+        ECSchema ecSchema = null;
+        try {
+          ecSchema = namesystem.getECSchemaForPath(src);
+        } catch (IOException e) {
+          blockLog
+              .warn("Failed to get the EC schema for the file {} ", src);
+        }
+        if (ecSchema == null) {
+          blockLog.warn("No EC schema found for the file {}. "
+              + "So cannot proceed for recovery", src);
+          // TODO: we may have to revisit later for what we can do better to
+          // handle this case.
+          continue;
+        }
         rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
             new ExtendedBlock(namesystem.getBlockPoolId(), block),
             rw.srcNodes, rw.targets,
-            ((ErasureCodingWork) rw).liveBlockIndicies);
+            ((ErasureCodingWork) rw).liveBlockIndicies, ecSchema);
       } else {
         rw.srcNodes[0].addBlockToBeReplicated(block, targets);
       }
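
This BlockManager hunk is where the schema actually gets attached to each scheduled task: before queueing erasure-coding work, the NameNode resolves the file that owns the striped block, asks the namesystem for that path's schema, and skips the block with a warning when no schema is found rather than dispatching an incomplete command. On the receiving side, a DataNode can then size its decoder from the schema carried in the command. The handler below is a hypothetical sketch (this commit only transports the schema); only getECSchema(), getLiveBlockIndices(), getNumDataUnits(), and getNumParityUnits() are taken from the diffs in this commit.

    // Hypothetical DataNode-side consumer of the schema carried in the command.
    void processRecoveryTask(BlockECRecoveryInfo task) {
      ECSchema schema = task.getECSchema();
      int numData = schema.getNumDataUnits();      // e.g. 6 in an RS-6-3 schema
      int numParity = schema.getNumParityUnits();  // e.g. 3 in an RS-6-3 schema
      short[] liveIndices = task.getLiveBlockIndices();
      // Choose a decoder for (numData, numParity) and reconstruct the
      // internal blocks whose indices are missing from liveIndices.
    }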

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c8de162/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 35cc31b..83d3303 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.Arrays;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
@@ -51,6 +50,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
@@ -608,15 +608,15 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /**
    * Store block erasure coding work.
    */
-  void addBlockToBeErasureCoded(ExtendedBlock block, DatanodeDescriptor[] sources,
-      DatanodeStorageInfo[] targets, short[] liveBlockIndices) {
-    assert(block != null && sources != null && sources.length > 0);
+  void addBlockToBeErasureCoded(ExtendedBlock block,
+      DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,
+      short[] liveBlockIndices, ECSchema ecSchema) {
+    assert (block != null && sources != null && sources.length > 0);
     BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
-        liveBlockIndices);
+        liveBlockIndices, ecSchema);
     erasurecodeBlocks.offer(task);
-    BlockManager.LOG.debug("Adding block recovery task " + task +
-        "to " + getName() + ", current queue size is " +
-        erasurecodeBlocks.size());
+    BlockManager.LOG.debug("Adding block recovery task " + task + "to "
+        + getName() + ", current queue size is " + erasurecodeBlocks.size());
   }
 
   /**


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c8de162/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 2bf89db..c955f33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -7764,25 +7764,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   ECInfo getErasureCodingInfo(String src) throws AccessControlException,
       UnresolvedLinkException, IOException {
-    checkOperation(OperationCategory.READ);
-    final byte[][] pathComponents = FSDirectory
-        .getPathComponentsForReservedPath(src);
-    final FSPermissionChecker pc = getPermissionChecker();
-    readLock();
-    try {
-      checkOperation(OperationCategory.READ);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath(src, true);
-      if (isPermissionEnabled) {
-        dir.checkPathAccess(pc, iip, FsAction.READ);
-      }
-      // Get schema set for the zone
-      ECSchema schema = dir.getECSchema(iip);
-      if (schema != null) {
-        return new ECInfo(src, schema);
-      }
-    } finally {
-      readUnlock();
+    ECSchema schema = getECSchemaForPath(src);
+    if (schema != null) {
+      return new ECInfo(src, schema);
     }
     return null;
   }
@@ -8024,5 +8008,26 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
+  @Override
+  public ECSchema getECSchemaForPath(String src) throws IOException {
+    checkOperation(OperationCategory.READ);
+    final byte[][] pathComponents = FSDirectory
+        .getPathComponentsForReservedPath(src);
+    final FSPermissionChecker pc = getPermissionChecker();
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      src = dir.resolvePath(pc, src, pathComponents);
+      final INodesInPath iip = dir.getINodesInPath(src, true);
+      if (isPermissionEnabled) {
+        dir.checkPathAccess(pc, iip, FsAction.READ);
+      }
+      // Get schema set for the zone
+      return dir.getECSchema(iip);
+    } finally {
+      readUnlock();
+    }
+  }
+
 }
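
The FSNamesystem change is a pure extraction: the operation check, reserved-path resolution, permission check, and read-lock discipline that getErasureCodingInfo() previously performed inline now live in getECSchemaForPath(), so the client RPC path and the BlockManager's recovery scheduling share one locked lookup. Both call sites now reduce to (a sketch; the variable names are placeholders):

    ECSchema schema = namesystem.getECSchemaForPath(src); // null => src is not in an EC zone
    if (schema != null) {
      // erasure-coded path: act on the zone's schema
    }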

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c8de162/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index 4695c3f..e6c7fc0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.util.RwLock;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
 
@@ -47,4 +49,14 @@ public interface Namesystem extends RwLock, SafeMode {
   public void checkOperation(OperationCategory read) throws StandbyException;
 
   public boolean isInSnapshot(BlockCollection bc);
+
+  /**
+   * Gets the ECSchema for the specified path
+   *
+   * @param src
+   *          - path
+   * @return ECSchema
+   * @throws IOException
+   */
+  public ECSchema getECSchemaForPath(String src) throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c8de162/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
index 9a387dd..61e49e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -76,9 +77,11 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
     private String[] targetStorageIDs;
     private StorageType[] targetStorageTypes;
     private final short[] liveBlockIndices;
+    private final ECSchema ecSchema;
 
     public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
-        DatanodeStorageInfo[] targetDnStorageInfo, short[] liveBlockIndices) {
+        DatanodeStorageInfo[] targetDnStorageInfo, short[] liveBlockIndices,
+        ECSchema ecSchema) {
       this.block = block;
       this.sources = sources;
       this.targets = DatanodeStorageInfo.toDatanodeInfos(targetDnStorageInfo);
@@ -87,17 +90,20 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
       this.targetStorageTypes = DatanodeStorageInfo
           .toStorageTypes(targetDnStorageInfo);
       this.liveBlockIndices = liveBlockIndices;
+      this.ecSchema = ecSchema;
     }
 
     public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
         DatanodeInfo[] targets, String[] targetStorageIDs,
-        StorageType[] targetStorageTypes, short[] liveBlockIndices) {
+        StorageType[] targetStorageTypes, short[] liveBlockIndices,
+        ECSchema ecSchema) {
       this.block = block;
       this.sources = sources;
       this.targets = targets;
       this.targetStorageIDs = targetStorageIDs;
       this.targetStorageTypes = targetStorageTypes;
       this.liveBlockIndices = liveBlockIndices;
+      this.ecSchema = ecSchema;
     }
 
     public ExtendedBlock getExtendedBlock() {
@@ -123,6 +129,10 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
     public short[] getLiveBlockIndices() {
       return liveBlockIndices;
     }
+
+    public ECSchema getECSchema() {
+      return ecSchema;
+    }
 
     @Override
     public String toString() {


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c8de162/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto
index 59bd949..702f6fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto
@@ -99,4 +99,5 @@ message BlockECRecoveryInfoProto {
   required StorageUuidsProto targetStorageUuids = 4;
   required StorageTypesProto targetStorageTypes = 5;
   repeated uint32 liveBlockIndices = 6;
+  required ECSchemaProto ecSchema = 7;
 }
\ No newline at end of file
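
Two wire-format details follow from the hunks above: every BlockECRecoveryInfo carries its own ECSchema, so a single heartbeat response can mix recovery tasks for files in different EC zones, and because field 7 is `required` in proto2, a message without a schema cannot even be built. A sketch of the producer side (builder setup assumed; `info` is a placeholder; the other required fields are elided):

    // Sketch: field 7 must be populated before build() will succeed.
    BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto.newBuilder();
    builder.setEcSchema(PBHelper.convertECSchema(info.getECSchema()));
    // ... set the remaining required fields (block, sources, targets, ...) ...
    BlockECRecoveryInfoProto proto = builder.build(); // throws if any required field is unset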

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c8de162/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index 4ec4ea5..f580cbb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -71,8 +70,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.namenode.ECSchemaManager;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
@@ -88,6 +87,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -663,7 +663,7 @@ public class TestPBHelper {
     short[] liveBlkIndices0 = new short[2];
     BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo(
         new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0,
-        liveBlkIndices0);
+        liveBlkIndices0, ECSchemaManager.getSystemDefaultSchema());
     DatanodeInfo[] dnInfos1 = new DatanodeInfo[] {
         DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
     DatanodeStorageInfo targetDnInfos_2 = BlockManagerTestUtil
@@ -677,7 +677,7 @@ public class TestPBHelper {
     short[] liveBlkIndices1 = new short[2];
     BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo(
         new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1,
-        liveBlkIndices1);
+        liveBlkIndices1, ECSchemaManager.getSystemDefaultSchema());
     List<BlockECRecoveryInfo> blkRecoveryInfosList = new ArrayList<BlockECRecoveryInfo>();
     blkRecoveryInfosList.add(blkECRecoveryInfo0);
     blkRecoveryInfosList.add(blkECRecoveryInfo1);
@@ -718,6 +718,19 @@ public class TestPBHelper {
     for (int i = 0; i < liveBlockIndices1.length; i++) {
       assertEquals(liveBlockIndices1[i], liveBlockIndices2[i]);
     }
+
+    ECSchema ecSchema1 = blkECRecoveryInfo1.getECSchema();
+    ECSchema ecSchema2 = blkECRecoveryInfo2.getECSchema();
+    // Compare ECSchemas same as default ECSchema as we used system default
+    // ECSchema used in this test
+    compareECSchemas(ECSchemaManager.getSystemDefaultSchema(), ecSchema1);
+    compareECSchemas(ECSchemaManager.getSystemDefaultSchema(), ecSchema2);
+  }
+
+  private void compareECSchemas(ECSchema ecSchema1, ECSchema ecSchema2) {
+    assertEquals(ecSchema1.getSchemaName(), ecSchema2.getSchemaName());
+    assertEquals(ecSchema1.getNumDataUnits(), ecSchema2.getNumDataUnits());
+    assertEquals(ecSchema1.getNumParityUnits(), ecSchema2.getNumParityUnits());
   }
 
   private void assertDnInfosEqual(DatanodeInfo[] dnInfos1,