HDFS-11847. Enhance dfsadmin listOpenFiles command to list files blocking datanode decommissioning.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/42a1c985 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/42a1c985 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/42a1c985 Branch: refs/heads/YARN-1011 Commit: 42a1c98597e6dba2e371510a6b2b6b1fb94e4090 Parents: b4d1133 Author: Manoj Govindassamy <manoj...@apache.org> Authored: Tue Jan 2 14:59:36 2018 -0800 Committer: Manoj Govindassamy <manoj...@apache.org> Committed: Tue Jan 2 14:59:36 2018 -0800 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hdfs/DFSClient.java | 16 +- .../hadoop/hdfs/DistributedFileSystem.java | 8 + .../apache/hadoop/hdfs/client/HdfsAdmin.java | 7 + .../hadoop/hdfs/protocol/ClientProtocol.java | 16 ++ .../hadoop/hdfs/protocol/OpenFilesIterator.java | 36 +++- .../ClientNamenodeProtocolTranslatorPB.java | 18 +- .../hadoop/hdfs/protocolPB/PBHelperClient.java | 24 +++ .../src/main/proto/ClientNamenodeProtocol.proto | 7 + ...tNamenodeProtocolServerSideTranslatorPB.java | 7 +- .../server/blockmanagement/BlockManager.java | 2 +- .../blockmanagement/DatanodeAdminManager.java | 25 ++- .../blockmanagement/DatanodeDescriptor.java | 24 ++- .../federation/router/RouterRpcServer.java | 10 +- .../hdfs/server/namenode/FSNamesystem.java | 49 ++++- .../hdfs/server/namenode/NameNodeRpcServer.java | 10 +- .../org/apache/hadoop/hdfs/tools/DFSAdmin.java | 36 +++- .../src/site/markdown/HDFSCommands.md | 2 +- .../apache/hadoop/hdfs/AdminStatesBaseTest.java | 18 +- .../apache/hadoop/hdfs/TestDecommission.java | 177 +++++++++++++++++++ .../org/apache/hadoop/hdfs/TestHdfsAdmin.java | 4 +- .../blockmanagement/BlockManagerTestUtil.java | 12 +- .../hdfs/server/namenode/TestLeaseManager.java | 48 ++--- .../hdfs/server/namenode/TestListOpenFiles.java | 27 ++- 23 files changed, 520 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 59f553b..c774132 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -133,6 +133,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.OpenFilesIterator; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; import org.apache.hadoop.hdfs.protocol.ReencryptionStatusIterator; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; @@ -3084,8 +3085,21 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * * @throws IOException */ + @Deprecated public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException { checkOpen(); - return new OpenFilesIterator(namenode, tracer); + return listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES)); + } + + /** + * Get a remote iterator to the open files list by type, managed by NameNode. + * + * @param openFilesTypes + * @throws IOException + */ + public RemoteIterator<OpenFileEntry> listOpenFiles( + EnumSet<OpenFilesType> openFilesTypes) throws IOException { + checkOpen(); + return new OpenFilesIterator(namenode, tracer, openFilesTypes); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 6b0c57a..85e5964 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -90,6 +90,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsPathHandle; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; @@ -3079,10 +3080,17 @@ public class DistributedFileSystem extends FileSystem * <p/> * This method can only be called by HDFS superusers. */ + @Deprecated public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException { return dfs.listOpenFiles(); } + public RemoteIterator<OpenFileEntry> listOpenFiles( + EnumSet<OpenFilesType> openFilesTypes) throws IOException { + return dfs.listOpenFiles(openFilesTypes); + } + + /** * Create a {@link HdfsDataOutputStreamBuilder} to append a file on DFS. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java index 9116167..e620039 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; import org.apache.hadoop.security.AccessControlException; @@ -652,8 +653,14 @@ public class HdfsAdmin { * <p/> * This method can only be called by HDFS superusers. */ + @Deprecated public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException { return dfs.listOpenFiles(); } + public RemoteIterator<OpenFileEntry> listOpenFiles( + EnumSet<OpenFilesType> openFilesTypes) throws IOException { + return dfs.listOpenFiles(openFilesTypes); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index e8a33dd..38c242a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -43,6 +43,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; @@ -1713,5 +1714,20 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent + @Deprecated BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException; + + /** + * List open files in the system in batches. INode id is the cursor and the + * open files returned in a batch will have their INode ids greater than + * the cursor INode id. Open files can only be requested by super user and + * the the list across batches are not atomic. + * + * @param prevId the cursor INode id. + * @param openFilesTypes types to filter the open files + * @throws IOException + */ + @Idempotent + BatchedEntries<OpenFileEntry> listOpenFiles(long prevId, + EnumSet<OpenFilesType> openFilesTypes) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java index c24e585..d113d65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.protocol; import java.io.IOException; +import java.util.EnumSet; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -35,20 +36,51 @@ import org.apache.htrace.core.Tracer; @InterfaceStability.Evolving public class OpenFilesIterator extends BatchedRemoteIterator<Long, OpenFileEntry> { + + /** + * Open file types to filter the results. + */ + public enum OpenFilesType { + + ALL_OPEN_FILES((short) 0x01), + BLOCKING_DECOMMISSION((short) 0x02); + + private final short mode; + OpenFilesType(short mode) { + this.mode = mode; + } + + public short getMode() { + return mode; + } + + public static OpenFilesType valueOf(short num) { + for (OpenFilesType type : OpenFilesType.values()) { + if (type.getMode() == num) { + return type; + } + } + return null; + } + } + private final ClientProtocol namenode; private final Tracer tracer; + private final EnumSet<OpenFilesType> types; - public OpenFilesIterator(ClientProtocol namenode, Tracer tracer) { + public OpenFilesIterator(ClientProtocol namenode, Tracer tracer, + EnumSet<OpenFilesType> types) { super(HdfsConstants.GRANDFATHER_INODE_ID); this.namenode = namenode; this.tracer = tracer; + this.types = types; } @Override public BatchedEntries<OpenFileEntry> makeRequest(Long prevId) throws IOException { try (TraceScope ignored = tracer.newScope("listOpenFiles")) { - return namenode.listOpenFiles(prevId); + return namenode.listOpenFiles(prevId, types); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 9ccc2fa..ea5c951 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; @@ -1893,13 +1894,24 @@ public class ClientNamenodeProtocolTranslatorPB implements } } + @Deprecated @Override public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException { - ListOpenFilesRequestProto req = - ListOpenFilesRequestProto.newBuilder().setId(prevId).build(); + return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES)); + } + + @Override + public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId, + EnumSet<OpenFilesType> openFilesTypes) throws IOException { + ListOpenFilesRequestProto.Builder req = + ListOpenFilesRequestProto.newBuilder().setId(prevId); + if (openFilesTypes != null) { + req.addAllTypes(PBHelperClient.convertOpenFileTypes(openFilesTypes)); + } try { - ListOpenFilesResponseProto response = rpcProxy.listOpenFiles(null, req); + ListOpenFilesResponseProto response = + rpcProxy.listOpenFiles(null, req.build()); List<OpenFileEntry> openFileEntries = Lists.newArrayListWithCapacity(response.getEntriesCount()); for (OpenFilesBatchResponseProto p : response.getEntriesList()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 813083f..3180f70 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -94,6 +94,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation; @@ -131,6 +132,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsE import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsReplicatedBlockStatsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesTypeProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto; @@ -3258,5 +3260,27 @@ public class PBHelperClient { .build(); } + public static EnumSet<OpenFilesType> convertOpenFileTypes( + List<OpenFilesTypeProto> openFilesTypeProtos) { + EnumSet<OpenFilesType> types = EnumSet.noneOf(OpenFilesType.class); + for (OpenFilesTypeProto af : openFilesTypeProtos) { + OpenFilesType type = OpenFilesType.valueOf((short)af.getNumber()); + if (type != null) { + types.add(type); + } + } + return types; + } + public static List<OpenFilesTypeProto> convertOpenFileTypes( + EnumSet<OpenFilesType> types) { + List<OpenFilesTypeProto> typeProtos = new ArrayList<>(); + for (OpenFilesType type : types) { + OpenFilesTypeProto typeProto = OpenFilesTypeProto.valueOf(type.getMode()); + if (typeProto != null) { + typeProtos.add(typeProto); + } + } + return typeProtos; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index b33462b..f247da8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -796,8 +796,14 @@ message GetEditsFromTxidResponseProto { required EventsListProto eventsList = 1; } +enum OpenFilesTypeProto { + ALL_OPEN_FILES = 1; + BLOCKING_DECOMMISSION = 2; +} + message ListOpenFilesRequestProto { required int64 id = 1; + repeated OpenFilesTypeProto types = 2; } message OpenFilesBatchResponseProto { @@ -810,6 +816,7 @@ message OpenFilesBatchResponseProto { message ListOpenFilesResponseProto { repeated OpenFilesBatchResponseProto entries = 1; required bool hasMore = 2; + repeated OpenFilesTypeProto types = 3; } service ClientNamenodeProtocol { http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index d63460b..a9d2d1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing; @@ -1852,13 +1853,17 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements public ListOpenFilesResponseProto listOpenFiles(RpcController controller, ListOpenFilesRequestProto req) throws ServiceException { try { - BatchedEntries<OpenFileEntry> entries = server.listOpenFiles(req.getId()); + EnumSet<OpenFilesType> openFilesTypes = + PBHelperClient.convertOpenFileTypes(req.getTypesList()); + BatchedEntries<OpenFileEntry> entries = server.listOpenFiles(req.getId(), + openFilesTypes); ListOpenFilesResponseProto.Builder builder = ListOpenFilesResponseProto.newBuilder(); builder.setHasMore(entries.hasMore()); for (int i = 0; i < entries.size(); i++) { builder.addEntries(PBHelperClient.convert(entries.get(i))); } + builder.addAllTypes(req.getTypesList()); return builder.build(); } catch (IOException e) { throw new ServiceException(e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 59e06c6..6b7175d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2294,7 +2294,7 @@ public class BlockManager implements BlockStatsMXBean { * If there were any reconstruction requests that timed out, reap them * and put them back into the neededReconstruction queue */ - private void processPendingReconstructions() { + void processPendingReconstructions() { BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks(); if (timedOutItems != null) { namesystem.writeLock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java index 928036a..e338591 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java @@ -36,10 +36,14 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.server.namenode.INode; +import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodeId; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.util.CyclicIteration; +import org.apache.hadoop.hdfs.util.LightWeightHashSet; +import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.util.ChunkedArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -649,8 +653,10 @@ public class DatanodeAdminManager { boolean pruneReliableBlocks) { boolean firstReplicationLog = true; // Low redundancy in UC Blocks only - int lowRedundancyInOpenFiles = 0; - // All low redundancy blocks. Includes lowRedundancyInOpenFiles. + int lowRedundancyBlocksInOpenFiles = 0; + LightWeightHashSet<Long> lowRedundancyOpenFiles = + new LightWeightLinkedSet<>(); + // All low redundancy blocks. Includes lowRedundancyOpenFiles. int lowRedundancyBlocks = 0; // All maintenance and decommission replicas. int outOfServiceOnlyReplicas = 0; @@ -737,15 +743,24 @@ public class DatanodeAdminManager { // Update various counts lowRedundancyBlocks++; if (bc.isUnderConstruction()) { - lowRedundancyInOpenFiles++; + INode ucFile = namesystem.getFSDirectory().getInode(bc.getId()); + if(!(ucFile instanceof INodeFile) || + !ucFile.asFile().isUnderConstruction()) { + LOG.warn("File " + ucFile.getLocalName() + " is not under " + + "construction. Skipping add to low redundancy open files!"); + } else { + lowRedundancyBlocksInOpenFiles++; + lowRedundancyOpenFiles.add(ucFile.getId()); + } } if ((liveReplicas == 0) && (num.outOfServiceReplicas() > 0)) { outOfServiceOnlyReplicas++; } } - datanode.getLeavingServiceStatus().set(lowRedundancyInOpenFiles, - lowRedundancyBlocks, outOfServiceOnlyReplicas); + datanode.getLeavingServiceStatus().set(lowRedundancyBlocksInOpenFiles, + lowRedundancyOpenFiles, lowRedundancyBlocks, + outOfServiceOnlyReplicas); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 618bc13..16ffb43 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.hdfs.util.EnumCounters; import org.apache.hadoop.hdfs.util.LightWeightHashSet; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.util.IntrusiveCollection; import org.apache.hadoop.util.Time; import org.slf4j.Logger; @@ -831,17 +832,21 @@ public class DatanodeDescriptor extends DatanodeInfo { /** Leaving service status. */ public class LeavingServiceStatus { private int underReplicatedBlocks; + private int underReplicatedBlocksInOpenFiles; private int outOfServiceOnlyReplicas; - private int underReplicatedInOpenFiles; + private LightWeightHashSet<Long> underReplicatedOpenFiles = + new LightWeightLinkedSet<>(); private long startTime; - synchronized void set(int underRepInOpenFiles, int underRepBlocks, - int outOfServiceOnlyRep) { + synchronized void set(int lowRedundancyBlocksInOpenFiles, + LightWeightHashSet<Long> underRepInOpenFiles, + int underRepBlocks, int outOfServiceOnlyRep) { if (!isDecommissionInProgress() && !isEnteringMaintenance()) { return; } - underReplicatedInOpenFiles = underRepInOpenFiles; + underReplicatedOpenFiles = underRepInOpenFiles; underReplicatedBlocks = underRepBlocks; + underReplicatedBlocksInOpenFiles = lowRedundancyBlocksInOpenFiles; outOfServiceOnlyReplicas = outOfServiceOnlyRep; } @@ -864,7 +869,14 @@ public class DatanodeDescriptor extends DatanodeInfo { if (!isDecommissionInProgress() && !isEnteringMaintenance()) { return 0; } - return underReplicatedInOpenFiles; + return underReplicatedBlocksInOpenFiles; + } + /** @return the collection of under-replicated blocks in open files */ + public synchronized LightWeightHashSet<Long> getOpenFiles() { + if (!isDecommissionInProgress() && !isEnteringMaintenance()) { + return new LightWeightLinkedSet<>(); + } + return underReplicatedOpenFiles; } /** Set start time */ public synchronized void setStartTime(long time) { @@ -880,7 +892,7 @@ public class DatanodeDescriptor extends DatanodeInfo { } return startTime; } - } // End of class DecommissioningStatus + } // End of class LeavingServiceStatus /** * Set the flag to indicate if this datanode is disallowed from communicating http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index c6cd595..537eaf4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -90,6 +90,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; @@ -1935,9 +1936,16 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { return null; } + @Deprecated @Override - public BatchedEntries<OpenFileEntry> listOpenFiles(long arg0) + public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException { + return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES)); + } + + @Override + public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId, + EnumSet<OpenFilesType> openFilesTypes) throws IOException { checkOperation(OperationCategory.READ, false); return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 286c41c..54decc8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -93,6 +93,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; @@ -276,6 +277,7 @@ import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; +import org.apache.hadoop.hdfs.util.LightWeightHashSet; import org.apache.hadoop.hdfs.web.JsonUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; @@ -1762,12 +1764,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, * the open files returned in a batch will have their INode ids greater than * this cursor. Open files can only be requested by super user and the the * list across batches does not represent a consistent view of all open files. + * TODO: HDFS-12969 - to report open files by type. * * @param prevId the cursor INode id. + * @param openFilesTypes * @throws IOException */ - BatchedListEntries<OpenFileEntry> listOpenFiles(long prevId) - throws IOException { + BatchedListEntries<OpenFileEntry> listOpenFiles(long prevId, + EnumSet<OpenFilesType> openFilesTypes) throws IOException { final String operationName = "listOpenFiles"; checkSuperuserPrivilege(); checkOperation(OperationCategory.READ); @@ -1775,7 +1779,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, BatchedListEntries<OpenFileEntry> batchedListEntries; try { checkOperation(OperationCategory.READ); - batchedListEntries = leaseManager.getUnderConstructionFiles(prevId); + if(openFilesTypes.contains(OpenFilesType.ALL_OPEN_FILES)) { + batchedListEntries = leaseManager.getUnderConstructionFiles(prevId); + } else { + if(openFilesTypes.contains(OpenFilesType.BLOCKING_DECOMMISSION)) { + batchedListEntries = getFilesBlockingDecom(prevId); + } else { + throw new IllegalArgumentException("Unknown OpenFileType: " + + openFilesTypes); + } + } } catch (AccessControlException e) { logAuditEvent(false, operationName, null); throw e; @@ -1786,6 +1799,36 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return batchedListEntries; } + public BatchedListEntries<OpenFileEntry> getFilesBlockingDecom(long prevId) { + assert hasReadLock(); + final List<OpenFileEntry> openFileEntries = Lists.newArrayList(); + LightWeightHashSet<Long> openFileIds = new LightWeightHashSet<>(); + for (DatanodeDescriptor dataNode : + blockManager.getDatanodeManager().getDatanodes()) { + for (long ucFileId : dataNode.getLeavingServiceStatus().getOpenFiles()) { + INode ucFile = getFSDirectory().getInode(ucFileId); + if (ucFile == null || ucFileId <= prevId || + openFileIds.contains(ucFileId)) { + // probably got deleted or + // part of previous batch or + // already part of the current batch + continue; + } + Preconditions.checkState(ucFile instanceof INodeFile); + openFileIds.add(ucFileId); + INodeFile inodeFile = ucFile.asFile(); + openFileEntries.add(new OpenFileEntry( + inodeFile.getId(), inodeFile.getFullPathName(), + inodeFile.getFileUnderConstructionFeature().getClientName(), + inodeFile.getFileUnderConstructionFeature().getClientMachine())); + if (openFileIds.size() >= this.maxListOpenFilesResponses) { + return new BatchedListEntries<>(openFileEntries, true); + } + } + } + return new BatchedListEntries<>(openFileEntries, false); + } + private String metaSaveAsString() { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 94bd15f..80f1ba3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -115,6 +115,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; @@ -1334,11 +1335,18 @@ public class NameNodeRpcServer implements NamenodeProtocols { namesystem.metaSave(filename); } + @Deprecated @Override // ClientProtocol public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException { + return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES)); + } + + @Override // ClientProtocol + public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId, + EnumSet<OpenFilesType> openFilesTypes) throws IOException { checkNNStartup(); - return namesystem.listOpenFiles(prevId); + return namesystem.listOpenFiles(prevId, openFilesTypes); } @Override // ClientProtocol http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java index f4985a6..7367309 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.EnumSet; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol; @@ -462,7 +464,7 @@ public class DFSAdmin extends FsShell { "\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" + "\t[-metasave filename]\n" + "\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]\n" + - "\t[-listOpenFiles]\n" + + "\t[-listOpenFiles [-blockingDecommission]]\n" + "\t[-help [cmd]]\n"; /** @@ -913,8 +915,21 @@ public class DFSAdmin extends FsShell { * Usage: hdfs dfsadmin -listOpenFiles * * @throws IOException + * @param argv */ - public int listOpenFiles() throws IOException { + public int listOpenFiles(String[] argv) throws IOException { + List<OpenFilesType> types = new ArrayList<>(); + if (argv != null) { + List<String> args = new ArrayList<>(Arrays.asList(argv)); + if (StringUtils.popOption("-blockingDecommission", args)) { + types.add(OpenFilesType.BLOCKING_DECOMMISSION); + } + } + if (types.isEmpty()) { + types.add(OpenFilesType.ALL_OPEN_FILES); + } + EnumSet<OpenFilesType> openFilesTypes = EnumSet.copyOf(types); + DistributedFileSystem dfs = getDFS(); Configuration dfsConf = dfs.getConf(); URI dfsUri = dfs.getUri(); @@ -926,9 +941,9 @@ public class DFSAdmin extends FsShell { dfsConf, HAUtil.getAddressOfActive(getDFS()), ClientProtocol.class, UserGroupInformation.getCurrentUser(), false); openFilesRemoteIterator = new OpenFilesIterator(proxy.getProxy(), - FsTracer.get(dfsConf)); + FsTracer.get(dfsConf), openFilesTypes); } else { - openFilesRemoteIterator = dfs.listOpenFiles(); + openFilesRemoteIterator = dfs.listOpenFiles(openFilesTypes); } printOpenFiles(openFilesRemoteIterator); return 0; @@ -1214,9 +1229,11 @@ public class DFSAdmin extends FsShell { + "\tIf 'incremental' is specified, it will be an incremental\n" + "\tblock report; otherwise, it will be a full block report.\n"; - String listOpenFiles = "-listOpenFiles\n" + String listOpenFiles = "-listOpenFiles [-blockingDecommission]\n" + "\tList all open files currently managed by the NameNode along\n" - + "\twith client name and client machine accessing them.\n"; + + "\twith client name and client machine accessing them.\n" + + "\tIf 'blockingDecommission' option is specified, it will list the\n" + + "\topen files only that are blocking the ongoing Decommission."; String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" + "\t\tis specified.\n"; @@ -1964,7 +1981,8 @@ public class DFSAdmin extends FsShell { System.err.println("Usage: hdfs dfsadmin" + " [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]"); } else if ("-listOpenFiles".equals(cmd)) { - System.err.println("Usage: hdfs dfsadmin [-listOpenFiles]"); + System.err.println("Usage: hdfs dfsadmin" + + " [-listOpenFiles [-blockingDecommission]]"); } else { System.err.println("Usage: hdfs dfsadmin"); System.err.println("Note: Administrative commands can only be run as the HDFS superuser."); @@ -2119,7 +2137,7 @@ public class DFSAdmin extends FsShell { return exitCode; } } else if ("-listOpenFiles".equals(cmd)) { - if (argv.length != 1) { + if ((argv.length != 1) && (argv.length != 2)) { printUsage(cmd); return exitCode; } @@ -2205,7 +2223,7 @@ public class DFSAdmin extends FsShell { } else if ("-triggerBlockReport".equals(cmd)) { exitCode = triggerBlockReport(argv); } else if ("-listOpenFiles".equals(cmd)) { - exitCode = listOpenFiles(); + exitCode = listOpenFiles(argv); } else if ("-help".equals(cmd)) { if (i < argv.length) { printHelp(argv[i]); http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index 316b955..a13116f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -409,7 +409,7 @@ Usage: | `-getDatanodeInfo` \<datanode\_host:ipc\_port\> | Get the information about the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. | | `-metasave` filename | Save Namenode's primary data structures to *filename* in the directory specified by hadoop.log.dir property. *filename* is overwritten if it exists. *filename* will contain one line for each of the following<br/>1. Datanodes heart beating with Namenode<br/>2. Blocks waiting to be replicated<br/>3. Blocks currently being replicated<br/>4. Blocks waiting to be deleted | | `-triggerBlockReport` `[-incremental]` \<datanode\_host:ipc\_port\> | Trigger a block report for the given datanode. If 'incremental' is specified, it will be otherwise, it will be a full block report. | -| `-listOpenFiles` | List all open files currently managed by the NameNode along with client name and client machine accessing them. | +| `-listOpenFiles` `[-blockingDecommission]` | List all open files currently managed by the NameNode along with client name and client machine accessing them. | | `-help` [cmd] | Displays help for the given command or all commands if none is specified. | Runs a HDFS dfsadmin client. http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java index c0cef19..5d96b7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java @@ -388,9 +388,19 @@ public class AdminStatesBaseTest { protected void startCluster(int numNameNodes, int numDatanodes, boolean setupHostsFile, long[] nodesCapacity, boolean checkDataNodeHostConfig) throws IOException { + startCluster(numNameNodes, numDatanodes, setupHostsFile, nodesCapacity, + checkDataNodeHostConfig, true); + } + + protected void startCluster(int numNameNodes, int numDatanodes, + boolean setupHostsFile, long[] nodesCapacity, + boolean checkDataNodeHostConfig, boolean federation) throws IOException { MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes)) .numDataNodes(numDatanodes); + if (federation) { + builder.nnTopology( + MiniDFSNNTopology.simpleFederatedTopology(numNameNodes)); + } if (setupHostsFile) { builder.setupHostsFile(setupHostsFile); } @@ -413,6 +423,12 @@ public class AdminStatesBaseTest { startCluster(numNameNodes, numDatanodes, false, null, false); } + protected void startSimpleCluster(int numNameNodes, int numDatanodes) + throws IOException { + startCluster(numNameNodes, numDatanodes, false, null, false, false); + } + + protected void startSimpleHACluster(int numDatanodes) throws IOException { cluster = new MiniDFSCluster.Builder(conf) .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes( http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java index ac14a2a..d82025c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java @@ -22,16 +22,23 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Scanner; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Supplier; import com.google.common.collect.Lists; +import org.apache.commons.lang.text.StrBuilder; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; @@ -60,7 +67,9 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics; +import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Ignore; @@ -651,6 +660,174 @@ public class TestDecommission extends AdminStatesBaseTest { fdos.close(); } + private static String scanIntoString(final ByteArrayOutputStream baos) { + final StrBuilder sb = new StrBuilder(); + final Scanner scanner = new Scanner(baos.toString()); + while (scanner.hasNextLine()) { + sb.appendln(scanner.nextLine()); + } + scanner.close(); + return sb.toString(); + } + + private boolean verifyOpenFilesListing(String message, + HashSet<Path> closedFileSet, + HashMap<Path, FSDataOutputStream> openFilesMap, + ByteArrayOutputStream out, int expOpenFilesListSize) { + final String outStr = scanIntoString(out); + LOG.info(message + " - stdout: \n" + outStr); + for (Path closedFilePath : closedFileSet) { + if(outStr.contains(closedFilePath.toString())) { + return false; + } + } + HashSet<Path> openFilesNotListed = new HashSet<>(); + for (Path openFilePath : openFilesMap.keySet()) { + if(!outStr.contains(openFilePath.toString())) { + openFilesNotListed.add(openFilePath); + } + } + int actualOpenFilesListedSize = + openFilesMap.size() - openFilesNotListed.size(); + if (actualOpenFilesListedSize >= expOpenFilesListSize) { + return true; + } else { + LOG.info("Open files that are not listed yet: " + openFilesNotListed); + return false; + } + } + + private void verifyOpenFilesBlockingDecommission(HashSet<Path> closedFileSet, + HashMap<Path, FSDataOutputStream> openFilesMap, final int maxOpenFiles) + throws Exception { + final PrintStream oldStreamOut = System.out; + try { + final ByteArrayOutputStream toolOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(toolOut)); + final DFSAdmin dfsAdmin = new DFSAdmin(getConf()); + + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + try { + toolOut.reset(); + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[]{"-listOpenFiles", "-blockingDecommission"})); + toolOut.flush(); + return verifyOpenFilesListing( + "dfsadmin -listOpenFiles -blockingDecommission", + closedFileSet, openFilesMap, toolOut, maxOpenFiles); + } catch (Exception e) { + LOG.warn("Unexpected exception: " + e); + } + return false; + } + }, 1000, 60000); + } finally { + System.setOut(oldStreamOut); + } + } + + @Test(timeout=180000) + public void testDecommissionWithOpenfileReporting() + throws Exception { + LOG.info("Starting test testDecommissionWithOpenfileReporting"); + + // Disable redundancy monitor check so that open files blocking + // decommission can be listed and verified. + getConf().setInt( + DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, + 1000); + getConf().setLong( + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, 1); + + //At most 1 node can be decommissioned + startSimpleCluster(1, 4); + + FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); + + final String[] closedFiles = new String[3]; + final String[] openFiles = new String[3]; + HashSet<Path> closedFileSet = new HashSet<>(); + HashMap<Path, FSDataOutputStream> openFilesMap = new HashMap<>(); + for (int i = 0; i < 3; i++) { + closedFiles[i] = "/testDecommissionWithOpenfileReporting.closed." + i; + openFiles[i] = "/testDecommissionWithOpenfileReporting.open." + i; + writeFile(fileSys, new Path(closedFiles[i]), (short)3, 10); + closedFileSet.add(new Path(closedFiles[i])); + writeFile(fileSys, new Path(openFiles[i]), (short)3, 10); + FSDataOutputStream fdos = fileSys.append(new Path(openFiles[i])); + openFilesMap.put(new Path(openFiles[i]), fdos); + } + + HashMap<DatanodeInfo, Integer> dnInfoMap = new HashMap<>(); + for (int i = 0; i < 3; i++) { + LocatedBlocks lbs = NameNodeAdapter.getBlockLocations( + getCluster().getNameNode(0), openFiles[i], 0, blockSize * 10); + for (DatanodeInfo dn : lbs.getLastLocatedBlock().getLocations()) { + if (dnInfoMap.containsKey(dn)) { + dnInfoMap.put(dn, dnInfoMap.get(dn) + 1); + } else { + dnInfoMap.put(dn, 1); + } + } + } + + DatanodeInfo dnToDecommission = null; + int maxDnOccurance = 0; + for (Map.Entry<DatanodeInfo, Integer> entry : dnInfoMap.entrySet()) { + if (entry.getValue() > maxDnOccurance) { + maxDnOccurance = entry.getValue(); + dnToDecommission = entry.getKey(); + } + } + LOG.info("XXX Dn to decommission: " + dnToDecommission + ", max: " + + maxDnOccurance); + + //decommission one of the 3 nodes which have last block + DatanodeManager dm = ns.getBlockManager().getDatanodeManager(); + ArrayList<String> nodes = new ArrayList<>(); + dnToDecommission = dm.getDatanode(dnToDecommission.getDatanodeUuid()); + nodes.add(dnToDecommission.getXferAddr()); + initExcludeHosts(nodes); + refreshNodes(0); + waitNodeState(dnToDecommission, AdminStates.DECOMMISSION_INPROGRESS); + + // list and verify all the open files that are blocking decommission + verifyOpenFilesBlockingDecommission( + closedFileSet, openFilesMap, maxDnOccurance); + + final AtomicBoolean stopRedundancyMonitor = new AtomicBoolean(false); + Thread monitorThread = new Thread(new Runnable() { + @Override + public void run() { + while (!stopRedundancyMonitor.get()) { + try { + BlockManagerTestUtil.checkRedundancy( + getCluster().getNamesystem().getBlockManager()); + BlockManagerTestUtil.updateState( + getCluster().getNamesystem().getBlockManager()); + Thread.sleep(1000); + } catch (Exception e) { + LOG.warn("Encountered exception during redundancy monitor: " + e); + } + } + } + }); + monitorThread.start(); + + waitNodeState(dnToDecommission, AdminStates.DECOMMISSIONED); + stopRedundancyMonitor.set(true); + monitorThread.join(); + + // Open file is no more blocking decommission as all its blocks + // are re-replicated. + openFilesMap.clear(); + verifyOpenFilesBlockingDecommission( + closedFileSet, openFilesMap, 0); + } + @Test(timeout = 360000) public void testDecommissionWithOpenFileAndBlockRecovery() throws IOException, InterruptedException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java index 685ea8b..3cb10bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java @@ -25,6 +25,7 @@ import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -41,6 +42,7 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.junit.After; import org.junit.Assert; @@ -254,7 +256,7 @@ public class TestHdfsAdmin { HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf); HashSet<Path> openFiles = new HashSet<>(openFileMap.keySet()); RemoteIterator<OpenFileEntry> openFilesRemoteItr = - hdfsAdmin.listOpenFiles(); + hdfsAdmin.listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES)); while (openFilesRemoteItr.hasNext()) { String filePath = openFilesRemoteItr.next().getFilePath(); assertFalse(filePath + " should not be listed under open files!", http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index 7ee766f..dfb40a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -168,7 +168,17 @@ public class BlockManagerTestUtil { public static int computeInvalidationWork(BlockManager bm) { return bm.computeInvalidateWork(Integer.MAX_VALUE); } - + + /** + * Check the redundancy of blocks and trigger replication if needed. + * @param blockManager + */ + public static void checkRedundancy(final BlockManager blockManager) { + blockManager.computeDatanodeWork(); + blockManager.processPendingReconstructions(); + blockManager.rescanPostponedMisreplicatedBlocks(); + } + /** * Compute all the replication and invalidation work for the * given BlockManager. http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java index 55bc7c3..0a8da4b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java @@ -206,7 +206,7 @@ public class TestLeaseManager { HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""), perm, 0L); when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory); - verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0); + verifyINodeLeaseCounts(fsNamesystem, lm, rootInodeDirectory, 0, 0, 0); for (Long iNodeId : iNodeIds) { INodeFile iNodeFile = stubInodeFile(iNodeId); @@ -215,13 +215,13 @@ public class TestLeaseManager { when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile); lm.addLease("holder_" + iNodeId, iNodeId); } - verifyINodeLeaseCounts(lm, rootInodeDirectory, iNodeIds.size(), - iNodeIds.size(), iNodeIds.size()); + verifyINodeLeaseCounts(fsNamesystem, lm, rootInodeDirectory, + iNodeIds.size(), iNodeIds.size(), iNodeIds.size()); for (Long iNodeId : iNodeIds) { lm.removeLease(iNodeId); } - verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0); + verifyINodeLeaseCounts(fsNamesystem, lm, rootInodeDirectory, 0, 0, 0); } /** @@ -246,41 +246,44 @@ public class TestLeaseManager { // Case 1: No open files int scale = 0; - testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale); + testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory, + rootInodeDirectory, scale); for (int workerCount = 1; workerCount <= LeaseManager.INODE_FILTER_WORKER_COUNT_MAX / 2; workerCount++) { // Case 2: Open files count is half of worker task size scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN / 2; - testInodeWithLeasesAtScaleImpl(lm, fsDirectory, + testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory, rootInodeDirectory, scale); // Case 3: Open files count is 1 less of worker task size scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN - 1; - testInodeWithLeasesAtScaleImpl(lm, fsDirectory, + testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory, rootInodeDirectory, scale); // Case 4: Open files count is equal to worker task size scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN; - testInodeWithLeasesAtScaleImpl(lm, fsDirectory, + testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory, rootInodeDirectory, scale); // Case 5: Open files count is 1 more than worker task size scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN + 1; - testInodeWithLeasesAtScaleImpl(lm, fsDirectory, + testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory, rootInodeDirectory, scale); } // Case 6: Open files count is way more than worker count scale = 1279; - testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale); + testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory, + rootInodeDirectory, scale); } - private void testInodeWithLeasesAtScaleImpl(final LeaseManager leaseManager, - final FSDirectory fsDirectory, INodeDirectory ancestorDirectory, - int scale) throws IOException { - verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0); + private void testInodeWithLeasesAtScaleImpl(FSNamesystem fsNamesystem, + final LeaseManager leaseManager, final FSDirectory fsDirectory, + INodeDirectory ancestorDirectory, int scale) throws IOException { + verifyINodeLeaseCounts( + fsNamesystem, leaseManager, ancestorDirectory, 0, 0, 0); Set<Long> iNodeIds = new HashSet<>(); for (int i = 0; i < scale; i++) { @@ -293,11 +296,12 @@ public class TestLeaseManager { when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile); leaseManager.addLease("holder_" + iNodeId, iNodeId); } - verifyINodeLeaseCounts(leaseManager, ancestorDirectory, iNodeIds.size(), - iNodeIds.size(), iNodeIds.size()); + verifyINodeLeaseCounts(fsNamesystem, leaseManager, + ancestorDirectory, iNodeIds.size(), iNodeIds.size(), iNodeIds.size()); leaseManager.removeAllLeases(); - verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0); + verifyINodeLeaseCounts(fsNamesystem, leaseManager, + ancestorDirectory, 0, 0, 0); } /** @@ -389,10 +393,10 @@ public class TestLeaseManager { } - private void verifyINodeLeaseCounts(final LeaseManager leaseManager, - INodeDirectory ancestorDirectory, int iNodeIdWithLeaseCount, - int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount) - throws IOException { + private void verifyINodeLeaseCounts(FSNamesystem fsNamesystem, + LeaseManager leaseManager, INodeDirectory ancestorDirectory, + int iNodeIdWithLeaseCount, int iNodeWithLeaseCount, + int iNodeUnderAncestorLeaseCount) throws IOException { assertEquals(iNodeIdWithLeaseCount, leaseManager.getINodeIdWithLeases().size()); assertEquals(iNodeWithLeaseCount, @@ -401,6 +405,8 @@ public class TestLeaseManager { leaseManager.getINodeWithLeases(ancestorDirectory).size()); assertEquals(iNodeIdWithLeaseCount, leaseManager.getUnderConstructionFiles(0).size()); + assertEquals(0, (fsNamesystem.getFilesBlockingDecom(0) == null ? + 0 : fsNamesystem.getFilesBlockingDecom(0).size())); } private Map<String, INode> createINodeTree(INodeDirectory parentDir, http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java index b290194..cfee7ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -95,9 +97,13 @@ public class TestListOpenFiles { verifyOpenFiles(openFiles); BatchedEntries<OpenFileEntry> openFileEntryBatchedEntries = - nnRpc.listOpenFiles(0); + nnRpc.listOpenFiles(0, EnumSet.of(OpenFilesType.ALL_OPEN_FILES)); assertTrue("Open files list should be empty!", openFileEntryBatchedEntries.size() == 0); + BatchedEntries<OpenFileEntry> openFilesBlockingDecomEntries = + nnRpc.listOpenFiles(0, EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION)); + assertTrue("Open files list blocking decommission should be empty!", + openFilesBlockingDecomEntries.size() == 0); openFiles.putAll( DFSTestUtil.createOpenFiles(fs, "open-1", 1)); @@ -121,16 +127,16 @@ public class TestListOpenFiles { } } - private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles) - throws IOException { + private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles, + EnumSet<OpenFilesType> openFilesTypes) throws IOException { HashSet<Path> remainingFiles = new HashSet<>(openFiles.keySet()); OpenFileEntry lastEntry = null; BatchedEntries<OpenFileEntry> batchedEntries; do { if (lastEntry == null) { - batchedEntries = nnRpc.listOpenFiles(0); + batchedEntries = nnRpc.listOpenFiles(0, openFilesTypes); } else { - batchedEntries = nnRpc.listOpenFiles(lastEntry.getId()); + batchedEntries = nnRpc.listOpenFiles(lastEntry.getId(), openFilesTypes); } assertTrue("Incorrect open files list size!", batchedEntries.size() <= BATCH_SIZE); @@ -146,6 +152,13 @@ public class TestListOpenFiles { remainingFiles.size() == 0); } + private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles) + throws IOException { + verifyOpenFiles(openFiles, EnumSet.of(OpenFilesType.ALL_OPEN_FILES)); + verifyOpenFiles(new HashMap<>(), + EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION)); + } + private Set<Path> createFiles(FileSystem fileSystem, String fileNamePrefix, int numFilesToCreate) throws IOException { HashSet<Path> files = new HashSet<>(); @@ -197,6 +210,8 @@ public class TestListOpenFiles { try { assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-listOpenFiles"})); + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[] {"-listOpenFiles", "-blockingDecommission"})); // Sleep for some time to avoid // flooding logs with listing. Thread.sleep(listingIntervalMsec); @@ -222,6 +237,8 @@ public class TestListOpenFiles { assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-listOpenFiles"})); + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[] {"-listOpenFiles", "-blockingDecommission"})); assertFalse("Client Error!", listOpenFilesError.get()); clientThread.join(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org