http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
new file mode 100644
index 0000000..33eb911
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
+import org.apache.hadoop.utils.MetadataStore;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .NO_SUCH_KEY;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .UNABLE_TO_READ_METADATA_DB;
+
+/**
+ * Utility functions for key-related operations.
+ */
+public final class KeyUtils {
+  public static final String ENCODING_NAME = "UTF-8";
+  public static final Charset ENCODING = Charset.forName(ENCODING_NAME);
+
+  /**
+   * Never Constructed.
+   */
+  private KeyUtils() {
+  }
+
+  /**
+   * Get a DB handler for a given container.
+   * If the handler doesn't exist in cache yet, first create one and
+   * add into cache. This function is called with containerManager
+   * ReadLock held.
+   *
+   * @param container container.
+   * @param conf configuration.
+   * @return MetadataStore handle.
+   * @throws StorageContainerException
+   */
+  public static MetadataStore getDB(ContainerData container,
+      Configuration conf) throws StorageContainerException {
+    Preconditions.checkNotNull(container);
+    ContainerCache cache = ContainerCache.getInstance(conf);
+    Preconditions.checkNotNull(cache);
+    try {
+      return cache.getDB(container.getContainerName(), container.getDBPath());
+    } catch (IOException ex) {
+      String message =
+          String.format("Unable to open DB. DB Name: %s, Path: %s. ex: %s",
+          container.getContainerName(), container.getDBPath(), ex.getMessage());
+      throw new StorageContainerException(message, UNABLE_TO_READ_METADATA_DB);
+    }
+  }
+
+  /**
+   * Remove a DB handler from cache.
+   *
+   * @param container - Container data.
+   * @param conf - Configuration.
+   */
+  public static void removeDB(ContainerData container,
+      Configuration conf) {
+    Preconditions.checkNotNull(container);
+    ContainerCache cache = ContainerCache.getInstance(conf);
+    Preconditions.checkNotNull(cache);
+    cache.removeDB(container.getContainerName());
+  }
+
+  /**
+   * Shutdown all DB Handles.
+   *
+   * @param cache - Cache for DB Handles.
+   */
+  @SuppressWarnings("unchecked")
+  public static void shutdownCache(ContainerCache cache)  {
+    cache.shutdownCache();
+  }
+
+  /**
+   * Returns successful keyResponse.
+   * @param msg - Request.
+   * @return Response.
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      getKeyResponse(ContainerProtos.ContainerCommandRequestProto msg) {
+    return ContainerUtils.getContainerResponse(msg);
+  }
+
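+  /**
+   * Returns a successful getKey response containing the given key data.
+   *
+   * @param msg - Request.
+   * @param data - KeyData to include in the response.
+   * @return Response with the key data set.
+   */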
+  public static ContainerProtos.ContainerCommandResponseProto
+      getKeyDataResponse(ContainerProtos.ContainerCommandRequestProto msg,
+      KeyData data) {
+    ContainerProtos.GetKeyResponseProto.Builder getKey = ContainerProtos
+        .GetKeyResponseProto.newBuilder();
+    getKey.setKeyData(data.getProtoBufMessage());
+    ContainerProtos.ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
+            .SUCCESS, "");
+    builder.setGetKey(getKey);
+    return builder.build();
+  }
+
+  /**
+   * Parses the key name from a bytes array.
+   * @param bytes key name in bytes.
+   * @return key name string.
+   */
+  public static String getKeyName(byte[] bytes) {
+    return new String(bytes, ENCODING);
+  }
+
+  /**
+   * Parses the {@link KeyData} from a bytes array.
+   *
+   * @param bytes key data in bytes.
+   * @return key data.
+   * @throws IOException if the bytes array is malformed or invalid.
+   */
+  public static KeyData getKeyData(byte[] bytes) throws IOException {
+    try {
+      ContainerProtos.KeyData kd = ContainerProtos.KeyData.parseFrom(bytes);
+      return KeyData.getFromProtoBuf(kd);
+    } catch (IOException e) {
+      // Chain the cause so callers can see why parsing failed.
+      throw new StorageContainerException("Failed to parse key data from the" +
+          " bytes array.", e, NO_SUCH_KEY);
+    }
+  }
+}
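
A quick usage sketch for reviewers (illustrative, not part of the patch):
the container name and key below are hypothetical, the in-scope
containerManager and conf objects are assumed, and MetadataStore#get(byte[])
is the lookup from org.apache.hadoop.utils.

    // Fetch the (cached) per-container MetadataStore and decode a key.
    ContainerData container = containerManager.readContainer("c1");
    MetadataStore db = KeyUtils.getDB(container, conf);
    byte[] value = db.get("key-1".getBytes(KeyUtils.ENCODING));
    KeyData data = KeyUtils.getKeyData(value); // NO_SUCH_KEY if unparsable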

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java
new file mode 100644
index 0000000..21f31e1
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+/**
+ Contains protocol buffer helper classes and utilities used by the
+ container implementation.
+ **/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
new file mode 100644
index 0000000..b0286b9
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .CONTAINER_INTERNAL_ERROR;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .UNSUPPORTED_REQUEST;
+
+/**
+ * An implementation of ChunkManager that is used by default in ozone.
+ */
+public class ChunkManagerImpl implements ChunkManager {
+  static final Logger LOG =
+      LoggerFactory.getLogger(ChunkManagerImpl.class);
+
+  private final ContainerManager containerManager;
+
+  /**
+   * Constructs a ChunkManager.
+   *
+   * @param manager - ContainerManager.
+   */
+  public ChunkManagerImpl(ContainerManager manager) {
+    this.containerManager = manager;
+  }
+
+  /**
+   * Writes a given chunk.
+   *
+   * @param pipeline - Name and the set of machines that make this container.
+   * @param keyName - Name of the Key.
+   * @param info - ChunkInfo.
+   * @param data - The chunk payload.
+   * @param stage - Write stage (WRITE_DATA, COMMIT_DATA or COMBINED).
+   * @throws StorageContainerException
+   */
+  @Override
+  public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info,
+      byte[] data, ContainerProtos.Stage stage)
+      throws StorageContainerException {
+    // we don't want container manager to go away while we are writing chunks.
+    containerManager.readLock();
+
+    // TODO : Take keyManager Write lock here.
+    try {
+      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
+      String containerName = pipeline.getContainerName();
+      Preconditions.checkNotNull(containerName,
+          "Container name cannot be null");
+      ContainerData container =
+          containerManager.readContainer(containerName);
+      File chunkFile = ChunkUtils.validateChunk(pipeline, container, info);
+      File tmpChunkFile = getTmpChunkFile(chunkFile, info);
+
+      LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file",
+          info.getChunkName(), stage, chunkFile, tmpChunkFile);
+      switch (stage) {
+      case WRITE_DATA:
+        ChunkUtils.writeData(tmpChunkFile, info, data);
+        break;
+      case COMMIT_DATA:
+        commitChunk(tmpChunkFile, chunkFile, containerName, info.getLen());
+        break;
+      case COMBINED:
+        // directly write to the chunk file
+        long oldSize = chunkFile.length();
+        ChunkUtils.writeData(chunkFile, info, data);
+        long newSize = chunkFile.length();
+        containerManager.incrBytesUsed(containerName, newSize - oldSize);
+        containerManager.incrWriteCount(containerName);
+        containerManager.incrWriteBytes(containerName, info.getLen());
+        break;
+      }
+    } catch (ExecutionException | NoSuchAlgorithmException | IOException e) {
+      LOG.error("write data failed. error: {}", e);
+      throw new StorageContainerException("Internal error: ", e,
+          CONTAINER_INTERNAL_ERROR);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("write data failed. error: {}", e);
+      throw new StorageContainerException("Internal error: ", e,
+          CONTAINER_INTERNAL_ERROR);
+    } finally {
+      containerManager.readUnlock();
+    }
+  }
+
+  // Create a temporary file in the same container directory
+  // in the format "<chunkname>.tmp"
+  private static File getTmpChunkFile(File chunkFile, ChunkInfo info)
+      throws StorageContainerException {
+    return new File(chunkFile.getParent(),
+        chunkFile.getName() +
+            OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
+            OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX);
+  }
+
+  // Commit the chunk by renaming the temporary chunk file to chunk file
+  private void commitChunk(File tmpChunkFile, File chunkFile,
+      String containerName, long chunkLen) throws IOException {
+    long sizeDiff = tmpChunkFile.length() - chunkFile.length();
+    // It is safe to replace here as the earlier chunk if existing should be
+    // caught as part of validateChunk
+    Files.move(tmpChunkFile.toPath(), chunkFile.toPath(),
+        StandardCopyOption.REPLACE_EXISTING);
+    containerManager.incrBytesUsed(containerName, sizeDiff);
+    containerManager.incrWriteCount(containerName);
+    containerManager.incrWriteBytes(containerName, chunkLen);
+  }
+
+  /**
+   * Reads the data defined by a chunk.
+   *
+   * @param pipeline - container pipeline.
+   * @param keyName - Name of the Key
+   * @param info - ChunkInfo.
+   * @return byte array
+   * @throws StorageContainerException
+   * TODO: Right now we do not support partial reads and writes of chunks.
+   * TODO: Explore if we need to do that for ozone.
+   */
+  @Override
+  public byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info)
+      throws StorageContainerException {
+    containerManager.readLock();
+    try {
+      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
+      String containerName = pipeline.getContainerName();
+      Preconditions.checkNotNull(containerName,
+          "Container name cannot be null");
+      ContainerData container =
+          containerManager.readContainer(containerName);
+      File chunkFile = ChunkUtils.getChunkFile(pipeline, container, info);
+      ByteBuffer data =  ChunkUtils.readData(chunkFile, info);
+      containerManager.incrReadCount(containerName);
+      containerManager.incrReadBytes(containerName, chunkFile.length());
+      return data.array();
+    } catch (ExecutionException | NoSuchAlgorithmException e) {
+      LOG.error("read data failed. error: {}", e);
+      throw new StorageContainerException("Internal error: ",
+          e, CONTAINER_INTERNAL_ERROR);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("read data failed. error: {}", e);
+      throw new StorageContainerException("Internal error: ",
+          e, CONTAINER_INTERNAL_ERROR);
+    } finally {
+      containerManager.readUnlock();
+    }
+  }
+
+  /**
+   * Deletes a given chunk.
+   *
+   * @param pipeline - Pipeline.
+   * @param keyName - Key Name
+   * @param info - Chunk Info
+   * @throws StorageContainerException
+   */
+  @Override
+  public void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info)
+      throws StorageContainerException {
+    containerManager.readLock();
+    try {
+      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
+      String containerName = pipeline.getContainerName();
+      Preconditions.checkNotNull(containerName,
+          "Container name cannot be null");
+      File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager
+          .readContainer(containerName), info);
+      if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
+        // Capture the length before deletion; File#length() returns 0 once
+        // the file is gone.
+        long chunkFileLen = chunkFile.length();
+        FileUtil.fullyDelete(chunkFile);
+        containerManager.decrBytesUsed(containerName, chunkFileLen);
+      } else {
+        LOG.error("Not Supported Operation. Trying to delete a " +
+            "chunk that is in shared file. chunk info : " + info.toString());
+        throw new StorageContainerException("Not Supported Operation. " +
+            "Trying to delete a chunk that is in shared file. chunk info : "
+            + info.toString(), UNSUPPORTED_REQUEST);
+      }
+    } finally {
+      containerManager.readUnlock();
+    }
+  }
+
+  /**
+   * Shutdown the chunkManager.
+   *
+   * In the chunkManager we haven't acquired any resources, so nothing to do
+   * here. This call is made with containerManager Writelock held.
+   */
+  @Override
+  public void shutdown() {
+    Preconditions.checkState(this.containerManager.hasWriteLock());
+  }
+}
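
For reviewers, a sketch of the two-phase write implemented above
(illustrative names; chunkManager and pipeline are assumed to be in scope,
and ChunkInfo(String, long, long) matches the constructor used by this
patch's callers). The COMMIT_DATA stage only renames the tmp file, so it
ignores the data argument.

    // Stage 1: write the payload into "<chunkname>.tmp".
    byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
    ChunkInfo info = new ChunkInfo("chunk-0", 0, payload.length);
    chunkManager.writeChunk(pipeline, "key-1", info, payload,
        ContainerProtos.Stage.WRITE_DATA);
    // Stage 2: atomically move the tmp file over the chunk file and
    // bump the container usage counters.
    chunkManager.writeChunk(pipeline, "key-1", info, null,
        ContainerProtos.Stage.COMMIT_DATA);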

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java
new file mode 100644
index 0000000..e0e826c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.interfaces
+    .ContainerLocationManager;
+import org.apache.hadoop.ozone.container.common.interfaces
+    .ContainerLocationManagerMXBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A class that tells the ContainerManager where to place the containers.
+ * Please note: there is *no* one-to-one correlation between metadata
+ * locations and data locations.
+ *
+ * For example: a user could map all container metadata files to an
+ * SSD but leave the data/chunk files on a bunch of other disks.
+ */
+public class ContainerLocationManagerImpl implements ContainerLocationManager,
+    ContainerLocationManagerMXBean {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerLocationManagerImpl.class);
+
+  private final List<ContainerStorageLocation> dataLocations;
+  private int currentIndex;
+  private final List<StorageLocation> metadataLocations;
+  private final ObjectName jmxbean;
+
+  /**
+   * Constructs a Location Manager.
+   * @param metadataLocations - locations where we store the container
+   * metadata.
+   * @param dataDirs - locations where we store the actual
+   * data or chunk files.
+   * @param conf - configuration.
+   * @throws IOException
+   */
+  public ContainerLocationManagerImpl(List<StorageLocation> metadataLocations,
+      List<StorageLocation> dataDirs, Configuration conf)
+      throws IOException {
+    dataLocations = new LinkedList<>();
+    for (StorageLocation dataDir : dataDirs) {
+      dataLocations.add(new ContainerStorageLocation(dataDir, conf));
+    }
+    this.metadataLocations = metadataLocations;
+    jmxbean = MBeans.register("OzoneDataNode",
+        ContainerLocationManager.class.getSimpleName(), this);
+  }
+
+  /**
+   * Returns the path where the container should be placed from a set of
+   * metadataLocations.
+   *
+   * @return A path where we should place this container and metadata.
+   * @throws IOException
+   */
+  @Override
+  public Path getContainerPath()
+      throws IOException {
+    Preconditions.checkState(metadataLocations.size() > 0);
+    int index = currentIndex % metadataLocations.size();
+    return Paths.get(metadataLocations.get(index).getNormalizedUri());
+  }
+
+  /**
+   * Returns the path where the container data files are stored.
+   *
+   * @return  a path where we place the LevelDB and data files of a container.
+   * @throws IOException
+   */
+  @Override
+  public Path getDataPath(String containerName) throws IOException {
+    Path currentPath = Paths.get(
+        dataLocations.get(currentIndex++ % dataLocations.size())
+            .getNormalizedUri());
+    currentPath = currentPath.resolve(OzoneConsts.CONTAINER_PREFIX);
+    return currentPath.resolve(containerName);
+  }
+
+  @Override
+  public StorageLocationReport[] getLocationReport() throws IOException {
+    StorageLocationReport[] reports =
+        new StorageLocationReport[dataLocations.size()];
+    for (int idx = 0; idx < dataLocations.size(); idx++) {
+      ContainerStorageLocation loc = dataLocations.get(idx);
+      long scmUsed = 0;
+      long remaining = 0;
+      try {
+        scmUsed = loc.getScmUsed();
+        remaining = loc.getAvailable();
+      } catch (IOException ex) {
+        LOG.warn("Failed to get scmUsed and remaining for container " +
+            "storage location {}", loc.getNormalizedUri());
+        // reset scmUsed and remaining if df/du failed.
+        scmUsed = 0;
+        remaining = 0;
+      }
+
+      // TODO: handle failed storage
+      // For now, include storage report for location that failed to get df/du.
+      StorageLocationReport r = new StorageLocationReport(
+          loc.getStorageUuId(), false, loc.getCapacity(),
+          scmUsed, remaining);
+      reports[idx] = r;
+    }
+    return reports;
+  }
+
+  /**
+   * Supports clean shutdown of container location du threads.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void shutdown() throws IOException {
+    for (ContainerStorageLocation loc: dataLocations) {
+      loc.shutdown();
+    }
+    MBeans.unregister(jmxbean);
+  }
+}
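
For context, a sketch of the round-robin placement that getDataPath
performs (directories and the in-scope conf are hypothetical;
CONTAINER_PREFIX is the subdirectory constant from OzoneConsts):

    List<StorageLocation> meta =
        Collections.singletonList(StorageLocation.parse("/meta"));
    List<StorageLocation> data = Arrays.asList(
        StorageLocation.parse("/disk1"), StorageLocation.parse("/disk2"));
    ContainerLocationManagerImpl mgr =
        new ContainerLocationManagerImpl(meta, data, conf);
    mgr.getDataPath("c1"); // -> /disk1/<CONTAINER_PREFIX>/c1
    mgr.getDataPath("c2"); // -> /disk2/<CONTAINER_PREFIX>/c2 (round-robin)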

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
new file mode 100644
index 0000000..5e7375c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
@@ -0,0 +1,1113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
+import org.apache.hadoop.ozone.container.common.interfaces
+    .ContainerDeletionChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.interfaces
+    .ContainerLocationManager;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.ozone.container.common.interfaces
+    .ContainerReportManager;
+import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.utils.MetadataKeyFilters;
+import org.apache.hadoop.utils.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.DigestInputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .CONTAINER_EXISTS;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .CONTAINER_INTERNAL_ERROR;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .CONTAINER_NOT_FOUND;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .ERROR_IN_COMPACT_DB;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .INVALID_CONFIG;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .IO_EXCEPTION;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .NO_SUCH_ALGORITHM;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .UNABLE_TO_READ_METADATA_DB;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .UNCLOSED_CONTAINER_IO;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .UNSUPPORTED_REQUEST;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_META;
+
+/**
+ * A generic ContainerManagerImpl that is called from the Ozone container
+ * layer. This allows us to support incremental changes to the ozone
+ * version without having to rewrite the ContainerManager.
+ */
+public class ContainerManagerImpl implements ContainerManager {
+  static final Logger LOG =
+      LoggerFactory.getLogger(ContainerManagerImpl.class);
+
+  private final ConcurrentSkipListMap<String, ContainerStatus>
+      containerMap = new ConcurrentSkipListMap<>();
+
+  // Use a non-fair RW lock for better throughput, we may revisit this decision
+  // if this causes fairness issues.
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private ContainerLocationManager locationManager;
+  private ChunkManager chunkManager;
+  private KeyManager keyManager;
+  private Configuration conf;
+  private DatanodeDetails datanodeDetails;
+
+  private ContainerDeletionChoosingPolicy containerDeletionChooser;
+  private ContainerReportManager containerReportManager;
+
+  /**
+   * Init call that sets up a container Manager.
+   *
+   * @param config - Configuration.
+   * @param containerDirs - List of Metadata Container locations.
+   * @param dnDetails - DatanodeDetails.
+   * @throws IOException
+   */
+  @Override
+  public void init(
+      Configuration config, List<StorageLocation> containerDirs,
+      DatanodeDetails dnDetails) throws IOException {
+    Preconditions.checkNotNull(config, "Config must not be null");
+    Preconditions.checkNotNull(containerDirs, "Container directories cannot " +
+        "be null");
+    Preconditions.checkNotNull(dnDetails, "Datanode Details cannot " +
+        "be null");
+
+    Preconditions.checkState(containerDirs.size() > 0, "Number of container" +
+        " directories must be greater than zero.");
+
+    this.conf = config;
+    this.datanodeDetails = dnDetails;
+
+    readLock();
+    try {
+      containerDeletionChooser = ReflectionUtils.newInstance(conf.getClass(
+          ScmConfigKeys.OZONE_SCM_CONTAINER_DELETION_CHOOSING_POLICY,
+          TopNOrderedContainerDeletionChoosingPolicy.class,
+          ContainerDeletionChoosingPolicy.class), conf);
+
+      for (StorageLocation path : containerDirs) {
+        File directory = Paths.get(path.getNormalizedUri()).toFile();
+        if (!directory.exists() && !directory.mkdirs()) {
+          LOG.error("Container metadata directory doesn't exist "
+              + "and cannot be created. Path: {}", path.toString());
+          throw new StorageContainerException("Container metadata "
+              + "directory doesn't exist and cannot be created " + path
+              .toString(), INVALID_CONFIG);
+        }
+
+        // TODO: This will fail if any directory is invalid.
+        // We should fix this to handle invalid directories and continue.
+        // Leaving it this way to fail fast for time being.
+        if (!directory.isDirectory()) {
+          LOG.error("Invalid path to container metadata directory. path: {}",
+              path.toString());
+          throw new StorageContainerException("Invalid path to container " +
+              "metadata directory." + path, INVALID_CONFIG);
+        }
+        LOG.info("Loading containers under {}", path);
+        File[] files = directory.listFiles(new ContainerFilter());
+        if (files != null) {
+          for (File containerFile : files) {
+            LOG.debug("Loading container {}", containerFile);
+            String containerPath =
+                ContainerUtils.getContainerNameFromFile(containerFile);
+            Preconditions.checkNotNull(containerPath, "Container path cannot" +
+                " be null");
+            readContainerInfo(containerPath);
+          }
+        }
+      }
+
+      List<StorageLocation> dataDirs = new LinkedList<>();
+      for (String dir : config.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
+        StorageLocation location = StorageLocation.parse(dir);
+        dataDirs.add(location);
+      }
+      this.locationManager =
+          new ContainerLocationManagerImpl(containerDirs, dataDirs, config);
+
+      this.containerReportManager =
+          new ContainerReportManagerImpl(config);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Reads the Container Info from a file and verifies that checksum match. If
+   * the checksums match, then that file is added to containerMap.
+   *
+   * @param containerName - Name which points to the persisted container.
+   * @throws StorageContainerException
+   */
+  private void readContainerInfo(String containerName)
+      throws StorageContainerException {
+    Preconditions.checkState(containerName.length() > 0,
+        "Container name length cannot be zero.");
+    FileInputStream containerStream = null;
+    DigestInputStream dis = null;
+    FileInputStream metaStream = null;
+    Path cPath = Paths.get(containerName).getFileName();
+    String keyName = null;
+    if (cPath != null) {
+      keyName = cPath.toString();
+    }
+    Preconditions.checkNotNull(keyName,
+        "Container Name  to container key mapping is null");
+
+    try {
+      String containerFileName = containerName.concat(CONTAINER_EXTENSION);
+      String metaFileName = containerName.concat(CONTAINER_META);
+
+      containerStream = new FileInputStream(containerFileName);
+
+      metaStream = new FileInputStream(metaFileName);
+
+      MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+
+      dis = new DigestInputStream(containerStream, sha);
+
+      ContainerProtos.ContainerData containerDataProto =
+          ContainerProtos.ContainerData.parseDelimitedFrom(dis);
+      ContainerData containerData;
+      if (containerDataProto == null) {
+        // Sometimes the container metadata file exists but is empty, so
+        // parsing it returns null. This usually means a previous run was
+        // interrupted mid-way through container creation, leaving the
+        // metadata unpopulated. Such containers are marked as inactive.
+        containerMap.put(keyName, new ContainerStatus(null));
+        return;
+      }
+      containerData = ContainerData.getFromProtBuf(containerDataProto, conf);
+      ContainerProtos.ContainerMeta meta =
+          ContainerProtos.ContainerMeta.parseDelimitedFrom(metaStream);
+      if (meta != null && !DigestUtils.sha256Hex(sha.digest())
+          .equals(meta.getHash())) {
+        // This means we were not able to read the data from the disk when
+        // the datanode booted. We rely on SCM noticing that we don't have
+        // valid data for this container when we send container reports.
+        // Hopefully SCM will ask us to delete this container and rebuild it.
+        LOG.error("Invalid SHA found for container data. Name: {}, "
+            + "cowardly refusing to read invalid data", containerName);
+        containerMap.put(keyName, new ContainerStatus(null));
+        return;
+      }
+
+      ContainerStatus containerStatus = new ContainerStatus(containerData);
+      // Initialize pending deletion blocks count in in-memory
+      // container status.
+      MetadataStore metadata = KeyUtils.getDB(containerData, conf);
+      List<Map.Entry<byte[], byte[]>> underDeletionBlocks = metadata
+          .getSequentialRangeKVs(null, Integer.MAX_VALUE,
+              MetadataKeyFilters.getDeletingKeyFilter());
+      containerStatus.incrPendingDeletionBlocks(underDeletionBlocks.size());
+
+      List<Map.Entry<byte[], byte[]>> liveKeys = metadata
+          .getRangeKVs(null, Integer.MAX_VALUE,
+              MetadataKeyFilters.getNormalKeyFilter());
+
+      // Get container bytesUsed upon loading container
+      // The in-memory state is updated upon key write or delete
+      // TODO: update containerDataProto and persist it into container MetaFile
+      long bytesUsed = liveKeys.parallelStream().mapToLong(e -> {
+        KeyData keyData;
+        try {
+          keyData = KeyUtils.getKeyData(e.getValue());
+          return keyData.getSize();
+        } catch (IOException ex) {
+          return 0L;
+        }
+      }).sum();
+      containerStatus.setBytesUsed(bytesUsed);
+
+      containerMap.put(keyName, containerStatus);
+    } catch (IOException | NoSuchAlgorithmException ex) {
+      LOG.error("read failed for file: {} ex: {}", containerName,
+          ex.getMessage());
+
+      // TODO : Add this file to a recovery Queue.
+
+      // Remember that this container is busted and we cannot use it.
+      containerMap.put(keyName, new ContainerStatus(null));
+      throw new StorageContainerException("Unable to read container info",
+          UNABLE_TO_READ_METADATA_DB);
+    } finally {
+      IOUtils.closeStream(dis);
+      IOUtils.closeStream(containerStream);
+      IOUtils.closeStream(metaStream);
+    }
+  }
+
+  /**
+   * Creates a container with the given name.
+   *
+   * @param pipeline - Nodes which make up this container.
+   * @param containerData - Container Name and metadata.
+   * @throws StorageContainerException - Exception
+   */
+  @Override
+  public void createContainer(Pipeline pipeline, ContainerData containerData)
+      throws StorageContainerException {
+    Preconditions.checkNotNull(containerData, "Container data cannot be null");
+    writeLock();
+    try {
+      if (containerMap.containsKey(containerData.getName())) {
+        LOG.debug("container already exists. {}", containerData.getName());
+        throw new StorageContainerException("container already exists.",
+            CONTAINER_EXISTS);
+      }
+
+      // This is by design. We first write and close the
+      // container Info and metadata to a directory.
+      // Then read back and put that info into the containerMap.
+      // This allows us to make sure that our write is consistent.
+
+      writeContainerInfo(containerData, false);
+      File cFile = new File(containerData.getContainerPath());
+      readContainerInfo(ContainerUtils.getContainerNameFromFile(cFile));
+    } catch (NoSuchAlgorithmException ex) {
+      LOG.error("Internal error: We seem to be running a JVM without a " +
+          "needed hash algorithm.");
+      throw new StorageContainerException("failed to create container",
+          NO_SUCH_ALGORITHM);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Writes a container to a chosen location and updates the container Map.
+   *
+   * The file formats of ContainerData and Container Meta are as follows.
+   *
+   * message ContainerData {
+   * required string name = 1;
+   * repeated KeyValue metadata = 2;
+   * optional string dbPath = 3;
+   * optional string containerPath = 4;
+   * optional int64 bytesUsed = 5;
+   * optional int64 size = 6;
+   * }
+   *
+   * message ContainerMeta {
+   * required string fileName = 1;
+   * required string hash = 2;
+   * }
+   *
+   * @param containerData - container Data
+   * @param overwrite - Whether we are overwriting.
+   * @throws StorageContainerException
+   * @throws NoSuchAlgorithmException
+   */
+  private void writeContainerInfo(ContainerData containerData,
+      boolean overwrite)
+      throws StorageContainerException, NoSuchAlgorithmException {
+
+    Preconditions.checkNotNull(this.locationManager,
+        "Internal error: location manager cannot be null");
+
+    FileOutputStream containerStream = null;
+    DigestOutputStream dos = null;
+    FileOutputStream metaStream = null;
+
+    try {
+      Path metadataPath = null;
+      Path location = (!overwrite) ? locationManager.getContainerPath():
+          Paths.get(containerData.getContainerPath()).getParent();
+      if (location == null) {
+        throw new StorageContainerException(
+            "Failed to get container file path.",
+            CONTAINER_INTERNAL_ERROR);
+      }
+
+      File containerFile = ContainerUtils.getContainerFile(containerData,
+          location);
+      File metadataFile = ContainerUtils.getMetadataFile(containerData,
+          location);
+      String containerName = containerData.getContainerName();
+
+      if (!overwrite) {
+        ContainerUtils.verifyIsNewContainer(containerFile, metadataFile);
+        metadataPath = this.locationManager.getDataPath(containerName);
+        metadataPath = ContainerUtils.createMetadata(metadataPath,
+            containerName, conf);
+      } else {
+        metadataPath = ContainerUtils.getMetadataDirectory(containerData);
+      }
+
+      containerStream = new FileOutputStream(containerFile);
+      metaStream = new FileOutputStream(metadataFile);
+      MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+
+      dos = new DigestOutputStream(containerStream, sha);
+      containerData.setDBPath(metadataPath.resolve(
+          ContainerUtils.getContainerDbFileName(containerName))
+          .toString());
+      containerData.setContainerPath(containerFile.toString());
+
+      ContainerProtos.ContainerData protoData = containerData
+          .getProtoBufMessage();
+      protoData.writeDelimitedTo(dos);
+
+      ContainerProtos.ContainerMeta protoMeta = ContainerProtos
+          .ContainerMeta.newBuilder()
+          .setFileName(containerFile.toString())
+          .setHash(DigestUtils.sha256Hex(sha.digest()))
+          .build();
+      protoMeta.writeDelimitedTo(metaStream);
+
+    } catch (IOException ex) {
+      // TODO : we need to clean up partially constructed files.
+      // The proper way to do this would be for a thread to read all
+      // three artifacts and make sure they are sane; that info needs
+      // to come from the replication pipeline, and if they are not
+      // consistent, delete these files.
+
+      // In case of ozone this is *not* a deal breaker since
+      // SCM is guaranteed to generate unique container names.
+      // The saving grace is that we check if we have residue files
+      // lying around when creating a new container. We need to queue
+      // this information to a cleaner thread.
+
+      LOG.error("Creation of container failed. Name: {}, we might need to " +
+              "cleanup partially created artifacts. ",
+          containerData.getContainerName(), ex);
+      throw new StorageContainerException("Container creation failed. ",
+          ex, CONTAINER_INTERNAL_ERROR);
+    } finally {
+      IOUtils.closeStream(dos);
+      IOUtils.closeStream(containerStream);
+      IOUtils.closeStream(metaStream);
+    }
+  }
+
+  /**
+   * Deletes an existing container.
+   *
+   * @param pipeline - nodes that make this container.
+   * @param containerName - name of the container.
+   * @param forceDelete - whether this container should be deleted forcibly.
+   * @throws StorageContainerException
+   */
+  @Override
+  public void deleteContainer(Pipeline pipeline, String containerName,
+      boolean forceDelete) throws StorageContainerException {
+    Preconditions.checkNotNull(containerName, "Container name cannot be null");
+    Preconditions.checkState(containerName.length() > 0,
+        "Container name length cannot be zero.");
+    writeLock();
+    try {
+      if (isOpen(pipeline.getContainerName())) {
+        throw new StorageContainerException(
+            "Deleting an open container is not allowed.",
+            UNCLOSED_CONTAINER_IO);
+      }
+
+      ContainerStatus status = containerMap.get(containerName);
+      if (status == null) {
+        LOG.debug("No such container. Name: {}", containerName);
+        throw new StorageContainerException("No such container. Name : " +
+            containerName, CONTAINER_NOT_FOUND);
+      }
+      if (status.getContainer() == null) {
+        LOG.debug("Invalid container data. Name: {}", containerName);
+        throw new StorageContainerException("Invalid container data. Name : " +
+            containerName, CONTAINER_NOT_FOUND);
+      }
+      ContainerUtils.removeContainer(status.getContainer(), conf, forceDelete);
+      containerMap.remove(containerName);
+    } catch (StorageContainerException e) {
+      throw e;
+    } catch (IOException e) {
+      // TODO : An I/O error during delete can leave partial artifacts on the
+      // disk. We will need the cleaner thread to cleanup this information.
+      LOG.error("Failed to cleanup container. Name: {}", containerName, e);
+      throw new StorageContainerException(containerName, e, IO_EXCEPTION);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /**
+   * A simple interface for container iterations.
+   * <p/>
+   * This call makes no guarantees about consistency of the data between
+   * different list calls. It just returns the best known data at that point
+   * in time. It is possible that using this iteration you can miss certain
+   * containers from the listing.
+   *
+   * @param prefix -  Return keys that match this prefix.
+   * @param count - how many to return
+   * @param prevKey - Previous Key Value or empty String.
+   * @param data - Actual containerData
+   * @throws StorageContainerException
+   */
+  @Override
+  public void listContainer(String prefix, long count, String prevKey,
+      List<ContainerData> data) throws StorageContainerException {
+    // TODO : Support list with Prefix and PrevKey
+    Preconditions.checkNotNull(data,
+        "Internal assertion: data cannot be null");
+    readLock();
+    try {
+      if (containerMap.isEmpty()) {
+        // Nothing to list; firstKey() would throw on an empty map.
+        return;
+      }
+      ConcurrentNavigableMap<String, ContainerStatus> map;
+      if (prevKey == null || prevKey.isEmpty()) {
+        map = containerMap.tailMap(containerMap.firstKey(), true);
+      } else {
+        map = containerMap.tailMap(prevKey, false);
+      }
+
+      int currentCount = 0;
+      for (ContainerStatus entry : map.values()) {
+        if (currentCount < count) {
+          data.add(entry.getContainer());
+          currentCount++;
+        } else {
+          return;
+        }
+      }
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Get metadata about a specific container.
+   *
+   * @param containerName - Name of the container
+   * @return ContainerData - Container Data.
+   * @throws StorageContainerException
+   */
+  @Override
+  public ContainerData readContainer(String containerName) throws
+      StorageContainerException {
+    Preconditions.checkNotNull(containerName, "Container name cannot be null");
+    Preconditions.checkState(containerName.length() > 0,
+        "Container name length cannot be zero.");
+    if (!containerMap.containsKey(containerName)) {
+      throw new StorageContainerException("Unable to find the container. Name: 
"
+          + containerName, CONTAINER_NOT_FOUND);
+    }
+    ContainerData cData = containerMap.get(containerName).getContainer();
+    if (cData == null) {
+      throw new StorageContainerException("Invalid container data. Name: "
+          + containerName, CONTAINER_INTERNAL_ERROR);
+    }
+    return cData;
+  }
+
+  /**
+   * Closes an open container; if it is already closed or does not exist, a
+   * StorageContainerException is thrown.
+   *
+   * @param containerName - Name of the container.
+   * @throws StorageContainerException
+   */
+  @Override
+  public void closeContainer(String containerName)
+      throws StorageContainerException, NoSuchAlgorithmException {
+    ContainerData containerData = readContainer(containerName);
+    containerData.closeContainer();
+    writeContainerInfo(containerData, true);
+    MetadataStore db = KeyUtils.getDB(containerData, conf);
+
+    // It is ok if this operation takes a bit of time.
+    // Close container is not expected to be instantaneous.
+    try {
+      db.compactDB();
+    } catch (IOException e) {
+      LOG.error("Error in DB compaction while closing container", e);
+      throw new StorageContainerException(e, ERROR_IN_COMPACT_DB);
+    }
+
+    // Active is different from closed: closed means the container is
+    // immutable, while active == false means some internal error is
+    // affecting this container. This is a way to track damaged containers:
+    // if we have an I/O failure, it allows us to take quick action in case
+    // of container issues.
+
+    ContainerStatus status = new ContainerStatus(containerData);
+    containerMap.put(containerName, status);
+  }
+
+  @Override
+  public void updateContainer(Pipeline pipeline, String containerName,
+      ContainerData data, boolean forceUpdate)
+      throws StorageContainerException {
+    Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
+    Preconditions.checkNotNull(containerName, "Container name cannot be null");
+    Preconditions.checkNotNull(data, "Container data cannot be null");
+    FileOutputStream containerStream = null;
+    DigestOutputStream dos = null;
+    MessageDigest sha = null;
+    File containerFileBK = null, containerFile = null;
+    boolean deleted = false;
+
+    if (!containerMap.containsKey(containerName)) {
+      throw new StorageContainerException("Container doesn't exist. Name :"
+          + containerName, CONTAINER_NOT_FOUND);
+    }
+
+    try {
+      sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    } catch (NoSuchAlgorithmException e) {
+      throw new StorageContainerException("Unable to create Message Digest,"
+          + " usually this is a java configuration issue.",
+          NO_SUCH_ALGORITHM);
+    }
+
+    try {
+      Path location = locationManager.getContainerPath();
+      ContainerData orgData = containerMap.get(containerName).getContainer();
+      if (orgData == null) {
+        // Updating an invalid container.
+        throw new StorageContainerException("Update a container with invalid " +
+            "container metadata", CONTAINER_INTERNAL_ERROR);
+      }
+
+      if (!forceUpdate && !orgData.isOpen()) {
+        throw new StorageContainerException(
+            "Update a closed container is not allowed. Name: " + containerName,
+            UNSUPPORTED_REQUEST);
+      }
+
+      containerFile = ContainerUtils.getContainerFile(orgData, location);
+      // If forceUpdate is true, there is no need to check
+      // whether the container file exists.
+      if (!forceUpdate) {
+        if (!containerFile.exists() || !containerFile.canWrite()) {
+          throw new StorageContainerException(
+              "Container file not exists or corrupted. Name: " + containerName,
+              CONTAINER_INTERNAL_ERROR);
+        }
+
+        // Backup the container file
+        containerFileBK = File.createTempFile(
+            "tmp_" + System.currentTimeMillis() + "_",
+            containerFile.getName(), containerFile.getParentFile());
+        FileUtils.copyFile(containerFile, containerFileBK);
+
+        deleted = containerFile.delete();
+        containerStream = new FileOutputStream(containerFile);
+        dos = new DigestOutputStream(containerStream, sha);
+
+        ContainerProtos.ContainerData protoData = data.getProtoBufMessage();
+        protoData.writeDelimitedTo(dos);
+      }
+
+      // Update the in-memory map
+      ContainerStatus newStatus = new ContainerStatus(data);
+      containerMap.replace(containerName, newStatus);
+    } catch (IOException e) {
+      // Restore the container file from backup
+      if (containerFileBK != null && containerFileBK.exists() && deleted) {
+        if (containerFile.delete()
+            && containerFileBK.renameTo(containerFile)) {
+          throw new StorageContainerException("Container update failed,"
+              + " container data restored from the backup.",
+              CONTAINER_INTERNAL_ERROR);
+        } else {
+          throw new StorageContainerException(
+              "Failed to restore container data from the backup. Name: "
+                  + containerName, CONTAINER_INTERNAL_ERROR);
+        }
+      } else {
+        throw new StorageContainerException(
+            e.getMessage(), CONTAINER_INTERNAL_ERROR);
+      }
+    } finally {
+      if (containerFileBK != null && containerFileBK.exists()) {
+        if (!containerFileBK.delete()) {
+          LOG.warn("Unable to delete container file backup : {}.",
+              containerFileBK.getAbsolutePath());
+        }
+      }
+      IOUtils.closeStream(dos);
+      IOUtils.closeStream(containerStream);
+    }
+  }
+
+  @VisibleForTesting
+  protected File getContainerFile(ContainerData data) throws IOException {
+    return ContainerUtils.getContainerFile(data,
+        this.locationManager.getContainerPath());
+  }
+
+  /**
+   * Checks whether a container is open.
+   *
+   * @param containerName - Name of the container.
+   * @return true if the container is open, false otherwise.
+   * @throws StorageContainerException - Throws Exception if we are not able to
+   *                                   find the container.
+   */
+  @Override
+  public boolean isOpen(String containerName)
+      throws StorageContainerException {
+    final ContainerStatus status = containerMap.get(containerName);
+    if (status == null) {
+      throw new StorageContainerException(
+          "Container status not found: " + containerName, CONTAINER_NOT_FOUND);
+    }
+    final ContainerData cData = status.getContainer();
+    if (cData == null) {
+      throw new StorageContainerException(
+          "Container not found: " + containerName, CONTAINER_NOT_FOUND);
+    }
+    return cData.isOpen();
+  }
+
+  /**
+   * Supports clean shutdown of container.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void shutdown() throws IOException {
+    Preconditions.checkState(this.hasWriteLock(),
+        "Assumption that we are holding the lock violated.");
+    this.containerMap.clear();
+    this.locationManager.shutdown();
+  }
+
+  @VisibleForTesting
+  public ConcurrentSkipListMap<String, ContainerStatus> getContainerMap() {
+    return containerMap;
+  }
+
+  /**
+   * Acquire read lock.
+   */
+  @Override
+  public void readLock() {
+    this.lock.readLock().lock();
+  }
+
+  @Override
+  public void readLockInterruptibly() throws InterruptedException {
+    this.lock.readLock().lockInterruptibly();
+  }
+
+  /**
+   * Release read lock.
+   */
+  @Override
+  public void readUnlock() {
+    this.lock.readLock().unlock();
+  }
+
+  /**
+   * Check if the current thread holds read lock.
+   */
+  @Override
+  public boolean hasReadLock() {
+    // tryLock() would acquire (and leak) the read lock rather than test
+    // ownership; use the current thread's read hold count instead.
+    return this.lock.getReadHoldCount() > 0;
+  }
+
+  /**
+   * Acquire write lock.
+   */
+  @Override
+  public void writeLock() {
+    this.lock.writeLock().lock();
+  }
+
+  /**
+   * Acquire write lock, unless interrupted while waiting.
+   */
+  @Override
+  public void writeLockInterruptibly() throws InterruptedException {
+    this.lock.writeLock().lockInterruptibly();
+  }
+
+  /**
+   * Release write lock.
+   */
+  @Override
+  public void writeUnlock() {
+    this.lock.writeLock().unlock();
+  }
+
+  /**
+   * Check if the current thread holds write lock.
+   */
+  @Override
+  public boolean hasWriteLock() {
+    return this.lock.writeLock().isHeldByCurrentThread();
+  }
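+
+  // Note on lock discipline: chunk and key I/O paths (e.g. ChunkManagerImpl)
+  // hold the read lock so the manager cannot shut down mid-operation, while
+  // structural changes to containerMap (create/delete) take the write lock;
+  // shutdown() above asserts the write lock for the same reason.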
+
+  public ChunkManager getChunkManager() {
+    return this.chunkManager;
+  }
+
+  /**
+   * Sets the chunk Manager.
+   *
+   * @param chunkManager - Chunk Manager
+   */
+  public void setChunkManager(ChunkManager chunkManager) {
+    this.chunkManager = chunkManager;
+  }
+
+  /**
+   * Gets the Key Manager.
+   *
+   * @return KeyManager.
+   */
+  @Override
+  public KeyManager getKeyManager() {
+    return this.keyManager;
+  }
+
+  /**
+   * Get the node report.
+   * @return node report.
+   */
+  @Override
+  public SCMNodeReport getNodeReport() throws IOException {
+    StorageLocationReport[] reports = locationManager.getLocationReport();
+    SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+    for (int i = 0; i < reports.length; i++) {
+      SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+      nrb.addStorageReport(i, srb.setStorageUuid(reports[i].getId())
+          .setCapacity(reports[i].getCapacity())
+          .setScmUsed(reports[i].getScmUsed())
+          .setRemaining(reports[i].getRemaining())
+          .build());
+    }
+    return nrb.build();
+  }
+
+  /**
+   * Gets container reports.
+   *
+   * @return List of all closed containers.
+   * @throws IOException
+   */
+  @Override
+  public List<ContainerData> getContainerReports() throws IOException {
+    LOG.debug("Starting container report iteration.");
+    // No need for locking since containerMap is a ConcurrentSkipListMap,
+    // and an exact snapshot is impossible anyway: a container may be
+    // closed after we have already iterated past it.
+    return containerMap.entrySet().stream()
+        .filter(containerStatus ->
+            !containerStatus.getValue().getContainer().isOpen())
+        .map(containerStatus -> containerStatus.getValue().getContainer())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get container report.
+   *
+   * @return The container report.
+   * @throws IOException
+   */
+  @Override
+  public ContainerReportsRequestProto getContainerReport() throws IOException {
+    LOG.debug("Starting container report iteration.");
+    // No need for locking since containerMap is a ConcurrentSkipListMap,
+    // and an exact snapshot is impossible anyway: a container may be
+    // closed after we have already iterated past it.
+    List<ContainerStatus> containers = containerMap.values().stream()
+        .collect(Collectors.toList());
+
+    ContainerReportsRequestProto.Builder crBuilder =
+        ContainerReportsRequestProto.newBuilder();
+
+    // TODO: support delta based container report
+    crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
+        .setType(ContainerReportsRequestProto.reportType.fullReport);
+
+    for (ContainerStatus container: containers) {
+      StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
+          StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
+      ciBuilder.setContainerName(container.getContainer().getContainerName())
+          .setSize(container.getContainer().getMaxSize())
+          .setUsed(container.getContainer().getBytesUsed())
+          .setKeyCount(container.getContainer().getKeyCount())
+          .setReadCount(container.getReadCount())
+          .setWriteCount(container.getWriteCount())
+          .setReadBytes(container.getReadBytes())
+          .setWriteBytes(container.getWriteBytes())
+          .setContainerID(container.getContainer().getContainerID());
+
+      if (container.getContainer().getHash() != null) {
+        ciBuilder.setFinalhash(container.getContainer().getHash());
+      }
+      crBuilder.addReports(ciBuilder.build());
+    }
+
+    return crBuilder.build();
+  }
+
+  /**
+   * Sets the Key Manager.
+   *
+   * @param keyManager - Key Manager.
+   */
+  @Override
+  public void setKeyManager(KeyManager keyManager) {
+    this.keyManager = keyManager;
+  }
+
+  /**
+   * Filter out only container files from the container metadata dir.
+   */
+  private static class ContainerFilter implements FilenameFilter {
+    /**
+     * Tests if a specified file should be included in a file list.
+     *
+     * @param dir the directory in which the file was found.
+     * @param name the name of the file.
+     * @return <code>true</code> if and only if the name should be included in
+     * the file list; <code>false</code> otherwise.
+     */
+    @Override
+    public boolean accept(File dir, String name) {
+      return name.endsWith(CONTAINER_EXTENSION);
+    }
+  }
+
+  @Override
+  public List<ContainerData> chooseContainerForBlockDeletion(
+      int count) throws StorageContainerException {
+    readLock();
+    try {
+      return containerDeletionChooser.chooseContainerForBlockDeletion(
+          count, containerMap);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  @VisibleForTesting
+  public ContainerDeletionChoosingPolicy getContainerDeletionChooser() {
+    return containerDeletionChooser;
+  }
+
+  @Override
+  public void incrPendingDeletionBlocks(int numBlocks, String containerId) {
+    writeLock();
+    try {
+      ContainerStatus status = containerMap.get(containerId);
+      status.incrPendingDeletionBlocks(numBlocks);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  @Override
+  public void decrPendingDeletionBlocks(int numBlocks, String containerId) {
+    writeLock();
+    try {
+      ContainerStatus status = containerMap.get(containerId);
+      status.decrPendingDeletionBlocks(numBlocks);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Increase the read count of the container.
+   *
+   * @param containerName - Name of the container.
+   */
+  @Override
+  public void incrReadCount(String containerName) {
+    ContainerStatus status = containerMap.get(containerName);
+    status.incrReadCount();
+  }
+
+  public long getReadCount(String containerName) {
+    ContainerStatus status = containerMap.get(containerName);
+    return status.getReadCount();
+  }
+
+  /**
+   * Increase the read counter for bytes read from the container.
+   *
+   * @param containerName - Name of the container.
+   * @param readBytes     - bytes read from the container.
+   */
+  @Override
+  public void incrReadBytes(String containerName, long readBytes) {
+    ContainerStatus status = containerMap.get(containerName);
+    status.incrReadBytes(readBytes);
+  }
+
+  public long getReadBytes(String containerName) {
+    readLock();
+    try {
+      ContainerStatus status = containerMap.get(containerName);
+      return status.getReadBytes();
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Increase the write count of the container.
+   *
+   * @param containerName - Name of the container.
+   */
+  @Override
+  public void incrWriteCount(String containerName) {
+    ContainerStatus status = containerMap.get(containerName);
+    status.incrWriteCount();
+  }
+
+  public long getWriteCount(String containerName) {
+    ContainerStatus status = containerMap.get(containerName);
+    return status.getWriteCount();
+  }
+
+  /**
+   * Increase the write counter for bytes written to the container.
+   *
+   * @param containerName - Name of the container.
+   * @param writeBytes    - bytes written to the container.
+   */
+  @Override
+  public void incrWriteBytes(String containerName, long writeBytes) {
+    ContainerStatus status = containerMap.get(containerName);
+    status.incrWriteBytes(writeBytes);
+  }
+
+  public long getWriteBytes(String containerName) {
+    ContainerStatus status = containerMap.get(containerName);
+    return status.getWriteBytes();
+  }
+
+  /**
+   * Increase the bytes used by the container.
+   *
+   * @param containerName - Name of the container.
+   * @param used          - additional bytes used by the container.
+   * @return the current bytes used.
+   */
+  @Override
+  public long incrBytesUsed(String containerName, long used) {
+    ContainerStatus status = containerMap.get(containerName);
+    return status.incrBytesUsed(used);
+  }
+
+  /**
+   * Decrease the bytes used by the container.
+   *
+   * @param containerName - Name of the container.
+   * @param used          - bytes reclaimed from the container.
+   * @return the current bytes used.
+   */
+  @Override
+  public long decrBytesUsed(String containerName, long used) {
+    ContainerStatus status = containerMap.get(containerName);
+    return status.decrBytesUsed(used);
+  }
+
+  public long getBytesUsed(String containerName) {
+    ContainerStatus status = containerMap.get(containerName);
+    return status.getBytesUsed();
+  }
+
+  /**
+   * Get the number of keys in the container.
+   *
+   * @param containerName - Name of the container.
+   * @return the current key count.
+   */
+  @Override
+  public long getNumKeys(String containerName) {
+    ContainerStatus status = containerMap.get(containerName);
+    return status.getNumKeys();
+  }
+
+  /**
+   * Get the container report state to send via HB to SCM.
+   *
+   * @return container report state.
+   */
+  @Override
+  public ReportState getContainerReportState() {
+    return containerReportManager.getContainerReportState();
+  }
+
+}
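
A note on the locking protocol above: shutdown() asserts hasWriteLock(), so
callers are expected to bracket mutating calls with the exposed lock methods.
A minimal caller sketch, assuming a ContainerManager reference named
"manager" (a hypothetical variable, not part of this patch):

  // Hypothetical caller: take the write lock before shutdown(), which
  // verifies it via Preconditions.checkState(hasWriteLock(), ...).
  manager.writeLock();
  try {
    manager.shutdown();
  } finally {
    manager.writeUnlock();
  }

The read-side methods follow the same try/finally shape, as
chooseContainerForBlockDeletion() does internally with readLock() and
readUnlock().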

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java
new file mode 100644
index 0000000..6c83c66
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.impl;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.interfaces
+    .ContainerReportManager;
+import org.apache.hadoop.util.Time;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdds.scm.HddsServerUtil
+    .getScmHeartbeatInterval;
+
+/**
+ * Class wraps the container report operations on datanode.
+ * // TODO: support incremental/delta container report
+ */
+public class ContainerReportManagerImpl implements ContainerReportManager {
+  // Last non-empty container report time
+  private long lastContainerReportTime;
+  private final long containerReportInterval;
+  private final long heartbeatInterval;
+  private AtomicLong reportCount;
+  private static final ReportState NO_CONTAINER_REPORTSTATE =
+      ReportState.newBuilder()
+          .setState(ReportState.states.noContainerReports)
+          .setCount(0).build();
+
+  public ContainerReportManagerImpl(Configuration config) {
+    this.lastContainerReportTime = -1;
+    this.reportCount = new AtomicLong(0L);
+    this.containerReportInterval = config.getTimeDuration(
+        OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL,
+        OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.heartbeatInterval = getScmHeartbeatInterval(config);
+  }
+
+  public ReportState getContainerReportState() {
+    if (lastContainerReportTime < 0) {
+      return getFullContainerReportState();
+    } else {
+      // Add a random delay (0~30s) on top of the container report
+      // interval (60s) so that the SCM is not overwhelmed by container
+      // reports sent in sync.
+      if (Time.monotonicNow() - lastContainerReportTime >
+          (containerReportInterval + getRandomReportDelay())) {
+        return getFullContainerReportState();
+      } else {
+        return getNoContainerReportState();
+      }
+    }
+  }
+
+  private ReportState getFullContainerReportState() {
+    ReportState.Builder rsBuilder = ReportState.newBuilder();
+    rsBuilder.setState(ReportState.states.completeContinerReport);
+    rsBuilder.setCount(reportCount.incrementAndGet());
+    this.lastContainerReportTime = Time.monotonicNow();
+    return rsBuilder.build();
+  }
+
+  private ReportState getNoContainerReportState() {
+    return NO_CONTAINER_REPORTSTATE;
+  }
+
+  private long getRandomReportDelay() {
+    return RandomUtils.nextLong(0, heartbeatInterval);
+  }
+}
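
The jittered reporting above can be exercised in isolation. A standalone
sketch with illustrative interval values (the real ones come from
OzoneConfigKeys and the SCM heartbeat configuration), showing the same
decision getContainerReportState() makes:

  import java.util.concurrent.ThreadLocalRandom;

  public class ReportJitterDemo {
    public static void main(String[] args) {
      long reportIntervalMs = 60_000L;    // illustrative 60s report interval
      long heartbeatIntervalMs = 30_000L; // illustrative 30s heartbeat
      long lastReportMs = 0L;             // time of the last full report
      long nowMs = 95_000L;               // pretend 95s have elapsed

      // Send a full report only after the interval plus a random delay in
      // [0, heartbeat) has passed, so datanodes started together do not
      // all report in lock-step.
      long jitterMs =
          ThreadLocalRandom.current().nextLong(heartbeatIntervalMs);
      boolean sendFull =
          (nowMs - lastReportMs) > (reportIntervalMs + jitterMs);
      System.out.println("send full report: " + sendFull);
    }
  }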

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java
new file mode 100644
index 0000000..5577323
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.impl;
+
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class represents the state of a container. If reading the
+ * container encountered an error at boot time, we post that info to a
+ * recovery queue while keeping the entry in the containerMap.
+ * <p/>
+ * If and when the issue is fixed, the expectation is that the recovery
+ * thread will delete this entry from the containerMap and insert a fresh
+ * entry instead of modifying this one.
+ */
+public class ContainerStatus {
+  private final ContainerData containerData;
+
+  /**
+   * Number of pending deletion blocks in container.
+   */
+  private int numPendingDeletionBlocks;
+
+  private AtomicLong readBytes;
+
+  private AtomicLong writeBytes;
+
+  private AtomicLong readCount;
+
+  private AtomicLong writeCount;
+
+  /**
+   * Creates a Container Status class.
+   *
+   * @param containerData - ContainerData.
+   */
+  ContainerStatus(ContainerData containerData) {
+    this.numPendingDeletionBlocks = 0;
+    this.containerData = containerData;
+    this.readCount = new AtomicLong(0L);
+    this.readBytes =  new AtomicLong(0L);
+    this.writeCount =  new AtomicLong(0L);
+    this.writeBytes =  new AtomicLong(0L);
+  }
+
+  /**
+   * Returns container if it is active. It is not active if we have had an
+   * error and we are waiting for the background threads to fix the issue.
+   *
+   * @return ContainerData.
+   */
+  public ContainerData getContainer() {
+    return containerData;
+  }
+
+  /**
+   * Increase the count of pending deletion blocks.
+   *
+   * @param numBlocks increment number
+   */
+  public void incrPendingDeletionBlocks(int numBlocks) {
+    this.numPendingDeletionBlocks += numBlocks;
+  }
+
+  /**
+   * Decrease the count of pending deletion blocks.
+   *
+   * @param numBlocks decrement number
+   */
+  public void decrPendingDeletionBlocks(int numBlocks) {
+    this.numPendingDeletionBlocks -= numBlocks;
+  }
+
+  /**
+   * Get the number of pending deletion blocks.
+   */
+  public int getNumPendingDeletionBlocks() {
+    return this.numPendingDeletionBlocks;
+  }
+
+  /**
+   * Get the number of bytes read from the container.
+   * @return the number of bytes read from the container.
+   */
+  public long getReadBytes() {
+    return readBytes.get();
+  }
+
+  /**
+   * Increase the number of bytes read from the container.
+   * @param bytes number of bytes read.
+   */
+  public void incrReadBytes(long bytes) {
+    this.readBytes.addAndGet(bytes);
+  }
+
+  /**
+   * Get the number of times the container is read.
+   * @return the number of times the container is read.
+   */
+  public long getReadCount() {
+    return readCount.get();
+  }
+
+  /**
+   * Increase the number of container read count by 1.
+   */
+  public void incrReadCount() {
+    this.readCount.incrementAndGet();
+  }
+
+  /**
+   * Get the number of bytes written to the container.
+   * @return the number of bytes written to the container.
+   */
+  public long getWriteBytes() {
+    return writeBytes.get();
+  }
+
+  /**
+   * Increase the number of bytes written to the container.
+   * @param bytes the number of bytes written to the container.
+   */
+  public void incrWriteBytes(long bytes) {
+    this.writeBytes.addAndGet(bytes);
+  }
+
+  /**
+   * Get the number of writes into the container.
+   * @return the number of writes into the container.
+   */
+  public long getWriteCount() {
+    return writeCount.get();
+  }
+
+  /**
+   * Increase the number of writes into the container by 1.
+   */
+  public void incrWriteCount() {
+    this.writeCount.incrementAndGet();
+  }
+
+  /**
+   * Get the number of bytes used by the container.
+   * @return the number of bytes used by the container.
+   */
+  public long getBytesUsed() {
+    return containerData.getBytesUsed();
+  }
+
+  /**
+   * Increase the number of bytes used by the container.
+   * @param used number of bytes used by the container.
+   * @return the current number of bytes used by the container after increase.
+   */
+  public long incrBytesUsed(long used) {
+    return containerData.addBytesUsed(used);
+  }
+
+  /**
+   * Set the number of bytes used by the container.
+   * @param used the number of bytes used by the container.
+   */
+  public void setBytesUsed(long used) {
+    containerData.setBytesUsed(used);
+  }
+
+  /**
+   * Decrease the number of bytes used by the container.
+   * @param reclaimed the number of bytes reclaimed from the container.
+   * @return the current number of bytes used by the container after decrease.
+   */
+  public long decrBytesUsed(long reclaimed) {
+    return this.containerData.addBytesUsed(-1L * reclaimed);
+  }
+
+  /**
+   * Get the maximum container size.
+   * @return the maximum container size.
+   */
+  public long getMaxSize() {
+    return containerData.getMaxSize();
+  }
+
+  /**
+   * Set the maximum container size.
+   * @param size the maximum container size.
+   */
+  public void setMaxSize(long size) {
+    this.containerData.setMaxSize(size);
+  }
+
+  /**
+   * Get the number of keys in the container.
+   * @return the number of keys in the container.
+   */
+  public long getNumKeys() {
+    return containerData.getKeyCount();
+  }
+}
\ No newline at end of file
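
ContainerStatus uses AtomicLong for the I/O counters because increments
arrive from many handler threads at once, while numPendingDeletionBlocks
stays a plain int because ContainerManagerImpl guards it with the write
lock. A small standalone sketch of the lock-free pattern (a hypothetical
demo class, not part of this patch):

  import java.util.concurrent.atomic.AtomicLong;

  public class CounterDemo {
    private final AtomicLong readBytes = new AtomicLong(0L);

    void incrReadBytes(long bytes) {
      readBytes.addAndGet(bytes); // atomic, no external locking required
    }

    public static void main(String[] args) throws InterruptedException {
      CounterDemo demo = new CounterDemo();
      Thread t1 = new Thread(() -> demo.incrReadBytes(100L));
      Thread t2 = new Thread(() -> demo.incrReadBytes(200L));
      t1.start();
      t2.start();
      t1.join();
      t2.join();
      System.out.println(demo.readBytes.get()); // always 300
    }
  }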

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
new file mode 100644
index 0000000..7293895
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CachingGetSpaceUsed;
+import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.GetSpaceUsed;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.Scanner;
+
+import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY;
+
+/**
+ * Class that wraps the space usage of the Datanode Container Storage Location
+ * by SCM containers.
+ */
+public class ContainerStorageLocation {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerStorageLocation.class);
+
+  private static final String DU_CACHE_FILE = "scmUsed";
+  private volatile boolean scmUsedSaved = false;
+
+  private final StorageLocation dataLocation;
+  private final String storageUuId;
+  private final DF usage;
+  private final GetSpaceUsed scmUsage;
+  private final File scmUsedFile;
+
+  public ContainerStorageLocation(StorageLocation dataLoc, Configuration conf)
+      throws IOException {
+    this.dataLocation = dataLoc;
+    this.storageUuId = DatanodeStorage.generateUuid();
+    File dataDir = Paths.get(dataLoc.getNormalizedUri()).resolve(
+        OzoneConsts.CONTAINER_PREFIX).toFile();
+    // Initialize container data root if it does not exist as required by DF/DU
+    if (!dataDir.exists()) {
+      if (!dataDir.mkdirs()) {
+        LOG.error("Unable to create the container storage location at : {}",
+            dataDir);
+        throw new IllegalArgumentException("Unable to create the container" +
+            " storage location at : " + dataDir);
+      }
+    }
+    scmUsedFile = new File(dataDir, DU_CACHE_FILE);
+    // get overall disk usage
+    this.usage = new DF(dataDir, conf);
+    // get SCM specific usage
+    this.scmUsage = new CachingGetSpaceUsed.Builder().setPath(dataDir)
+        .setConf(conf)
+        .setInitialUsed(loadScmUsed())
+        .build();
+
+    // Ensure scm usage is saved during shutdown.
+    ShutdownHookManager.get().addShutdownHook(
+        new Runnable() {
+          @Override
+          public void run() {
+            if (!scmUsedSaved) {
+              saveScmUsed();
+            }
+          }
+        }, SHUTDOWN_HOOK_PRIORITY);
+  }
+
+  public URI getNormalizedUri() {
+    return dataLocation.getNormalizedUri();
+  }
+
+  public String getStorageUuId() {
+    return storageUuId;
+  }
+
+  public long getCapacity() {
+    long capacity = usage.getCapacity();
+    return (capacity > 0) ? capacity : 0;
+  }
+
+  public long getAvailable() throws IOException {
+    long remaining = getCapacity() - getScmUsed();
+    long available = usage.getAvailable();
+    if (remaining > available) {
+      remaining = available;
+    }
+    return (remaining > 0) ? remaining : 0;
+  }
+
+  public long getScmUsed() throws IOException {
+    return scmUsage.getUsed();
+  }
+
+  public void shutdown() {
+    saveScmUsed();
+    scmUsedSaved = true;
+
+    if (scmUsage instanceof CachingGetSpaceUsed) {
+      IOUtils.cleanupWithLogger(null, ((CachingGetSpaceUsed) scmUsage));
+    }
+  }
+
+  /**
+   * Read in the cached DU value and return it if it is less than 600 seconds
+   * old (DU update interval). Slight imprecision of scmUsed is not critical
+   * and skipping DU can significantly shorten the startup time.
+   * If the cached value is not available or too old, -1 is returned.
+   */
+  long loadScmUsed() {
+    long cachedScmUsed;
+    long mtime;
+    Scanner sc;
+
+    try {
+      sc = new Scanner(scmUsedFile, "UTF-8");
+    } catch (FileNotFoundException fnfe) {
+      return -1;
+    }
+
+    try {
+      // Get the recorded scmUsed from the file.
+      if (sc.hasNextLong()) {
+        cachedScmUsed = sc.nextLong();
+      } else {
+        return -1;
+      }
+      // Get the recorded mtime from the file.
+      if (sc.hasNextLong()) {
+        mtime = sc.nextLong();
+      } else {
+        return -1;
+      }
+
+      // Return the cached value if mtime is okay.
+      if (mtime > 0 && (Time.now() - mtime < 600000L)) {
+        LOG.info("Cached ScmUsed found for {} : {} ", dataLocation,
+            cachedScmUsed);
+        return cachedScmUsed;
+      }
+      return -1;
+    } finally {
+      sc.close();
+    }
+  }
+
+  /**
+   * Write the current scmUsed to the cache file.
+   */
+  void saveScmUsed() {
+    if (scmUsedFile.exists() && !scmUsedFile.delete()) {
+      LOG.warn("Failed to delete old scmUsed file in {}.", dataLocation);
+    }
+    OutputStreamWriter out = null;
+    try {
+      long used = getScmUsed();
+      if (used > 0) {
+        out = new OutputStreamWriter(new FileOutputStream(scmUsedFile),
+            StandardCharsets.UTF_8);
+        // mtime is written last, so that truncated writes won't be valid.
+        out.write(Long.toString(used) + " " + Long.toString(Time.now()));
+        out.flush();
+        out.close();
+        out = null;
+      }
+    } catch (IOException ioe) {
+      // If write failed, the volume might be bad. Since the cache file is
+      // not critical, log the error and continue.
+      LOG.warn("Failed to write scmUsed to " + scmUsedFile, ioe);
+    } finally {
+      IOUtils.cleanupWithLogger(null, out);
+    }
+  }
+}
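
The scmUsed cache file holds two longs, "<used> <mtime>", with the mtime
written last so that a truncated write fails the read in loadScmUsed(). A
round-trip sketch of that format, using a temporary file instead of the
real cache path:

  import java.io.File;
  import java.io.FileOutputStream;
  import java.io.OutputStreamWriter;
  import java.nio.charset.StandardCharsets;
  import java.util.Scanner;

  public class ScmUsedCacheDemo {
    public static void main(String[] args) throws Exception {
      File cache = File.createTempFile("scmUsed", null);
      cache.deleteOnExit();

      // Write "<used> <mtime>"; mtime last so truncation invalidates it.
      try (OutputStreamWriter out = new OutputStreamWriter(
          new FileOutputStream(cache), StandardCharsets.UTF_8)) {
        out.write(1024L + " " + System.currentTimeMillis());
      }

      // Read it back the way loadScmUsed() does: value first, then mtime,
      // treating anything older than 600 seconds as stale.
      try (Scanner sc = new Scanner(cache, "UTF-8")) {
        long used = sc.nextLong();
        long mtime = sc.nextLong();
        boolean fresh = System.currentTimeMillis() - mtime < 600_000L;
        System.out.println("used=" + used + ", fresh=" + fresh);
      }
    }
  }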

