http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
new file mode 100644
index 0000000..0f4988a
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone.OzoneConsts.DELETED_BLOCK_DB;
+
+/**
+ * An implementation of {@link DeletedBlockLog} that uses a K/V db to
+ * maintain block deletion transactions between SCM and datanodes.
+ * This is a very basic implementation: it simply scans the log, remembers
+ * the position reached by the last scan, and uses it to decide where the
+ * next scan starts. It has no notion of transaction weight, so every
+ * transaction that is still valid gets the same chance to be retrieved;
+ * retrieval order depends only on the natural order of transaction IDs.
+ *
+ * <p>Each transaction is stored under its txID encoded with
+ * {@link Longs#toByteArray(long)}; the reserved {@code #LATEST_TXID#} key
+ * holds the most recently allocated txID. A retry count of -1 marks a
+ * transaction that exceeded the configured max retry and is no longer
+ * handed out for retry. Store access is serialized by an internal lock.
+ */
+public class DeletedBlockLogImpl implements DeletedBlockLog {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DeletedBlockLogImpl.class);
+
+  // Reserved key under which the latest allocated txID is persisted.
+  private static final byte[] LATEST_TXID =
+      DFSUtil.string2Bytes("#LATEST_TXID#");
+
+  private final int maxRetry;
+  private final MetadataStore deletedStore;
+  private final Lock lock;
+  // The latest id of deleted blocks in the db.
+  private long lastTxID;
+  // txID of the last transaction returned by getTransactions(int); the next
+  // call resumes after it. 0 means scan from the head of the store.
+  private long lastReadTxID;
+
+  /**
+   * Opens (creating if missing) the deleted-block db under the SCM metadata
+   * directory and restores the latest allocated txID from it.
+   *
+   * @param conf configuration supplying the max retry count, the metadata
+   *             directory and the db cache size.
+   * @throws IOException if the db cannot be opened or read.
+   */
+  public DeletedBlockLogImpl(Configuration conf) throws IOException {
+    maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
+        OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
+
+    File metaDir = getOzoneMetaDirPath(conf);
+    String scmMetaDataDir = metaDir.getPath();
+    File deletedLogDbPath = new File(scmMetaDataDir, DELETED_BLOCK_DB);
+    int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
+        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+    // Load store of all transactions.
+    deletedStore = MetadataStoreBuilder.newBuilder()
+        .setCreateIfMissing(true)
+        .setConf(conf)
+        .setDbFile(deletedLogDbPath)
+        .setCacheSize(cacheSize * OzoneConsts.MB)
+        .build();
+
+    this.lock = new ReentrantLock();
+    // start from the head of deleted store.
+    lastReadTxID = 0;
+    lastTxID = findLatestTxIDInStore();
+  }
+
+  @VisibleForTesting
+  MetadataStore getDeletedStore() {
+    return deletedStore;
+  }
+
+  /**
+   * Reads the persisted latest txID. There is no need to lock before
+   * reading because it is only called from the constructor.
+   *
+   * @return latest txid, or 0 if none has been recorded yet.
+   * @throws IOException if the store read fails.
+   */
+  private long findLatestTxIDInStore() throws IOException {
+    long txid = 0;
+    byte[] value = deletedStore.get(LATEST_TXID);
+    if (value != null) {
+      txid = Longs.fromByteArray(value);
+    }
+    return txid;
+  }
+
+  /**
+   * Returns up to {@code count} retryable transactions whose txID is greater
+   * than the one returned last time. When fewer than {@code count} are found
+   * the scan position wraps back to the head of the log.
+   *
+   * @param count maximum number of transactions to return; a non-positive
+   *              value returns an empty list and leaves the scan position
+   *              unchanged.
+   * @return the transactions found, in txID order.
+   * @throws IOException if the store cannot be read.
+   */
+  @Override
+  public List<DeletedBlocksTransaction> getTransactions(
+      int count) throws IOException {
+    List<DeletedBlocksTransaction> result = new ArrayList<>();
+    if (count <= 0) {
+      // The iterator callback processes at least one entry before it can
+      // stop, so without this guard a zero count could still return one.
+      return result;
+    }
+    // Exclude the LATEST_TXID record before decoding the key as a txID.
+    MetadataKeyFilter avoidInvalidTxid = (preKey, currentKey, nextKey)
+        -> !Arrays.equals(LATEST_TXID, currentKey);
+    MetadataKeyFilter getNextTxID = (preKey, currentKey, nextKey)
+        -> Longs.fromByteArray(currentKey) > lastReadTxID;
+    lock.lock();
+    try {
+      deletedStore.iterate(null, (key, value) -> {
+        if (avoidInvalidTxid.filterKey(null, key, null) &&
+            getNextTxID.filterKey(null, key, null)) {
+          DeletedBlocksTransaction block = DeletedBlocksTransaction
+              .parseFrom(value);
+          if (block.getCount() > -1 && block.getCount() <= maxRetry) {
+            result.add(block);
+          }
+        }
+        return result.size() < count;
+      });
+      if (result.size() < count) {
+        // Reached the end of the log; scan from the beginning next time.
+        lastReadTxID = 0;
+      } else {
+        lastReadTxID = result.get(result.size() - 1).getTxID();
+      }
+    } finally {
+      lock.unlock();
+    }
+    return result;
+  }
+
+  /**
+   * Returns every transaction whose retry count is -1, i.e. that exceeded
+   * the max retry and was taken out of rotation by
+   * {@link #incrementCount(List)}.
+   *
+   * @return the failed transactions.
+   * @throws IOException if the store cannot be read.
+   */
+  @Override
+  public List<DeletedBlocksTransaction> getFailedTransactions()
+      throws IOException {
+    lock.lock();
+    try {
+      final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
+      deletedStore.iterate(null, (key, value) -> {
+        if (!Arrays.equals(LATEST_TXID, key)) {
+          DeletedBlocksTransaction delTX =
+              DeletedBlocksTransaction.parseFrom(value);
+          if (delTX.getCount() == -1) {
+            failedTXs.add(delTX);
+          }
+        }
+        return true;
+      });
+      return failedTXs;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Each txID is updated independently: a txID that is not in the store,
+   * or whose update fails, is logged and skipped without affecting the
+   * others. A transaction whose count exceeds the max retry is marked -1.
+   *
+   * @param txIDs - transaction ID.
+   * @throws IOException
+   */
+  @Override
+  public void incrementCount(List<Long> txIDs) throws IOException {
+    lock.lock();
+    try {
+      for (Long txID : txIDs) {
+        try {
+          byte[] key = Longs.toByteArray(txID);
+          byte[] value = deletedStore.get(key);
+          if (value == null) {
+            // Not in the store (e.g. already committed); parsing null would
+            // throw an NPE and abort the remaining updates.
+            LOG.warn("Cannot increase count for txID {}, not found", txID);
+            continue;
+          }
+          DeletedBlocksTransaction block =
+              DeletedBlocksTransaction.parseFrom(value);
+          DeletedBlocksTransaction.Builder builder = block.toBuilder();
+          int currentCount = block.getCount();
+          if (currentCount > -1) {
+            builder.setCount(++currentCount);
+          }
+          // if the retry time exceeds the maxRetry value
+          // then set the retry value to -1, stop retrying, admins can
+          // analyze those blocks and purge them manually by SCMCli.
+          if (currentCount > maxRetry) {
+            builder.setCount(-1);
+          }
+          deletedStore.put(key, builder.build().toByteArray());
+        } catch (IOException ex) {
+          LOG.warn("Cannot increase count for txID {}", txID, ex);
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /** Builds a fresh transaction with a retry count of 0. */
+  private DeletedBlocksTransaction constructNewTransaction(long txID,
+      String containerName, List<String> blocks) {
+    return DeletedBlocksTransaction.newBuilder()
+        .setTxID(txID)
+        .setContainerName(containerName)
+        .addAllBlockID(blocks)
+        .setCount(0)
+        .build();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Deletes are applied one by one; a failing txID is logged and the
+   * remaining ones are still attempted.
+   *
+   * @param txIDs - transaction IDs.
+   * @throws IOException
+   */
+  @Override
+  public void commitTransactions(List<Long> txIDs) throws IOException {
+    lock.lock();
+    try {
+      for (Long txID : txIDs) {
+        try {
+          deletedStore.delete(Longs.toByteArray(txID));
+        } catch (IOException ex) {
+          LOG.warn("Cannot commit txID {}", txID, ex);
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>The transaction and the updated latest txID are written in one batch.
+   *
+   * @param containerName - container name.
+   * @param blocks - blocks that belong to the same container.
+   * @throws IOException
+   */
+  @Override
+  public void addTransaction(String containerName, List<String> blocks)
+      throws IOException {
+    BatchOperation batch = new BatchOperation();
+    lock.lock();
+    try {
+      long txID = lastTxID + 1;
+      DeletedBlocksTransaction tx = constructNewTransaction(txID,
+          containerName, blocks);
+      byte[] key = Longs.toByteArray(txID);
+
+      batch.put(key, tx.toByteArray());
+      batch.put(LATEST_TXID, Longs.toByteArray(txID));
+
+      deletedStore.writeBatch(batch);
+      // Advance the in-memory counter only after the write succeeded.
+      lastTxID = txID;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Counts the transactions that are still retryable (retry count > -1).
+   *
+   * @return number of valid transactions.
+   * @throws IOException if the store cannot be read.
+   */
+  @Override
+  public int getNumOfValidTransactions() throws IOException {
+    lock.lock();
+    try {
+      final AtomicInteger num = new AtomicInteger(0);
+      deletedStore.iterate(null, (key, value) -> {
+        // Exclude latest txid record
+        if (!Arrays.equals(LATEST_TXID, key)) {
+          DeletedBlocksTransaction delTX =
+              DeletedBlocksTransaction.parseFrom(value);
+          if (delTX.getCount() > -1) {
+            num.incrementAndGet();
+          }
+        }
+        return true;
+      });
+      return num.get();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>One transaction is created per map entry; all of them and the new
+   * latest txID are written in a single batch.
+   *
+   * @param containerBlocksMap a map of containerBlocks.
+   * @throws IOException
+   */
+  @Override
+  public void addTransactions(Map<String, List<String>> containerBlocksMap)
+      throws IOException {
+    BatchOperation batch = new BatchOperation();
+    lock.lock();
+    try {
+      long currentLatestID = lastTxID;
+      for (Map.Entry<String, List<String>> entry :
+          containerBlocksMap.entrySet()) {
+        currentLatestID += 1;
+        byte[] key = Longs.toByteArray(currentLatestID);
+        DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID,
+            entry.getKey(), entry.getValue());
+        batch.put(key, tx.toByteArray());
+      }
+      lastTxID = currentLatestID;
+      batch.put(LATEST_TXID, Longs.toByteArray(lastTxID));
+      deletedStore.writeBatch(batch);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (deletedStore != null) {
+      deletedStore.close();
+    }
+  }
+
+  /**
+   * Scans the whole log from the head and adds every retryable transaction
+   * to {@code transactions}, stopping once it reports that it is full.
+   *
+   * @param transactions per-datanode transaction collector to fill.
+   * @throws IOException if the store cannot be read.
+   */
+  @Override
+  public void getTransactions(DatanodeDeletedBlockTransactions transactions)
+      throws IOException {
+    lock.lock();
+    try {
+      deletedStore.iterate(null, (key, value) -> {
+        if (!Arrays.equals(LATEST_TXID, key)) {
+          DeletedBlocksTransaction block = DeletedBlocksTransaction
+              .parseFrom(value);
+
+          if (block.getCount() > -1 && block.getCount() <= maxRetry) {
+            transactions.addTransaction(block);
+          }
+          return !transactions.isFull();
+        }
+        return true;
+      });
+    } finally {
+      lock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
new file mode 100644
index 0000000..2c555e0
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BackgroundTask;
+import org.apache.hadoop.utils.BackgroundTaskQueue;
+import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
+
+/**
+ * A background service running in SCM to delete blocks. This service scans
+ * the block deletion log at a fixed interval and caches block deletion
+ * commands in {@link org.apache.hadoop.hdds.scm.node.CommandQueue};
+ * asynchronously, the SCM HB thread polls the cached commands and sends
+ * them to datanodes for physical processing.
+ */
+public class SCMBlockDeletingService extends BackgroundService {
+
+  static final Logger LOG =
+      LoggerFactory.getLogger(SCMBlockDeletingService.class);
+
+  // ThreadPoolSize=2, 1 for scheduler and the other for the scanner.
+  private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 2;
+  private final DeletedBlockLog deletedBlockLog;
+  private final Mapping mappingService;
+  private final NodeManager nodeManager;
+
+  // Block delete limit size is dynamically calculated based on the container
+  // delete limit size (ozone.block.deleting.container.limit.per.interval)
+  // configured for datanodes. To ensure a DN does not wait for delete
+  // commands, that value is multiplied by a factor of 2 to give the final
+  // per-node TX limit.
+  // Currently we implement a throttle algorithm that throttles delete blocks
+  // for each datanode. Each node is limited by the calculated size. First,
+  // current node info is fetched from the node manager, then the entire
+  // delLog is scanned from beginning to end. If a node reaches the maximum
+  // value, its records are skipped; otherwise scanning continues until it
+  // does. Once all nodes are full, the scan stops.
+  private int blockDeleteLimitSize;
+
+  /**
+   * Creates the block deleting service.
+   *
+   * @param deletedBlockLog log scanned for pending deletion transactions.
+   * @param mapper container mapping handed to the per-datanode collector.
+   * @param nodeManager source of healthy datanodes and sink for commands.
+   * @param interval scan interval in milliseconds.
+   * @param serviceTimeout timeout passed to {@link BackgroundService}.
+   * @param conf configuration supplying the per-interval container limit.
+   * @throws IllegalArgumentException if the container limit is not positive.
+   */
+  public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
+      Mapping mapper, NodeManager nodeManager,
+      long interval, long serviceTimeout, Configuration conf) {
+    super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS,
+        BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
+    this.deletedBlockLog = deletedBlockLog;
+    this.mappingService = mapper;
+    this.nodeManager = nodeManager;
+
+    int containerLimit = conf.getInt(
+        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
+        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
+    Preconditions.checkArgument(containerLimit > 0,
+        "Container limit size should be positive.");
+    // Use container limit value multiply by a factor 2 to ensure DN
+    // not wait for orders.
+    this.blockDeleteLimitSize = containerLimit * 2;
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new DeletedBlockTransactionScanner());
+    return queue;
+  }
+
+  /**
+   * Task that collects a throttled set of deletion transactions per healthy
+   * datanode and queues a {@link DeleteBlocksCommand} for each of them.
+   */
+  private class DeletedBlockTransactionScanner
+      implements BackgroundTask<EmptyTaskResult> {
+
+    @Override
+    public int getPriority() {
+      return 1;
+    }
+
+    @Override
+    public EmptyTaskResult call() throws Exception {
+      int dnTxCount = 0;
+      long startTime = Time.monotonicNow();
+      // Scan SCM DB in HB interval and collect a throttled list of
+      // to delete blocks.
+      LOG.debug("Running DeletedBlockTransactionScanner");
+      DatanodeDeletedBlockTransactions transactions = null;
+      List<DatanodeDetails> datanodes =
+          nodeManager.getNodes(NodeState.HEALTHY);
+      if (datanodes != null) {
+        transactions = new DatanodeDeletedBlockTransactions(mappingService,
+            blockDeleteLimitSize, datanodes.size());
+        try {
+          deletedBlockLog.getTransactions(transactions);
+        } catch (IOException e) {
+          // We may tolerate a number of failures for some time
+          // but if it continues to fail, at some point we need to raise
+          // an exception and probably fail the SCM ? At present, it simply
+          // continues to retry the scanning.
+          LOG.error("Failed to get block deletion transactions from delTX log",
+              e);
+        }
+        LOG.debug("Scanned deleted blocks log and got {} delTX to process.",
+            transactions.getTXNum());
+      }
+
+      if (transactions != null && !transactions.isEmpty()) {
+        for (UUID dnId : transactions.getDatanodeIDs()) {
+          List<DeletedBlocksTransaction> dnTXs = transactions
+              .getDatanodeTransactions(dnId);
+          if (dnTXs != null && !dnTXs.isEmpty()) {
+            dnTxCount += dnTXs.size();
+            // TODO commandQueue needs a cap.
+            // We should stop caching new commands if num of un-processed
+            // command is bigger than a limit, e.g 50. In case datanode goes
+            // offline for sometime, the cached commands be flooded.
+            nodeManager.addDatanodeCommand(dnId,
+                new DeleteBlocksCommand(dnTXs));
+            // Only build the joined TxID list when it will actually be logged.
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Added delete block command for datanode {} in the queue,"
+                      + " number of delete block transactions: {},"
+                      + " TxID list: {}",
+                  dnId, dnTXs.size(), String.join(",",
+                      transactions.getTransactionIDList(dnId)));
+            }
+          }
+        }
+      }
+
+      if (dnTxCount > 0) {
+        LOG.info(
+            "Totally added {} delete blocks command for"
+                + " {} datanodes, task elapsed time: {}ms",
+            dnTxCount, transactions.getDatanodeIDs().size(),
+            Time.monotonicNow() - startTime);
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  @VisibleForTesting
+  public void setBlockDeleteTXNum(int numTXs) {
+    blockDeleteLimitSize = numTXs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java
new file mode 100644
index 0000000..e1bfdff
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+/**
+ * This package contains routines to manage the block location and
+ * mapping inside SCM
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
new file mode 100644
index 0000000..63cb3a3
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -0,0 +1,607 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerSupervisor;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.lease.Lease;
+import org.apache.hadoop.ozone.lease.LeaseException;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_SIZE_GB;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_CHANGE_CONTAINER_STATE;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
+
+/**
+ * Mapping class contains the mapping from a name to a pipeline mapping. This
+ * is used by SCM when
+ * allocating new locations and when looking up a key.
+ */
+public class ContainerMapping implements Mapping {
+  private static final Logger LOG = LoggerFactory.getLogger(ContainerMapping
+      .class);
+
+  private final NodeManager nodeManager;
+  // Cache size for containerStore in MB; multiplied by OzoneConsts.MB when
+  // the store is built.
+  private final long cacheSize;
+  // Held while reading or writing containerStore.
+  private final Lock lock;
+  // Charset used to turn container names into containerStore keys.
+  private final Charset encoding = Charset.forName("UTF-8");
+  // Store (SCM_CONTAINER_DB) mapping container name bytes to a serialized
+  // HddsProtos.SCMContainerInfo.
+  private final MetadataStore containerStore;
+  private final PipelineSelector pipelineSelector;
+  private final ContainerStateManager containerStateManager;
+  // Lease manager whose timeout comes from
+  // OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT (milliseconds).
+  private final LeaseManager<ContainerInfo> containerLeaseManager;
+  private final ContainerSupervisor containerSupervisor;
+  // Read from OZONE_SCM_CONTAINER_CLOSE_THRESHOLD; presumably the fill
+  // ratio at which a container is closed — TODO confirm against its users.
+  private final float containerCloseThreshold;
+  private final ContainerCloser closer;
+  // Container size in bytes (OZONE_SCM_CONTAINER_SIZE_GB * 1024^3).
+  private final long size;
+
+  /**
+   * Constructs a mapping class that creates mapping between container names
+   * and pipelines.
+   *
+   * @param conf - configuration supplying the metadata directory, container
+   * size, close threshold and creation lease timeout.
+   * @param nodeManager - NodeManager so that we can get the nodes that are
+   * healthy to place new containers.
+   * @param cacheSizeMB - Amount of memory, in MB, reserved for the container
+   * store's cache. It is converted to bytes and passed to the MetadataStore
+   * builder.
+   * @throws IOException on Failure.
+   */
+  @SuppressWarnings("unchecked")
+  public ContainerMapping(
+      final Configuration conf, final NodeManager nodeManager, final int
+      cacheSizeMB) throws IOException {
+    this.nodeManager = nodeManager;
+    this.cacheSize = cacheSizeMB;
+    this.closer = new ContainerCloser(nodeManager, conf);
+
+    File metaDir = getOzoneMetaDirPath(conf);
+
+    // Open the container store (container name -> serialized container
+    // info). NOTE(review): unlike DeletedBlockLogImpl, setCreateIfMissing
+    // is not called here; confirm the builder's default creates the db.
+    File containerDBPath = new File(metaDir, SCM_CONTAINER_DB);
+    containerStore =
+        MetadataStoreBuilder.newBuilder()
+            .setConf(conf)
+            .setDbFile(containerDBPath)
+            .setCacheSize(this.cacheSize * OzoneConsts.MB)
+            .build();
+
+    this.lock = new ReentrantLock();
+
+    this.pipelineSelector = new PipelineSelector(nodeManager, conf);
+
+    // To be replaced with code getStorageSize once it is committed.
+    // Container size in bytes.
+    size = conf.getLong(OZONE_SCM_CONTAINER_SIZE_GB,
+        OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024;
+    // NOTE(review): `this` escapes to ContainerStateManager before
+    // construction completes. If that constructor calls back into this
+    // mapping (e.g. listContainer), containerStore and lock must already be
+    // assigned, so keep them initialized above this line.
+    this.containerStateManager =
+        new ContainerStateManager(conf, this);
+    this.containerSupervisor =
+        new ContainerSupervisor(conf, nodeManager,
+            nodeManager.getNodePoolManager());
+    this.containerCloseThreshold = conf.getFloat(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
+    LOG.trace("Container State Manager created.");
+
+    // NOTE(review): if any step after containerStore is built throws, the
+    // store is not closed here.
+    long containerCreationLeaseTimeout = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    LOG.trace("Starting Container Lease Manager.");
+    containerLeaseManager = new LeaseManager<>(containerCreationLeaseTimeout);
+    containerLeaseManager.start();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Reads the container record keyed by the container name and decodes
+   * it into a {@link ContainerInfo}.
+   *
+   * @throws SCMException with {@code FAILED_TO_FIND_CONTAINER} when no record
+   *     exists for {@code containerName}.
+   */
+  @Override
+  public ContainerInfo getContainer(final String containerName) throws
+      IOException {
+    final byte[] dbKey = containerName.getBytes(encoding);
+    lock.lock();
+    try {
+      final byte[] stored = containerStore.get(dbKey);
+      if (stored == null) {
+        throw new SCMException(
+            "Specified key does not exist. key : " + containerName,
+            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+      }
+      return ContainerInfo.fromProtobuf(
+          HddsProtos.SCMContainerInfo.PARSER.parseFrom(stored));
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Returns at most {@code count} containers, starting at
+   * {@code startName} (or the first key when it is null) and keeping only
+   * names accepted by a prefix filter on {@code prefixName}.
+   *
+   * @throws IOException if the store holds no containers at all.
+   */
+  @Override
+  public List<ContainerInfo> listContainer(String startName,
+      String prefixName, int count) throws IOException {
+    final List<ContainerInfo> containers = new ArrayList<>();
+    lock.lock();
+    try {
+      if (containerStore.isEmpty()) {
+        throw new IOException("No container exists in current db");
+      }
+      final MetadataKeyFilter filter = new KeyPrefixFilter(prefixName);
+      byte[] seekKey = null;
+      if (startName != null) {
+        seekKey = DFSUtil.string2Bytes(startName);
+      }
+
+      // Decode each stored value back into a ContainerInfo.
+      // TODO: filter by container state
+      for (Map.Entry<byte[], byte[]> kv :
+          containerStore.getSequentialRangeKVs(seekKey, count, filter)) {
+        HddsProtos.SCMContainerInfo proto =
+            HddsProtos.SCMContainerInfo.PARSER.parseFrom(kv.getValue());
+        containers.add(
+            Preconditions.checkNotNull(ContainerInfo.fromProtobuf(proto)));
+      }
+    } finally {
+      lock.unlock();
+    }
+    return containers;
+  }
+
+  /**
+   * Allocates a new container and records it in the container store.
+   *
+   * @param type - replication type of the container.
+   * @param replicationFactor - replication factor of the container.
+   * @param containerName - Name of the container; must be non-null and
+   *                      non-empty.
+   * @param owner - The string name of the Service that owns this container.
+   * @return - the allocated container's info.
+   * @throws SCMException with {@code CHILL_MODE_EXCEPTION} while the node
+   *     manager is still in chill mode, or {@code CONTAINER_EXISTS} if the
+   *     name is already taken.
+   * @throws IOException - on store failure.
+   */
+  @Override
+  public ContainerInfo allocateContainer(
+      ReplicationType type,
+      ReplicationFactor replicationFactor,
+      final String containerName,
+      String owner)
+      throws IOException {
+    Preconditions.checkNotNull(containerName);
+    Preconditions.checkState(!containerName.isEmpty());
+
+    // Refuse allocation before taking the lock.
+    if (!nodeManager.isOutOfChillMode()) {
+      throw new SCMException(
+          "Unable to create container while in chill mode",
+          SCMException.ResultCodes.CHILL_MODE_EXCEPTION);
+    }
+
+    final byte[] dbKey = containerName.getBytes(encoding);
+    lock.lock();
+    try {
+      if (containerStore.get(dbKey) != null) {
+        throw new SCMException(
+            "Specified container already exists. key : " + containerName,
+            SCMException.ResultCodes.CONTAINER_EXISTS);
+      }
+      final ContainerInfo allocated = containerStateManager.allocateContainer(
+          pipelineSelector, type, replicationFactor, containerName, owner);
+      containerStore.put(dbKey, allocated.getProtobuf().toByteArray());
+      return allocated;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Removes a container's entry from the SCM container store.
+   * <p>
+   * Only the persisted entry is deleted; this method does not touch the
+   * in-memory container state manager.
+   *
+   * @param containerName - Container name
+   * @throws SCMException with FAILED_TO_FIND_CONTAINER if the store holds
+   *                      no entry for the container.
+   * @throws IOException if the container store fails to read or delete the
+   *                     key.
+   */
+  @Override
+  public void deleteContainer(String containerName) throws IOException {
+    lock.lock();
+    try {
+      final byte[] key = containerName.getBytes(encoding);
+      final boolean exists = containerStore.get(key) != null;
+      if (!exists) {
+        throw new SCMException(
+            "Failed to delete container " + containerName + ", reason : " +
+                "container doesn't exist.",
+            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+      }
+      containerStore.delete(key);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc} Used by client to update container state on SCM.
+   * <p>
+   * Besides driving the container state machine, this manages the creation
+   * lease: a CREATE event acquires a lease whose expiry fires a TIMEOUT
+   * event, and a CREATED event releases it. If the state transition fails
+   * after a CREATE lease was acquired, that lease is released again so that
+   * no stray TIMEOUT is fired later.
+   *
+   * @throws SCMException if the container does not exist, the event is
+   *                      unsupported or the transition is invalid.
+   * @throws IOException wrapping any lease failure, or on store errors.
+   */
+  @Override
+  public HddsProtos.LifeCycleState updateContainerState(
+      String containerName, HddsProtos.LifeCycleEvent event) throws
+      IOException {
+    ContainerInfo containerInfo;
+    lock.lock();
+    try {
+      byte[] dbKey = containerName.getBytes(encoding);
+      byte[] containerBytes = containerStore.get(dbKey);
+      if (containerBytes == null) {
+        throw new SCMException(
+            "Failed to update container state "
+                + containerName
+                + ", reason : container doesn't exist.",
+            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+      }
+      containerInfo =
+          ContainerInfo.fromProtobuf(HddsProtos.SCMContainerInfo.PARSER
+              .parseFrom(containerBytes));
+
+      Preconditions.checkNotNull(containerInfo);
+      switch (event) {
+      case CREATE:
+        // Acquire lease on container
+        Lease<ContainerInfo> containerLease =
+            containerLeaseManager.acquire(containerInfo);
+        // Register callback to be executed in case of timeout
+        containerLease.registerCallBack(() -> {
+          updateContainerState(containerName,
+              HddsProtos.LifeCycleEvent.TIMEOUT);
+          return null;
+        });
+        break;
+      case CREATED:
+        // Release the lease on container
+        containerLeaseManager.release(containerInfo);
+        break;
+      case FINALIZE:
+        // TODO: we don't need a lease manager here for closing as the
+        // container report will include the container state after HDFS-13008
+        // If a client failed to update the container close state, DN container
+        // report from 3 DNs will be used to close the container eventually.
+        break;
+      case CLOSE:
+      case UPDATE:
+      case DELETE:
+      case TIMEOUT:
+      case CLEANUP:
+        // No side effects besides the state transition below.
+        break;
+      default:
+        throw new SCMException("Unsupported container LifeCycleEvent.",
+            FAILED_TO_CHANGE_CONTAINER_STATE);
+      }
+      ContainerInfo updatedContainer;
+      try {
+        updatedContainer = containerStateManager
+            .updateContainerState(containerInfo, event);
+      } catch (SCMException e) {
+        // Revert the side effects of the switch above: a lease acquired for
+        // CREATE would otherwise stay active and later fire TIMEOUT for a
+        // container whose CREATE transition failed.
+        if (event == HddsProtos.LifeCycleEvent.CREATE) {
+          try {
+            containerLeaseManager.release(containerInfo);
+          } catch (Exception releaseFailure) {
+            // Best-effort cleanup; keep the transition failure as primary.
+            e.addSuppressed(releaseFailure);
+          }
+        }
+        throw e;
+      }
+      containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
+      return updatedContainer.getState();
+    } catch (LeaseException e) {
+      throw new IOException("Lease Exception.", e);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Gives access to the in-memory container state manager that backs this
+   * mapping.
+   *
+   * @return the ContainerStateManager instance held by this object
+   */
+  @Override
+  public ContainerStateManager getStateManager() {
+    return this.containerStateManager;
+  }
+
+  /**
+   * Process container report from Datanode.
+   * <p>
+   * Processing follows a very simple logic for time being.
+   * <p>
+   * 1. Datanodes report the current State -- denoted by the datanodeState
+   * <p>
+   * 2. We read the older SCM state from the Database -- denoted by
+   * the knownState.
+   * <p>
+   * 3. We copy the usage etc. from the datanode state into a newState and
+   * log that newState to the DB. This allows SCM to boot up again and read
+   * the state of the world from the DB, and then reconcile the state from
+   * container reports, when they arrive.
+   * <p>
+   * Reports for containers that are not in the container DB are logged and
+   * skipped.
+   *
+   * @param reports Container report
+   * @throws IOException if reading or writing the container DB fails.
+   */
+  @Override
+  public void processContainerReports(ContainerReportsRequestProto reports)
+      throws IOException {
+    List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
+        containerInfos = reports.getReportsList();
+    containerSupervisor.handleContainerReport(reports);
+    for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
+        containerInfos) {
+      byte[] dbKey = datanodeState.getContainerNameBytes().toByteArray();
+      lock.lock();
+      try {
+        byte[] containerBytes = containerStore.get(dbKey);
+        if (containerBytes != null) {
+          HddsProtos.SCMContainerInfo knownState =
+              HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
+
+          HddsProtos.SCMContainerInfo newState =
+              reconcileState(datanodeState, knownState);
+
+          // FIX ME: This can be optimized. The container may be written to
+          // the DB twice: once here, and again by updateContainerState when
+          // closeContainerIfNeeded moves it to CLOSING. A single write would
+          // work well.
+          containerStore.put(dbKey, newState.toByteArray());
+
+          if (closeContainerIfNeeded(newState)) {
+            LOG.info("Closing the Container: {}", newState.getContainerName());
+          }
+        } else {
+          // Container not found in our container db.
+          LOG.error("Error while processing container report from datanode :"
+                  + " {}, for container: {}, reason: container doesn't exist"
+                  + " in container database.", reports.getDatanodeDetails(),
+              datanodeState.getContainerName());
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  /**
+   * Reconciles the state from Datanode with the state in SCM.
+   * <p>
+   * Name, pipeline, lifecycle state, state enter time, container ID and
+   * owner are taken from SCM's known state. Used bytes and key count come
+   * from the datanode report.
+   *
+   * @param datanodeState - State from the Datanode.
+   * @param knownState - State inside SCM.
+   * @return new SCM State for this container.
+   */
+  private HddsProtos.SCMContainerInfo reconcileState(
+      StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState,
+      HddsProtos.SCMContainerInfo knownState) {
+    HddsProtos.SCMContainerInfo.Builder builder =
+        HddsProtos.SCMContainerInfo.newBuilder();
+    builder.setContainerName(knownState.getContainerName());
+    builder.setPipeline(knownState.getPipeline());
+    // If used size is greater than allocated size, we will be updating
+    // allocated size with used size. This update is done as a fallback
+    // mechanism in case SCM crashes without properly updating allocated
+    // size. Correct allocated value will be updated by
+    // ContainerStateManager during SCM shutdown.
+    long usedSize = datanodeState.getUsed();
+    builder.setAllocatedBytes(
+        Math.max(knownState.getAllocatedBytes(), usedSize));
+    builder.setUsedBytes(usedSize);
+    builder.setNumberOfKeys(datanodeState.getKeyCount());
+    builder.setState(knownState.getState());
+    builder.setStateEnterTime(knownState.getStateEnterTime());
+    builder.setContainerID(knownState.getContainerID());
+    // Protobuf getters never return null (an unset string reads as ""), so
+    // field presence has to be checked with hasOwner().
+    if (knownState.hasOwner()) {
+      builder.setOwner(knownState.getOwner());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Queues the close container command, to datanode and writes the new state
+   * to container DB.
+   * <p>
+   * TODO : Remove this 2 ContainerInfo definitions. It is brain dead to have
+   * one protobuf in one file and another definition in another file.
+   *
+   * @param newState - This is the state we maintain in SCM.
+   * @throws IOException
+   */
+  private boolean closeContainerIfNeeded(HddsProtos.SCMContainerInfo newState)
+      throws IOException {
+    float containerUsedPercentage = 1.0f *
+        newState.getUsedBytes() / this.size;
+
+    ContainerInfo scmInfo = getContainer(newState.getContainerName());
+    if (containerUsedPercentage >= containerCloseThreshold
+        && !isClosed(scmInfo)) {
+      // We will call closer till get to the closed state.
+      // That is SCM will make this call repeatedly until we reach the closed
+      // state.
+      closer.close(newState);
+
+      if (shouldClose(scmInfo)) {
+        // This event moves the Container from Open to Closing State, this is
+        // a state inside SCM. This is the desired state that SCM wants this
+        // container to reach. We will know that a container has reached the
+        // closed state from container reports. This state change should be
+        // invoked once and only once.
+        HddsProtos.LifeCycleState state = updateContainerState(
+            scmInfo.getContainerName(),
+            HddsProtos.LifeCycleEvent.FINALIZE);
+        if (state != HddsProtos.LifeCycleState.CLOSING) {
+          LOG.error("Failed to close container {}, reason : Not able " +
+                  "to " +
+                  "update container state, current container state: {}.",
+              newState.getContainerName(), state);
+          return false;
+        }
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * In Container is in closed state, if it is in closed, Deleting or Deleted
+   * State.
+   *
+   * @param info - ContainerInfo.
+   * @return true if is in open state, false otherwise
+   */
+  private boolean shouldClose(ContainerInfo info) {
+    return info.getState() == HddsProtos.LifeCycleState.OPEN;
+  }
+
+  /**
+   * Checks whether a container has reached the CLOSED state. CLOSING,
+   * DELETING and DELETED are not treated as closed by this check.
+   *
+   * @param info - ContainerInfo.
+   * @return true only if the container is in the CLOSED state
+   */
+  private boolean isClosed(ContainerInfo info) {
+    return HddsProtos.LifeCycleState.CLOSED == info.getState();
+  }
+
+  /**
+   * Exposes the container closer so that tests can inspect it.
+   *
+   * @return the ContainerCloser used by this mapping
+   */
+  @VisibleForTesting
+  public ContainerCloser getCloser() {
+    return this.closer;
+  }
+
+  /**
+   * Shuts down the lease manager, flushes the in-memory container info to
+   * the container store, and closes the state manager and the store.
+   * <p>
+   * The container store is closed even if an earlier step throws, so a
+   * failed flush cannot leak the underlying DB handle. The earlier failure
+   * is still propagated (unless closing the store itself also fails).
+   *
+   * @throws IOException if flushing or closing any component fails
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      if (containerLeaseManager != null) {
+        containerLeaseManager.shutdown();
+      }
+      if (containerStateManager != null) {
+        try {
+          // Must run before the store is closed: it writes into the store.
+          flushContainerInfo();
+        } finally {
+          containerStateManager.close();
+        }
+      }
+    } finally {
+      if (containerStore != null) {
+        containerStore.close();
+      }
+    }
+  }
+
+  /**
+   * Since allocatedBytes of a container is only in memory, stored in
+   * containerStateManager, when closing ContainerMapping, we need to update
+   * this in the container store.
+   * <p>
+   * Only the allocatedBytes field of each stored record is replaced. Every
+   * other persisted field (including containerID and stateEnterTime) is
+   * kept as stored. A failure for one container does not stop the others.
+   *
+   * @throws IOException if one or more containers could not be flushed; the
+   *                     message lists their names.
+   */
+  @VisibleForTesting
+  public void flushContainerInfo() throws IOException {
+    List<ContainerInfo> containers = containerStateManager.getAllContainers();
+    List<String> failedContainers = new ArrayList<>();
+    for (ContainerInfo info : containers) {
+      // even if some container updated failed, others can still proceed
+      try {
+        byte[] dbKey = info.getContainerName().getBytes(encoding);
+        byte[] containerBytes = containerStore.get(dbKey);
+        // TODO : looks like when a container is deleted, the container is
+        // removed from containerStore but not containerStateManager, so it can
+        // return info of a deleted container. may revisit this in the future,
+        // for now, just skip a not-found container
+        if (containerBytes != null) {
+          // Patch allocatedBytes on the stored protobuf instead of rebuilding
+          // the record field by field, so no other persisted field is lost.
+          HddsProtos.SCMContainerInfo newInfoProto =
+              HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes)
+                  .toBuilder()
+                  .setAllocatedBytes(info.getAllocatedBytes())
+                  .build();
+          containerStore.put(dbKey, newInfoProto.toByteArray());
+        } else {
+          LOG.debug("Container state manager has container {} but not found " +
+                  "in container store, a deleted container?",
+              info.getContainerName());
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to flush container info for container {}",
+            info.getContainerName(), ioe);
+        failedContainers.add(info.getContainerName());
+      }
+    }
+    if (!failedContainers.isEmpty()) {
+      throw new IOException("Error in flushing container info from container "
+          + "state manager: " + failedContainers);
+    }
+  }
+
+  /**
+   * Exposes the underlying container metadata store for tests.
+   *
+   * @return the MetadataStore that holds the serialized container records
+   */
+  @VisibleForTesting
+  public MetadataStore getContainerStore() {
+    return this.containerStore;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
new file mode 100644
index 0000000..227eca0
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.states.ContainerState;
+import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.statemachine
+    .InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_CHANGE_CONTAINER_STATE;
+
+/**
+ * A container state manager keeps track of container states and returns
+ * containers that match various queries.
+ * <p>
+ * This state machine is driven by a combination of server and client actions.
+ * <p>
+ * This is how a create container happens: 1. When a container is created, the
+ * Server(or SCM) marks that Container as ALLOCATED state. In this state, SCM
+ * has chosen a pipeline for container to live on. However, the container is
+ * not created yet. This container along with the pipeline is returned to the
+ * client.
+ * <p>
+ * 2. The client when it sees the Container state as ALLOCATED understands that
+ * container needs to be created on the specified pipeline. The client lets the
+ * SCM know that it saw this flag and is initiating the creation on the data
+ * nodes.
+ * <p>
+ * This is done by sending the CREATE event for the container to SCM. When SCM
+ * gets this event, SCM puts the container state into CREATING. All this state
+ * means is that SCM told Client to create a container and client saw that
+ * request.
+ * <p>
+ * 3. Then client makes calls to datanodes directly, asking the datanodes to
+ * create the container. This is done with the help of pipeline that supports
+ * this container.
+ * <p>
+ * 4. Once the creation of the container is complete, the client will make
+ * another call to the SCM, this time specifying the containerName and the
+ * CREATED event.
+ * <p>
+ * 5. With the CREATED event, the container moves to an Open State. This is
+ * the state when clients can write to a container.
+ * <p>
+ * 6. If the client does not send the CREATED event within a certain time, a
+ * TIMEOUT event moves the container from CREATING to DELETING.
+ * <p>
+ * Please see the function initializeStateMachine below to see how this looks
+ * in code.
+ * <p>
+ * Reusing existing container :
+ * <p>
+ * The create container call is not made all the time, the system tries to use
+ * open containers as much as possible. So in those cases, it looks thru the
+ * list of open containers and will return containers that match the specific
+ * signature.
+ * <p>
+ * Please note : Logically there are 3 separate state machines in the case of
+ * containers.
+ * <p>
+ * The Create State Machine -- Commented extensively above.
+ * <p>
+ * Open/Close State Machine - Once the container is in the Open State,
+ * eventually it will be closed, once sufficient data has been written to it.
+ * <p>
+ * TimeOut Delete Container State Machine - if the container creating times
+ * out, then Container State manager decides to delete the container.
+ */
+public class ContainerStateManager implements Closeable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerStateManager.class);
+
+  /**
+   * Message of the IOException thrown by the container mapping's
+   * listContainer when its container DB is empty. That case is expected
+   * before any container exists and is therefore not logged as an error.
+   */
+  private static final String NO_CONTAINER_MESSAGE =
+      "No container exists in current db";
+
+  private final StateMachine<HddsProtos.LifeCycleState,
+      HddsProtos.LifeCycleEvent> stateMachine;
+
+  private final long containerSize;
+  private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
+  private final ContainerStateMap containers;
+  private final AtomicLong containerCount;
+
+  /**
+   * Constructs a Container State Manager that tracks all containers owned by
+   * SCM for the purpose of allocation of blocks.
+   * <p>
+   * TODO : Add Container Tags so we know which containers are owned by SCM.
+   *
+   * @param configuration - used to read the configured container size.
+   * @param containerMapping - source of the containers already persisted by
+   *                         SCM; they are loaded into memory here.
+   */
+  public ContainerStateManager(Configuration configuration,
+      Mapping containerMapping) {
+
+    // Initialize the container state machine.
+    Set<HddsProtos.LifeCycleState> finalStates = new HashSet<>();
+
+    // These are the steady states of a container.
+    finalStates.add(LifeCycleState.OPEN);
+    finalStates.add(LifeCycleState.CLOSED);
+    finalStates.add(LifeCycleState.DELETED);
+
+    this.stateMachine = new StateMachine<>(LifeCycleState.ALLOCATED,
+        finalStates);
+    initializeStateMachine();
+
+    this.containerSize = OzoneConsts.GB * configuration.getInt(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
+
+    lastUsedMap = new ConcurrentHashMap<>();
+    containerCount = new AtomicLong(0);
+    containers = new ContainerStateMap();
+    loadExistingContainers(containerMapping);
+  }
+
+  /**
+   * Loads the containers already persisted by SCM into the in-memory
+   * container map, and advances the container ID counter to the largest
+   * persisted container ID so that newly allocated IDs do not reuse one.
+   *
+   * @param containerMapping - mapping to list the persisted containers from.
+   */
+  private void loadExistingContainers(Mapping containerMapping) {
+
+    List<ContainerInfo> containerList;
+    try {
+      containerList = containerMapping.listContainer(null,
+          null, Integer.MAX_VALUE);
+
+      // if there are no container to load, let us return.
+      if (containerList == null || containerList.isEmpty()) {
+        LOG.info("No containers to load for this cluster.");
+        return;
+      }
+    } catch (IOException e) {
+      // An empty container DB is reported as an IOException with a known
+      // message. Compare constant-first because getMessage() may be null.
+      if (!NO_CONTAINER_MESSAGE.equals(e.getMessage())) {
+        LOG.error("Could not list the containers", e);
+      }
+      return;
+    }
+
+    // Advance the ID counter over every persisted container before loading
+    // them, so a failure part-way through loading cannot cause an already
+    // used container ID to be handed out again.
+    long maxID = 0;
+    for (ContainerInfo container : containerList) {
+      maxID = Math.max(maxID, container.getContainerID());
+    }
+    containerCount.set(maxID);
+
+    try {
+      for (ContainerInfo container : containerList) {
+        containers.addContainer(container);
+      }
+    } catch (SCMException ex) {
+      LOG.error("Unable to create a container information. ", ex);
+      // Fix me, what is the proper shutdown procedure for SCM ??
+      // System.exit(1) // Should we exit here?
+    }
+  }
+
+  /**
+   * Return the info of all the containers kept by the in-memory mapping.
+   *
+   * @return a new list holding the info of every container currently in the
+   *         in-memory container map.
+   */
+  public List<ContainerInfo> getAllContainers() {
+    List<ContainerInfo> list = new ArrayList<>();
+
+    // No lock is taken here. NOTE(review): this relies on
+    // ContainerStateMap#getContainerMap returning an immutable map that is
+    // safe to iterate concurrently -- confirm.
+    containers.getContainerMap().forEach((key, value) -> list.add(value));
+    return list;
+  }
+
+  /*
+   *
+   * Event and State Transition Mapping:
+   *
+   * State: ALLOCATED ---------------> CREATING
+   * Event:                CREATE
+   *
+   * State: CREATING  ---------------> OPEN
+   * Event:               CREATED
+   *
+   * State: OPEN      ---------------> CLOSING
+   * Event:               FINALIZE
+   *
+   * State: CLOSING   ---------------> CLOSED
+   * Event:                CLOSE
+   *
+   * State: CLOSED   ----------------> DELETING
+   * Event:                DELETE
+   *
+   * State: DELETING ----------------> DELETED
+   * Event:               CLEANUP
+   *
+   * State: CREATING  ---------------> DELETING
+   * Event:               TIMEOUT
+   *
+   *
+   * Container State Flow:
+   *
+   * [ALLOCATED]------->[CREATING]--------->[OPEN]---------->[CLOSING]------->[CLOSED]
+   *            (CREATE)     |    (CREATED)       (FINALIZE)          (CLOSE)    |
+   *                         |                                                   |
+   *                         |                                                   |
+   *                         |(TIMEOUT)                                  (DELETE)|
+   *                         |                                                   |
+   *                         +------------------> [DELETING] <-------------------+
+   *                                                   |
+   *                                                   |
+   *                                          (CLEANUP)|
+   *                                                   |
+   *                                               [DELETED]
+   */
+  private void initializeStateMachine() {
+    stateMachine.addTransition(LifeCycleState.ALLOCATED,
+        LifeCycleState.CREATING,
+        LifeCycleEvent.CREATE);
+
+    stateMachine.addTransition(LifeCycleState.CREATING,
+        LifeCycleState.OPEN,
+        LifeCycleEvent.CREATED);
+
+    stateMachine.addTransition(LifeCycleState.OPEN,
+        LifeCycleState.CLOSING,
+        LifeCycleEvent.FINALIZE);
+
+    stateMachine.addTransition(LifeCycleState.CLOSING,
+        LifeCycleState.CLOSED,
+        LifeCycleEvent.CLOSE);
+
+    stateMachine.addTransition(LifeCycleState.CLOSED,
+        LifeCycleState.DELETING,
+        LifeCycleEvent.DELETE);
+
+    stateMachine.addTransition(LifeCycleState.CREATING,
+        LifeCycleState.DELETING,
+        LifeCycleEvent.TIMEOUT);
+
+    stateMachine.addTransition(LifeCycleState.DELETING,
+        LifeCycleState.DELETED,
+        LifeCycleEvent.CLEANUP);
+  }
+
+  /**
+   * allocates a new container based on the type, replication etc.
+   *
+   * @param selector -- Pipeline selector class.
+   * @param type -- Replication type.
+   * @param replicationFactor - Replication replicationFactor.
+   * @param containerName - Container Name.
+   * @param owner - Owner of the container.
+   * @return Container Info, in the ALLOCATED state.
+   * @throws IOException  on Failure.
+   */
+  public ContainerInfo allocateContainer(PipelineSelector selector, HddsProtos
+      .ReplicationType type, HddsProtos.ReplicationFactor replicationFactor,
+      final String containerName, String owner) throws
+      IOException {
+
+    Pipeline pipeline = selector.getReplicationPipeline(type,
+        replicationFactor, containerName);
+
+    Preconditions.checkNotNull(pipeline, "Pipeline type=%s/"
+        + "replication=%s couldn't be found for the new container. "
+        + "Do you have enough nodes?", type, replicationFactor);
+
+    ContainerInfo containerInfo = new ContainerInfo.Builder()
+        .setContainerName(containerName)
+        .setState(HddsProtos.LifeCycleState.ALLOCATED)
+        .setPipeline(pipeline)
+        // This is bytes allocated for blocks inside container, not the
+        // container size
+        .setAllocatedBytes(0)
+        .setUsedBytes(0)
+        .setNumberOfKeys(0)
+        .setStateEnterTime(Time.monotonicNow())
+        .setOwner(owner)
+        .setContainerID(containerCount.incrementAndGet())
+        .build();
+    Preconditions.checkNotNull(containerInfo);
+    containers.addContainer(containerInfo);
+    LOG.trace("New container allocated: {}", containerInfo);
+    return containerInfo;
+  }
+
+  /**
+   * Update the Container State to the next state.
+   *
+   * @param info - ContainerInfo
+   * @param event - LifeCycle Event
+   * @return Updated ContainerInfo.
+   * @throws SCMException  if the event is not a valid transition from the
+   *                       container's current state.
+   */
+  public ContainerInfo updateContainerState(ContainerInfo
+      info, HddsProtos.LifeCycleEvent event) throws SCMException {
+    LifeCycleState newState;
+    try {
+      newState = this.stateMachine.getNextState(info.getState(), event);
+    } catch (InvalidStateTransitionException ex) {
+      String error = String.format("Failed to update container state %s, " +
+              "reason: invalid state transition from state: %s upon " +
+              "event: %s.",
+          info.getPipeline().getContainerName(), info.getState(), event);
+      LOG.error(error);
+      throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE);
+    }
+
+    // This is a post condition after executing getNextState.
+    Preconditions.checkNotNull(newState);
+    containers.updateState(info, info.getState(), newState);
+    return containers.getContainerInfo(info);
+  }
+
+  /**
+   * Update the container State.
+   * @param info - Container Info
+   * @return  ContainerInfo
+   * @throws SCMException - on Error.
+   */
+  public ContainerInfo updateContainerInfo(ContainerInfo info)
+      throws SCMException {
+    containers.updateContainerInfo(info);
+    return containers.getContainerInfo(info);
+  }
+
+
+  /**
+   * Return a container matching the attributes specified.
+   *
+   * @param size - Space needed in the Container.
+   * @param owner - Owner of the container - A specific nameservice.
+   * @param type - Replication Type {StandAlone, Ratis}
+   * @param factor - Replication Factor {ONE, THREE}
+   * @param state - State of the Container-- {Open, Allocated etc.}
+   * @return ContainerInfo, null if there is no match found.
+   */
+  public ContainerInfo getMatchingContainer(final long size,
+      String owner, ReplicationType type, ReplicationFactor factor,
+      LifeCycleState state) {
+
+    // Find containers that match the query spec, if no match return null.
+    NavigableSet<ContainerID> matchingSet =
+        containers.getMatchingContainerIDs(state, owner, factor, type);
+    if (matchingSet == null || matchingSet.isEmpty()) {
+      return null;
+    }
+
+    // Get the last used container and find container above the last used
+    // container ID.
+    ContainerState key = new ContainerState(owner, type, factor);
+    ContainerID lastID = lastUsedMap.get(key);
+    if (lastID == null) {
+      lastID = matchingSet.first();
+    }
+
+    // There is a small issue here. The first time, we will skip the first
+    // container. But in most cases it will not matter.
+    NavigableSet<ContainerID> resultSet = matchingSet.tailSet(lastID, false);
+    if (resultSet.isEmpty()) {
+      resultSet = matchingSet;
+    }
+
+    ContainerInfo selectedContainer =
+        findContainerWithSpace(size, resultSet, owner);
+    if (selectedContainer == null) {
+
+      // If we did not find any space in the tailSet, we need to look for
+      // space in the headset, we need to pass true to deal with the
+      // situation that we have a lone container that has space. That is we
+      // ignored the last used container under the assumption we can find
+      // other containers with space, but if have a single container that is
+      // not true. Hence we need to include the last used container as the
+      // last element in the sorted set.
+
+      resultSet = matchingSet.headSet(lastID, true);
+      selectedContainer = findContainerWithSpace(size, resultSet, owner);
+    }
+    // Update the allocated Bytes on this container.
+    if (selectedContainer != null) {
+      selectedContainer.updateAllocatedBytes(size);
+    }
+    return selectedContainer;
+
+  }
+
+  /**
+   * Returns the first container in searchSet whose allocated bytes plus size
+   * fit within the configured container size, recording it as the last used
+   * container for its (owner, type, factor) key.
+   *
+   * @param size - Space needed in the Container.
+   * @param searchSet - candidate container IDs, searched in set order.
+   * @param owner - Owner of the container.
+   * @return the selected ContainerInfo, or null if none has enough space.
+   */
+  private ContainerInfo findContainerWithSpace(long size,
+      NavigableSet<ContainerID> searchSet, String owner) {
+    // Get the container with space to meet our request.
+    for (ContainerID id : searchSet) {
+      ContainerInfo containerInfo = containers.getContainerInfo(id.getId());
+      if (containerInfo.getAllocatedBytes() + size <= this.containerSize) {
+        containerInfo.updateLastUsedTime();
+
+        ContainerState key = new ContainerState(owner,
+            containerInfo.getPipeline().getType(),
+            containerInfo.getPipeline().getFactor());
+        lastUsedMap.put(key, containerInfo.containerID());
+        return containerInfo;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns a set of ContainerIDs that match the Container.
+   *
+   * @param owner  Owner of the Containers.
+   * @param type - Replication Type of the containers
+   * @param factor - Replication factor of the containers.
+   * @param state - Current State, like Open, Close etc.
+   * @return Set of containers that match the specific query parameters.
+   */
+  public NavigableSet<ContainerID> getMatchingContainerIDs(
+      String owner, ReplicationType type, ReplicationFactor factor,
+      LifeCycleState state) {
+    return containers.getMatchingContainerIDs(state, owner,
+        factor, type);
+  }
+
+  /**
+   * Nothing to release: this class holds no closeable resources.
+   */
+  @Override
+  public void close() throws IOException {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
new file mode 100644
index 0000000..c949c6c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Mapping from container names to container information, including the
+ * pipeline. This is used by SCM when allocating new locations and when
+ * looking up a key.
+ */
+public interface Mapping extends Closeable {
+  /**
+   * Returns the ContainerInfo from the container name.
+   *
+   * @param containerName - Name
+   * @return - ContainerInfo such as creation state and the pipeline.
+   * @throws IOException
+   */
+  ContainerInfo getContainer(String containerName) throws IOException;
+
+  /**
+   * Returns containers under certain conditions.
+   * Search container names from start name (exclusive),
+   * and use prefix name to filter the result. The max
+   * size of the searching range cannot exceed the
+   * value of count.
+   *
+   * @param startName start name, if null, start searching at the head.
+   * @param prefixName prefix name, if null, then filter is disabled.
+   * @param count count, if count < 0, the max size is unlimited. (Usually
+   *              the count will be replaced with a very big value instead
+   *              of being unlimited in case the db is very big.)
+   *
+   * @return a list of containers.
+   * @throws IOException
+   */
+  List<ContainerInfo> listContainer(String startName, String prefixName,
+      int count) throws IOException;
+
+  /**
+   * Allocates a new container with the given replication type and factor.
+   *
+   * @param type - replication type of the container.
+   * @param replicationFactor - replication factor of the container.
+   * @param containerName - Name.
+   * @param owner - owner of the container.
+   * @return - Container Info.
+   * @throws IOException
+   */
+  ContainerInfo allocateContainer(HddsProtos.ReplicationType type,
+      HddsProtos.ReplicationFactor replicationFactor,
+      String containerName, String owner) throws IOException;
+
+  /**
+   * Deletes a container from SCM.
+   *
+   * @param containerName - Container Name
+   * @throws IOException
+   */
+  void deleteContainer(String containerName) throws IOException;
+
+  /**
+   * Update container state.
+   * @param containerName - Container Name
+   * @param event - container life cycle event
+   * @return - new container state
+   * @throws IOException
+   */
+  HddsProtos.LifeCycleState updateContainerState(String containerName,
+      HddsProtos.LifeCycleEvent event) throws IOException;
+
+  /**
+   * Returns the container State Manager.
+   * @return ContainerStateManager
+   */
+  ContainerStateManager getStateManager();
+
+  /**
+   * Process container report from Datanode.
+   *
+   * @param reports Container report
+   * @throws IOException
+   */
+  void processContainerReports(ContainerReportsRequestProto reports)
+      throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
new file mode 100644
index 0000000..b5d4da9
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.container.closer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT;
+
+/**
+ * A class that manages closing of containers. This allows transition from an
+ * open but full container to a closed container, to which no data is written.
+ *
+ * <p>For every container a close command is queued for, the queue time (in
+ * seconds derived from {@link Time#monotonicNow()}) is remembered, so that
+ * repeated close requests within {@code MULTIPLIER * reportInterval} seconds
+ * are suppressed. Once more than {@code CLEANUP_WATER_MARK} entries are
+ * tracked, a short-lived daemon thread prunes the entries whose suppression
+ * window has expired.
+ */
+public class ContainerCloser {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerCloser.class);
+  private static final long MULTIPLIER = 3L;
+  private static final int CLEANUP_WATER_MARK = 1000;
+  private final NodeManager nodeManager;
+  // Container name -> time (seconds) at which the close command was queued.
+  private final Map<String, Long> commandIssued;
+  private final Configuration configuration;
+  // Tracks the number of entries in commandIssued without calling size().
+  private final AtomicInteger mapCount;
+  // Container report interval, in seconds.
+  private final long reportInterval;
+  private final AtomicInteger threadRunCount;
+  // Set while a cleaner thread is active, so at most one runs at a time.
+  private final AtomicBoolean isRunning;
+
+  /**
+   * Constructs the ContainerCloser class.
+   *
+   * @param nodeManager - NodeManager
+   * @param conf -   Configuration
+   */
+  public ContainerCloser(NodeManager nodeManager, Configuration conf) {
+    Preconditions.checkNotNull(nodeManager);
+    Preconditions.checkNotNull(conf);
+    this.nodeManager = nodeManager;
+    this.configuration = conf;
+    this.commandIssued = new ConcurrentHashMap<>();
+    this.mapCount = new AtomicInteger(0);
+    this.threadRunCount = new AtomicInteger(0);
+    this.isRunning = new AtomicBoolean(false);
+    this.reportInterval = this.configuration.getTimeDuration(
+        OZONE_CONTAINER_REPORT_INTERVAL,
+        OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
+    Preconditions.checkState(this.reportInterval > 0,
+        "report interval has to be greater than 0");
+  }
+
+  @VisibleForTesting
+  public static int getCleanupWaterMark() {
+    return CLEANUP_WATER_MARK;
+  }
+
+  /**
+   * Returns true when a close command queued at {@code commandQueueTime} is
+   * older than {@code MULTIPLIER * reportInterval} seconds.
+   *
+   * @param commandQueueTime - time (seconds) at which the command was queued.
+   * @param currentTime - current time in seconds.
+   * @return true if the suppression window for the command has passed.
+   */
+  private boolean isExpired(long commandQueueTime, long currentTime) {
+    return currentTime > commandQueueTime + (MULTIPLIER * reportInterval);
+  }
+
+  /**
+   * Sends a Container Close command to the data nodes where this container
+   * lives, unless a close command for the same container was already queued
+   * within the last {@code MULTIPLIER * reportInterval} seconds.
+   *
+   * @param info - ContainerInfo.
+   */
+  public void close(HddsProtos.SCMContainerInfo info) {
+    String containerName = info.getContainerName();
+
+    // Read the entry once: a separate containsKey()/get() pair can race with
+    // the cleaner thread removing the entry and then fail unboxing a null.
+    Long commandQueueTime = commandIssued.get(containerName);
+    if (commandQueueTime != null) {
+      // We check if we issued a close command in last 3 * reportInterval secs.
+      long currentTime = TimeUnit.MILLISECONDS.toSeconds(Time.monotonicNow());
+      if (!isExpired(commandQueueTime, currentTime)) {
+        // Ignore this request, since we just issued a close command. We
+        // should wait instead of sending a command to datanode again.
+        return;
+      }
+      // Only decrement when this call actually removed the entry; the
+      // cleaner thread may have removed it concurrently.
+      if (commandIssued.remove(containerName, commandQueueTime)) {
+        mapCount.decrementAndGet();
+      }
+    }
+
+    // if we reached here, it means that we have not issued a command to the
+    // data node in last (3 times report interval). We are presuming that is
+    // enough time to close the container. Let us go ahead and queue a close
+    // to all the datanodes that participate in the container.
+    //
+    // Three important things to note here:
+    //
+    // 1. It is ok to send this command multiple times to a datanode. Close
+    // container is an idempotent command, if the container is already closed
+    // then we have no issues.
+    //
+    // 2. The container close command is issued to all datanodes. But
+    // depending on the pipeline type, some of the datanodes might ignore it.
+    //
+    // 3. SCM will see that datanode is closed from container reports, but it
+    // is possible that datanodes might get close commands since
+    // this queue can be emptied by a datanode after a close report is send
+    // to SCM. In that case also, data node will ignore this command.
+
+    HddsProtos.Pipeline pipeline = info.getPipeline();
+    for (HddsProtos.DatanodeDetailsProto datanodeDetails :
+        pipeline.getPipelineChannel().getMembersList()) {
+      nodeManager.addDatanodeCommand(
+          DatanodeDetails.getFromProtoBuf(datanodeDetails).getUuid(),
+          new CloseContainerCommand(info.getContainerName()));
+    }
+    // putIfAbsent makes check-and-insert atomic, so concurrent callers for
+    // the same container cannot both increment mapCount.
+    if (commandIssued.putIfAbsent(containerName,
+        TimeUnit.MILLISECONDS.toSeconds(Time.monotonicNow())) == null) {
+      mapCount.incrementAndGet();
+    }
+    // run the hash map cleaner thread if needed, non-blocking call.
+    runCleanerThreadIfNeeded();
+  }
+
+  private void runCleanerThreadIfNeeded() {
+    // Let us check if we should run a cleaner thread, not using map.size
+    // since it runs a loop in the case of the concurrentMap.
+    if (mapCount.get() > CLEANUP_WATER_MARK &&
+        isRunning.compareAndSet(false, true)) {
+      Runnable entryCleaner = () -> {
+        LOG.debug("Starting close container Hash map cleaner.");
+        try {
+          for (Map.Entry<String, Long> entry : commandIssued.entrySet()) {
+            long commandQueueTime = entry.getValue();
+            // Prune only entries whose suppression window has expired (the
+            // same rule close() applies); fresh entries must be kept so that
+            // repeated close requests are still throttled.
+            if (isExpired(commandQueueTime,
+                TimeUnit.MILLISECONDS.toSeconds(Time.monotonicNow()))) {
+              // The conditional remove fails if close() removed or replaced
+              // the entry meanwhile; only count removals that succeeded.
+              if (commandIssued.remove(entry.getKey(), commandQueueTime)) {
+                mapCount.decrementAndGet();
+              }
+            }
+          }
+          LOG.debug("Finished running, close container Hash map cleaner.");
+        } catch (Exception ex) {
+          LOG.error("Unable to finish cleaning the closed containers map.",
+              ex);
+        } finally {
+          // Always clear the flag; otherwise a failed run would prevent any
+          // future cleaner thread from starting.
+          isRunning.set(false);
+        }
+      };
+
+      // Launch the cleaner thread when we need instead of having a daemon
+      // thread that is sleeping all the time. We need to set the Daemon to
+      // true to avoid blocking clean exits.
+      Thread cleanerThread = new ThreadFactoryBuilder()
+          .setDaemon(true)
+          .setNameFormat("Closed Container Cleaner Thread - %d")
+          .build().newThread(entryCleaner);
+      threadRunCount.incrementAndGet();
+      cleanerThread.start();
+    }
+  }
+
+  @VisibleForTesting
+  public int getThreadRunCount() {
+    return threadRunCount.get();
+  }
+
+  @VisibleForTesting
+  public int getCloseCount() {
+    return mapCount.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java
new file mode 100644
index 0000000..ee02bbd
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+/**
+ * This package has classes that close a container, that is, move a container
+ * from the open state to the closed state.
+ */
+package org.apache.hadoop.hdds.scm.container.closer;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java
new file mode 100644
index 0000000..3f8d056
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+/**
+ * This package contains routines to manage the container location and
+ * mapping inside SCM
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
new file mode 100644
index 0000000..5d91ac5
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A ContainerPlacementPolicy supports choosing datanodes to build a
+ * replication pipeline with specified constraints.
+ */
+public interface ContainerPlacementPolicy {
+
+  /**
+   * Given the number of nodes and the size required, returns a list of
+   * datanodes that satisfy the node count and size requirements.
+   * @param nodesRequired - number of datanodes required.
+   * @param sizeRequired - size required for the container or block.
+   * @return list of datanodes chosen.
+   * @throws IOException
+   */
+  List<DatanodeDetails> chooseDatanodes(int nodesRequired, long sizeRequired)
+      throws IOException;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to