http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 53c6cdb,28ea866..8874c4d --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@@ -23,8 -23,8 +23,9 @@@ import java.util.concurrent.TimeUnit import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; + import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; import org.apache.hadoop.http.HttpConfig; @@@ -171,8 -171,8 +172,10 @@@ public class DFSConfigKeys extends Comm public static final int DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT = 3; public static final String DFS_NAMENODE_REPLICATION_MIN_KEY = "dfs.namenode.replication.min"; public static final int DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1; + public static final String DFS_NAMENODE_STRIPE_MIN_KEY = "dfs.namenode.stripe.min"; + public static final int DFS_NAMENODE_STRIPE_MIN_DEFAULT = 1; + public static final String DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY = + "dfs.namenode.safemode.replication.min"; public static final String DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY = "dfs.namenode.replication.pending.timeout-sec"; public static final int DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT = -1; public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = "dfs.namenode.replication.max-streams"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index ac927ef,f4cf4c2..5bf52c5 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@@ -84,9 -85,7 +85,8 @@@ import org.apache.hadoop.hdfs.protocol. import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; - import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.io.Text; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Credentials; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 3217484,d93277c..1e4b899 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@@ -429,10 -422,10 +429,11 @@@ public class ClientNamenodeProtocolServ req.getClientName(), flags); AppendResponseProto.Builder builder = AppendResponseProto.newBuilder(); if (result.getLastBlock() != null) { - builder.setBlock(PBHelper.convertLocatedBlock(result.getLastBlock())); - builder.setBlock(PBHelperClient.convert(result.getLastBlock())); ++ builder.setBlock(PBHelperClient.convertLocatedBlock( ++ result.getLastBlock())); } if (result.getFileStatus() != null) { - builder.setStat(PBHelper.convert(result.getFileStatus())); + builder.setStat(PBHelperClient.convert(result.getFileStatus())); } return builder.build(); } catch (IOException e) { @@@ -505,7 -498,7 +506,7 @@@ (favor == null || favor.size() == 0) ? null : favor .toArray(new String[favor.size()])); return AddBlockResponseProto.newBuilder() - .setBlock(PBHelper.convertLocatedBlock(result)).build(); - .setBlock(PBHelperClient.convert(result)).build(); ++ .setBlock(PBHelperClient.convertLocatedBlock(result)).build(); } catch (IOException e) { throw new ServiceException(e); } @@@ -525,11 -518,11 +526,11 @@@ new DatanodeInfoProto[existingList.size()])), existingStorageIDsList.toArray( new String[existingStorageIDsList.size()]), - PBHelper.convert(excludesList.toArray( - new DatanodeInfoProto[excludesList.size()])), + PBHelperClient.convert(excludesList.toArray( + new DatanodeInfoProto[excludesList.size()])), req.getNumAdditionalNodes(), req.getClientName()); return GetAdditionalDatanodeResponseProto.newBuilder().setBlock( - PBHelper.convertLocatedBlock(result)) - PBHelperClient.convert(result)) ++ PBHelperClient.convertLocatedBlock(result)) .build(); } catch (IOException e) { throw new ServiceException(e); @@@ -555,7 -548,7 +556,7 @@@ ReportBadBlocksRequestProto req) throws ServiceException { try { List<LocatedBlockProto> bl = req.getBlocksList(); - server.reportBadBlocks(PBHelper.convertLocatedBlocks( - server.reportBadBlocks(PBHelperClient.convertLocatedBlock( ++ server.reportBadBlocks(PBHelperClient.convertLocatedBlocks( bl.toArray(new LocatedBlockProto[bl.size()]))); } catch (IOException e) { throw new ServiceException(e); @@@ -960,8 -953,8 +961,8 @@@ RpcController controller, UpdateBlockForPipelineRequestProto req) throws ServiceException { try { - LocatedBlockProto result = PBHelper.convertLocatedBlock( - LocatedBlockProto result = PBHelperClient.convert(server - .updateBlockForPipeline(PBHelperClient.convert(req.getBlock()), ++ LocatedBlockProto result = PBHelperClient.convertLocatedBlock( + server.updateBlockForPipeline(PBHelperClient.convert(req.getBlock()), req.getClientName())); return UpdateBlockForPipelineResponseProto.newBuilder().setBlock(result) .build(); @@@ -1401,20 -1394,6 +1402,20 @@@ } @Override + public SetErasureCodingPolicyResponseProto setErasureCodingPolicy( + RpcController controller, SetErasureCodingPolicyRequestProto req) + throws ServiceException { + try { - ErasureCodingPolicy ecPolicy = req.hasEcPolicy() ? PBHelper.convertErasureCodingPolicy(req - .getEcPolicy()) : null; ++ ErasureCodingPolicy ecPolicy = req.hasEcPolicy() ? ++ PBHelperClient.convertErasureCodingPolicy(req.getEcPolicy()) : null; + server.setErasureCodingPolicy(req.getSrc(), ecPolicy); + return SetErasureCodingPolicyResponseProto.newBuilder().build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override public SetXAttrResponseProto setXAttr(RpcController controller, SetXAttrRequestProto req) throws ServiceException { try { @@@ -1535,35 -1514,4 +1536,35 @@@ throw new ServiceException(e); } } + + @Override + public GetErasureCodingPoliciesResponseProto getErasureCodingPolicies(RpcController controller, + GetErasureCodingPoliciesRequestProto request) throws ServiceException { + try { + ErasureCodingPolicy[] ecPolicies = server.getErasureCodingPolicies(); + GetErasureCodingPoliciesResponseProto.Builder resBuilder = GetErasureCodingPoliciesResponseProto + .newBuilder(); + for (ErasureCodingPolicy ecPolicy : ecPolicies) { - resBuilder.addEcPolicies(PBHelper.convertErasureCodingPolicy(ecPolicy)); ++ resBuilder.addEcPolicies(PBHelperClient.convertErasureCodingPolicy(ecPolicy)); + } + return resBuilder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetErasureCodingPolicyResponseProto getErasureCodingPolicy(RpcController controller, + GetErasureCodingPolicyRequestProto request) throws ServiceException { + try { + ErasureCodingPolicy ecPolicy = server.getErasureCodingPolicy(request.getSrc()); + GetErasureCodingPolicyResponseProto.Builder builder = GetErasureCodingPolicyResponseProto.newBuilder(); + if (ecPolicy != null) { - builder.setEcPolicy(PBHelper.convertErasureCodingPolicy(ecPolicy)); ++ builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(ecPolicy)); + } + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 8419244,f4ce46d..7b02691 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@@ -338,9 -327,9 +338,9 @@@ public class ClientNamenodeProtocolTran .build(); try { AppendResponseProto res = rpcProxy.append(null, req); - LocatedBlock lastBlock = res.hasBlock() ? PBHelper + LocatedBlock lastBlock = res.hasBlock() ? PBHelperClient - .convert(res.getBlock()) : null; + .convertLocatedBlockProto(res.getBlock()) : null; - HdfsFileStatus stat = (res.hasStat()) ? PBHelper.convert(res.getStat()) + HdfsFileStatus stat = (res.hasStat()) ? PBHelperClient.convert(res.getStat()) : null; return new LastBlockWithStatus(lastBlock, stat); } catch (ServiceException e) { @@@ -427,8 -416,7 +427,8 @@@ req.addAllFavoredNodes(Arrays.asList(favoredNodes)); } try { - return PBHelper.convertLocatedBlockProto( - return PBHelperClient.convert(rpcProxy.addBlock(null, req.build()).getBlock()); ++ return PBHelperClient.convertLocatedBlockProto( + rpcProxy.addBlock(null, req.build()).getBlock()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@@ -453,8 -441,8 +453,8 @@@ .setClientName(clientName) .build(); try { - return PBHelper.convertLocatedBlockProto( - return PBHelperClient.convert(rpcProxy.getAdditionalDatanode(null, req) - .getBlock()); ++ return PBHelperClient.convertLocatedBlockProto( + rpcProxy.getAdditionalDatanode(null, req).getBlock()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@@ -481,7 -469,7 +481,7 @@@ @Override public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { ReportBadBlocksRequestProto req = ReportBadBlocksRequestProto.newBuilder() - .addAllBlocks(Arrays.asList(PBHelper.convertLocatedBlocks(blocks))) - .addAllBlocks(Arrays.asList(PBHelperClient.convertLocatedBlock(blocks))) ++ .addAllBlocks(Arrays.asList(PBHelperClient.convertLocatedBlocks(blocks))) .build(); try { rpcProxy.reportBadBlocks(null, req); @@@ -913,7 -901,7 +913,7 @@@ .setClientName(clientName) .build(); try { - return PBHelper.convertLocatedBlockProto( - return PBHelperClient.convert( ++ return PBHelperClient.convertLocatedBlockProto( rpcProxy.updateBlockForPipeline(null, req).getBlock()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); @@@ -1419,23 -1407,6 +1419,23 @@@ } @Override + public void setErasureCodingPolicy(String src, ErasureCodingPolicy ecPolicy) + throws IOException { + final SetErasureCodingPolicyRequestProto.Builder builder = + SetErasureCodingPolicyRequestProto.newBuilder(); + builder.setSrc(src); + if (ecPolicy != null) { - builder.setEcPolicy(PBHelper.convertErasureCodingPolicy(ecPolicy)); ++ builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(ecPolicy)); + } + SetErasureCodingPolicyRequestProto req = builder.build(); + try { + rpcProxy.setErasureCodingPolicy(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag) throws IOException { SetXAttrRequestProto req = SetXAttrRequestProto.newBuilder() @@@ -1557,37 -1528,4 +1557,37 @@@ throw ProtobufHelper.getRemoteException(e); } } + + @Override + public ErasureCodingPolicy[] getErasureCodingPolicies() throws IOException { + try { + GetErasureCodingPoliciesResponseProto response = rpcProxy + .getErasureCodingPolicies(null, VOID_GET_EC_POLICIES_REQUEST); + ErasureCodingPolicy[] ecPolicies = + new ErasureCodingPolicy[response.getEcPoliciesCount()]; + int i = 0; + for (ErasureCodingPolicyProto ecPolicyProto : response.getEcPoliciesList()) { - ecPolicies[i++] = PBHelper.convertErasureCodingPolicy(ecPolicyProto); ++ ecPolicies[i++] = PBHelperClient.convertErasureCodingPolicy(ecPolicyProto); + } + return ecPolicies; + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public ErasureCodingPolicy getErasureCodingPolicy(String src) throws IOException { + GetErasureCodingPolicyRequestProto req = GetErasureCodingPolicyRequestProto.newBuilder() + .setSrc(src).build(); + try { + GetErasureCodingPolicyResponseProto response = rpcProxy.getErasureCodingPolicy( + null, req); + if (response.hasEcPolicy()) { - return PBHelper.convertErasureCodingPolicy(response.getEcPolicy()); ++ return PBHelperClient.convertErasureCodingPolicy(response.getEcPolicy()); + } + return null; + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index f20e58a,18f89f8..194e563 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@@ -281,7 -281,7 +281,7 @@@ public class DatanodeProtocolClientSide ReportBadBlocksRequestProto.Builder builder = ReportBadBlocksRequestProto .newBuilder(); for (int i = 0; i < blocks.length; i++) { - builder.addBlocks(i, PBHelper.convertLocatedBlock(blocks[i])); - builder.addBlocks(i, PBHelperClient.convert(blocks[i])); ++ builder.addBlocks(i, PBHelperClient.convertLocatedBlock(blocks[i])); } ReportBadBlocksRequestProto req = builder.build(); try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index 1ff80b3,94d1f0c..a1ea9a6 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@@ -259,7 -259,7 +259,7 @@@ public class DatanodeProtocolServerSide List<LocatedBlockProto> lbps = request.getBlocksList(); LocatedBlock [] blocks = new LocatedBlock [lbps.size()]; for(int i=0; i<lbps.size(); i++) { - blocks[i] = PBHelper.convertLocatedBlockProto(lbps.get(i)); - blocks[i] = PBHelperClient.convert(lbps.get(i)); ++ blocks[i] = PBHelperClient.convertLocatedBlockProto(lbps.get(i)); } try { impl.reportBadBlocks(blocks); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index ce39e15,3de4513..ece9984 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@@ -17,108 -17,23 +17,25 @@@ */ package org.apache.hadoop.hdfs.protocolPB; - import static com.google.common.base.Preconditions.checkNotNull; - import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos - .EncryptionZoneProto; - import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherSuiteProto; - import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CryptoProtocolVersionProto; - - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; - import java.util.EnumSet; - import java.util.HashMap; import java.util.List; - import java.util.Map; - import java.util.Map.Entry; - import java.util.Set; - - import org.apache.hadoop.fs.CacheFlag; - import org.apache.hadoop.fs.ContentSummary; - import org.apache.hadoop.fs.CreateFlag; - import org.apache.hadoop.fs.FsServerDefaults; - import org.apache.hadoop.fs.Path; + + import com.google.protobuf.ByteString; + import org.apache.hadoop.fs.StorageType; - import org.apache.hadoop.fs.XAttr; - import org.apache.hadoop.fs.XAttrSetFlag; - import org.apache.hadoop.fs.permission.AclEntry; - import org.apache.hadoop.fs.permission.AclEntryScope; - import org.apache.hadoop.fs.permission.AclEntryType; - import org.apache.hadoop.fs.permission.AclStatus; - import org.apache.hadoop.fs.permission.FsAction; - import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; import org.apache.hadoop.hdfs.DFSUtilClient; - import org.apache.hadoop.hdfs.inotify.EventBatch; - import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; - import org.apache.hadoop.hdfs.inotify.Event; - import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.Block; - import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; - import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; - import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats; - import org.apache.hadoop.hdfs.protocol.CachePoolEntry; - import org.apache.hadoop.hdfs.protocol.CachePoolInfo; - import org.apache.hadoop.hdfs.protocol.CachePoolStats; - import org.apache.hadoop.crypto.CipherOption; - import org.apache.hadoop.crypto.CipherSuite; - import org.apache.hadoop.hdfs.protocol.ClientProtocol; - import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; - import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; - import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; - import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo; - import org.apache.hadoop.hdfs.protocol.DirectoryListing; - import org.apache.hadoop.hdfs.protocol.EncryptionZone; - import org.apache.hadoop.hdfs.protocol.ExtendedBlock; - import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; - import org.apache.hadoop.hdfs.protocol.FsPermissionExtension; - import org.apache.hadoop.hdfs.protocol.HdfsConstants; - import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; - import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; - import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; - import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; - import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; - import org.apache.hadoop.hdfs.protocol.LocatedBlock; - import org.apache.hadoop.hdfs.protocol.LocatedBlocks; - import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; - import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; - import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus; - import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; - import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry; - import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType; - import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; - import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto; - import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryScopeProto; - import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryTypeProto; - import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.FsActionProto; - import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclStatusProto; - import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto; - import org.apache.hadoop.hdfs.protocol.proto.*; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveStatsProto; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheFlagProto; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolStatsProto; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeStorageReportProto; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto; - import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto; - import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto; + import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto; @@@ -130,14 -45,8 +47,17 @@@ import org.apache.hadoop.hdfs.protocol. import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto; ++import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaOptionEntryProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto; @@@ -381,73 -168,23 +182,34 @@@ public class PBHelper si, convert(reg.getRole())); } - // DatanodeId - public static DatanodeID convert(DatanodeIDProto dn) { - return new DatanodeID(dn.getIpAddr(), dn.getHostName(), dn.getDatanodeUuid(), - dn.getXferPort(), dn.getInfoPort(), dn.hasInfoSecurePort() ? dn - .getInfoSecurePort() : 0, dn.getIpcPort()); - } - - // Arrays of DatanodeId - public static DatanodeIDProto[] convert(DatanodeID[] did) { - if (did == null) - return null; - final int len = did.length; - DatanodeIDProto[] result = new DatanodeIDProto[len]; - for (int i = 0; i < len; ++i) { - result[i] = PBHelperClient.convert(did[i]); - } - return result; - } - - public static DatanodeID[] convert(DatanodeIDProto[] did) { - if (did == null) return null; - final int len = did.length; - DatanodeID[] result = new DatanodeID[len]; - for (int i = 0; i < len; ++i) { - result[i] = convert(did[i]); - } - return result; - } - - // Block - public static BlockProto convert(Block b) { - return BlockProto.newBuilder().setBlockId(b.getBlockId()) - .setGenStamp(b.getGenerationStamp()).setNumBytes(b.getNumBytes()) - .build(); - } - - public static Block convert(BlockProto b) { - return new Block(b.getBlockId(), b.getNumBytes(), b.getGenStamp()); - } - public static BlockWithLocationsProto convert(BlockWithLocations blk) { - return BlockWithLocationsProto.newBuilder() - .setBlock(PBHelperClient.convert(blk.getBlock())) + BlockWithLocationsProto.Builder builder = BlockWithLocationsProto - .newBuilder().setBlock(convert(blk.getBlock())) ++ .newBuilder().setBlock(PBHelperClient.convert(blk.getBlock())) .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids())) .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())) - .addAllStorageTypes(PBHelperClient.convertStorageTypes(blk.getStorageTypes())) - .build(); + .addAllStorageTypes(PBHelperClient.convertStorageTypes(blk.getStorageTypes())); + if (blk instanceof StripedBlockWithLocations) { + StripedBlockWithLocations sblk = (StripedBlockWithLocations) blk; - builder.setIndices(getByteString(sblk.getIndices())); ++ builder.setIndices(PBHelperClient.getByteString(sblk.getIndices())); + builder.setDataBlockNum(sblk.getDataBlockNum()); + } + return builder.build(); } public static BlockWithLocations convert(BlockWithLocationsProto b) { final List<String> datanodeUuids = b.getDatanodeUuidsList(); final List<String> storageUuids = b.getStorageUuidsList(); final List<StorageTypeProto> storageTypes = b.getStorageTypesList(); - BlockWithLocations blk = new BlockWithLocations(convert(b.getBlock()), - return new BlockWithLocations(PBHelperClient.convert(b.getBlock()), ++ BlockWithLocations blk = new BlockWithLocations(PBHelperClient. ++ convert(b.getBlock()), datanodeUuids.toArray(new String[datanodeUuids.size()]), storageUuids.toArray(new String[storageUuids.size()]), - convertStorageTypes(storageTypes, storageUuids.size())); + PBHelperClient.convertStorageTypes(storageTypes, storageUuids.size())); + if (b.hasIndices()) { + blk = new StripedBlockWithLocations(blk, b.getIndices().toByteArray(), + (short) b.getDataBlockNum()); + } + return blk; } public static BlocksWithLocationsProto convert(BlocksWithLocations blks) { @@@ -596,7 -333,7 +358,7 @@@ if (b == null) { return null; } - LocatedBlockProto lb = PBHelper.convertLocatedBlock(b); - LocatedBlockProto lb = PBHelperClient.convert((LocatedBlock) b); ++ LocatedBlockProto lb = PBHelperClient.convertLocatedBlock(b); RecoveringBlockProto.Builder builder = RecoveringBlockProto.newBuilder(); builder.setBlock(lb).setNewGenStamp(b.getNewGenerationStamp()); if(b.getNewBlock() != null) @@@ -606,206 -343,12 +368,12 @@@ public static RecoveringBlock convert(RecoveringBlockProto b) { ExtendedBlock block = PBHelperClient.convert(b.getBlock().getB()); - DatanodeInfo[] locs = convert(b.getBlock().getLocsList()); + DatanodeInfo[] locs = PBHelperClient.convert(b.getBlock().getLocsList()); return (b.hasTruncateBlock()) ? - new RecoveringBlock(block, locs, PBHelper.convert(b.getTruncateBlock())) : + new RecoveringBlock(block, locs, PBHelperClient.convert(b.getTruncateBlock())) : new RecoveringBlock(block, locs, b.getNewGenStamp()); } - - static public DatanodeInfo convert(DatanodeInfoProto di) { - if (di == null) return null; - return new DatanodeInfo( - PBHelper.convert(di.getId()), - di.hasLocation() ? di.getLocation() : null , - di.getCapacity(), di.getDfsUsed(), di.getRemaining(), - di.getBlockPoolUsed(), di.getCacheCapacity(), di.getCacheUsed(), - di.getLastUpdate(), di.getLastUpdateMonotonic(), - di.getXceiverCount(), PBHelper.convert(di.getAdminState())); - } - - static public DatanodeInfo[] convert(DatanodeInfoProto di[]) { - if (di == null) return null; - DatanodeInfo[] result = new DatanodeInfo[di.length]; - for (int i = 0; i < di.length; i++) { - result[i] = convert(di[i]); - } - return result; - } - - public static DatanodeInfo[] convert(List<DatanodeInfoProto> list) { - DatanodeInfo[] info = new DatanodeInfo[list.size()]; - for (int i = 0; i < info.length; i++) { - info[i] = convert(list.get(i)); - } - return info; - } - - public static DatanodeStorageReportProto convertDatanodeStorageReport( - DatanodeStorageReport report) { - return DatanodeStorageReportProto.newBuilder() - .setDatanodeInfo(PBHelperClient.convert(report.getDatanodeInfo())) - .addAllStorageReports(convertStorageReports(report.getStorageReports())) - .build(); - } - - public static List<DatanodeStorageReportProto> convertDatanodeStorageReports( - DatanodeStorageReport[] reports) { - final List<DatanodeStorageReportProto> protos - = new ArrayList<DatanodeStorageReportProto>(reports.length); - for(int i = 0; i < reports.length; i++) { - protos.add(convertDatanodeStorageReport(reports[i])); - } - return protos; - } - - public static DatanodeStorageReport convertDatanodeStorageReport( - DatanodeStorageReportProto proto) { - return new DatanodeStorageReport( - convert(proto.getDatanodeInfo()), - convertStorageReports(proto.getStorageReportsList())); - } - - public static DatanodeStorageReport[] convertDatanodeStorageReports( - List<DatanodeStorageReportProto> protos) { - final DatanodeStorageReport[] reports - = new DatanodeStorageReport[protos.size()]; - for(int i = 0; i < reports.length; i++) { - reports[i] = convertDatanodeStorageReport(protos.get(i)); - } - return reports; - } - - public static AdminStates convert(AdminState adminState) { - switch(adminState) { - case DECOMMISSION_INPROGRESS: - return AdminStates.DECOMMISSION_INPROGRESS; - case DECOMMISSIONED: - return AdminStates.DECOMMISSIONED; - case NORMAL: - default: - return AdminStates.NORMAL; - } - } - - public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) { - if (b == null) return null; - Builder builder = LocatedBlockProto.newBuilder(); - DatanodeInfo[] locs = b.getLocations(); - List<DatanodeInfo> cachedLocs = - Lists.newLinkedList(Arrays.asList(b.getCachedLocations())); - for (int i = 0; i < locs.length; i++) { - DatanodeInfo loc = locs[i]; - builder.addLocs(i, PBHelperClient.convert(loc)); - boolean locIsCached = cachedLocs.contains(loc); - builder.addIsCached(locIsCached); - if (locIsCached) { - cachedLocs.remove(loc); - } - } - Preconditions.checkArgument(cachedLocs.size() == 0, - "Found additional cached replica locations that are not in the set of" - + " storage-backed locations!"); - - StorageType[] storageTypes = b.getStorageTypes(); - if (storageTypes != null) { - for (StorageType storageType : storageTypes) { - builder.addStorageTypes(PBHelperClient.convertStorageType(storageType)); - } - } - final String[] storageIDs = b.getStorageIDs(); - if (storageIDs != null) { - builder.addAllStorageIDs(Arrays.asList(storageIDs)); - } - if (b instanceof LocatedStripedBlock) { - LocatedStripedBlock sb = (LocatedStripedBlock) b; - int[] indices = sb.getBlockIndices(); - Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens(); - for (int i = 0; i < indices.length; i++) { - builder.addBlockIndex(indices[i]); - builder.addBlockTokens(PBHelperClient.convert(blockTokens[i])); - } - } - - return builder.setB(PBHelperClient.convert(b.getBlock())) - .setBlockToken(PBHelperClient.convert(b.getBlockToken())) - .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build(); - } - - public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) { - if (proto == null) return null; - List<DatanodeInfoProto> locs = proto.getLocsList(); - DatanodeInfo[] targets = new DatanodeInfo[locs.size()]; - for (int i = 0; i < locs.size(); i++) { - targets[i] = PBHelper.convert(locs.get(i)); - } - - final StorageType[] storageTypes = convertStorageTypes( - proto.getStorageTypesList(), locs.size()); - - final int storageIDsCount = proto.getStorageIDsCount(); - final String[] storageIDs; - if (storageIDsCount == 0) { - storageIDs = null; - } else { - Preconditions.checkState(storageIDsCount == locs.size()); - storageIDs = proto.getStorageIDsList().toArray(new String[storageIDsCount]); - } - - int[] indices = null; - final int indexCount = proto.getBlockIndexCount(); - if (indexCount > 0) { - indices = new int[indexCount]; - for (int i = 0; i < indexCount; i++) { - indices[i] = proto.getBlockIndex(i); - } - } - - // Set values from the isCached list, re-using references from loc - List<DatanodeInfo> cachedLocs = new ArrayList<DatanodeInfo>(locs.size()); - List<Boolean> isCachedList = proto.getIsCachedList(); - for (int i=0; i<isCachedList.size(); i++) { - if (isCachedList.get(i)) { - cachedLocs.add(targets[i]); - } - } - - final LocatedBlock lb; - if (indices == null) { - lb = new LocatedBlock(PBHelperClient.convert(proto.getB()), targets, - storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(), - cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()])); - } else { - lb = new LocatedStripedBlock(PBHelperClient.convert(proto.getB()), targets, - storageIDs, storageTypes, indices, proto.getOffset(), - proto.getCorrupt(), - cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()])); - List<TokenProto> tokenProtos = proto.getBlockTokensList(); - Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length]; - for (int i = 0; i < indices.length; i++) { - blockTokens[i] = PBHelper.convert(tokenProtos.get(i)); - } - ((LocatedStripedBlock) lb).setBlockTokens(blockTokens); - } - lb.setBlockToken(PBHelper.convert(proto.getBlockToken())); - - return lb; - } - - public static Token<BlockTokenIdentifier> convert( - TokenProto blockToken) { - return new Token<BlockTokenIdentifier>(blockToken.getIdentifier() - .toByteArray(), blockToken.getPassword().toByteArray(), new Text( - blockToken.getKind()), new Text(blockToken.getService())); - } - -- - public static Token<DelegationTokenIdentifier> convertDelegationToken( - TokenProto blockToken) { - return new Token<DelegationTokenIdentifier>(blockToken.getIdentifier() - .toByteArray(), blockToken.getPassword().toByteArray(), new Text( - blockToken.getKind()), new Text(blockToken.getService())); - } + public static ReplicaState convert(ReplicaStateProto state) { switch (state) { case RBW: @@@ -1198,549 -704,8 +736,7 @@@ .setCapabilities(info.getCapabilities()) .build(); } - - // Located Block Arrays and Lists - public static LocatedBlockProto[] convertLocatedBlocks(LocatedBlock[] lb) { - if (lb == null) return null; - return convertLocatedBlocks2(Arrays.asList(lb)) - .toArray(new LocatedBlockProto[lb.length]); - } - - public static LocatedBlock[] convertLocatedBlocks(LocatedBlockProto[] lb) { - if (lb == null) return null; - return convertLocatedBlocks(Arrays.asList(lb)) - .toArray(new LocatedBlock[lb.length]); - } - - public static List<LocatedBlock> convertLocatedBlocks( - List<LocatedBlockProto> lb) { - if (lb == null) return null; - final int len = lb.size(); - List<LocatedBlock> result = new ArrayList<>(len); - for (LocatedBlockProto aLb : lb) { - result.add(PBHelper.convertLocatedBlockProto(aLb)); - } - return result; - } - - public static List<LocatedBlockProto> convertLocatedBlocks2( - List<LocatedBlock> lb) { - if (lb == null) return null; - final int len = lb.size(); - List<LocatedBlockProto> result = new ArrayList<>(len); - for (LocatedBlock aLb : lb) { - result.add(PBHelper.convertLocatedBlock(aLb)); - } - return result; - } - - - // LocatedBlocks - public static LocatedBlocks convert(LocatedBlocksProto lb) { - return new LocatedBlocks( - lb.getFileLength(), lb.getUnderConstruction(), - PBHelper.convertLocatedBlocks(lb.getBlocksList()), - lb.hasLastBlock() ? - PBHelper.convertLocatedBlockProto(lb.getLastBlock()) : null, - lb.getIsLastBlockComplete(), - lb.hasFileEncryptionInfo() ? convert(lb.getFileEncryptionInfo()) : null, - lb.hasEcPolicy() ? convertErasureCodingPolicy(lb.getEcPolicy()) : null); - } - - public static LocatedBlocksProto convert(LocatedBlocks lb) { - if (lb == null) { - return null; - } - LocatedBlocksProto.Builder builder = - LocatedBlocksProto.newBuilder(); - if (lb.getLastLocatedBlock() != null) { - builder.setLastBlock( - PBHelper.convertLocatedBlock(lb.getLastLocatedBlock())); - } - if (lb.getFileEncryptionInfo() != null) { - builder.setFileEncryptionInfo(convert(lb.getFileEncryptionInfo())); - } - if (lb.getErasureCodingPolicy() != null) { - builder.setEcPolicy(convertErasureCodingPolicy(lb.getErasureCodingPolicy())); - } - return builder.setFileLength(lb.getFileLength()) - .setUnderConstruction(lb.isUnderConstruction()) - .addAllBlocks(PBHelper.convertLocatedBlocks2(lb.getLocatedBlocks())) - .setIsLastBlockComplete(lb.isLastBlockComplete()).build(); - } - - // DataEncryptionKey - public static DataEncryptionKey convert(DataEncryptionKeyProto bet) { - String encryptionAlgorithm = bet.getEncryptionAlgorithm(); - return new DataEncryptionKey(bet.getKeyId(), - bet.getBlockPoolId(), - bet.getNonce().toByteArray(), - bet.getEncryptionKey().toByteArray(), - bet.getExpiryDate(), - encryptionAlgorithm.isEmpty() ? null : encryptionAlgorithm); - } - - public static DataEncryptionKeyProto convert(DataEncryptionKey bet) { - DataEncryptionKeyProto.Builder b = DataEncryptionKeyProto.newBuilder() - .setKeyId(bet.keyId) - .setBlockPoolId(bet.blockPoolId) - .setNonce(ByteString.copyFrom(bet.nonce)) - .setEncryptionKey(ByteString.copyFrom(bet.encryptionKey)) - .setExpiryDate(bet.expiryDate); - if (bet.encryptionAlgorithm != null) { - b.setEncryptionAlgorithm(bet.encryptionAlgorithm); - } - return b.build(); - } - - public static FsServerDefaults convert(FsServerDefaultsProto fs) { - if (fs == null) return null; - return new FsServerDefaults( - fs.getBlockSize(), fs.getBytesPerChecksum(), - fs.getWritePacketSize(), (short) fs.getReplication(), - fs.getFileBufferSize(), - fs.getEncryptDataTransfer(), - fs.getTrashInterval(), - PBHelperClient.convert(fs.getChecksumType())); - } - - public static FsServerDefaultsProto convert(FsServerDefaults fs) { - if (fs == null) return null; - return FsServerDefaultsProto.newBuilder(). - setBlockSize(fs.getBlockSize()). - setBytesPerChecksum(fs.getBytesPerChecksum()). - setWritePacketSize(fs.getWritePacketSize()) - .setReplication(fs.getReplication()) - .setFileBufferSize(fs.getFileBufferSize()) - .setEncryptDataTransfer(fs.getEncryptDataTransfer()) - .setTrashInterval(fs.getTrashInterval()) - .setChecksumType(PBHelperClient.convert(fs.getChecksumType())) - .build(); - } - - public static FsPermissionProto convert(FsPermission p) { - return FsPermissionProto.newBuilder().setPerm(p.toExtendedShort()).build(); - } - - public static FsPermission convert(FsPermissionProto p) { - return new FsPermissionExtension((short)p.getPerm()); - } - - - // The creatFlag field in PB is a bitmask whose values are the same a the - // emum values of CreateFlag - public static int convertCreateFlag(EnumSetWritable<CreateFlag> flag) { - int value = 0; - if (flag.contains(CreateFlag.APPEND)) { - value |= CreateFlagProto.APPEND.getNumber(); - } - if (flag.contains(CreateFlag.CREATE)) { - value |= CreateFlagProto.CREATE.getNumber(); - } - if (flag.contains(CreateFlag.OVERWRITE)) { - value |= CreateFlagProto.OVERWRITE.getNumber(); - } - if (flag.contains(CreateFlag.LAZY_PERSIST)) { - value |= CreateFlagProto.LAZY_PERSIST.getNumber(); - } - if (flag.contains(CreateFlag.NEW_BLOCK)) { - value |= CreateFlagProto.NEW_BLOCK.getNumber(); - } - return value; - } - - public static EnumSetWritable<CreateFlag> convertCreateFlag(int flag) { - EnumSet<CreateFlag> result = - EnumSet.noneOf(CreateFlag.class); - if ((flag & CreateFlagProto.APPEND_VALUE) == CreateFlagProto.APPEND_VALUE) { - result.add(CreateFlag.APPEND); - } - if ((flag & CreateFlagProto.CREATE_VALUE) == CreateFlagProto.CREATE_VALUE) { - result.add(CreateFlag.CREATE); - } - if ((flag & CreateFlagProto.OVERWRITE_VALUE) - == CreateFlagProto.OVERWRITE_VALUE) { - result.add(CreateFlag.OVERWRITE); - } - if ((flag & CreateFlagProto.LAZY_PERSIST_VALUE) - == CreateFlagProto.LAZY_PERSIST_VALUE) { - result.add(CreateFlag.LAZY_PERSIST); - } - if ((flag & CreateFlagProto.NEW_BLOCK_VALUE) - == CreateFlagProto.NEW_BLOCK_VALUE) { - result.add(CreateFlag.NEW_BLOCK); - } - return new EnumSetWritable<CreateFlag>(result, CreateFlag.class); - } - - public static int convertCacheFlags(EnumSet<CacheFlag> flags) { - int value = 0; - if (flags.contains(CacheFlag.FORCE)) { - value |= CacheFlagProto.FORCE.getNumber(); - } - return value; - } - - public static EnumSet<CacheFlag> convertCacheFlags(int flags) { - EnumSet<CacheFlag> result = EnumSet.noneOf(CacheFlag.class); - if ((flags & CacheFlagProto.FORCE_VALUE) == CacheFlagProto.FORCE_VALUE) { - result.add(CacheFlag.FORCE); - } - return result; - } - - public static HdfsFileStatus convert(HdfsFileStatusProto fs) { - if (fs == null) - return null; - return new HdfsLocatedFileStatus( - fs.getLength(), fs.getFileType().equals(FileType.IS_DIR), - fs.getBlockReplication(), fs.getBlocksize(), - fs.getModificationTime(), fs.getAccessTime(), - PBHelper.convert(fs.getPermission()), fs.getOwner(), fs.getGroup(), - fs.getFileType().equals(FileType.IS_SYMLINK) ? - fs.getSymlink().toByteArray() : null, - fs.getPath().toByteArray(), - fs.hasFileId()? fs.getFileId(): HdfsConstants.GRANDFATHER_INODE_ID, - fs.hasLocations() ? PBHelper.convert(fs.getLocations()) : null, - fs.hasChildrenNum() ? fs.getChildrenNum() : -1, - fs.hasFileEncryptionInfo() ? convert(fs.getFileEncryptionInfo()) : null, - fs.hasStoragePolicy() ? (byte) fs.getStoragePolicy() - : HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, - fs.hasEcPolicy() ? PBHelper.convertErasureCodingPolicy(fs.getEcPolicy()) : null); - } - - public static SnapshottableDirectoryStatus convert( - SnapshottableDirectoryStatusProto sdirStatusProto) { - if (sdirStatusProto == null) { - return null; - } - final HdfsFileStatusProto status = sdirStatusProto.getDirStatus(); - return new SnapshottableDirectoryStatus( - status.getModificationTime(), - status.getAccessTime(), - PBHelper.convert(status.getPermission()), - status.getOwner(), - status.getGroup(), - status.getPath().toByteArray(), - status.getFileId(), - status.getChildrenNum(), - sdirStatusProto.getSnapshotNumber(), - sdirStatusProto.getSnapshotQuota(), - sdirStatusProto.getParentFullpath().toByteArray()); - } - - public static HdfsFileStatusProto convert(HdfsFileStatus fs) { - if (fs == null) - return null; - FileType fType = FileType.IS_FILE; - if (fs.isDir()) { - fType = FileType.IS_DIR; - } else if (fs.isSymlink()) { - fType = FileType.IS_SYMLINK; - } - - HdfsFileStatusProto.Builder builder = - HdfsFileStatusProto.newBuilder(). - setLength(fs.getLen()). - setFileType(fType). - setBlockReplication(fs.getReplication()). - setBlocksize(fs.getBlockSize()). - setModificationTime(fs.getModificationTime()). - setAccessTime(fs.getAccessTime()). - setPermission(PBHelper.convert(fs.getPermission())). - setOwner(fs.getOwner()). - setGroup(fs.getGroup()). - setFileId(fs.getFileId()). - setChildrenNum(fs.getChildrenNum()). - setPath(ByteString.copyFrom(fs.getLocalNameInBytes())). - setStoragePolicy(fs.getStoragePolicy()); - if (fs.isSymlink()) { - builder.setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes())); - } - if (fs.getFileEncryptionInfo() != null) { - builder.setFileEncryptionInfo(convert(fs.getFileEncryptionInfo())); - } - if (fs instanceof HdfsLocatedFileStatus) { - final HdfsLocatedFileStatus lfs = (HdfsLocatedFileStatus) fs; - LocatedBlocks locations = lfs.getBlockLocations(); - if (locations != null) { - builder.setLocations(PBHelper.convert(locations)); - } - } - if(fs.getErasureCodingPolicy() != null) { - builder.setEcPolicy(PBHelper.convertErasureCodingPolicy(fs.getErasureCodingPolicy())); - } - return builder.build(); - } - - public static SnapshottableDirectoryStatusProto convert( - SnapshottableDirectoryStatus status) { - if (status == null) { - return null; - } - int snapshotNumber = status.getSnapshotNumber(); - int snapshotQuota = status.getSnapshotQuota(); - byte[] parentFullPath = status.getParentFullPath(); - ByteString parentFullPathBytes = ByteString.copyFrom( - parentFullPath == null ? DFSUtilClient.EMPTY_BYTES : parentFullPath); - HdfsFileStatusProto fs = convert(status.getDirStatus()); - SnapshottableDirectoryStatusProto.Builder builder = - SnapshottableDirectoryStatusProto - .newBuilder().setSnapshotNumber(snapshotNumber) - .setSnapshotQuota(snapshotQuota).setParentFullpath(parentFullPathBytes) - .setDirStatus(fs); - return builder.build(); - } - - public static HdfsFileStatusProto[] convert(HdfsFileStatus[] fs) { - if (fs == null) return null; - final int len = fs.length; - HdfsFileStatusProto[] result = new HdfsFileStatusProto[len]; - for (int i = 0; i < len; ++i) { - result[i] = PBHelper.convert(fs[i]); - } - return result; - } - - public static HdfsFileStatus[] convert(HdfsFileStatusProto[] fs) { - if (fs == null) return null; - final int len = fs.length; - HdfsFileStatus[] result = new HdfsFileStatus[len]; - for (int i = 0; i < len; ++i) { - result[i] = PBHelper.convert(fs[i]); - } - return result; - } - - public static DirectoryListing convert(DirectoryListingProto dl) { - if (dl == null) - return null; - List<HdfsFileStatusProto> partList = dl.getPartialListingList(); - return new DirectoryListing( - partList.isEmpty() ? new HdfsLocatedFileStatus[0] - : PBHelper.convert( - partList.toArray(new HdfsFileStatusProto[partList.size()])), - dl.getRemainingEntries()); - } - - public static DirectoryListingProto convert(DirectoryListing d) { - if (d == null) - return null; - return DirectoryListingProto.newBuilder(). - addAllPartialListing(Arrays.asList( - PBHelper.convert(d.getPartialListing()))). - setRemainingEntries(d.getRemainingEntries()). - build(); - } - - public static long[] convert(GetFsStatsResponseProto res) { - long[] result = new long[7]; - result[ClientProtocol.GET_STATS_CAPACITY_IDX] = res.getCapacity(); - result[ClientProtocol.GET_STATS_USED_IDX] = res.getUsed(); - result[ClientProtocol.GET_STATS_REMAINING_IDX] = res.getRemaining(); - result[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX] = res.getUnderReplicated(); - result[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] = res.getCorruptBlocks(); - result[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] = res.getMissingBlocks(); - result[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX] = - res.getMissingReplOneBlocks(); - return result; - } - - public static GetFsStatsResponseProto convert(long[] fsStats) { - GetFsStatsResponseProto.Builder result = GetFsStatsResponseProto - .newBuilder(); - if (fsStats.length >= ClientProtocol.GET_STATS_CAPACITY_IDX + 1) - result.setCapacity(fsStats[ClientProtocol.GET_STATS_CAPACITY_IDX]); - if (fsStats.length >= ClientProtocol.GET_STATS_USED_IDX + 1) - result.setUsed(fsStats[ClientProtocol.GET_STATS_USED_IDX]); - if (fsStats.length >= ClientProtocol.GET_STATS_REMAINING_IDX + 1) - result.setRemaining(fsStats[ClientProtocol.GET_STATS_REMAINING_IDX]); - if (fsStats.length >= ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX + 1) - result.setUnderReplicated( - fsStats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX]); - if (fsStats.length >= ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX + 1) - result.setCorruptBlocks( - fsStats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX]); - if (fsStats.length >= ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX + 1) - result.setMissingBlocks( - fsStats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX]); - if (fsStats.length >= ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX + 1) - result.setMissingReplOneBlocks( - fsStats[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX]); - return result.build(); - } - - public static DatanodeReportTypeProto - convert(DatanodeReportType t) { - switch (t) { - case ALL: return DatanodeReportTypeProto.ALL; - case LIVE: return DatanodeReportTypeProto.LIVE; - case DEAD: return DatanodeReportTypeProto.DEAD; - case DECOMMISSIONING: return DatanodeReportTypeProto.DECOMMISSIONING; - default: - throw new IllegalArgumentException("Unexpected data type report:" + t); - } - } - - public static DatanodeReportType - convert(DatanodeReportTypeProto t) { - switch (t) { - case ALL: return DatanodeReportType.ALL; - case LIVE: return DatanodeReportType.LIVE; - case DEAD: return DatanodeReportType.DEAD; - case DECOMMISSIONING: return DatanodeReportType.DECOMMISSIONING; - default: - throw new IllegalArgumentException("Unexpected data type report:" + t); - } - } - - public static SafeModeActionProto convert( - SafeModeAction a) { - switch (a) { - case SAFEMODE_LEAVE: - return SafeModeActionProto.SAFEMODE_LEAVE; - case SAFEMODE_ENTER: - return SafeModeActionProto.SAFEMODE_ENTER; - case SAFEMODE_GET: - return SafeModeActionProto.SAFEMODE_GET; - default: - throw new IllegalArgumentException("Unexpected SafeModeAction :" + a); - } - } - - public static SafeModeAction convert( - ClientNamenodeProtocolProtos.SafeModeActionProto a) { - switch (a) { - case SAFEMODE_LEAVE: - return SafeModeAction.SAFEMODE_LEAVE; - case SAFEMODE_ENTER: - return SafeModeAction.SAFEMODE_ENTER; - case SAFEMODE_GET: - return SafeModeAction.SAFEMODE_GET; - default: - throw new IllegalArgumentException("Unexpected SafeModeAction :" + a); - } - } - - public static RollingUpgradeActionProto convert(RollingUpgradeAction a) { - switch (a) { - case QUERY: - return RollingUpgradeActionProto.QUERY; - case PREPARE: - return RollingUpgradeActionProto.START; - case FINALIZE: - return RollingUpgradeActionProto.FINALIZE; - default: - throw new IllegalArgumentException("Unexpected value: " + a); - } - } - - public static RollingUpgradeAction convert(RollingUpgradeActionProto a) { - switch (a) { - case QUERY: - return RollingUpgradeAction.QUERY; - case START: - return RollingUpgradeAction.PREPARE; - case FINALIZE: - return RollingUpgradeAction.FINALIZE; - default: - throw new IllegalArgumentException("Unexpected value: " + a); - } - } - - public static RollingUpgradeStatusProto convertRollingUpgradeStatus( - RollingUpgradeStatus status) { - return RollingUpgradeStatusProto.newBuilder() - .setBlockPoolId(status.getBlockPoolId()) - .setFinalized(status.isFinalized()) - .build(); - } - - public static RollingUpgradeStatus convert(RollingUpgradeStatusProto proto) { - return new RollingUpgradeStatus(proto.getBlockPoolId(), - proto.getFinalized()); - } - - public static RollingUpgradeInfoProto convert(RollingUpgradeInfo info) { - return RollingUpgradeInfoProto.newBuilder() - .setStatus(convertRollingUpgradeStatus(info)) - .setCreatedRollbackImages(info.createdRollbackImages()) - .setStartTime(info.getStartTime()) - .setFinalizeTime(info.getFinalizeTime()) - .build(); - } - - public static RollingUpgradeInfo convert(RollingUpgradeInfoProto proto) { - RollingUpgradeStatusProto status = proto.getStatus(); - return new RollingUpgradeInfo(status.getBlockPoolId(), - proto.getCreatedRollbackImages(), - proto.getStartTime(), proto.getFinalizeTime()); - } - - public static CorruptFileBlocks convert(CorruptFileBlocksProto c) { - if (c == null) - return null; - List<String> fileList = c.getFilesList(); - return new CorruptFileBlocks(fileList.toArray(new String[fileList.size()]), - c.getCookie()); - } - - public static CorruptFileBlocksProto convert(CorruptFileBlocks c) { - if (c == null) - return null; - return CorruptFileBlocksProto.newBuilder(). - addAllFiles(Arrays.asList(c.getFiles())). - setCookie(c.getCookie()). - build(); - } - - public static ContentSummary convert(ContentSummaryProto cs) { - if (cs == null) return null; - ContentSummary.Builder builder = new ContentSummary.Builder(); - builder.length(cs.getLength()). - fileCount(cs.getFileCount()). - directoryCount(cs.getDirectoryCount()). - quota(cs.getQuota()). - spaceConsumed(cs.getSpaceConsumed()). - spaceQuota(cs.getSpaceQuota()); - if (cs.hasTypeQuotaInfos()) { - for (HdfsProtos.StorageTypeQuotaInfoProto info : - cs.getTypeQuotaInfos().getTypeQuotaInfoList()) { - StorageType type = PBHelperClient.convertStorageType(info.getType()); - builder.typeConsumed(type, info.getConsumed()); - builder.typeQuota(type, info.getQuota()); - } - } - return builder.build(); - } - - public static ContentSummaryProto convert(ContentSummary cs) { - if (cs == null) return null; - ContentSummaryProto.Builder builder = ContentSummaryProto.newBuilder(); - builder.setLength(cs.getLength()). - setFileCount(cs.getFileCount()). - setDirectoryCount(cs.getDirectoryCount()). - setQuota(cs.getQuota()). - setSpaceConsumed(cs.getSpaceConsumed()). - setSpaceQuota(cs.getSpaceQuota()); - - if (cs.isTypeQuotaSet() || cs.isTypeConsumedAvailable()) { - HdfsProtos.StorageTypeQuotaInfosProto.Builder isb = - HdfsProtos.StorageTypeQuotaInfosProto.newBuilder(); - for (StorageType t: StorageType.getTypesSupportingQuota()) { - HdfsProtos.StorageTypeQuotaInfoProto info = - HdfsProtos.StorageTypeQuotaInfoProto.newBuilder(). - setType(PBHelperClient.convertStorageType(t)). - setConsumed(cs.getTypeConsumed(t)). - setQuota(cs.getTypeQuota(t)). - build(); - isb.addTypeQuotaInfo(info); - } - builder.setTypeQuotaInfos(isb); - } - return builder.build(); - } - public static NNHAStatusHeartbeat convert(NNHAStatusHeartbeatProto s) { if (s == null) return null; switch (s.getState()) { @@@ -2933,179 -790,4 +821,140 @@@ setLeaseId(context.getLeaseId()). build(); } + - public static ECSchema convertECSchema(ECSchemaProto schema) { - List<ECSchemaOptionEntryProto> optionsList = schema.getOptionsList(); - Map<String, String> options = new HashMap<>(optionsList.size()); - for (ECSchemaOptionEntryProto option : optionsList) { - options.put(option.getKey(), option.getValue()); ++ private static List<Integer> convertIntArray(short[] liveBlockIndices) { ++ List<Integer> liveBlockIndicesList = new ArrayList<>(); ++ for (short s : liveBlockIndices) { ++ liveBlockIndicesList.add((int) s); + } - return new ECSchema(schema.getCodecName(), schema.getDataUnits(), - schema.getParityUnits(), options); - } - - public static ECSchemaProto convertECSchema(ECSchema schema) { - ECSchemaProto.Builder builder = ECSchemaProto.newBuilder() - .setCodecName(schema.getCodecName()) - .setDataUnits(schema.getNumDataUnits()) - .setParityUnits(schema.getNumParityUnits()); - Set<Entry<String, String>> entrySet = schema.getExtraOptions().entrySet(); - for (Entry<String, String> entry : entrySet) { - builder.addOptions(ECSchemaOptionEntryProto.newBuilder() - .setKey(entry.getKey()).setValue(entry.getValue()).build()); ++ return liveBlockIndicesList; ++ } ++ ++ private static StorageTypesProto convertStorageTypesProto( ++ StorageType[] targetStorageTypes) { ++ StorageTypesProto.Builder builder = StorageTypesProto.newBuilder(); ++ for (StorageType storageType : targetStorageTypes) { ++ builder.addStorageTypes(PBHelperClient.convertStorageType(storageType)); + } + return builder.build(); + } + - public static ErasureCodingPolicy convertErasureCodingPolicy( - ErasureCodingPolicyProto policy) { - return new ErasureCodingPolicy(policy.getName(), - convertECSchema(policy.getSchema()), - policy.getCellSize()); ++ private static HdfsProtos.StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) { ++ HdfsProtos.StorageUuidsProto.Builder builder = HdfsProtos.StorageUuidsProto.newBuilder(); ++ for (String storageUuid : targetStorageIDs) { ++ builder.addStorageUuids(storageUuid); ++ } ++ return builder.build(); + } + - public static ErasureCodingPolicyProto convertErasureCodingPolicy( - ErasureCodingPolicy policy) { - ErasureCodingPolicyProto.Builder builder = ErasureCodingPolicyProto - .newBuilder() - .setName(policy.getName()) - .setSchema(convertECSchema(policy.getSchema())) - .setCellSize(policy.getCellSize()); ++ private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) { ++ DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder(); ++ for (DatanodeInfo datanodeInfo : dnInfos) { ++ builder.addDatanodes(PBHelperClient.convert(datanodeInfo)); ++ } + return builder.build(); + } - ++ ++ private static String[] convert(HdfsProtos.StorageUuidsProto targetStorageUuidsProto) { ++ List<String> storageUuidsList = targetStorageUuidsProto ++ .getStorageUuidsList(); ++ String[] storageUuids = new String[storageUuidsList.size()]; ++ for (int i = 0; i < storageUuidsList.size(); i++) { ++ storageUuids[i] = storageUuidsList.get(i); ++ } ++ return storageUuids; ++ } ++ + public static BlockECRecoveryInfo convertBlockECRecoveryInfo( + BlockECRecoveryInfoProto blockEcRecoveryInfoProto) { + ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock(); + ExtendedBlock block = PBHelperClient.convert(blockProto); + + DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto + .getSourceDnInfos(); - DatanodeInfo[] sourceDnInfos = convert(sourceDnInfosProto); ++ DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto); + + DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto + .getTargetDnInfos(); - DatanodeInfo[] targetDnInfos = convert(targetDnInfosProto); ++ DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto); + - StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto ++ HdfsProtos.StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto + .getTargetStorageUuids(); + String[] targetStorageUuids = convert(targetStorageUuidsProto); + + StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto + .getTargetStorageTypes(); - StorageType[] convertStorageTypes = convertStorageTypes( ++ StorageType[] convertStorageTypes = PBHelperClient.convertStorageTypes( + targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto + .getStorageTypesList().size()); + + List<Integer> liveBlockIndicesList = blockEcRecoveryInfoProto + .getLiveBlockIndicesList(); + short[] liveBlkIndices = new short[liveBlockIndicesList.size()]; + for (int i = 0; i < liveBlockIndicesList.size(); i++) { + liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue(); + } + + ErasureCodingPolicy ecPolicy = - convertErasureCodingPolicy(blockEcRecoveryInfoProto.getEcPolicy()); ++ PBHelperClient.convertErasureCodingPolicy( ++ blockEcRecoveryInfoProto.getEcPolicy()); + + return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos, + targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy); + } + + public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo( + BlockECRecoveryInfo blockEcRecoveryInfo) { + BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto + .newBuilder(); + builder.setBlock(PBHelperClient.convert( + blockEcRecoveryInfo.getExtendedBlock())); + + DatanodeInfo[] sourceDnInfos = blockEcRecoveryInfo.getSourceDnInfos(); + builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos)); + + DatanodeInfo[] targetDnInfos = blockEcRecoveryInfo.getTargetDnInfos(); + builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos)); + + String[] targetStorageIDs = blockEcRecoveryInfo.getTargetStorageIDs(); + builder.setTargetStorageUuids(convertStorageIDs(targetStorageIDs)); + + StorageType[] targetStorageTypes = blockEcRecoveryInfo + .getTargetStorageTypes(); + builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes)); + + short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices(); + builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices)); + - builder.setEcPolicy(convertErasureCodingPolicy(blockEcRecoveryInfo - .getErasureCodingPolicy())); - - return builder.build(); - } - - private static List<Integer> convertIntArray(short[] liveBlockIndices) { - List<Integer> liveBlockIndicesList = new ArrayList<Integer>(); - for (short s : liveBlockIndices) { - liveBlockIndicesList.add((int) s); - } - return liveBlockIndicesList; - } ++ builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy( ++ blockEcRecoveryInfo.getErasureCodingPolicy())); + - private static StorageTypesProto convertStorageTypesProto( - StorageType[] targetStorageTypes) { - StorageTypesProto.Builder builder = StorageTypesProto.newBuilder(); - for (StorageType storageType : targetStorageTypes) { - builder.addStorageTypes(PBHelperClient.convertStorageType(storageType)); - } + return builder.build(); + } + - private static StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) { - StorageUuidsProto.Builder builder = StorageUuidsProto.newBuilder(); - for (String storageUuid : targetStorageIDs) { - builder.addStorageUuids(storageUuid); - } - return builder.build(); - } - - private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) { - DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder(); - for (DatanodeInfo datanodeInfo : dnInfos) { - builder.addDatanodes(PBHelperClient.convert(datanodeInfo)); - } - return builder.build(); - } - - private static String[] convert(StorageUuidsProto targetStorageUuidsProto) { - List<String> storageUuidsList = targetStorageUuidsProto - .getStorageUuidsList(); - String[] storageUuids = new String[storageUuidsList.size()]; - for (int i = 0; i < storageUuidsList.size(); i++) { - storageUuids[i] = storageUuidsList.get(i); - } - return storageUuids; - } - + public static BlockECRecoveryCommandProto convert( + BlockECRecoveryCommand blkECRecoveryCmd) { + BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto + .newBuilder(); + Collection<BlockECRecoveryInfo> blockECRecoveryInfos = blkECRecoveryCmd + .getECTasks(); + for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) { + builder + .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo)); + } + return builder.build(); + } - ++ + public static BlockECRecoveryCommand convert( + BlockECRecoveryCommandProto blkECRecoveryCmdProto) { - Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<BlockECRecoveryInfo>(); ++ Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<>(); + List<BlockECRecoveryInfoProto> blockECRecoveryinfoList = blkECRecoveryCmdProto + .getBlockECRecoveryinfoList(); + for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) { + blkECRecoveryInfos + .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto)); + } + return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, + blkECRecoveryInfos); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 1211169,b0a11fe..e7f9262 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@@ -1160,8 -1107,8 +1176,8 @@@ public class BlockManager implements Bl * Adds block to list of blocks which will be invalidated on all its * datanodes. */ - private void addToInvalidates(Block b) { + private void addToInvalidates(BlockInfo storedBlock) { - if (!namesystem.isPopulatingReplQueues()) { + if (!isPopulatingReplQueues()) { return; } StringBuilder datanodes = new StringBuilder(); @@@ -1287,8 -1215,8 +1303,8 @@@ if (hasEnoughLiveReplicas || hasMoreCorruptReplicas || corruptedDuringWrite) { // the block is over-replicated so invalidate the replicas immediately - invalidateBlock(b, node); + invalidateBlock(b, node, numberOfReplicas); - } else if (namesystem.isPopulatingReplQueues()) { + } else if (isPopulatingReplQueues()) { // add the block to neededReplication updateNeededReplications(b.getStored(), -1, 0); } @@@ -2654,9 -2488,9 +2670,9 @@@ DatanodeStorageInfo storageInfo) throws IOException { assert (storedBlock != null && namesystem.hasWriteLock()); -- if (!namesystem.isInStartupSafeMode() - || namesystem.isPopulatingReplQueues()) { ++ if (!namesystem.isInStartupSafeMode() + || isPopulatingReplQueues()) { - addStoredBlock(storedBlock, storageInfo, null, false); + addStoredBlock(storedBlock, reported, storageInfo, null, false); return; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java index a80bfd6,6d199d7..fb86ff3 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java @@@ -542,12 -546,12 +542,12 @@@ public class DecommissionManager if (blockManager.isNeededReplication(block, liveReplicas)) { if (!blockManager.neededReplications.contains(block) && blockManager.pendingReplications.getNumReplicas(block) == 0 && - namesystem.isPopulatingReplQueues()) { + blockManager.isPopulatingReplQueues()) { // Process these blocks only when active NN is out of safe mode. blockManager.neededReplications.add(block, - curReplicas, + liveReplicas, num.decommissionedAndDecommissioning(), - block.getReplication()); + blockManager.getExpectedReplicaNum(block)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 82a0f62,2aad83d..9228bec --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@@ -1156,8 -1175,7 +1179,9 @@@ public class DataNode extends Reconfigu saslClient = new SaslDataTransferClient(dnConf.conf, dnConf.saslPropsResolver, dnConf.trustedChannelResolver); saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager); + // Initialize ErasureCoding worker + ecWorker = new ErasureCodingWorker(conf, this); + startMetricsLogger(conf); } /** @@@ -3264,9 -3256,72 +3291,76 @@@ checkSuperuserPrivilege(); spanReceiverHost.removeSpanReceiver(id); } + + public ErasureCodingWorker getErasureCodingWorker(){ + return ecWorker; - ++ } + + /** + * Get timeout value of each OOB type from configuration + */ + private void initOOBTimeout() { + final int oobStart = Status.OOB_RESTART_VALUE; // the first OOB type + final int oobEnd = Status.OOB_RESERVED3_VALUE; // the last OOB type + final int numOobTypes = oobEnd - oobStart + 1; + oobTimeouts = new long[numOobTypes]; + + final String[] ele = conf.get(DFS_DATANODE_OOB_TIMEOUT_KEY, + DFS_DATANODE_OOB_TIMEOUT_DEFAULT).split(","); + for (int i = 0; i < numOobTypes; i++) { + oobTimeouts[i] = (i < ele.length) ? Long.parseLong(ele[i]) : 0; + } + } + + /** + * Get the timeout to be used for transmitting the OOB type + * @return the timeout in milliseconds + */ + public long getOOBTimeout(Status status) + throws IOException { + if (status.getNumber() < Status.OOB_RESTART_VALUE || + status.getNumber() > Status.OOB_RESERVED3_VALUE) { + // Not an OOB. + throw new IOException("Not an OOB status: " + status); + } + + return oobTimeouts[status.getNumber() - Status.OOB_RESTART_VALUE]; + } + + /** + * Start a timer to periodically write DataNode metrics to the log file. This + * behavior can be disabled by configuration. + * + * @param metricConf + */ + protected void startMetricsLogger(Configuration metricConf) { + long metricsLoggerPeriodSec = metricConf.getInt( + DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, + DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT); + + if (metricsLoggerPeriodSec <= 0) { + return; + } + + MetricsLoggerTask.makeMetricsLoggerAsync(METRICS_LOG); + + // Schedule the periodic logging. + metricsLoggerTimer = new ScheduledThreadPoolExecutor(1); + metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(METRICS_LOG, + "DataNode", (short) 0), metricsLoggerPeriodSec, metricsLoggerPeriodSec, + TimeUnit.SECONDS); + } + + protected void stopMetricsLogger() { + if (metricsLoggerTimer != null) { + metricsLoggerTimer.shutdown(); + metricsLoggerTimer = null; + } + } + + @VisibleForTesting + ScheduledThreadPoolExecutor getMetricsLoggerTimer() { + return metricsLoggerTimer; } }
