http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 7078b8f..42b39f9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -17,17 +17,12 @@ package org.apache.hadoop.hdds.scm.container; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.container.states.ContainerState; import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; import org.apache.hadoop.hdds.scm.exceptions.SCMException; @@ -45,11 +40,8 @@ import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.NavigableSet; import java.util.Set; @@ -116,7 +108,7 @@ import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes * TimeOut Delete Container State Machine - if the container creating times out, * then Container State manager decides to delete the container. */ -public class ContainerStateManager implements Closeable { +public class ContainerStateManager { private static final Logger LOG = LoggerFactory.getLogger(ContainerStateManager.class); @@ -135,11 +127,10 @@ public class ContainerStateManager implements Closeable { * TODO : Add Container Tags so we know which containers are owned by SCM. */ @SuppressWarnings("unchecked") - public ContainerStateManager(Configuration configuration, - ContainerManager containerManager, PipelineSelector pipelineSelector) { + public ContainerStateManager(final Configuration configuration) { // Initialize the container state machine. - Set<HddsProtos.LifeCycleState> finalStates = new HashSet(); + final Set<HddsProtos.LifeCycleState> finalStates = new HashSet(); // These are the steady states of a container. finalStates.add(LifeCycleState.OPEN); @@ -155,22 +146,9 @@ public class ContainerStateManager implements Closeable { ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); - lastUsedMap = new ConcurrentHashMap<>(); - containerCount = new AtomicLong(0); - containers = new ContainerStateMap(); - } - - /** - * Return the info of all the containers kept by the in-memory mapping. - * - * @return the list of all container info. - */ - public List<ContainerInfo> getAllContainers() { - List<ContainerInfo> list = new ArrayList<>(); - - //No Locking needed since the return value is an immutable map. - containers.getContainerMap().forEach((key, value) -> list.add(value)); - return list; + this.lastUsedMap = new ConcurrentHashMap<>(); + this.containerCount = new AtomicLong(0); + this.containers = new ContainerStateMap(); } /* @@ -244,17 +222,15 @@ public class ContainerStateManager implements Closeable { LifeCycleEvent.CLEANUP); } - public void addExistingContainer(ContainerInfo containerInfo) + void loadContainer(final ContainerInfo containerInfo) throws SCMException { containers.addContainer(containerInfo); - long containerID = containerInfo.getContainerID(); - if (containerCount.get() < containerID) { - containerCount.set(containerID); - } + containerCount.set(Long.max( + containerInfo.getContainerID(), containerCount.get())); } /** - * allocates a new container based on the type, replication etc. + * Allocates a new container based on the type, replication etc. * * @param selector -- Pipeline selector class. * @param type -- Replication type. @@ -262,25 +238,22 @@ public class ContainerStateManager implements Closeable { * @return ContainerWithPipeline * @throws IOException on Failure. */ - public ContainerWithPipeline allocateContainer(PipelineSelector selector, - HddsProtos.ReplicationType type, - HddsProtos.ReplicationFactor replicationFactor, String owner) + ContainerInfo allocateContainer(final PipelineSelector selector, + final HddsProtos.ReplicationType type, + final HddsProtos.ReplicationFactor replicationFactor, final String owner) throws IOException { - Pipeline pipeline = selector.getReplicationPipeline(type, + final Pipeline pipeline = selector.getReplicationPipeline(type, replicationFactor); Preconditions.checkNotNull(pipeline, "Pipeline type=%s/" + "replication=%s couldn't be found for the new container. " + "Do you have enough nodes?", type, replicationFactor); - long containerID = containerCount.incrementAndGet(); - ContainerInfo containerInfo = new ContainerInfo.Builder() + final long containerID = containerCount.incrementAndGet(); + final ContainerInfo containerInfo = new ContainerInfo.Builder() .setState(HddsProtos.LifeCycleState.ALLOCATED) .setPipelineID(pipeline.getId()) - // This is bytes allocated for blocks inside container, not the - // container size - .setAllocatedBytes(0) .setUsedBytes(0) .setNumberOfKeys(0) .setStateEnterTime(Time.monotonicNow()) @@ -294,35 +267,34 @@ public class ContainerStateManager implements Closeable { Preconditions.checkNotNull(containerInfo); containers.addContainer(containerInfo); LOG.trace("New container allocated: {}", containerInfo); - return new ContainerWithPipeline(containerInfo, pipeline); + return containerInfo; } /** * Update the Container State to the next state. * - * @param info - ContainerInfo + * @param containerID - ContainerID * @param event - LifeCycle Event * @return Updated ContainerInfo. * @throws SCMException on Failure. */ - public ContainerInfo updateContainerState(ContainerInfo - info, HddsProtos.LifeCycleEvent event) throws SCMException { - LifeCycleState newState; + ContainerInfo updateContainerState(final ContainerID containerID, + final HddsProtos.LifeCycleEvent event) + throws SCMException, ContainerNotFoundException { + final ContainerInfo info = containers.getContainerInfo(containerID); try { - newState = this.stateMachine.getNextState(info.getState(), event); + final LifeCycleState newState = stateMachine.getNextState( + info.getState(), event); + containers.updateState(containerID, info.getState(), newState); + return containers.getContainerInfo(containerID); } catch (InvalidStateTransitionException ex) { String error = String.format("Failed to update container state %s, " + "reason: invalid state transition from state: %s upon " + "event: %s.", - info.getContainerID(), info.getState(), event); + containerID, info.getState(), event); LOG.error(error); throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE); } - - // This is a post condition after executing getNextState. - Preconditions.checkNotNull(newState); - containers.updateState(info, info.getState(), newState); - return containers.getContainerInfo(info); } /** @@ -331,10 +303,10 @@ public class ContainerStateManager implements Closeable { * @return ContainerInfo * @throws SCMException - on Error. */ - public ContainerInfo updateContainerInfo(ContainerInfo info) - throws SCMException { + ContainerInfo updateContainerInfo(final ContainerInfo info) + throws ContainerNotFoundException { containers.updateContainerInfo(info); - return containers.getContainerInfo(info); + return containers.getContainerInfo(info.containerID()); } /** @@ -343,11 +315,16 @@ public class ContainerStateManager implements Closeable { * @param deleteTransactionMap maps containerId to its new * deleteTransactionID */ - public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap) { - for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) { - containers.getContainerMap().get(ContainerID.valueof(entry.getKey())) - .updateDeleteTransactionId(entry.getValue()); - } + void updateDeleteTransactionId( + final Map<Long, Long> deleteTransactionMap) { + deleteTransactionMap.forEach((k, v) -> { + try { + containers.getContainerInfo(ContainerID.valueof(k)) + .updateDeleteTransactionId(v); + } catch (ContainerNotFoundException e) { + LOG.warn("Exception while updating delete transaction id.", e); + } + }); } /** @@ -360,12 +337,12 @@ public class ContainerStateManager implements Closeable { * @param state - State of the Container-- {Open, Allocated etc.} * @return ContainerInfo, null if there is no match found. */ - public ContainerInfo getMatchingContainer(final long size, + ContainerInfo getMatchingContainer(final long size, String owner, ReplicationType type, ReplicationFactor factor, LifeCycleState state) { // Find containers that match the query spec, if no match return null. - NavigableSet<ContainerID> matchingSet = + final NavigableSet<ContainerID> matchingSet = containers.getMatchingContainerIDs(state, owner, factor, type); if (matchingSet == null || matchingSet.size() == 0) { return null; @@ -373,11 +350,9 @@ public class ContainerStateManager implements Closeable { // Get the last used container and find container above the last used // container ID. - ContainerState key = new ContainerState(owner, type, factor); - ContainerID lastID = lastUsedMap.get(key); - if (lastID == null) { - lastID = matchingSet.first(); - } + final ContainerState key = new ContainerState(owner, type, factor); + final ContainerID lastID = lastUsedMap + .getOrDefault(key, matchingSet.first()); // There is a small issue here. The first time, we will skip the first // container. But in most cases it will not matter. @@ -401,32 +376,47 @@ public class ContainerStateManager implements Closeable { resultSet = matchingSet.headSet(lastID, true); selectedContainer = findContainerWithSpace(size, resultSet, owner); } - // Update the allocated Bytes on this container. - if (selectedContainer != null) { - selectedContainer.updateAllocatedBytes(size); - } return selectedContainer; } - private ContainerInfo findContainerWithSpace(long size, - NavigableSet<ContainerID> searchSet, String owner) { - // Get the container with space to meet our request. - for (ContainerID id : searchSet) { - ContainerInfo containerInfo = containers.getContainerInfo(id); - if (containerInfo.getAllocatedBytes() + size <= this.containerSize) { - containerInfo.updateLastUsedTime(); - - ContainerState key = new ContainerState(owner, - containerInfo.getReplicationType(), - containerInfo.getReplicationFactor()); - lastUsedMap.put(key, containerInfo.containerID()); - return containerInfo; + private ContainerInfo findContainerWithSpace(final long size, + final NavigableSet<ContainerID> searchSet, final String owner) { + try { + // Get the container with space to meet our request. + for (ContainerID id : searchSet) { + final ContainerInfo containerInfo = containers.getContainerInfo(id); + if (containerInfo.getUsedBytes() + size <= this.containerSize) { + containerInfo.updateLastUsedTime(); + + final ContainerState key = new ContainerState(owner, + containerInfo.getReplicationType(), + containerInfo.getReplicationFactor()); + lastUsedMap.put(key, containerInfo.containerID()); + return containerInfo; + } } + } catch (ContainerNotFoundException e) { + // This should not happen! + LOG.warn("Exception while finding container with space", e); } return null; } + Set<ContainerID> getAllContainerIDs() { + return containers.getAllContainerIDs(); + } + + /** + * Returns Containers by State. + * + * @param state - State - Open, Closed etc. + * @return List of containers by state. + */ + Set<ContainerID> getContainerIDsByState(final LifeCycleState state) { + return containers.getContainerIDsByState(state); + } + /** * Returns a set of ContainerIDs that match the Container. * @@ -436,39 +426,25 @@ public class ContainerStateManager implements Closeable { * @param state - Current State, like Open, Close etc. * @return Set of containers that match the specific query parameters. */ - public NavigableSet<ContainerID> getMatchingContainerIDs( - String owner, ReplicationType type, ReplicationFactor factor, - LifeCycleState state) { + NavigableSet<ContainerID> getMatchingContainerIDs(final String owner, + final ReplicationType type, final ReplicationFactor factor, + final LifeCycleState state) { return containers.getMatchingContainerIDs(state, owner, factor, type); } /** - * Returns the containerInfo with pipeline for the given container id. - * @param selector -- Pipeline selector class. - * @param containerID id of the container - * @return ContainerInfo containerInfo - * @throws IOException - */ - public ContainerWithPipeline getContainer(PipelineSelector selector, - ContainerID containerID) { - ContainerInfo info = containers.getContainerInfo(containerID.getId()); - Pipeline pipeline = selector.getPipeline(info.getPipelineID()); - return new ContainerWithPipeline(info, pipeline); - } - - /** * Returns the containerInfo for the given container id. * @param containerID id of the container * @return ContainerInfo containerInfo * @throws IOException */ - public ContainerInfo getContainer(ContainerID containerID) { + ContainerInfo getContainer(final ContainerID containerID) + throws ContainerNotFoundException { return containers.getContainerInfo(containerID); } - @Override - public void close() throws IOException { + void close() throws IOException { } /** @@ -478,8 +454,8 @@ public class ContainerStateManager implements Closeable { * @param containerID * @return Set<DatanodeDetails> */ - public Set<DatanodeDetails> getContainerReplicas(ContainerID containerID) - throws SCMException { + Set<ContainerReplica> getContainerReplicas( + final ContainerID containerID) throws ContainerNotFoundException { return containers.getContainerReplicas(containerID); } @@ -487,53 +463,29 @@ public class ContainerStateManager implements Closeable { * Add a container Replica for given DataNode. * * @param containerID - * @param dn + * @param replica */ - public void addContainerReplica(ContainerID containerID, DatanodeDetails dn) { - containers.addContainerReplica(containerID, dn); + void updateContainerReplica(final ContainerID containerID, + final ContainerReplica replica) throws ContainerNotFoundException { + containers.updateContainerReplica(containerID, replica); } /** * Remove a container Replica for given DataNode. * * @param containerID - * @param dn + * @param replica * @return True of dataNode is removed successfully else false. */ - public boolean removeContainerReplica(ContainerID containerID, - DatanodeDetails dn) throws SCMException { - return containers.removeContainerReplica(containerID, dn); - } - - /** - * Compare the existing replication number with the expected one. - */ - public ReplicationRequest checkReplicationState(ContainerID containerID) - throws SCMException { - int existingReplicas = getContainerReplicas(containerID).size(); - int expectedReplicas = getContainer(containerID) - .getReplicationFactor().getNumber(); - if (existingReplicas != expectedReplicas) { - return new ReplicationRequest(containerID.getId(), existingReplicas, - expectedReplicas); - } - return null; - } - - /** - * Checks if the container is open. - */ - public boolean isOpen(ContainerID containerID) { - Preconditions.checkNotNull(containerID); - ContainerInfo container = Preconditions - .checkNotNull(getContainer(containerID), - "Container can't be found " + containerID); - return container.isContainerOpen(); + void removeContainerReplica(final ContainerID containerID, + final ContainerReplica replica) + throws ContainerNotFoundException, ContainerReplicaNotFoundException { + containers.removeContainerReplica(containerID, replica); } - @VisibleForTesting - public ContainerStateMap getContainerStateMap() { - return containers; + void removeContainer(final ContainerID containerID) + throws ContainerNotFoundException { + containers.removeContainer(containerID); } }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java index 9d6cadb..96ad731 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java @@ -24,12 +24,10 @@ import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo; import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; @@ -55,19 +53,23 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import static org.apache.hadoop.hdds.scm.ScmConfigKeys .OZONE_SCM_CONTAINER_SIZE_DEFAULT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys .OZONE_SCM_CONTAINER_SIZE; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB; import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes .FAILED_TO_CHANGE_CONTAINER_STATE; import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath; @@ -82,10 +84,7 @@ public class SCMContainerManager implements ContainerManager { private static final Logger LOG = LoggerFactory.getLogger(SCMContainerManager .class); - private final NodeManager nodeManager; - private final long cacheSize; private final Lock lock; - private final Charset encoding = Charset.forName("UTF-8"); private final MetadataStore containerStore; private final PipelineSelector pipelineSelector; private final ContainerStateManager containerStateManager; @@ -100,113 +99,110 @@ public class SCMContainerManager implements ContainerManager { * @param nodeManager - NodeManager so that we can get the nodes that are * healthy to place new * containers. - * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache - * its nodes. This is * passed to LevelDB and this memory is allocated in Native code space. * CacheSize is specified * in MB. * @throws IOException on Failure. */ @SuppressWarnings("unchecked") - public SCMContainerManager( - final Configuration conf, final NodeManager nodeManager, final int - cacheSizeMB, EventPublisher eventPublisher) throws IOException { - this.nodeManager = nodeManager; - this.cacheSize = cacheSizeMB; - - File metaDir = getOzoneMetaDirPath(conf); - - // Write the container name to pipeline mapping. - File containerDBPath = new File(metaDir, SCM_CONTAINER_DB); - containerStore = - MetadataStoreBuilder.newBuilder() - .setConf(conf) - .setDbFile(containerDBPath) - .setCacheSize(this.cacheSize * OzoneConsts.MB) - .build(); + public SCMContainerManager(final Configuration conf, + final NodeManager nodeManager, final EventPublisher eventPublisher) + throws IOException { - this.lock = new ReentrantLock(); + final File metaDir = getOzoneMetaDirPath(conf); + final File containerDBPath = new File(metaDir, SCM_CONTAINER_DB); + final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, + OZONE_SCM_DB_CACHE_SIZE_DEFAULT); - size = (long)conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE, - OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + this.containerStore = MetadataStoreBuilder.newBuilder() + .setConf(conf) + .setDbFile(containerDBPath) + .setCacheSize(cacheSize * OzoneConsts.MB) + .build(); + this.lock = new ReentrantLock(); + this.size = (long) conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE, + OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); this.pipelineSelector = new PipelineSelector(nodeManager, - conf, eventPublisher, cacheSizeMB); - - this.containerStateManager = - new ContainerStateManager(conf, this, pipelineSelector); - LOG.trace("Container State Manager created."); - + conf, eventPublisher, cacheSize); + this.containerStateManager = new ContainerStateManager(conf); this.eventPublisher = eventPublisher; - long containerCreationLeaseTimeout = conf.getTimeDuration( + final long containerCreationLeaseTimeout = conf.getTimeDuration( ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); - containerLeaseManager = new LeaseManager<>("ContainerCreation", + this.containerLeaseManager = new LeaseManager<>("ContainerCreation", containerCreationLeaseTimeout); - containerLeaseManager.start(); + this.containerLeaseManager.start(); + loadExistingContainers(); } - private void loadExistingContainers() { - - List<ContainerInfo> containerList; - try { - containerList = listContainer(0, Integer.MAX_VALUE); - - // if there are no container to load, let us return. - if (containerList == null || containerList.size() == 0) { - LOG.info("No containers to load for this cluster."); - return; - } - } catch (IOException e) { - if (!e.getMessage().equals("No container exists in current db")) { - LOG.error("Could not list the containers", e); - } - return; + private void loadExistingContainers() throws IOException { + List<Map.Entry<byte[], byte[]>> range = containerStore + .getSequentialRangeKVs(null, Integer.MAX_VALUE, null); + for (Map.Entry<byte[], byte[]> entry : range) { + ContainerInfo container = ContainerInfo.fromProtobuf( + HddsProtos.SCMContainerInfo.PARSER.parseFrom(entry.getValue())); + Preconditions.checkNotNull(container); + containerStateManager.loadContainer(container); + pipelineSelector.addContainerToPipeline( + container.getPipelineID(), container.getContainerID()); } + } + @VisibleForTesting + // TODO: remove this later. + public ContainerStateManager getContainerStateManager() { + return containerStateManager; + } + + @Override + public List<ContainerInfo> getContainers() { + lock.lock(); try { - for (ContainerInfo container : containerList) { - containerStateManager.addExistingContainer(container); - pipelineSelector.addContainerToPipeline( - container.getPipelineID(), container.getContainerID()); - } - } catch (SCMException ex) { - LOG.error("Unable to create a container information. ", ex); - // Fix me, what is the proper shutdown procedure for SCM ?? - // System.exit(1) // Should we exit here? + return containerStateManager.getAllContainerIDs().stream().map(id -> { + try { + return containerStateManager.getContainer(id); + } catch (ContainerNotFoundException e) { + // How can this happen? + return null; + } + }).filter(Objects::nonNull).collect(Collectors.toList()); + } finally { + lock.unlock(); } } - /** - * {@inheritDoc} - */ @Override - public ContainerInfo getContainer(final long containerID) throws - IOException { - ContainerInfo containerInfo; + public List<ContainerInfo> getContainers(LifeCycleState state) { lock.lock(); try { - byte[] containerBytes = containerStore.get( - Longs.toByteArray(containerID)); - if (containerBytes == null) { - throw new SCMException( - "Specified key does not exist. key : " + containerID, - SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); - } - - HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER - .parseFrom(containerBytes); - containerInfo = ContainerInfo.fromProtobuf(temp); - return containerInfo; + return containerStateManager.getContainerIDsByState(state).stream() + .map(id -> { + try { + return containerStateManager.getContainer(id); + } catch (ContainerNotFoundException e) { + // How can this happen? + return null; + } + }).filter(Objects::nonNull).collect(Collectors.toList()); } finally { lock.unlock(); } } /** + * {@inheritDoc} + */ + @Override + public ContainerInfo getContainer(final ContainerID containerID) + throws ContainerNotFoundException { + return containerStateManager.getContainer(containerID); + } + + /** * Returns the ContainerInfo and pipeline from the containerID. If container * has no available replicas in datanodes it returns pipeline with no * datanodes and empty leaderID . Pipeline#isEmpty can be used to check for @@ -217,38 +213,29 @@ public class SCMContainerManager implements ContainerManager { * @throws IOException */ @Override - public ContainerWithPipeline getContainerWithPipeline(long containerID) - throws IOException { - ContainerInfo contInfo; + public ContainerWithPipeline getContainerWithPipeline(ContainerID containerID) + throws ContainerNotFoundException { lock.lock(); try { - byte[] containerBytes = containerStore.get( - Longs.toByteArray(containerID)); - if (containerBytes == null) { - throw new SCMException( - "Specified key does not exist. key : " + containerID, - SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); - } - HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER - .parseFrom(containerBytes); - contInfo = ContainerInfo.fromProtobuf(temp); - + final ContainerInfo contInfo = getContainer(containerID); Pipeline pipeline; String leaderId = ""; - if (contInfo.isContainerOpen()) { + if (contInfo.isOpen()) { // If pipeline with given pipeline Id already exist return it pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID()); } else { // For close containers create pipeline from datanodes with replicas - Set<DatanodeDetails> dnWithReplicas = containerStateManager + Set<ContainerReplica> dnWithReplicas = containerStateManager .getContainerReplicas(contInfo.containerID()); if (!dnWithReplicas.isEmpty()) { - leaderId = dnWithReplicas.iterator().next().getUuidString(); + leaderId = dnWithReplicas.iterator().next() + .getDatanodeDetails().getUuidString(); } pipeline = new Pipeline(leaderId, contInfo.getState(), ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(), PipelineID.randomId()); - dnWithReplicas.forEach(pipeline::addMember); + dnWithReplicas.stream().map(ContainerReplica::getDatanodeDetails). + forEach(pipeline::addMember); } return new ContainerWithPipeline(contInfo, pipeline); } finally { @@ -260,33 +247,32 @@ public class SCMContainerManager implements ContainerManager { * {@inheritDoc} */ @Override - public List<ContainerInfo> listContainer(long startContainerID, - int count) throws IOException { - List<ContainerInfo> containerList = new ArrayList<>(); + public List<ContainerInfo> listContainer(ContainerID startContainerID, + int count) { lock.lock(); try { - if (containerStore.isEmpty()) { - throw new IOException("No container exists in current db"); - } - byte[] startKey = startContainerID <= 0 ? null : - Longs.toByteArray(startContainerID); - List<Map.Entry<byte[], byte[]>> range = - containerStore.getSequentialRangeKVs(startKey, count, null); - - // Transform the values into the pipelines. - // TODO: filter by container state - for (Map.Entry<byte[], byte[]> entry : range) { - ContainerInfo containerInfo = - ContainerInfo.fromProtobuf( - HddsProtos.SCMContainerInfo.PARSER.parseFrom( - entry.getValue())); - Preconditions.checkNotNull(containerInfo); - containerList.add(containerInfo); - } + final long startId = startContainerID == null ? + 0 : startContainerID.getId(); + final List<ContainerID> containersIds = + new ArrayList<>(containerStateManager.getAllContainerIDs()); + Collections.sort(containersIds); + + return containersIds.stream() + .filter(id -> id.getId() > startId) + .limit(count) + .map(id -> { + try { + return containerStateManager.getContainer(id); + } catch (ContainerNotFoundException ex) { + // This can never happen, as we hold lock no one else can remove + // the container after we got the container ids. + LOG.warn("Container Missing.", ex); + return null; + } + }).collect(Collectors.toList()); } finally { lock.unlock(); } - return containerList; } /** @@ -298,29 +284,35 @@ public class SCMContainerManager implements ContainerManager { * @throws IOException - Exception */ @Override - public ContainerWithPipeline allocateContainer( - ReplicationType type, - ReplicationFactor replicationFactor, - String owner) + public ContainerWithPipeline allocateContainer(final ReplicationType type, + final ReplicationFactor replicationFactor, final String owner) throws IOException { - - ContainerInfo containerInfo; - ContainerWithPipeline containerWithPipeline; - lock.lock(); try { - containerWithPipeline = containerStateManager.allocateContainer( - pipelineSelector, type, replicationFactor, owner); - containerInfo = containerWithPipeline.getContainerInfo(); - - byte[] containerIDBytes = Longs.toByteArray( - containerInfo.getContainerID()); - containerStore.put(containerIDBytes, containerInfo.getProtobuf() - .toByteArray()); + final ContainerInfo containerInfo; containerInfo = containerStateManager + .allocateContainer(pipelineSelector, type, replicationFactor, owner); + final Pipeline pipeline = pipelineSelector.getPipeline( + containerInfo.getPipelineID()); + + try { + final byte[] containerIDBytes = Longs.toByteArray( + containerInfo.getContainerID()); + containerStore.put(containerIDBytes, + containerInfo.getProtobuf().toByteArray()); + } catch (IOException ex) { + // If adding to containerStore fails, we should remove the container + // from in-memory map. + try { + containerStateManager.removeContainer(containerInfo.containerID()); + } catch (ContainerNotFoundException cnfe) { + // No need to worry much, everything is going as planned. + } + throw ex; + } + return new ContainerWithPipeline(containerInfo, pipeline); } finally { lock.unlock(); } - return containerWithPipeline; } /** @@ -332,18 +324,24 @@ public class SCMContainerManager implements ContainerManager { * specified key. */ @Override - public void deleteContainer(long containerID) throws IOException { + public void deleteContainer(ContainerID containerID) throws IOException { lock.lock(); try { - byte[] dbKey = Longs.toByteArray(containerID); - byte[] containerBytes = containerStore.get(dbKey); - if (containerBytes == null) { - throw new SCMException( - "Failed to delete container " + containerID + ", reason : " + - "container doesn't exist.", - SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); + containerStateManager.removeContainer(containerID); + final byte[] dbKey = Longs.toByteArray(containerID.getId()); + final byte[] containerBytes = containerStore.get(dbKey); + if (containerBytes != null) { + containerStore.delete(dbKey); + } else { + // Where did the container go? o_O + LOG.warn("Unable to remove the container {} from container store," + + " it's missing!", containerID); } - containerStore.delete(dbKey); + } catch (ContainerNotFoundException cnfe) { + throw new SCMException( + "Failed to delete container " + containerID + ", reason : " + + "container doesn't exist.", + SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); } finally { lock.unlock(); } @@ -354,40 +352,51 @@ public class SCMContainerManager implements ContainerManager { */ @Override public HddsProtos.LifeCycleState updateContainerState( - long containerID, HddsProtos.LifeCycleEvent event) throws - IOException { - ContainerInfo containerInfo; + ContainerID containerID, HddsProtos.LifeCycleEvent event) + throws IOException { + // Should we return the updated ContainerInfo instead of LifeCycleState? lock.lock(); try { - byte[] dbKey = Longs.toByteArray(containerID); - byte[] containerBytes = containerStore.get(dbKey); - if (containerBytes == null) { - throw new SCMException( - "Failed to update container state" - + containerID - + ", reason : container doesn't exist.", - SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); + ContainerInfo updatedContainer = + updateContainerStateInternal(containerID, event); + if (!updatedContainer.isOpen()) { + pipelineSelector.removeContainerFromPipeline( + updatedContainer.getPipelineID(), containerID.getId()); } - containerInfo = - ContainerInfo.fromProtobuf(HddsProtos.SCMContainerInfo.PARSER - .parseFrom(containerBytes)); + final byte[] dbKey = Longs.toByteArray(containerID.getId()); + containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray()); + return updatedContainer.getState(); + } catch (ContainerNotFoundException cnfe) { + throw new SCMException( + "Failed to update container state" + + containerID + + ", reason : container doesn't exist.", + SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); + } finally { + lock.unlock(); + } + } - Preconditions.checkNotNull(containerInfo); + private ContainerInfo updateContainerStateInternal(ContainerID containerID, + HddsProtos.LifeCycleEvent event) throws IOException { + // Refactor the below code for better clarity. + try { + final ContainerInfo info = + containerStateManager.getContainer(containerID); switch (event) { case CREATE: // Acquire lease on container Lease<ContainerInfo> containerLease = - containerLeaseManager.acquire(containerInfo); + containerLeaseManager.acquire(info); // Register callback to be executed in case of timeout containerLease.registerCallBack(() -> { updateContainerState(containerID, HddsProtos.LifeCycleEvent.TIMEOUT); - return null; - }); + return null; }); break; case CREATED: // Release the lease on container - containerLeaseManager.release(containerInfo); + containerLeaseManager.release(info); break; case FINALIZE: // TODO: we don't need a lease manager here for closing as the @@ -412,28 +421,20 @@ public class SCMContainerManager implements ContainerManager { // If the below updateContainerState call fails, we should revert the // changes made in switch case. // Like releasing the lease in case of BEGIN_CREATE. - ContainerInfo updatedContainer = containerStateManager - .updateContainerState(containerInfo, event); - if (!updatedContainer.isContainerOpen()) { - pipelineSelector.removeContainerFromPipeline( - containerInfo.getPipelineID(), containerID); - } - containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray()); - return updatedContainer.getState(); + return containerStateManager.updateContainerState(containerID, event); } catch (LeaseException e) { throw new IOException("Lease Exception.", e); - } finally { - lock.unlock(); } } - /** - * Update deleteTransactionId according to deleteTransactionMap. - * - * @param deleteTransactionMap Maps the containerId to latest delete - * transaction id for the container. - * @throws IOException - */ + + /** + * Update deleteTransactionId according to deleteTransactionMap. + * + * @param deleteTransactionMap Maps the containerId to latest delete + * transaction id for the container. + * @throws IOException + */ public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap) throws IOException { if (deleteTransactionMap == null) { @@ -467,16 +468,6 @@ public class SCMContainerManager implements ContainerManager { } /** - * Returns the container State Manager. - * - * @return ContainerStateManager - */ - @Override - public ContainerStateManager getStateManager() { - return containerStateManager; - } - - /** * Return a container matching the attributes specified. * * @param sizeRequired - Space needed in the Container. @@ -489,7 +480,7 @@ public class SCMContainerManager implements ContainerManager { public ContainerWithPipeline getMatchingContainerWithPipeline( final long sizeRequired, String owner, ReplicationType type, ReplicationFactor factor, LifeCycleState state) throws IOException { - ContainerInfo containerInfo = getStateManager() + ContainerInfo containerInfo = containerStateManager .getMatchingContainer(sizeRequired, owner, type, factor, state); if (containerInfo == null) { return null; @@ -518,70 +509,45 @@ public class SCMContainerManager implements ContainerManager { */ @Override public void processContainerReports(DatanodeDetails datanodeDetails, - ContainerReportsProto reports, boolean isRegisterCall) - throws IOException { + ContainerReportsProto reports) throws IOException { List<StorageContainerDatanodeProtocolProtos.ContainerInfo> containerInfos = reports.getReportsList(); PendingDeleteStatusList pendingDeleteStatusList = new PendingDeleteStatusList(datanodeDetails); - for (StorageContainerDatanodeProtocolProtos.ContainerInfo contInfo : + for (StorageContainerDatanodeProtocolProtos.ContainerInfo newInfo : containerInfos) { - // Update replica info during registration process. - if (isRegisterCall) { - try { - getStateManager().addContainerReplica(ContainerID. - valueof(contInfo.getContainerID()), datanodeDetails); - } catch (Exception ex) { - // Continue to next one after logging the error. - LOG.error("Error while adding replica for containerId {}.", - contInfo.getContainerID(), ex); - } - } - byte[] dbKey = Longs.toByteArray(contInfo.getContainerID()); + ContainerID id = ContainerID.valueof(newInfo.getContainerID()); + ContainerReplica replica = ContainerReplica.newBuilder() + .setContainerID(id) + .setDatanodeDetails(datanodeDetails) + .setOriginNodeId(datanodeDetails.getUuid()) + .build(); lock.lock(); try { - byte[] containerBytes = containerStore.get(dbKey); - if (containerBytes != null) { - HddsProtos.SCMContainerInfo knownState = - HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes); - - if (knownState.getState() == LifeCycleState.CLOSING - && contInfo.getState() == LifeCycleState.CLOSED) { - - updateContainerState(contInfo.getContainerID(), - LifeCycleEvent.CLOSE); - - //reread the container - knownState = - HddsProtos.SCMContainerInfo.PARSER - .parseFrom(containerStore.get(dbKey)); - } - - HddsProtos.SCMContainerInfo newState = - reconcileState(contInfo, knownState, datanodeDetails); - - if (knownState.getDeleteTransactionId() > contInfo - .getDeleteTransactionId()) { - pendingDeleteStatusList - .addPendingDeleteStatus(contInfo.getDeleteTransactionId(), - knownState.getDeleteTransactionId(), - knownState.getContainerID()); - } - - // FIX ME: This can be optimized, we write twice to memory, where a - // single write would work well. - // - // We need to write this to DB again since the closed only write - // the updated State. - containerStore.put(dbKey, newState.toByteArray()); - - } else { - // Container not found in our container db. - LOG.error("Error while processing container report from datanode :" + - " {}, for container: {}, reason: container doesn't exist in" + - "container database.", datanodeDetails, - contInfo.getContainerID()); + containerStateManager.updateContainerReplica(id, replica); + ContainerInfo currentInfo = containerStateManager.getContainer(id); + if (newInfo.getState() == LifeCycleState.CLOSING + && currentInfo.getState() == LifeCycleState.CLOSED) { + currentInfo = updateContainerStateInternal(id, LifeCycleEvent.CLOSE); } + + HddsProtos.SCMContainerInfo newState = + reconcileState(newInfo, currentInfo); + + if (currentInfo.getDeleteTransactionId() > + newInfo.getDeleteTransactionId()) { + pendingDeleteStatusList + .addPendingDeleteStatus(newInfo.getDeleteTransactionId(), + currentInfo.getDeleteTransactionId(), + currentInfo.getContainerID()); + } + containerStateManager.updateContainerInfo( + ContainerInfo.fromProtobuf(newState)); + containerStore.put(id.getBytes(), newState.toByteArray()); + } catch (ContainerNotFoundException e) { + LOG.error("Error while processing container report from datanode :" + + " {}, for container: {}, reason: container doesn't exist in" + + "container database.", datanodeDetails, id); } finally { lock.unlock(); } @@ -598,36 +564,21 @@ public class SCMContainerManager implements ContainerManager { * * @param datanodeState - State from the Datanode. * @param knownState - State inside SCM. - * @param dnDetails * @return new SCM State for this container. */ private HddsProtos.SCMContainerInfo reconcileState( StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState, - SCMContainerInfo knownState, DatanodeDetails dnDetails) { + ContainerInfo knownState) { HddsProtos.SCMContainerInfo.Builder builder = HddsProtos.SCMContainerInfo.newBuilder(); builder.setContainerID(knownState.getContainerID()) - .setPipelineID(knownState.getPipelineID()) + .setPipelineID(knownState.getPipelineID().getProtobuf()) + .setState(knownState.getState()) .setReplicationType(knownState.getReplicationType()) - .setReplicationFactor(knownState.getReplicationFactor()); - - // TODO: If current state doesn't have this DN in list of DataNodes with - // replica then add it in list of replicas. - - // If used size is greater than allocated size, we will be updating - // allocated size with used size. This update is done as a fallback - // mechanism in case SCM crashes without properly updating allocated - // size. Correct allocated value will be updated by - // ContainerStateManager during SCM shutdown. - long usedSize = datanodeState.getUsed(); - long allocated = knownState.getAllocatedBytes() > usedSize ? - knownState.getAllocatedBytes() : usedSize; - builder.setAllocatedBytes(allocated) - .setUsedBytes(usedSize) + .setReplicationFactor(knownState.getReplicationFactor()) + .setUsedBytes(datanodeState.getUsed()) .setNumberOfKeys(datanodeState.getKeyCount()) - .setState(knownState.getState()) .setStateEnterTime(knownState.getStateEnterTime()) - .setContainerID(knownState.getContainerID()) .setDeleteTransactionId(knownState.getDeleteTransactionId()); if (knownState.getOwner() != null) { builder.setOwner(knownState.getOwner()); @@ -635,20 +586,40 @@ public class SCMContainerManager implements ContainerManager { return builder.build(); } + /** + * Returns the latest list of DataNodes where replica for given containerId + * exist. Throws an SCMException if no entry is found for given containerId. + * + * @param containerID + * @return Set<DatanodeDetails> + */ + public Set<ContainerReplica> getContainerReplicas( + final ContainerID containerID) throws ContainerNotFoundException { + return containerStateManager.getContainerReplicas(containerID); + } /** - * In Container is in closed state, if it is in closed, Deleting or Deleted - * State. + * Add a container Replica for given DataNode. * - * @param info - ContainerInfo. - * @return true if is in open state, false otherwise + * @param containerID + * @param replica */ - private boolean shouldClose(ContainerInfo info) { - return info.getState() == HddsProtos.LifeCycleState.OPEN; + public void updateContainerReplica(final ContainerID containerID, + final ContainerReplica replica) throws ContainerNotFoundException { + containerStateManager.updateContainerReplica(containerID, replica); } - private boolean isClosed(ContainerInfo info) { - return info.getState() == HddsProtos.LifeCycleState.CLOSED; + /** + * Remove a container Replica for given DataNode. + * + * @param containerID + * @param replica + * @return True of dataNode is removed successfully else false. + */ + public void removeContainerReplica(final ContainerID containerID, + final ContainerReplica replica) + throws ContainerNotFoundException, ContainerReplicaNotFoundException { + containerStateManager.removeContainerReplica(containerID, replica); } /** @@ -671,7 +642,6 @@ public class SCMContainerManager implements ContainerManager { containerLeaseManager.shutdown(); } if (containerStateManager != null) { - flushContainerInfo(); containerStateManager.close(); } if (containerStore != null) { @@ -683,48 +653,6 @@ public class SCMContainerManager implements ContainerManager { } } - /** - * Since allocatedBytes of a container is only in memory, stored in - * containerStateManager, when closing SCMContainerManager, we need to update - * this in the container store. - * - * @throws IOException on failure. - */ - @VisibleForTesting - public void flushContainerInfo() throws IOException { - List<ContainerInfo> containers = containerStateManager.getAllContainers(); - List<Long> failedContainers = new ArrayList<>(); - for (ContainerInfo info : containers) { - // even if some container updated failed, others can still proceed - try { - byte[] dbKey = Longs.toByteArray(info.getContainerID()); - byte[] containerBytes = containerStore.get(dbKey); - // TODO : looks like when a container is deleted, the container is - // removed from containerStore but not containerStateManager, so it can - // return info of a deleted container. may revisit this in the future, - // for now, just skip a not-found container - if (containerBytes != null) { - containerStore.put(dbKey, info.getProtobuf().toByteArray()); - } else { - LOG.debug("Container state manager has container {} but not found " + - "in container store, a deleted container?", - info.getContainerID()); - } - } catch (IOException ioe) { - failedContainers.add(info.getContainerID()); - } - } - if (!failedContainers.isEmpty()) { - throw new IOException("Error in flushing container info from container " + - "state manager: " + failedContainers); - } - } - - @VisibleForTesting - public MetadataStore getContainerStore() { - return containerStore; - } - public PipelineSelector getPipelineSelector() { return pipelineSelector; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index ddecdbc..8c11e84 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -22,12 +22,14 @@ import java.util.List; import java.util.Objects; import java.util.Set; import java.util.concurrent.ThreadFactory; +import java.util.stream.Collectors; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.ContainerStateManager; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.events.SCMEvents; @@ -64,14 +66,14 @@ public class ReplicationManager implements Runnable { private boolean running = true; - private ContainerStateManager containerStateManager; + private ContainerManager containerManager; public ReplicationManager(ContainerPlacementPolicy containerPlacement, - ContainerStateManager containerStateManager, EventQueue eventQueue, + ContainerManager containerManager, EventQueue eventQueue, LeaseManager<Long> commandWatcherLeaseManager) { this.containerPlacement = containerPlacement; - this.containerStateManager = containerStateManager; + this.containerManager = containerManager; this.eventPublisher = eventQueue; this.replicationCommandWatcher = @@ -106,7 +108,7 @@ public class ReplicationManager implements Runnable { ContainerID containerID = new ContainerID(request.getContainerId()); ContainerInfo containerInfo = - containerStateManager.getContainer(containerID); + containerManager.getContainer(containerID); Preconditions.checkNotNull(containerInfo, "No information about the container " + request.getContainerId()); @@ -116,10 +118,10 @@ public class ReplicationManager implements Runnable { "Container should be in closed state"); //check the current replication - List<DatanodeDetails> datanodesWithReplicas = + List<ContainerReplica> containerReplicas = new ArrayList<>(getCurrentReplicas(request)); - if (datanodesWithReplicas.size() == 0) { + if (containerReplicas.size() == 0) { LOG.warn( "Container {} should be replicated but can't find any existing " + "replicas", @@ -134,21 +136,23 @@ public class ReplicationManager implements Runnable { .size(); int deficit = - request.getExpecReplicationCount() - datanodesWithReplicas.size() + request.getExpecReplicationCount() - containerReplicas.size() - inFlightReplications; if (deficit > 0) { + List<DatanodeDetails> datanodes = containerReplicas.stream() + .map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toList()); List<DatanodeDetails> selectedDatanodes = containerPlacement - .chooseDatanodes(datanodesWithReplicas, deficit, + .chooseDatanodes(datanodes, deficit, containerInfo.getUsedBytes()); //send the command for (DatanodeDetails datanode : selectedDatanodes) { ReplicateContainerCommand replicateCommand = - new ReplicateContainerCommand(containerID.getId(), - datanodesWithReplicas); + new ReplicateContainerCommand(containerID.getId(), datanodes); eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, new CommandForDatanode<>( @@ -174,9 +178,9 @@ public class ReplicationManager implements Runnable { } @VisibleForTesting - protected Set<DatanodeDetails> getCurrentReplicas(ReplicationRequest request) + protected Set<ContainerReplica> getCurrentReplicas(ReplicationRequest request) throws IOException { - return containerStateManager + return containerManager .getContainerReplicas(new ContainerID(request.getContainerId())); } @@ -234,7 +238,11 @@ public class ReplicationManager implements Runnable { } } - public static class ReplicationCompleted implements IdentifiableEventPayload { + /** + * Add javadoc. + */ + public static class ReplicationCompleted + implements IdentifiableEventPayload { private final long uuid; http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java index 880a715..b8052a4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java @@ -20,19 +20,21 @@ package org.apache.hadoop.hdds.scm.container.states; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import java.util.HashSet; -import java.util.Set; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; + import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Set; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -46,8 +48,6 @@ import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes .CONTAINER_EXISTS; import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes .FAILED_TO_CHANGE_CONTAINER_STATE; -import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes - .FAILED_TO_FIND_CONTAINER; /** * Container State Map acts like a unified map for various attributes that are @@ -84,16 +84,15 @@ public class ContainerStateMap { private static final Logger LOG = LoggerFactory.getLogger(ContainerStateMap.class); + private final static NavigableSet<ContainerID> EMPTY_SET = + Collections.unmodifiableNavigableSet(new TreeSet<>()); + private final ContainerAttribute<LifeCycleState> lifeCycleStateMap; private final ContainerAttribute<String> ownerMap; private final ContainerAttribute<ReplicationFactor> factorMap; private final ContainerAttribute<ReplicationType> typeMap; - private final Map<ContainerID, ContainerInfo> containerMap; - // Map to hold replicas of given container. - private final Map<ContainerID, Set<DatanodeDetails>> contReplicaMap; - private final static NavigableSet<ContainerID> EMPTY_SET = - Collections.unmodifiableNavigableSet(new TreeSet<>()); + private final Map<ContainerID, Set<ContainerReplica>> replicaMap; private final Map<ContainerQueryKey, NavigableSet<ContainerID>> resultCache; // Container State Map lock should be held before calling into @@ -105,18 +104,14 @@ public class ContainerStateMap { * Create a ContainerStateMap. */ public ContainerStateMap() { - lifeCycleStateMap = new ContainerAttribute<>(); - ownerMap = new ContainerAttribute<>(); - factorMap = new ContainerAttribute<>(); - typeMap = new ContainerAttribute<>(); - containerMap = new HashMap<>(); - lock = new ReentrantReadWriteLock(); - contReplicaMap = new HashMap<>(); -// new InstrumentedLock(getClass().getName(), LOG, -// new ReentrantLock(), -// 1000, -// 300)); - resultCache = new ConcurrentHashMap<>(); + this.lifeCycleStateMap = new ContainerAttribute<>(); + this.ownerMap = new ContainerAttribute<>(); + this.factorMap = new ContainerAttribute<>(); + this.typeMap = new ContainerAttribute<>(); + this.containerMap = new HashMap<>(); + this.lock = new ReentrantReadWriteLock(); + this.replicaMap = new HashMap<>(); + this.resultCache = new ConcurrentHashMap<>(); } /** @@ -125,7 +120,7 @@ public class ContainerStateMap { * @param info - container info * @throws SCMException - throws if create failed. */ - public void addContainer(ContainerInfo info) + public void addContainer(final ContainerInfo info) throws SCMException { Preconditions.checkNotNull(info, "Container Info cannot be null"); Preconditions.checkArgument(info.getReplicationFactor().getNumber() > 0, @@ -133,7 +128,7 @@ public class ContainerStateMap { lock.writeLock().lock(); try { - ContainerID id = ContainerID.valueof(info.getContainerID()); + final ContainerID id = info.containerID(); if (containerMap.putIfAbsent(id, info) != null) { LOG.debug("Duplicate container ID detected. {}", id); throw new @@ -145,6 +140,7 @@ public class ContainerStateMap { ownerMap.insert(info.getOwner(), id); factorMap.insert(info.getReplicationFactor(), id); typeMap.insert(info.getReplicationType(), id); + replicaMap.put(id, new HashSet<>()); // Flush the cache of this container type, will be added later when // get container queries are executed. @@ -156,23 +152,30 @@ public class ContainerStateMap { } /** - * Returns the latest state of Container from SCM's Container State Map. + * Removes a Container Entry from ContainerStateMap. * - * @param info - ContainerInfo - * @return ContainerInfo - */ - public ContainerInfo getContainerInfo(ContainerInfo info) { - return getContainerInfo(info.getContainerID()); - } - - /** - * Returns the latest state of Container from SCM's Container State Map. - * - * @param containerID - int - * @return container info, if found. + * @param containerID - ContainerID + * @throws SCMException - throws if create failed. */ - public ContainerInfo getContainerInfo(long containerID) { - return getContainerInfo(ContainerID.valueof(containerID)); + public void removeContainer(final ContainerID containerID) + throws ContainerNotFoundException { + Preconditions.checkNotNull(containerID, "ContainerID cannot be null"); + lock.writeLock().lock(); + try { + checkIfContainerExist(containerID); + // Should we revert back to the original state if any of the below + // remove operation fails? + final ContainerInfo info = containerMap.remove(containerID); + lifeCycleStateMap.remove(info.getState(), containerID); + ownerMap.remove(info.getOwner(), containerID); + factorMap.remove(info.getReplicationFactor(), containerID); + typeMap.remove(info.getReplicationType(), containerID); + // Flush the cache of this container type. + flushCache(info); + LOG.trace("Removed container with {} successfully.", containerID); + } finally { + lock.writeLock().unlock(); + } } /** @@ -181,9 +184,11 @@ public class ContainerStateMap { * @param containerID - ContainerID * @return container info, if found. */ - public ContainerInfo getContainerInfo(ContainerID containerID) { + public ContainerInfo getContainerInfo(final ContainerID containerID) + throws ContainerNotFoundException { lock.readLock().lock(); try { + checkIfContainerExist(containerID); return containerMap.get(containerID); } finally { lock.readLock().unlock(); @@ -197,21 +202,17 @@ public class ContainerStateMap { * @param containerID * @return Set<DatanodeDetails> */ - public Set<DatanodeDetails> getContainerReplicas(ContainerID containerID) - throws SCMException { + public Set<ContainerReplica> getContainerReplicas( + final ContainerID containerID) throws ContainerNotFoundException { Preconditions.checkNotNull(containerID); lock.readLock().lock(); try { - if (contReplicaMap.containsKey(containerID)) { - return Collections - .unmodifiableSet(contReplicaMap.get(containerID)); - } + checkIfContainerExist(containerID); + return Collections + .unmodifiableSet(new HashSet<>(replicaMap.get(containerID))); } finally { lock.readLock().unlock(); } - throw new SCMException( - "No entry exist for containerId: " + containerID + " in replica map.", - ResultCodes.NO_REPLICA_FOUND); } /** @@ -220,26 +221,17 @@ public class ContainerStateMap { * ContainerId. * * @param containerID - * @param dnList + * @param replica */ - public void addContainerReplica(ContainerID containerID, - DatanodeDetails... dnList) { + public void updateContainerReplica(final ContainerID containerID, + final ContainerReplica replica) throws ContainerNotFoundException { Preconditions.checkNotNull(containerID); lock.writeLock().lock(); try { - for (DatanodeDetails dn : dnList) { - Preconditions.checkNotNull(dn); - if (contReplicaMap.containsKey(containerID)) { - if(!contReplicaMap.get(containerID).add(dn)) { - LOG.debug("ReplicaMap already contains entry for container Id: " - + "{},DataNode: {}", containerID, dn); - } - } else { - Set<DatanodeDetails> dnSet = new HashSet<>(); - dnSet.add(dn); - contReplicaMap.put(containerID, dnSet); - } - } + checkIfContainerExist(containerID); + Set<ContainerReplica> replicas = replicaMap.get(containerID); + replicas.remove(replica); + replicas.add(replica); } finally { lock.writeLock().unlock(); } @@ -249,61 +241,45 @@ public class ContainerStateMap { * Remove a container Replica for given DataNode. * * @param containerID - * @param dn + * @param replica * @return True of dataNode is removed successfully else false. */ - public boolean removeContainerReplica(ContainerID containerID, - DatanodeDetails dn) throws SCMException { + public void removeContainerReplica(final ContainerID containerID, + final ContainerReplica replica) + throws ContainerNotFoundException, ContainerReplicaNotFoundException { Preconditions.checkNotNull(containerID); - Preconditions.checkNotNull(dn); + Preconditions.checkNotNull(replica); lock.writeLock().lock(); try { - if (contReplicaMap.containsKey(containerID)) { - return contReplicaMap.get(containerID).remove(dn); + checkIfContainerExist(containerID); + if(!replicaMap.get(containerID).remove(replica)) { + throw new ContainerReplicaNotFoundException( + "Container #" + + containerID.getId() + ", replica: " + replica); } } finally { lock.writeLock().unlock(); } - throw new SCMException( - "No entry exist for containerId: " + containerID + " in replica map.", - ResultCodes.FAILED_TO_FIND_CONTAINER); } @VisibleForTesting + // TODO: fix the test case and remove this method! public static Logger getLOG() { return LOG; } /** - * Returns the full container Map. - * - * @return - Map - */ - public Map<ContainerID, ContainerInfo> getContainerMap() { - lock.readLock().lock(); - try { - return Collections.unmodifiableMap(containerMap); - } finally { - lock.readLock().unlock(); - } - } - - /** * Just update the container State. * @param info ContainerInfo. */ - public void updateContainerInfo(ContainerInfo info) throws SCMException { - Preconditions.checkNotNull(info); - ContainerInfo currentInfo = null; + public void updateContainerInfo(final ContainerInfo info) + throws ContainerNotFoundException { lock.writeLock().lock(); try { - currentInfo = containerMap.get( - ContainerID.valueof(info.getContainerID())); - - if (currentInfo == null) { - throw new SCMException("No such container.", FAILED_TO_FIND_CONTAINER); - } + Preconditions.checkNotNull(info); + checkIfContainerExist(info.containerID()); + final ContainerInfo currentInfo = containerMap.get(info.containerID()); flushCache(info, currentInfo); containerMap.put(info.containerID(), info); } finally { @@ -314,33 +290,23 @@ public class ContainerStateMap { /** * Update the State of a container. * - * @param info - ContainerInfo + * @param containerID - ContainerID * @param currentState - CurrentState * @param newState - NewState. * @throws SCMException - in case of failure. */ - public void updateState(ContainerInfo info, LifeCycleState currentState, - LifeCycleState newState) throws SCMException { + public void updateState(ContainerID containerID, LifeCycleState currentState, + LifeCycleState newState) throws SCMException, ContainerNotFoundException { Preconditions.checkNotNull(currentState); Preconditions.checkNotNull(newState); - - ContainerID id = new ContainerID(info.getContainerID()); - ContainerInfo currentInfo = null; - lock.writeLock().lock(); try { + checkIfContainerExist(containerID); + final ContainerInfo currentInfo = containerMap.get(containerID); try { - // Just flush both old and new data sets from the result cache. - ContainerInfo newInfo = new ContainerInfo(info); + final ContainerInfo newInfo = new ContainerInfo(currentInfo); newInfo.setState(newState); - flushCache(newInfo, info); - - currentInfo = containerMap.get(id); - if (currentInfo == null) { - throw new - SCMException("No such container.", FAILED_TO_FIND_CONTAINER); - } // We are updating two places before this update is done, these can // fail independently, since the code needs to handle it. @@ -351,11 +317,13 @@ public class ContainerStateMap { // roll back the earlier change we did. If the rollback fails, we can // be in an inconsistent state, - info.setState(newState); - containerMap.put(id, info); - lifeCycleStateMap.update(currentState, newState, id); + containerMap.put(containerID, newInfo); + lifeCycleStateMap.update(currentState, newState, containerID); LOG.trace("Updated the container {} to new state. Old = {}, new = " + - "{}", id, currentState, newState); + "{}", containerID, currentState, newState); + + // Just flush both old and new data sets from the result cache. + flushCache(currentInfo, newInfo); } catch (SCMException ex) { LOG.error("Unable to update the container state. {}", ex); // we need to revert the change in this attribute since we are not @@ -364,13 +332,13 @@ public class ContainerStateMap { "old state. Old = {}, Attempted state = {}", currentState, newState); - containerMap.put(id, currentInfo); + containerMap.put(containerID, currentInfo); // if this line throws, the state map can be in an inconsistent // state, since we will have modified the attribute by the // container state will not in sync since we were not able to put // that into the hash table. - lifeCycleStateMap.update(newState, currentState, id); + lifeCycleStateMap.update(newState, currentState, containerID); throw new SCMException("Updating the container map failed.", ex, FAILED_TO_CHANGE_CONTAINER_STATE); @@ -380,13 +348,17 @@ public class ContainerStateMap { } } + public Set<ContainerID> getAllContainerIDs() { + return containerMap.keySet(); + } + /** * Returns A list of containers owned by a name service. * * @param ownerName - Name of the NameService. * @return - NavigableSet of ContainerIDs. */ - NavigableSet<ContainerID> getContainerIDsByOwner(String ownerName) { + NavigableSet<ContainerID> getContainerIDsByOwner(final String ownerName) { Preconditions.checkNotNull(ownerName); lock.readLock().lock(); try { @@ -402,7 +374,7 @@ public class ContainerStateMap { * @param type - Replication type -- StandAlone, Ratis etc. * @return NavigableSet */ - NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) { + NavigableSet<ContainerID> getContainerIDsByType(final ReplicationType type) { Preconditions.checkNotNull(type); lock.readLock().lock(); try { @@ -418,7 +390,8 @@ public class ContainerStateMap { * @param factor - Replication Factor. * @return NavigableSet. */ - NavigableSet<ContainerID> getContainerIDsByFactor(ReplicationFactor factor) { + NavigableSet<ContainerID> getContainerIDsByFactor( + final ReplicationFactor factor) { Preconditions.checkNotNull(factor); lock.readLock().lock(); try { @@ -435,7 +408,7 @@ public class ContainerStateMap { * @return List of containers by state. */ public NavigableSet<ContainerID> getContainerIDsByState( - LifeCycleState state) { + final LifeCycleState state) { Preconditions.checkNotNull(state); lock.readLock().lock(); try { @@ -455,8 +428,8 @@ public class ContainerStateMap { * @return ContainerInfo or Null if not container satisfies the criteria. */ public NavigableSet<ContainerID> getMatchingContainerIDs( - LifeCycleState state, String owner, - ReplicationFactor factor, ReplicationType type) { + final LifeCycleState state, final String owner, + final ReplicationFactor factor, final ReplicationType type) { Preconditions.checkNotNull(state, "State cannot be null"); Preconditions.checkNotNull(owner, "Owner cannot be null"); @@ -465,7 +438,7 @@ public class ContainerStateMap { lock.readLock().lock(); try { - ContainerQueryKey queryKey = + final ContainerQueryKey queryKey = new ContainerQueryKey(state, owner, factor, type); if(resultCache.containsKey(queryKey)){ return resultCache.get(queryKey); @@ -474,30 +447,33 @@ public class ContainerStateMap { // If we cannot meet any one condition we return EMPTY_SET immediately. // Since when we intersect these sets, the result will be empty if any // one is empty. - NavigableSet<ContainerID> stateSet = + final NavigableSet<ContainerID> stateSet = lifeCycleStateMap.getCollection(state); if (stateSet.size() == 0) { return EMPTY_SET; } - NavigableSet<ContainerID> ownerSet = ownerMap.getCollection(owner); + final NavigableSet<ContainerID> ownerSet = + ownerMap.getCollection(owner); if (ownerSet.size() == 0) { return EMPTY_SET; } - NavigableSet<ContainerID> factorSet = factorMap.getCollection(factor); + final NavigableSet<ContainerID> factorSet = + factorMap.getCollection(factor); if (factorSet.size() == 0) { return EMPTY_SET; } - NavigableSet<ContainerID> typeSet = typeMap.getCollection(type); + final NavigableSet<ContainerID> typeSet = + typeMap.getCollection(type); if (typeSet.size() == 0) { return EMPTY_SET; } // if we add more constraints we will just add those sets here.. - NavigableSet<ContainerID>[] sets = sortBySize(stateSet, + final NavigableSet<ContainerID>[] sets = sortBySize(stateSet, ownerSet, factorSet, typeSet); NavigableSet<ContainerID> currentSet = sets[0]; @@ -521,12 +497,12 @@ public class ContainerStateMap { * @return resultSet which is the intersection of these two sets. */ private NavigableSet<ContainerID> intersectSets( - NavigableSet<ContainerID> smaller, - NavigableSet<ContainerID> bigger) { + final NavigableSet<ContainerID> smaller, + final NavigableSet<ContainerID> bigger) { Preconditions.checkState(smaller.size() <= bigger.size(), "This function assumes the first set is lesser or equal to second " + "set"); - NavigableSet<ContainerID> resultSet = new TreeSet<>(); + final NavigableSet<ContainerID> resultSet = new TreeSet<>(); for (ContainerID id : smaller) { if (bigger.contains(id)) { resultSet.add(id); @@ -544,11 +520,11 @@ public class ContainerStateMap { */ @SuppressWarnings("unchecked") private NavigableSet<ContainerID>[] sortBySize( - NavigableSet<ContainerID>... sets) { + final NavigableSet<ContainerID>... sets) { for (int x = 0; x < sets.length - 1; x++) { for (int y = 0; y < sets.length - x - 1; y++) { if (sets[y].size() > sets[y + 1].size()) { - NavigableSet temp = sets[y]; + final NavigableSet temp = sets[y]; sets[y] = sets[y + 1]; sets[y + 1] = temp; } @@ -557,13 +533,22 @@ public class ContainerStateMap { return sets; } - private void flushCache(ContainerInfo... containerInfos) { + private void flushCache(final ContainerInfo... containerInfos) { for (ContainerInfo containerInfo : containerInfos) { - ContainerQueryKey key = new ContainerQueryKey(containerInfo.getState(), - containerInfo.getOwner(), containerInfo.getReplicationFactor(), + final ContainerQueryKey key = new ContainerQueryKey( + containerInfo.getState(), + containerInfo.getOwner(), + containerInfo.getReplicationFactor(), containerInfo.getReplicationType()); resultCache.remove(key); } } + private void checkIfContainerExist(ContainerID containerID) + throws ContainerNotFoundException { + if (!containerMap.containsKey(containerID)) { + throw new ContainerNotFoundException("#" + containerID.getId()); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index 17edf9e..1030428 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -21,11 +21,14 @@ package org.apache.hadoop.hdds.scm.node; import java.util.Set; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.container.ContainerException; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.ContainerStateManager; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -37,7 +40,7 @@ import org.slf4j.LoggerFactory; */ public class DeadNodeHandler implements EventHandler<DatanodeDetails> { - private final ContainerStateManager containerStateManager; + private final ContainerManager containerManager; private final NodeManager nodeManager; @@ -45,8 +48,8 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> { LoggerFactory.getLogger(DeadNodeHandler.class); public DeadNodeHandler(NodeManager nodeManager, - ContainerStateManager containerStateManager) { - this.containerStateManager = containerStateManager; + ContainerManager containerManager) { + this.containerManager = containerManager; this.nodeManager = nodeManager; } @@ -55,45 +58,58 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> { EventPublisher publisher) { nodeManager.processDeadNode(datanodeDetails.getUuid()); - Set<ContainerID> containers = + // TODO: check if there are any pipeline on this node and fire close + // pipeline event + Set<ContainerID> ids = nodeManager.getContainers(datanodeDetails.getUuid()); - if (containers == null) { + if (ids == null) { LOG.info("There's no containers in dead datanode {}, no replica will be" + " removed from the in-memory state.", datanodeDetails.getUuid()); return; } - LOG.info( - "Datanode {} is dead. Removing replications from the in-memory state.", - datanodeDetails.getUuid()); - for (ContainerID container : containers) { + LOG.info("Datanode {} is dead. Removing replications from the in-memory" + + " state.", datanodeDetails.getUuid()); + for (ContainerID id : ids) { try { - try { - containerStateManager.removeContainerReplica(container, - datanodeDetails); - } catch (SCMException ex) { - LOG.info("DataNode {} doesn't have replica for container {}.", - datanodeDetails.getUuid(), container.getId()); - } - - if (!containerStateManager.isOpen(container)) { - ReplicationRequest replicationRequest = - containerStateManager.checkReplicationState(container); - - if (replicationRequest != null) { - publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, - replicationRequest); + final ContainerInfo container = containerManager.getContainer(id); + if (!container.isOpen()) { + final ContainerReplica replica = ContainerReplica.newBuilder() + .setContainerID(id) + .setDatanodeDetails(datanodeDetails) + .build(); + try { + containerManager.removeContainerReplica(id, replica); + replicateIfNeeded(container, publisher); + } catch (ContainerException ex) { + LOG.warn("Exception while removing container replica #{} for " + + "container #{}.", replica, container, ex); } } - } catch (SCMException e) { - LOG.error("Can't remove container from containerStateMap {}", container - .getId(), e); + } catch (ContainerNotFoundException cnfe) { + LOG.warn("Container Not found!", cnfe); } } } /** + * Compare the existing replication number with the expected one. + */ + private void replicateIfNeeded(ContainerInfo container, + EventPublisher publisher) throws ContainerNotFoundException { + final int existingReplicas = containerManager + .getContainerReplicas(container.containerID()).size(); + final int expectedReplicas = container.getReplicationFactor().getNumber(); + if (existingReplicas != expectedReplicas) { + publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, + new ReplicationRequest( + container.getContainerID(), existingReplicas, expectedReplicas)); + } + } + + /** * Returns logger. * */ + // TODO: remove this. public static Logger getLogger() { return LOG; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java index 44744f0..7135267 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java @@ -31,7 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer .NodeRegistrationContainerReport; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org