http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index 2d88621..f5fe46a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.primitives.Longs; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; @@ -33,7 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.lease.Lease; import org.apache.hadoop.ozone.lease.LeaseException; @@ -368,11 +369,12 @@ public class ContainerMapping implements Mapping { * @param reports Container report */ @Override - public void processContainerReports(ContainerReportsRequestProto reports) + public void processContainerReports(DatanodeDetails datanodeDetails, + ContainerReportsProto reports) throws IOException { List<StorageContainerDatanodeProtocolProtos.ContainerInfo> containerInfos = reports.getReportsList(); - containerSupervisor.handleContainerReport(reports); + containerSupervisor.handleContainerReport(datanodeDetails, reports); for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState : containerInfos) { byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID()); @@ -402,7 +404,7 @@ public class ContainerMapping implements Mapping { // Container not found in our container db. LOG.error("Error while processing container report from datanode :" + " {}, for container: {}, reason: container doesn't exist in" + - "container database.", reports.getDatanodeDetails(), + "container database.", datanodeDetails, datanodeState.getContainerID()); } } finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java index f560174..ee8e344 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java @@ -16,10 +16,11 @@ */ package org.apache.hadoop.hdds.scm.container; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import java.io.Closeable; import java.io.IOException; @@ -98,7 +99,8 @@ public interface Mapping extends Closeable { * * @param reports Container report */ - void processContainerReports(ContainerReportsRequestProto reports) + void processContainerReports(DatanodeDetails datanodeDetails, + ContainerReportsProto reports) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java index c14303f..5bd0574 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java @@ -25,7 +25,7 @@ import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodePoolManager; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; @@ -295,24 +295,21 @@ public class ContainerSupervisor implements Closeable { * @param containerReport -- Container report for a specific container from * a datanode. */ - public void handleContainerReport( - ContainerReportsRequestProto containerReport) { - DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf( - containerReport.getDatanodeDetails()); + public void handleContainerReport(DatanodeDetails datanodeDetails, + ContainerReportsProto containerReport) { inProgressPoolListLock.readLock().lock(); try { String poolName = poolManager.getNodePool(datanodeDetails); for (InProgressPool ppool : inProgressPoolList) { if (ppool.getPoolName().equalsIgnoreCase(poolName)) { - ppool.handleContainerReport(containerReport); + ppool.handleContainerReport(datanodeDetails, containerReport); return; } } // TODO: Decide if we can do anything else with this report. LOG.debug("Discarding the container report for pool {}. " + "That pool is not currently in the pool reconciliation process." + - " Container Name: {}", poolName, - containerReport.getDatanodeDetails()); + " Container Name: {}", poolName, datanodeDetails); } catch (SCMException e) { LOG.warn("Skipping processing container report from datanode {}, " + "cause: failed to get the corresponding node pool", http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java index c444e90..4b54731 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerInfo; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -178,21 +178,20 @@ public final class InProgressPool { * * @param containerReport - ContainerReport */ - public void handleContainerReport( - ContainerReportsRequestProto containerReport) { + public void handleContainerReport(DatanodeDetails datanodeDetails, + ContainerReportsProto containerReport) { if (status == ProgressStatus.InProgress) { - executorService.submit(processContainerReport(containerReport)); + executorService.submit(processContainerReport(datanodeDetails, + containerReport)); } else { LOG.debug("Cannot handle container report when the pool is in {} status.", status); } } - private Runnable processContainerReport( - ContainerReportsRequestProto reports) { + private Runnable processContainerReport(DatanodeDetails datanodeDetails, + ContainerReportsProto reports) { return () -> { - DatanodeDetails datanodeDetails = - DatanodeDetails.getFromProtoBuf(reports.getDatanodeDetails()); if (processedNodeSet.computeIfAbsent(datanodeDetails.getUuid(), (k) -> true)) { nodeProcessed.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java index 05a9fc3..04658bd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hdds.scm.node; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import static org.apache.hadoop.util.Time.monotonicNow; @@ -31,7 +31,7 @@ import static org.apache.hadoop.util.Time.monotonicNow; public class HeartbeatQueueItem { private DatanodeDetails datanodeDetails; private long recvTimestamp; - private SCMNodeReport nodeReport; + private NodeReportProto nodeReport; /** * @@ -40,7 +40,7 @@ public class HeartbeatQueueItem { * @param nodeReport - node report associated with the heartbeat if any. */ HeartbeatQueueItem(DatanodeDetails datanodeDetails, long recvTimestamp, - SCMNodeReport nodeReport) { + NodeReportProto nodeReport) { this.datanodeDetails = datanodeDetails; this.recvTimestamp = recvTimestamp; this.nodeReport = nodeReport; @@ -56,7 +56,7 @@ public class HeartbeatQueueItem { /** * @return node report. */ - public SCMNodeReport getNodeReport() { + public NodeReportProto getNodeReport() { return nodeReport; } @@ -72,7 +72,7 @@ public class HeartbeatQueueItem { */ public static class Builder { private DatanodeDetails datanodeDetails; - private SCMNodeReport nodeReport; + private NodeReportProto nodeReport; private long recvTimestamp = monotonicNow(); public Builder setDatanodeDetails(DatanodeDetails dnDetails) { @@ -80,8 +80,8 @@ public class HeartbeatQueueItem { return this; } - public Builder setNodeReport(SCMNodeReport scmNodeReport) { - this.nodeReport = scmNodeReport; + public Builder setNodeReport(NodeReportProto report) { + this.nodeReport = report; return this; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 353a069..b339fb7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -28,15 +28,14 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto .ErrorCode; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ipc.Server; @@ -592,7 +591,7 @@ public class SCMNodeManager DatanodeDetails datanodeDetails = hbItem.getDatanodeDetails(); UUID datanodeUuid = datanodeDetails.getUuid(); - SCMNodeReport nodeReport = hbItem.getNodeReport(); + NodeReportProto nodeReport = hbItem.getNodeReport(); long recvTimestamp = hbItem.getRecvTimestamp(); long processTimestamp = Time.monotonicNow(); if (LOG.isTraceEnabled()) { @@ -637,7 +636,7 @@ public class SCMNodeManager new ReregisterCommand()); } - private void updateNodeStat(UUID dnId, SCMNodeReport nodeReport) { + private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) { SCMNodeStat stat = nodeStats.get(dnId); if (stat == null) { LOG.debug("SCM updateNodeStat based on heartbeat from previous" + @@ -649,8 +648,9 @@ public class SCMNodeManager long totalCapacity = 0; long totalRemaining = 0; long totalScmUsed = 0; - List<SCMStorageReport> storageReports = nodeReport.getStorageReportList(); - for (SCMStorageReport report : storageReports) { + List<StorageReportProto> storageReports = nodeReport + .getStorageReportList(); + for (StorageReportProto report : storageReports) { totalCapacity += report.getCapacity(); totalRemaining += report.getRemaining(); totalScmUsed+= report.getScmUsed(); @@ -710,7 +710,7 @@ public class SCMNodeManager * Register the node if the node finds that it is not registered with any * SCM. * - * @param datanodeDetailsProto - Send datanodeDetails with Node info. + * @param datanodeDetails - Send datanodeDetails with Node info. * This function generates and assigns new datanode ID * for the datanode. This allows SCM to be run independent * of Namenode if required. @@ -719,13 +719,11 @@ public class SCMNodeManager * @return SCMHeartbeatResponseProto */ @Override - public SCMCommand register(DatanodeDetailsProto datanodeDetailsProto, - SCMNodeReport nodeReport) { + public RegisteredCommand register( + DatanodeDetails datanodeDetails, NodeReportProto nodeReport) { String hostname = null; String ip = null; - DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf( - datanodeDetailsProto); InetAddress dnAddress = Server.getRemoteIp(); if (dnAddress != null) { // Mostly called inside an RPC, update ip and peer hostname @@ -734,7 +732,7 @@ public class SCMNodeManager datanodeDetails.setHostName(hostname); datanodeDetails.setIpAddress(ip); } - SCMCommand responseCommand = verifyDatanodeUUID(datanodeDetails); + RegisteredCommand responseCommand = verifyDatanodeUUID(datanodeDetails); if (responseCommand != null) { return responseCommand; } @@ -785,7 +783,8 @@ public class SCMNodeManager * @param datanodeDetails - Datanode Details. * @return SCMCommand */ - private SCMCommand verifyDatanodeUUID(DatanodeDetails datanodeDetails) { + private RegisteredCommand verifyDatanodeUUID( + DatanodeDetails datanodeDetails) { if (datanodeDetails.getUuid() != null && nodes.containsKey(datanodeDetails.getUuid())) { LOG.trace("Datanode is already registered. Datanode: {}", @@ -802,34 +801,23 @@ public class SCMNodeManager /** * Send heartbeat to indicate the datanode is alive and doing well. * - * @param datanodeDetailsProto - DatanodeDetailsProto. + * @param datanodeDetails - DatanodeDetailsProto. * @param nodeReport - node report. * @return SCMheartbeat response. * @throws IOException */ @Override public List<SCMCommand> sendHeartbeat( - DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport) { + DatanodeDetails datanodeDetails, NodeReportProto nodeReport) { - Preconditions.checkNotNull(datanodeDetailsProto, "Heartbeat is missing " + + Preconditions.checkNotNull(datanodeDetails, "Heartbeat is missing " + "DatanodeDetails."); - DatanodeDetails datanodeDetails = DatanodeDetails - .getFromProtoBuf(datanodeDetailsProto); - // Checking for NULL to make sure that we don't get - // an exception from ConcurrentList. - // This could be a problem in tests, if this function is invoked via - // protobuf, transport layer will guarantee that this is not null. - if (datanodeDetails != null) { - heartbeatQueue.add( - new HeartbeatQueueItem.Builder() - .setDatanodeDetails(datanodeDetails) - .setNodeReport(nodeReport) - .build()); - return commandQueue.getCommand(datanodeDetails.getUuid()); - } else { - LOG.error("Datanode ID in heartbeat is null"); - } - return null; + heartbeatQueue.add( + new HeartbeatQueueItem.Builder() + .setDatanodeDetails(datanodeDetails) + .setNodeReport(nodeReport) + .build()); + return commandQueue.getCommand(datanodeDetails.getUuid()); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java index fa423bb..6ea83df 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java @@ -23,7 +23,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.SCMStorageReport; + StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.ozone.OzoneConfigKeys; @@ -33,7 +33,11 @@ import org.slf4j.LoggerFactory; import javax.management.ObjectName; import java.io.IOException; -import java.util.*; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -159,7 +163,7 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean { } public StorageReportResult processNodeReport(UUID datanodeID, - StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport) + StorageContainerDatanodeProtocolProtos.NodeReportProto nodeReport) throws IOException { Preconditions.checkNotNull(datanodeID); Preconditions.checkNotNull(nodeReport); @@ -170,9 +174,9 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean { Set<StorageLocationReport> storagReportSet = new HashSet<>(); Set<StorageLocationReport> fullVolumeSet = new HashSet<>(); Set<StorageLocationReport> failedVolumeSet = new HashSet<>(); - List<SCMStorageReport> + List<StorageReportProto> storageReports = nodeReport.getStorageReportList(); - for (SCMStorageReport report : storageReports) { + for (StorageReportProto report : storageReports) { StorageLocationReport storageReport = StorageLocationReport.getFromProtobuf(report); storagReportSet.add(storageReport); http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 6e5b7de..1b1645d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -25,29 +25,47 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.protobuf.BlockingService; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult; + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReregisterCommandProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos + .ContainerBlocksDeletionACKResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos + .ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult; + +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto + .Type.closeContainerCommand; +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto + .Type.deleteBlocksCommand; +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto + .Type.reregisterCommand; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.versionCommand; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.registeredCommand; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.reregisterCommand; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.deleteBlocksCommand; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.closeContainerCommand; import org.apache.hadoop.hdds.scm.HddsServerUtil; @@ -150,96 +168,81 @@ public class SCMDatanodeProtocolServer implements @Override public SCMHeartbeatResponseProto sendHeartbeat( - HddsProtos.DatanodeDetailsProto datanodeDetails, - StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport) + SCMHeartbeatRequestProto heartbeat) throws IOException { + // TODO: Add a heartbeat dispatcher. + DatanodeDetails datanodeDetails = DatanodeDetails + .getFromProtoBuf(heartbeat.getDatanodeDetails()); + NodeReportProto nodeReport = heartbeat.getNodeReport(); List<SCMCommand> commands = scm.getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport); - List<SCMCommandResponseProto> cmdResponses = new LinkedList<>(); + List<SCMCommandProto> cmdResponses = new LinkedList<>(); for (SCMCommand cmd : commands) { - cmdResponses.add(getCommandResponse(cmd, datanodeDetails.getUuid())); + cmdResponses.add(getCommandResponse(cmd)); } return SCMHeartbeatResponseProto.newBuilder() + .setDatanodeUUID(datanodeDetails.getUuidString()) .addAllCommands(cmdResponses).build(); } @Override - public SCMRegisteredCmdResponseProto register( - HddsProtos.DatanodeDetailsProto datanodeDetails, SCMNodeReport nodeReport, - ContainerReportsRequestProto containerReportsRequestProto) + public SCMRegisteredResponseProto register( + HddsProtos.DatanodeDetailsProto datanodeDetailsProto, + NodeReportProto nodeReport, + ContainerReportsProto containerReportsProto) throws IOException { + DatanodeDetails datanodeDetails = DatanodeDetails + .getFromProtoBuf(datanodeDetailsProto); // TODO : Return the list of Nodes that forms the SCM HA. - RegisteredCommand registeredCommand = (RegisteredCommand) scm - .getScmNodeManager().register(datanodeDetails, nodeReport); - SCMCmdType type = registeredCommand.getType(); - if (type == SCMCmdType.registeredCommand && registeredCommand.getError() - == SCMRegisteredCmdResponseProto.ErrorCode.success) { - scm.getScmContainerManager().processContainerReports( - containerReportsRequestProto); + RegisteredCommand registeredCommand = scm.getScmNodeManager() + .register(datanodeDetails, nodeReport); + if (registeredCommand.getError() + == SCMRegisteredResponseProto.ErrorCode.success) { + scm.getScmContainerManager().processContainerReports(datanodeDetails, + containerReportsProto); } return getRegisteredResponse(registeredCommand); } @VisibleForTesting - public static SCMRegisteredCmdResponseProto getRegisteredResponse( - SCMCommand cmd) { - Preconditions.checkState(cmd.getClass() == RegisteredCommand.class); - RegisteredCommand rCmd = (RegisteredCommand) cmd; - SCMCmdType type = cmd.getType(); - if (type != SCMCmdType.registeredCommand) { - throw new IllegalArgumentException( - "Registered command is not well " + "formed. Internal Error."); - } - return SCMRegisteredCmdResponseProto.newBuilder() + public static SCMRegisteredResponseProto getRegisteredResponse( + RegisteredCommand cmd) { + return SCMRegisteredResponseProto.newBuilder() // TODO : Fix this later when we have multiple SCM support. // .setAddressList(addressList) - .setErrorCode(rCmd.getError()) - .setClusterID(rCmd.getClusterID()) - .setDatanodeUUID(rCmd.getDatanodeUUID()) + .setErrorCode(cmd.getError()) + .setClusterID(cmd.getClusterID()) + .setDatanodeUUID(cmd.getDatanodeUUID()) .build(); } - @Override - public ContainerReportsResponseProto sendContainerReport( - ContainerReportsRequestProto reports) + public void processContainerReports(DatanodeDetails datanodeDetails, + ContainerReportsProto reports) throws IOException { - updateContainerReportMetrics(reports); - + updateContainerReportMetrics(datanodeDetails, reports); // should we process container reports async? - scm.getScmContainerManager().processContainerReports(reports); - return ContainerReportsResponseProto.newBuilder().build(); + scm.getScmContainerManager() + .processContainerReports(datanodeDetails, reports); } - private void updateContainerReportMetrics( - ContainerReportsRequestProto reports) { - ContainerStat newStat = null; - // TODO: We should update the logic once incremental container report - // type is supported. - if (reports - .getType() == StorageContainerDatanodeProtocolProtos - .ContainerReportsRequestProto.reportType.fullReport) { - newStat = new ContainerStat(); - for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports - .getReportsList()) { - newStat.add(new ContainerStat(info.getSize(), info.getUsed(), - info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(), - info.getReadCount(), info.getWriteCount())); - } - - // update container metrics - StorageContainerManager.getMetrics().setLastContainerStat(newStat); + private void updateContainerReportMetrics(DatanodeDetails datanodeDetails, + ContainerReportsProto reports) { + ContainerStat newStat = new ContainerStat(); + for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports + .getReportsList()) { + newStat.add(new ContainerStat(info.getSize(), info.getUsed(), + info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(), + info.getReadCount(), info.getWriteCount())); } + // update container metrics + StorageContainerManager.getMetrics().setLastContainerStat(newStat); // Update container stat entry, this will trigger a removal operation if it // exists in cache. - synchronized (scm.getContainerReportCache()) { - String datanodeUuid = reports.getDatanodeDetails().getUuid(); - if (datanodeUuid != null && newStat != null) { - scm.getContainerReportCache().put(datanodeUuid, newStat); - // update global view container metrics - StorageContainerManager.getMetrics().incrContainerStat(newStat); - } - } + String datanodeUuid = datanodeDetails.getUuidString(); + scm.getContainerReportCache().put(datanodeUuid, newStat); + // update global view container metrics + StorageContainerManager.getMetrics().incrContainerStat(newStat); } @@ -298,28 +301,15 @@ public class SCMDatanodeProtocolServer implements * @throws IOException */ @VisibleForTesting - public StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto - getCommandResponse( - SCMCommand cmd, final String datanodeID) throws IOException { - SCMCmdType type = cmd.getType(); - SCMCommandResponseProto.Builder builder = - SCMCommandResponseProto.newBuilder().setDatanodeUUID(datanodeID); - switch (type) { - case registeredCommand: - return builder - .setCmdType(registeredCommand) - .setRegisteredProto(SCMRegisteredCmdResponseProto - .getDefaultInstance()) - .build(); - case versionCommand: - return builder - .setCmdType(versionCommand) - .setVersionProto(SCMVersionResponseProto.getDefaultInstance()) - .build(); + public SCMCommandProto getCommandResponse(SCMCommand cmd) + throws IOException { + SCMCommandProto.Builder builder = + SCMCommandProto.newBuilder(); + switch (cmd.getType()) { case reregisterCommand: return builder - .setCmdType(reregisterCommand) - .setReregisterProto(SCMReregisterCmdResponseProto + .setCommandType(reregisterCommand) + .setReregisterCommandProto(ReregisterCommandProto .getDefaultInstance()) .build(); case deleteBlocksCommand: @@ -335,13 +325,14 @@ public class SCMDatanodeProtocolServer implements .collect(Collectors.toList()); scm.getScmBlockManager().getDeletedBlockLog().incrementCount(txs); return builder - .setCmdType(deleteBlocksCommand) - .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto()) + .setCommandType(deleteBlocksCommand) + .setDeleteBlocksCommandProto(((DeleteBlocksCommand) cmd).getProto()) .build(); case closeContainerCommand: return builder - .setCmdType(closeContainerCommand) - .setCloseContainerProto(((CloseContainerCommand) cmd).getProto()) + .setCommandType(closeContainerCommand) + .setCloseContainerCommandProto( + ((CloseContainerCommand) cmd).getProto()) .build(); default: throw new IllegalArgumentException("Not implemented"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index 5cf0a92..b8036d7 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -18,9 +18,9 @@ package org.apache.hadoop.hdds.scm; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol - .proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.StorageTypeProto; import org.apache.hadoop.hdds.scm.node.SCMNodeManager; @@ -53,16 +53,17 @@ public final class TestUtils { public static DatanodeDetails getDatanodeDetails(SCMNodeManager nodeManager, String uuid) { DatanodeDetails datanodeDetails = getDatanodeDetails(uuid); - nodeManager.register(datanodeDetails.getProtoBufMessage(), null); + nodeManager.register(datanodeDetails, null); return datanodeDetails; } /** * Create Node Report object. - * @return SCMNodeReport + * @return NodeReportProto */ - public static SCMNodeReport createNodeReport(List<SCMStorageReport> reports) { - SCMNodeReport.Builder nodeReport = SCMNodeReport.newBuilder(); + public static NodeReportProto createNodeReport( + List<StorageReportProto> reports) { + NodeReportProto.Builder nodeReport = NodeReportProto.newBuilder(); nodeReport.addAllStorageReport(reports); return nodeReport.build(); } @@ -71,14 +72,14 @@ public final class TestUtils { * Create SCM Storage Report object. * @return list of SCMStorageReport */ - public static List<SCMStorageReport> createStorageReport(long capacity, + public static List<StorageReportProto> createStorageReport(long capacity, long used, long remaining, String path, StorageTypeProto type, String id, int count) { - List<SCMStorageReport> reportList = new ArrayList<>(); + List<StorageReportProto> reportList = new ArrayList<>(); for (int i = 0; i < count; i++) { Preconditions.checkNotNull(path); Preconditions.checkNotNull(id); - SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); + StorageReportProto.Builder srb = StorageReportProto.newBuilder(); srb.setStorageUuid(id).setStorageLocation(path).setCapacity(capacity) .setScmUsed(used).setRemaining(remaining); StorageTypeProto storageTypeProto = http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index a46d7ba..8c59462 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -24,13 +24,14 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.protocol.VersionResponse; +import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.assertj.core.util.Preconditions; import org.mockito.Mockito; @@ -370,13 +371,13 @@ public class MockNodeManager implements NodeManager { * Register the node if the node finds that it is not registered with any * SCM. * - * @param datanodeDetails DatanodeDetailsProto - * @param nodeReport SCMNodeReport + * @param datanodeDetails DatanodeDetails + * @param nodeReport NodeReportProto * @return SCMHeartbeatResponseProto */ @Override - public SCMCommand register(HddsProtos.DatanodeDetailsProto datanodeDetails, - SCMNodeReport nodeReport) { + public RegisteredCommand register(DatanodeDetails datanodeDetails, + NodeReportProto nodeReport) { return null; } @@ -388,9 +389,8 @@ public class MockNodeManager implements NodeManager { * @return SCMheartbeat response list */ @Override - public List<SCMCommand> sendHeartbeat( - HddsProtos.DatanodeDetailsProto datanodeDetails, - SCMNodeReport nodeReport) { + public List<SCMCommand> sendHeartbeat(DatanodeDetails datanodeDetails, + NodeReportProto nodeReport) { if ((datanodeDetails != null) && (nodeReport != null) && (nodeReport .getStorageReportCount() > 0)) { SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid()); @@ -398,8 +398,9 @@ public class MockNodeManager implements NodeManager { long totalCapacity = 0L; long totalRemaining = 0L; long totalScmUsed = 0L; - List<SCMStorageReport> storageReports = nodeReport.getStorageReportList(); - for (SCMStorageReport report : storageReports) { + List<StorageReportProto> storageReports = nodeReport + .getStorageReportList(); + for (StorageReportProto report : storageReports) { totalCapacity += report.getCapacity(); totalRemaining += report.getRemaining(); totalScmUsed += report.getScmUsed(); @@ -407,8 +408,7 @@ public class MockNodeManager implements NodeManager { aggregateStat.subtract(stat); stat.set(totalCapacity, totalScmUsed, totalRemaining); aggregateStat.add(stat); - nodeMetricMap.put(DatanodeDetails - .getFromProtoBuf(datanodeDetails).getUuid(), stat); + nodeMetricMap.put(datanodeDetails.getUuid(), stat); } return null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java index f318316..ba2ab64 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.test.GenericTestUtils; @@ -191,8 +191,6 @@ public class TestContainerMapping { public void testFullContainerReport() throws IOException { ContainerInfo info = createContainer(); DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); - ContainerReportsRequestProto.reportType reportType = - ContainerReportsRequestProto.reportType.fullReport; List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports = new ArrayList<>(); StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder = @@ -209,12 +207,11 @@ public class TestContainerMapping { reports.add(ciBuilder.build()); - ContainerReportsRequestProto.Builder crBuilder = - ContainerReportsRequestProto.newBuilder(); - crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage()) - .setType(reportType).addAllReports(reports); + ContainerReportsProto.Builder crBuilder = ContainerReportsProto + .newBuilder(); + crBuilder.addAllReports(reports); - mapping.processContainerReports(crBuilder.build()); + mapping.processContainerReports(datanodeDetails, crBuilder.build()); ContainerInfo updatedContainer = mapping.getContainer(info.getContainerID()); @@ -227,8 +224,6 @@ public class TestContainerMapping { public void testContainerCloseWithContainerReport() throws IOException { ContainerInfo info = createContainer(); DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); - ContainerReportsRequestProto.reportType reportType = - ContainerReportsRequestProto.reportType.fullReport; List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports = new ArrayList<>(); @@ -246,12 +241,11 @@ public class TestContainerMapping { reports.add(ciBuilder.build()); - ContainerReportsRequestProto.Builder crBuilder = - ContainerReportsRequestProto.newBuilder(); - crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage()) - .setType(reportType).addAllReports(reports); + ContainerReportsProto.Builder crBuilder = + ContainerReportsProto.newBuilder(); + crBuilder.addAllReports(reports); - mapping.processContainerReports(crBuilder.build()); + mapping.processContainerReports(datanodeDetails, crBuilder.build()); ContainerInfo updatedContainer = mapping.getContainer(info.getContainerID()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java index 15ecbad..0a3efda 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.test.GenericTestUtils; @@ -199,9 +199,8 @@ public class TestContainerCloser { private void sendContainerReport(ContainerInfo info, long used) throws IOException { - ContainerReportsRequestProto.Builder - reports = ContainerReportsRequestProto.newBuilder(); - reports.setType(ContainerReportsRequestProto.reportType.fullReport); + ContainerReportsProto.Builder + reports = ContainerReportsProto.newBuilder(); StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder = StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder(); @@ -214,9 +213,8 @@ public class TestContainerCloser { .setWriteCount(100000000L) .setReadBytes(2000000000L) .setWriteBytes(2000000000L); - reports.setDatanodeDetails( - TestUtils.getDatanodeDetails().getProtoBufMessage()); reports.addReports(ciBuilder); - mapping.processContainerReports(reports.build()); + mapping.processContainerReports(TestUtils.getDatanodeDetails(), + reports.build()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index 09b6cd1..5ad28f6 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -33,7 +33,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.test.GenericTestUtils; @@ -133,9 +133,9 @@ public class TestContainerPlacement { for (DatanodeDetails datanodeDetails : datanodes) { String id = UUID.randomUUID().toString(); String path = testDir.getAbsolutePath() + "/" + id; - List<SCMStorageReport> reports = TestUtils + List<StorageReportProto> reports = TestUtils .createStorageReport(capacity, used, remaining, path, null, id, 1); - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, TestUtils.createNodeReport(reports)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java index de87e50..2b04d6b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.node; import com.google.common.base.Supplier; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; @@ -26,7 +28,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.test.GenericTestUtils; @@ -63,8 +65,6 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState .HEALTHY; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; -import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.core.StringStartsWith.startsWith; import static org.junit.Assert.assertEquals; @@ -144,7 +144,7 @@ public class TestNodeManager { for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) { DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails( nodeManager); - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, null); } @@ -191,8 +191,8 @@ public class TestNodeManager { // Need 100 nodes to come out of chill mode, only one node is sending HB. nodeManager.setMinimumChillModeNodes(100); - nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager) - .getProtoBufMessage(), null); + nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager), + null); GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), 100, 4 * 1000); assertFalse("Not enough heartbeat, Node manager should have" + @@ -219,7 +219,7 @@ public class TestNodeManager { // Send 10 heartbeat from same node, and assert we never leave chill mode. for (int x = 0; x < 10; x++) { - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, null); } @@ -250,7 +250,7 @@ public class TestNodeManager { nodeManager.close(); // These should never be processed. - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, null); // Let us just wait for 2 seconds to prove that HBs are not processed. @@ -274,13 +274,13 @@ public class TestNodeManager { DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); String dnId = datanodeDetails.getUuidString(); String storagePath = testDir.getAbsolutePath() + "/" + dnId; - List<SCMStorageReport> reports = + List<StorageReportProto> reports = TestUtils.createStorageReport(100, 10, 90, storagePath, null, dnId, 1); try (SCMNodeManager nodemanager = createNodeManager(conf)) { - nodemanager.register(datanodeDetails.getProtoBufMessage(), + nodemanager.register(datanodeDetails, TestUtils.createNodeReport(reports)); List<SCMCommand> command = nodemanager.sendHeartbeat( - datanodeDetails.getProtoBufMessage(), null); + datanodeDetails, null); Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails)); Assert.assertTrue("On regular HB calls, SCM responses a " + "datanode with an empty command list", command.isEmpty()); @@ -298,10 +298,10 @@ public class TestNodeManager { GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { List<SCMCommand> command = - nodemanager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodemanager.sendHeartbeat(datanodeDetails, null); return command.size() == 1 && command.get(0).getType() - .equals(SCMCmdType.reregisterCommand); + .equals(SCMCommandProto.Type.reregisterCommand); } }, 100, 3 * 1000); } catch (TimeoutException e) { @@ -330,7 +330,7 @@ public class TestNodeManager { for (int x = 0; x < count; x++) { DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails( nodeManager); - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, null); } GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), @@ -422,19 +422,19 @@ public class TestNodeManager { DatanodeDetails staleNode = TestUtils.getDatanodeDetails(nodeManager); // Heartbeat once - nodeManager.sendHeartbeat(staleNode.getProtoBufMessage(), + nodeManager.sendHeartbeat(staleNode, null); // Heartbeat all other nodes. for (DatanodeDetails dn : nodeList) { - nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null); + nodeManager.sendHeartbeat(dn, null); } // Wait for 2 seconds .. and heartbeat good nodes again. Thread.sleep(2 * 1000); for (DatanodeDetails dn : nodeList) { - nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null); + nodeManager.sendHeartbeat(dn, null); } // Wait for 2 seconds, wait a total of 4 seconds to make sure that the @@ -451,7 +451,7 @@ public class TestNodeManager { // heartbeat good nodes again. for (DatanodeDetails dn : nodeList) { - nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null); + nodeManager.sendHeartbeat(dn, null); } // 6 seconds is the dead window for this test , so we wait a total of @@ -565,11 +565,11 @@ public class TestNodeManager { DatanodeDetails deadNode = TestUtils.getDatanodeDetails(nodeManager); nodeManager.sendHeartbeat( - healthyNode.getProtoBufMessage(), null); + healthyNode, null); nodeManager.sendHeartbeat( - staleNode.getProtoBufMessage(), null); + staleNode, null); nodeManager.sendHeartbeat( - deadNode.getProtoBufMessage(), null); + deadNode, null); // Sleep so that heartbeat processing thread gets to run. Thread.sleep(500); @@ -596,15 +596,15 @@ public class TestNodeManager { */ nodeManager.sendHeartbeat( - healthyNode.getProtoBufMessage(), null); + healthyNode, null); nodeManager.sendHeartbeat( - staleNode.getProtoBufMessage(), null); + staleNode, null); nodeManager.sendHeartbeat( - deadNode.getProtoBufMessage(), null); + deadNode, null); Thread.sleep(1500); nodeManager.sendHeartbeat( - healthyNode.getProtoBufMessage(), null); + healthyNode, null); Thread.sleep(2 * 1000); assertEquals(1, nodeManager.getNodeCount(HEALTHY)); @@ -625,12 +625,12 @@ public class TestNodeManager { */ nodeManager.sendHeartbeat( - healthyNode.getProtoBufMessage(), null); + healthyNode, null); nodeManager.sendHeartbeat( - staleNode.getProtoBufMessage(), null); + staleNode, null); Thread.sleep(1500); nodeManager.sendHeartbeat( - healthyNode.getProtoBufMessage(), null); + healthyNode, null); Thread.sleep(2 * 1000); // 3.5 seconds have elapsed for stale node, so it moves into Stale. @@ -664,11 +664,11 @@ public class TestNodeManager { * back all the nodes in healthy state. */ nodeManager.sendHeartbeat( - healthyNode.getProtoBufMessage(), null); + healthyNode, null); nodeManager.sendHeartbeat( - staleNode.getProtoBufMessage(), null); + staleNode, null); nodeManager.sendHeartbeat( - deadNode.getProtoBufMessage(), null); + deadNode, null); Thread.sleep(500); //Assert all nodes are healthy. assertEquals(3, nodeManager.getAllNodes().size()); @@ -689,7 +689,7 @@ public class TestNodeManager { int sleepDuration) throws InterruptedException { while (!Thread.currentThread().isInterrupted()) { for (DatanodeDetails dn : list) { - manager.sendHeartbeat(dn.getProtoBufMessage(), null); + manager.sendHeartbeat(dn, null); } Thread.sleep(sleepDuration); } @@ -775,7 +775,7 @@ public class TestNodeManager { // No Thread just one time HBs the node manager, so that these will be // marked as dead nodes eventually. for (DatanodeDetails dn : deadNodeList) { - nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null); + nodeManager.sendHeartbeat(dn, null); } @@ -940,7 +940,7 @@ public class TestNodeManager { DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails( nodeManager); nodeManager.sendHeartbeat( - datanodeDetails.getProtoBufMessage(), null); + datanodeDetails, null); String status = nodeManager.getChillModeStatus(); Assert.assertThat(status, containsString("Still in chill " + "mode, waiting on nodes to report in.")); @@ -967,8 +967,7 @@ public class TestNodeManager { // Assert that node manager force enter cannot be overridden by nodes HBs. for (int x = 0; x < 20; x++) { DatanodeDetails datanode = TestUtils.getDatanodeDetails(nodeManager); - nodeManager.sendHeartbeat(datanode.getProtoBufMessage(), - null); + nodeManager.sendHeartbeat(datanode, null); } Thread.sleep(500); @@ -1009,10 +1008,10 @@ public class TestNodeManager { String dnId = datanodeDetails.getUuidString(); long free = capacity - used; String storagePath = testDir.getAbsolutePath() + "/" + dnId; - List<SCMStorageReport> reports = TestUtils + List<StorageReportProto> reports = TestUtils .createStorageReport(capacity, used, free, storagePath, null, dnId, 1); - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, TestUtils.createNodeReport(reports)); } GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), @@ -1058,11 +1057,11 @@ public class TestNodeManager { long scmUsed = x * usedPerHeartbeat; long remaining = capacity - scmUsed; String storagePath = testDir.getAbsolutePath() + "/" + dnId; - List<SCMStorageReport> reports = TestUtils + List<StorageReportProto> reports = TestUtils .createStorageReport(capacity, scmUsed, remaining, storagePath, null, dnId, 1); - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, TestUtils.createNodeReport(reports)); Thread.sleep(100); } @@ -1140,10 +1139,10 @@ public class TestNodeManager { // Send a new report to bring the dead node back to healthy String storagePath = testDir.getAbsolutePath() + "/" + dnId; - List<SCMStorageReport> reports = TestUtils + List<StorageReportProto> reports = TestUtils .createStorageReport(capacity, expectedScmUsed, expectedRemaining, storagePath, null, dnId, 1); - nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nodeManager.sendHeartbeat(datanodeDetails, TestUtils.createNodeReport(reports)); // Wait up to 5 seconds so that the dead node becomes healthy http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java index b824412..072dee7 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java @@ -21,9 +21,9 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.SCMNodeReport; + StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.SCMStorageReport; + StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.ozone.OzoneConsts; @@ -134,7 +134,7 @@ public class TestSCMNodeStorageStatMap { @Test public void testProcessNodeReportCheckOneNode() throws IOException { UUID key = getFirstKey(); - List<SCMStorageReport> reportList = new ArrayList<>(); + List<StorageReportProto> reportList = new ArrayList<>(); Set<StorageLocationReport> reportSet = testData.get(key); SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf); map.insertNewDatanode(key, reportSet); @@ -146,16 +146,16 @@ public class TestSCMNodeStorageStatMap { long reportCapacity = report.getCapacity(); long reportScmUsed = report.getScmUsed(); long reportRemaining = report.getRemaining(); - List<SCMStorageReport> reports = TestUtils + List<StorageReportProto> reports = TestUtils .createStorageReport(reportCapacity, reportScmUsed, reportRemaining, path, null, storageId, 1); StorageReportResult result = map.processNodeReport(key, TestUtils.createNodeReport(reports)); Assert.assertEquals(result.getStatus(), SCMNodeStorageStatMap.ReportStatus.ALL_IS_WELL); - StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb = - SCMNodeReport.newBuilder(); - SCMStorageReport srb = reportSet.iterator().next().getProtoBufMessage(); + StorageContainerDatanodeProtocolProtos.NodeReportProto.Builder nrb = + NodeReportProto.newBuilder(); + StorageReportProto srb = reportSet.iterator().next().getProtoBufMessage(); reportList.add(srb); result = map.processNodeReport(key, TestUtils.createNodeReport(reportList)); Assert.assertEquals(result.getStatus(), @@ -168,7 +168,7 @@ public class TestSCMNodeStorageStatMap { Assert.assertEquals(result.getStatus(), SCMNodeStorageStatMap.ReportStatus.STORAGE_OUT_OF_SPACE); // Mark a disk failed - SCMStorageReport srb2 = SCMStorageReport.newBuilder() + StorageReportProto srb2 = StorageReportProto.newBuilder() .setStorageUuid(UUID.randomUUID().toString()) .setStorageLocation(srb.getStorageLocation()).setScmUsed(reportCapacity) .setCapacity(reportCapacity).setRemaining(0).setFailed(true).build(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java index 1d92cdc..34779da 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -20,22 +20,21 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.VersionInfo; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; import org.apache.hadoop.ipc.RPC; @@ -200,7 +199,7 @@ public class TestEndPoint { DatanodeDetails nodeToRegister = getDatanodeDetails(); try (EndpointStateMachine rpcEndPoint = createEndpoint( SCMTestUtils.getConf(), serverAddress, 1000)) { - SCMRegisteredCmdResponseProto responseProto = rpcEndPoint.getEndPoint() + SCMRegisteredResponseProto responseProto = rpcEndPoint.getEndPoint() .register(nodeToRegister.getProtoBufMessage(), TestUtils .createNodeReport( getStorageReports(nodeToRegister.getUuidString())), @@ -215,7 +214,7 @@ public class TestEndPoint { } } - private List<SCMStorageReport> getStorageReports(String id) { + private List<StorageReportProto> getStorageReports(String id) { String storagePath = testDir.getAbsolutePath() + "/" + id; return TestUtils.createStorageReport(100, 10, 90, storagePath, null, id, 1); } @@ -293,9 +292,14 @@ public class TestEndPoint { createEndpoint(SCMTestUtils.getConf(), serverAddress, 1000)) { String storageId = UUID.randomUUID().toString(); + SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder() + .setDatanodeDetails(dataNode.getProtoBufMessage()) + .setNodeReport(TestUtils.createNodeReport( + getStorageReports(storageId))) + .build(); + SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint() - .sendHeartbeat(dataNode.getProtoBufMessage(), - TestUtils.createNodeReport(getStorageReports(storageId))); + .sendHeartbeat(request); Assert.assertNotNull(responseProto); Assert.assertEquals(0, responseProto.getCommandsCount()); } @@ -361,86 +365,11 @@ public class TestEndPoint { lessThanOrEqualTo(rpcTimeout + tolerance)); } - /** - * Returns a new container report. - * @return - */ - ContainerReport getRandomContainerReport() { - return new ContainerReport(RandomUtils.nextLong(), - DigestUtils.sha256Hex("Random")); - } - - /** - * Creates dummy container reports. - * @param count - The number of closed containers to create. - * @return ContainerReportsProto - */ - StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto - createDummyContainerReports(int count) { - StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder - reportsBuilder = StorageContainerDatanodeProtocolProtos - .ContainerReportsRequestProto.newBuilder(); - for (int x = 0; x < count; x++) { - reportsBuilder.addReports(getRandomContainerReport() - .getProtoBufMessage()); - } - reportsBuilder.setDatanodeDetails(getDatanodeDetails() - .getProtoBufMessage()); - reportsBuilder.setType(StorageContainerDatanodeProtocolProtos - .ContainerReportsRequestProto.reportType.fullReport); - return reportsBuilder.build(); - } - - /** - * Tests that rpcEndpoint sendContainerReport works as expected. - * @throws Exception - */ - @Test - public void testContainerReportSend() throws Exception { - final int count = 1000; - scmServerImpl.reset(); - try (EndpointStateMachine rpcEndPoint = - createEndpoint(SCMTestUtils.getConf(), - serverAddress, 1000)) { - ContainerReportsResponseProto responseProto = rpcEndPoint - .getEndPoint().sendContainerReport(createDummyContainerReports( - count)); - Assert.assertNotNull(responseProto); - } - Assert.assertEquals(1, scmServerImpl.getContainerReportsCount()); - Assert.assertEquals(count, scmServerImpl.getContainerCount()); - } - - - /** - * Tests that rpcEndpoint sendContainerReport works as expected. - * @throws Exception - */ - @Test - public void testContainerReport() throws Exception { - final int count = 1000; - scmServerImpl.reset(); - try (EndpointStateMachine rpcEndPoint = - createEndpoint(SCMTestUtils.getConf(), - serverAddress, 1000)) { - ContainerReportsResponseProto responseProto = rpcEndPoint - .getEndPoint().sendContainerReport(createContainerReport(count, - null)); - Assert.assertNotNull(responseProto); - } - Assert.assertEquals(1, scmServerImpl.getContainerReportsCount()); - Assert.assertEquals(count, scmServerImpl.getContainerCount()); - final long expectedKeyCount = count * 1000; - Assert.assertEquals(expectedKeyCount, scmServerImpl.getKeyCount()); - final long expectedBytesUsed = count * OzoneConsts.GB * 2; - Assert.assertEquals(expectedBytesUsed, scmServerImpl.getBytesUsed()); - } - - private ContainerReportsRequestProto createContainerReport( + private ContainerReportsProto createContainerReport( int count, DatanodeDetails datanodeDetails) { - StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder + StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder reportsBuilder = StorageContainerDatanodeProtocolProtos - .ContainerReportsRequestProto.newBuilder(); + .ContainerReportsProto.newBuilder(); for (int x = 0; x < count; x++) { long containerID = RandomUtils.nextLong(); ContainerReport report = new ContainerReport(containerID, @@ -455,14 +384,6 @@ public class TestEndPoint { reportsBuilder.addReports(report.getProtoBufMessage()); } - if(datanodeDetails == null) { - reportsBuilder.setDatanodeDetails(getDatanodeDetails() - .getProtoBufMessage()); - } else { - reportsBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage()); - } - reportsBuilder.setType(StorageContainerDatanodeProtocolProtos - .ContainerReportsRequestProto.reportType.fullReport); return reportsBuilder.build(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27ac8235/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java deleted file mode 100644 index e197886..0000000 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java +++ /dev/null @@ -1,275 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.container.replication.ContainerSupervisor; -import org.apache.hadoop.hdds.scm.container.replication.InProgressPool; -import org.apache.hadoop.hdds.scm.node.CommandQueue; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.node.NodePoolManager; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.ozone.container.common.SCMTestUtils; -import org.apache.hadoop.ozone.container.testutils - .ReplicationDatanodeStateManager; -import org.apache.hadoop.ozone.container.testutils.ReplicationNodeManagerMock; -import org.apache.hadoop.ozone.container.testutils - .ReplicationNodePoolManagerMock; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.GenericTestUtils.LogCapturer; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.event.Level; - -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState - .HEALTHY; -import static org.apache.ratis.shaded.com.google.common.util.concurrent - .Uninterruptibles.sleepUninterruptibly; - -/** - * Tests for the container manager. - */ -public class TestContainerSupervisor { - final static String POOL_NAME_TEMPLATE = "Pool%d"; - static final int MAX_DATANODES = 72; - static final int POOL_SIZE = 24; - static final int POOL_COUNT = 3; - private LogCapturer logCapturer = LogCapturer.captureLogs( - LogFactory.getLog(ContainerSupervisor.class)); - private List<DatanodeDetails> datanodes = new LinkedList<>(); - private NodeManager nodeManager; - private NodePoolManager poolManager; - private CommandQueue commandQueue; - private ContainerSupervisor containerSupervisor; - private ReplicationDatanodeStateManager datanodeStateManager; - - @After - public void tearDown() throws Exception { - logCapturer.stopCapturing(); - GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.INFO); - } - - @Before - public void setUp() throws Exception { - GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.DEBUG); - Map<DatanodeDetails, NodeState> nodeStateMap = new HashMap<>(); - // We are setting up 3 pools with 24 nodes each in this cluster. - // First we create 72 Datanodes. - for (int x = 0; x < MAX_DATANODES; x++) { - DatanodeDetails datanode = TestUtils.getDatanodeDetails(); - datanodes.add(datanode); - nodeStateMap.put(datanode, HEALTHY); - } - - commandQueue = new CommandQueue(); - - // All nodes in this cluster are healthy for time being. - nodeManager = new ReplicationNodeManagerMock(nodeStateMap, commandQueue); - poolManager = new ReplicationNodePoolManagerMock(); - - - Assert.assertEquals("Max datanodes should be equal to POOL_SIZE * " + - "POOL_COUNT", POOL_COUNT * POOL_SIZE, MAX_DATANODES); - - // Start from 1 instead of zero so we can multiply and get the node index. - for (int y = 1; y <= POOL_COUNT; y++) { - String poolName = String.format(POOL_NAME_TEMPLATE, y); - for (int z = 0; z < POOL_SIZE; z++) { - DatanodeDetails id = datanodes.get(y * z); - poolManager.addNode(poolName, id); - } - } - OzoneConfiguration config = SCMTestUtils.getOzoneConf(); - config.setTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, 2, - TimeUnit.SECONDS); - config.setTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL, 1, - TimeUnit.SECONDS); - containerSupervisor = new ContainerSupervisor(config, - nodeManager, poolManager); - datanodeStateManager = new ReplicationDatanodeStateManager(nodeManager, - poolManager); - // Sleep for one second to make sure all threads get time to run. - sleepUninterruptibly(1, TimeUnit.SECONDS); - } - - @Test - /** - * Asserts that at least one pool is picked up for processing. - */ - public void testAssertPoolsAreProcessed() { - // This asserts that replication manager has started processing at least - // one pool. - Assert.assertTrue(containerSupervisor.getInProgressPoolCount() > 0); - - // Since all datanodes are flagged as healthy in this test, for each - // datanode we must have queued a command. - Assert.assertEquals("Commands are in queue :", - POOL_SIZE * containerSupervisor.getInProgressPoolCount(), - commandQueue.getCommandsInQueue()); - } - - @Test - /** - * This test sends container reports for 2 containers to a pool in progress. - * Asserts that we are able to find a container with single replica and do - * not find container with 3 replicas. - */ - public void testDetectSingleContainerReplica() throws TimeoutException, - InterruptedException { - long singleNodeContainerID = 9001; - long threeNodeContainerID = 9003; - InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0); - // Only single datanode reporting that "SingleNodeContainer" exists. - List<ContainerReportsRequestProto> clist = - datanodeStateManager.getContainerReport(singleNodeContainerID, - ppool.getPool().getPoolName(), 1); - ppool.handleContainerReport(clist.get(0)); - - // Three nodes are going to report that ThreeNodeContainer exists. - clist = datanodeStateManager.getContainerReport(threeNodeContainerID, - ppool.getPool().getPoolName(), 3); - - for (ContainerReportsRequestProto reportsProto : clist) { - ppool.handleContainerReport(reportsProto); - } - GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() == 4, - 200, 1000); - ppool.setDoneProcessing(); - - List<Map.Entry<Long, Integer>> containers = ppool.filterContainer(p -> p - .getValue() == 1); - Assert.assertEquals(singleNodeContainerID, - containers.get(0).getKey().longValue()); - int count = containers.get(0).getValue(); - Assert.assertEquals(1L, count); - } - - @Test - /** - * We create three containers, Normal,OveReplicated and WayOverReplicated - * containers. This test asserts that we are able to find the - * over replicated containers. - */ - public void testDetectOverReplica() throws TimeoutException, - InterruptedException { - long normalContainerID = 9000; - long overReplicatedContainerID = 9001; - long wayOverReplicatedContainerID = 9002; - InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0); - - List<ContainerReportsRequestProto> clist = - datanodeStateManager.getContainerReport(normalContainerID, - ppool.getPool().getPoolName(), 3); - ppool.handleContainerReport(clist.get(0)); - - clist = datanodeStateManager.getContainerReport(overReplicatedContainerID, - ppool.getPool().getPoolName(), 4); - - for (ContainerReportsRequestProto reportsProto : clist) { - ppool.handleContainerReport(reportsProto); - } - - clist = datanodeStateManager.getContainerReport( - wayOverReplicatedContainerID, ppool.getPool().getPoolName(), 7); - - for (ContainerReportsRequestProto reportsProto : clist) { - ppool.handleContainerReport(reportsProto); - } - - // We ignore container reports from the same datanodes. - // it is possible that these each of these containers get placed - // on same datanodes, so allowing for 4 duplicates in the set of 14. - GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() > 10, - 200, 1000); - ppool.setDoneProcessing(); - - List<Map.Entry<Long, Integer>> containers = ppool.filterContainer(p -> p - .getValue() > 3); - Assert.assertEquals(2, containers.size()); - } - - @Test - /** - * This test verifies that all pools are picked up for replica processing. - * - */ - public void testAllPoolsAreProcessed() throws TimeoutException, - InterruptedException { - // Verify that we saw all three pools being picked up for processing. - GenericTestUtils.waitFor(() -> containerSupervisor.getPoolProcessCount() - >= 3, 200, 15 * 1000); - Assert.assertTrue(logCapturer.getOutput().contains("Pool1") && - logCapturer.getOutput().contains("Pool2") && - logCapturer.getOutput().contains("Pool3")); - } - - @Test - /** - * Adds a new pool and tests that we are able to pick up that new pool for - * processing as well as handle container reports for datanodes in that pool. - * @throws TimeoutException - * @throws InterruptedException - */ - public void testAddingNewPoolWorks() - throws TimeoutException, InterruptedException, IOException { - LogCapturer inProgressLog = LogCapturer.captureLogs( - LogFactory.getLog(InProgressPool.class)); - GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.DEBUG); - try { - DatanodeDetails id = TestUtils.getDatanodeDetails(); - ((ReplicationNodeManagerMock) (nodeManager)).addNode(id, HEALTHY); - poolManager.addNode("PoolNew", id); - GenericTestUtils.waitFor(() -> - logCapturer.getOutput().contains("PoolNew"), - 200, 15 * 1000); - - long newContainerID = 7001; - // Assert that we are able to send a container report to this new - // pool and datanode. - List<ContainerReportsRequestProto> clist = - datanodeStateManager.getContainerReport(newContainerID, - "PoolNew", 1); - containerSupervisor.handleContainerReport(clist.get(0)); - GenericTestUtils.waitFor(() -> - inProgressLog.getOutput() - .contains(Long.toString(newContainerID)) && inProgressLog - .getOutput().contains(id.getUuidString()), - 200, 10 * 1000); - } finally { - inProgressLog.stopCapturing(); - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org