HDFS-11847. Enhance dfsadmin listOpenFiles command to list files blocking datanode decommissioning.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/42a1c985
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/42a1c985
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/42a1c985

Branch: refs/heads/YARN-1011
Commit: 42a1c98597e6dba2e371510a6b2b6b1fb94e4090
Parents: b4d1133
Author: Manoj Govindassamy <manoj...@apache.org>
Authored: Tue Jan 2 14:59:36 2018 -0800
Committer: Manoj Govindassamy <manoj...@apache.org>
Committed: Tue Jan 2 14:59:36 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  16 +-
 .../hadoop/hdfs/DistributedFileSystem.java      |   8 +
 .../apache/hadoop/hdfs/client/HdfsAdmin.java    |   7 +
 .../hadoop/hdfs/protocol/ClientProtocol.java    |  16 ++
 .../hadoop/hdfs/protocol/OpenFilesIterator.java |  36 +++-
 .../ClientNamenodeProtocolTranslatorPB.java     |  18 +-
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  24 +++
 .../src/main/proto/ClientNamenodeProtocol.proto |   7 +
 ...tNamenodeProtocolServerSideTranslatorPB.java |   7 +-
 .../server/blockmanagement/BlockManager.java    |   2 +-
 .../blockmanagement/DatanodeAdminManager.java   |  25 ++-
 .../blockmanagement/DatanodeDescriptor.java     |  24 ++-
 .../federation/router/RouterRpcServer.java      |  10 +-
 .../hdfs/server/namenode/FSNamesystem.java      |  49 ++++-
 .../hdfs/server/namenode/NameNodeRpcServer.java |  10 +-
 .../org/apache/hadoop/hdfs/tools/DFSAdmin.java  |  36 +++-
 .../src/site/markdown/HDFSCommands.md           |   2 +-
 .../apache/hadoop/hdfs/AdminStatesBaseTest.java |  18 +-
 .../apache/hadoop/hdfs/TestDecommission.java    | 177 +++++++++++++++++++
 .../org/apache/hadoop/hdfs/TestHdfsAdmin.java   |   4 +-
 .../blockmanagement/BlockManagerTestUtil.java   |  12 +-
 .../hdfs/server/namenode/TestLeaseManager.java  |  48 ++---
 .../hdfs/server/namenode/TestListOpenFiles.java |  27 ++-
 23 files changed, 520 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 59f553b..c774132 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -133,6 +133,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
 import org.apache.hadoop.hdfs.protocol.ReencryptionStatusIterator;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
@@ -3084,8 +3085,21 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    *
    * @throws IOException
    */
+  @Deprecated
   public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
     checkOpen();
-    return new OpenFilesIterator(namenode, tracer);
+    return listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+  }
+
+  /**
+   * Get a remote iterator over the open files list of the given types, as managed by the NameNode.
+   *
+   * @param openFilesTypes types to filter the open files
+   * @throws IOException
+   */
+  public RemoteIterator<OpenFileEntry> listOpenFiles(
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
+    checkOpen();
+    return new OpenFilesIterator(namenode, tracer, openFilesTypes);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 6b0c57a..85e5964 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -90,6 +90,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsPathHandle;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -3079,10 +3080,17 @@ public class DistributedFileSystem extends FileSystem
    * <p/>
    * This method can only be called by HDFS superusers.
    */
+  @Deprecated
   public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
     return dfs.listOpenFiles();
   }
 
+  public RemoteIterator<OpenFileEntry> listOpenFiles(
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
+    return dfs.listOpenFiles(openFilesTypes);
+  }
+
+
   /**
    * Create a {@link HdfsDataOutputStreamBuilder} to append a file on DFS.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
index 9116167..e620039 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.security.AccessControlException;
 
@@ -652,8 +653,14 @@ public class HdfsAdmin {
    * <p/>
    * This method can only be called by HDFS superusers.
    */
+  @Deprecated
   public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
     return dfs.listOpenFiles();
   }
 
+  public RemoteIterator<OpenFileEntry> listOpenFiles(
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
+    return dfs.listOpenFiles(openFilesTypes);
+  }
+
 }
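
For reference, a minimal sketch (not part of this patch) of how a superuser client could call the new type-filtered API through HdfsAdmin; the fs.defaultFS-based setup here is an illustrative assumption:

import java.net.URI;
import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;

public class ListFilesBlockingDecommission {
  public static void main(String[] args) throws Exception {
    // Assumes fs.defaultFS points at the target NameNode and the caller is an HDFS superuser.
    Configuration conf = new Configuration();
    HdfsAdmin admin = new HdfsAdmin(URI.create(conf.get("fs.defaultFS")), conf);

    // Request only the open files that are holding up an ongoing decommission.
    RemoteIterator<OpenFileEntry> it =
        admin.listOpenFiles(EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION));
    while (it.hasNext()) {
      OpenFileEntry entry = it.next();
      System.out.println(entry.getFilePath() + "\t" + entry.getClientName()
          + "\t" + entry.getClientMachine());
    }
  }
}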

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index e8a33dd..38c242a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
@@ -1713,5 +1714,20 @@ public interface ClientProtocol {
    * @throws IOException
    */
   @Idempotent
+  @Deprecated
   BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException;
+
+  /**
+   * List open files in the system in batches. INode id is the cursor and the
+   * open files returned in a batch will have their INode ids greater than
+   * the cursor INode id. Open files can only be requested by super user and
+   * the list across batches is not atomic.
+   *
+   * @param prevId the cursor INode id.
+   * @param openFilesTypes types to filter the open files
+   * @throws IOException
+   */
+  @Idempotent
+  BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java
index c24e585..d113d65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.protocol;
 
 import java.io.IOException;
+import java.util.EnumSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -35,20 +36,51 @@ import org.apache.htrace.core.Tracer;
 @InterfaceStability.Evolving
 public class OpenFilesIterator extends
     BatchedRemoteIterator<Long, OpenFileEntry> {
+
+  /**
+   * Open file types to filter the results.
+   */
+  public enum OpenFilesType {
+
+    ALL_OPEN_FILES((short) 0x01),
+    BLOCKING_DECOMMISSION((short) 0x02);
+
+    private final short mode;
+    OpenFilesType(short mode) {
+      this.mode = mode;
+    }
+
+    public short getMode() {
+      return mode;
+    }
+
+    public static OpenFilesType valueOf(short num) {
+      for (OpenFilesType type : OpenFilesType.values()) {
+        if (type.getMode() == num) {
+          return type;
+        }
+      }
+      return null;
+    }
+  }
+
   private final ClientProtocol namenode;
   private final Tracer tracer;
+  private final EnumSet<OpenFilesType> types;
 
-  public OpenFilesIterator(ClientProtocol namenode, Tracer tracer) {
+  public OpenFilesIterator(ClientProtocol namenode, Tracer tracer,
+      EnumSet<OpenFilesType> types) {
     super(HdfsConstants.GRANDFATHER_INODE_ID);
     this.namenode = namenode;
     this.tracer = tracer;
+    this.types = types;
   }
 
   @Override
   public BatchedEntries<OpenFileEntry> makeRequest(Long prevId)
       throws IOException {
     try (TraceScope ignored = tracer.newScope("listOpenFiles")) {
-      return namenode.listOpenFiles(prevId);
+      return namenode.listOpenFiles(prevId, types);
     }
   }
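
A quick illustrative sketch (not part of the patch) of the OpenFilesType contract introduced above: each constant carries a short-valued mode and valueOf(short) resolves it back, which is what PBHelperClient relies on when converting to and from the protobuf enum:

import java.util.EnumSet;

import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;

public class OpenFilesTypeRoundTrip {
  public static void main(String[] args) {
    // ALL_OPEN_FILES -> 0x01, BLOCKING_DECOMMISSION -> 0x02
    for (OpenFilesType type : EnumSet.allOf(OpenFilesType.class)) {
      short mode = type.getMode();
      // valueOf(short) maps the numeric mode back to the enum constant.
      System.out.println(type + " mode=" + mode
          + " roundTrip=" + OpenFilesType.valueOf(mode));
    }
  }
}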
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 9ccc2fa..ea5c951 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
@@ -1893,13 +1894,24 @@ public class ClientNamenodeProtocolTranslatorPB implements
     }
   }
 
+  @Deprecated
   @Override
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
       throws IOException {
-    ListOpenFilesRequestProto req =
-        ListOpenFilesRequestProto.newBuilder().setId(prevId).build();
+    return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+  }
+
+  @Override
+  public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
+    ListOpenFilesRequestProto.Builder req =
+        ListOpenFilesRequestProto.newBuilder().setId(prevId);
+    if (openFilesTypes != null) {
+      req.addAllTypes(PBHelperClient.convertOpenFileTypes(openFilesTypes));
+    }
     try {
-      ListOpenFilesResponseProto response = rpcProxy.listOpenFiles(null, req);
+      ListOpenFilesResponseProto response =
+          rpcProxy.listOpenFiles(null, req.build());
       List<OpenFileEntry> openFileEntries =
           Lists.newArrayListWithCapacity(response.getEntriesCount());
       for (OpenFilesBatchResponseProto p : response.getEntriesList()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 813083f..3180f70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -94,6 +94,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
@@ -131,6 +132,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsE
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsReplicatedBlockStatsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
@@ -3258,5 +3260,27 @@ public class PBHelperClient {
         .build();
   }
 
+  public static EnumSet<OpenFilesType> convertOpenFileTypes(
+      List<OpenFilesTypeProto> openFilesTypeProtos) {
+    EnumSet<OpenFilesType> types = EnumSet.noneOf(OpenFilesType.class);
+    for (OpenFilesTypeProto af : openFilesTypeProtos) {
+      OpenFilesType type = OpenFilesType.valueOf((short)af.getNumber());
+      if (type != null) {
+        types.add(type);
+      }
+    }
+    return types;
+  }
 
+  public static List<OpenFilesTypeProto> convertOpenFileTypes(
+      EnumSet<OpenFilesType> types) {
+    List<OpenFilesTypeProto> typeProtos = new ArrayList<>();
+    for (OpenFilesType type : types) {
+      OpenFilesTypeProto typeProto = OpenFilesTypeProto.valueOf(type.getMode());
+      if (typeProto != null) {
+        typeProtos.add(typeProto);
+      }
+    }
+    return typeProtos;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
index b33462b..f247da8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
@@ -796,8 +796,14 @@ message GetEditsFromTxidResponseProto {
   required EventsListProto eventsList = 1;
 }
 
+enum OpenFilesTypeProto {
+  ALL_OPEN_FILES = 1;
+  BLOCKING_DECOMMISSION = 2;
+}
+
 message ListOpenFilesRequestProto {
   required int64 id = 1;
+  repeated OpenFilesTypeProto types = 2;
 }
 
 message OpenFilesBatchResponseProto {
@@ -810,6 +816,7 @@ message OpenFilesBatchResponseProto {
 message ListOpenFilesResponseProto {
   repeated OpenFilesBatchResponseProto entries = 1;
   required bool hasMore = 2;
+  repeated OpenFilesTypeProto types = 3;
 }
 
 service ClientNamenodeProtocol {
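
As a hedged sketch of the wire change (assuming the usual protobuf-generated builder for ClientNamenodeProtocolProtos), a request carrying the new repeated types field could be assembled like this, mirroring what the client-side translator does:

import java.util.EnumSet;

import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesRequestProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;

public class BuildListOpenFilesRequest {
  public static void main(String[] args) {
    // Start the cursor at the grandfather inode id, as OpenFilesIterator does,
    // and ask only for open files blocking decommission.
    ListOpenFilesRequestProto req = ListOpenFilesRequestProto.newBuilder()
        .setId(HdfsConstants.GRANDFATHER_INODE_ID)
        .addAllTypes(PBHelperClient.convertOpenFileTypes(
            EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION)))
        .build();
    System.out.println(req);
  }
}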

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index d63460b..a9d2d1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
@@ -1852,13 +1853,17 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   public ListOpenFilesResponseProto listOpenFiles(RpcController controller,
       ListOpenFilesRequestProto req) throws ServiceException {
     try {
-      BatchedEntries<OpenFileEntry> entries = server.listOpenFiles(req.getId());
+      EnumSet<OpenFilesType> openFilesTypes =
+          PBHelperClient.convertOpenFileTypes(req.getTypesList());
+      BatchedEntries<OpenFileEntry> entries = server.listOpenFiles(req.getId(),
+          openFilesTypes);
       ListOpenFilesResponseProto.Builder builder =
           ListOpenFilesResponseProto.newBuilder();
       builder.setHasMore(entries.hasMore());
       for (int i = 0; i < entries.size(); i++) {
         builder.addEntries(PBHelperClient.convert(entries.get(i)));
       }
+      builder.addAllTypes(req.getTypesList());
       return builder.build();
     } catch (IOException e) {
       throw new ServiceException(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 59e06c6..6b7175d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2294,7 +2294,7 @@ public class BlockManager implements BlockStatsMXBean {
    * If there were any reconstruction requests that timed out, reap them
    * and put them back into the neededReconstruction queue
    */
-  private void processPendingReconstructions() {
+  void processPendingReconstructions() {
     BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();
     if (timedOutItems != null) {
       namesystem.writeLock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
index 928036a..e338591 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
@@ -36,10 +36,14 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.util.CyclicIteration;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.util.ChunkedArrayList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -649,8 +653,10 @@ public class DatanodeAdminManager {
         boolean pruneReliableBlocks) {
       boolean firstReplicationLog = true;
       // Low redundancy in UC Blocks only
-      int lowRedundancyInOpenFiles = 0;
-      // All low redundancy blocks. Includes lowRedundancyInOpenFiles.
+      int lowRedundancyBlocksInOpenFiles = 0;
+      LightWeightHashSet<Long> lowRedundancyOpenFiles =
+          new LightWeightLinkedSet<>();
+      // All low redundancy blocks. Includes lowRedundancyOpenFiles.
       int lowRedundancyBlocks = 0;
       // All maintenance and decommission replicas.
       int outOfServiceOnlyReplicas = 0;
@@ -737,15 +743,24 @@ public class DatanodeAdminManager {
         // Update various counts
         lowRedundancyBlocks++;
         if (bc.isUnderConstruction()) {
-          lowRedundancyInOpenFiles++;
+          INode ucFile = namesystem.getFSDirectory().getInode(bc.getId());
+          if(!(ucFile instanceof  INodeFile) ||
+              !ucFile.asFile().isUnderConstruction()) {
+            LOG.warn("File " + ucFile.getLocalName() + " is not under " +
+                "construction. Skipping add to low redundancy open files!");
+          } else {
+            lowRedundancyBlocksInOpenFiles++;
+            lowRedundancyOpenFiles.add(ucFile.getId());
+          }
         }
         if ((liveReplicas == 0) && (num.outOfServiceReplicas() > 0)) {
           outOfServiceOnlyReplicas++;
         }
       }
 
-      datanode.getLeavingServiceStatus().set(lowRedundancyInOpenFiles,
-          lowRedundancyBlocks, outOfServiceOnlyReplicas);
+      datanode.getLeavingServiceStatus().set(lowRedundancyBlocksInOpenFiles,
+          lowRedundancyOpenFiles, lowRedundancyBlocks,
+          outOfServiceOnlyReplicas);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 618bc13..16ffb43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
@@ -831,17 +832,21 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /** Leaving service status. */
   public class LeavingServiceStatus {
     private int underReplicatedBlocks;
+    private int underReplicatedBlocksInOpenFiles;
     private int outOfServiceOnlyReplicas;
-    private int underReplicatedInOpenFiles;
+    private LightWeightHashSet<Long> underReplicatedOpenFiles =
+        new LightWeightLinkedSet<>();
     private long startTime;
     
-    synchronized void set(int underRepInOpenFiles, int underRepBlocks,
-        int outOfServiceOnlyRep) {
+    synchronized void set(int lowRedundancyBlocksInOpenFiles,
+        LightWeightHashSet<Long> underRepInOpenFiles,
+        int underRepBlocks, int outOfServiceOnlyRep) {
       if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return;
       }
-      underReplicatedInOpenFiles = underRepInOpenFiles;
+      underReplicatedOpenFiles = underRepInOpenFiles;
       underReplicatedBlocks = underRepBlocks;
+      underReplicatedBlocksInOpenFiles = lowRedundancyBlocksInOpenFiles;
       outOfServiceOnlyReplicas = outOfServiceOnlyRep;
     }
 
@@ -864,7 +869,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
       if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return 0;
       }
-      return underReplicatedInOpenFiles;
+      return underReplicatedBlocksInOpenFiles;
+    }
+    /** @return the set of open-file inode ids with under-replicated blocks */
+    public synchronized LightWeightHashSet<Long> getOpenFiles() {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
+        return new LightWeightLinkedSet<>();
+      }
+      return underReplicatedOpenFiles;
     }
     /** Set start time */
     public synchronized void setStartTime(long time) {
@@ -880,7 +892,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
       }
       return startTime;
     }
-  }  // End of class DecommissioningStatus
+  }  // End of class LeavingServiceStatus
 
   /**
    * Set the flag to indicate if this datanode is disallowed from communicating

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index c6cd595..537eaf4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -90,6 +90,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -1935,9 +1936,16 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     return null;
   }
 
+  @Deprecated
   @Override
-  public BatchedEntries<OpenFileEntry> listOpenFiles(long arg0)
+  public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
       throws IOException {
+    return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+  }
+
+  @Override
+  public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
     checkOperation(OperationCategory.READ, false);
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 286c41c..54decc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -93,6 +93,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
 
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
@@ -276,6 +277,7 @@ import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.hdfs.web.JsonUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
@@ -1762,12 +1764,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * the open files returned in a batch will have their INode ids greater than
   * this cursor. Open files can only be requested by super user and the
   * list across batches does not represent a consistent view of all open files.
+   * TODO: HDFS-12969 - to report open files by type.
    *
    * @param prevId the cursor INode id.
+   * @param openFilesTypes types to filter the open files
    * @throws IOException
    */
-  BatchedListEntries<OpenFileEntry> listOpenFiles(long prevId)
-      throws IOException {
+  BatchedListEntries<OpenFileEntry> listOpenFiles(long prevId,
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
     final String operationName = "listOpenFiles";
     checkSuperuserPrivilege();
     checkOperation(OperationCategory.READ);
@@ -1775,7 +1779,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     BatchedListEntries<OpenFileEntry> batchedListEntries;
     try {
       checkOperation(OperationCategory.READ);
-      batchedListEntries = leaseManager.getUnderConstructionFiles(prevId);
+      if(openFilesTypes.contains(OpenFilesType.ALL_OPEN_FILES)) {
+        batchedListEntries = leaseManager.getUnderConstructionFiles(prevId);
+      } else {
+        if(openFilesTypes.contains(OpenFilesType.BLOCKING_DECOMMISSION)) {
+          batchedListEntries = getFilesBlockingDecom(prevId);
+        } else {
+          throw new IllegalArgumentException("Unknown OpenFileType: "
+              + openFilesTypes);
+        }
+      }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, null);
       throw e;
@@ -1786,6 +1799,36 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return batchedListEntries;
   }
 
+  public BatchedListEntries<OpenFileEntry> getFilesBlockingDecom(long prevId) {
+    assert hasReadLock();
+    final List<OpenFileEntry> openFileEntries = Lists.newArrayList();
+    LightWeightHashSet<Long> openFileIds = new LightWeightHashSet<>();
+    for (DatanodeDescriptor dataNode :
+        blockManager.getDatanodeManager().getDatanodes()) {
+      for (long ucFileId : dataNode.getLeavingServiceStatus().getOpenFiles()) {
+        INode ucFile = getFSDirectory().getInode(ucFileId);
+        if (ucFile == null || ucFileId <= prevId ||
+            openFileIds.contains(ucFileId)) {
+          // probably got deleted or
+          // part of previous batch or
+          // already part of the current batch
+          continue;
+        }
+        Preconditions.checkState(ucFile instanceof INodeFile);
+        openFileIds.add(ucFileId);
+        INodeFile inodeFile = ucFile.asFile();
+        openFileEntries.add(new OpenFileEntry(
+            inodeFile.getId(), inodeFile.getFullPathName(),
+            inodeFile.getFileUnderConstructionFeature().getClientName(),
+            inodeFile.getFileUnderConstructionFeature().getClientMachine()));
+        if (openFileIds.size() >= this.maxListOpenFilesResponses) {
+          return new BatchedListEntries<>(openFileEntries, true);
+        }
+      }
+    }
+    return new BatchedListEntries<>(openFileEntries, false);
+  }
+
   private String metaSaveAsString() {
     StringWriter sw = new StringWriter();
     PrintWriter pw = new PrintWriter(sw);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 94bd15f..80f1ba3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -115,6 +115,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@@ -1334,11 +1335,18 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     namesystem.metaSave(filename);
   }
 
+  @Deprecated
   @Override // ClientProtocol
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
       throws IOException {
+    return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+  }
+
+  @Override // ClientProtocol
+  public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
     checkNNStartup();
-    return namesystem.listOpenFiles(prevId);
+    return namesystem.listOpenFiles(prevId, openFilesTypes);
   }
 
   @Override // ClientProtocol

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index f4985a6..7367309 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -462,7 +464,7 @@ public class DFSAdmin extends FsShell {
     "\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
     "\t[-metasave filename]\n" +
     "\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]\n" +
-    "\t[-listOpenFiles]\n" +
+    "\t[-listOpenFiles [-blockingDecommission]]\n" +
     "\t[-help [cmd]]\n";
 
   /**
@@ -913,8 +915,21 @@ public class DFSAdmin extends FsShell {
    * Usage: hdfs dfsadmin -listOpenFiles
    *
    * @throws IOException
+   * @param argv remaining command-line arguments, e.g. -blockingDecommission
    */
-  public int listOpenFiles() throws IOException {
+  public int listOpenFiles(String[] argv) throws IOException {
+    List<OpenFilesType> types = new ArrayList<>();
+    if (argv != null) {
+      List<String> args = new ArrayList<>(Arrays.asList(argv));
+      if (StringUtils.popOption("-blockingDecommission", args)) {
+        types.add(OpenFilesType.BLOCKING_DECOMMISSION);
+      }
+    }
+    if (types.isEmpty()) {
+      types.add(OpenFilesType.ALL_OPEN_FILES);
+    }
+    EnumSet<OpenFilesType> openFilesTypes = EnumSet.copyOf(types);
+
     DistributedFileSystem dfs = getDFS();
     Configuration dfsConf = dfs.getConf();
     URI dfsUri = dfs.getUri();
@@ -926,9 +941,9 @@ public class DFSAdmin extends FsShell {
           dfsConf, HAUtil.getAddressOfActive(getDFS()), ClientProtocol.class,
           UserGroupInformation.getCurrentUser(), false);
       openFilesRemoteIterator = new OpenFilesIterator(proxy.getProxy(),
-          FsTracer.get(dfsConf));
+          FsTracer.get(dfsConf), openFilesTypes);
     } else {
-      openFilesRemoteIterator = dfs.listOpenFiles();
+      openFilesRemoteIterator = dfs.listOpenFiles(openFilesTypes);
     }
     printOpenFiles(openFilesRemoteIterator);
     return 0;
@@ -1214,9 +1229,11 @@ public class DFSAdmin extends FsShell {
         + "\tIf 'incremental' is specified, it will be an incremental\n"
         + "\tblock report; otherwise, it will be a full block report.\n";
 
-    String listOpenFiles = "-listOpenFiles\n"
+    String listOpenFiles = "-listOpenFiles [-blockingDecommission]\n"
         + "\tList all open files currently managed by the NameNode along\n"
-        + "\twith client name and client machine accessing them.\n";
+        + "\twith client name and client machine accessing them.\n"
+        + "\tIf 'blockingDecommission' option is specified, it will list the\n"
+        + "\topen files only that are blocking the ongoing Decommission.";
 
    String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
       "\t\tis specified.\n";
@@ -1964,7 +1981,8 @@ public class DFSAdmin extends FsShell {
       System.err.println("Usage: hdfs dfsadmin"
           + " [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]");
     } else if ("-listOpenFiles".equals(cmd)) {
-      System.err.println("Usage: hdfs dfsadmin [-listOpenFiles]");
+      System.err.println("Usage: hdfs dfsadmin"
+          + " [-listOpenFiles [-blockingDecommission]]");
     } else {
       System.err.println("Usage: hdfs dfsadmin");
       System.err.println("Note: Administrative commands can only be run as the 
HDFS superuser.");
@@ -2119,7 +2137,7 @@ public class DFSAdmin extends FsShell {
         return exitCode;
       }
     } else if ("-listOpenFiles".equals(cmd)) {
-      if (argv.length != 1) {
+      if ((argv.length != 1) && (argv.length != 2)) {
         printUsage(cmd);
         return exitCode;
       }
@@ -2205,7 +2223,7 @@ public class DFSAdmin extends FsShell {
       } else if ("-triggerBlockReport".equals(cmd)) {
         exitCode = triggerBlockReport(argv);
       } else if ("-listOpenFiles".equals(cmd)) {
-        exitCode = listOpenFiles();
+        exitCode = listOpenFiles(argv);
       } else if ("-help".equals(cmd)) {
         if (i < argv.length) {
           printHelp(argv[i]);
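
On the command line the new filter is invoked as: hdfs dfsadmin -listOpenFiles -blockingDecommission. The hedged sketch below (not part of the patch) drives the same code path programmatically via ToolRunner, the way the new TestDecommission case does; the default HdfsConfiguration is assumed to point at the target cluster:

import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.util.ToolRunner;

public class RunListOpenFiles {
  public static void main(String[] args) throws Exception {
    // Equivalent to: hdfs dfsadmin -listOpenFiles -blockingDecommission
    int exitCode = ToolRunner.run(new DFSAdmin(new HdfsConfiguration()),
        new String[] {"-listOpenFiles", "-blockingDecommission"});
    System.exit(exitCode);
  }
}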

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index 316b955..a13116f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -409,7 +409,7 @@ Usage:
 | `-getDatanodeInfo` \<datanode\_host:ipc\_port\> | Get the information about the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. |
 | `-metasave` filename | Save Namenode's primary data structures to *filename* in the directory specified by hadoop.log.dir property. *filename* is overwritten if it exists. *filename* will contain one line for each of the following<br/>1. Datanodes heart beating with Namenode<br/>2. Blocks waiting to be replicated<br/>3. Blocks currently being replicated<br/>4. Blocks waiting to be deleted |
 | `-triggerBlockReport` `[-incremental]` \<datanode\_host:ipc\_port\> | Trigger a block report for the given datanode. If 'incremental' is specified, it will be an incremental block report; otherwise, it will be a full block report. |
-| `-listOpenFiles` | List all open files currently managed by the NameNode along with client name and client machine accessing them. |
+| `-listOpenFiles` `[-blockingDecommission]` | List all open files currently managed by the NameNode along with client name and client machine accessing them. |
 | `-help` [cmd] | Displays help for the given command or all commands if none is specified. |
 
 Runs a HDFS dfsadmin client.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
index c0cef19..5d96b7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
@@ -388,9 +388,19 @@ public class AdminStatesBaseTest {
   protected void startCluster(int numNameNodes, int numDatanodes,
       boolean setupHostsFile, long[] nodesCapacity,
       boolean checkDataNodeHostConfig) throws IOException {
+    startCluster(numNameNodes, numDatanodes, setupHostsFile, nodesCapacity,
+        checkDataNodeHostConfig, true);
+  }
+
+  protected void startCluster(int numNameNodes, int numDatanodes,
+      boolean setupHostsFile, long[] nodesCapacity,
+      boolean checkDataNodeHostConfig, boolean federation) throws IOException {
     MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
-        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes))
         .numDataNodes(numDatanodes);
+    if (federation) {
+      builder.nnTopology(
+          MiniDFSNNTopology.simpleFederatedTopology(numNameNodes));
+    }
     if (setupHostsFile) {
       builder.setupHostsFile(setupHostsFile);
     }
@@ -413,6 +423,12 @@ public class AdminStatesBaseTest {
     startCluster(numNameNodes, numDatanodes, false, null, false);
   }
 
+  protected void startSimpleCluster(int numNameNodes, int numDatanodes)
+      throws IOException {
+    startCluster(numNameNodes, numDatanodes, false, null, false, false);
+  }
+
+
   protected void startSimpleHACluster(int numDatanodes) throws IOException {
     cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
index ac14a2a..d82025c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
@@ -22,16 +22,23 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Scanner;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang.text.StrBuilder;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -60,7 +67,9 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -651,6 +660,174 @@ public class TestDecommission extends AdminStatesBaseTest {
     fdos.close();
   }
 
+  private static String scanIntoString(final ByteArrayOutputStream baos) {
+    final StrBuilder sb = new StrBuilder();
+    final Scanner scanner = new Scanner(baos.toString());
+    while (scanner.hasNextLine()) {
+      sb.appendln(scanner.nextLine());
+    }
+    scanner.close();
+    return sb.toString();
+  }
+
+  private boolean verifyOpenFilesListing(String message,
+      HashSet<Path> closedFileSet,
+      HashMap<Path, FSDataOutputStream> openFilesMap,
+      ByteArrayOutputStream out, int expOpenFilesListSize) {
+    final String outStr = scanIntoString(out);
+    LOG.info(message + " - stdout: \n" + outStr);
+    for (Path closedFilePath : closedFileSet) {
+      if(outStr.contains(closedFilePath.toString())) {
+        return false;
+      }
+    }
+    HashSet<Path> openFilesNotListed = new HashSet<>();
+    for (Path openFilePath : openFilesMap.keySet()) {
+      if(!outStr.contains(openFilePath.toString())) {
+        openFilesNotListed.add(openFilePath);
+      }
+    }
+    int actualOpenFilesListedSize =
+        openFilesMap.size() - openFilesNotListed.size();
+    if (actualOpenFilesListedSize >= expOpenFilesListSize) {
+      return true;
+    } else {
+      LOG.info("Open files that are not listed yet: " + openFilesNotListed);
+      return false;
+    }
+  }
+
+  private void verifyOpenFilesBlockingDecommission(HashSet<Path> closedFileSet,
+      HashMap<Path, FSDataOutputStream> openFilesMap, final int maxOpenFiles)
+      throws Exception {
+    final PrintStream oldStreamOut = System.out;
+    try {
+      final ByteArrayOutputStream toolOut = new ByteArrayOutputStream();
+      System.setOut(new PrintStream(toolOut));
+      final DFSAdmin dfsAdmin = new DFSAdmin(getConf());
+
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            toolOut.reset();
+            assertEquals(0, ToolRunner.run(dfsAdmin,
+                new String[]{"-listOpenFiles", "-blockingDecommission"}));
+            toolOut.flush();
+            return verifyOpenFilesListing(
+                "dfsadmin -listOpenFiles -blockingDecommission",
+                closedFileSet, openFilesMap, toolOut, maxOpenFiles);
+          } catch (Exception e) {
+            LOG.warn("Unexpected exception: " + e);
+          }
+          return false;
+        }
+      }, 1000, 60000);
+    } finally {
+      System.setOut(oldStreamOut);
+    }
+  }
+
+  @Test(timeout=180000)
+  public void testDecommissionWithOpenfileReporting()
+      throws Exception {
+    LOG.info("Starting test testDecommissionWithOpenfileReporting");
+
+    // Disable redundancy monitor check so that open files blocking
+    // decommission can be listed and verified.
+    getConf().setInt(
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
+        1000);
+    getConf().setLong(
+        DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, 1);
+
+    //At most 1 node can be decommissioned
+    startSimpleCluster(1, 4);
+
+    FileSystem fileSys = getCluster().getFileSystem(0);
+    FSNamesystem ns = getCluster().getNamesystem(0);
+
+    final String[] closedFiles = new String[3];
+    final String[] openFiles = new String[3];
+    HashSet<Path> closedFileSet = new HashSet<>();
+    HashMap<Path, FSDataOutputStream> openFilesMap = new HashMap<>();
+    for (int i = 0; i < 3; i++) {
+      closedFiles[i] = "/testDecommissionWithOpenfileReporting.closed." + i;
+      openFiles[i] = "/testDecommissionWithOpenfileReporting.open." + i;
+      writeFile(fileSys, new Path(closedFiles[i]), (short)3, 10);
+      closedFileSet.add(new Path(closedFiles[i]));
+      writeFile(fileSys, new Path(openFiles[i]), (short)3, 10);
+      FSDataOutputStream fdos =  fileSys.append(new Path(openFiles[i]));
+      openFilesMap.put(new Path(openFiles[i]), fdos);
+    }
+
+    HashMap<DatanodeInfo, Integer> dnInfoMap = new HashMap<>();
+    for (int i = 0; i < 3; i++) {
+      LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
+          getCluster().getNameNode(0), openFiles[i], 0, blockSize * 10);
+      for (DatanodeInfo dn : lbs.getLastLocatedBlock().getLocations()) {
+        if (dnInfoMap.containsKey(dn)) {
+          dnInfoMap.put(dn, dnInfoMap.get(dn) + 1);
+        } else {
+          dnInfoMap.put(dn, 1);
+        }
+      }
+    }
+
+    DatanodeInfo dnToDecommission = null;
+    int maxDnOccurance = 0;
+    for (Map.Entry<DatanodeInfo, Integer> entry : dnInfoMap.entrySet()) {
+      if (entry.getValue() > maxDnOccurance) {
+        maxDnOccurance = entry.getValue();
+        dnToDecommission = entry.getKey();
+      }
+    }
+    LOG.info("XXX Dn to decommission: " + dnToDecommission + ", max: "
+        + maxDnOccurance);
+
+    //decommission one of the 3 nodes which have last block
+    DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
+    ArrayList<String> nodes = new ArrayList<>();
+    dnToDecommission = dm.getDatanode(dnToDecommission.getDatanodeUuid());
+    nodes.add(dnToDecommission.getXferAddr());
+    initExcludeHosts(nodes);
+    refreshNodes(0);
+    waitNodeState(dnToDecommission, AdminStates.DECOMMISSION_INPROGRESS);
+
+    // list and verify all the open files that are blocking decommission
+    verifyOpenFilesBlockingDecommission(
+        closedFileSet, openFilesMap, maxDnOccurance);
+
+    final AtomicBoolean stopRedundancyMonitor = new AtomicBoolean(false);
+    Thread monitorThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!stopRedundancyMonitor.get()) {
+          try {
+            BlockManagerTestUtil.checkRedundancy(
+                getCluster().getNamesystem().getBlockManager());
+            BlockManagerTestUtil.updateState(
+                getCluster().getNamesystem().getBlockManager());
+            Thread.sleep(1000);
+          } catch (Exception e) {
+            LOG.warn("Encountered exception during redundancy monitor: " + e);
+          }
+        }
+      }
+    });
+    monitorThread.start();
+
+    waitNodeState(dnToDecommission, AdminStates.DECOMMISSIONED);
+    stopRedundancyMonitor.set(true);
+    monitorThread.join();
+
+    // Open file is no more blocking decommission as all its blocks
+    // are re-replicated.
+    openFilesMap.clear();
+    verifyOpenFilesBlockingDecommission(
+        closedFileSet, openFilesMap, 0);
+  }
+
   @Test(timeout = 360000)
   public void testDecommissionWithOpenFileAndBlockRecovery()
       throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java
index 685ea8b..3cb10bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
@@ -41,6 +42,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.junit.After;
 import org.junit.Assert;
@@ -254,7 +256,7 @@ public class TestHdfsAdmin {
     HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
     HashSet<Path> openFiles = new HashSet<>(openFileMap.keySet());
     RemoteIterator<OpenFileEntry> openFilesRemoteItr =
-        hdfsAdmin.listOpenFiles();
+        hdfsAdmin.listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
     while (openFilesRemoteItr.hasNext()) {
       String filePath = openFilesRemoteItr.next().getFilePath();
       assertFalse(filePath + " should not be listed under open files!",

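For context, the hunk above exercises the new type-filtered listing through HdfsAdmin. Below is a minimal client-side sketch of the same API surface; the HdfsAdmin constructor, the listOpenFiles(EnumSet) signature, OpenFilesType.BLOCKING_DECOMMISSION, and OpenFileEntry.getFilePath() are taken from this diff, while the wrapper class name and the simple printing loop are illustrative only, assuming a default-configured client.

    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.hdfs.client.HdfsAdmin;
    import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
    import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;

    public class ListFilesBlockingDecommission {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        HdfsAdmin hdfsAdmin =
            new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
        // List only the open files whose blocks are still holding up
        // datanode decommissioning; OpenFilesType.ALL_OPEN_FILES would
        // return the full open-file list instead.
        RemoteIterator<OpenFileEntry> openFilesItr = hdfsAdmin.listOpenFiles(
            EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION));
        while (openFilesItr.hasNext()) {
          OpenFileEntry entry = openFilesItr.next();
          System.out.println(entry.getFilePath());
        }
      }
    }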
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 7ee766f..dfb40a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -168,7 +168,17 @@ public class BlockManagerTestUtil {
   public static int computeInvalidationWork(BlockManager bm) {
     return bm.computeInvalidateWork(Integer.MAX_VALUE);
   }
-  
+
+  /**
+   * Check the redundancy of blocks and trigger replication if needed.
+   * @param blockManager the BlockManager to compute and process
+   *                     reconstruction work on
+   */
+  public static void checkRedundancy(final BlockManager blockManager) {
+    blockManager.computeDatanodeWork();
+    blockManager.processPendingReconstructions();
+    blockManager.rescanPostponedMisreplicatedBlocks();
+  }
+
   /**
    * Compute all the replication and invalidation work for the
    * given BlockManager.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
index 55bc7c3..0a8da4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
@@ -206,7 +206,7 @@ public class TestLeaseManager {
         HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""),
         perm, 0L);
     when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory);
-    verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0);
+    verifyINodeLeaseCounts(fsNamesystem, lm, rootInodeDirectory, 0, 0, 0);
 
     for (Long iNodeId : iNodeIds) {
       INodeFile iNodeFile = stubInodeFile(iNodeId);
@@ -215,13 +215,13 @@ public class TestLeaseManager {
       when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile);
       lm.addLease("holder_" + iNodeId, iNodeId);
     }
-    verifyINodeLeaseCounts(lm, rootInodeDirectory, iNodeIds.size(),
-        iNodeIds.size(), iNodeIds.size());
+    verifyINodeLeaseCounts(fsNamesystem, lm, rootInodeDirectory,
+        iNodeIds.size(), iNodeIds.size(), iNodeIds.size());
 
     for (Long iNodeId : iNodeIds) {
       lm.removeLease(iNodeId);
     }
-    verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0);
+    verifyINodeLeaseCounts(fsNamesystem, lm, rootInodeDirectory, 0, 0, 0);
   }
 
   /**
@@ -246,41 +246,44 @@ public class TestLeaseManager {
 
     // Case 1: No open files
     int scale = 0;
-    testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale);
+    testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory,
+        rootInodeDirectory, scale);
 
     for (int workerCount = 1;
          workerCount <= LeaseManager.INODE_FILTER_WORKER_COUNT_MAX / 2;
          workerCount++) {
       // Case 2: Open files count is half of worker task size
       scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN / 2;
-      testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
+      testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory,
           rootInodeDirectory, scale);
 
       // Case 3: Open files count is 1 less of worker task size
       scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN - 1;
-      testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
+      testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory,
           rootInodeDirectory, scale);
 
       // Case 4: Open files count is equal to worker task size
       scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN;
-      testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
+      testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory,
           rootInodeDirectory, scale);
 
       // Case 5: Open files count is 1 more than worker task size
       scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN + 1;
-      testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
+      testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory,
           rootInodeDirectory, scale);
     }
 
     // Case 6: Open files count is way more than worker count
     scale = 1279;
-    testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale);
+    testInodeWithLeasesAtScaleImpl(fsNamesystem, lm, fsDirectory,
+        rootInodeDirectory, scale);
   }
 
-  private void testInodeWithLeasesAtScaleImpl(final LeaseManager leaseManager,
-      final FSDirectory fsDirectory, INodeDirectory ancestorDirectory,
-      int scale) throws IOException {
-    verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0);
+  private void testInodeWithLeasesAtScaleImpl(FSNamesystem fsNamesystem,
+      final LeaseManager leaseManager, final FSDirectory fsDirectory,
+      INodeDirectory ancestorDirectory, int scale) throws IOException {
+    verifyINodeLeaseCounts(
+        fsNamesystem, leaseManager, ancestorDirectory, 0, 0, 0);
 
     Set<Long> iNodeIds = new HashSet<>();
     for (int i = 0; i < scale; i++) {
@@ -293,11 +296,12 @@ public class TestLeaseManager {
       when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile);
       leaseManager.addLease("holder_" + iNodeId, iNodeId);
     }
-    verifyINodeLeaseCounts(leaseManager, ancestorDirectory, iNodeIds.size(),
-        iNodeIds.size(), iNodeIds.size());
+    verifyINodeLeaseCounts(fsNamesystem, leaseManager,
+        ancestorDirectory, iNodeIds.size(), iNodeIds.size(), iNodeIds.size());
 
     leaseManager.removeAllLeases();
-    verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0);
+    verifyINodeLeaseCounts(fsNamesystem, leaseManager,
+        ancestorDirectory, 0, 0, 0);
   }
 
   /**
@@ -389,10 +393,10 @@ public class TestLeaseManager {
 
   }
 
-  private void verifyINodeLeaseCounts(final LeaseManager leaseManager,
-      INodeDirectory ancestorDirectory, int iNodeIdWithLeaseCount,
-      int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount)
-      throws IOException {
+  private void verifyINodeLeaseCounts(FSNamesystem fsNamesystem,
+      LeaseManager leaseManager, INodeDirectory ancestorDirectory,
+      int iNodeIdWithLeaseCount, int iNodeWithLeaseCount,
+      int iNodeUnderAncestorLeaseCount) throws IOException {
     assertEquals(iNodeIdWithLeaseCount,
         leaseManager.getINodeIdWithLeases().size());
     assertEquals(iNodeWithLeaseCount,
@@ -401,6 +405,8 @@ public class TestLeaseManager {
         leaseManager.getINodeWithLeases(ancestorDirectory).size());
     assertEquals(iNodeIdWithLeaseCount,
         leaseManager.getUnderConstructionFiles(0).size());
+    assertEquals(0, (fsNamesystem.getFilesBlockingDecom(0) == null ?
+        0 : fsNamesystem.getFilesBlockingDecom(0).size()));
   }
 
   private Map<String, INode> createINodeTree(INodeDirectory parentDir,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42a1c985/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java
index b290194..cfee7ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -95,9 +97,13 @@ public class TestListOpenFiles {
     verifyOpenFiles(openFiles);
 
     BatchedEntries<OpenFileEntry> openFileEntryBatchedEntries =
-        nnRpc.listOpenFiles(0);
+        nnRpc.listOpenFiles(0, EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
     assertTrue("Open files list should be empty!",
         openFileEntryBatchedEntries.size() == 0);
+    BatchedEntries<OpenFileEntry> openFilesBlockingDecomEntries =
+        nnRpc.listOpenFiles(0, EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION));
+    assertTrue("Open files list blocking decommission should be empty!",
+        openFilesBlockingDecomEntries.size() == 0);
 
     openFiles.putAll(
         DFSTestUtil.createOpenFiles(fs, "open-1", 1));
@@ -121,16 +127,16 @@ public class TestListOpenFiles {
     }
   }
 
-  private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles)
-      throws IOException {
+  private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles,
+      EnumSet<OpenFilesType> openFilesTypes) throws IOException {
     HashSet<Path> remainingFiles = new HashSet<>(openFiles.keySet());
     OpenFileEntry lastEntry = null;
     BatchedEntries<OpenFileEntry> batchedEntries;
     do {
       if (lastEntry == null) {
-        batchedEntries = nnRpc.listOpenFiles(0);
+        batchedEntries = nnRpc.listOpenFiles(0, openFilesTypes);
       } else {
-        batchedEntries = nnRpc.listOpenFiles(lastEntry.getId());
+        batchedEntries = nnRpc.listOpenFiles(lastEntry.getId(), openFilesTypes);
       }
       assertTrue("Incorrect open files list size!",
           batchedEntries.size() <= BATCH_SIZE);
@@ -146,6 +152,13 @@ public class TestListOpenFiles {
         remainingFiles.size() == 0);
   }
 
+  private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles)
+      throws IOException {
+    verifyOpenFiles(openFiles, EnumSet.of(OpenFilesType.ALL_OPEN_FILES));
+    verifyOpenFiles(new HashMap<>(),
+        EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION));
+  }
+
   private Set<Path> createFiles(FileSystem fileSystem, String fileNamePrefix,
       int numFilesToCreate) throws IOException {
     HashSet<Path> files = new HashSet<>();
@@ -197,6 +210,8 @@ public class TestListOpenFiles {
             try {
               assertEquals(0, ToolRunner.run(dfsAdmin,
                   new String[] {"-listOpenFiles"}));
+              assertEquals(0, ToolRunner.run(dfsAdmin,
+                  new String[] {"-listOpenFiles", "-blockingDecommission"}));
               // Sleep for some time to avoid
               // flooding logs with listing.
               Thread.sleep(listingIntervalMsec);
@@ -222,6 +237,8 @@ public class TestListOpenFiles {
 
       assertEquals(0, ToolRunner.run(dfsAdmin,
           new String[] {"-listOpenFiles"}));
+      assertEquals(0, ToolRunner.run(dfsAdmin,
+          new String[] {"-listOpenFiles", "-blockingDecommission"}));
       assertFalse("Client Error!", listOpenFilesError.get());
 
       clientThread.join();

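The test above drives the new listing mode through DFSAdmin via ToolRunner, which corresponds to running "hdfs dfsadmin -listOpenFiles -blockingDecommission" on the command line. A minimal programmatic sketch follows; the DFSAdmin tool and the two options come from this diff, while the wrapper class name and the exit-code handling are illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.tools.DFSAdmin;
    import org.apache.hadoop.util.ToolRunner;

    public class ListOpenFilesBlockingDecomTool {
      public static void main(String[] args) throws Exception {
        // Equivalent to: hdfs dfsadmin -listOpenFiles -blockingDecommission
        DFSAdmin dfsAdmin = new DFSAdmin(new Configuration());
        int exitCode = ToolRunner.run(dfsAdmin,
            new String[] {"-listOpenFiles", "-blockingDecommission"});
        System.exit(exitCode);
      }
    }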
