http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
new file mode 100644
index 0000000..1c6e39c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Type;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.helpers.FileUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .CLOSED_CONTAINER_IO;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .GET_SMALL_FILE_ERROR;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .NO_SUCH_ALGORITHM;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .PUT_SMALL_FILE_ERROR;
+
+/**
+ * Ozone Container dispatcher takes a call from the Netty server and routes it
+ * to the right handler function.
+ */
+public class Dispatcher implements ContainerDispatcher {
+  static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
+
+  private final ContainerManager containerManager;
+  private ContainerMetrics metrics;
+  private Configuration conf;
+
+  /**
+   * Constructs a Dispatcher that receives calls from
+   * XceiverServerHandler.
+   *
+   * @param containerManager - A class that manages containers.
+   * @param config - Configuration.
+   */
+  public Dispatcher(ContainerManager containerManager, Configuration config) {
+    Preconditions.checkNotNull(containerManager);
+    this.containerManager = containerManager;
+    this.metrics = null;
+    this.conf = config;
+  }
+
+  @Override
+  public void init() {
+    this.metrics = ContainerMetrics.create(conf);
+  }
+
+  @Override
+  public void shutdown() {
+  }
+
+  @Override
+  public ContainerCommandResponseProto dispatch(
+      ContainerCommandRequestProto msg) {
+    LOG.trace("Command {}, trace ID: {} ", msg.getCmdType().toString(),
+        msg.getTraceID());
+    long startNanos = System.nanoTime();
+    ContainerCommandResponseProto resp = null;
+    try {
+      Preconditions.checkNotNull(msg);
+      Type cmdType = msg.getCmdType();
+      metrics.incContainerOpcMetrics(cmdType);
+      if ((cmdType == Type.CreateContainer) ||
+          (cmdType == Type.DeleteContainer) ||
+          (cmdType == Type.ReadContainer) ||
+          (cmdType == Type.ListContainer) ||
+          (cmdType == Type.UpdateContainer) ||
+          (cmdType == Type.CloseContainer)) {
+        resp = containerProcessHandler(msg);
+      }
+
+      if ((cmdType == Type.PutKey) ||
+          (cmdType == Type.GetKey) ||
+          (cmdType == Type.DeleteKey) ||
+          (cmdType == Type.ListKey)) {
+        resp = keyProcessHandler(msg);
+      }
+
+      if ((cmdType == Type.WriteChunk) ||
+          (cmdType == Type.ReadChunk) ||
+          (cmdType == Type.DeleteChunk)) {
+        resp = chunkProcessHandler(msg);
+      }
+
+      if ((cmdType == Type.PutSmallFile) ||
+          (cmdType == Type.GetSmallFile)) {
+        resp = smallFileHandler(msg);
+      }
+
+      if (resp != null) {
+        metrics.incContainerOpsLatencies(cmdType,
+            System.nanoTime() - startNanos);
+        return resp;
+      }
+
+      return ContainerUtils.unsupportedRequest(msg);
+    } catch (StorageContainerException e) {
+      // This is useful since the trace ID will allow us to correlate
+      // failures.
+      return ContainerUtils.logAndReturnError(LOG, e, msg);
+    } catch (IllegalStateException | NullPointerException e) {
+      return ContainerUtils.logAndReturnError(LOG, e, msg);
+    }
+  }
+
+  public ContainerMetrics getContainerMetrics() {
+    return metrics;
+  }
+
+  /**
+   * Handles all container-related functionality.
+   *
+   * @param msg - command
+   * @return - response
+   * @throws StorageContainerException
+   */
+  private ContainerCommandResponseProto containerProcessHandler(
+      ContainerCommandRequestProto msg) throws StorageContainerException {
+    try {
+
+      switch (msg.getCmdType()) {
+      case CreateContainer:
+        return handleCreateContainer(msg);
+
+      case DeleteContainer:
+        return handleDeleteContainer(msg);
+
+      case ListContainer:
+        // TODO : Support List Container.
+        return ContainerUtils.unsupportedRequest(msg);
+
+      case UpdateContainer:
+        return handleUpdateContainer(msg);
+
+      case ReadContainer:
+        return handleReadContainer(msg);
+
+      case CloseContainer:
+        return handleCloseContainer(msg);
+
+      default:
+        return ContainerUtils.unsupportedRequest(msg);
+      }
+    } catch (StorageContainerException e) {
+      return ContainerUtils.logAndReturnError(LOG, e, msg);
+    } catch (IOException ex) {
+      LOG.warn("Container operation failed. " +
+              "Container: {} Operation: {}  trace ID: {} Error: {}",
+          msg.getCreateContainer().getContainerData().getName(),
+          msg.getCmdType().name(),
+          msg.getTraceID(),
+          ex.toString(), ex);
+
+      // TODO : Replace with finer error codes.
+      return ContainerUtils.getContainerResponse(msg,
+          ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
+          ex.toString()).build();
+    }
+  }
+
+  /**
+   * Handles all key-related functionality.
+   *
+   * @param msg - command
+   * @return - response
+   * @throws StorageContainerException
+   */
+  private ContainerCommandResponseProto keyProcessHandler(
+      ContainerCommandRequestProto msg) throws StorageContainerException {
+    try {
+      switch (msg.getCmdType()) {
+      case PutKey:
+        return handlePutKey(msg);
+
+      case GetKey:
+        return handleGetKey(msg);
+
+      case DeleteKey:
+        return handleDeleteKey(msg);
+
+      case ListKey:
+        return ContainerUtils.unsupportedRequest(msg);
+
+      default:
+        return ContainerUtils.unsupportedRequest(msg);
+
+      }
+    } catch (StorageContainerException e) {
+      return ContainerUtils.logAndReturnError(LOG, e, msg);
+    } catch (IOException ex) {
+      LOG.warn("Container operation failed. " +
+              "Container: {} Operation: {}  trace ID: {} Error: {}",
+          msg.getCreateContainer().getContainerData().getName(),
+          msg.getCmdType().name(),
+          msg.getTraceID(),
+          ex.toString(), ex);
+
+      // TODO : Replace with finer error codes.
+      return ContainerUtils.getContainerResponse(msg,
+          ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
+          ex.toString()).build();
+    }
+  }
+
+  /**
+   * Handles all chunk-related functionality.
+   *
+   * @param msg - command
+   * @return - response
+   * @throws StorageContainerException
+   */
+  private ContainerCommandResponseProto chunkProcessHandler(
+      ContainerCommandRequestProto msg) throws StorageContainerException {
+    try {
+      switch (msg.getCmdType()) {
+      case WriteChunk:
+        return handleWriteChunk(msg);
+
+      case ReadChunk:
+        return handleReadChunk(msg);
+
+      case DeleteChunk:
+        return handleDeleteChunk(msg);
+
+      case ListChunk:
+        return ContainerUtils.unsupportedRequest(msg);
+
+      default:
+        return ContainerUtils.unsupportedRequest(msg);
+      }
+    } catch (StorageContainerException e) {
+      return ContainerUtils.logAndReturnError(LOG, e, msg);
+    } catch (IOException ex) {
+      LOG.warn("Container operation failed. " +
+              "Container: {} Operation: {}  trace ID: {} Error: {}",
+          msg.getCreateContainer().getContainerData().getName(),
+          msg.getCmdType().name(),
+          msg.getTraceID(),
+          ex.toString(), ex);
+
+      // TODO : Replace with finer error codes.
+      return ContainerUtils.getContainerResponse(msg,
+          ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
+          ex.toString()).build();
+    }
+  }
+
+  /**
+   * Dispatches calls to the small file handler.
+   * @param msg - request
+   * @return response
+   * @throws StorageContainerException
+   */
+  private ContainerCommandResponseProto smallFileHandler(
+      ContainerCommandRequestProto msg) throws StorageContainerException {
+    switch (msg.getCmdType()) {
+    case PutSmallFile:
+      return handlePutSmallFile(msg);
+    case GetSmallFile:
+      return handleGetSmallFile(msg);
+    default:
+      return ContainerUtils.unsupportedRequest(msg);
+    }
+  }
+
+  /**
+   * Update an existing container with the new container data.
+   *
+   * @param msg Request
+   * @return ContainerCommandResponseProto
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleUpdateContainer(
+      ContainerCommandRequestProto msg)
+      throws IOException {
+    if (!msg.hasUpdateContainer()) {
+      LOG.debug("Malformed read container request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+
+    Pipeline pipeline = Pipeline.getFromProtoBuf(
+        msg.getUpdateContainer().getPipeline());
+    String containerName = msg.getUpdateContainer()
+        .getContainerData().getName();
+
+    ContainerData data = ContainerData.getFromProtBuf(
+        msg.getUpdateContainer().getContainerData(), conf);
+    boolean forceUpdate = msg.getUpdateContainer().getForceUpdate();
+    this.containerManager.updateContainer(
+        pipeline, containerName, data, forceUpdate);
+    return ContainerUtils.getContainerResponse(msg);
+  }
+
+  /**
+   * Calls into container logic and returns appropriate response.
+   *
+   * @param msg - Request
+   * @return ContainerCommandResponseProto
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleReadContainer(
+      ContainerCommandRequestProto msg)
+      throws IOException {
+
+    if (!msg.hasReadContainer()) {
+      LOG.debug("Malformed read container request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+
+    String name = msg.getReadContainer().getName();
+    ContainerData container = this.containerManager.readContainer(name);
+    return ContainerUtils.getReadContainerResponse(msg, container);
+  }
+
+  /**
+   * Calls into container logic and returns appropriate response.
+   *
+   * @param msg - Request
+   * @return Response.
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleDeleteContainer(
+      ContainerCommandRequestProto msg) throws IOException {
+
+    if (!msg.hasDeleteContainer()) {
+      LOG.debug("Malformed delete container request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+
+    Pipeline pipeline = Pipeline.getFromProtoBuf(
+        msg.getDeleteContainer().getPipeline());
+    Preconditions.checkNotNull(pipeline);
+    String name = msg.getDeleteContainer().getName();
+    boolean forceDelete = msg.getDeleteContainer().getForceDelete();
+    this.containerManager.deleteContainer(pipeline, name, forceDelete);
+    return ContainerUtils.getContainerResponse(msg);
+  }
+
+  /**
+   * Calls into container logic and returns appropriate response.
+   *
+   * @param msg - Request
+   * @return Response.
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleCreateContainer(
+      ContainerCommandRequestProto msg) throws IOException {
+    if (!msg.hasCreateContainer()) {
+      LOG.debug("Malformed create container request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+    ContainerData cData = ContainerData.getFromProtBuf(
+        msg.getCreateContainer().getContainerData(), conf);
+    Preconditions.checkNotNull(cData, "Container data is null");
+
+    Pipeline pipeline = Pipeline.getFromProtoBuf(
+        msg.getCreateContainer().getPipeline());
+    Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
+
+    this.containerManager.createContainer(pipeline, cData);
+    return ContainerUtils.getContainerResponse(msg);
+  }
+
+  /**
+   * Closes an open container.
+   *
+   * @param msg - Request.
+   * @return Response.
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleCloseContainer(
+      ContainerCommandRequestProto msg) throws IOException {
+    try {
+      if (!msg.hasCloseContainer()) {
+        LOG.debug("Malformed close Container request. trace ID: {}",
+            msg.getTraceID());
+        return ContainerUtils.malformedRequest(msg);
+      }
+      Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getCloseContainer()
+          .getPipeline());
+      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
+      if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+        throw new StorageContainerException("Attempting to close a closed " +
+            "container.", CLOSED_CONTAINER_IO);
+      }
+      this.containerManager.closeContainer(pipeline.getContainerName());
+      return ContainerUtils.getContainerResponse(msg);
+    } catch (NoSuchAlgorithmException e) {
+      throw new StorageContainerException("No such Algorithm", e,
+          NO_SUCH_ALGORITHM);
+    }
+  }
+
+  /**
+   * Calls into chunk manager to write a chunk.
+   *
+   * @param msg - Request.
+   * @return Response.
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleWriteChunk(
+      ContainerCommandRequestProto msg) throws IOException {
+    if (!msg.hasWriteChunk()) {
+      LOG.debug("Malformed write chunk request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+    String keyName = msg.getWriteChunk().getKeyName();
+    Pipeline pipeline = Pipeline.getFromProtoBuf(
+        msg.getWriteChunk().getPipeline());
+    Preconditions.checkNotNull(pipeline);
+    if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+      throw new StorageContainerException("Write to closed container.",
+          CLOSED_CONTAINER_IO);
+    }
+
+    ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getWriteChunk()
+        .getChunkData());
+    Preconditions.checkNotNull(chunkInfo);
+    byte[] data = null;
+    if (msg.getWriteChunk().getStage() == ContainerProtos.Stage.WRITE_DATA
+        || msg.getWriteChunk().getStage() == ContainerProtos.Stage.COMBINED) {
+      data = msg.getWriteChunk().getData().toByteArray();
+      metrics.incContainerBytesStats(Type.WriteChunk, data.length);
+    }
+    this.containerManager.getChunkManager()
+        .writeChunk(pipeline, keyName, chunkInfo,
+            data, msg.getWriteChunk().getStage());
+
+    return ChunkUtils.getChunkResponse(msg);
+  }
+
+  /**
+   * Calls into chunk manager to read a chunk.
+   *
+   * @param msg - Request.
+   * @return - Response.
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleReadChunk(
+      ContainerCommandRequestProto msg) throws IOException {
+    if (!msg.hasReadChunk()) {
+      LOG.debug("Malformed read chunk request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+
+    String keyName = msg.getReadChunk().getKeyName();
+    Pipeline pipeline = Pipeline.getFromProtoBuf(
+        msg.getReadChunk().getPipeline());
+    Preconditions.checkNotNull(pipeline);
+
+    ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getReadChunk()
+        .getChunkData());
+    Preconditions.checkNotNull(chunkInfo);
+    byte[] data = this.containerManager.getChunkManager().readChunk(pipeline,
+        keyName, chunkInfo);
+    metrics.incContainerBytesStats(Type.ReadChunk, data.length);
+    return ChunkUtils.getReadChunkResponse(msg, data, chunkInfo);
+  }
+
+  /**
+   * Calls into chunk manager to delete a chunk.
+   *
+   * @param msg - Request.
+   * @return Response.
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleDeleteChunk(
+      ContainerCommandRequestProto msg) throws IOException {
+    if (!msg.hasDeleteChunk()) {
+      LOG.debug("Malformed delete chunk request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+
+    String keyName = msg.getDeleteChunk().getKeyName();
+    Pipeline pipeline = Pipeline.getFromProtoBuf(
+        msg.getDeleteChunk().getPipeline());
+    Preconditions.checkNotNull(pipeline);
+    if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+      throw new StorageContainerException("Write to closed container.",
+          CLOSED_CONTAINER_IO);
+    }
+    ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getDeleteChunk()
+        .getChunkData());
+    Preconditions.checkNotNull(chunkInfo);
+
+    this.containerManager.getChunkManager().deleteChunk(pipeline, keyName,
+        chunkInfo);
+    return ChunkUtils.getChunkResponse(msg);
+  }
+
+  /**
+   * Put Key handler.
+   *
+   * @param msg - Request.
+   * @return - Response.
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handlePutKey(
+      ContainerCommandRequestProto msg) throws IOException {
+    if (!msg.hasPutKey()) {
+      LOG.debug("Malformed put key request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+    Pipeline pipeline =
+        Pipeline.getFromProtoBuf(msg.getPutKey().getPipeline());
+    Preconditions.checkNotNull(pipeline);
+    if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+      throw new StorageContainerException("Write to closed container.",
+          CLOSED_CONTAINER_IO);
+    }
+    KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData());
+    Preconditions.checkNotNull(keyData);
+    this.containerManager.getKeyManager().putKey(pipeline, keyData);
+    long numBytes = keyData.getProtoBufMessage().toByteArray().length;
+    metrics.incContainerBytesStats(Type.PutKey, numBytes);
+    return KeyUtils.getKeyResponse(msg);
+  }
+
+  /**
+   * Handle Get Key.
+   *
+   * @param msg - Request.
+   * @return - Response.
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleGetKey(
+      ContainerCommandRequestProto msg) throws IOException {
+    if (!msg.hasGetKey()) {
+      LOG.debug("Malformed get key request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+    KeyData keyData = KeyData.getFromProtoBuf(msg.getGetKey().getKeyData());
+    Preconditions.checkNotNull(keyData);
+    KeyData responseData =
+        this.containerManager.getKeyManager().getKey(keyData);
+    long numBytes = responseData.getProtoBufMessage().toByteArray().length;
+    metrics.incContainerBytesStats(Type.GetKey, numBytes);
+    return KeyUtils.getKeyDataResponse(msg, responseData);
+  }
+
+  /**
+   * Handle Delete Key.
+   *
+   * @param msg - Request.
+   * @return - Response.
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleDeleteKey(
+      ContainerCommandRequestProto msg) throws IOException {
+    if (!msg.hasDeleteKey()) {
+      LOG.debug("Malformed delete key request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+    Pipeline pipeline =
+        Pipeline.getFromProtoBuf(msg.getDeleteKey().getPipeline());
+    Preconditions.checkNotNull(pipeline);
+    if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+      throw new StorageContainerException("Write to closed container.",
+          CLOSED_CONTAINER_IO);
+    }
+    String keyName = msg.getDeleteKey().getName();
+    Preconditions.checkNotNull(keyName);
+    Preconditions.checkState(!keyName.isEmpty());
+    this.containerManager.getKeyManager().deleteKey(pipeline, keyName);
+    return KeyUtils.getKeyResponse(msg);
+  }
+
+  /**
+   * Handles writing a chunk and its associated key using a single RPC.
+   *
+   * @param msg - Message.
+   * @return ContainerCommandResponseProto
+   * @throws StorageContainerException
+   */
+  private ContainerCommandResponseProto handlePutSmallFile(
+      ContainerCommandRequestProto msg) throws StorageContainerException {
+
+    if (!msg.hasPutSmallFile()) {
+      LOG.debug("Malformed put small file request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+    try {
+
+      Pipeline pipeline =
+          Pipeline.getFromProtoBuf(msg.getPutSmallFile()
+              .getKey().getPipeline());
+
+      Preconditions.checkNotNull(pipeline);
+      if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+        throw new StorageContainerException("Write to closed container.",
+            CLOSED_CONTAINER_IO);
+      }
+      KeyData keyData = KeyData.getFromProtoBuf(msg.getPutSmallFile().getKey()
+          .getKeyData());
+      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getPutSmallFile()
+          .getChunkInfo());
+      byte[] data = msg.getPutSmallFile().getData().toByteArray();
+
+      metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
+      this.containerManager.getChunkManager().writeChunk(pipeline, keyData
+          .getKeyName(), chunkInfo, data, ContainerProtos.Stage.COMBINED);
+      List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
+      chunks.add(chunkInfo.getProtoBufMessage());
+      keyData.setChunks(chunks);
+      this.containerManager.getKeyManager().putKey(pipeline, keyData);
+      return FileUtils.getPutFileResponse(msg);
+    } catch (StorageContainerException e) {
+      return ContainerUtils.logAndReturnError(LOG, e, msg);
+    } catch (IOException e) {
+      throw new StorageContainerException("Put Small File Failed.", e,
+          PUT_SMALL_FILE_ERROR);
+    }
+  }
+
+  /**
+   * Handles getting a data stream using a key. This helps in reducing the RPC
+   * overhead for small files.
+   *
+   * @param msg - ContainerCommandRequestProto
+   * @return ContainerCommandResponseProto
+   * @throws StorageContainerException
+   */
+  private ContainerCommandResponseProto handleGetSmallFile(
+      ContainerCommandRequestProto msg) throws StorageContainerException {
+    ByteString dataBuf = ByteString.EMPTY;
+    if (!msg.hasGetSmallFile()) {
+      LOG.debug("Malformed get small file request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+    try {
+      Pipeline pipeline =
+          Pipeline.getFromProtoBuf(msg.getGetSmallFile()
+              .getKey().getPipeline());
+
+      long bytes = 0;
+      Preconditions.checkNotNull(pipeline);
+      KeyData keyData = KeyData.getFromProtoBuf(msg.getGetSmallFile()
+          .getKey().getKeyData());
+      KeyData data = this.containerManager.getKeyManager().getKey(keyData);
+      ContainerProtos.ChunkInfo c = null;
+      for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
+        bytes += chunk.getSerializedSize();
+        ByteString current =
+            ByteString.copyFrom(this.containerManager.getChunkManager()
+                .readChunk(
+                    pipeline, keyData.getKeyName(), ChunkInfo.getFromProtoBuf(
+                        chunk)));
+        dataBuf = dataBuf.concat(current);
+        c = chunk;
+      }
+      metrics.incContainerBytesStats(Type.GetSmallFile, bytes);
+      return FileUtils.getGetSmallFileResponse(msg, dataBuf.toByteArray(),
+          ChunkInfo.getFromProtoBuf(c));
+    } catch (StorageContainerException e) {
+      return ContainerUtils.logAndReturnError(LOG, e, msg);
+    } catch (IOException e) {
+      throw new StorageContainerException("Get Small File Failed", e,
+          GET_SMALL_FILE_ERROR);
+    }
+  }
+}
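
The dispatch() method above is, in effect, a table from command type to handler, with a latency metric wrapped around each call. A minimal, self-contained sketch of that routing pattern (all names below are illustrative stand-ins, not the Ozone classes):

    import java.util.EnumMap;
    import java.util.Map;
    import java.util.function.UnaryOperator;

    public class Router {
      enum CmdType { CREATE_CONTAINER, PUT_KEY, WRITE_CHUNK }

      private final Map<CmdType, UnaryOperator<String>> handlers =
          new EnumMap<>(CmdType.class);

      Router() {
        // One handler per command family, as dispatch() does with the
        // container/key/chunk process handlers.
        handlers.put(CmdType.CREATE_CONTAINER, req -> "created: " + req);
        handlers.put(CmdType.PUT_KEY, req -> "key stored: " + req);
        handlers.put(CmdType.WRITE_CHUNK, req -> "chunk written: " + req);
      }

      String dispatch(CmdType type, String request) {
        long startNanos = System.nanoTime();          // start of latency metric
        UnaryOperator<String> handler = handlers.get(type);
        if (handler == null) {
          return "unsupported request";               // cf. unsupportedRequest(msg)
        }
        String resp = handler.apply(request);
        long elapsedNanos = System.nanoTime() - startNanos;
        return resp + " (" + elapsedNanos + " ns)";   // cf. incContainerOpsLatencies
      }

      public static void main(String[] args) {
        System.out.println(new Router().dispatch(CmdType.PUT_KEY, "key-1"));
      }
    }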

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
new file mode 100644
index 0000000..cf6bf12
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .NO_SUCH_KEY;
+
+/**
+ * Key Manager impl.
+ */
+public class KeyManagerImpl implements KeyManager {
+  static final Logger LOG =
+      LoggerFactory.getLogger(KeyManagerImpl.class);
+
+  private static final float LOAD_FACTOR = 0.75f;
+  private final ContainerManager containerManager;
+  private final Configuration conf;
+
+  /**
+   * Constructs a key Manager.
+   *
+   * @param containerManager - Container Manager.
+   */
+  public KeyManagerImpl(ContainerManager containerManager, Configuration conf) {
+    Preconditions.checkNotNull(containerManager, "Container manager cannot be" +
+        " null");
+    Preconditions.checkNotNull(conf, "Config cannot be null");
+    this.containerManager = containerManager;
+    this.conf = conf;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void putKey(Pipeline pipeline, KeyData data) throws IOException {
+    containerManager.readLock();
+    try {
+      // We are not locking the key manager since LevelDb serializes all actions
+      // against a single DB. We rely on DB level locking to avoid conflicts.
+      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
+      String containerName = pipeline.getContainerName();
+      Preconditions.checkNotNull(containerName,
+          "Container name cannot be null");
+      ContainerData cData = containerManager.readContainer(containerName);
+      MetadataStore db = KeyUtils.getDB(cData, conf);
+
+      // This is a post condition that acts as a hint to the user.
+      // Should never fail.
+      Preconditions.checkNotNull(db, "DB cannot be null here");
+      db.put(data.getKeyName().getBytes(KeyUtils.ENCODING), data
+          .getProtoBufMessage().toByteArray());
+    } finally {
+      containerManager.readUnlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public KeyData getKey(KeyData data) throws IOException {
+    containerManager.readLock();
+    try {
+      Preconditions.checkNotNull(data, "Key data cannot be null");
+      Preconditions.checkNotNull(data.getContainerName(),
+          "Container name cannot be null");
+      ContainerData cData = containerManager.readContainer(data
+          .getContainerName());
+      MetadataStore db = KeyUtils.getDB(cData, conf);
+
+      // This is a post condition that acts as a hint to the user.
+      // Should never fail.
+      Preconditions.checkNotNull(db, "DB cannot be null here");
+
+      byte[] kData = db.get(data.getKeyName().getBytes(KeyUtils.ENCODING));
+      if (kData == null) {
+        throw new StorageContainerException("Unable to find the key.",
+            NO_SUCH_KEY);
+      }
+      ContainerProtos.KeyData keyData =
+          ContainerProtos.KeyData.parseFrom(kData);
+      return KeyData.getFromProtoBuf(keyData);
+    } finally {
+      containerManager.readUnlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void deleteKey(Pipeline pipeline, String keyName)
+      throws IOException {
+    containerManager.readLock();
+    try {
+      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
+      String containerName = pipeline.getContainerName();
+      Preconditions.checkNotNull(containerName,
+          "Container name cannot be null");
+      ContainerData cData = containerManager.readContainer(containerName);
+      MetadataStore db = KeyUtils.getDB(cData, conf);
+
+      // This is a post condition that acts as a hint to the user.
+      // Should never fail.
+      Preconditions.checkNotNull(db, "DB cannot be null here");
+      // Note : There is a race condition here, since get and delete
+      // are not atomic. Leaving it here since the impact is refusing
+      // to delete a key which might have just gotten inserted after
+      // the get check.
+
+      byte[] kData = db.get(keyName.getBytes(KeyUtils.ENCODING));
+      if (kData == null) {
+        throw new StorageContainerException("Unable to find the key.",
+            NO_SUCH_KEY);
+      }
+      db.delete(keyName.getBytes(KeyUtils.ENCODING));
+    } finally {
+      containerManager.readUnlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<KeyData> listKey(
+      Pipeline pipeline, String prefix, String startKey, int count)
+      throws IOException {
+    Preconditions.checkNotNull(pipeline,
+        "Pipeline cannot be null.");
+    Preconditions.checkArgument(count > 0,
+        "Count must be a positive number.");
+    ContainerData cData = containerManager.readContainer(pipeline
+        .getContainerName());
+    MetadataStore db = KeyUtils.getDB(cData, conf);
+
+    List<KeyData> result = new ArrayList<KeyData>();
+    byte[] startKeyInBytes = startKey == null ? null :
+        DFSUtil.string2Bytes(startKey);
+    MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefix);
+    List<Map.Entry<byte[], byte[]>> range =
+        db.getSequentialRangeKVs(startKeyInBytes, count, prefixFilter);
+    for (Map.Entry<byte[], byte[]> entry : range) {
+      String keyName = KeyUtils.getKeyName(entry.getKey());
+      KeyData value = KeyUtils.getKeyData(entry.getValue());
+      KeyData data = new KeyData(value.getContainerName(), keyName);
+      result.add(data);
+    }
+    return result;
+  }
+
+  /**
+   * Shutdown keyManager.
+   */
+  @Override
+  public void shutdown() {
+    Preconditions.checkState(this.containerManager.hasWriteLock(), "asserts " +
+        "that we are holding the container manager lock when shutting down.");
+    KeyUtils.shutdownCache(ContainerCache.getInstance(conf));
+  }
+}
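
putKey, getKey and deleteKey above share one shape: take the container manager's read lock, resolve the container's metadata DB, then perform a single point operation, leaving write serialization to the DB itself. A rough sketch of that pattern, with a ConcurrentHashMap standing in for the per-container LevelDB (MetadataStore) and hypothetical names throughout:

    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class ToyKeyStore {
      private final ReadWriteLock containerLock = new ReentrantReadWriteLock();
      private final Map<String, byte[]> db = new ConcurrentHashMap<>();

      void putKey(String keyName, byte[] value) {
        // Only the read lock is taken, mirroring the rationale above: the
        // store itself serializes concurrent writers.
        containerLock.readLock().lock();
        try {
          db.put(keyName, value);
        } finally {
          containerLock.readLock().unlock();
        }
      }

      byte[] getKey(String keyName) throws IOException {
        containerLock.readLock().lock();
        try {
          byte[] data = db.get(keyName);
          if (data == null) {
            // cf. StorageContainerException with NO_SUCH_KEY above
            throw new IOException("Unable to find the key: " + keyName);
          }
          return data;
        } finally {
          containerLock.readLock().unlock();
        }
      }
    }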

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
new file mode 100644
index 0000000..3e267d2
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces
+    .ContainerDeletionChoosingPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Randomly chooses containers for block deletion.
+ */
+public class RandomContainerDeletionChoosingPolicy
+    implements ContainerDeletionChoosingPolicy {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RandomContainerDeletionChoosingPolicy.class);
+
+  @Override
+  public List<ContainerData> chooseContainerForBlockDeletion(int count,
+      Map<String, ContainerStatus> candidateContainers)
+      throws StorageContainerException {
+    Preconditions.checkNotNull(candidateContainers,
+        "Internal assertion: candidate containers cannot be null");
+
+    int currentCount = 0;
+    List<ContainerData> result = new LinkedList<>();
+    ContainerStatus[] values = new ContainerStatus[candidateContainers.size()];
+    // iterate over the candidate containers in shuffled order
+    for (ContainerStatus entry : DFSUtil.shuffle(
+        candidateContainers.values().toArray(values))) {
+      if (currentCount < count) {
+        result.add(entry.getContainer());
+        currentCount++;
+
+        LOG.debug("Select container {} for block deletion, "
+            + "pending deletion blocks num: {}.",
+            entry.getContainer().getContainerName(),
+            entry.getNumPendingDeletionBlocks());
+      } else {
+        break;
+      }
+    }
+
+    return result;
+  }
+}
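
The selection above is shuffle-then-take-N. The same idea, using java.util.Collections.shuffle in place of DFSUtil.shuffle (illustrative only):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public final class RandomPick {
      static <T> List<T> choose(int count, List<T> candidates) {
        List<T> shuffled = new ArrayList<>(candidates);
        Collections.shuffle(shuffled);        // random order
        // take at most count entries from the shuffled copy
        return shuffled.subList(0, Math.min(count, shuffled.size()));
      }

      public static void main(String[] args) {
        System.out.println(choose(2, List.of("c1", "c2", "c3", "c4")));
      }
    }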

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
new file mode 100644
index 0000000..7ef91a9
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+/**
+ * Storage location stats of datanodes that provide the backing store for
+ * containers.
+ */
+public class StorageLocationReport {
+  public static final StorageLocationReport[] EMPTY_ARRAY = {};
+
+  private final String id;
+  private final boolean failed;
+  private final long capacity;
+  private final long scmUsed;
+  private final long remaining;
+
+  public StorageLocationReport(String id, boolean failed,
+      long capacity, long scmUsed, long remaining) {
+    this.id = id;
+    this.failed = failed;
+    this.capacity = capacity;
+    this.scmUsed = scmUsed;
+    this.remaining = remaining;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public boolean isFailed() {
+    return failed;
+  }
+
+  public long getCapacity() {
+    return capacity;
+  }
+
+  public long getScmUsed() {
+    return scmUsed;
+  }
+
+  public long getRemaining() {
+    return remaining;
+  }
+
+}
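
A short usage sketch for the report class above, assuming it is on the classpath; the utilization percentage is an assumed consumer-side calculation, not something the class provides:

    import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;

    public final class ReportDemo {
      public static void main(String[] args) {
        StorageLocationReport report = new StorageLocationReport(
            "disk-1", false, 1_000_000L, 250_000L, 750_000L);
        // Derive a utilization percentage from capacity and scmUsed.
        double usedPct = 100.0 * report.getScmUsed() / report.getCapacity();
        System.out.printf("%s used: %.1f%%%n", report.getId(), usedPct);
      }
    }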

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
new file mode 100644
index 0000000..0169a96
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces
+    .ContainerDeletionChoosingPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Top-N ordered choosing policy that chooses containers based on the number
+ * of pending deletion blocks.
+ */
+public class TopNOrderedContainerDeletionChoosingPolicy
+    implements ContainerDeletionChoosingPolicy {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TopNOrderedContainerDeletionChoosingPolicy.class);
+
+  /** Comparator that orders container status by pending deletion count. **/
+  private static final Comparator<ContainerStatus> CONTAINER_STATUS_COMPARATOR
+      = new Comparator<ContainerStatus>() {
+        @Override
+        public int compare(ContainerStatus c1, ContainerStatus c2) {
+          return Integer.compare(c2.getNumPendingDeletionBlocks(),
+              c1.getNumPendingDeletionBlocks());
+        }
+      };
+
+  @Override
+  public List<ContainerData> chooseContainerForBlockDeletion(int count,
+      Map<String, ContainerStatus> candidateContainers)
+      throws StorageContainerException {
+    Preconditions.checkNotNull(candidateContainers,
+        "Internal assertion: candidate containers cannot be null");
+
+    List<ContainerData> result = new LinkedList<>();
+    List<ContainerStatus> orderedList = new LinkedList<>();
+    orderedList.addAll(candidateContainers.values());
+    Collections.sort(orderedList, CONTAINER_STATUS_COMPARATOR);
+
+    // get top N list ordered by pending deletion blocks' number
+    int currentCount = 0;
+    for (ContainerStatus entry : orderedList) {
+      if (currentCount < count) {
+        if (entry.getNumPendingDeletionBlocks() > 0) {
+          result.add(entry.getContainer());
+          currentCount++;
+
+          LOG.debug(
+              "Select container {} for block deletion, "
+                  + "pending deletion blocks num: {}.",
+              entry.getContainer().getContainerName(),
+              entry.getNumPendingDeletionBlocks());
+        } else {
+          LOG.debug("Stop looking for next container, there is no"
+              + " pending deletion block contained in remaining containers.");
+          break;
+        }
+      } else {
+        break;
+      }
+    }
+
+    return result;
+  }
+}
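
The same top-N logic can be read as a stream pipeline: drop containers with no pending deletion blocks, sort descending on the count, take at most count. The Status record below is a hypothetical stand-in for ContainerStatus:

    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    public final class TopNPick {
      record Status(String name, int pendingDeletionBlocks) { }

      static List<Status> choose(int count, List<Status> candidates) {
        return candidates.stream()
            .filter(s -> s.pendingDeletionBlocks() > 0)   // skip empty containers
            .sorted(Comparator.comparingInt(Status::pendingDeletionBlocks)
                .reversed())                              // largest count first
            .limit(count)
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        System.out.println(choose(2, List.of(
            new Status("c1", 5), new Status("c2", 0), new Status("c3", 9))));
      }
    }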

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java
new file mode 100644
index 0000000..16da5d9
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.impl;
+
+/**
+ This package contains the Ozone container implementation.
+**/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
new file mode 100644
index 0000000..f55d74c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+
+/**
+ * Chunk Manager allows read, write, delete and listing of chunks in
+ * a container.
+ */
+public interface ChunkManager {
+
+  /**
+   * Writes a given chunk.
+   * @param pipeline - Name and the set of machines that make this container.
+   * @param keyName - Name of the Key.
+   * @param info - ChunkInfo.
+   * @param data - Chunk data.
+   * @param stage - Stage of the chunk write.
+   * @throws StorageContainerException
+   */
+  void writeChunk(Pipeline pipeline, String keyName,
+                  ChunkInfo info, byte[] data, ContainerProtos.Stage stage)
+      throws StorageContainerException;
+
+  /**
+   * Reads the data defined by a chunk.
+   * @param pipeline - container pipeline.
+   * @param keyName - Name of the Key
+   * @param info - ChunkInfo.
+   * @return  byte array
+   * @throws StorageContainerException
+   *
+   * TODO: Right now we do not support partial reads and writes of chunks.
+   * TODO: Explore if we need to do that for ozone.
+   */
+  byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info) throws
+      StorageContainerException;
+
+  /**
+   * Deletes a given chunk.
+   * @param pipeline  - Pipeline.
+   * @param keyName   - Key Name
+   * @param info  - Chunk Info
+   * @throws StorageContainerException
+   */
+  void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info) throws
+      StorageContainerException;
+
+  // TODO : Support list operations.
+
+  /**
+   * Shutdown the chunkManager.
+   */
+  void shutdown();
+
+}
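
A minimal in-memory sketch of a ChunkManager-shaped store, keyed by key and chunk name. The real implementation persists chunk files on the datanode's disks; this only illustrates the read/write/delete contract, and all names are hypothetical:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class InMemoryChunkStore {
      private final Map<String, byte[]> chunks = new ConcurrentHashMap<>();

      private static String id(String keyName, String chunkName) {
        return keyName + "/" + chunkName;
      }

      void writeChunk(String keyName, String chunkName, byte[] data) {
        chunks.put(id(keyName, chunkName), data.clone()); // defensive copy
      }

      byte[] readChunk(String keyName, String chunkName) {
        byte[] data = chunks.get(id(keyName, chunkName));
        if (data == null) {
          throw new IllegalStateException(
              "No such chunk: " + id(keyName, chunkName));
        }
        return data.clone();
      }

      void deleteChunk(String keyName, String chunkName) {
        chunks.remove(id(keyName, chunkName));
      }
    }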

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java
new file mode 100644
index 0000000..f7280e2
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerStatus;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This interface is used for choosing desired containers for
+ * block deletion.
+ */
+public interface ContainerDeletionChoosingPolicy {
+
+  /**
+   * Chooses desired containers for block deletion.
+   * @param count
+   *          how many to return
+   * @param candidateContainers
+   *          candidate containers collection
+   * @return container data list
+   * @throws StorageContainerException
+   */
+  List<ContainerData> chooseContainerForBlockDeletion(int count,
+      Map<String, ContainerStatus> candidateContainers)
+      throws StorageContainerException;
+}
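
This interface is a classic strategy: a deletion service can swap selection behavior without touching its scan loop. A sketch, assuming the two policy classes from this patch are on the classpath (PolicyDemo and pickTargets are hypothetical):

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
    import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
    import org.apache.hadoop.ozone.container.common.impl.ContainerStatus;
    import org.apache.hadoop.ozone.container.common.impl.RandomContainerDeletionChoosingPolicy;
    import org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy;
    import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;

    final class PolicyDemo {
      // Pick deletion targets with either policy; the caller's loop is unchanged.
      static List<ContainerData> pickTargets(boolean useTopN, int count,
          Map<String, ContainerStatus> candidates)
          throws StorageContainerException {
        ContainerDeletionChoosingPolicy policy = useTopN
            ? new TopNOrderedContainerDeletionChoosingPolicy()
            : new RandomContainerDeletionChoosingPolicy();
        return policy.chooseContainerForBlockDeletion(count, candidates);
      }
    }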

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
new file mode 100644
index 0000000..984fe41
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+
+/**
+ * Dispatcher acts as the bridge between the transport layer and
+ * the actual container layer. This layer is capable of transforming
+ * protobuf objects into the corresponding classes and issuing the function
+ * calls into the lower layers.
+ *
+ * The reply from the request is dispatched to the client.
+ */
+public interface ContainerDispatcher {
+  /**
+   * Dispatches commands to container layer.
+   * @param msg - Command Request
+   * @return Command Response
+   */
+  ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg);
+
+  /**
+   * Initialize the Dispatcher.
+   */
+  void init();
+
+  /**
+   * Shutdown Dispatcher services.
+   */
+  void shutdown();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java
new file mode 100644
index 0000000..9c5fcea
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+/**
+ * Returns physical path locations where the containers will be created.
+ */
+public interface ContainerLocationManager {
+  /**
+   * Returns the path where the container should be placed from a set of
+   * locations.
+   *
+   * @return A path where we should place this container and metadata.
+   * @throws IOException
+   */
+  Path getContainerPath() throws IOException;
+
+  /**
+   * Returns the path where the container data files are stored.
+   *
+   * @return a path where we place the LevelDB and data files of a container.
+   * @throws IOException
+   */
+  Path getDataPath(String containerName) throws IOException;
+
+  /**
+   * Returns an array of storage location usage reports.
+   * @return storage location usage reports.
+   */
+  StorageLocationReport[] getLocationReport() throws IOException;
+
+  /**
+   * Supports clean shutdown of the location manager.
+   *
+   * @throws IOException
+   */
+  void shutdown() throws IOException;
+}
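
A hedged usage sketch; the chooseAndReport method and its parameters are
hypothetical, and only the ContainerLocationManager calls are from the
interface above:

    import java.io.IOException;
    import java.nio.file.Path;
    import org.apache.hadoop.ozone.container.common.impl
        .StorageLocationReport;
    import org.apache.hadoop.ozone.container.common.interfaces
        .ContainerLocationManager;

    /** Illustrative caller, for example a container manager. */
    public final class LocationExample {
      static void chooseAndReport(ContainerLocationManager locations,
          String containerName) throws IOException {
        // Pick a location for container metadata from the configured set
        // of storage directories.
        Path metadataPath = locations.getContainerPath();
        // Resolve where the LevelDB and data files of this container go.
        Path dataPath = locations.getDataPath(containerName);
        // Usage reports are what feed the datanode's storage report.
        StorageLocationReport[] reports = locations.getLocationReport();
        System.out.println(metadataPath + " " + dataPath + " "
            + reports.length);
      }
    }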

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManagerMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManagerMXBean.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManagerMXBean.java
new file mode 100644
index 0000000..88e6148
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManagerMXBean.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
+
+import java.io.IOException;
+
+/**
+ * JMX view of the physical path locations where containers will be created.
+ */
+public interface ContainerLocationManagerMXBean {
+
+  /**
+   * Returns an array of storage location usage reports.
+   *
+   * @return storage location usage reports.
+   */
+  StorageLocationReport[] getLocationReport() throws IOException;
+
+}
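
A hedged sketch of how an implementation might be published over JMX,
assuming Hadoop's MBeans utility; the service and bean names here are
illustrative:

    import javax.management.ObjectName;
    import org.apache.hadoop.metrics2.util.MBeans;
    import org.apache.hadoop.ozone.container.common.interfaces
        .ContainerLocationManagerMXBean;

    /** Illustrative JMX registration helper. */
    public final class JmxExample {
      static ObjectName publish(ContainerLocationManagerMXBean manager) {
        // Registering makes getLocationReport() visible to JMX clients
        // such as jconsole; the service/name pair is illustrative.
        return MBeans.register("OzoneDataNode",
            "ContainerLocationManager", manager);
      }

      static void unpublish(ObjectName name) {
        // Unregister on shutdown so the bean does not leak.
        MBeans.unregister(name);
      }
    }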

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
new file mode 100644
index 0000000..2ff636e
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.util.RwLock;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+
+/**
+ * Interface for container operations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface ContainerManager extends RwLock {
+
+  /**
+   * Init call that sets up a container Manager.
+   *
+   * @param config        - Configuration.
+   * @param containerDirs - List of Metadata Container locations.
+   * @param datanodeDetails - DatanodeDetails
+   * @throws StorageContainerException
+   */
+  void init(Configuration config, List<StorageLocation> containerDirs,
+            DatanodeDetails datanodeDetails) throws IOException;
+
+  /**
+   * Creates a container with the given name.
+   *
+   * @param pipeline      - Nodes which make up this container.
+   * @param containerData - Container Name and metadata.
+   * @throws StorageContainerException
+   */
+  void createContainer(Pipeline pipeline, ContainerData containerData)
+      throws StorageContainerException;
+
+  /**
+   * Deletes an existing container.
+   *
+   * @param pipeline      - nodes that make this container.
+   * @param containerName - name of the container.
+   * @param forceDelete   - whether this container should be deleted forcibly.
+   * @throws StorageContainerException
+   */
+  void deleteContainer(Pipeline pipeline, String containerName,
+      boolean forceDelete) throws StorageContainerException;
+
+  /**
+   * Update an existing container.
+   *
+   * @param pipeline container nodes
+   * @param containerName name of the container
+   * @param data container data
+   * @param forceUpdate if true, update container forcibly.
+   * @throws StorageContainerException
+   */
+  void updateContainer(Pipeline pipeline, String containerName,
+      ContainerData data, boolean forceUpdate) throws
+      StorageContainerException;
+
+  /**
+   * A simple interface for container iterations.
+   *
+   * @param prefix - Return only values matching this prefix
+   * @param count   - how many to return
+   * @param prevKey - Previous key - Server returns results from this point.
+   * @param data    - Actual containerData
+   * @throws StorageContainerException
+   */
+  void listContainer(String prefix, long count, String prevKey,
+                     List<ContainerData> data)
+      throws StorageContainerException;
+
+  /**
+   * Choose containers for block deletion.
+   *
+   * @param count   - how many to return
+   * @throws StorageContainerException
+   */
+  List<ContainerData> chooseContainerForBlockDeletion(int count)
+      throws StorageContainerException;
+
+  /**
+   * Get metadata about a specific container.
+   *
+   * @param containerName - Name of the container
+   * @return ContainerData - Container Data.
+   * @throws StorageContainerException
+   */
+  ContainerData readContainer(String containerName)
+      throws StorageContainerException;
+
+  /**
+   * Closes an open container; if it is already closed or does not exist, a
+   * StorageContainerException is thrown.
+   * @param containerName - Name of the container.
+   * @throws StorageContainerException
+   */
+  void closeContainer(String containerName)
+      throws StorageContainerException, NoSuchAlgorithmException;
+
+  /**
+   * Checks if a container is open.
+   * @param containerName - Name of the container.
+   * @return true if the container is open, false otherwise.
+   * @throws StorageContainerException - thrown if we are not
+   * able to find the container.
+   */
+  boolean isOpen(String containerName) throws StorageContainerException;
+
+  /**
+   * Supports clean shutdown of the container manager.
+   *
+   * @throws IOException
+   */
+  void shutdown() throws IOException;
+
+  /**
+   * Sets the Chunk Manager.
+   *
+   * @param chunkManager - ChunkManager.
+   */
+  void setChunkManager(ChunkManager chunkManager);
+
+  /**
+   * Gets the Chunk Manager.
+   *
+   * @return ChunkManager.
+   */
+  ChunkManager getChunkManager();
+
+  /**
+   * Sets the Key Manager.
+   *
+   * @param keyManager - Key Manager.
+   */
+  void setKeyManager(KeyManager keyManager);
+
+  /**
+   * Gets the Key Manager.
+   *
+   * @return KeyManager.
+   */
+  KeyManager getKeyManager();
+
+  /**
+   * Get the Node Report of container storage usage.
+   * @return node report.
+   */
+  SCMNodeReport getNodeReport() throws IOException;
+
+  /**
+   * Gets container report.
+   * @return container report.
+   * @throws IOException
+   */
+  ContainerReportsRequestProto getContainerReport() throws IOException;
+
+  /**
+   * Gets container reports.
+   * @return List of all closed containers.
+   * @throws IOException
+   */
+  List<ContainerData> getContainerReports() throws IOException;
+
+  /**
+   * Increases the pending deletion blocks count of the specified container.
+   *
+   * @param numBlocks
+   *          number of blocks to add to the count
+   * @param containerId
+   *          container id
+   */
+  void incrPendingDeletionBlocks(int numBlocks, String containerId);
+
+  /**
+   * Decreases the pending deletion blocks count of the specified container.
+   *
+   * @param numBlocks
+   *          number of blocks to subtract from the count
+   * @param containerId
+   *          container id
+   */
+  void decrPendingDeletionBlocks(int numBlocks, String containerId);
+
+  /**
+   * Increase the read count of the container.
+   * @param containerName - Name of the container.
+   */
+  void incrReadCount(String containerName);
+
+  /**
+   * Increase the read counter for bytes read from the container.
+   * @param containerName - Name of the container.
+   * @param readBytes - bytes read from the container.
+   */
+  void incrReadBytes(String containerName, long readBytes);
+
+
+  /**
+   * Increase the write count of the container.
+   * @param containerName - Name of the container.
+   */
+  void incrWriteCount(String containerName);
+
+  /**
+   * Increase the write counter for bytes written into the container.
+   * @param containerName - Name of the container.
+   * @param writeBytes - bytes written into the container.
+   */
+  void incrWriteBytes(String containerName, long writeBytes);
+
+  /**
+   * Increase the bytes used by the container.
+   * @param containerName - Name of the container.
+   * @param used - additional bytes used by the container.
+   * @return the current bytes used.
+   */
+  long incrBytesUsed(String containerName, long used);
+
+  /**
+   * Decrease the bytes used by the container.
+   * @param containerName - Name of the container.
+   * @param used - additional bytes reclaimed by the container.
+   * @return the current bytes used.
+   */
+  long decrBytesUsed(String containerName, long used);
+
+  /**
+   * Get the bytes used by the container.
+   * @param containerName - Name of the container.
+   * @return the current bytes used by the container.
+   */
+  long getBytesUsed(String containerName);
+
+  /**
+   * Get the number of keys in the container.
+   * @param containerName - Name of the container.
+   * @return the current key count.
+   */
+  long getNumKeys(String containerName);
+
+  /**
+   * Get the container report state to send via HB to SCM.
+   * @return container report state.
+   */
+  ReportState getContainerReportState();
+}
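
A hedged end-to-end sketch of the lifecycle this interface implies; every
name below is hypothetical, and only the ContainerManager calls come from
the interface above:

    import java.io.IOException;
    import java.security.NoSuchAlgorithmException;
    import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
    import org.apache.hadoop.hdds.scm.container.common.helpers
        .StorageContainerException;
    import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
    import org.apache.hadoop.ozone.container.common.interfaces
        .ContainerManager;

    /** Illustrative walk through create/read/close/delete. */
    public final class LifecycleExample {
      static void lifecycle(ContainerManager manager, Pipeline pipeline,
          ContainerData data, String name) throws IOException,
          NoSuchAlgorithmException, StorageContainerException {
        // Create the container, then read its metadata back.
        manager.createContainer(pipeline, data);
        ContainerData stored = manager.readContainer(name);

        // Account a 4 KB read against the per-container metrics.
        manager.incrReadCount(name);
        manager.incrReadBytes(name, 4096L);

        // Close while open; closing twice raises StorageContainerException.
        if (manager.isOpen(name)) {
          manager.closeContainer(name);
        }

        // Remove it; forceDelete=false asks for a non-forced delete.
        manager.deleteContainer(pipeline, name, false);
        System.out.println("removed " + stored);
      }
    }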

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java
new file mode 100644
index 0000000..4689dfe
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState;
+
+/**
+ * Interface for container report manager operations.
+ */
+public interface ContainerReportManager {
+
+  /**
+   * Get the container report state.
+   * @return the container report state.
+   */
+  ReportState getContainerReportState();
+}
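
A hedged sketch of a trivial implementation; a real manager would track
report deltas since the last heartbeat, which this placeholder does not:

    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.ReportState;
    import org.apache.hadoop.ozone.container.common.interfaces
        .ContainerReportManager;

    /** Illustrative only: always reports the default (empty) state. */
    public class DefaultReportManager implements ContainerReportManager {
      @Override
      public ReportState getContainerReportState() {
        // Every generated protobuf message has a default instance; a real
        // implementation builds a state reflecting pending reports.
        return ReportState.getDefaultInstance();
      }
    }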

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
new file mode 100644
index 0000000..8c27ba9
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * KeyManager deals with Key Operations in the container Level.
+ */
+public interface KeyManager {
+  /**
+   * Puts or overwrites a key.
+   *
+   * @param pipeline - Pipeline.
+   * @param data     - Key Data.
+   * @throws IOException
+   */
+  void putKey(Pipeline pipeline, KeyData data) throws IOException;
+
+  /**
+   * Gets an existing key.
+   *
+   * @param data - Key Data.
+   * @return Key Data.
+   * @throws IOException
+   */
+  KeyData getKey(KeyData data) throws IOException;
+
+  /**
+   * Deletes an existing Key.
+   *
+   * @param pipeline - Pipeline.
+   * @param keyName  - Name of the key.
+   * @throws StorageContainerException
+   */
+  void deleteKey(Pipeline pipeline, String keyName)
+      throws IOException;
+
+  /**
+   * List keys in a container.
+   *
+   * @param pipeline - pipeline.
+   * @param prefix   - Prefix, if needed.
+   * @param startKey  - Key to start from, EMPTY_STRING to begin.
+   * @param count    - Number of keys to return.
+   * @return List of Keys that match the criteria.
+   */
+  List<KeyData> listKey(Pipeline pipeline, String prefix, String startKey,
+      int count) throws IOException;
+
+  /**
+   * Shuts down the KeyManager.
+   */
+  void shutdown();
+}
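
A hedged round-trip sketch; the helper method and its arguments are
hypothetical and the empty-prefix semantics are an assumption, only the
KeyManager calls come from the interface above:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
    import org.apache.hadoop.ozone.container.common.helpers.KeyData;
    import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;

    /** Illustrative put/get/list/delete round trip. */
    public final class KeyExample {
      static void roundTrip(KeyManager keys, Pipeline pipeline,
          KeyData data, String keyName) throws IOException {
        // Write (or overwrite) the key, then read it back.
        keys.putKey(pipeline, data);
        KeyData stored = keys.getKey(data);

        // Page through at most 100 keys starting from the beginning;
        // an empty prefix is assumed to match everything.
        List<KeyData> page = keys.listKey(pipeline, "", "", 100);

        // Finally remove the key.
        keys.deleteKey(pipeline, keyName);
        System.out.println(stored + " of " + page.size());
      }
    }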

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java
new file mode 100644
index 0000000..d83bf95
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.interfaces;
+/**
+ This package contains common ozone container interfaces.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java
new file mode 100644
index 0000000..1638a36
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common;
+/**
+  Common Container Layer. At this layer the abstractions are:
+
+ 1. Containers - Both data and metadata containers.
+ 2. Keys - Key/Value pairs that live inside a container.
+ 3. Chunks - Keys can be composed of many chunks.
+
+ Ozone uses these abstractions to build Volumes, Buckets and Keys.
+
+ **/
\ No newline at end of file

