http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 4156f5a..9e25c59 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -1,72 +1,49 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
-import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
-import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
-import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl;
-import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
-import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
-import org.apache.hadoop.ozone.container.common.statemachine.background
-    .BlockDeletingService;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
-import org.apache.hadoop.ozone.container.common.transport.server
-    .XceiverServerGrpc;
-import org.apache.hadoop.ozone.container.common.transport.server
-    .XceiverServerSpi;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis
-    .XceiverServerRatis;
+import 
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
+import 
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
+import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Iterator;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ROOT_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
 
 /**
@@ -74,226 +51,123 @@ import static 
org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
  * layer.
  */
 public class OzoneContainer {
-  public static final Logger LOG =
-      LoggerFactory.getLogger(OzoneContainer.class);
 
-  private final Configuration ozoneConfig;
-  private final ContainerDispatcher dispatcher;
-  private final ContainerManager manager;
+  public static final Logger LOG = LoggerFactory.getLogger(
+      OzoneContainer.class);
+
+  private final HddsDispatcher hddsDispatcher;
+  private final DatanodeDetails dnDetails;
+  private final OzoneConfiguration config;
+  private final VolumeSet volumeSet;
+  private final ContainerSet containerSet;
   private final XceiverServerSpi[] server;
-  private final ChunkManager chunkManager;
-  private final KeyManager keyManager;
-  private final BlockDeletingService blockDeletingService;
 
   /**
-   * Creates a network endpoint and enables Ozone container.
-   *
-   * @param ozoneConfig - Config
+   * Construct OzoneContainer object.
+   * @param datanodeDetails
+   * @param conf
+   * @throws DiskOutOfSpaceException
    * @throws IOException
    */
-  public OzoneContainer(
-      DatanodeDetails datanodeDetails, Configuration ozoneConfig)
-      throws IOException {
-    this.ozoneConfig = ozoneConfig;
-    List<StorageLocation> locations = new LinkedList<>();
-    String[] paths = ozoneConfig.getStrings(
-        OzoneConfigKeys.OZONE_METADATA_DIRS);
-    if (paths != null && paths.length > 0) {
-      for (String p : paths) {
-        locations.add(StorageLocation.parse(
-            Paths.get(p).resolve(CONTAINER_ROOT_PREFIX).toString()));
-      }
-    } else {
-      getDataDir(locations);
-    }
-
-    manager = new ContainerManagerImpl();
-    manager.init(this.ozoneConfig, locations, datanodeDetails);
-    this.chunkManager = new ChunkManagerImpl(manager);
-    manager.setChunkManager(this.chunkManager);
-
-    this.keyManager = new KeyManagerImpl(manager, ozoneConfig);
-    manager.setKeyManager(this.keyManager);
-
-    long svcInterval =
-        ozoneConfig.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
-        OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
-    long serviceTimeout = ozoneConfig.getTimeDuration(
-        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
-        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
-    this.blockDeletingService = new BlockDeletingService(manager,
-        svcInterval, serviceTimeout, ozoneConfig);
-
-    this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
-
-    boolean useGrpc = this.ozoneConfig.getBoolean(
+  public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
+      conf) throws IOException {
+    this.dnDetails = datanodeDetails;
+    this.config = conf;
+    this.volumeSet = new VolumeSet(datanodeDetails, conf);
+    this.containerSet = new ContainerSet();
+    boolean useGrpc = this.config.getBoolean(
         ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
         ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT);
+    buildContainerSet();
+    hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet);
     server = new XceiverServerSpi[]{
-        useGrpc ? new XceiverServerGrpc(datanodeDetails,
-            this.ozoneConfig, this.dispatcher) :
+        useGrpc ? new XceiverServerGrpc(datanodeDetails, this.config, this
+            .hddsDispatcher) :
             new XceiverServer(datanodeDetails,
-                this.ozoneConfig, this.dispatcher),
-      XceiverServerRatis
-          .newXceiverServerRatis(datanodeDetails, this.ozoneConfig, dispatcher)
+                this.config, this.hddsDispatcher),
+        XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this
+            .config, hddsDispatcher)
     };
-  }
 
-  /**
-   * Starts serving requests to ozone container.
-   *
-   * @throws IOException
-   */
-  public void start() throws IOException {
-    for (XceiverServerSpi serverinstance : server) {
-      serverinstance.start();
-    }
-    blockDeletingService.start();
-    dispatcher.init();
+
   }
 
+
   /**
-   * Stops the ozone container.
-   * <p>
-   * Shutdown logic is not very obvious from the following code. if you need to
-   * modify the logic, please keep these comments in mind. Here is the shutdown
-   * sequence.
-   * <p>
-   * 1. We shutdown the network ports.
-   * <p>
-   * 2. Now we need to wait for all requests in-flight to finish.
-   * <p>
-   * 3. The container manager lock is a read-write lock with "Fairness"
-   * enabled.
-   * <p>
-   * 4. This means that the waiting threads are served in a "first-come-first
-   * -served" manner. Please note that this applies to waiting threads only.
-   * <p>
-   * 5. Since write locks are exclusive, if we are waiting to get a lock it
-   * implies that we are waiting for in-flight operations to complete.
-   * <p>
-   * 6. if there are other write operations waiting on the reader-writer lock,
-   * fairness guarantees that they will proceed before the shutdown lock
-   * request.
-   * <p>
-   * 7. Since all operations either take a reader or writer lock of container
-   * manager, we are guaranteed that we are the last operation since we have
-   * closed the network port, and we wait until close is successful.
-   * <p>
-   * 8. We take the writer lock and call shutdown on each of the managers in
-   * reverse order. That is chunkManager, keyManager and containerManager is
-   * shutdown.
+   * Build's container map.
    */
-  public void stop() {
-    LOG.info("Attempting to stop container services.");
-    for(XceiverServerSpi serverinstance: server) {
-      serverinstance.stop();
+  public void buildContainerSet() {
+    Iterator<HddsVolume> volumeSetIterator = volumeSet.getVolumesList()
+        .iterator();
+    ArrayList<Thread> volumeThreads = new ArrayList<Thread>();
+
+    //TODO: diskchecker should be run before this, to see how disks are.
+    // And also handle disk failure tolerance need to be added
+    while (volumeSetIterator.hasNext()) {
+      HddsVolume volume = volumeSetIterator.next();
+      File hddsVolumeRootDir = volume.getHddsRootDir();
+      Thread thread = new Thread(new ContainerReader(hddsVolumeRootDir,
+          containerSet, config));
+      thread.start();
+      volumeThreads.add(thread);
     }
-    dispatcher.shutdown();
 
     try {
-      this.manager.writeLock();
-      this.chunkManager.shutdown();
-      this.keyManager.shutdown();
-      this.manager.shutdown();
-      this.blockDeletingService.shutdown();
-      LOG.info("container services shutdown complete.");
-    } catch (IOException ex) {
-      LOG.warn("container service shutdown error:", ex);
-    } finally {
-      this.manager.writeUnlock();
+      for (int i = 0; i < volumeThreads.size(); i++) {
+        volumeThreads.get(i).join();
+      }
+    } catch (InterruptedException ex) {
+      LOG.info("Volume Threads Interrupted exception", ex);
     }
+
   }
 
   /**
-   * Returns a paths to data dirs.
+   * Starts serving requests to ozone container.
    *
-   * @param pathList - List of paths.
    * @throws IOException
    */
-  private void getDataDir(List<StorageLocation> pathList) throws IOException {
-    for (String dir : ozoneConfig.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
-      StorageLocation location = StorageLocation.parse(dir);
-      pathList.add(location);
+  public void start() throws IOException {
+    LOG.info("Attempting to start container services.");
+    for (XceiverServerSpi serverinstance : server) {
+      serverinstance.start();
     }
+    hddsDispatcher.init();
   }
 
   /**
-   * Returns node report of container storage usage.
+   * Stop Container Service on the datanode.
    */
-  public NodeReportProto getNodeReport() throws IOException {
-    return this.manager.getNodeReport();
-  }
-
-  private int getPortbyType(HddsProtos.ReplicationType replicationType) {
-    for (XceiverServerSpi serverinstance : server) {
-      if (serverinstance.getServerType() == replicationType) {
-        return serverinstance.getIPCPort();
-      }
+  public void stop() {
+    //TODO: at end of container IO integration work.
+    LOG.info("Attempting to stop container services.");
+    for(XceiverServerSpi serverinstance: server) {
+      serverinstance.stop();
     }
-    return INVALID_PORT;
+    hddsDispatcher.shutdown();
   }
 
-  /**
-   * Returns the container server IPC port.
-   *
-   * @return Container server IPC port.
-   */
-  public int getContainerServerPort() {
-    return getPortbyType(HddsProtos.ReplicationType.STAND_ALONE);
-  }
 
-  /**
-   * Returns the Ratis container Server IPC port.
-   *
-   * @return Ratis port.
-   */
-  public int getRatisContainerServerPort() {
-    return getPortbyType(HddsProtos.ReplicationType.RATIS);
+  @VisibleForTesting
+  public ContainerSet getContainerSet() {
+    return containerSet;
   }
-
   /**
    * Returns container report.
    * @return - container report.
    * @throws IOException
    */
-  public ContainerReportsProto getContainerReport() throws IOException {
-    return this.manager.getContainerReport();
+  public StorageContainerDatanodeProtocolProtos.ContainerReportsProto
+      getContainerReport() throws IOException {
+    return this.containerSet.getContainerReport();
   }
 
-// TODO: remove getContainerReports
   /**
-   * Returns the list of closed containers.
-   * @return - List of closed containers.
+   * Submit ContainerRequest.
+   * @param request
+   * @param replicationType
    * @throws IOException
    */
-  public List<ContainerData> getClosedContainerReports() throws IOException {
-    return this.manager.getClosedContainerReports();
-  }
-
-  private XceiverServerSpi getRatisSerer() {
-    for (XceiverServerSpi serverInstance : server) {
-      if (serverInstance instanceof XceiverServerRatis) {
-        return serverInstance;
-      }
-    }
-    return null;
-  }
-
-  private XceiverServerSpi getStandaAloneSerer() {
-    for (XceiverServerSpi serverInstance : server) {
-      if (!(serverInstance instanceof XceiverServerRatis)) {
-        return serverInstance;
-      }
-    }
-    return null;
-  }
-
-  @VisibleForTesting
-  public ContainerManager getContainerManager() {
-    return this.manager;
-  }
-
   public void submitContainerRequest(
       ContainerProtos.ContainerCommandRequestProto request,
       HddsProtos.ReplicationType replicationType) throws IOException {
@@ -332,4 +206,66 @@ public class OzoneContainer {
           + " not supported over HearBeat Response");
     }
   }
-}
\ No newline at end of file
+
+  private XceiverServerSpi getRatisSerer() {
+    for (XceiverServerSpi serverInstance : server) {
+      if (serverInstance instanceof XceiverServerRatis) {
+        return serverInstance;
+      }
+    }
+    return null;
+  }
+
+  private XceiverServerSpi getStandaAloneSerer() {
+    for (XceiverServerSpi serverInstance : server) {
+      if (!(serverInstance instanceof XceiverServerRatis)) {
+        return serverInstance;
+      }
+    }
+    return null;
+  }
+
+  private int getPortbyType(HddsProtos.ReplicationType replicationType) {
+    for (XceiverServerSpi serverinstance : server) {
+      if (serverinstance.getServerType() == replicationType) {
+        return serverinstance.getIPCPort();
+      }
+    }
+    return INVALID_PORT;
+  }
+
+  /**
+   * Returns the container server IPC port.
+   *
+   * @return Container server IPC port.
+   */
+  public int getContainerServerPort() {
+    return getPortbyType(HddsProtos.ReplicationType.STAND_ALONE);
+  }
+
+  /**
+   * Returns the Ratis container Server IPC port.
+   *
+   * @return Ratis port.
+   */
+  public int getRatisContainerServerPort() {
+    return getPortbyType(HddsProtos.ReplicationType.RATIS);
+  }
+
+  /**
+   * Returns node report of container storage usage.
+   */
+  public StorageContainerDatanodeProtocolProtos.NodeReportProto getNodeReport()
+      throws IOException {
+    return volumeSet.getNodeReport();
+  }
+
+  @VisibleForTesting
+  public ContainerDispatcher getDispatcher() {
+    return this.hddsDispatcher;
+  }
+
+  public VolumeSet getVolumeSet() {
+    return volumeSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java
index 83acf5b..4d328d3 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java
@@ -105,6 +105,10 @@ public class VersionResponse {
             .addAllKeys(list).build();
   }
 
+  public String getValue(String key) {
+    return this.values.get(key);
+  }
+
   /**
    * Builder class.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
index b63c5fb..a24f096 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
@@ -25,16 +25,20 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolService;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
 import org.apache.hadoop.ozone.protocolPB
     .StorageContainerDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.test.GenericTestUtils;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+
 /**
  * Test Endpoint class.
  */
@@ -109,8 +113,13 @@ public final class SCMTestUtils {
     }
   }
 
-  public static Configuration getConf() {
-    return new Configuration();
+  public static OzoneConfiguration getConf() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(HDDS_DATANODE_DIR_KEY, GenericTestUtils
+        .getRandomizedTempPath());
+    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, GenericTestUtils
+        .getRandomizedTempPath());
+    return conf;
   }
 
   public static OzoneConfiguration getOzoneConf() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index 14da960..8f4b0e3 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 
@@ -151,7 +152,10 @@ public class ScmTestMock implements 
StorageContainerDatanodeProtocol {
     return VersionResponse.newBuilder()
         .setVersion(versionInfo.getVersion())
         .addValue(VersionInfo.DESCRIPTION_KEY, versionInfo.getDescription())
+        .addValue(OzoneConsts.SCM_ID, UUID.randomUUID().toString())
+        .addValue(OzoneConsts.CLUSTER_ID, UUID.randomUUID().toString())
         .build().getProtobufMessage();
+
   }
 
   private void sleepIfNeeded() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
index 52f291b..249b0fe 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
@@ -42,8 +42,7 @@ public class TestKeyValueContainerData {
         .ContainerLifeCycleState.CLOSED;
     AtomicLong val = new AtomicLong(0);
 
-    KeyValueContainerData kvData = new KeyValueContainerData(containerType,
-        containerId);
+    KeyValueContainerData kvData = new KeyValueContainerData(containerId);
 
     assertEquals(containerType, kvData.getContainerType());
     assertEquals(containerId, kvData.getContainerId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
new file mode 100644
index 0000000..e1b7bd2
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This class tests create/read .container files.
+ */
+public class TestContainerDataYaml {
+
+  @Test
+  public void testCreateContainerFile() throws IOException {
+    String path = new FileSystemTestHelper().getTestRootDir();
+    String containerPath = "1.container";
+
+    File filePath = new File(new FileSystemTestHelper().getTestRootDir());
+    filePath.mkdirs();
+
+    KeyValueContainerData keyValueContainerData = new 
KeyValueContainerData(Long.MAX_VALUE);
+    keyValueContainerData.setContainerDBType("RocksDB");
+    keyValueContainerData.setMetadataPath(path);
+    keyValueContainerData.setChunksPath(path);
+
+    File containerFile = new File(filePath, containerPath);
+
+    // Create .container file with ContainerData
+    ContainerDataYaml.createContainerFile(ContainerProtos.ContainerType
+            .KeyValueContainer, containerFile, keyValueContainerData);
+
+    //Check .container file exists or not.
+    assertTrue(containerFile.exists());
+
+    // Read from .container file, and verify data.
+    KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml
+        .readContainerFile(containerFile);
+    assertEquals(Long.MAX_VALUE, kvData.getContainerId());
+    assertEquals(ContainerProtos.ContainerType.KeyValueContainer, kvData
+        .getContainerType());
+    assertEquals("RocksDB", kvData.getContainerDBType());
+    assertEquals(path, kvData.getMetadataPath());
+    assertEquals(path, kvData.getChunksPath());
+    assertEquals(ContainerProtos.ContainerLifeCycleState.OPEN, kvData
+        .getState());
+    assertEquals(1, kvData.getLayOutVersion());
+    assertEquals(0, kvData.getMetadata().size());
+
+    // Update ContainerData.
+    kvData.addMetadata("VOLUME", "hdfs");
+    kvData.addMetadata("OWNER", "ozone");
+    kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSED);
+
+
+    // Update .container file with new ContainerData.
+    containerFile = new File(filePath, containerPath);
+    ContainerDataYaml.createContainerFile(ContainerProtos.ContainerType
+            .KeyValueContainer, containerFile, kvData);
+
+    // Reading newly updated data from .container file
+    kvData =  (KeyValueContainerData) ContainerDataYaml.readContainerFile(
+        containerFile);
+
+    // verify data.
+    assertEquals(Long.MAX_VALUE, kvData.getContainerId());
+    assertEquals(ContainerProtos.ContainerType.KeyValueContainer, kvData
+        .getContainerType());
+    assertEquals("RocksDB", kvData.getContainerDBType());
+    assertEquals(path, kvData.getMetadataPath());
+    assertEquals(path, kvData.getChunksPath());
+    assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED, kvData
+        .getState());
+    assertEquals(1, kvData.getLayOutVersion());
+    assertEquals(2, kvData.getMetadata().size());
+    assertEquals("hdfs", kvData.getMetadata().get("VOLUME"));
+    assertEquals("ozone", kvData.getMetadata().get("OWNER"));
+
+    FileUtil.fullyDelete(filePath);
+
+
+  }
+
+  @Test
+  public void testIncorrectContainerFile() throws IOException{
+    try {
+      String path = "incorrect.container";
+      //Get file from resources folder
+      ClassLoader classLoader = getClass().getClassLoader();
+      File file = new File(classLoader.getResource(path).getFile());
+      KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml
+          .readContainerFile(file);
+      fail("testIncorrectContainerFile failed");
+    } catch (IllegalStateException ex) {
+      GenericTestUtils.assertExceptionContains("Unexpected " +
+          "ContainerLifeCycleState", ex);
+    }
+  }
+
+
+  @Test
+  public void testCheckBackWardCompatabilityOfContainerFile() throws
+      IOException {
+    // This test is for if we upgrade, and then .container files added by new
+    // server will have new fields added to .container file, after a while we
+    // decided to rollback. Then older ozone can read .container files
+    // created or not.
+
+    try {
+      String path = "additionalfields.container";
+      //Get file from resources folder
+      ClassLoader classLoader = getClass().getClassLoader();
+      File file = new File(classLoader.getResource(path).getFile());
+      KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml
+          .readContainerFile(file);
+
+      //Checking the Container file data is consistent or not
+      assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED, kvData
+          .getState());
+      assertEquals("RocksDB", kvData.getContainerDBType());
+      assertEquals(ContainerProtos.ContainerType.KeyValueContainer, kvData
+          .getContainerType());
+      assertEquals(9223372036854775807L, kvData.getContainerId());
+      assertEquals("/hdds/current/aed-fg4-hji-jkl/containerdir0/1", kvData
+          .getChunksPath());
+      assertEquals("/hdds/current/aed-fg4-hji-jkl/containerdir0/1", kvData
+          .getMetadataPath());
+      assertEquals(1, kvData.getLayOutVersion());
+      assertEquals(2, kvData.getMetadata().size());
+
+    } catch (Exception ex) {
+      fail("testCheckBackWardCompatabilityOfContainerFile failed");
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
index 5a29e8a..55d6773 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
@@ -53,8 +53,7 @@ public class TestContainerSet {
     ContainerProtos.ContainerLifeCycleState state = ContainerProtos
         .ContainerLifeCycleState.CLOSED;
 
-    KeyValueContainerData kvData = new KeyValueContainerData(
-        ContainerProtos.ContainerType.KeyValueContainer, containerId);
+    KeyValueContainerData kvData = new KeyValueContainerData(containerId);
     kvData.setState(state);
     KeyValueContainer keyValueContainer = new KeyValueContainer(kvData, new
         OzoneConfiguration());
@@ -164,8 +163,7 @@ public class TestContainerSet {
   private ContainerSet createContainerSet() throws StorageContainerException {
     ContainerSet containerSet = new ContainerSet();
     for (int i=0; i<10; i++) {
-      KeyValueContainerData kvData = new KeyValueContainerData(
-          ContainerProtos.ContainerType.KeyValueContainer, i);
+      KeyValueContainerData kvData = new KeyValueContainerData(i);
       if (i%2 == 0) {
         kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSED);
       } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestKeyValueYaml.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestKeyValueYaml.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestKeyValueYaml.java
deleted file mode 100644
index 75c0139..0000000
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestKeyValueYaml.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import org.apache.hadoop.fs.FileSystemTestHelper;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueYaml;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * This class tests create/read .container files.
- */
-public class TestKeyValueYaml {
-
-  @Test
-  public void testCreateContainerFile() throws IOException {
-    String path = new FileSystemTestHelper().getTestRootDir();
-    String containerPath = "1.container";
-
-    File filePath = new File(new FileSystemTestHelper().getTestRootDir());
-    filePath.mkdirs();
-
-    KeyValueContainerData keyValueContainerData = new KeyValueContainerData(
-        ContainerProtos.ContainerType.KeyValueContainer, Long.MAX_VALUE);
-    keyValueContainerData.setContainerDBType("RocksDB");
-    keyValueContainerData.setMetadataPath(path);
-    keyValueContainerData.setChunksPath(path);
-
-    File containerFile = new File(filePath, containerPath);
-
-    // Create .container file with ContainerData
-    KeyValueYaml.createContainerFile(containerFile, keyValueContainerData);
-
-    //Check .container file exists or not.
-    assertTrue(containerFile.exists());
-
-    // Read from .container file, and verify data.
-    KeyValueContainerData kvData = KeyValueYaml.readContainerFile(
-        containerFile);
-    assertEquals(Long.MAX_VALUE, kvData.getContainerId());
-    assertEquals(ContainerProtos.ContainerType.KeyValueContainer, kvData
-        .getContainerType());
-    assertEquals("RocksDB", kvData.getContainerDBType());
-    assertEquals(path, kvData.getMetadataPath());
-    assertEquals(path, kvData.getChunksPath());
-    assertEquals(ContainerProtos.ContainerLifeCycleState.OPEN, kvData
-        .getState());
-    assertEquals(1, kvData.getLayOutVersion());
-    assertEquals(0, kvData.getMetadata().size());
-
-    // Update ContainerData.
-    kvData.addMetadata("VOLUME", "hdfs");
-    kvData.addMetadata("OWNER", "ozone");
-    kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSED);
-
-
-    // Update .container file with new ContainerData.
-    containerFile = new File(filePath, containerPath);
-    KeyValueYaml.createContainerFile(containerFile, kvData);
-
-    // Reading newly updated data from .container file
-    kvData =  KeyValueYaml.readContainerFile(containerFile);
-
-    // verify data.
-    assertEquals(Long.MAX_VALUE, kvData.getContainerId());
-    assertEquals(ContainerProtos.ContainerType.KeyValueContainer, kvData
-        .getContainerType());
-    assertEquals("RocksDB", kvData.getContainerDBType());
-    assertEquals(path, kvData.getMetadataPath());
-    assertEquals(path, kvData.getChunksPath());
-    assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED, kvData
-        .getState());
-    assertEquals(1, kvData.getLayOutVersion());
-    assertEquals(2, kvData.getMetadata().size());
-    assertEquals("hdfs", kvData.getMetadata().get("VOLUME"));
-    assertEquals("ozone", kvData.getMetadata().get("OWNER"));
-
-    FileUtil.fullyDelete(filePath);
-
-
-  }
-
-  @Test
-  public void testIncorrectContainerFile() throws IOException{
-    try {
-      String path = "incorrect.container";
-      //Get file from resources folder
-      ClassLoader classLoader = getClass().getClassLoader();
-      File file = new File(classLoader.getResource(path).getFile());
-      KeyValueContainerData kvData = KeyValueYaml.readContainerFile(file);
-      fail("testIncorrectContainerFile failed");
-    } catch (IllegalStateException ex) {
-      GenericTestUtils.assertExceptionContains("Unexpected " +
-          "ContainerLifeCycleState", ex);
-    }
-  }
-
-
-  @Test
-  public void testCheckBackWardCompatabilityOfContainerFile() throws
-      IOException {
-    // This test is for if we upgrade, and then .container files added by new
-    // server will have new fields added to .container file, after a while we
-    // decided to rollback. Then older ozone can read .container files
-    // created or not.
-
-    try {
-      String path = "additionalfields.container";
-      //Get file from resources folder
-      ClassLoader classLoader = getClass().getClassLoader();
-      File file = new File(classLoader.getResource(path).getFile());
-      KeyValueContainerData kvData = KeyValueYaml.readContainerFile(file);
-
-      //Checking the Container file data is consistent or not
-      assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED, kvData
-          .getState());
-      assertEquals("RocksDB", kvData.getContainerDBType());
-      assertEquals(ContainerProtos.ContainerType.KeyValueContainer, kvData
-          .getContainerType());
-      assertEquals(9223372036854775807L, kvData.getContainerId());
-      assertEquals("/hdds/current/aed-fg4-hji-jkl/containerdir0/1", kvData
-          .getChunksPath());
-      assertEquals("/hdds/current/aed-fg4-hji-jkl/containerdir0/1", kvData
-          .getMetadataPath());
-      assertEquals(1, kvData.getLayOutVersion());
-      assertEquals(2, kvData.getMetadata().size());
-
-    } catch (Exception ex) {
-      fail("testCheckBackWardCompatabilityOfContainerFile failed");
-    }
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
index 50927d1..6660e9b 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
@@ -52,7 +52,6 @@ public class TestHandler {
   private VolumeSet volumeSet;
   private Handler handler;
 
-  private final static String SCM_ID = UUID.randomUUID().toString();
   private final static String DATANODE_UUID = UUID.randomUUID().toString();
 
   @Before
@@ -61,12 +60,12 @@ public class TestHandler {
     this.containerSet = Mockito.mock(ContainerSet.class);
     this.volumeSet = Mockito.mock(VolumeSet.class);
 
-    this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, 
SCM_ID);
+    this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet);
   }
 
   @Test
   public void testGetKeyValueHandler() throws Exception {
-    Handler kvHandler = dispatcher.getHandlerForContainerType(
+    Handler kvHandler = dispatcher.getHandler(
         ContainerProtos.ContainerType.KeyValueContainer);
 
     Assert.assertTrue("getHandlerForContainerType returned incorrect handler",
@@ -83,8 +82,7 @@ public class TestHandler {
     Assert.assertEquals("New ContainerType detected. Not an invalid " +
         "containerType", invalidContainerType, null);
 
-    Handler handler = dispatcher.getHandlerForContainerType(
-        invalidContainerType);
+    Handler handler = dispatcher.getHandler(invalidContainerType);
     Assert.assertEquals("Get Handler for Invalid ContainerType should " +
         "return null.", handler, null);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
index 4576db6..272bdb9 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
@@ -81,8 +81,7 @@ public class TestChunkManagerImpl {
     Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
         .thenReturn(hddsVolume);
 
-    keyValueContainerData = new KeyValueContainerData(
-        ContainerProtos.ContainerType.KeyValueContainer, 1L);
+    keyValueContainerData = new KeyValueContainerData(1L);
 
     keyValueContainer = new KeyValueContainer(
         keyValueContainerData, config);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
index 722cece..fa7c66d 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
@@ -79,8 +79,7 @@ public class TestKeyManagerImpl {
     Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
         .thenReturn(hddsVolume);
 
-    keyValueContainerData = new KeyValueContainerData(
-        ContainerProtos.ContainerType.KeyValueContainer, 1L);
+    keyValueContainerData = new KeyValueContainerData(1L);
 
     keyValueContainer = new KeyValueContainer(
         keyValueContainerData, config);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 006b82c..de5f432 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -24,6 +24,7 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume
     .RoundRobinVolumeChoosingPolicy;
@@ -85,8 +86,7 @@ public class TestKeyValueContainer {
     Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
         .thenReturn(hddsVolume);
 
-    keyValueContainerData = new KeyValueContainerData(
-        ContainerProtos.ContainerType.KeyValueContainer, 1L);
+    keyValueContainerData = new KeyValueContainerData(1L);
 
     keyValueContainer = new KeyValueContainer(
         keyValueContainerData, conf);
@@ -197,7 +197,8 @@ public class TestKeyValueContainer {
     File containerFile = KeyValueContainerLocationUtil.getContainerFile(
         containerMetaDataLoc, containerName);
 
-    keyValueContainerData = KeyValueYaml.readContainerFile(containerFile);
+    keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
+        .readContainerFile(containerFile);
     assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED,
         keyValueContainerData.getState());
   }
@@ -237,7 +238,8 @@ public class TestKeyValueContainer {
     File containerFile = KeyValueContainerLocationUtil.getContainerFile(
         containerMetaDataLoc, containerName);
 
-    keyValueContainerData = KeyValueYaml.readContainerFile(containerFile);
+    keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
+        .readContainerFile(containerFile);
     assertEquals(2, keyValueContainerData.getMetadata().size());
 
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index f4dd41c..dbddf47 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -74,9 +74,10 @@ public class TestKeyValueHandler {
         .build();
     this.volumeSet = new VolumeSet(datanodeDetails, conf);
 
-    this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, 
SCM_ID);
-    this.handler = (KeyValueHandler) dispatcher.getHandlerForContainerType(
+    this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet);
+    this.handler = (KeyValueHandler) dispatcher.getHandler(
         ContainerProtos.ContainerType.KeyValueContainer);
+    dispatcher.setScmId(UUID.randomUUID().toString());
   }
 
   @Test
@@ -87,8 +88,7 @@ public class TestKeyValueHandler {
     // Create mock HddsDispatcher and KeyValueHandler.
     this.handler = Mockito.mock(KeyValueHandler.class);
     this.dispatcher = Mockito.mock(HddsDispatcher.class);
-    Mockito.when(dispatcher.getHandlerForContainerType(any())).thenReturn
-        (handler);
+    Mockito.when(dispatcher.getHandler(any())).thenReturn(handler);
     Mockito.when(dispatcher.dispatch(any())).thenCallRealMethod();
     Mockito.when(dispatcher.getContainer(anyLong())).thenReturn(
         Mockito.mock(KeyValueContainer.class));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
new file mode 100644
index 0000000..cf4bb62
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import 
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.util.Random;
+import java.util.UUID;
+
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This class is used to test OzoneContainer.
+ */
+public class TestOzoneContainer {
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+
+  private OzoneConfiguration conf;
+  private String scmId = UUID.randomUUID().toString();
+  private VolumeSet volumeSet;
+  private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
+  private KeyValueContainerData keyValueContainerData;
+  private KeyValueContainer keyValueContainer;
+  private final DatanodeDetails datanodeDetails = createDatanodeDetails();
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, folder.getRoot()
+        .getAbsolutePath() + "," + folder.newFolder().getAbsolutePath());
+    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, 
folder.newFolder().getAbsolutePath());
+    volumeSet = new VolumeSet(datanodeDetails, conf);
+    volumeChoosingPolicy = new RoundRobinVolumeChoosingPolicy();
+
+    for (int i=0; i<10; i++) {
+      keyValueContainerData = new KeyValueContainerData(i);
+      keyValueContainer = new KeyValueContainer(
+          keyValueContainerData, conf);
+      keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
+    }
+  }
+
+  @Test
+  public void testBuildContainerMap() throws Exception {
+    OzoneContainer ozoneContainer = new
+        OzoneContainer(datanodeDetails, conf);
+    ContainerSet containerset = ozoneContainer.getContainerSet();
+    assertEquals(10, containerset.containerCount());
+  }
+
+
+  private DatanodeDetails createDatanodeDetails() {
+    Random random = new Random();
+    String ipAddress =
+        random.nextInt(256) + "." + random.nextInt(256) + "." + random
+            .nextInt(256) + "." + random.nextInt(256);
+
+    String uuid = UUID.randomUUID().toString();
+    String hostName = uuid;
+    DatanodeDetails.Port containerPort = DatanodeDetails.newPort(
+        DatanodeDetails.Port.Name.STANDALONE, 0);
+    DatanodeDetails.Port ratisPort = DatanodeDetails.newPort(
+        DatanodeDetails.Port.Name.RATIS, 0);
+    DatanodeDetails.Port restPort = DatanodeDetails.newPort(
+        DatanodeDetails.Port.Name.REST, 0);
+    DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
+    builder.setUuid(uuid)
+        .setHostName("localhost")
+        .setIpAddress(ipAddress)
+        .addPort(containerPort)
+        .addPort(ratisPort)
+        .addPort(restPort);
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index b339fb7..9ac9930 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -703,6 +704,9 @@ public class SCMNodeManager
   public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
     return VersionResponse.newBuilder()
         .setVersion(this.version.getVersion())
+        .addValue(OzoneConsts.SCM_ID, 
this.scmManager.getScmStorage().getScmId())
+        .addValue(OzoneConsts.CLUSTER_ID, this.scmManager.getScmStorage()
+            .getClusterID())
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 34779da..9db9e80 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -20,6 +20,7 @@ import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.VersionInfo;
@@ -125,12 +126,14 @@ public class TestEndPoint {
    * how the state machine would make the call.
    */
   public void testGetVersionTask() throws Exception {
-    Configuration conf = SCMTestUtils.getConf();
+    OzoneConfiguration conf = SCMTestUtils.getConf();
     try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
         serverAddress, 1000)) {
+      OzoneContainer ozoneContainer = new OzoneContainer(getDatanodeDetails(),
+          conf);
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
       VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
-          conf);
+          conf, ozoneContainer);
       EndpointStateMachine.EndPointStates newState = versionTask.call();
 
       // if version call worked the endpoint should automatically move to the
@@ -149,14 +152,16 @@ public class TestEndPoint {
    * expect that versionTask should be able to handle it.
    */
   public void testGetVersionToInvalidEndpoint() throws Exception {
-    Configuration conf = SCMTestUtils.getConf();
+    OzoneConfiguration conf = SCMTestUtils.getConf();
     InetSocketAddress nonExistentServerAddress = SCMTestUtils
         .getReuseableAddress();
     try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
         nonExistentServerAddress, 1000)) {
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
-      VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+      OzoneContainer ozoneContainer = new OzoneContainer(getDatanodeDetails(),
           conf);
+      VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+          conf, ozoneContainer);
       EndpointStateMachine.EndPointStates newState = versionTask.call();
 
       // This version call did NOT work, so endpoint should remain in the same
@@ -175,13 +180,15 @@ public class TestEndPoint {
   public void testGetVersionAssertRpcTimeOut() throws Exception {
     final long rpcTimeout = 1000;
     final long tolerance = 100;
-    Configuration conf = SCMTestUtils.getConf();
+    OzoneConfiguration conf = SCMTestUtils.getConf();
 
     try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
         serverAddress, (int) rpcTimeout)) {
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
-      VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+      OzoneContainer ozoneContainer = new OzoneContainer(getDatanodeDetails(),
           conf);
+      VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+          conf, ozoneContainer);
 
       scmServerImpl.setRpcResponseDelay(1500);
       long start = Time.monotonicNow();
@@ -386,4 +393,5 @@ public class TestEndPoint {
     }
     return reportsBuilder.build();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
index c937980..ad1e706 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
@@ -27,8 +27,10 @@ import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
@@ -163,8 +165,9 @@ public class TestStorageContainerManagerHelper {
     DatanodeDetails leadDN = container.getPipeline().getLeader();
     OzoneContainer containerServer =
         getContainerServerByDatanodeUuid(leadDN.getUuidString());
-    ContainerData containerData = containerServer.getContainerManager()
-        .readContainer(containerID);
+    KeyValueContainerData containerData = (KeyValueContainerData) 
containerServer
+        .getContainerSet()
+        .getContainer(containerID).getContainerData();
     return KeyUtils.getDB(containerData, conf);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index 9e8cb46..b832dd2 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
@@ -183,7 +183,7 @@ public class TestCloseContainerByPipeline {
     for (DatanodeDetails datanodeDetails : datanodes) {
       GenericTestUtils.waitFor(
           () -> isContainerClosed(cluster, containerID, datanodeDetails), 500,
-          5 * 1000);
+          15 * 1000);
       //double check if it's really closed (waitFor also throws an exception)
       Assert.assertTrue(isContainerClosed(cluster, containerID, 
datanodeDetails));
     }
@@ -204,7 +204,7 @@ public class TestCloseContainerByPipeline {
         if (datanode.equals(datanodeService.getDatanodeDetails())) {
           containerData =
               datanodeService.getDatanodeStateMachine().getContainer()
-                  .getContainerManager().readContainer(containerID);
+                  
.getContainerSet().getContainer(containerID).getContainerData();
           if (!containerData.isOpen()) {
             // make sure the closeContainerHandler on the Datanode is invoked
             Assert.assertTrue(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
index efb7344..114bd04 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.rest.OzoneException;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
@@ -104,8 +104,8 @@ public class TestCloseContainerHandler {
     ContainerData containerData;
     try {
       containerData = cluster.getHddsDatanodes().get(0)
-          .getDatanodeStateMachine().getContainer().getContainerManager()
-          .readContainer(containerID);
+          .getDatanodeStateMachine().getContainer().getContainerSet()
+          .getContainer(containerID).getContainerData();
       return !containerData.isOpen();
     } catch (StorageContainerException e) {
       throw new AssertionError(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 3f02036..18b325b 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -35,10 +35,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -66,7 +63,11 @@ public class TestOzoneContainer {
       conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, pipeline.getLeader()
               .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue());
       conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
-      container = new OzoneContainer(TestUtils.getDatanodeDetails(), conf);
+
+      container = new OzoneContainer(TestUtils.getDatanodeDetails(),
+          conf);
+      //Setting scmId, as we start manually ozone container.
+      container.getDispatcher().setScmId(UUID.randomUUID().toString());
       container.start();
 
       XceiverClient client = new XceiverClient(pipeline, conf);
@@ -392,7 +393,7 @@ public class TestOzoneContainer {
       response = client.sendCommand(request);
 
       Assert.assertNotNull(response);
-      Assert.assertEquals(ContainerProtos.Result.UNCLOSED_CONTAINER_IO,
+      Assert.assertEquals(ContainerProtos.Result.DELETE_ON_OPEN_CONTAINER,
           response.getResult());
       Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index d4c572f..bd9259d 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.server;
 
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.ratis.shaded.io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -262,5 +263,14 @@ public class TestContainerServer {
     @Override
     public void shutdown() {
     }
+    @Override
+    public Handler getHandler(ContainerProtos.ContainerType containerType) {
+      return null;
+    }
+
+    @Override
+    public void setScmId(String scmId) {
+
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java
index bafba32..adce0ef 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java
@@ -27,8 +27,8 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.*;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -119,8 +119,8 @@ public class TestContainerReportWithKeys {
 
     ContainerData cd = getContainerData(keyInfo.getContainerID());
 
-    LOG.info("DN Container Data:  keyCount: {} used: {} ",
-        cd.getKeyCount(), cd.getBytesUsed());
+/*    LOG.info("DN Container Data:  keyCount: {} used: {} ",
+        cd.getKeyCount(), cd.getBytesUsed());*/
 
     ContainerInfo cinfo = scm.getContainerInfo(keyInfo.getContainerID());
 
@@ -132,9 +132,9 @@ public class TestContainerReportWithKeys {
   private static ContainerData getContainerData(long containerID) {
     ContainerData containerData;
     try {
-      ContainerManager containerManager = cluster.getHddsDatanodes().get(0)
-          .getDatanodeStateMachine().getContainer().getContainerManager();
-      containerData = containerManager.readContainer(containerID);
+      ContainerSet containerManager = cluster.getHddsDatanodes().get(0)
+          .getDatanodeStateMachine().getContainer().getContainerSet();
+      containerData = 
containerManager.getContainer(containerID).getContainerData();
     } catch (StorageContainerException e) {
       throw new AssertionError(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
index b86c577..cda54cb 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
@@ -44,9 +44,10 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.client.rpc.RpcClient;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.ksm.KeySpaceManager;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
@@ -698,13 +699,16 @@ public class TestKeys {
         List<KsmKeyLocationInfo> locations =
             keyInfo.getLatestVersionLocations().getLocationList();
         for (KsmKeyLocationInfo location : locations) {
-          KeyData keyData = new KeyData(location.getBlockID());
-          KeyData blockInfo = cm.getContainerManager()
-              .getKeyManager().getKey(keyData);
-          ContainerData containerData = cm.getContainerManager()
-              .readContainer(keyData.getContainerID());
-          File dataDir = ContainerUtils
-              .getDataDirectory(containerData).toFile();
+          KeyValueHandler  keyValueHandler = (KeyValueHandler) cm
+              .getDispatcher().getHandler(ContainerProtos.ContainerType
+                  .KeyValueContainer);
+          KeyValueContainer container = (KeyValueContainer) 
cm.getContainerSet()
+              .getContainer(location.getBlockID().getContainerID());
+          KeyData blockInfo = keyValueHandler
+              .getKeyManager().getKey(container, location.getBlockID());
+          KeyValueContainerData containerData = (KeyValueContainerData) 
container
+              .getContainerData();
+          File dataDir = new File(containerData.getChunksPath());
           for (ContainerProtos.ChunkInfo chunkInfo : blockInfo.getChunks()) {
             File chunkFile = dataDir.toPath()
                 .resolve(chunkInfo.getChunkName()).toFile();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to