arp7 commented on a change in pull request #662: HDDS-1207. Refactor Container Report Processing logic and plugin new Replication Manager. URL: https://github.com/apache/hadoop/pull/662#discussion_r271495280
########## File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java ########## @@ -15,129 +15,94 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdds.scm.container; -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; +package org.apache.hadoop.hdds.scm.container; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; -import org.apache.hadoop.hdds.scm.container.replication - .ReplicationActivityStatus; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; -import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.scm.server - .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .ContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; - -import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + /** * Handles container reports from datanode. 
*/ -public class ContainerReportHandler implements - EventHandler<ContainerReportFromDatanode> { +public class ContainerReportHandler extends AbstractContainerReportHandler + implements EventHandler<ContainerReportFromDatanode> { private static final Logger LOG = LoggerFactory.getLogger(ContainerReportHandler.class); private final NodeManager nodeManager; - private final PipelineManager pipelineManager; private final ContainerManager containerManager; - private final ReplicationActivityStatus replicationStatus; + /** + * Constructs ContainerReportHandler instance with the + * given NodeManager and ContainerManager instance. + * + * @param nodeManager NodeManager instance + * @param containerManager ContainerManager instance + */ public ContainerReportHandler(final NodeManager nodeManager, - final PipelineManager pipelineManager, - final ContainerManager containerManager, - final ReplicationActivityStatus replicationActivityStatus) { - Preconditions.checkNotNull(nodeManager); - Preconditions.checkNotNull(pipelineManager); - Preconditions.checkNotNull(containerManager); - Preconditions.checkNotNull(replicationActivityStatus); + final ContainerManager containerManager) { + super(containerManager, LOG); this.nodeManager = nodeManager; - this.pipelineManager = pipelineManager; this.containerManager = containerManager; - this.replicationStatus = replicationActivityStatus; } + /** + * Process the container reports from datanodes. 
+ * + * @param reportFromDatanode Container Report + * @param publisher EventPublisher reference + */ @Override public void onMessage(final ContainerReportFromDatanode reportFromDatanode, - final EventPublisher publisher) { + final EventPublisher publisher) { final DatanodeDetails datanodeDetails = reportFromDatanode.getDatanodeDetails(); - final ContainerReportsProto containerReport = reportFromDatanode.getReport(); try { + final List<ContainerReplicaProto> replicas = + containerReport.getReportsList(); + final Set<ContainerID> containersInSCM = + nodeManager.getContainers(datanodeDetails); - final List<ContainerReplicaProto> replicas = containerReport - .getReportsList(); - - // ContainerIDs which SCM expects this datanode to have. - final Set<ContainerID> expectedContainerIDs = nodeManager - .getContainers(datanodeDetails); - - // ContainerIDs that this datanode actually has. - final Set<ContainerID> actualContainerIDs = replicas.parallelStream() + final Set<ContainerID> containersInDn = replicas.parallelStream() .map(ContainerReplicaProto::getContainerID) .map(ContainerID::valueof).collect(Collectors.toSet()); - // Container replicas which SCM is not aware of. - final Set<ContainerID> newReplicas = - new HashSet<>(actualContainerIDs); - newReplicas.removeAll(expectedContainerIDs); - - // Container replicas which are missing from datanode. 
- final Set<ContainerID> missingReplicas = - new HashSet<>(expectedContainerIDs); - missingReplicas.removeAll(actualContainerIDs); - - processContainerReplicas(datanodeDetails, replicas, publisher); - - // Remove missing replica from ContainerManager - for (ContainerID id : missingReplicas) { - try { - containerManager.getContainerReplicas(id) - .stream() - .filter(replica -> - replica.getDatanodeDetails().equals(datanodeDetails)) - .findFirst() - .ifPresent(replica -> { - try { - containerManager.removeContainerReplica(id, replica); - } catch (ContainerNotFoundException | - ContainerReplicaNotFoundException e) { - // This should not happen, but even if it happens, not an - // issue - } - }); - } catch (ContainerNotFoundException e) { - LOG.warn("Cannot remove container replica, container {} not found {}", - id, e); - } - } + final Set<ContainerID> missingReplicas = new HashSet<>(containersInSCM); + missingReplicas.removeAll(containersInDn); Review comment: What are the last few lines doing? I didn't quite get it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org