http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
new file mode 100644
index 0000000..258d4da
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.CONTAINER_INTERNAL_ERROR;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.NO_SUCH_ALGORITHM;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
+
+/**
+ * This class is for performing chunk related operations.
+ */
+public class ChunkManagerImpl implements ChunkManager {
+  static final Logger LOG = LoggerFactory.getLogger(ChunkManagerImpl.class);
+
+  /**
+   * Writes a given chunk.
+   *
+   * @param container - Container for the chunk
+   * @param blockID - ID of the block
+   * @param info - ChunkInfo
+   * @param data - data of the chunk
+   * @param stage - Stage of the Chunk operation
+   * @throws StorageContainerException
+   */
+  public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
+                         byte[] data, ContainerProtos.Stage stage)
+      throws StorageContainerException {
+
+    try {
+
+      KeyValueContainerData containerData = (KeyValueContainerData) container
+          .getContainerData();
+
+      File chunkFile = ChunkUtils.validateChunk(containerData, info);
+      File tmpChunkFile = getTmpChunkFile(chunkFile, info);
+
+      LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk " +
+          "file:{}", info.getChunkName(), stage, chunkFile, tmpChunkFile);
+
+      switch (stage) {
+      case WRITE_DATA:
+        // Initially writes to temporary chunk file.
+        ChunkUtils.writeData(tmpChunkFile, info, data);
+        break;
+      case COMMIT_DATA:
+        // commit the data, i.e. move the chunk data from the temporary
+        // chunk file to the actual chunk file.
+        long sizeDiff = tmpChunkFile.length() - chunkFile.length();
+        commitChunk(tmpChunkFile, chunkFile);
+        containerData.incrBytesUsed(sizeDiff);
+        containerData.incrWriteCount();
+        containerData.incrWriteBytes(sizeDiff);
+        break;
+      case COMBINED:
+        // directly write to the chunk file
+        ChunkUtils.writeData(chunkFile, info, data);
+        containerData.incrBytesUsed(info.getLen());
+        containerData.incrWriteCount();
+        containerData.incrWriteBytes(info.getLen());
+        break;
+      default:
+        throw new IOException("Can not identify write operation.");
+      }
+    } catch (StorageContainerException ex) {
+      throw ex;
+    } catch (NoSuchAlgorithmException ex) {
+      LOG.error("write data failed.", ex);
+      throw new StorageContainerException("Internal error: ", ex,
+          NO_SUCH_ALGORITHM);
+    } catch (ExecutionException  | IOException ex) {
+      LOG.error("write data failed.", ex);
+      throw new StorageContainerException("Internal error: ", ex,
+          CONTAINER_INTERNAL_ERROR);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("write data failed.", e);
+      throw new StorageContainerException("Internal error: ", e,
+          CONTAINER_INTERNAL_ERROR);
+    }
+  }
+
+  /**
+   * Reads the data defined by a chunk.
+   *
+   * @param container - Container for the chunk
+   * @param blockID - ID of the block.
+   * @param info - ChunkInfo.
+   * @return byte array
+   * @throws StorageContainerException
+   * TODO: Right now we do not support partial reads and writes of chunks.
+   * TODO: Explore if we need to do that for ozone.
+   */
+  public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info)
+      throws StorageContainerException {
+    try {
+      KeyValueContainerData containerData = (KeyValueContainerData) container
+          .getContainerData();
+      ByteBuffer data;
+
+      // Check which layout version the container has, and read the
+      // chunk file in that format.
+      // In version 1, we verify the checksum if it is available and
+      // return the data of the chunk file.
+      if (containerData.getLayOutVersion() == ChunkLayOutVersion
+          .getLatestVersion().getVersion()) {
+        File chunkFile = ChunkUtils.getChunkFile(containerData, info);
+        data = ChunkUtils.readData(chunkFile, info);
+        containerData.incrReadCount();
+        containerData.incrReadBytes(chunkFile.length());
+        return data.array();
+      }
+    } catch(NoSuchAlgorithmException ex) {
+      LOG.error("read data failed.", ex);
+      throw new StorageContainerException("Internal error: ",
+          ex, NO_SUCH_ALGORITHM);
+    } catch (ExecutionException ex) {
+      LOG.error("read data failed.", ex);
+      throw new StorageContainerException("Internal error: ",
+          ex, CONTAINER_INTERNAL_ERROR);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("read data failed.", e);
+      throw new StorageContainerException("Internal error: ",
+          e, CONTAINER_INTERNAL_ERROR);
+    }
+    return null;
+  }
+
+  /**
+   * Deletes a given chunk.
+   *
+   * @param container - Container for the chunk
+   * @param blockID - ID of the block
+   * @param info - Chunk Info
+   * @throws StorageContainerException
+   */
+  public void deleteChunk(Container container, BlockID blockID, ChunkInfo info)
+      throws StorageContainerException {
+    Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
+    KeyValueContainerData containerData = (KeyValueContainerData) container
+        .getContainerData();
+    // Check which layout version the container has, and perform the
+    // delete chunk operation in that format.
+    // In version 1, we have only the chunk file.
+    if (containerData.getLayOutVersion() == ChunkLayOutVersion
+        .getLatestVersion().getVersion()) {
+      File chunkFile = ChunkUtils.getChunkFile(containerData, info);
+      if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
+        // Record the length before deleting; after fullyDelete() the file
+        // length would read back as zero.
+        long chunkFileLength = chunkFile.length();
+        FileUtil.fullyDelete(chunkFile);
+        containerData.decrBytesUsed(chunkFileLength);
+      } else {
+        LOG.error("Not Supported Operation. Trying to delete a " +
+            "chunk that is in a shared file. chunk info : " + info.toString());
+        throw new StorageContainerException("Not Supported Operation. " +
+            "Trying to delete a chunk that is in a shared file. chunk info : "
+            + info.toString(), UNSUPPORTED_REQUEST);
+      }
+    }
+  }
+
+  /**
+   * Shutdown the chunkManager.
+   *
+   * In the chunkManager we haven't acquired any resources, so nothing to do
+   * here.
+   */
+  public void shutdown() {
+    //TODO: need to revisit this during integration of container IO.
+  }
+
+  /**
+   * Returns the temporary chunkFile path.
+   * @param chunkFile
+   * @param info
+   * @return temporary chunkFile path
+   * @throws StorageContainerException
+   */
+  private File getTmpChunkFile(File chunkFile, ChunkInfo info)
+      throws StorageContainerException {
+    return new File(chunkFile.getParent(),
+        chunkFile.getName() +
+            OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
+            OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX);
+  }
+
+  /**
+   * Commit the chunk by renaming the temporary chunk file to chunk file.
+   * @param tmpChunkFile
+   * @param chunkFile
+   * @throws IOException
+   */
+  private void commitChunk(File tmpChunkFile, File chunkFile) throws
+      IOException {
+    Files.move(tmpChunkFile.toPath(), chunkFile.toPath(),
+        StandardCopyOption.REPLACE_EXISTING);
+  }
+
+}
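
A minimal usage sketch of the two-stage write path above (illustrative
only, not part of this patch; "container", "containerId", "localId" and
"data" are assumed to be in scope, using the classes imported by the
file above):

    ChunkManager chunkManager = new ChunkManagerImpl();
    BlockID blockID = new BlockID(containerId, localId);
    ChunkInfo info = new ChunkInfo("chunk_1", 0, data.length);
    // WRITE_DATA stages the bytes in the temporary chunk file.
    chunkManager.writeChunk(container, blockID, info, data,
        ContainerProtos.Stage.WRITE_DATA);
    // COMMIT_DATA moves the temporary file over the real chunk file and
    // updates the container's bytesUsed and write counters.
    chunkManager.writeChunk(container, blockID, info, data,
        ContainerProtos.Stage.COMMIT_DATA);
    // Alternatively, Stage.COMBINED writes directly to the chunk file
    // in a single call.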

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java
new file mode 100644
index 0000000..40736e5
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
+import org.apache.hadoop.utils.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_KEY;
+
+/**
+ * This class is for performing key related operations on the KeyValue
+ * Container.
+ */
+public class KeyManagerImpl implements KeyManager {
+
+  static final Logger LOG = LoggerFactory.getLogger(KeyManagerImpl.class);
+
+  private Configuration config;
+
+  /**
+   * Constructs a key Manager.
+   *
+   * @param conf - Ozone configuration
+   */
+  public KeyManagerImpl(Configuration conf) {
+    Preconditions.checkNotNull(conf, "Config cannot be null");
+    this.config = conf;
+  }
+
+  /**
+   * Puts or overwrites a key.
+   *
+   * @param container - Container to which the key needs to be added.
+   * @param data     - Key Data.
+   * @throws IOException
+   */
+  public void putKey(Container container, KeyData data) throws IOException {
+    Preconditions.checkNotNull(data, "KeyData cannot be null for put " +
+        "operation.");
+    Preconditions.checkState(data.getContainerID() >= 0, "Container Id " +
+        "cannot be negative");
+    // We are not locking the key manager since LevelDb serializes all actions
+    // against a single DB. We rely on DB level locking to avoid conflicts.
+    MetadataStore db = KeyUtils.getDB((KeyValueContainerData) container
+        .getContainerData(), config);
+
+    // This is a post condition that acts as a hint to the user.
+    // Should never fail.
+    Preconditions.checkNotNull(db, "DB cannot be null here");
+    db.put(Longs.toByteArray(data.getLocalID()), data.getProtoBufMessage()
+        .toByteArray());
+  }
+
+  /**
+   * Gets an existing key.
+   *
+   * @param container - Container from which the key needs to be fetched.
+   * @param blockID - BlockID of the key.
+   * @return Key Data.
+   * @throws IOException
+   */
+  public KeyData getKey(Container container, BlockID blockID)
+      throws IOException {
+    Preconditions.checkNotNull(blockID,
+        "BlockID cannot be null in GetKey request");
+    Preconditions.checkState(blockID.getContainerID() >= 0,
+        "Container ID cannot be negative");
+
+    KeyValueContainerData containerData = (KeyValueContainerData) container
+        .getContainerData();
+    MetadataStore db = KeyUtils.getDB(containerData, config);
+    // This is a post condition that acts as a hint to the user.
+    // Should never fail.
+    Preconditions.checkNotNull(db, "DB cannot be null here");
+    byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID()));
+    if (kData == null) {
+      throw new StorageContainerException("Unable to find the key.",
+          NO_SUCH_KEY);
+    }
+    ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom(kData);
+    return KeyData.getFromProtoBuf(keyData);
+  }
+
+  /**
+   * Deletes an existing Key.
+   *
+   * @param container - Container from which the key needs to be deleted.
+   * @param blockID - ID of the block.
+   * @throws StorageContainerException
+   */
+  public void deleteKey(Container container, BlockID blockID) throws
+      IOException {
+    Preconditions.checkNotNull(blockID, "block ID cannot be null.");
+    Preconditions.checkState(blockID.getContainerID() >= 0,
+        "Container ID cannot be negative.");
+    Preconditions.checkState(blockID.getLocalID() >= 0,
+        "Local ID cannot be negative.");
+
+    KeyValueContainerData cData = (KeyValueContainerData) container
+        .getContainerData();
+    MetadataStore db = KeyUtils.getDB(cData, config);
+    // This is a post condition that acts as a hint to the user.
+    // Should never fail.
+    Preconditions.checkNotNull(db, "DB cannot be null here");
+    // Note : There is a race condition here, since get and delete
+    // are not atomic. Leaving it here since the impact is refusing
+    // to delete a key which might have just gotten inserted after
+    // the get check.
+    byte[] kKey = Longs.toByteArray(blockID.getLocalID());
+    byte[] kData = db.get(kKey);
+    if (kData == null) {
+      throw new StorageContainerException("Unable to find the key.",
+          NO_SUCH_KEY);
+    }
+    db.delete(kKey);
+  }
+
+  /**
+   * List keys in a container.
+   *
+   * @param container - Container from which keys need to be listed.
+   * @param startLocalID  - Key to start from, 0 to begin.
+   * @param count    - Number of keys to return.
+   * @return List of Keys that match the criteria.
+   */
+  public List<KeyData> listKey(Container container, long startLocalID, int
+      count) throws IOException {
+    Preconditions.checkNotNull(container, "container cannot be null");
+    Preconditions.checkState(startLocalID >= 0, "startLocalID cannot be " +
+        "negative");
+    Preconditions.checkArgument(count > 0,
+        "Count must be a positive number.");
+    container.readLock();
+    try {
+      List<KeyData> result = new ArrayList<>();
+      KeyValueContainerData cData = (KeyValueContainerData) container
+          .getContainerData();
+      MetadataStore db = KeyUtils.getDB(cData, config);
+      byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
+      List<Map.Entry<byte[], byte[]>> range = db.getSequentialRangeKVs(
+          startKeyInBytes, count, null);
+      for (Map.Entry<byte[], byte[]> entry : range) {
+        KeyData value = KeyUtils.getKeyData(entry.getValue());
+        result.add(new KeyData(value.getBlockID()));
+      }
+      return result;
+    } finally {
+      // Always release the read lock acquired above.
+      container.readUnlock();
+    }
+  }
+
+  /**
+   * Shutdown KeyValueContainerManager.
+   */
+  public void shutdown() {
+    KeyUtils.shutdownCache(ContainerCache.getInstance(config));
+  }
+
+}
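
A minimal usage sketch of the key operations above (illustrative only,
not part of this patch; "container" and "conf" are assumed to be in
scope, using the classes imported by the file above):

    KeyManager keyManager = new KeyManagerImpl(conf);
    BlockID blockID = new BlockID(1L, 2L);
    KeyData keyData = new KeyData(blockID);
    // Serializes the KeyData protobuf into the container's metadata DB.
    keyManager.putKey(container, keyData);
    KeyData fetched = keyManager.getKey(container, blockID);
    List<KeyData> keys = keyManager.listKey(container, 0, 10);
    keyManager.deleteKey(container, blockID);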

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
index ebda97e..7a5d48b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
@@ -43,11 +43,11 @@ public interface KeyManager {
    * Gets an existing key.
    *
    * @param container - Container from which key need to be get.
-   * @param data - Key Data.
+   * @param blockID - BlockID of the Key.
    * @return Key Data.
    * @throws IOException
    */
-  KeyData getKey(Container container, KeyData data) throws IOException;
+  KeyData getKey(Container container, BlockID blockID) throws IOException;
 
   /**
    * Deletes an existing Key.
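
For callers, the migration for this signature change looks roughly like
the sketch below (illustrative only, not part of this patch):

    // Before: the BlockID had to be wrapped in a KeyData object.
    //   KeyData data = keyManager.getKey(container, new KeyData(blockID));
    // After: the BlockID is passed directly.
    KeyData data = keyManager.getKey(container, blockID);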

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
index 055110c..52f291b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
@@ -19,12 +19,11 @@
 package org.apache.hadoop.ozone.container.common;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
-import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -33,39 +32,6 @@ import java.util.concurrent.atomic.AtomicLong;
 public class TestKeyValueContainerData {
 
   @Test
-  public void testGetFromProtoBuf() throws IOException {
-
-    long containerId = 1L;
-    ContainerProtos.ContainerType containerType = ContainerProtos
-        .ContainerType.KeyValueContainer;
-    int layOutVersion = 1;
-    ContainerProtos.ContainerLifeCycleState state = ContainerProtos
-        .ContainerLifeCycleState.OPEN;
-
-    ContainerProtos.KeyValue.Builder keyValBuilder =
-        ContainerProtos.KeyValue.newBuilder();
-    ContainerProtos.CreateContainerData containerData = ContainerProtos
-        .CreateContainerData.newBuilder()
-        .setContainerType(containerType)
-        .setContainerId(containerId)
-        .addMetadata(0, keyValBuilder.setKey("VOLUME").setValue("ozone")
-            .build())
-        .addMetadata(1, keyValBuilder.setKey("OWNER").setValue("hdfs")
-            .build()).build();
-
-    KeyValueContainerData kvData = KeyValueContainerData.getFromProtoBuf(
-        containerData);
-    assertEquals(containerType, kvData.getContainerType());
-    assertEquals(containerId, kvData.getContainerId());
-    assertEquals(layOutVersion, kvData.getLayOutVersion());
-    assertEquals(state, kvData.getState());
-    assertEquals(2, kvData.getMetadata().size());
-    assertEquals("ozone", kvData.getMetadata().get("VOLUME"));
-    assertEquals("hdfs", kvData.getMetadata().get("OWNER"));
-
-  }
-
-  @Test
   public void testKeyValueData() {
     long containerId = 1L;
     ContainerProtos.ContainerType containerType = ContainerProtos

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
index 2c9c2c3..5a29e8a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -37,6 +39,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Class used to test ContainerSet operations.
@@ -59,8 +62,13 @@ public class TestContainerSet {
     //addContainer
     boolean result = containerSet.addContainer(keyValueContainer);
     assertTrue(result);
-    result = containerSet.addContainer(keyValueContainer);
-    assertFalse(result);
+    try {
+      result = containerSet.addContainer(keyValueContainer);
+      fail("Adding same container ID twice should fail.");
+    } catch (StorageContainerException ex) {
+      GenericTestUtils.assertExceptionContains("Container already exists with" +
+          " container Id " + containerId, ex);
+    }
 
     //getContainer
     KeyValueContainer container = (KeyValueContainer) containerSet
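
With this change, ContainerSet.addContainer signals a duplicate
container ID with an exception instead of a boolean, so callers follow
roughly the pattern below (illustrative only, not part of this patch):

    try {
      containerSet.addContainer(keyValueContainer);
    } catch (StorageContainerException ex) {
      // A container with the same ID already exists; treat the create
      // as idempotent or propagate the error, depending on the caller.
    }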

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestKeyValueYaml.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestKeyValueYaml.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestKeyValueYaml.java
index 8550c47..75c0139 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestKeyValueYaml.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestKeyValueYaml.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.ozone.container.common.impl;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueYaml;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
new file mode 100644
index 0000000..50927d1
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+import org.mockito.Mockito;
+
+import java.util.UUID;
+
+/**
+ * Tests Handler interface.
+ */
+public class TestHandler {
+  @Rule
+  public TestRule timeout = new Timeout(300000);
+
+  private Configuration conf;
+  private HddsDispatcher dispatcher;
+  private ContainerSet containerSet;
+  private VolumeSet volumeSet;
+  private Handler handler;
+
+  private final static String SCM_ID = UUID.randomUUID().toString();
+  private final static String DATANODE_UUID = UUID.randomUUID().toString();
+
+  @Before
+  public void setup() throws Exception {
+    this.conf = new Configuration();
+    this.containerSet = Mockito.mock(ContainerSet.class);
+    this.volumeSet = Mockito.mock(VolumeSet.class);
+
+    this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, SCM_ID);
+  }
+
+  @Test
+  public void testGetKeyValueHandler() throws Exception {
+    Handler kvHandler = dispatcher.getHandlerForContainerType(
+        ContainerProtos.ContainerType.KeyValueContainer);
+
+    Assert.assertTrue("getHandlerForContainerType returned incorrect handler",
+        (kvHandler instanceof KeyValueHandler));
+  }
+
+  @Test
+  public void testGetHandlerForInvalidContainerType() {
+    // When new values are added to ContainerProtos.ContainerType, increment
+    // the number below so that it stays an invalid enum value.
+    ContainerProtos.ContainerType invalidContainerType =
+        ContainerProtos.ContainerType.forNumber(2);
+
+    Assert.assertNull("New ContainerType detected. Not an invalid " +
+        "containerType", invalidContainerType);
+
+    Handler handler = dispatcher.getHandlerForContainerType(
+        invalidContainerType);
+    Assert.assertNull("Get Handler for Invalid ContainerType should " +
+        "return null.", handler);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
index ca936c7..4576db6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
@@ -18,17 +18,16 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
-
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
 import org.junit.Rule;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
index a6f50c4..722cece 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
@@ -26,9 +26,9 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
 import org.apache.hadoop.ozone.container.common.volume
     .RoundRobinVolumeChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.keyvalue.impl.KeyManagerImpl;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
 import org.junit.Rule;
@@ -110,7 +110,7 @@ public class TestKeyManagerImpl {
 
     //Get Key
     KeyData fromGetKeyData = keyValueContainerManager.getKey(keyValueContainer,
-        keyData);
+        keyData.getBlockID());
 
     assertEquals(keyData.getContainerID(), fromGetKeyData.getContainerID());
     assertEquals(keyData.getLocalID(), fromGetKeyData.getLocalID());
@@ -168,8 +168,7 @@ public class TestKeyManagerImpl {
   @Test
   public void testGetNoSuchKey() throws Exception {
     try {
-      keyData = new KeyData(new BlockID(1L, 2L));
-      keyValueContainerManager.getKey(keyValueContainer, keyData);
+      keyValueContainerManager.getKey(keyValueContainer, new BlockID(1L, 2L));
       fail("testGetNoSuchKey failed");
     } catch (StorageContainerException ex) {
       GenericTestUtils.assertExceptionContains("Unable to find the key.", ex);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index b24f601..006b82c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -22,13 +22,15 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 
 
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueYaml;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume
+    .RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 
+import org.apache.hadoop.ozone.container.keyvalue.helpers
+    .KeyValueContainerLocationUtil;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
new file mode 100644
index 0000000..f4dd41c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.times;
+
+import java.util.UUID;
+
+/**
+ * Unit tests for {@link KeyValueHandler}.
+ */
+public class TestKeyValueHandler {
+
+  @Rule
+  public TestRule timeout = new Timeout(300000);
+
+  private Configuration conf;
+  private HddsDispatcher dispatcher;
+  private ContainerSet containerSet;
+  private VolumeSet volumeSet;
+  private KeyValueHandler handler;
+
+  private final static String SCM_ID = UUID.randomUUID().toString();
+  private final static String DATANODE_UUID = UUID.randomUUID().toString();
+  private int containerID;
+
+  private final String baseDir = MiniDFSCluster.getBaseDirectory();
+  private final String volume = baseDir + "disk1";
+
+  private void setup() throws Exception {
+    this.conf = new Configuration();
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, volume);
+
+    this.containerSet = new ContainerSet();
+    DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()
+        .setUuid(DATANODE_UUID)
+        .setHostName("localhost")
+        .setIpAddress("127.0.0.1")
+        .build();
+    this.volumeSet = new VolumeSet(datanodeDetails, conf);
+
+    this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, SCM_ID);
+    this.handler = (KeyValueHandler) dispatcher.getHandlerForContainerType(
+        ContainerProtos.ContainerType.KeyValueContainer);
+  }
+
+  /**
+   * Test that Handler handles different command types correctly.
+   */
+  @Test
+  public void testHandlerCommandHandling() throws Exception {
+    // Create mock HddsDispatcher and KeyValueHandler.
+    this.handler = Mockito.mock(KeyValueHandler.class);
+    this.dispatcher = Mockito.mock(HddsDispatcher.class);
+    Mockito.when(dispatcher.getHandlerForContainerType(any())).thenReturn
+        (handler);
+    Mockito.when(dispatcher.dispatch(any())).thenCallRealMethod();
+    Mockito.when(dispatcher.getContainer(anyLong())).thenReturn(
+        Mockito.mock(KeyValueContainer.class));
+    Mockito.when(handler.handle(any(), any())).thenCallRealMethod();
+
+    // Test Create Container Request handling
+    ContainerCommandRequestProto createContainerRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.CreateContainer);
+    dispatcher.dispatch(createContainerRequest);
+    Mockito.verify(handler, times(1)).handleCreateContainer(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Read Container Request handling
+    ContainerCommandRequestProto readContainerRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.ReadContainer);
+    dispatcher.dispatch(readContainerRequest);
+    Mockito.verify(handler, times(1)).handleReadContainer(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Update Container Request handling
+    ContainerCommandRequestProto updateContainerRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.UpdateContainer);
+    dispatcher.dispatch(updateContainerRequest);
+    Mockito.verify(handler, times(1)).handleUpdateContainer(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Delete Container Request handling
+    ContainerCommandRequestProto deleteContainerRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.DeleteContainer);
+    dispatcher.dispatch(deleteContainerRequest);
+    Mockito.verify(handler, times(1)).handleDeleteContainer(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test List Container Request handling
+    ContainerCommandRequestProto listContainerRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.ListContainer);
+    dispatcher.dispatch(listContainerRequest);
+    Mockito.verify(handler, times(1)).handleUnsupportedOp(
+        any(ContainerCommandRequestProto.class));
+
+    // Test Close Container Request handling
+    ContainerCommandRequestProto closeContainerRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.CloseContainer);
+    dispatcher.dispatch(closeContainerRequest);
+    Mockito.verify(handler, times(1)).handleCloseContainer(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Put Key Request handling
+    ContainerCommandRequestProto putKeyRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.PutKey);
+    dispatcher.dispatch(putKeyRequest);
+    Mockito.verify(handler, times(1)).handlePutKey(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Get Key Request handling
+    ContainerCommandRequestProto getKeyRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.GetKey);
+    dispatcher.dispatch(getKeyRequest);
+    Mockito.verify(handler, times(1)).handleGetKey(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Delete Key Request handling
+    ContainerCommandRequestProto deleteKeyRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.DeleteKey);
+    dispatcher.dispatch(deleteKeyRequest);
+    Mockito.verify(handler, times(1)).handleDeleteKey(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test List Key Request handling
+    ContainerCommandRequestProto listKeyRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.ListKey);
+    dispatcher.dispatch(listKeyRequest);
+    Mockito.verify(handler, times(2)).handleUnsupportedOp(
+        any(ContainerCommandRequestProto.class));
+
+    // Test Read Chunk Request handling
+    ContainerCommandRequestProto readChunkRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.ReadChunk);
+    dispatcher.dispatch(readChunkRequest);
+    Mockito.verify(handler, times(1)).handleReadChunk(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Delete Chunk Request handling
+    ContainerCommandRequestProto deleteChunkRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.DeleteChunk);
+    dispatcher.dispatch(deleteChunkRequest);
+    Mockito.verify(handler, times(1)).handleDeleteChunk(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Write Chunk Request handling
+    ContainerCommandRequestProto writeChunkRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.WriteChunk);
+    dispatcher.dispatch(writeChunkRequest);
+    Mockito.verify(handler, times(1)).handleWriteChunk(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test List Chunk Request handling
+    ContainerCommandRequestProto listChunkRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.ListChunk);
+    dispatcher.dispatch(listChunkRequest);
+    Mockito.verify(handler, times(3)).handleUnsupportedOp(
+        any(ContainerCommandRequestProto.class));
+
+    // Test Put Small File Request handling
+    ContainerCommandRequestProto putSmallFileRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.PutSmallFile);
+    dispatcher.dispatch(putSmallFileRequest);
+    Mockito.verify(handler, times(1)).handlePutSmallFile(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Get Small File Request handling
+    ContainerCommandRequestProto getSmallFileRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.GetSmallFile);
+    dispatcher.dispatch(getSmallFileRequest);
+    Mockito.verify(handler, times(1)).handleGetSmallFile(
+        any(ContainerCommandRequestProto.class), any());
+  }
+
+  private ContainerCommandRequestProto getDummyCommandRequestProto
+      (ContainerProtos.Type cmdType) {
+    ContainerCommandRequestProto request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder()
+            .setCmdType(cmdType)
+            .setDatanodeUuid(DATANODE_UUID)
+            .build();
+
+    return request;
+  }
+
+  @Test
+  public void testCreateContainer() throws Exception {
+    setup();
+
+    long contId = ++containerID;
+    ContainerProtos.CreateContainerRequestProto createReq =
+        ContainerProtos.CreateContainerRequestProto.newBuilder()
+            .setContainerID(contId)
+            .build();
+
+    ContainerCommandRequestProto request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder()
+            .setCmdType(ContainerProtos.Type.CreateContainer)
+            .setDatanodeUuid(DATANODE_UUID)
+            .setCreateContainer(createReq)
+            .build();
+
+    dispatcher.dispatch(request);
+
+    // Verify that new container is added to containerSet.
+    Container container = containerSet.getContainer(contId);
+    Assert.assertEquals(contId, container.getContainerData().getContainerId());
+    Assert.assertEquals(ContainerProtos.ContainerLifeCycleState.OPEN,
+        container.getContainerState());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
index e1a2918..38a4769 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
@@ -281,10 +281,10 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
         LOG.debug("get key accessing {} {}",
             containerID, containerKey);
         groupInputStream.streamOffset[i] = length;
-          ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation
-            .containerKeyDataForRead(blockID);
+        ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
+            .getDatanodeBlockIDProtobuf();
         ContainerProtos.GetKeyResponseProto response = ContainerProtocolCalls
-            .getKey(xceiverClient, containerKeyData, requestId);
+            .getKey(xceiverClient, datanodeBlockID, requestId);
         List<ContainerProtos.ChunkInfo> chunks =
             response.getKeyData().getChunksList();
         for (ContainerProtos.ChunkInfo chunk : chunks) {
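
With this change, the client read path passes the DatanodeBlockID
straight to GetKey instead of building a KeyData wrapper, roughly as in
the sketch below (illustrative only, not part of this patch;
"xceiverClient" and "requestId" are assumed to be in scope):

    ContainerProtos.GetKeyResponseProto response = ContainerProtocolCalls
        .getKey(xceiverClient, blockID.getDatanodeBlockIDProtobuf(),
            requestId);
    List<ContainerProtos.ChunkInfo> chunks =
        response.getKeyData().getChunksList();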

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneContainerTranslation.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneContainerTranslation.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneContainerTranslation.java
deleted file mode 100644
index e74fffd..0000000
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneContainerTranslation.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.client.io;
-
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyData;
-import org.apache.hadoop.hdds.client.BlockID;
-
-
-/**
- * This class contains methods that define the translation between the Ozone
- * domain model and the storage container domain model.
- */
-final class OzoneContainerTranslation {
-
-  /**
-   * Creates key data intended for reading a container key.
-   *
-   * @param blockID - ID of the block.
-   * @return KeyData intended for reading the container key
-   */
-  public static KeyData containerKeyDataForRead(BlockID blockID) {
-    return KeyData
-        .newBuilder()
-        .setBlockID(blockID.getDatanodeBlockIDProtobuf())
-        .build();
-  }
-
-  /**
-   * There is no need to instantiate this class.
-   */
-  private OzoneContainerTranslation() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 7046132..9decdb9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -362,10 +362,7 @@ public final class ContainerTestHelper {
     ContainerProtos.CreateContainerRequestProto.Builder createRequest =
         ContainerProtos.CreateContainerRequestProto
             .newBuilder();
-    ContainerProtos.ContainerData.Builder containerData = ContainerProtos
-        .ContainerData.newBuilder();
-    containerData.setContainerID(containerID);
-    createRequest.setContainerData(containerData.build());
+    createRequest.setContainerID(containerID);
 
     ContainerCommandRequestProto.Builder request =
         ContainerCommandRequestProto.newBuilder();
@@ -391,19 +388,16 @@ public final class ContainerTestHelper {
       long containerID, Map<String, String> metaData) throws IOException {
     ContainerProtos.UpdateContainerRequestProto.Builder updateRequestBuilder =
         ContainerProtos.UpdateContainerRequestProto.newBuilder();
-    ContainerProtos.ContainerData.Builder containerData = ContainerProtos
-        .ContainerData.newBuilder();
-    containerData.setContainerID(containerID);
+    updateRequestBuilder.setContainerID(containerID);
     String[] keys = metaData.keySet().toArray(new String[]{});
     for(int i=0; i<keys.length; i++) {
       KeyValue.Builder kvBuilder = KeyValue.newBuilder();
       kvBuilder.setKey(keys[i]);
       kvBuilder.setValue(metaData.get(keys[i]));
-      containerData.addMetadata(i, kvBuilder.build());
+      updateRequestBuilder.addMetadata(kvBuilder.build());
     }
     Pipeline pipeline =
         ContainerTestHelper.createSingleNodePipeline();
-    updateRequestBuilder.setContainerData(containerData.build());
 
     ContainerCommandRequestProto.Builder request =
         ContainerCommandRequestProto.newBuilder();
@@ -478,10 +472,7 @@ public final class ContainerTestHelper {
 
     ContainerProtos.GetKeyRequestProto.Builder getRequest =
         ContainerProtos.GetKeyRequestProto.newBuilder();
-    ContainerProtos.KeyData.Builder keyData = ContainerProtos.KeyData
-        .newBuilder();
-    keyData.setBlockID(blockID);
-    getRequest.setKeyData(keyData);
+    getRequest.setBlockID(blockID);
 
     ContainerCommandRequestProto.Builder request =
         ContainerCommandRequestProto.newBuilder();
@@ -499,13 +490,11 @@ public final class ContainerTestHelper {
    * @param response - Response
    */
   public static void verifyGetKey(ContainerCommandRequestProto request,
-      ContainerCommandResponseProto response) {
+      ContainerCommandResponseProto response, int expectedChunksCount) {
     Assert.assertEquals(request.getTraceID(), response.getTraceID());
     Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
-    ContainerProtos.PutKeyRequestProto putKey = request.getPutKey();
-    ContainerProtos.GetKeyRequestProto getKey = request.getGetKey();
-    Assert.assertEquals(putKey.getKeyData().getChunksCount(),
-        getKey.getKeyData().getChunksCount());
+    Assert.assertEquals(expectedChunksCount,
+        response.getGetKey().getKeyData().getChunksCount());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 67a8160..3f02036 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -158,7 +158,8 @@ public class TestOzoneContainer {
       // Get Key
       request = ContainerTestHelper.getKeyRequest(pipeline, putKeyRequest.getPutKey());
       response = client.sendCommand(request);
-      ContainerTestHelper.verifyGetKey(request, response);
+      int chunksCount = putKeyRequest.getPutKey().getKeyData().getChunksCount();
+      ContainerTestHelper.verifyGetKey(request, response, chunksCount);
 
 
       // Delete Key
@@ -331,7 +332,8 @@ public class TestOzoneContainer {
       request = ContainerTestHelper.getKeyRequest(client.getPipeline(),
           putKeyRequest.getPutKey());
       response = client.sendCommand(request);
-      ContainerTestHelper.verifyGetKey(request, response);
+      int chunksCount = putKeyRequest.getPutKey().getKeyData().getChunksCount();
+      ContainerTestHelper.verifyGetKey(request, response, chunksCount);
 
       // Delete Key must fail on a closed container.
       request =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
index 13b04c3..c14c1b9 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
@@ -174,9 +174,7 @@ public class BenchMarkDatanodeDispatcher {
   private ContainerCommandRequestProto getCreateContainerCommand(long containerID) {
     CreateContainerRequestProto.Builder createRequest =
         CreateContainerRequestProto.newBuilder();
-    createRequest.setContainerData(
-        ContainerData.newBuilder().setContainerID(
-            containerID).build());
+    createRequest.setContainerID(containerID);
 
     ContainerCommandRequestProto.Builder request =
         ContainerCommandRequestProto.newBuilder();
@@ -245,10 +243,9 @@ public class BenchMarkDatanodeDispatcher {
     return request.build();
   }
 
-  private ContainerCommandRequestProto getGetKeyCommand(
-      BlockID blockID, String chunkKey) {
+  private ContainerCommandRequestProto getGetKeyCommand(BlockID blockID) {
     GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto.newBuilder()
-        .setKeyData(getKeyData(blockID, chunkKey));
+        .setBlockID(blockID.getDatanodeBlockIDProtobuf());
     ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto
         .newBuilder()
         .setCmdType(ContainerProtos.Type.GetKey)
@@ -300,8 +297,7 @@ public class BenchMarkDatanodeDispatcher {
   @Benchmark
   public void getKey(BenchMarkDatanodeDispatcher bmdd) {
     BlockID blockID = getRandomBlockID();
-    String chunkKey = getNewChunkToWrite();
-    bmdd.dispatcher.dispatch(getGetKeyCommand(blockID, chunkKey));
+    bmdd.dispatcher.dispatch(getGetKeyCommand(blockID));
   }
 
   // Chunks writes from benchmark only reaches certain containers

