KevinWikant commented on a change in pull request #3675:
URL: https://github.com/apache/hadoop/pull/3675#discussion_r770854361



##########
File path: hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
##########
@@ -1654,4 +1658,204 @@ public Boolean get() {
 
     cleanupFile(fileSys, file);
   }
+
+  /**
+   * Test DatanodeAdminManager logic to re-queue unhealthy decommissioning nodes
+   * which are blocking the decommissioning of healthy nodes.
+   * Force the tracked nodes set to be filled with nodes lost while decommissioning,
+   * then decommission healthy nodes & validate they are decommissioned eventually.
+   */
+  @Test(timeout = 120000)
+  public void testRequeueUnhealthyDecommissioningNodes() throws Exception {
+    // Create a MiniDFSCluster with 3 live datanodes in AdminState=NORMAL and
+    // 2 dead datanodes in AdminState=DECOMMISSION_INPROGRESS and a file
+    // with replication factor of 5.
+    final int numLiveNodes = 3;
+    final int numDeadNodes = 2;
+    final int numNodes = numLiveNodes + numDeadNodes;
+    final List<DatanodeDescriptor> liveNodes = new ArrayList<>();
+    final Map<DatanodeDescriptor, MiniDFSCluster.DataNodeProperties> deadNodeProps =
+        new HashMap<>();
+    final ArrayList<DatanodeInfo> decommissionedNodes = new ArrayList<>();
+    final Path filePath = new Path("/tmp/test");
+    createClusterWithDeadNodesDecommissionInProgress(numLiveNodes, liveNodes, numDeadNodes,
+        deadNodeProps, decommissionedNodes, filePath);
+    final FSNamesystem namesystem = getCluster().getNamesystem();
+    final BlockManager blockManager = namesystem.getBlockManager();
+    final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
+    final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager();
+
+    // Validate the 2 "dead" nodes are not removed from the tracked nodes set
+    // after several seconds of operation
+    final Duration checkDuration = Duration.ofSeconds(5);
+    Instant checkUntil = Instant.now().plus(checkDuration);
+    while (Instant.now().isBefore(checkUntil)) {
+      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+      assertEquals(0, decomManager.getNumPendingNodes());
+      assertEquals(numDeadNodes, decomManager.getNumTrackedNodes());
+      assertTrue(deadNodeProps.keySet().stream()
+          .allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)));
+      Thread.sleep(500);
+    }
+
+    // Delete the file such that it's no longer a factor blocking decommissioning
+    // of live nodes which have block replicas for that file
+    getCluster().getFileSystem().delete(filePath, true);
+
+    // Start decommissioning 2 "live" datanodes
+    int numLiveDecommNodes = 2;
+    final List<DatanodeDescriptor> liveDecommNodes = liveNodes.subList(0, numLiveDecommNodes);
+    for (final DatanodeDescriptor liveNode : liveDecommNodes) {
+      takeNodeOutofService(0, liveNode.getDatanodeUuid(), 0, decommissionedNodes,
+          AdminStates.DECOMMISSION_INPROGRESS);
+      decommissionedNodes.add(liveNode);
+    }
+
+    // Write a new file such that there are under-replicated blocks preventing
+    // decommissioning of dead nodes
+    writeFile(getCluster().getFileSystem(), filePath, numNodes, 10);
+
+    // Validate that the live datanodes are put into the pending decommissioning queue
+    GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == numDeadNodes
+            && decomManager.getNumPendingNodes() == numLiveDecommNodes
+            && liveDecommNodes.stream().allMatch(
+                node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)),
+        500, 30000);
+    for (final DatanodeDescriptor node : decomManager.getPendingNodes()) {
+      assertTrue(liveDecommNodes.contains(node));
+    }

Review comment:
       I have modified the relevant parts of the new unit test to use "assertThat". Note that IntelliJ reports that "assertThat" is deprecated in the Maven JUnit version.
   
   As per the guidance here [https://junit.org/junit4/javadoc/4.13/deprecated-list.html], I have used "org.hamcrest.MatcherAssert.assertThat" instead of "org.junit.Assert.assertThat".
   
   I have also added error messages to the assertions.
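   
   For illustration, a minimal self-contained sketch of the assertion style described above, using Hamcrest's assertThat with a reason string (the class name, list contents, and messages here are made up for the example, not the actual test code):
   
       import static org.hamcrest.MatcherAssert.assertThat;
       import static org.hamcrest.Matchers.hasItem;
   
       import java.util.Arrays;
       import java.util.List;
   
       public class AssertThatStyleExample {
         public static void main(String[] args) {
           List<String> pendingNodes = Arrays.asList("dn-1", "dn-2");
           // Hamcrest's assertThat accepts a "reason" argument, so a failure
           // reports what was being validated rather than just the mismatch.
           assertThat("expected dn-1 to be queued for decommissioning",
               pendingNodes, hasItem("dn-1"));
         }
       }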

##########
File path: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminMonitorBase.java
##########
@@ -151,4 +163,34 @@ public int getPendingNodeCount() {
   public Queue<DatanodeDescriptor> getPendingNodes() {
     return pendingNodes;
   }
+
+  /**
+   * If a node is dead while in Decommission In Progress, it cannot be
+   * decommissioned until it becomes healthy again. If there are more pendingNodes
+   * than can be tracked & some unhealthy tracked nodes, then re-queue the unhealthy
+   * tracked nodes to avoid blocking decommissioning of healthy nodes.
+   *
+   * @param unhealthyDns The unhealthy datanodes which may be re-queued
+   * @param numDecommissioningNodes The total number of nodes being decommissioned
+   * @return List of unhealthy nodes to be re-queued
+   */
+  List<DatanodeDescriptor> identifyUnhealthyNodesToRequeue(
+      final List<DatanodeDescriptor> unhealthyDns, int numDecommissioningNodes) {

Review comment:
       I have modified the return type to Stream.
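   
   As a rough, runnable sketch of the Stream-returning shape (String stand-ins for DatanodeDescriptor and a hypothetical maxConcurrentTrackedNodes limit; this illustrates the javadoc's logic above, not the exact committed code):
   
       import java.util.Arrays;
       import java.util.List;
       import java.util.stream.Stream;
   
       public class RequeueUnhealthySketch {
         // Hypothetical tracked-set capacity for the example.
         static final int maxConcurrentTrackedNodes = 2;
   
         static Stream<String> identifyUnhealthyNodesToRequeue(
             List<String> unhealthyDns, int numDecommissioningNodes) {
           if (maxConcurrentTrackedNodes == 0
               || numDecommissioningNodes <= maxConcurrentTrackedNodes) {
             // Every decommissioning node fits in the tracked set, so the
             // unhealthy nodes are not blocking anyone; leave them tracked.
             return Stream.empty();
           }
           // More nodes are decommissioning than can be tracked: hand the
           // unhealthy tracked nodes back so they can be re-queued.
           return unhealthyDns.stream();
         }
   
         public static void main(String[] args) {
           // 3 decommissioning nodes but only 2 tracked slots.
           identifyUnhealthyNodesToRequeue(Arrays.asList("dead-dn-1"), 3)
               .forEach(dn -> System.out.println("re-queueing " + dn));
         }
       }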

##########
File path: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java
##########
@@ -270,12 +274,34 @@ private void check() {
         // an invalid state.
         LOG.warn("DatanodeAdminMonitor caught exception when processing node "
             + "{}.", dn, e);
-        pendingNodes.add(dn);
-        toRemove.add(dn);
+        toRequeue.add(dn);

Review comment:
       I have modified the code to remove the toRequeue variable.
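   
   A self-contained illustration of the shape this change implies, assuming the handler now re-queues the node directly instead of going through an intermediate list (String stand-ins for DatanodeDescriptor; the processing failure is simulated):
   
       import java.util.ArrayDeque;
       import java.util.ArrayList;
       import java.util.Arrays;
       import java.util.List;
       import java.util.Queue;
   
       public class ExceptionRequeueSketch {
         public static void main(String[] args) {
           Queue<String> pendingNodes = new ArrayDeque<>();
           List<String> toRemove = new ArrayList<>();
           for (String dn : Arrays.asList("dn-1", "dn-2")) {
             try {
               process(dn);
             } catch (Exception e) {
               // Rather than collecting into a separate toRequeue list, the
               // node goes straight back onto the pending queue and is marked
               // for removal from the tracked set.
               pendingNodes.add(dn);
               toRemove.add(dn);
             }
           }
           System.out.println("re-queued: " + pendingNodes
               + ", untracked: " + toRemove);
         }
   
         static void process(String dn) throws Exception {
           throw new Exception("simulated failure processing " + dn);
         }
       }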

##########
File path: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminMonitorBase.java
##########
@@ -35,12 +38,21 @@
 public abstract class DatanodeAdminMonitorBase
     implements DatanodeAdminMonitorInterface, Configurable {
 
+  /**
+   * Sort by lastUpdate time in descending order, such that unhealthy
+   * nodes are de-prioritized given they cannot be decommissioned.
+   */
+  public static final Comparator<DatanodeDescriptor> PENDING_NODES_QUEUE_COMPARATOR =
+      (dn1, dn2) -> Long.compare(dn2.getLastUpdate(), dn1.getLastUpdate());
+
   protected BlockManager blockManager;
   protected Namesystem namesystem;
   protected DatanodeAdminManager dnAdmin;
   protected Configuration conf;
 
-  protected final Queue<DatanodeDescriptor> pendingNodes = new ArrayDeque<>();
+  private final PriorityQueue<DatanodeDescriptor> pendingNodes = new PriorityQueue<>(
+      DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT,

Review comment:
       I have changed the constructor to use the default initial capacity of 11: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/PriorityQueue.html#%3Cinit%3E(int)
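   
   A small runnable sketch of the queue behaviour being discussed, using raw lastUpdate timestamps in place of DatanodeDescriptor (names and values are illustrative):
   
       import java.util.Comparator;
       import java.util.PriorityQueue;
   
       public class PendingQueueSketch {
         // Newest lastUpdate first, so nodes that heartbeated recently
         // (healthy) are polled before stale (likely dead) ones.
         static final Comparator<Long> NEWEST_FIRST =
             (t1, t2) -> Long.compare(t2, t1);
   
         public static void main(String[] args) {
           // PriorityQueue(Comparator) uses the default initial capacity
           // of 11, matching the constructor change described above.
           PriorityQueue<Long> pending = new PriorityQueue<>(NEWEST_FIRST);
           pending.add(1_000L);  // stale lastUpdate -- likely a dead node
           pending.add(9_000L);  // recent lastUpdate -- healthy node
           System.out.println(pending.poll());  // 9000: healthy node first
         }
       }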




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


