This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 0466ade  HDDS-3172. Use DBStore instead of MetadataStore in SCM
0466ade is described below

commit 0466ade7d71bb65466fc45f7b481bd2ff34bbbe8
Author: Elek Márton <e...@apache.org>
AuthorDate: Wed Apr 22 12:21:22 2020 +0200

    HDDS-3172. Use DBStore instead of MetadataStore in SCM
    
    Closes #700
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   5 -
 .../hdds/utils/db/BatchOperationHandler.java       |  44 ++++++++
 .../hdds/utils/db/DBColumnFamilyDefinition.java    |  81 +++++++++++++++
 .../apache/hadoop/hdds/utils/db/DBDefinition.java  |  46 +++++++++
 .../org/apache/hadoop/hdds/utils/db/DBStore.java   |  18 +---
 .../hadoop/hdds/utils/db/DBStoreBuilder.java       |  54 +++++++++-
 .../hdds/scm/block/SCMBlockDeletingService.java    |  37 ++++---
 .../hdds/scm/container/ContainerManager.java       |   6 +-
 .../hdds/scm/container/SCMContainerManager.java    | 113 +++++++++------------
 .../hadoop/hdds/scm/metadata/ContainerIDCodec.java |  48 +++++++++
 .../hdds/scm/metadata/ContainerInfoCodec.java      |  47 +++++++++
 .../hadoop/hdds/scm/metadata/PipelineCodec.java    |  56 ++++++++++
 .../hadoop/hdds/scm/metadata/PipelineIDCodec.java  |  45 ++++++++
 .../hadoop/hdds/scm/metadata/SCMDBDefinition.java  |  98 ++++++++++++++++++
 .../hadoop/hdds/scm/metadata/SCMMetadataStore.java |  27 ++++-
 .../hdds/scm/metadata/SCMMetadataStoreRDBImpl.java | 113 +++++++++++----------
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  85 +++++-----------
 .../hdds/scm/server/StorageContainerManager.java   |  10 +-
 .../hadoop/hdds/scm/block/TestBlockManager.java    |  32 ++++--
 .../container/TestCloseContainerEventHandler.java  |  15 ++-
 .../scm/container/TestSCMContainerManager.java     |  14 ++-
 .../hdds/scm/node/TestContainerPlacement.java      |  31 ++++--
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |  54 +++++++---
 .../safemode/TestHealthyPipelineSafeModeRule.java  |  36 ++++---
 .../TestOneReplicaPipelineSafeModeRule.java        |  18 +++-
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |  58 +++++++++--
 hadoop-ozone/dist/pom.xml                          |   2 +-
 .../ozone/recon/scm/ReconContainerManager.java     |  27 ++---
 .../hadoop/ozone/recon/scm/ReconDBDefinition.java  |  38 +++++++
 .../ozone/recon/scm/ReconPipelineManager.java      |  25 ++---
 .../scm/ReconStorageContainerManagerFacade.java    |  42 +++++---
 .../scm/AbstractReconContainerManagerTest.java     |  37 ++++---
 .../ozone/recon/scm/TestReconPipelineManager.java  |  37 ++++---
 .../ozone/genesis/BenchMarkOzoneManager.java       |   9 +-
 .../apache/hadoop/ozone/genesis/BenchMarkSCM.java  |   9 +-
 .../apache/hadoop/ozone/genesis/GenesisUtil.java   |  30 +++---
 36 files changed, 1054 insertions(+), 393 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index a058d76..a88acc2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -114,14 +114,9 @@ public final class OzoneConsts {
    */
   public static final String CONTAINER_DB_SUFFIX = "container.db";
   public static final String PIPELINE_DB_SUFFIX = "pipeline.db";
-  public static final String SCM_CONTAINER_DB = "scm-" + CONTAINER_DB_SUFFIX;
-  public static final String SCM_PIPELINE_DB = "scm-" + PIPELINE_DB_SUFFIX;
   public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX;
-  public static final String DELETED_BLOCK_DB = "deletedBlock.db";
   public static final String OM_DB_NAME = "om.db";
   public static final String OM_DB_BACKUP_PREFIX = "om.db.backup.";
-  public static final String OZONE_MANAGER_TOKEN_DB_NAME = "om-token.db";
-  public static final String SCM_DB_NAME = "scm.db";
 
   public static final String STORAGE_DIR_CHUNKS = "chunks";
   public static final String OZONE_DB_CHECKPOINT_REQUEST_FLUSH =
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperationHandler.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperationHandler.java
new file mode 100644
index 0000000..eea483c
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperationHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import java.io.IOException;
+
+/**
+ * Create and commit batch operations for one DB.
+ */
+public interface BatchOperationHandler {
+
+  /**
+   * Initialize an atomic batch operation which can hold multiple PUT/DELETE
+   * operations to be committed later in one step.
+   *
+   * @return BatchOperation holder which can be used to add or commit batch
+   * operations.
+   */
+  BatchOperation initBatchOperation();
+
+  /**
+   * Commit the batch operations.
+   *
+   * @param operation which contains all the required batch operations.
+   * @throws IOException on Failure.
+   */
+  void commitBatchOperation(BatchOperation operation) throws IOException;
+}
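
A minimal usage sketch (not part of the patch): grouping several writes into one
atomic commit through this interface. The handler "batchHandler" and the typed
table "containerTable" are assumed to exist; exception handling is omitted.

  BatchOperation batch = batchHandler.initBatchOperation();
  // Queue multiple writes against tables of the same underlying DB.
  containerTable.putWithBatch(batch, containerIdOne, containerInfoOne);
  containerTable.putWithBatch(batch, containerIdTwo, containerInfoTwo);
  // Apply everything in one atomic step.
  batchHandler.commitBatchOperation(batch);
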
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBColumnFamilyDefinition.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBColumnFamilyDefinition.java
new file mode 100644
index 0000000..e1c4163
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBColumnFamilyDefinition.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import java.io.IOException;
+
+/**
+ * Represents a single column family (table) with its required codecs and types.
+ *
+ * @param <KEY>   the type of the key.
+ * @param <VALUE> the type of the value.
+ */
+public class DBColumnFamilyDefinition<KEY, VALUE> {
+
+  private final String tableName;
+
+  private final Class<KEY> keyType;
+
+  private final Codec<KEY> keyCodec;
+
+  private final Class<VALUE> valueType;
+
+  private final Codec<VALUE> valueCodec;
+
+  public DBColumnFamilyDefinition(
+      String tableName,
+      Class<KEY> keyType,
+      Codec<KEY> keyCodec,
+      Class<VALUE> valueType,
+      Codec<VALUE> valueCodec) {
+    this.tableName = tableName;
+    this.keyType = keyType;
+    this.keyCodec = keyCodec;
+    this.valueType = valueType;
+    this.valueCodec = valueCodec;
+  }
+
+  public Table<KEY, VALUE> getTable(DBStore db) throws IOException {
+    return db.getTable(tableName, keyType, valueType);
+  }
+
+  public String getName() {
+    return tableName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public Class<KEY> getKeyType() {
+    return keyType;
+  }
+
+  public Codec<KEY> getKeyCodec() {
+    return keyCodec;
+  }
+
+  public Class<VALUE> getValueType() {
+    return valueType;
+  }
+
+  public Codec<VALUE> getValueCodec() {
+    return valueCodec;
+  }
+}
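
As an illustration only, a column family is described once and later resolved
into a typed Table; the codecs referenced here are the ones added further down
in this change, and "dbStore" is assumed to be an already built DBStore.

  DBColumnFamilyDefinition<ContainerID, ContainerInfo> containers =
      new DBColumnFamilyDefinition<>(
          "containers",
          ContainerID.class, new ContainerIDCodec(),
          ContainerInfo.class, new ContainerInfoCodec());

  // Resolve the strongly typed table from the built store.
  Table<ContainerID, ContainerInfo> containerTable = containers.getTable(dbStore);
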
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBDefinition.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBDefinition.java
new file mode 100644
index 0000000..3058261
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBDefinition.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple interface that provides the information required to create a DBStore.
+ */
+public interface DBDefinition {
+
+  Logger LOG = LoggerFactory.getLogger(DBDefinition.class);
+
+  /**
+   * Logical name of the DB.
+   */
+  String getName();
+
+  /**
+   * Configuration key that defines the location of the DB.
+   */
+  String getLocationConfigKey();
+
+  /**
+   * The column families (tables) present in this DB.
+   */
+  DBColumnFamilyDefinition[] getColumnFamilies();
+
+}
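
A hypothetical minimal implementation, shown only to illustrate the contract
(SCMDBDefinition below is the real one introduced by this patch; the DB name
and configuration key here are made up):

  public class ExampleDBDefinition implements DBDefinition {

    public static final DBColumnFamilyDefinition<Long, Long> COUNTERS =
        new DBColumnFamilyDefinition<>(
            "counters", Long.class, new LongCodec(), Long.class, new LongCodec());

    @Override
    public String getName() {
      return "example.db";            // file name of the RocksDB instance
    }

    @Override
    public String getLocationConfigKey() {
      return "ozone.example.db.dirs"; // hypothetical configuration key
    }

    @Override
    public DBColumnFamilyDefinition[] getColumnFamilies() {
      return new DBColumnFamilyDefinition[] {COUNTERS};
    }
  }
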
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index ed64b74..8567d03 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hdds.utils.db.cache.TableCacheImpl;
  *
  */
 @InterfaceStability.Evolving
-public interface DBStore extends AutoCloseable {
+public interface DBStore extends AutoCloseable, BatchOperationHandler {
 
   /**
    * Gets an existing TableStore.
@@ -141,22 +141,6 @@ public interface DBStore extends AutoCloseable {
    */
   long getEstimatedKeyCount() throws IOException;
 
-  /**
-   * Initialize an atomic batch operation which can hold multiple PUT/DELETE
-   * operations and committed later in one step.
-   *
-   * @return BatchOperation holder which can be used to add or commit batch
-   * operations.
-   */
-  BatchOperation initBatchOperation();
-
-  /**
-   * Commit the batch operations.
-   *
-   * @param operation which contains all the required batch operation.
-   * @throws IOException on Failure.
-   */
-  void commitBatchOperation(BatchOperation operation) throws IOException;
 
   /**
    * Get current snapshot of DB store as an artifact stored on
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
index 88d509a..2e18530 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
@@ -29,12 +29,15 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 
 import com.google.common.base.Preconditions;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
+import static org.apache.hadoop.hdds.server.ServerUtils.getDirectoryFromConfig;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF;
@@ -78,10 +81,11 @@ public final class DBStoreBuilder {
   private String rocksDbStat;
   private RocksDBConfiguration rocksDBConfiguration;
 
-  private DBStoreBuilder(OzoneConfiguration configuration) {
+  private DBStoreBuilder(ConfigurationSource configuration) {
     this(configuration, configuration.getObject(RocksDBConfiguration.class));
   }
-  private DBStoreBuilder(OzoneConfiguration configuration,
+
+  private DBStoreBuilder(ConfigurationSource configuration,
       RocksDBConfiguration rocksDBConfiguration) {
     tables = new HashSet<>();
     tableNames = new LinkedList<>();
@@ -93,8 +97,7 @@ public final class DBStoreBuilder {
     this.rocksDBConfiguration = rocksDBConfiguration;
   }
 
-
-  public static DBStoreBuilder newBuilder(OzoneConfiguration configuration) {
+  public static DBStoreBuilder newBuilder(ConfigurationSource configuration) {
     return new DBStoreBuilder(configuration);
   }
 
@@ -263,4 +266,45 @@ public final class DBStoreBuilder {
     return Paths.get(dbPath.toString(), dbname).toFile();
   }
 
+  private static DBStoreBuilder createDBStoreBuilder(
+      ConfigurationSource configuration, DBDefinition definition) {
+
+    File metadataDir = getDirectoryFromConfig(configuration,
+        definition.getLocationConfigKey(), definition.getName());
+
+    if (metadataDir == null) {
+
+      LOG.warn("{} is not configured. We recommend adding this setting. " +
+              "Falling back to {} instead.",
+          definition.getLocationConfigKey(),
+          HddsConfigKeys.OZONE_METADATA_DIRS);
+      metadataDir = getOzoneMetaDirPath(configuration);
+    }
+
+    return DBStoreBuilder.newBuilder(configuration)
+        .setName(definition.getName())
+        .setPath(Paths.get(metadataDir.getPath()));
+  }
+
+  /**
+   * Create DBStoreBuilder from a generic DBDefinition.
+   */
+  public static DBStore createDBStore(ConfigurationSource configuration,
+      DBDefinition definition)
+      throws IOException {
+    DBStoreBuilder builder = createDBStoreBuilder(configuration, definition);
+    for (DBColumnFamilyDefinition columnTableDefinition : definition
+        .getColumnFamilies()) {
+      builder.registerTable(columnTableDefinition);
+    }
+    return builder.build();
+  }
+
+  private <KEY, VALUE> void registerTable(
+      DBColumnFamilyDefinition<KEY, VALUE> definition) {
+    addTable(definition.getName())
+        .addCodec(definition.getKeyType(), definition.getKeyCodec())
+        .addCodec(definition.getValueType(), definition.getValueCodec());
+  }
+
 }
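
Put together, the new createDBStore entry point lets a component open its whole
database from a definition; a sketch assuming a ConfigurationSource named "conf"
and the SCMDBDefinition added later in this patch:

  DBStore store = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
  // Column family definitions double as typed table accessors.
  Table<ContainerID, ContainerInfo> containers =
      SCMDBDefinition.CONTAINERS.getTable(store);
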
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 5ca75d2..0980369 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -16,38 +16,35 @@
  */
 package org.apache.hadoop.hdds.scm.block;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.util.Time;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
-
 /**
  * A background service running in SCM to delete blocks. This service scans
  * block deletion log in certain interval and caches block deletion commands
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index 973026d..43c1ced 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -16,15 +16,15 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+
 // TODO: Write extensive java doc.
 // This is the main interface of ContainerManager.
 /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 38c3d11..9f47608 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -16,7 +16,6 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -32,7 +31,6 @@ import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -42,18 +40,13 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.server.ServerUtils;
-import org.apache.hadoop.hdds.utils.BatchOperation;
-import org.apache.hadoop.hdds.utils.MetadataStore;
-import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
-import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.primitives.Longs;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,38 +60,40 @@ public class SCMContainerManager implements ContainerManager {
       SCMContainerManager.class);
 
   private final Lock lock;
-  private final MetadataStore containerStore;
+
   private final PipelineManager pipelineManager;
+
   private final ContainerStateManager containerStateManager;
+
   private final int numContainerPerOwnerInPipeline;
 
   private final SCMContainerManagerMetrics scmContainerManagerMetrics;
 
+  private Table<ContainerID, ContainerInfo> containerStore;
+
+  private BatchOperationHandler batchHandler;
+
   /**
    * Constructs a mapping class that creates mapping between container names
    * and pipelines.
-   *
+   * <p>
    * passed to LevelDB and this memory is allocated in Native code space.
    * CacheSize is specified
    * in MB.
-   * @param conf - {@link ConfigurationSource}
+   *
+   * @param conf            - {@link ConfigurationSource}
    * @param pipelineManager - {@link PipelineManager}
    * @throws IOException on Failure.
    */
-  public SCMContainerManager(final ConfigurationSource conf,
+  public SCMContainerManager(
+      final ConfigurationSource conf,
+      Table<ContainerID, ContainerInfo> containerStore,
+      BatchOperationHandler batchHandler,
       PipelineManager pipelineManager)
       throws IOException {
 
-    final File containerDBPath = getContainerDBPath(conf);
-    final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
-        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
-
-    this.containerStore = MetadataStoreBuilder.newBuilder()
-        .setConf(conf)
-        .setDbFile(containerDBPath)
-        .setCacheSize(cacheSize * OzoneConsts.MB)
-        .build();
-
+    this.batchHandler = batchHandler;
+    this.containerStore = containerStore;
     this.lock = new ReentrantLock();
     this.pipelineManager = pipelineManager;
     this.containerStateManager = new ContainerStateManager(conf);
@@ -112,11 +107,12 @@ public class SCMContainerManager implements ContainerManager {
   }
 
   private void loadExistingContainers() throws IOException {
-    List<Map.Entry<byte[], byte[]>> range = containerStore
-        .getSequentialRangeKVs(null, Integer.MAX_VALUE, null);
-    for (Map.Entry<byte[], byte[]> entry : range) {
-      ContainerInfo container = ContainerInfo.fromProtobuf(
-          ContainerInfoProto.PARSER.parseFrom(entry.getValue()));
+
+    TableIterator<ContainerID, ? extends KeyValue<ContainerID, ContainerInfo>>
+        iterator = containerStore.iterator();
+
+    while (iterator.hasNext()) {
+      ContainerInfo container = iterator.next().getValue();
       Preconditions.checkNotNull(container);
       containerStateManager.loadContainer(container);
       try {
@@ -304,10 +300,8 @@ public class SCMContainerManager implements ContainerManager {
     lock.lock();
     try {
       containerStateManager.removeContainer(containerID);
-      final byte[] dbKey = Longs.toByteArray(containerID.getId());
-      final byte[] containerBytes = containerStore.get(dbKey);
-      if (containerBytes != null) {
-        containerStore.delete(dbKey);
+      if (containerStore.get(containerID) != null) {
+        containerStore.delete(containerID);
       } else {
         // Where did the container go? o_O
         LOG.warn("Unable to remove the container {} from container store," +
@@ -358,8 +352,7 @@ public class SCMContainerManager implements ContainerManager {
                   containerID);
         }
       }
-      final byte[] dbKey = Longs.toByteArray(containerID.getId());
-      containerStore.put(dbKey, container.getProtobuf().toByteArray());
+      containerStore.put(containerID, container);
       return newState;
     } catch (ContainerNotFoundException cnfe) {
       throw new SCMException(
@@ -372,38 +365,40 @@ public class SCMContainerManager implements ContainerManager {
     }
   }
 
-    /**
-     * Update deleteTransactionId according to deleteTransactionMap.
-     *
-     * @param deleteTransactionMap Maps the containerId to latest delete
-     *                             transaction id for the container.
-     * @throws IOException
-     */
+  /**
+   * Update deleteTransactionId according to deleteTransactionMap.
+   *
+   * @param deleteTransactionMap Maps the containerId to latest delete
+   *                             transaction id for the container.
+   * @throws IOException
+   */
   public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
       throws IOException {
+
     if (deleteTransactionMap == null) {
       return;
     }
-
+    org.apache.hadoop.hdds.utils.db.BatchOperation batchOperation =
+        batchHandler.initBatchOperation();
     lock.lock();
     try {
-      BatchOperation batch = new BatchOperation();
       for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
         long containerID = entry.getKey();
-        byte[] dbKey = Longs.toByteArray(containerID);
-        byte[] containerBytes = containerStore.get(dbKey);
-        if (containerBytes == null) {
+
+        ContainerID containerIdObject = new ContainerID(containerID);
+        ContainerInfo containerInfo =
+            containerStore.get(containerIdObject);
+        if (containerInfo == null) {
           throw new SCMException(
               "Failed to increment number of deleted blocks for container "
                   + containerID + ", reason : " + "container doesn't exist.",
               SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
         }
-        ContainerInfo containerInfo = ContainerInfo.fromProtobuf(
-            HddsProtos.ContainerInfoProto.parseFrom(containerBytes));
         containerInfo.updateDeleteTransactionId(entry.getValue());
-        batch.put(dbKey, containerInfo.getProtobuf().toByteArray());
+        containerStore
+            .putWithBatch(batchOperation, containerIdObject, containerInfo);
       }
-      containerStore.writeBatch(batch);
+      batchHandler.commitBatchOperation(batchOperation);
       containerStateManager
           .updateDeleteTransactionId(deleteTransactionMap);
     } finally {
@@ -477,10 +472,8 @@ public class SCMContainerManager implements ContainerManager {
   protected void addContainerToDB(ContainerInfo containerInfo)
       throws IOException {
     try {
-      final byte[] containerIDBytes = Longs.toByteArray(
-          containerInfo.getContainerID());
-      containerStore.put(containerIDBytes,
-          containerInfo.getProtobuf().toByteArray());
+      containerStore
+          .put(new ContainerID(containerInfo.getContainerID()), containerInfo);
       // Incrementing here, as allocateBlock to create a container calls
       // getMatchingContainer() and finally calls this API to add newly
       // created container to DB.
@@ -586,9 +579,6 @@ public class SCMContainerManager implements ContainerManager {
     if (containerStateManager != null) {
       containerStateManager.close();
     }
-    if (containerStore != null) {
-      containerStore.close();
-    }
 
     if (scmContainerManagerMetrics != null) {
       this.scmContainerManagerMetrics.unRegister();
@@ -612,11 +602,6 @@ public class SCMContainerManager implements ContainerManager {
     }
   }
 
-  protected File getContainerDBPath(ConfigurationSource conf) {
-    File metaDir = ServerUtils.getScmDbDir(conf);
-    return new File(metaDir, SCM_CONTAINER_DB);
-  }
-
   protected PipelineManager getPipelineManager() {
     return pipelineManager;
   }
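
With the store-building code removed, the manager now receives its table and
batch handler from the caller; a sketch of the expected wiring, assuming an
initialized SCMMetadataStore named "scmMetadataStore" and an existing
PipelineManager:

  ContainerManager containerManager = new SCMContainerManager(
      conf,
      scmMetadataStore.getContainerTable(),
      scmMetadataStore.getBatchHandler(),
      pipelineManager);
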
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ContainerIDCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ContainerIDCodec.java
new file mode 100644
index 0000000..87c9e91
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ContainerIDCodec.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.LongCodec;
+
+/**
+ * Codec to serialize / deserialize ContainerID.
+ */
+public class ContainerIDCodec implements Codec<ContainerID> {
+
+  private Codec<Long> longCodec = new LongCodec();
+
+  @Override
+  public byte[] toPersistedFormat(ContainerID container) throws IOException {
+    return longCodec.toPersistedFormat(container.getId());
+  }
+
+  @Override
+  public ContainerID fromPersistedFormat(byte[] rawData) throws IOException {
+    return new ContainerID(longCodec.fromPersistedFormat(rawData));
+  }
+
+  @Override
+  public ContainerID copyObject(ContainerID object) {
+    return new ContainerID(object.getId());
+  }
+}
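
A small round-trip sketch of the codec (illustrative only; IOException handling
is left to the caller):

  static ContainerID roundTrip(long id) throws IOException {
    ContainerIDCodec codec = new ContainerIDCodec();
    byte[] raw = codec.toPersistedFormat(new ContainerID(id)); // long key via LongCodec
    return codec.fromPersistedFormat(raw);                     // same ContainerID back
  }
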
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ContainerInfoCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ContainerInfoCodec.java
new file mode 100644
index 0000000..6b26215
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ContainerInfoCodec.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.utils.db.Codec;
+
+/**
+ * Codec to serialize / deserialize ContainerInfo.
+ */
+public class ContainerInfoCodec implements Codec<ContainerInfo> {
+
+  @Override
+  public byte[] toPersistedFormat(ContainerInfo container) throws IOException {
+    return container.getProtobuf().toByteArray();
+  }
+
+  @Override
+  public ContainerInfo fromPersistedFormat(byte[] rawData) throws IOException {
+    return ContainerInfo.fromProtobuf(
+        ContainerInfoProto.PARSER.parseFrom(rawData));
+  }
+
+  @Override
+  public ContainerInfo copyObject(ContainerInfo object) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineCodec.java
new file mode 100644
index 0000000..25a1e44
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineCodec.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import java.io.IOException;
+import java.time.Instant;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.utils.db.Codec;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Codec to serialize / deserialize Pipeline.
+ */
+public class PipelineCodec implements Codec<Pipeline> {
+
+  @Override
+  public byte[] toPersistedFormat(Pipeline object) throws IOException {
+    return object.getProtobufMessage().toByteArray();
+  }
+
+  @Override
+  public Pipeline fromPersistedFormat(byte[] rawData) throws IOException {
+    HddsProtos.Pipeline.Builder pipelineBuilder = HddsProtos.Pipeline
+        .newBuilder(HddsProtos.Pipeline.PARSER.parseFrom(rawData));
+    Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.setState(
+        HddsProtos.PipelineState.PIPELINE_ALLOCATED).build());
+    // When SCM is restarted, set the creation time to the current time.
+    pipeline.setCreationTimestamp(Instant.now());
+    Preconditions.checkNotNull(pipeline);
+    return pipeline;
+  }
+
+  @Override
+  public Pipeline copyObject(Pipeline object) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineIDCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineIDCodec.java
new file mode 100644
index 0000000..d661e34
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineIDCodec.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.utils.db.Codec;
+
+/**
+ * Codec to serialize / deserialize PipelineID.
+ */
+public class PipelineIDCodec implements Codec<PipelineID> {
+
+  @Override
+  public byte[] toPersistedFormat(PipelineID object) throws IOException {
+    return object.getProtobuf().toByteArray();
+  }
+
+  @Override
+  public PipelineID fromPersistedFormat(byte[] rawData) throws IOException {
+    return null;
+  }
+
+  @Override
+  public PipelineID copyObject(PipelineID object) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
new file mode 100644
index 0000000..fcddcdd
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import java.math.BigInteger;
+import java.security.cert.X509Certificate;
+
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
+import org.apache.hadoop.hdds.utils.db.DBDefinition;
+import org.apache.hadoop.hdds.utils.db.LongCodec;
+
+/**
+ * Class defines the structure and types of the scm.db.
+ */
+public class SCMDBDefinition implements DBDefinition {
+
+  public static final DBColumnFamilyDefinition<Long, DeletedBlocksTransaction>
+      DELETED_BLOCKS =
+      new DBColumnFamilyDefinition<>(
+          "deletedBlocks",
+          Long.class,
+          new LongCodec(),
+          DeletedBlocksTransaction.class,
+          new DeletedBlocksTransactionCodec());
+
+  public static final DBColumnFamilyDefinition<BigInteger, X509Certificate>
+      VALID_CERTS =
+      new DBColumnFamilyDefinition<>(
+          "validCerts",
+          BigInteger.class,
+          new BigIntegerCodec(),
+          X509Certificate.class,
+          new X509CertificateCodec());
+
+  public static final DBColumnFamilyDefinition<BigInteger, X509Certificate>
+      REVOKED_CERTS =
+      new DBColumnFamilyDefinition<>(
+          "revokedCerts",
+          BigInteger.class,
+          new BigIntegerCodec(),
+          X509Certificate.class,
+          new X509CertificateCodec());
+
+  public static final DBColumnFamilyDefinition<PipelineID, Pipeline>
+      PIPELINES =
+      new DBColumnFamilyDefinition<>(
+          "pipelines",
+          PipelineID.class,
+          new PipelineIDCodec(),
+          Pipeline.class,
+          new PipelineCodec());
+
+  public static final DBColumnFamilyDefinition<ContainerID, ContainerInfo>
+      CONTAINERS =
+      new DBColumnFamilyDefinition<ContainerID, ContainerInfo>(
+          "containers",
+          ContainerID.class,
+          new ContainerIDCodec(),
+          ContainerInfo.class,
+          new ContainerInfoCodec());
+
+  @Override
+  public String getName() {
+    return "scm.db";
+  }
+
+  @Override
+  public String getLocationConfigKey() {
+    return ScmConfigKeys.OZONE_SCM_DB_DIRS;
+  }
+
+  @Override
+  public DBColumnFamilyDefinition[] getColumnFamilies() {
+    return new DBColumnFamilyDefinition[] {DELETED_BLOCKS, VALID_CERTS,
+        REVOKED_CERTS, PIPELINES, CONTAINERS};
+  }
+}
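
Taken together, the resulting scm.db layout (replacing the table previously
documented in the SCMMetadataStoreRDBImpl javadoc) is:

  Column Family | Key                 | Value
  --------------+---------------------+--------------------------
  deletedBlocks | TXID (Long)         | DeletedBlocksTransaction
  validCerts    | Serial (BigInteger) | X509Certificate
  revokedCerts  | Serial (BigInteger) | X509Certificate
  pipelines     | PipelineID          | Pipeline
  containers    | ContainerID         | ContainerInfo
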
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
index 1150316..0452c05 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
@@ -17,18 +17,24 @@
  */
 package org.apache.hadoop.hdds.scm.metadata;
 
+import java.io.IOException;
 import java.math.BigInteger;
 import java.security.cert.X509Certificate;
+
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import java.io.IOException;
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
+import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Generic interface for data stores for SCM.
  * This is similar to the OMMetadataStore class,
@@ -99,5 +105,18 @@ public interface SCMMetadataStore {
    */
   TableIterator getAllCerts(CertificateStore.CertType certType);
 
+  /**
+   * A Table that maintains all the pipeline information.
+   */
+  Table<PipelineID, Pipeline> getPipelineTable();
+
+  /**
+   * Helper to create and write batch transactions.
+   */
+  BatchOperationHandler getBatchHandler();
 
+  /**
+   * Table that maintains all the container information.
+   */
+  Table<ContainerID, ContainerInfo> getContainerTable();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java
index 72818a3..3823fd8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java
@@ -17,53 +17,48 @@
  */
 package org.apache.hadoop.hdds.scm.metadata;
 
-import java.io.File;
+import java.io.IOException;
 import java.math.BigInteger;
-import java.nio.file.Paths;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import java.io.IOException;
-import org.apache.hadoop.hdds.security.x509.certificate.authority
-    .CertificateStore;
-import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
+import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
+
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CONTAINERS;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.DELETED_BLOCKS;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.VALID_CERTS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_DB_NAME;
-
 /**
  * A RocksDB based implementation of SCM Metadata Store.
- * <p>
- * <p>
- * +---------------+------------------+-------------------------+
- * | Column Family |    Key           |          Value          |
- * +---------------+------------------+-------------------------+
- * | DeletedBlocks | TXID(Long)       | DeletedBlockTransaction |
- * +---------------+------------------+-------------------------+
- * | ValidCerts    | Serial (BigInt)  | X509Certificate         |
- * +---------------+------------------+-------------------------+
- * |RevokedCerts   | Serial (BigInt)  | X509Certificate         |
- * +---------------+------------------+-------------------------+
+ *
  */
 public class SCMMetadataStoreRDBImpl implements SCMMetadataStore {
 
-  private static final String DELETED_BLOCKS_TABLE = "deletedBlocks";
-  private Table deletedBlocksTable;
+  private Table<Long, DeletedBlocksTransaction> deletedBlocksTable;
 
-  private static final String VALID_CERTS_TABLE = "validCerts";
-  private Table validCertsTable;
+  private Table<BigInteger, X509Certificate> validCertsTable;
 
-  private static final String REVOKED_CERTS_TABLE = "revokedCerts";
-  private Table revokedCertsTable;
+  private Table<BigInteger, X509Certificate> revokedCertsTable;
 
+  private Table<ContainerID, ContainerInfo> containerTable;
 
+  private Table<PipelineID, Pipeline> pipelineTable;
 
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMMetadataStoreRDBImpl.class);
@@ -88,31 +83,26 @@ public class SCMMetadataStoreRDBImpl implements SCMMetadataStore {
   public void start(OzoneConfiguration config)
       throws IOException {
     if (this.store == null) {
-      File metaDir = ServerUtils.getScmDbDir(configuration);
-
-      this.store = DBStoreBuilder.newBuilder(configuration)
-          .setName(SCM_DB_NAME)
-          .setPath(Paths.get(metaDir.getPath()))
-          .addTable(DELETED_BLOCKS_TABLE)
-          .addTable(VALID_CERTS_TABLE)
-          .addTable(REVOKED_CERTS_TABLE)
-          .addCodec(DeletedBlocksTransaction.class,
-              new DeletedBlocksTransactionCodec())
-          .addCodec(BigInteger.class, new BigIntegerCodec())
-          .addCodec(X509Certificate.class, new X509CertificateCodec())
-          .build();
-
-      deletedBlocksTable = this.store.getTable(DELETED_BLOCKS_TABLE,
-          Long.class, DeletedBlocksTransaction.class);
-      checkTableStatus(deletedBlocksTable, DELETED_BLOCKS_TABLE);
-
-      validCertsTable = this.store.getTable(VALID_CERTS_TABLE,
-          BigInteger.class, X509Certificate.class);
-      checkTableStatus(validCertsTable, VALID_CERTS_TABLE);
-
-      revokedCertsTable = this.store.getTable(REVOKED_CERTS_TABLE,
-          BigInteger.class, X509Certificate.class);
-      checkTableStatus(revokedCertsTable, REVOKED_CERTS_TABLE);
+
+      this.store = DBStoreBuilder.createDBStore(config, new SCMDBDefinition());
+
+      deletedBlocksTable =
+          DELETED_BLOCKS.getTable(this.store);
+
+      checkTableStatus(deletedBlocksTable,
+          DELETED_BLOCKS.getName());
+
+      validCertsTable = VALID_CERTS.getTable(store);
+
+      checkTableStatus(validCertsTable, VALID_CERTS.getName());
+
+      revokedCertsTable = REVOKED_CERTS.getTable(store);
+
+      checkTableStatus(revokedCertsTable, REVOKED_CERTS.getName());
+
+      pipelineTable = PIPELINES.getTable(store);
+
+      containerTable = CONTAINERS.getTable(store);
     }
   }
 
@@ -163,6 +153,21 @@ public class SCMMetadataStoreRDBImpl implements SCMMetadataStore {
   }
 
   @Override
+  public Table<PipelineID, Pipeline> getPipelineTable() {
+    return pipelineTable;
+  }
+
+  @Override
+  public BatchOperationHandler getBatchHandler() {
+    return this.store;
+  }
+
+  @Override
+  public Table<ContainerID, ContainerInfo> getContainerTable() {
+    return containerTable;
+  }
+
+  @Override
   public Long getCurrentTXID() {
     return this.txID.get();
   }
@@ -174,8 +179,8 @@ public class SCMMetadataStoreRDBImpl implements SCMMetadataStore {
    * @throws IOException
    */
   private Long getLargestRecordedTXID() throws IOException {
-    try (TableIterator<Long, DeletedBlocksTransaction> txIter =
-             deletedBlocksTable.iterator()) {
+    try (TableIterator<Long, ? extends KeyValue<Long, DeletedBlocksTransaction>>
+        txIter = deletedBlocksTable.iterator()) {
       txIter.seekToLast();
       Long txid = txIter.key();
       if (txid != null) {
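
For illustration, the new accessors hand other SCM components typed views into
the shared scm.db (sketch, assuming a started SCMMetadataStore named
"scmMetadataStore"); these are the handles the reworked SCMContainerManager and
SCMPipelineManager constructors now expect:

  Table<PipelineID, Pipeline> pipelines = scmMetadataStore.getPipelineTable();
  Table<ContainerID, ContainerInfo> containers = scmMetadataStore.getContainerTable();
  BatchOperationHandler batchHandler = scmMetadataStore.getBatchHandler();
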
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 200b358..38edbac 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import javax.management.ObjectName;
-import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
@@ -38,7 +37,6 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -46,19 +44,15 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
-import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
-import org.apache.hadoop.hdds.utils.MetadataStore;
-import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
 import org.apache.hadoop.hdds.utils.Scheduler;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,7 +71,6 @@ public class SCMPipelineManager implements PipelineManager {
   private PipelineStateManager stateManager;
   private final BackgroundPipelineCreator backgroundPipelineCreator;
   private Scheduler scheduler;
-  private MetadataStore pipelineStore;
 
   private final EventPublisher eventPublisher;
   private final NodeManager nodeManager;
@@ -87,28 +80,35 @@ public class SCMPipelineManager implements PipelineManager {
   // Pipeline Manager MXBean
   private ObjectName pmInfoBean;
 
+  private Table<PipelineID, Pipeline> pipelineStore;
+
   private final AtomicBoolean isInSafeMode;
   // Used to track if the safemode pre-checks have completed. This is designed
   // to prevent pipelines being created until sufficient nodes have registered.
   private final AtomicBoolean pipelineCreationAllowed;
 
-  public SCMPipelineManager(ConfigurationSource conf, NodeManager nodeManager,
+  public SCMPipelineManager(ConfigurationSource conf,
+      NodeManager nodeManager,
+      Table<PipelineID, Pipeline> pipelineStore,
       EventPublisher eventPublisher)
       throws IOException {
-    this(conf, nodeManager, eventPublisher, null, null);
+    this(conf, nodeManager, pipelineStore, eventPublisher, null, null);
     this.stateManager = new PipelineStateManager();
     this.pipelineFactory = new PipelineFactory(nodeManager,
         stateManager, conf, eventPublisher);
+    this.pipelineStore = pipelineStore;
     initializePipelineState();
   }
 
   protected SCMPipelineManager(ConfigurationSource conf,
       NodeManager nodeManager,
-                               EventPublisher eventPublisher,
-                               PipelineStateManager pipelineStateManager,
-                               PipelineFactory pipelineFactory)
+      Table<PipelineID, Pipeline> pipelineStore,
+      EventPublisher eventPublisher,
+      PipelineStateManager pipelineStateManager,
+      PipelineFactory pipelineFactory)
       throws IOException {
     this.lock = new ReentrantReadWriteLock();
+    this.pipelineStore = pipelineStore;
     this.conf = conf;
     this.pipelineFactory = pipelineFactory;
     this.stateManager = pipelineStateManager;
@@ -116,16 +116,6 @@ public class SCMPipelineManager implements PipelineManager {
     scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
     this.backgroundPipelineCreator =
         new BackgroundPipelineCreator(this, scheduler, conf);
-    int cacheSize = conf.getInt(ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB,
-        ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
-    final File pipelineDBPath = getPipelineDBPath(conf);
-    this.pipelineStore =
-        MetadataStoreBuilder.newBuilder()
-            .setCreateIfMissing(true)
-            .setConf(conf)
-            .setDbFile(pipelineDBPath)
-            .setCacheSize(cacheSize * OzoneConsts.MB)
-            .build();
     this.eventPublisher = eventPublisher;
     this.nodeManager = nodeManager;
     this.metrics = SCMPipelineMetrics.create();
@@ -168,18 +158,10 @@ public class SCMPipelineManager implements PipelineManager {
       LOG.info("No pipeline exists in current db");
       return;
     }
-    List<Map.Entry<byte[], byte[]>> pipelines =
-        pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE,
-            (MetadataKeyFilters.MetadataKeyFilter[])null);
-
-    for (Map.Entry<byte[], byte[]> entry : pipelines) {
-      HddsProtos.Pipeline.Builder pipelineBuilder = HddsProtos.Pipeline
-          .newBuilder(HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
-      Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.setState(
-          HddsProtos.PipelineState.PIPELINE_ALLOCATED).build());
-      // When SCM is restarted, set Creation time with current time.
-      pipeline.setCreationTimestamp(Instant.now());
-      Preconditions.checkNotNull(pipeline);
+    TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>>
+        iterator = pipelineStore.iterator();
+    while (iterator.hasNext()) {
+      Pipeline pipeline = iterator.next().getValue();
       stateManager.addPipeline(pipeline);
       nodeManager.addPipeline(pipeline);
     }
@@ -231,8 +213,7 @@ public class SCMPipelineManager implements PipelineManager {
     lock.writeLock().lock();
     try {
       Pipeline pipeline = pipelineFactory.create(type, factor);
-      pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
-          pipeline.getProtobufMessage().toByteArray());
+      pipelineStore.put(pipeline.getId(), pipeline);
       stateManager.addPipeline(pipeline);
       nodeManager.addPipeline(pipeline);
       recordMetricsForPipeline(pipeline);
@@ -588,11 +569,10 @@ public class SCMPipelineManager implements PipelineManager {
    * @throws IOException
    */
   protected void removePipeline(PipelineID pipelineId) throws IOException {
-    byte[] key = pipelineId.getProtobuf().toByteArray();
     lock.writeLock().lock();
     try {
       if (pipelineStore != null) {
-        pipelineStore.delete(key);
+        pipelineStore.delete(pipelineId);
         Pipeline pipeline = stateManager.removePipeline(pipelineId);
         nodeManager.removePipeline(pipeline);
         metrics.incNumPipelineDestroyed();
@@ -617,16 +597,6 @@ public class SCMPipelineManager implements PipelineManager {
       scheduler = null;
     }
 
-    lock.writeLock().lock();
-    try {
-      if (pipelineStore != null) {
-        pipelineStore.close();
-        pipelineStore = null;
-      }
-    } finally {
-      lock.writeLock().unlock();
-    }
-
     if(pmInfoBean != null) {
       MBeans.unregister(this.pmInfoBean);
       pmInfoBean = null;
@@ -638,11 +608,6 @@ public class SCMPipelineManager implements PipelineManager {
     pipelineFactory.shutdown();
   }
 
-  protected File getPipelineDBPath(ConfigurationSource configuration) {
-    File metaDir = ServerUtils.getScmDbDir(configuration);
-    return new File(metaDir, SCM_PIPELINE_DB);
-  }
-
   protected ReadWriteLock getLock() {
     return lock;
   }
@@ -652,10 +617,6 @@ public class SCMPipelineManager implements PipelineManager {
     return pipelineFactory;
   }
 
-  protected MetadataStore getPipelineStore() {
-    return pipelineStore;
-  }
-
   protected NodeManager getNodeManager() {
     return nodeManager;
   }
@@ -665,6 +626,10 @@ public class SCMPipelineManager implements PipelineManager {
     return this.isInSafeMode.get();
   }
 
+  public Table<PipelineID, Pipeline> getPipelineStore() {
+    return pipelineStore;
+  }
+
   @Override
   public synchronized void handleSafeModeTransition(
       SCMSafeModeManager.SafeModeStatus status) {
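Taken together, the SCMPipelineManager changes above invert ownership of the pipeline store: instead of the manager building its own MetadataStore from pipeline.db, the caller injects a typed Table<PipelineID, Pipeline> and keeps ownership of the backing DBStore. A hypothetical wiring sketch follows; the class and method names are illustrative, and only the DBStoreBuilder/SCMDBDefinition calls and the new constructor are taken from this patch (it mirrors the test setups further down, it is not an excerpt from the commit):

// Hypothetical sketch, not part of this commit: the caller owns the
// RocksDB-backed DBStore and injects a typed pipeline table.
import java.io.IOException;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.hdds.utils.db.Table;

public final class PipelineManagerWiringSketch {

  private PipelineManagerWiringSketch() {
  }

  static DBStore openStore(OzoneConfiguration conf) throws IOException {
    // The store is shared by all SCM tables and is closed by its creator;
    // SCMPipelineManager.close() no longer closes the pipeline store.
    return DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
  }

  static SCMPipelineManager wire(OzoneConfiguration conf,
      NodeManager nodeManager, EventQueue eventQueue, DBStore dbStore)
      throws IOException {
    // The typed table replaces the old byte[]-keyed MetadataStore.
    Table<PipelineID, Pipeline> pipelineTable =
        SCMDBDefinition.PIPELINES.getTable(dbStore);
    return new SCMPipelineManager(conf, nodeManager, pipelineTable,
        eventQueue);
  }
}

Because the store is created outside the manager, the lifecycle moves with it: the close() override above drops the pipelineStore shutdown block, and whoever called DBStoreBuilder is responsible for closing the DBStore.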
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 1f2305f..8498a25 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -406,13 +406,19 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       pipelineManager = configurator.getPipelineManager();
     } else {
       pipelineManager =
-          new SCMPipelineManager(conf, scmNodeManager, eventQueue);
+          new SCMPipelineManager(conf, scmNodeManager,
+              scmMetadataStore.getPipelineTable(),
+              eventQueue);
     }
 
     if (configurator.getContainerManager() != null) {
       containerManager = configurator.getContainerManager();
     } else {
-      containerManager = new SCMContainerManager(conf, pipelineManager);
+      containerManager =
+          new SCMContainerManager(conf,
+              scmMetadataStore.getContainerTable(),
+              scmMetadataStore.getBatchHandler(),
+              pipelineManager);
     }
 
     if (configurator.getScmBlockManager() != null) {
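In SCM proper the typed tables and the batch handler come from the shared SCMMetadataStore rather than from a locally built DBStore. The sketch below is a hypothetical condensation of the wiring visible in this hunk and in the TestBlockManager changes further down; the class and method names are illustrative, only the constructors and getters shown in this patch are assumed:

// Hypothetical sketch, not part of this commit: SCM-side wiring through the
// shared SCMMetadataStore, mirroring StorageContainerManager/TestBlockManager.
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreRDBImpl;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.server.events.EventQueue;

public final class ScmMetadataWiringSketch {

  private ScmMetadataWiringSketch() {
  }

  static SCMContainerManager wire(OzoneConfiguration conf,
      NodeManager nodeManager, EventQueue eventQueue) throws Exception {
    // One RocksDB instance hosts every SCM column family.
    SCMMetadataStore scmMetadataStore = new SCMMetadataStoreRDBImpl(conf);
    scmMetadataStore.start(conf);

    SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
        nodeManager, scmMetadataStore.getPipelineTable(), eventQueue);
    // The container manager also needs a batch handler so that it can apply
    // several container updates atomically.
    return new SCMContainerManager(conf,
        scmMetadataStore.getContainerTable(),
        scmMetadataStore.getBatchHandler(),
        pipelineManager);
  }
}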
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index 6a19ff9..c9b5fde 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -29,8 +29,7 @@ import java.util.concurrent.TimeoutException;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
@@ -40,9 +39,11 @@ import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreRDBImpl;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
@@ -55,6 +56,9 @@ import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.ozone.OzoneConsts.GB;
+import static org.apache.hadoop.ozone.OzoneConsts.MB;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -63,10 +67,6 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 
-import static org.apache.hadoop.ozone.OzoneConsts.GB;
-import static org.apache.hadoop.ozone.OzoneConsts.MB;
-
-
 /**
  * Tests for SCM Block Manager.
  */
@@ -88,6 +88,7 @@ public class TestBlockManager {
 
   @Rule
   public TemporaryFolder folder= new TemporaryFolder();
+  private SCMMetadataStore scmMetadataStore;
 
   @Before
   public void setUp() throws Exception {
@@ -105,16 +106,25 @@ public class TestBlockManager {
     // Override the default Node Manager in SCM with this Mock Node Manager.
     nodeManager = new MockNodeManager(true, 10);
     eventQueue = new EventQueue();
+
+    scmMetadataStore = new SCMMetadataStoreRDBImpl(conf);
+    scmMetadataStore.start(conf);
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue);
+        new SCMPipelineManager(conf, nodeManager,
+            scmMetadataStore.getPipelineTable(),
+            eventQueue);
     pipelineManager.allowPipelineCreation();
+
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf, eventQueue);
     pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
     SCMContainerManager containerManager =
-        new SCMContainerManager(conf, pipelineManager);
+        new SCMContainerManager(conf,
+            scmMetadataStore.getContainerTable(),
+            scmMetadataStore.getStore(),
+            pipelineManager);
     SCMSafeModeManager safeModeManager = new SCMSafeModeManager(conf,
         containerManager.getContainers(), pipelineManager, eventQueue) {
       @Override
@@ -127,6 +137,7 @@ public class TestBlockManager {
     configurator.setPipelineManager(pipelineManager);
     configurator.setContainerManager(containerManager);
     configurator.setScmSafeModeManager(safeModeManager);
+    configurator.setMetadataStore(scmMetadataStore);
     scm = TestUtils.getScm(conf, configurator);
 
     // Initialize these fields so that the tests can pass.
@@ -145,10 +156,11 @@ public class TestBlockManager {
   }
 
   @After
-  public void cleanup() {
+  public void cleanup() throws Exception {
     scm.stop();
     scm.join();
     eventQueue.close();
+    scmMetadataStore.stop();
   }
 
   @Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index f567500..09b41a5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -28,10 +28,13 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -58,6 +61,7 @@ public class TestCloseContainerEventHandler {
   private static long size;
   private static File testDir;
   private static EventQueue eventQueue;
+  private static DBStore dbStore;
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -71,15 +75,19 @@ public class TestCloseContainerEventHandler {
     configuration.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 16);
     nodeManager = new MockNodeManager(true, 10);
     eventQueue = new EventQueue();
+    dbStore =
+        DBStoreBuilder.createDBStore(configuration, new SCMDBDefinition());
     pipelineManager =
-        new SCMPipelineManager(configuration, nodeManager, eventQueue);
+        new SCMPipelineManager(configuration, nodeManager,
+            SCMDBDefinition.PIPELINES.getTable(dbStore), eventQueue);
     pipelineManager.allowPipelineCreation();
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), configuration, eventQueue);
     pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
-    containerManager = new SCMContainerManager(configuration, pipelineManager);
+    containerManager = new SCMContainerManager(configuration,
+        SCMDBDefinition.CONTAINERS.getTable(dbStore), dbStore, 
pipelineManager);
     pipelineManager.triggerPipelineCreation();
     eventQueue.addHandler(CLOSE_CONTAINER,
         new CloseContainerEventHandler(pipelineManager, containerManager));
@@ -97,6 +105,9 @@ public class TestCloseContainerEventHandler {
     if (pipelineManager != null) {
       pipelineManager.close();
     }
+    if (dbStore != null) {
+      dbStore.close();
+    }
     FileUtil.fullyDelete(testDir);
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
index 75d2712..1821e92 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
@@ -44,9 +44,12 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -59,6 +62,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+
 /**
  * Tests for Container ContainerManager.
  */
@@ -76,6 +80,7 @@ public class TestSCMContainerManager {
 
   @Rule
   public ExpectedException thrown = ExpectedException.none();
+
   @BeforeClass
   public static void setUp() throws Exception {
     OzoneConfiguration conf = SCMTestUtils.getConf();
@@ -93,10 +98,15 @@ public class TestSCMContainerManager {
       throw new IOException("Unable to create test directory path");
     }
     nodeManager = new MockNodeManager(true, 10);
+    DBStore dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+        new SCMPipelineManager(conf, nodeManager,
+            SCMDBDefinition.PIPELINES.getTable(dbStore), new EventQueue());
     pipelineManager.allowPipelineCreation();
-    containerManager = new SCMContainerManager(conf, pipelineManager);
+    containerManager = new SCMContainerManager(conf,
+        SCMDBDefinition.CONTAINERS.getTable(dbStore),
+        dbStore,
+        pipelineManager);
     xceiverClientManager = new XceiverClientManager(conf);
     replicationFactor = SCMTestUtils.getReplicationFactor(conf);
     replicationType = SCMTestUtils.getReplicationType(conf);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index de027ed..6ce66a2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -36,19 +36,24 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.test.PathUtils;
 
 import org.apache.commons.io.IOUtils;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CONTAINERS;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
+import org.junit.After;
 import static org.junit.Assert.assertEquals;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -59,9 +64,21 @@ import org.mockito.Mockito;
  * Test for different container placement policy.
  */
 public class TestContainerPlacement {
+
   @Rule
   public ExpectedException thrown = ExpectedException.none();
+  private DBStore dbStore;
+
+  @Before
+  public void createDbStore() throws IOException {
+    dbStore =
+        DBStoreBuilder.createDBStore(getConf(), new SCMDBDefinition());
+  }
 
+  @After
+  public void destroyDBStore() throws Exception {
+    dbStore.close();
+  }
   /**
    * Returns a new copy of Configuration.
    *
@@ -100,11 +117,13 @@ public class TestContainerPlacement {
   SCMContainerManager createContainerManager(ConfigurationSource config,
       NodeManager scmNodeManager) throws IOException {
     EventQueue eventQueue = new EventQueue();
-    final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
-        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+
     PipelineManager pipelineManager =
-        new SCMPipelineManager(config, scmNodeManager, eventQueue);
-    return new SCMContainerManager(config, pipelineManager);
+        new SCMPipelineManager(config, scmNodeManager,
+            PIPELINES.getTable(dbStore), eventQueue);
+    return new SCMContainerManager(config, CONTAINERS.getTable(dbStore),
+        dbStore,
+        pipelineManager);
 
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index fcb1c94..007f071 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -37,9 +37,12 @@ import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
 
@@ -55,6 +58,7 @@ import static org.junit.Assert.fail;
 import org.junit.Before;
 import org.junit.Test;
 
+
 /**
  * Test cases to verify PipelineManager.
  */
@@ -62,6 +66,7 @@ public class TestSCMPipelineManager {
   private static MockNodeManager nodeManager;
   private static File testDir;
   private static OzoneConfiguration conf;
+  private DBStore store;
 
   @Before
   public void setUp() throws Exception {
@@ -76,17 +81,24 @@ public class TestSCMPipelineManager {
       throw new IOException("Unable to create test directory path");
     }
     nodeManager = new MockNodeManager(true, 20);
+
+    store = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
+    
   }
 
   @After
-  public void cleanup() {
+  public void cleanup() throws Exception {
+    store.close();
     FileUtil.fullyDelete(testDir);
   }
 
   @Test
   public void testPipelineReload() throws IOException {
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+        new SCMPipelineManager(conf,
+            nodeManager,
+            SCMDBDefinition.PIPELINES.getTable(store),
+            new EventQueue());
     pipelineManager.allowPipelineCreation();
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
@@ -106,7 +118,8 @@ public class TestSCMPipelineManager {
 
     // new pipeline manager should be able to load the pipelines from the db
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+        new SCMPipelineManager(conf, nodeManager,
+            SCMDBDefinition.PIPELINES.getTable(store), new EventQueue());
     pipelineManager.allowPipelineCreation();
     mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
@@ -137,7 +150,8 @@ public class TestSCMPipelineManager {
   @Test
   public void testRemovePipeline() throws IOException {
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+        new SCMPipelineManager(conf, nodeManager,
+            SCMDBDefinition.PIPELINES.getTable(store), new EventQueue());
     pipelineManager.allowPipelineCreation();
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
@@ -156,7 +170,8 @@ public class TestSCMPipelineManager {
 
     // new pipeline manager should not be able to load removed pipelines
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+        new SCMPipelineManager(conf, nodeManager,
+            SCMDBDefinition.PIPELINES.getTable(store), new EventQueue());
     try {
       pipelineManager.getPipeline(pipeline.getId());
       fail("Pipeline should not have been retrieved");
@@ -172,7 +187,8 @@ public class TestSCMPipelineManager {
   public void testPipelineReport() throws IOException {
     EventQueue eventQueue = new EventQueue();
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue);
+        new SCMPipelineManager(conf, nodeManager,
+            SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
     pipelineManager.allowPipelineCreation();
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
@@ -238,7 +254,8 @@ public class TestSCMPipelineManager {
     MockNodeManager nodeManagerMock = new MockNodeManager(true,
         20);
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManagerMock, new EventQueue());
+        new SCMPipelineManager(conf, nodeManagerMock,
+            SCMDBDefinition.PIPELINES.getTable(store), new EventQueue());
     pipelineManager.allowPipelineCreation();
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManagerMock,
@@ -297,7 +314,8 @@ public class TestSCMPipelineManager {
   @Test
   public void testActivateDeactivatePipeline() throws IOException {
     final SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+        new SCMPipelineManager(conf, nodeManager,
+            SCMDBDefinition.PIPELINES.getTable(store), new EventQueue());
     pipelineManager.allowPipelineCreation();
     final PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
@@ -345,7 +363,8 @@ public class TestSCMPipelineManager {
   public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
     EventQueue eventQueue = new EventQueue();
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue);
+        new SCMPipelineManager(conf, nodeManager,
+            SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
     pipelineManager.allowPipelineCreation();
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
@@ -361,7 +380,8 @@ public class TestSCMPipelineManager {
     pipelineManager.close();
     // new pipeline manager loads the pipelines from the db in ALLOCATED state
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue);
+        new SCMPipelineManager(conf, nodeManager,
+            SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
     mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -406,7 +426,8 @@ public class TestSCMPipelineManager {
 
     EventQueue eventQueue = new EventQueue();
     final SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue);
+        new SCMPipelineManager(conf, nodeManager,
+            SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
     pipelineManager.allowPipelineCreation();
     final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
         nodeManager, pipelineManager.getStateManager(), conf, eventQueue,
@@ -448,8 +469,9 @@ public class TestSCMPipelineManager {
         TimeUnit.MILLISECONDS);
 
     EventQueue eventQueue = new EventQueue();
-    final SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue);
+    SCMPipelineManager pipelineManager =
+        new SCMPipelineManager(conf, nodeManager,
+            SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
     final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
         nodeManager, pipelineManager.getStateManager(), conf, eventQueue,
         false);
@@ -484,7 +506,6 @@ public class TestSCMPipelineManager {
     pipelineManager.close();
   }
 
-
   @Test
   public void testSafeModeUpdatedOnSafemodeExit()
       throws IOException, TimeoutException, InterruptedException {
@@ -494,8 +515,9 @@ public class TestSCMPipelineManager {
         TimeUnit.MILLISECONDS);
 
     EventQueue eventQueue = new EventQueue();
-    final SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue);
+    SCMPipelineManager pipelineManager =
+        new SCMPipelineManager(conf, nodeManager,
+            SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
     final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
         nodeManager, pipelineManager.getStateManager(), conf, eventQueue,
         false);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index bda6f84..700479d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.hdds.scm.safemode;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -26,21 +31,20 @@ import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
 /**
  * This class tests HealthyPipelineSafeMode rule.
  */
@@ -49,7 +53,7 @@ public class TestHealthyPipelineSafeModeRule {
   @Test
   public void testHealthyPipelineSafeModeRuleWithNoPipelines()
       throws Exception {
-
+    DBStore store = null;
     String storageDir = GenericTestUtils.getTempPath(
         TestHealthyPipelineSafeModeRule.class.getName() + UUID.randomUUID());
     try {
@@ -65,9 +69,9 @@ public class TestHealthyPipelineSafeModeRule {
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
-
+      store = DBStoreBuilder.createDBStore(config, new SCMDBDefinition());
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue);
+          nodeManager, SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
               pipelineManager.getStateManager(), config);
@@ -82,16 +86,16 @@ public class TestHealthyPipelineSafeModeRule {
       // This should be immediately satisfied, as no pipelines are there yet.
       Assert.assertTrue(healthyPipelineSafeModeRule.validate());
     } finally {
+      store.close();
       FileUtil.fullyDelete(new File(storageDir));
     }
   }
 
   @Test
   public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception {
-
     String storageDir = GenericTestUtils.getTempPath(
         TestHealthyPipelineSafeModeRule.class.getName() + UUID.randomUUID());
-
+    DBStore store = null;
     try {
       EventQueue eventQueue = new EventQueue();
       List<ContainerInfo> containers =
@@ -109,8 +113,9 @@ public class TestHealthyPipelineSafeModeRule {
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
+      store = DBStoreBuilder.createDBStore(config, new SCMDBDefinition());
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue);
+          nodeManager, SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
       pipelineManager.allowPipelineCreation();
 
       PipelineProvider mockRatisProvider =
@@ -153,6 +158,7 @@ public class TestHealthyPipelineSafeModeRule {
       GenericTestUtils.waitFor(() -> healthyPipelineSafeModeRule.validate(),
           1000, 5000);
     } finally {
+      store.close();
       FileUtil.fullyDelete(new File(storageDir));
     }
   }
@@ -164,6 +170,7 @@ public class TestHealthyPipelineSafeModeRule {
 
     String storageDir = GenericTestUtils.getTempPath(
         TestHealthyPipelineSafeModeRule.class.getName() + UUID.randomUUID());
+    DBStore store = null;
 
     try {
       EventQueue eventQueue = new EventQueue();
@@ -183,8 +190,10 @@ public class TestHealthyPipelineSafeModeRule {
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
+      store = DBStoreBuilder.createDBStore(config, new SCMDBDefinition());
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue);
+          nodeManager, SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
+
       pipelineManager.allowPipelineCreation();
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
@@ -235,6 +244,7 @@ public class TestHealthyPipelineSafeModeRule {
           1000, 5000);
 
     } finally {
+      store.close();
       FileUtil.fullyDelete(new File(storageDir));
     }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index dd8d301..c1f09fa 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -17,28 +17,32 @@
 
 package org.apache.hadoop.hdds.scm.safemode;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
+
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * This class tests OneReplicaPipelineSafeModeRule.
  */
@@ -50,7 +54,6 @@ public class TestOneReplicaPipelineSafeModeRule {
   private SCMPipelineManager pipelineManager;
   private EventQueue eventQueue;
 
-
   private void setup(int nodes, int pipelineFactorThreeCount,
       int pipelineFactorOneCount) throws Exception {
     OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
@@ -66,8 +69,13 @@ public class TestOneReplicaPipelineSafeModeRule {
     MockNodeManager mockNodeManager = new MockNodeManager(true, nodes);
 
     eventQueue = new EventQueue();
+
+    DBStore dbStore =
+        DBStoreBuilder.createDBStore(ozoneConfiguration, new SCMDBDefinition());
+
     pipelineManager =
         new SCMPipelineManager(ozoneConfiguration, mockNodeManager,
+            SCMDBDefinition.PIPELINES.getTable(dbStore),
             eventQueue);
     pipelineManager.allowPipelineCreation();
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 0620883..9d22304 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.safemode;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -41,8 +43,11 @@ import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
 
+import org.junit.After;
 import org.junit.Assert;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -69,6 +74,8 @@ public class TestSCMSafeModeManager {
   @Rule
   public final TemporaryFolder tempDir = new TemporaryFolder();
 
+  private DBStore dbStore;
+
   @Before
   public void setUp() {
     queue = new EventQueue();
@@ -77,6 +84,20 @@ public class TestSCMSafeModeManager {
         false);
   }
 
+  @Before
+  public void initDbStore() throws IOException {
+    config.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+        tempDir.newFolder().getAbsolutePath());
+    dbStore = DBStoreBuilder.createDBStore(config, new SCMDBDefinition());
+  }
+
+  @After
+  public void destroyDbStore() throws Exception {
+    if (dbStore != null) {
+      dbStore.close();
+    }
+  }
+
   @Test
   public void testSafeModeState() throws Exception {
     // Test 1: test for 0 containers
@@ -184,12 +205,32 @@ public class TestSCMSafeModeManager {
   }
 
   @Test
-  public void testSafeModeExitRuleWithPipelineAvailabilityCheck()
-      throws Exception{
+  public void testSafeModeExitRuleWithPipelineAvailabilityCheck1()
+      throws Exception {
     testSafeModeExitRuleWithPipelineAvailabilityCheck(100, 30, 8, 0.90, 1);
+  }
+
+  @Test
+  public void testSafeModeExitRuleWithPipelineAvailabilityCheck2()
+      throws Exception {
     testSafeModeExitRuleWithPipelineAvailabilityCheck(100, 90, 22, 0.10, 0.9);
+  }
+
+  @Test
+  public void testSafeModeExitRuleWithPipelineAvailabilityCheck3()
+      throws Exception {
     testSafeModeExitRuleWithPipelineAvailabilityCheck(100, 30, 8, 0, 0.9);
+  }
+
+  @Test
+  public void testSafeModeExitRuleWithPipelineAvailabilityCheck4()
+      throws Exception {
     testSafeModeExitRuleWithPipelineAvailabilityCheck(100, 90, 22, 0, 0);
+  }
+
+  @Test
+  public void testSafeModeExitRuleWithPipelineAvailabilityCheck5()
+      throws Exception {
     testSafeModeExitRuleWithPipelineAvailabilityCheck(100, 90, 22, 0, 0.5);
   }
 
@@ -201,7 +242,7 @@ public class TestSCMSafeModeManager {
           0.9);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue);
+          mockNodeManager, SCMDBDefinition.PIPELINES.getTable(dbStore), queue);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForHealthyPipelinePercent");
@@ -219,7 +260,7 @@ public class TestSCMSafeModeManager {
           200);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue);
+          mockNodeManager, SCMDBDefinition.PIPELINES.getTable(dbStore), queue);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
@@ -236,7 +277,7 @@ public class TestSCMSafeModeManager {
       conf.setDouble(HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT, -1.0);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue);
+          mockNodeManager, SCMDBDefinition.PIPELINES.getTable(dbStore), queue);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForSafeModePercent");
@@ -260,7 +301,7 @@ public class TestSCMSafeModeManager {
 
     MockNodeManager mockNodeManager = new MockNodeManager(true, nodeCount);
     SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
-        mockNodeManager, queue);
+        mockNodeManager, SCMDBDefinition.PIPELINES.getTable(dbStore), queue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(mockNodeManager,
             pipelineManager.getStateManager(), config, true);
@@ -477,7 +518,7 @@ public class TestSCMSafeModeManager {
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, queue);
+          nodeManager, SCMDBDefinition.PIPELINES.getTable(dbStore), queue);
 
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
@@ -531,7 +572,8 @@ public class TestSCMSafeModeManager {
         HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
 
     SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-        nodeManager, queue);
+        nodeManager, SCMDBDefinition.PIPELINES.getTable(dbStore), queue);
+
 
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
diff --git a/hadoop-ozone/dist/pom.xml b/hadoop-ozone/dist/pom.xml
index 8be5c5b..e892053 100644
--- a/hadoop-ozone/dist/pom.xml
+++ b/hadoop-ozone/dist/pom.xml
@@ -23,7 +23,7 @@
   </parent>
   <artifactId>hadoop-ozone-dist</artifactId>
   <name>Apache Hadoop Ozone Distribution</name>
-  <packaging>pom</packaging>
+  <packaging>jar</packaging>
   <version>0.6.0-SNAPSHOT</version>
   <properties>
     <file.encoding>UTF-8</file.encoding>
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
index 20afa09..aa524b0 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ozone.recon.scm;
 
-import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -30,11 +29,11 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 
-import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_SCM_CONTAINER_DB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,33 +55,29 @@ public class ReconContainerManager extends SCMContainerManager {
    * CacheSize is specified
    * in MB.
    *
-   * @param conf            - {@link ConfigurationSource}
-   * @param pipelineManager - {@link PipelineManager}
    * @throws IOException on Failure.
    */
   public ReconContainerManager(
-      ConfigurationSource conf, PipelineManager pipelineManager,
+      ConfigurationSource conf,
+      Table<ContainerID, ContainerInfo> containerStore,
+      BatchOperationHandler batchHandler,
+      PipelineManager pipelineManager,
       StorageContainerServiceProvider scm,
       ContainerSchemaManager containerSchemaManager) throws IOException {
-    super(conf, pipelineManager);
+    super(conf, containerStore, batchHandler, pipelineManager);
     this.scmClient = scm;
     this.containerSchemaManager = containerSchemaManager;
   }
 
-  @Override
-  protected File getContainerDBPath(ConfigurationSource conf) {
-    File metaDir = ReconUtils.getReconScmDbDir(conf);
-    return new File(metaDir, RECON_SCM_CONTAINER_DB);
-  }
-
   /**
    * Check and add new container if not already present in Recon.
-   * @param containerID containerID to check.
+   *
+   * @param containerID     containerID to check.
    * @param datanodeDetails Datanode from where we got this container.
    * @throws IOException on Error.
    */
   public void checkAndAddNewContainer(ContainerID containerID,
-                                      DatanodeDetails datanodeDetails)
+      DatanodeDetails datanodeDetails)
       throws IOException {
     if (!exists(containerID)) {
       LOG.info("New container {} got from {}.", containerID,
@@ -143,7 +138,7 @@ public class ReconContainerManager extends SCMContainerManager {
    */
   @Override
   public void updateContainerReplica(ContainerID containerID,
-                                     ContainerReplica replica)
+      ContainerReplica replica)
       throws ContainerNotFoundException {
     super.updateContainerReplica(containerID, replica);
     // Update container_history table
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDBDefinition.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDBDefinition.java
new file mode 100644
index 0000000..bcfe060
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDBDefinition.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.recon.scm;
+
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
+
+/**
+ * DB definition for Recon's local copy of the SCM metadata (recon-scm.db).
+ */
+public class ReconDBDefinition extends SCMDBDefinition {
+
+  @Override
+  public String getName() {
+    return "recon-scm.db";
+  }
+
+  @Override
+  public String getLocationConfigKey() {
+    return ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR;
+  }
+}
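ReconDBDefinition only overrides the db file name and the location config key; the column families and their codecs are inherited from SCMDBDefinition, so Recon reads and writes the same typed tables as SCM, just persisted under OZONE_RECON_SCM_DB_DIR. A hypothetical usage sketch follows; the class and method names are illustrative, and only the DBStoreBuilder/ReconDBDefinition calls that appear elsewhere in this patch are assumed:

// Hypothetical sketch, not part of this commit: obtaining Recon's typed
// tables from a DBStore built with ReconDBDefinition, as
// ReconStorageContainerManagerFacade does further down in this patch.
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.recon.scm.ReconDBDefinition;

public final class ReconDbSketch {

  private ReconDbSketch() {
  }

  static void openReconTables(OzoneConfiguration conf) throws Exception {
    DBStore store = DBStoreBuilder.createDBStore(conf, new ReconDBDefinition());
    try {
      // Same key/value codecs as SCM's own db, so Recon works with Pipeline
      // and ContainerInfo objects directly instead of raw byte arrays.
      Table<PipelineID, Pipeline> pipelines =
          ReconDBDefinition.PIPELINES.getTable(store);
      Table<ContainerID, ContainerInfo> containers =
          ReconDBDefinition.CONTAINERS.getTable(store);
    } finally {
      store.close();
    }
  }
}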
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
index 20f77c7..a8dd3c9 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ozone.recon.scm;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -31,14 +30,15 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.hdds.utils.db.Table;
 
 import com.google.common.annotations.VisibleForTesting;
 import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
-import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_SCM_PIPELINE_DB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+
 /**
  * Recon's overriding implementation of SCM's Pipeline Manager.
  */
@@ -48,20 +48,16 @@ public class ReconPipelineManager extends SCMPipelineManager {
       LoggerFactory.getLogger(ReconPipelineManager.class);
 
   public ReconPipelineManager(ConfigurationSource conf,
-                              NodeManager nodeManager,
-                              EventPublisher eventPublisher)
+      NodeManager nodeManager,
+      Table<PipelineID, Pipeline> pipelineStore,
+      EventPublisher eventPublisher)
       throws IOException {
-    super(conf, nodeManager, eventPublisher, new PipelineStateManager(),
+    super(conf, nodeManager, pipelineStore, eventPublisher,
+        new PipelineStateManager(),
         new ReconPipelineFactory());
     initializePipelineState();
   }
-
-  @Override
-  protected File getPipelineDBPath(ConfigurationSource conf) {
-    File metaDir = ReconUtils.getReconScmDbDir(conf);
-    return new File(metaDir, RECON_SCM_PIPELINE_DB);
-  }
-
+  
   @Override
   public void triggerPipelineCreation() {
     // Don't do anything in Recon.
@@ -147,8 +143,7 @@ public class ReconPipelineManager extends SCMPipelineManager {
   void addPipeline(Pipeline pipeline) throws IOException {
     getLock().writeLock().lock();
     try {
-      getPipelineStore().put(pipeline.getId().getProtobuf().toByteArray(),
-          pipeline.getProtobufMessage().toByteArray());
+      getPipelineStore().put(pipeline.getId(), pipeline);
       getStateManager().addPipeline(pipeline);
       getNodeManager().addPipeline(pipeline);
     } finally {
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index 61de428..7800abb 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -18,9 +18,6 @@
 
 package org.apache.hadoop.ozone.recon.scm;
 
-import static org.apache.hadoop.hdds.recon.ReconConfigKeys.RECON_SCM_CONFIG_PREFIX;
-import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.buildRpcServerStartMessage;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.HashSet;
@@ -28,8 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-
-import com.google.inject.Inject;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
@@ -49,13 +44,19 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.safemode.SafeModeManager;
-import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.recon.fsck.MissingContainerTask;
 import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
+
+import com.google.inject.Inject;
+import static org.apache.hadoop.hdds.recon.ReconConfigKeys.RECON_SCM_CONFIG_PREFIX;
+import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.buildRpcServerStartMessage;
 import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,6 +74,7 @@ public class ReconStorageContainerManagerFacade
   private final ReconDatanodeProtocolServer datanodeProtocolServer;
   private final EventQueue eventQueue;
   private final SCMStorageConfig scmStorageConfig;
+  private final DBStore dbStore;
 
   private ReconNodeManager nodeManager;
   private ReconPipelineManager pipelineManager;
@@ -83,23 +85,34 @@ public class ReconStorageContainerManagerFacade
 
   @Inject
   public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
-            StorageContainerServiceProvider scmServiceProvider,
-            ReconTaskStatusDao reconTaskStatusDao,
-            ContainerSchemaManager containerSchemaManager)
+      StorageContainerServiceProvider scmServiceProvider,
+      ReconTaskStatusDao reconTaskStatusDao,
+      ContainerSchemaManager containerSchemaManager)
       throws IOException {
     this.eventQueue = new EventQueue();
     eventQueue.setSilent(true);
     this.ozoneConfiguration = getReconScmConfiguration(conf);
     this.scmStorageConfig = new ReconStorageConfig(conf);
     this.clusterMap = new NetworkTopologyImpl(conf);
+    dbStore = DBStoreBuilder
+        .createDBStore(ozoneConfiguration, new ReconDBDefinition());
+
     this.nodeManager =
         new ReconNodeManager(conf, scmStorageConfig, eventQueue, clusterMap);
     this.datanodeProtocolServer = new ReconDatanodeProtocolServer(
         conf, this, eventQueue);
     this.pipelineManager =
-        new ReconPipelineManager(conf, nodeManager, eventQueue);
-    this.containerManager = new ReconContainerManager(conf, pipelineManager,
-        scmServiceProvider, containerSchemaManager);
+
+        new ReconPipelineManager(conf,
+            nodeManager,
+            ReconDBDefinition.PIPELINES.getTable(dbStore),
+            eventQueue);
+    this.containerManager = new ReconContainerManager(conf,
+        ReconDBDefinition.CONTAINERS.getTable(dbStore),
+        dbStore,
+        pipelineManager,
+        scmServiceProvider,
+        containerSchemaManager);
     this.scmServiceProvider = scmServiceProvider;
 
     NodeReportHandler nodeReportHandler =
@@ -214,6 +227,11 @@ public class ReconStorageContainerManagerFacade
     IOUtils.cleanupWithLogger(LOG, nodeManager);
     IOUtils.cleanupWithLogger(LOG, containerManager);
     IOUtils.cleanupWithLogger(LOG, pipelineManager);
+    try {
+      dbStore.close();
+    } catch (Exception e) {
+      LOG.error("Can't close dbStore ", e);
+    }
   }
 
   public ReconDatanodeProtocolServer getDatanodeProtocolServer() {
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
index 7f87806..04010e5 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
@@ -18,15 +18,6 @@
 
 package org.apache.hadoop.ozone.recon.scm;
 
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.STAND_ALONE;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
-import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import java.io.IOException;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -40,12 +31,23 @@ import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.STAND_ALONE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Abstract class for Recon Container Manager related tests.
@@ -59,6 +61,7 @@ public class AbstractReconContainerManagerTest {
   private SCMStorageConfig scmStorageConfig;
   private ReconPipelineManager pipelineManager;
   private ReconContainerManager containerManager;
+  private DBStore store;
 
   @Before
   public void setUp() throws Exception {
@@ -66,20 +69,28 @@ public class AbstractReconContainerManagerTest {
     conf.set(OZONE_METADATA_DIRS,
         temporaryFolder.newFolder().getAbsolutePath());
     conf.set(OZONE_SCM_NAMES, "localhost");
+    store = DBStoreBuilder.createDBStore(conf, new ReconDBDefinition());
     scmStorageConfig = new ReconStorageConfig(conf);
     NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
     EventQueue eventQueue = new EventQueue();
     NodeManager nodeManager =
         new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap);
-    pipelineManager = new ReconPipelineManager(conf, nodeManager, eventQueue);
-    containerManager = new ReconContainerManager(conf, pipelineManager,
-        getScmServiceProvider(), mock(ContainerSchemaManager.class));
+    pipelineManager = new ReconPipelineManager(conf, nodeManager,
+        ReconDBDefinition.PIPELINES.getTable(store), eventQueue);
+    containerManager = new ReconContainerManager(
+        conf,
+        ReconDBDefinition.CONTAINERS.getTable(store),
+        store,
+        pipelineManager,
+        getScmServiceProvider(),
+        mock(ContainerSchemaManager.class));
   }
 
   @After
-  public void tearDown() throws IOException {
+  public void tearDown() throws Exception {
     containerManager.close();
     pipelineManager.close();
+    store.close();
   }
 
   protected OzoneConfiguration getConf() {
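The setUp above only has to call new ReconDBDefinition(); the definition class itself is a new file in this commit and its body is not part of this hunk. A sketch of the shape such a definition takes, assuming the DBColumnFamilyDefinition constructor order (table name, key class, key codec, value class, value codec) and the DBDefinition methods introduced earlier in the commit, with no-arg codec constructors; the column family names, DB file name, and location key below are illustrative:

    package org.apache.hadoop.ozone.recon.scm;

    import org.apache.hadoop.hdds.scm.container.ContainerID;
    import org.apache.hadoop.hdds.scm.container.ContainerInfo;
    import org.apache.hadoop.hdds.scm.metadata.ContainerIDCodec;
    import org.apache.hadoop.hdds.scm.metadata.ContainerInfoCodec;
    import org.apache.hadoop.hdds.scm.metadata.PipelineCodec;
    import org.apache.hadoop.hdds.scm.metadata.PipelineIDCodec;
    import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
    import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
    import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
    import org.apache.hadoop.hdds.utils.db.DBDefinition;

    /**
     * Sketch of a Recon-style DB definition: each column family binds a
     * key/value type to a codec, and getTable(dbStore) hands back a typed
     * Table for it.  Names below are illustrative.
     */
    public class ReconDBDefinitionSketch implements DBDefinition {

      public static final DBColumnFamilyDefinition<PipelineID, Pipeline>
          PIPELINES = new DBColumnFamilyDefinition<>(
              "pipelines",
              PipelineID.class, new PipelineIDCodec(),
              Pipeline.class, new PipelineCodec());

      public static final DBColumnFamilyDefinition<ContainerID, ContainerInfo>
          CONTAINERS = new DBColumnFamilyDefinition<>(
              "containers",
              ContainerID.class, new ContainerIDCodec(),
              ContainerInfo.class, new ContainerInfoCodec());

      @Override
      public String getName() {
        return "recon-scm.db";          // illustrative DB file name
      }

      @Override
      public String getLocationConfigKey() {
        return "ozone.metadata.dirs";   // illustrative; the real key points at Recon's DB dir
      }

      @Override
      public DBColumnFamilyDefinition[] getColumnFamilies() {
        return new DBColumnFamilyDefinition[] {PIPELINES, CONTAINERS};
      }
    }

The point of the indirection is that createDBStore(conf, definition) can register every column family and codec up front, which is what lets the setUp above fetch typed tables immediately after the store is built.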
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
index 3d4d239..c891f33 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
@@ -18,15 +18,6 @@
 
 package org.apache.hadoop.ozone.recon.scm;
 
-import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
-import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -45,11 +36,23 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.ozone.recon.scm.ReconPipelineFactory.ReconPipelineProvider;
+
+import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import static org.mockito.Mockito.mock;
 
 /**
  * Class to test Recon Pipeline Manager.
@@ -61,6 +64,7 @@ public class TestReconPipelineManager {
 
   private OzoneConfiguration conf;
   private SCMStorageConfig scmStorageConfig;
+  private DBStore store;
 
   @Before
   public void setup() throws IOException {
@@ -69,6 +73,12 @@ public class TestReconPipelineManager {
         temporaryFolder.newFolder().getAbsolutePath());
     conf.set(OZONE_SCM_NAMES, "localhost");
     scmStorageConfig = new ReconStorageConfig(conf);
+    store = DBStoreBuilder.createDBStore(conf, new ReconDBDefinition());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    store.close();
   }
 
   @Test
@@ -103,7 +113,8 @@ public class TestReconPipelineManager {
         new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap);
 
     try (ReconPipelineManager reconPipelineManager =
-        new ReconPipelineManager(conf, nodeManager, eventQueue)) {
+        new ReconPipelineManager(conf, nodeManager,
+            ReconDBDefinition.PIPELINES.getTable(store), eventQueue)) {
       reconPipelineManager.addPipeline(validPipeline);
       reconPipelineManager.addPipeline(invalidPipeline);
 
@@ -138,7 +149,8 @@ public class TestReconPipelineManager {
         new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap);
 
     ReconPipelineManager reconPipelineManager =
-        new ReconPipelineManager(conf, nodeManager, eventQueue);
+        new ReconPipelineManager(conf, nodeManager,
+            ReconDBDefinition.PIPELINES.getTable(store), eventQueue);
     assertFalse(reconPipelineManager.containsPipeline(pipeline.getId()));
     reconPipelineManager.addPipeline(pipeline);
     assertTrue(reconPipelineManager.containsPipeline(pipeline.getId()));
@@ -150,7 +162,8 @@ public class TestReconPipelineManager {
     NodeManager nodeManagerMock = mock(NodeManager.class);
 
     ReconPipelineManager reconPipelineManager = new ReconPipelineManager(
-        conf, nodeManagerMock, new EventQueue());
+        conf, nodeManagerMock, ReconDBDefinition.PIPELINES.getTable(store),
+        new EventQueue());
     PipelineFactory pipelineFactory = reconPipelineManager.getPipelineFactory();
     assertTrue(pipelineFactory instanceof ReconPipelineFactory);
     ReconPipelineFactory reconPipelineFactory =
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkOzoneManager.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkOzoneManager.java
index 6e531f3..19b5e8e 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkOzoneManager.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkOzoneManager.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.ozone.genesis;
 
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -27,7 +25,6 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -47,7 +44,9 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Scope;
@@ -77,7 +76,7 @@ public class BenchMarkOzoneManager {
 
   @Setup(Level.Trial)
   public static void initialize()
-      throws IOException, AuthenticationException, InterruptedException {
+      throws Exception {
     try {
       lock.lock();
       if (scm == null) {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkSCM.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkSCM.java
index 0839ea5..64e2f4d 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkSCM.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkSCM.java
@@ -18,13 +18,10 @@
 
 package org.apache.hadoop.ozone.genesis;
 
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -38,7 +35,9 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Param;
@@ -67,7 +66,7 @@ public class BenchMarkSCM {
 
   @Setup(Level.Trial)
   public static void initialize()
-      throws IOException, AuthenticationException, InterruptedException {
+      throws Exception {
     try {
       lock.lock();
       if (scm == null) {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/GenesisUtil.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/GenesisUtil.java
index e98cabc..797c805 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/GenesisUtil.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/GenesisUtil.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.ozone.genesis;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -32,15 +31,17 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.utils.MetadataStore;
 import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
-import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.common.Storage;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMStorage;
@@ -48,9 +49,7 @@ import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 
 import org.apache.commons.lang3.RandomStringUtils;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
+
 
 /**
  * Utility class for benchmark test cases.
@@ -150,16 +149,11 @@ public final class GenesisUtil {
   }
 
   static void addPipelines(HddsProtos.ReplicationFactor factor,
-      int numPipelines, ConfigurationSource conf) throws IOException {
-    final File metaDir = ServerUtils.getScmDbDir(conf);
-    final File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB);
-    int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
-        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
-    MetadataStore pipelineStore =
-        MetadataStoreBuilder.newBuilder().setCreateIfMissing(true)
-            .setConf(conf).setDbFile(pipelineDBPath)
-            .setCacheSize(cacheSize * OzoneConsts.MB).build();
+      int numPipelines, ConfigurationSource conf) throws Exception {
+    DBStore dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
 
+    Table<PipelineID, Pipeline> pipelineTable =
+        SCMDBDefinition.PIPELINES.getTable(dbStore);
     List<DatanodeDetails> nodes = new ArrayList<>();
     for (int i = 0; i < factor.getNumber(); i++) {
       nodes
@@ -174,11 +168,11 @@ public final class GenesisUtil {
               .setFactor(factor)
               .setNodes(nodes)
               .build();
-      pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
-          pipeline.getProtobufMessage().toByteArray());
+      pipelineTable.put(pipeline.getId(),
+          pipeline);
     }
 
-    pipelineStore.close();
+    dbStore.close();
   }
 
   static OzoneManager getOm(OzoneConfiguration conf)
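The GenesisUtil hunk above is the clearest before/after of this commit: a raw byte[] put against a MetadataStore becomes a typed put against the PIPELINES table from SCMDBDefinition, with the key and value encoding handled by the registered codecs. A minimal round-trip sketch under those assumptions (the helper class and method are illustrative, not part of the commit):

    package org.apache.hadoop.ozone.genesis;

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
    import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
    import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
    import org.apache.hadoop.hdds.utils.db.DBStore;
    import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
    import org.apache.hadoop.hdds.utils.db.Table;

    /** Sketch only: write a pipeline through the typed table and read it back. */
    final class PipelineTableSketch {

      static Pipeline roundTrip(OzoneConfiguration conf, Pipeline pipeline)
          throws Exception {
        // The store is built from the same definition SCM uses, so the
        // PIPELINES column family and its codecs are already registered.
        try (DBStore dbStore =
            DBStoreBuilder.createDBStore(conf, new SCMDBDefinition())) {
          Table<PipelineID, Pipeline> pipelines =
              SCMDBDefinition.PIPELINES.getTable(dbStore);
          pipelines.put(pipeline.getId(), pipeline);  // codecs encode key and value
          return pipelines.get(pipeline.getId());     // and decode them on the way back
        }
      }
    }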

