This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b447ffc  HDDS-5401. Add more metrics to ReplicationManager to help monitor replication progress. (#2382)
b447ffc is described below

commit b447ffcccce368ccad1e053ce264c3923ae423c4
Author: Gui Hecheng <[email protected]>
AuthorDate: Mon Aug 2 11:38:52 2021 +0800

    HDDS-5401. Add more metrics to ReplicationManager to help monitor replication progress. (#2382)
---
 .../hdds/scm/container/ReplicationManager.java     | 117 +++++-----
 .../replication/ReplicationManagerMetrics.java     | 161 ++++++++++++++
 .../hdds/scm/container/TestReplicationManager.java | 245 ++++++++++++++++++---
 3 files changed, 438 insertions(+), 85 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 4ab69a0..69f0c1d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -30,7 +30,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.StringJoiner;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -41,6 +40,7 @@ import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import 
org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerMetrics;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMService;
@@ -66,10 +67,6 @@ import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.metrics2.MetricsCollector;
-import org.apache.hadoop.metrics2.MetricsInfo;
-import org.apache.hadoop.metrics2.MetricsSource;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import 
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -92,7 +89,7 @@ import org.slf4j.LoggerFactory;
  * that the containers are properly replicated. Replication Manager deals only
  * with Quasi Closed / Closed container.
  */
-public class ReplicationManager implements MetricsSource, SCMService {
+public class ReplicationManager implements SCMService {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(ReplicationManager.class);
@@ -231,6 +228,11 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
   private final Clock clock;
 
   /**
+   * Replication progress related metrics.
+   */
+  private ReplicationManagerMetrics metrics;
+
+  /**
    * Constructs ReplicationManager instance with the given configuration.
    *
    * @param conf OzoneConfiguration
@@ -265,6 +267,7 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
         TimeUnit.MILLISECONDS);
+    this.metrics = null;
 
     // register ReplicationManager to SCMServiceManager.
     serviceManager.register(this);
@@ -279,10 +282,7 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
   @Override
   public synchronized void start() {
     if (!isRunning()) {
-      DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME,
-          "SCM Replication manager (closed container replication) related "
-              + "metrics",
-          this);
+      metrics = ReplicationManagerMetrics.create(this);
       LOG.info("Starting Replication Monitor Thread.");
       running = true;
       replicationMonitor = new Thread(this::run);
@@ -321,7 +321,7 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
       inflightMove.clear();
       inflightMoveFuture.clear();
       running = false;
-      DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME);
+      metrics.unRegister();
       notifyAll();
     } else {
       LOG.info("Replication Monitor Thread is not running.");
@@ -420,12 +420,20 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
          */
         updateInflightAction(container, inflightReplication,
             action -> replicas.stream()
-                .anyMatch(r -> 
r.getDatanodeDetails().equals(action.datanode)));
+                .anyMatch(r -> r.getDatanodeDetails().equals(action.datanode)),
+            ()-> metrics.incrNumReplicationCmdsTimeout(),
+            () -> {
+              metrics.incrNumReplicationCmdsCompleted();
+              metrics.incrNumReplicationBytesCompleted(
+                  container.getUsedBytes());
+            });
 
         updateInflightAction(container, inflightDeletion,
             action -> replicas.stream()
                 .noneMatch(r ->
-                    r.getDatanodeDetails().equals(action.datanode)));
+                    r.getDatanodeDetails().equals(action.datanode)),
+            () -> metrics.incrNumDeletionCmdsTimeout(),
+            () -> metrics.incrNumDeletionCmdsCompleted());
 
         /*
          * If container is under deleting and all it's replicas are deleted,
@@ -507,10 +515,14 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
    * @param container Container to update
    * @param inflightActions inflightReplication (or) inflightDeletion
    * @param filter filter to check if the operation is completed
+   * @param timeoutCounter update timeout metrics
+   * @param completedCounter update completed metrics
    */
   private void updateInflightAction(final ContainerInfo container,
       final Map<ContainerID, List<InflightAction>> inflightActions,
-      final Predicate<InflightAction> filter) {
+      final Predicate<InflightAction> filter,
+      final Runnable timeoutCounter,
+      final Runnable completedCounter) {
     final ContainerID id = container.containerID();
     final long deadline = clock.millis() - rmConf.getEventTimeout();
     if (inflightActions.containsKey(id)) {
@@ -528,6 +540,13 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
               NodeOperationalState.IN_SERVICE;
           if (isCompleted || isUnhealthy || isTimeout || isNotInService) {
             iter.remove();
+
+            if (isTimeout) {
+              timeoutCounter.run();
+            } else if (isCompleted) {
+              completedCounter.run();
+            }
+
             updateMoveIfNeeded(isUnhealthy, isCompleted, isTimeout,
                 isNotInService, container, a.datanode, inflightActions);
           }
@@ -1439,6 +1458,9 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
     inflightReplication.computeIfAbsent(id, k -> new ArrayList<>());
     sendAndTrackDatanodeCommand(datanode, replicateCommand,
         action -> inflightReplication.get(id).add(action));
+
+    metrics.incrNumReplicationCmdsSent();
+    metrics.incrNumReplicationBytesTotal(container.getUsedBytes());
   }
 
   /**
@@ -1462,6 +1484,8 @@ public class ReplicationManager implements MetricsSource, 
SCMService {
     inflightDeletion.computeIfAbsent(id, k -> new ArrayList<>());
     sendAndTrackDatanodeCommand(datanode, deleteCommand,
         action -> inflightDeletion.get(id).add(action));
+
+    metrics.incrNumDeletionCmdsSent();
   }
 
   /**
@@ -1547,22 +1571,10 @@ public class ReplicationManager implements 
MetricsSource, SCMService {
         .allMatch(r -> ReplicationManager.compareState(state, r.getState()));
   }
 
-  @Override
-  public void getMetrics(MetricsCollector collector, boolean all) {
-    collector.addRecord(ReplicationManager.class.getSimpleName())
-        .addGauge(ReplicationManagerMetrics.INFLIGHT_REPLICATION,
-            inflightReplication.size())
-        .addGauge(ReplicationManagerMetrics.INFLIGHT_DELETION,
-            inflightDeletion.size())
-        .addGauge(ReplicationManagerMetrics.INFLIGHT_MOVE,
-            inflightMove.size())
-        .endRecord();
-  }
-
   /**
    * Wrapper class to hold the InflightAction with its start time.
    */
-  private static final class InflightAction {
+  static final class InflightAction {
 
     private final DatanodeDetails datanode;
     private final long time;
@@ -1572,6 +1584,11 @@ public class ReplicationManager implements 
MetricsSource, SCMService {
       this.datanode = datanode;
       this.time = time;
     }
+
+    @VisibleForTesting
+    public DatanodeDetails getDatanode() {
+      return datanode;
+    }
   }
 
   /**
@@ -1645,35 +1662,6 @@ public class ReplicationManager implements 
MetricsSource, SCMService {
     }
   }
 
-  /**
-   * Metric name definitions for Replication manager.
-   */
-  public enum ReplicationManagerMetrics implements MetricsInfo {
-
-    INFLIGHT_REPLICATION("Tracked inflight container replication requests."),
-    INFLIGHT_DELETION("Tracked inflight container deletion requests."),
-    INFLIGHT_MOVE("Tracked inflight container move requests.");
-
-    private final String desc;
-
-    ReplicationManagerMetrics(String desc) {
-      this.desc = desc;
-    }
-
-    @Override
-    public String description() {
-      return desc;
-    }
-
-    @Override
-    public String toString() {
-      return new StringJoiner(", ", this.getClass().getSimpleName() + "{", "}")
-          .add("name=" + name())
-          .add("description=" + desc)
-          .toString();
-    }
-  }
-
   @Override
   public void notifyStatusChanged() {
     serviceLock.lock();
@@ -1711,4 +1699,21 @@ public class ReplicationManager implements 
MetricsSource, SCMService {
   public String getServiceName() {
     return ReplicationManager.class.getSimpleName();
   }
+
+  public ReplicationManagerMetrics getMetrics() {
+    return this.metrics;
+  }
+
+  public Map<ContainerID, List<InflightAction>> getInflightReplication() {
+    return inflightReplication;
+  }
+
+  public Map<ContainerID, List<InflightAction>> getInflightDeletion() {
+    return inflightDeletion;
+  }
+
+  public Map<ContainerID,
+      Pair<DatanodeDetails, DatanodeDetails>> getInflightMove() {
+    return inflightMove;
+  }
 }
\ No newline at end of file
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
new file mode 100644
index 0000000..69b1462
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Metrics for {@link ReplicationManager}.
+ * <p>
+ * The command and byte counters are incremented by ReplicationManager via the
+ * {@code incr*} methods. The inflight gauges have no writer of their own, so
+ * they are sampled from ReplicationManager's inflight maps in
+ * {@link #getMetrics(MetricsCollector, boolean)} each time metrics are
+ * collected.
+ */
+@Metrics(about = "Replication Manager Metrics", context = OzoneConsts.OZONE)
+public final class ReplicationManagerMetrics implements MetricsSource {
+
+  public static final String METRICS_SOURCE_NAME =
+      ReplicationManagerMetrics.class.getSimpleName();
+
+  // Holds the @Metric fields below. Hadoop metrics2 requires a registry field
+  // when an @Metric-annotated class also implements MetricsSource itself.
+  private final MetricsRegistry registry;
+
+  @Metric("Tracked inflight container replication requests.")
+  private MutableGaugeLong inflightReplication;
+
+  @Metric("Tracked inflight container deletion requests.")
+  private MutableGaugeLong inflightDeletion;
+
+  @Metric("Tracked inflight container move requests.")
+  private MutableGaugeLong inflightMove;
+
+  @Metric("Number of replication commands sent.")
+  private MutableCounterLong numReplicationCmdsSent;
+
+  @Metric("Number of replication commands completed.")
+  private MutableCounterLong numReplicationCmdsCompleted;
+
+  @Metric("Number of replication commands timeout.")
+  private MutableCounterLong numReplicationCmdsTimeout;
+
+  @Metric("Number of deletion commands sent.")
+  private MutableCounterLong numDeletionCmdsSent;
+
+  @Metric("Number of deletion commands completed.")
+  private MutableCounterLong numDeletionCmdsCompleted;
+
+  @Metric("Number of deletion commands timeout.")
+  private MutableCounterLong numDeletionCmdsTimeout;
+
+  @Metric("Number of replication bytes total.")
+  private MutableCounterLong numReplicationBytesTotal;
+
+  @Metric("Number of replication bytes completed.")
+  private MutableCounterLong numReplicationBytesCompleted;
+
+  private final ReplicationManager replicationManager;
+
+  /**
+   * @param manager the ReplicationManager whose inflight maps back the gauges
+   */
+  public ReplicationManagerMetrics(ReplicationManager manager) {
+    this.registry = new MetricsRegistry(METRICS_SOURCE_NAME);
+    this.replicationManager = manager;
+  }
+
+  /**
+   * Creates a metrics instance for the given manager and registers it with
+   * the default metrics system under {@link #METRICS_SOURCE_NAME}.
+   *
+   * @param manager the ReplicationManager to report on
+   * @return the registered metrics instance
+   */
+  public static ReplicationManagerMetrics create(ReplicationManager manager) {
+    return DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME,
+        "SCM Replication manager (closed container replication) related "
+            + "metrics",
+        new ReplicationManagerMetrics(manager));
+  }
+
+  /**
+   * Unregisters this source from the default metrics system.
+   */
+  public void unRegister() {
+    DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME);
+  }
+
+  /**
+   * Refreshes the inflight gauges from the ReplicationManager, then emits all
+   * metrics held in the registry as a single record.
+   */
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    // Nothing else calls set() on these gauges; without this refresh they
+    // would always report their initial value of 0.
+    inflightReplication.set(getInflightReplication());
+    inflightDeletion.set(getInflightDeletion());
+    inflightMove.set(getInflightMove());
+    registry.snapshot(collector.addRecord(registry.info()), all);
+  }
+
+  // Counter updates, invoked by ReplicationManager.
+
+  public void incrNumReplicationCmdsSent() {
+    this.numReplicationCmdsSent.incr();
+  }
+
+  public void incrNumReplicationCmdsCompleted() {
+    this.numReplicationCmdsCompleted.incr();
+  }
+
+  public void incrNumReplicationCmdsTimeout() {
+    this.numReplicationCmdsTimeout.incr();
+  }
+
+  public void incrNumDeletionCmdsSent() {
+    this.numDeletionCmdsSent.incr();
+  }
+
+  public void incrNumDeletionCmdsCompleted() {
+    this.numDeletionCmdsCompleted.incr();
+  }
+
+  public void incrNumDeletionCmdsTimeout() {
+    this.numDeletionCmdsTimeout.incr();
+  }
+
+  public void incrNumReplicationBytesTotal(long bytes) {
+    this.numReplicationBytesTotal.incr(bytes);
+  }
+
+  public void incrNumReplicationBytesCompleted(long bytes) {
+    this.numReplicationBytesCompleted.incr(bytes);
+  }
+
+  /**
+   * @return number of containers currently tracked with inflight replication
+   *     actions, read live from the ReplicationManager
+   */
+  public long getInflightReplication() {
+    return replicationManager.getInflightReplication().size();
+  }
+
+  /**
+   * @return number of containers currently tracked with inflight deletion
+   *     actions, read live from the ReplicationManager
+   */
+  public long getInflightDeletion() {
+    return replicationManager.getInflightDeletion().size();
+  }
+
+  /**
+   * @return number of containers currently tracked with an inflight move,
+   *     read live from the ReplicationManager
+   */
+  public long getInflightMove() {
+    return replicationManager.getInflightMove().size();
+  }
+
+  // Current counter values.
+
+  public long getNumReplicationCmdsSent() {
+    return this.numReplicationCmdsSent.value();
+  }
+
+  public long getNumReplicationCmdsCompleted() {
+    return this.numReplicationCmdsCompleted.value();
+  }
+
+  public long getNumReplicationCmdsTimeout() {
+    return this.numReplicationCmdsTimeout.value();
+  }
+
+  public long getNumDeletionCmdsSent() {
+    return this.numDeletionCmdsSent.value();
+  }
+
+  public long getNumDeletionCmdsCompleted() {
+    return this.numDeletionCmdsCompleted.value();
+  }
+
+  public long getNumDeletionCmdsTimeout() {
+    return this.numDeletionCmdsTimeout.value();
+  }
+
+  public long getNumReplicationBytesTotal() {
+    return this.numReplicationBytesTotal.value();
+  }
+
+  public long getNumReplicationBytesCompleted() {
+    return this.numReplicationBytesCompleted.value();
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 1304258..d7fcde7 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -215,7 +215,6 @@ public class TestReplicationManager {
     final ContainerInfo container = getContainer(LifeCycleState.OPEN);
     containerStateManager.loadContainer(container);
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
 
@@ -250,7 +249,6 @@ public class TestReplicationManager {
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
@@ -261,7 +259,6 @@ public class TestReplicationManager {
     }
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentCloseCommandCount + 6, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
@@ -296,7 +293,6 @@ public class TestReplicationManager {
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
     // Two of the replicas are in OPEN state
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentCloseCommandCount + 2, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
@@ -334,7 +330,6 @@ public class TestReplicationManager {
     // All the QUASI_CLOSED replicas have same originNodeId, so the
     // container will not be closed. ReplicationManager should take no action.
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
   }
@@ -353,6 +348,7 @@ public class TestReplicationManager {
       throws SCMException, ContainerNotFoundException, InterruptedException,
       ContainerReplicaNotFoundException {
     final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    container.setUsedBytes(100);
     final ContainerID id = container.containerID();
     final UUID originNodeId = UUID.randomUUID();
     final ContainerReplica replicaOne = getReplicas(
@@ -375,7 +371,6 @@ public class TestReplicationManager {
     // All the QUASI_CLOSED replicas have same originNodeId, so the
     // container will not be closed. ReplicationManager should take no action.
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
 
@@ -386,26 +381,59 @@ public class TestReplicationManager {
     containerStateManager.updateContainerReplica(id, unhealthyReplica);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
     Assert.assertTrue(datanodeCommandHandler.received(
         SCMCommandProto.Type.deleteContainerCommand,
         replicaOne.getDatanodeDetails()));
+    Assert.assertEquals(currentDeleteCommandCount + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
 
     // Now we will delete the unhealthy replica from in-memory.
     containerStateManager.removeContainerReplica(id, replicaOne);
 
+    final long currentBytesToReplicate = replicationManager.getMetrics()
+        .getNumReplicationBytesTotal();
+
     // The container is under replicated as unhealthy replica is removed
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
 
     // We should get replicate command
     Assert.assertEquals(currentReplicateCommandCount + 1,
         datanodeCommandHandler.getInvocationCount(
             SCMCommandProto.Type.replicateContainerCommand));
+    Assert.assertEquals(currentReplicateCommandCount + 1,
+        replicationManager.getMetrics().getNumReplicationCmdsSent());
+    Assert.assertEquals(currentBytesToReplicate + 100L,
+        replicationManager.getMetrics().getNumReplicationBytesTotal());
+    Assert.assertEquals(1, replicationManager.getInflightReplication().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightReplication());
+
+    // Now we add the missing replica back
+    DatanodeDetails targetDn = replicationManager.getInflightReplication()
+        .get(id).get(0).getDatanode();
+    final ContainerReplica replicatedReplicaOne = getReplicas(
+        id, State.CLOSED, 1000L, originNodeId, targetDn);
+    containerStateManager.updateContainerReplica(id, replicatedReplicaOne);
+
+    final long currentReplicationCommandCompleted = replicationManager
+        .getMetrics().getNumReplicationCmdsCompleted();
+    final long currentBytesCompleted = replicationManager.getMetrics()
+        .getNumReplicationBytesCompleted();
+
+    replicationManager.processAll();
+    eventQueue.processAll(1000);
+
+    Assert.assertEquals(0, replicationManager.getInflightReplication().size());
+    Assert.assertEquals(0, replicationManager.getMetrics()
+        .getInflightReplication());
+    Assert.assertEquals(currentReplicationCommandCompleted + 1,
+        replicationManager.getMetrics().getNumReplicationCmdsCompleted());
+    Assert.assertEquals(currentBytesCompleted + 100L,
+        replicationManager.getMetrics().getNumReplicationBytesCompleted());
   }
 
   /**
@@ -437,10 +465,38 @@ public class TestReplicationManager {
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    Assert.assertEquals(currentDeleteCommandCount + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightDeletion());
+
+    // Now we remove the replica according to inflight
+    DatanodeDetails targetDn = replicationManager.getInflightDeletion()
+        .get(id).get(0).getDatanode();
+    if (targetDn.equals(replicaOne.getDatanodeDetails())) {
+      containerStateManager.removeContainerReplica(id, replicaOne);
+    } else if (targetDn.equals(replicaTwo.getDatanodeDetails())) {
+      containerStateManager.removeContainerReplica(id, replicaTwo);
+    } else if (targetDn.equals(replicaThree.getDatanodeDetails())) {
+      containerStateManager.removeContainerReplica(id, replicaThree);
+    } else if (targetDn.equals(replicaFour.getDatanodeDetails())) {
+      containerStateManager.removeContainerReplica(id, replicaFour);
+    }
+
+    final long currentDeleteCommandCompleted = replicationManager.getMetrics()
+        .getNumDeletionCmdsCompleted();
+
+    replicationManager.processAll();
+    eventQueue.processAll(1000);
+    Assert.assertEquals(0, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(0, replicationManager.getMetrics()
+        .getInflightDeletion());
+    Assert.assertEquals(currentDeleteCommandCompleted + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsCompleted());
   }
 
   /**
@@ -474,13 +530,31 @@ public class TestReplicationManager {
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
     Assert.assertTrue(datanodeCommandHandler.received(
         SCMCommandProto.Type.deleteContainerCommand,
         replicaOne.getDatanodeDetails()));
+    Assert.assertEquals(currentDeleteCommandCount + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightDeletion());
+
+    final long currentDeleteCommandCompleted = replicationManager.getMetrics()
+        .getNumDeletionCmdsCompleted();
+    // Now we remove the replica to simulate deletion complete
+    containerStateManager.removeContainerReplica(id, replicaOne);
+
+    replicationManager.processAll();
+    eventQueue.processAll(1000);
+
+    Assert.assertEquals(currentDeleteCommandCompleted + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsCompleted());
+    Assert.assertEquals(0, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(0, replicationManager.getMetrics()
+        .getInflightDeletion());
   }
 
   /**
@@ -491,6 +565,7 @@ public class TestReplicationManager {
   public void testUnderReplicatedQuasiClosedContainer() throws
       SCMException, ContainerNotFoundException, InterruptedException {
     final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    container.setUsedBytes(100);
     final ContainerID id = container.containerID();
     final UUID originNodeId = UUID.randomUUID();
     final ContainerReplica replicaOne = getReplicas(
@@ -504,13 +579,44 @@ public class TestReplicationManager {
 
     final int currentReplicateCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+    final long currentBytesToReplicate = replicationManager.getMetrics()
+        .getNumReplicationBytesTotal();
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentReplicateCommandCount + 1,
         datanodeCommandHandler.getInvocationCount(
             SCMCommandProto.Type.replicateContainerCommand));
+    Assert.assertEquals(currentReplicateCommandCount + 1,
+        replicationManager.getMetrics().getNumReplicationCmdsSent());
+    Assert.assertEquals(currentBytesToReplicate + 100,
+        replicationManager.getMetrics().getNumReplicationBytesTotal());
+    Assert.assertEquals(1, replicationManager.getInflightReplication().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightReplication());
+
+    final long currentReplicateCommandCompleted = replicationManager
+        .getMetrics().getNumReplicationCmdsCompleted();
+    final long currentReplicateBytesCompleted = replicationManager
+        .getMetrics().getNumReplicationBytesCompleted();
+
+    // Now we add the replicated new replica
+    DatanodeDetails targetDn = replicationManager.getInflightReplication()
+        .get(id).get(0).getDatanode();
+    final ContainerReplica replicatedReplicaThree = getReplicas(
+        id, State.CLOSED, 1000L, originNodeId, targetDn);
+    containerStateManager.updateContainerReplica(id, replicatedReplicaThree);
+
+    replicationManager.processAll();
+    eventQueue.processAll(1000);
+
+    Assert.assertEquals(currentReplicateCommandCompleted + 1,
+        replicationManager.getMetrics().getNumReplicationCmdsCompleted());
+    Assert.assertEquals(currentReplicateBytesCompleted + 100,
+        replicationManager.getMetrics().getNumReplicationBytesCompleted());
+    Assert.assertEquals(0, replicationManager.getInflightReplication().size());
+    Assert.assertEquals(0, replicationManager.getMetrics()
+        .getInflightReplication());
   }
 
   /**
@@ -554,7 +660,6 @@ public class TestReplicationManager {
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     GenericTestUtils.waitFor(
         () -> (currentReplicateCommandCount + 1) == datanodeCommandHandler
             
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand),
@@ -580,7 +685,6 @@ public class TestReplicationManager {
      */
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
@@ -588,9 +692,16 @@ public class TestReplicationManager {
     Assert.assertTrue(datanodeCommandHandler.received(
         SCMCommandProto.Type.deleteContainerCommand,
         replicaTwo.getDatanodeDetails()));
+    Assert.assertEquals(currentDeleteCommandCount + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightDeletion());
 
     containerStateManager.removeContainerReplica(id, replicaTwo);
 
+    final long currentDeleteCommandCompleted = replicationManager.getMetrics()
+        .getNumDeletionCmdsCompleted();
     /*
      * We have now removed unhealthy replica, next iteration of
      * ReplicationManager should re-replicate the container as it
@@ -598,11 +709,22 @@ public class TestReplicationManager {
      */
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
+
+    Assert.assertEquals(0, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(0, replicationManager.getMetrics()
+        .getInflightDeletion());
+    Assert.assertEquals(currentDeleteCommandCompleted + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsCompleted());
+
     Assert.assertEquals(currentReplicateCommandCount + 2,
         datanodeCommandHandler.getInvocationCount(
             SCMCommandProto.Type.replicateContainerCommand));
+    Assert.assertEquals(currentReplicateCommandCount + 2,
+        replicationManager.getMetrics().getNumReplicationCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightReplication().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightReplication());
   }
 
 
@@ -630,7 +752,6 @@ public class TestReplicationManager {
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
 
     // All the replicas have same BCSID, so all of them will be closed.
@@ -660,7 +781,6 @@ public class TestReplicationManager {
     }
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
   }
@@ -688,7 +808,6 @@ public class TestReplicationManager {
     eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Mockito.verify(closeContainerHandler, Mockito.times(1))
         .onMessage(id, eventQueue);
@@ -710,6 +829,7 @@ public class TestReplicationManager {
   public void additionalReplicaScheduledWhenMisReplicated()
       throws SCMException, ContainerNotFoundException, InterruptedException {
     final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+    container.setUsedBytes(100);
     final ContainerID id = container.containerID();
     final UUID originNodeId = UUID.randomUUID();
     final ContainerReplica replicaOne = getReplicas(
@@ -736,15 +856,23 @@ public class TestReplicationManager {
 
     int currentReplicateCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+    final long currentBytesToReplicate = replicationManager.getMetrics()
+        .getNumReplicationBytesTotal();
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
-    // At this stage, due to the mocked calls to validteContainerPlacement
-    // the mis-replicated racks will not have improved, so expect to see nothing
-    // scheduled.
+    // At this stage, due to the mocked calls to validateContainerPlacement
+    // the policy will not be satisfied, and replication will be triggered.
+
     Assert.assertEquals(currentReplicateCommandCount + 1, 
datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand));
+    Assert.assertEquals(currentReplicateCommandCount + 1,
+        replicationManager.getMetrics().getNumReplicationCmdsSent());
+    Assert.assertEquals(currentBytesToReplicate + 100,
+        replicationManager.getMetrics().getNumReplicationBytesTotal());
+    Assert.assertEquals(1, replicationManager.getInflightReplication().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightReplication());
 
     // Now make it so that all containers seem mis-replicated no matter how
     // many replicas. This will test replicas are not scheduled if the new
@@ -760,13 +888,17 @@ public class TestReplicationManager {
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
-    // At this stage, due to the mocked calls to validteContainerPlacement
+    // At this stage, due to the mocked calls to validateContainerPlacement
     // the mis-replicated racks will not have improved, so expect to see nothing
     // scheduled.
     Assert.assertEquals(currentReplicateCommandCount, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand));
+    Assert.assertEquals(currentReplicateCommandCount,
+        replicationManager.getMetrics().getNumReplicationCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightReplication().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightReplication());
   }
 
   @Test
@@ -806,17 +938,21 @@ public class TestReplicationManager {
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     // The unhealthy replica should be removed, but not the other replica
-    // as each time we test with 3 replicas, Mockitor ensures it returns
+    // as each time we test with 3 replicas, Mockito ensures it returns
     // mis-replicated
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    Assert.assertEquals(currentDeleteCommandCount + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
 
     Assert.assertTrue(datanodeCommandHandler.received(
         SCMCommandProto.Type.deleteContainerCommand,
         replicaFive.getDatanodeDetails()));
+    Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightDeletion());
   }
 
   @Test
@@ -850,10 +986,14 @@ public class TestReplicationManager {
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    Assert.assertEquals(currentDeleteCommandCount + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightDeletion());
   }
 
   @Test
@@ -890,10 +1030,14 @@ public class TestReplicationManager {
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 2, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    Assert.assertEquals(currentDeleteCommandCount + 2,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightDeletion());
   }
 
   /**
@@ -1057,10 +1201,14 @@ public class TestReplicationManager {
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 2, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    Assert.assertEquals(currentDeleteCommandCount + 2,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightDeletion());
     // Get the DECOM and Maint replica and ensure none of them are scheduled
     // for removal
     Set<ContainerReplica> decom =
@@ -1341,6 +1489,31 @@ public class TestReplicationManager {
     // scheduled
     clock.fastForward(timeout + 1000);
     assertReplicaScheduled(1);
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getNumReplicationCmdsTimeout());
+  }
+
+  @Test
+  public void testDeleteCommandTimeout() throws
+      SCMException, InterruptedException {
+    long timeout = new ReplicationManagerConfiguration().getEventTimeout();
+
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    assertDeleteScheduled(1);
+
+    // A delete is already in flight, so no further delete is scheduled
+    assertDeleteScheduled(0);
+
+    // Advance the clock past the timeout: the in-flight delete expires, a
+    // new delete is scheduled and the deletion-timeout metric is bumped
+    clock.fastForward(timeout + 1000);
+    assertDeleteScheduled(1);
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getNumDeletionCmdsTimeout());
   }
 
   private ContainerInfo createContainer(LifeCycleState containerState)
@@ -1393,11 +1566,25 @@ public class TestReplicationManager {
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentReplicateCommandCount + delta,
         datanodeCommandHandler.getInvocationCount(
             SCMCommandProto.Type.replicateContainerCommand));
+    Assert.assertEquals(currentReplicateCommandCount + delta,
+        replicationManager.getMetrics().getNumReplicationCmdsSent());
+  }
+
+  private void assertDeleteScheduled(int delta) throws InterruptedException {
+    final int deletesBefore = datanodeCommandHandler.getInvocationCount(
+        SCMCommandProto.Type.deleteContainerCommand);
+    // One RM pass, then wait for the EventQueue to call the event handler.
+    replicationManager.processAll();
+    eventQueue.processAll(1000);
+    final int expectedDeletes = deletesBefore + delta;
+    Assert.assertEquals(expectedDeletes, datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    Assert.assertEquals(expectedDeletes,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
   }
 
   @After

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to