arp7 commented on a change in pull request #662: HDDS-1207. Refactor Container 
Report Processing logic and plugin new Replication Manager.
URL: https://github.com/apache/hadoop/pull/662#discussion_r271495280
 
 

 ##########
 File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
 ##########
 @@ -15,129 +15,94 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdds.scm.container;
 
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
+package org.apache.hadoop.hdds.scm.container;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
-import org.apache.hadoop.hdds.scm.container.replication
-    .ReplicationActivityStatus;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.server
-    .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-
-import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
  * Handles container reports from datanode.
  */
-public class ContainerReportHandler implements
-    EventHandler<ContainerReportFromDatanode> {
+public class ContainerReportHandler extends AbstractContainerReportHandler
+    implements EventHandler<ContainerReportFromDatanode> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerReportHandler.class);
 
   private final NodeManager nodeManager;
-  private final PipelineManager pipelineManager;
   private final ContainerManager containerManager;
-  private final ReplicationActivityStatus replicationStatus;
 
+  /**
+   * Constructs ContainerReportHandler instance with the
+   * given NodeManager and ContainerManager instance.
+   *
+   * @param nodeManager NodeManager instance
+   * @param containerManager ContainerManager instance
+   */
   public ContainerReportHandler(final NodeManager nodeManager,
-      final PipelineManager pipelineManager,
-      final ContainerManager containerManager,
-      final ReplicationActivityStatus replicationActivityStatus) {
-    Preconditions.checkNotNull(nodeManager);
-    Preconditions.checkNotNull(pipelineManager);
-    Preconditions.checkNotNull(containerManager);
-    Preconditions.checkNotNull(replicationActivityStatus);
+                                final ContainerManager containerManager) {
+    super(containerManager, LOG);
     this.nodeManager = nodeManager;
-    this.pipelineManager = pipelineManager;
     this.containerManager = containerManager;
-    this.replicationStatus = replicationActivityStatus;
   }
 
+  /**
+   * Process the container reports from datanodes.
+   *
+   * @param reportFromDatanode Container Report
+   * @param publisher EventPublisher reference
+   */
   @Override
   public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
-      final EventPublisher publisher) {
+                        final EventPublisher publisher) {
 
     final DatanodeDetails datanodeDetails =
         reportFromDatanode.getDatanodeDetails();
-
     final ContainerReportsProto containerReport =
         reportFromDatanode.getReport();
 
     try {
+      final List<ContainerReplicaProto> replicas =
+          containerReport.getReportsList();
+      final Set<ContainerID> containersInSCM =
+          nodeManager.getContainers(datanodeDetails);
 
-      final List<ContainerReplicaProto> replicas = containerReport
-          .getReportsList();
-
-      // ContainerIDs which SCM expects this datanode to have.
-      final Set<ContainerID> expectedContainerIDs = nodeManager
-          .getContainers(datanodeDetails);
-
-      // ContainerIDs that this datanode actually has.
-      final Set<ContainerID> actualContainerIDs = replicas.parallelStream()
+      final Set<ContainerID> containersInDn = replicas.parallelStream()
           .map(ContainerReplicaProto::getContainerID)
           .map(ContainerID::valueof).collect(Collectors.toSet());
 
-      // Container replicas which SCM is not aware of.
-      final  Set<ContainerID> newReplicas =
-          new HashSet<>(actualContainerIDs);
-      newReplicas.removeAll(expectedContainerIDs);
-
-      // Container replicas which are missing from datanode.
-      final Set<ContainerID> missingReplicas =
-          new HashSet<>(expectedContainerIDs);
-      missingReplicas.removeAll(actualContainerIDs);
-
-      processContainerReplicas(datanodeDetails, replicas, publisher);
-
-      // Remove missing replica from ContainerManager
-      for (ContainerID id : missingReplicas) {
-        try {
-          containerManager.getContainerReplicas(id)
-              .stream()
-              .filter(replica ->
-                  replica.getDatanodeDetails().equals(datanodeDetails))
-              .findFirst()
-              .ifPresent(replica -> {
-                try {
-                  containerManager.removeContainerReplica(id, replica);
-                } catch (ContainerNotFoundException |
-                    ContainerReplicaNotFoundException e) {
-                  // This should not happen, but even if it happens, not an
-                  // issue
-                }
-              });
-        } catch (ContainerNotFoundException e) {
-          LOG.warn("Cannot remove container replica, container {} not found 
{}",
-              id, e);
-        }
-      }
+      final Set<ContainerID> missingReplicas = new HashSet<>(containersInSCM);
+      missingReplicas.removeAll(containersInDn);
 
 Review comment:
   What are the last few lines doing? I didn't quite get it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to