This is an automated email from the ASF dual-hosted git repository.

msingh pushed a commit to branch HDDS-1449
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 3bc3621c4231bdeef60aef4521acec822d8db5d2
Author: Mukul Kumar Singh <msi...@apache.org>
AuthorDate: Thu May 16 18:03:45 2019 +0530

    HDDS-1449. JVM Exit in datanode while committing a key
---
 .../commandhandler/DeleteBlocksCommandHandler.java |  89 ++++-----
 .../container/common/utils/ContainerCache.java     |  85 +++++++--
 .../container/keyvalue/KeyValueBlockIterator.java  |  15 +-
 .../container/keyvalue/KeyValueContainer.java      |  12 +-
 .../container/keyvalue/KeyValueContainerCheck.java |  59 +++---
 .../container/keyvalue/helpers/BlockUtils.java     |   4 +-
 .../keyvalue/helpers/KeyValueContainerUtil.java    |  35 ++--
 .../container/keyvalue/impl/BlockManagerImpl.java  | 204 ++++++++++----------
 .../background/BlockDeletingService.java           | 115 +++++------
 .../ozone/container/ozoneimpl/ContainerReader.java |  47 ++---
 .../keyvalue/TestKeyValueBlockIterator.java        | 212 +++++++++++----------
 .../container/keyvalue/TestKeyValueContainer.java  |  44 +++--
 .../keyvalue/TestKeyValueContainerCheck.java       |  87 +++++----
 .../ozone/TestStorageContainerManagerHelper.java   |  13 +-
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  70 +++----
 .../container/common/TestBlockDeletingService.java | 164 ++++++++--------
 .../common/impl/TestContainerPersistence.java      |   4 +-
 .../commandhandler/TestBlockDeletion.java          |  26 +--
 .../TestCloseContainerByPipeline.java              |  10 +-
 19 files changed, 703 insertions(+), 592 deletions(-)
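
The change below replaces the raw MetadataStore handles handed out by
ContainerCache with a reference-counted, Closeable wrapper
(ReferenceCountedDB), so that a cache eviction can no longer close a DB
handle another thread is still using; a write through a closed native
handle is what can take down the whole JVM. A minimal sketch of the
pre-patch hazard, with an illustrative interleaving and variable names
(assumptions for exposition, not code from this patch):

    // Hypothetical pre-patch interleaving:
    MetadataStore db = cache.getDB(containerID, dbType, dbPath, conf); // raw, uncounted handle
    // ... LRU eviction on another thread calls closeDB(dbPath, db) here ...
    db.get(blockKey); // touches the closed native handle -> JVM exit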

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index aa63fb4..3a955c6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -198,52 +198,53 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
     }
 
     int newDeletionBlocks = 0;
-    MetadataStore containerDB = BlockUtils.getDB(containerData, conf);
-    for (Long blk : delTX.getLocalIDList()) {
-      BatchOperation batch = new BatchOperation();
-      byte[] blkBytes = Longs.toByteArray(blk);
-      byte[] blkInfo = containerDB.get(blkBytes);
-      if (blkInfo != null) {
-        byte[] deletingKeyBytes =
-            DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk);
-        byte[] deletedKeyBytes =
-            DFSUtil.string2Bytes(OzoneConsts.DELETED_KEY_PREFIX + blk);
-        if (containerDB.get(deletingKeyBytes) != null
-            || containerDB.get(deletedKeyBytes) != null) {
-          LOG.debug(String.format(
-              "Ignoring delete for block %d in container %d."
-                  + " Entry already added.", blk, containerId));
-          continue;
-        }
-        // Found the block in container db,
-        // use an atomic update to change its state to deleting.
-        batch.put(deletingKeyBytes, blkInfo);
-        batch.delete(blkBytes);
-        try {
-          containerDB.writeBatch(batch);
-          newDeletionBlocks++;
-          LOG.debug("Transited Block {} to DELETING state in container {}",
-              blk, containerId);
-        } catch (IOException e) {
-          // if some blocks failed to delete, we fail this TX,
-          // without sending this ACK to SCM, SCM will resend the TX
-          // with a certain number of retries.
-          throw new IOException(
-              "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
+    try(ReferenceCountedDB containerDB = BlockUtils.getDB(containerData, conf)) {
+      for (Long blk : delTX.getLocalIDList()) {
+        BatchOperation batch = new BatchOperation();
+        byte[] blkBytes = Longs.toByteArray(blk);
+        byte[] blkInfo = containerDB.getStore().get(blkBytes);
+        if (blkInfo != null) {
+          byte[] deletingKeyBytes =
+              DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk);
+          byte[] deletedKeyBytes =
+              DFSUtil.string2Bytes(OzoneConsts.DELETED_KEY_PREFIX + blk);
+          if (containerDB.getStore().get(deletingKeyBytes) != null
+              || containerDB.getStore().get(deletedKeyBytes) != null) {
+            LOG.debug(String.format(
+                "Ignoring delete for block %d in container %d."
+                    + " Entry already added.", blk, containerId));
+            continue;
+          }
+          // Found the block in container db,
+          // use an atomic update to change its state to deleting.
+          batch.put(deletingKeyBytes, blkInfo);
+          batch.delete(blkBytes);
+          try {
+            containerDB.getStore().writeBatch(batch);
+            newDeletionBlocks++;
+            LOG.debug("Transited Block {} to DELETING state in container {}",
+                blk, containerId);
+          } catch (IOException e) {
+            // if some blocks failed to delete, we fail this TX,
+            // without sending this ACK to SCM, SCM will resend the TX
+            // with a certain number of retries.
+            throw new IOException(
+                "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
+          }
+        } else {
+          LOG.debug("Block {} not found or already under deletion in"
+              + " container {}, skip deleting it.", blk, containerId);
         }
-      } else {
-        LOG.debug("Block {} not found or already under deletion in"
-            + " container {}, skip deleting it.", blk, containerId);
       }
-    }
 
-    containerDB
-        .put(DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX),
-            Longs.toByteArray(delTX.getTxID()));
-    containerData
-        .updateDeleteTransactionId(delTX.getTxID());
-    // update pending deletion blocks count in in-memory container status
-    containerData.incrPendingDeletionBlocks(newDeletionBlocks);
+      containerDB.getStore()
+          .put(DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX),
+              Longs.toByteArray(delTX.getTxID()));
+      containerData
+          .updateDeleteTransactionId(delTX.getTxID());
+      // update pending deletion blocks count in in-memory container status
+      containerData.incrPendingDeletionBlocks(newDeletionBlocks);
+    }
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
index 25d1bdf..5c5d8bc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
@@ -28,8 +28,11 @@ import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -92,8 +95,8 @@ public final class ContainerCache extends LRUMap {
       MapIterator iterator = cache.mapIterator();
       while (iterator.hasNext()) {
         iterator.next();
-        MetadataStore db = (MetadataStore) iterator.getValue();
-        closeDB((String)iterator.getKey(), db);
+        ReferenceCountedDB db = (ReferenceCountedDB) iterator.getValue();
+        db.setEvicted(true);
       }
       // reset the cache
       cache.clear();
@@ -107,11 +110,11 @@ public final class ContainerCache extends LRUMap {
    */
   @Override
   protected boolean removeLRU(LinkEntry entry) {
-    MetadataStore db = (MetadataStore) entry.getValue();
+    ReferenceCountedDB db = (ReferenceCountedDB) entry.getValue();
     String dbFile = (String)entry.getKey();
     lock.lock();
     try {
-      closeDB(dbFile, db);
+      db.setEvicted(false);
       return true;
     } catch (Exception e) {
       LOG.error("Eviction for db:{} failed", dbFile, e);
@@ -128,26 +131,30 @@ public final class ContainerCache extends LRUMap {
    * @param containerDBType - DB type of the container.
    * @param containerDBPath - DB path of the container.
    * @param conf - Hadoop Configuration.
-   * @return MetadataStore.
+   * @return ReferenceCountedDB.
    */
-  public MetadataStore getDB(long containerID, String containerDBType,
+  public ReferenceCountedDB getDB(long containerID, String containerDBType,
                              String containerDBPath, Configuration conf)
       throws IOException {
     Preconditions.checkState(containerID >= 0,
         "Container ID cannot be negative.");
     lock.lock();
     try {
-      MetadataStore db = (MetadataStore) this.get(containerDBPath);
+      ReferenceCountedDB db = (ReferenceCountedDB) this.get(containerDBPath);
 
       if (db == null) {
-        db = MetadataStoreBuilder.newBuilder()
+        MetadataStore metadataStore =
+            MetadataStoreBuilder.newBuilder()
             .setDbFile(new File(containerDBPath))
             .setCreateIfMissing(false)
             .setConf(conf)
             .setDBType(containerDBType)
             .build();
+        db = new ReferenceCountedDB(metadataStore, containerDBPath);
         this.put(containerDBPath, db);
       }
+      // increment the reference before returning the object
+      db.incrementReference();
       return db;
     } catch (Exception e) {
       LOG.error("Error opening DB. Container:{} ContainerPath:{}",
@@ -161,16 +168,68 @@ public final class ContainerCache extends LRUMap {
   /**
    * Remove a DB handler from cache.
    *
-   * @param containerPath - path of the container db file.
+   * @param containerDBPath - path of the container db file.
    */
-  public void removeDB(String containerPath) {
+  public void removeDB(String containerDBPath) {
     lock.lock();
     try {
-      MetadataStore db = (MetadataStore)this.get(containerPath);
-      closeDB(containerPath, db);
-      this.remove(containerPath);
+      ReferenceCountedDB db = (ReferenceCountedDB)this.get(containerDBPath);
+      if (db != null) {
+        // marking it as evicted will close the db as well.
+        db.setEvicted(true);
+      }
+      this.remove(containerDBPath);
     } finally {
       lock.unlock();
     }
   }
+
+
+  /**
+   * Class to implement reference counting over instances handed by Container
+   * Cache.
+   */
+  public class ReferenceCountedDB implements Closeable {
+    private final AtomicInteger referenceCount;
+    private final AtomicBoolean isEvicted;
+    private final MetadataStore store;
+    private final String containerDBPath;
+
+    public ReferenceCountedDB(MetadataStore store, String containerDBPath) {
+      this.referenceCount = new AtomicInteger(0);
+      this.isEvicted = new AtomicBoolean(false);
+      this.store = store;
+      this.containerDBPath = containerDBPath;
+    }
+
+    private void incrementReference() {
+      this.referenceCount.incrementAndGet();
+    }
+
+    private void decrementReference() {
+      this.referenceCount.decrementAndGet();
+      cleanup();
+    }
+
+    private void setEvicted(boolean force) {
+      Preconditions.checkState(!force || (referenceCount.get() == 0),
+          "Force:%b, referencount:%d", force, referenceCount.get());
+      isEvicted.set(true);
+      cleanup();
+    }
+
+    private void cleanup() {
+      if (referenceCount.get() == 0 && isEvicted.get() && store != null) {
+        closeDB(containerDBPath, store);
+      }
+    }
+
+    public MetadataStore getStore() {
+      return store;
+    }
+
+    public void close() {
+      decrementReference();
+    }
+  }
 }
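
For reference, the usage pattern the converted call sites below follow: a
minimal sketch, assuming a KeyValueContainerData containerData, a
Configuration conf, and a byte[] key are in scope.

    // getDB() increments the reference count; close() decrements it. The
    // underlying MetadataStore is closed only once the entry has been
    // evicted from the cache and the count has dropped to zero.
    try (ReferenceCountedDB db = BlockUtils.getDB(containerData, conf)) {
      byte[] value = db.getStore().get(key); // reads/writes go through getStore()
      // ... use value while the reference is held ...
    }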
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueBlockIterator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueBlockIterator.java
index 535af29..15a7fdb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueBlockIterator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueBlockIterator.java
@@ -31,11 +31,12 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocat
 import org.apache.hadoop.utils.MetaStoreIterator;
 import org.apache.hadoop.utils.MetadataKeyFilters;
 import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.apache.hadoop.utils.MetadataStore.KeyValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.NoSuchElementException;
@@ -48,12 +49,14 @@ import java.util.NoSuchElementException;
  * {@link MetadataKeyFilters#getNormalKeyFilter()}
  */
 @InterfaceAudience.Public
-public class KeyValueBlockIterator implements BlockIterator<BlockData> {
+public class KeyValueBlockIterator implements BlockIterator<BlockData>,
+    Closeable {
 
   private static final Logger LOG = LoggerFactory.getLogger(
       KeyValueBlockIterator.class);
 
   private MetaStoreIterator<KeyValue> blockIterator;
+  private final ReferenceCountedDB db;
   private static KeyPrefixFilter defaultBlockFilter = MetadataKeyFilters
       .getNormalKeyFilter();
   private KeyPrefixFilter blockFilter;
@@ -91,9 +94,9 @@ public class KeyValueBlockIterator implements BlockIterator<BlockData> {
         containerData;
     keyValueContainerData.setDbFile(KeyValueContainerLocationUtil
         .getContainerDBFile(metdataPath, containerId));
-    MetadataStore metadataStore = BlockUtils.getDB(keyValueContainerData, new
+    db = BlockUtils.getDB(keyValueContainerData, new
         OzoneConfiguration());
-    blockIterator = metadataStore.iterator();
+    blockIterator = db.getStore().iterator();
     blockFilter = filter;
   }
 
@@ -145,4 +148,8 @@ public class KeyValueBlockIterator implements BlockIterator<BlockData> {
     nextBlock = null;
     blockIterator.seekToLast();
   }
+
+  public void close() {
+   db.close();
+  }
 }
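
Because the iterator now pins a ReferenceCountedDB for its whole lifetime,
it implements Closeable and callers must release it; a sketch of the
pattern the tests below adopt, assuming containerID and containerPath are
in scope:

    // Closing the iterator releases its DB reference back to ContainerCache.
    try (KeyValueBlockIterator iter = new KeyValueBlockIterator(
        containerID, new File(containerPath))) {
      while (iter.hasNext()) {
        BlockData block = iter.nextBlock();
        // ... inspect the block ...
      }
    }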
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 26b0ce1..cf275bf 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -74,6 +74,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.UNSUPPORTED_REQUEST;
 
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -349,11 +350,12 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
 
   void compactDB() throws StorageContainerException {
     try {
-      MetadataStore db = BlockUtils.getDB(containerData, config);
-      db.compactDB();
-      LOG.info("Container {} is closed with bcsId {}.",
-          containerData.getContainerID(),
-          containerData.getBlockCommitSequenceId());
+      try(ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) {
+        db.getStore().compactDB();
+        LOG.info("Container {} is closed with bcsId {}.",
+            containerData.getContainerID(),
+            containerData.getBlockCommitSequenceId());
+      }
     } catch (StorageContainerException ex) {
       throw ex;
     } catch (IOException ex) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
index bdfdf21..fbfb7b9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
@@ -36,6 +36,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -236,41 +237,41 @@ public class KeyValueContainerCheck {
 
 
     onDiskContainerData.setDbFile(dbFile);
-    MetadataStore db = BlockUtils
-        .getDB(onDiskContainerData, checkConfig);
-
-    iterateBlockDB(db);
+    try(ReferenceCountedDB db = BlockUtils.getDB(onDiskContainerData, checkConfig)) {
+      iterateBlockDB(db);
+    }
   }
 
-  private void iterateBlockDB(MetadataStore db)
+  private void iterateBlockDB(ReferenceCountedDB db)
       throws IOException {
     Preconditions.checkState(db != null);
 
     // get "normal" keys from the Block DB
-    KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
-        new File(onDiskContainerData.getContainerPath()));
-
-    // ensure there is a chunk file for each key in the DB
-    while (kvIter.hasNext()) {
-      BlockData block = kvIter.nextBlock();
-
-      List<ContainerProtos.ChunkInfo> chunkInfoList = block.getChunks();
-      for (ContainerProtos.ChunkInfo chunk : chunkInfoList) {
-        File chunkFile;
-        chunkFile = ChunkUtils.getChunkFile(onDiskContainerData,
-            ChunkInfo.getFromProtoBuf(chunk));
-
-        if (!chunkFile.exists()) {
-          // concurrent mutation in Block DB? lookup the block again.
-          byte[] bdata = db.get(
-              Longs.toByteArray(block.getBlockID().getLocalID()));
-          if (bdata == null) {
-            LOG.trace("concurrency with delete, ignoring deleted block");
-            break; // skip to next block from kvIter
-          } else {
-            String errorStr = "Missing chunk file "
-                + chunkFile.getAbsolutePath();
-            throw new IOException(errorStr);
+    try(KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
+        new File(onDiskContainerData.getContainerPath()))) {
+
+      // ensure there is a chunk file for each key in the DB
+      while (kvIter.hasNext()) {
+        BlockData block = kvIter.nextBlock();
+
+        List<ContainerProtos.ChunkInfo> chunkInfoList = block.getChunks();
+        for (ContainerProtos.ChunkInfo chunk : chunkInfoList) {
+          File chunkFile;
+          chunkFile = ChunkUtils.getChunkFile(onDiskContainerData,
+              ChunkInfo.getFromProtoBuf(chunk));
+
+          if (!chunkFile.exists()) {
+            // concurrent mutation in Block DB? lookup the block again.
+            byte[] bdata = db.getStore().get(
+                Longs.toByteArray(block.getBlockID().getLocalID()));
+            if (bdata == null) {
+              LOG.trace("concurrency with delete, ignoring deleted block");
+              break; // skip to next block from kvIter
+            } else {
+              String errorStr = "Missing chunk file "
+                  + chunkFile.getAbsolutePath();
+              throw new IOException(errorStr);
+            }
           }
         }
       }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
index 996b592..fd3c768 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 
 import java.io.IOException;
 
@@ -66,7 +66,7 @@ public final class BlockUtils {
    * @return MetadataStore handle.
    * @throws StorageContainerException
    */
-  public static MetadataStore getDB(KeyValueContainerData containerData,
+  public static ReferenceCountedDB getDB(KeyValueContainerData containerData,
                                     Configuration conf) throws
       StorageContainerException {
     Preconditions.checkNotNull(containerData);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index 7a30955..5631336 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.utils.MetadataStoreBuilder;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -174,22 +175,24 @@ public final class KeyValueContainerUtil {
     }
     kvContainerData.setDbFile(dbFile);
 
-    MetadataStore metadata = BlockUtils.getDB(kvContainerData, config);
-    long bytesUsed = 0;
-    List<Map.Entry<byte[], byte[]>> liveKeys = metadata
-        .getRangeKVs(null, Integer.MAX_VALUE,
-            MetadataKeyFilters.getNormalKeyFilter());
-    bytesUsed = liveKeys.parallelStream().mapToLong(e-> {
-      BlockData blockData;
-      try {
-        blockData = BlockUtils.getBlockData(e.getValue());
-        return blockData.getSize();
-      } catch (IOException ex) {
-        return 0L;
-      }
-    }).sum();
-    kvContainerData.setBytesUsed(bytesUsed);
-    kvContainerData.setKeyCount(liveKeys.size());
+    try(ReferenceCountedDB metadata = BlockUtils.getDB(kvContainerData, config)) {
+      long bytesUsed = 0;
+      List<Map.Entry<byte[], byte[]>> liveKeys = metadata.getStore()
+          .getRangeKVs(null, Integer.MAX_VALUE,
+              MetadataKeyFilters.getNormalKeyFilter());
+
+      bytesUsed = liveKeys.parallelStream().mapToLong(e-> {
+        BlockData blockData;
+        try {
+          blockData = BlockUtils.getBlockData(e.getValue());
+          return blockData.getSize();
+        } catch (IOException ex) {
+          return 0L;
+        }
+      }).sum();
+      kvContainerData.setBytesUsed(bytesUsed);
+      kvContainerData.setKeyCount(liveKeys.size());
+    }
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 3033dd9..e0944c1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
 import org.apache.hadoop.utils.BatchOperation;
 import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,47 +84,48 @@ public class BlockManagerImpl implements BlockManager {
         "cannot be negative");
     // We are not locking the key manager since LevelDb serializes all actions
     // against a single DB. We rely on DB level locking to avoid conflicts.
-    MetadataStore db = BlockUtils.getDB((KeyValueContainerData) container
-        .getContainerData(), config);
+    try(ReferenceCountedDB db = BlockUtils.getDB((KeyValueContainerData) container
+        .getContainerData(), config)) {
 
-    // This is a post condition that acts as a hint to the user.
-    // Should never fail.
-    Preconditions.checkNotNull(db, "DB cannot be null here");
+      // This is a post condition that acts as a hint to the user.
+      // Should never fail.
+      Preconditions.checkNotNull(db, "DB cannot be null here");
 
-    long bcsId = data.getBlockCommitSequenceId();
-    long containerBCSId = ((KeyValueContainerData) container.getContainerData())
-        .getBlockCommitSequenceId();
+      long bcsId = data.getBlockCommitSequenceId();
+      long containerBCSId = ((KeyValueContainerData) container.getContainerData())
+          .getBlockCommitSequenceId();
 
-    // default blockCommitSequenceId for any block is 0. It the putBlock
-    // request is not coming via Ratis(for test scenarios), it will be 0.
-    // In such cases, we should overwrite the block as well
-    if (bcsId != 0) {
-      if (bcsId <= containerBCSId) {
-        // Since the blockCommitSequenceId stored in the db is greater than
-        // equal to blockCommitSequenceId to be updated, it means the putBlock
-        // transaction is reapplied in the ContainerStateMachine on restart.
-        // It also implies that the given block must already exist in the db.
-        // just log and return
-        LOG.warn("blockCommitSequenceId " + containerBCSId
-            + " in the Container Db is greater than" + " the supplied value "
-            + bcsId + " .Ignoring it");
-        return data.getSize();
+      // default blockCommitSequenceId for any block is 0. It the putBlock
+      // request is not coming via Ratis(for test scenarios), it will be 0.
+      // In such cases, we should overwrite the block as well
+      if (bcsId != 0) {
+        if (bcsId <= containerBCSId) {
+          // Since the blockCommitSequenceId stored in the db is greater than
+          // equal to blockCommitSequenceId to be updated, it means the putBlock
+          // transaction is reapplied in the ContainerStateMachine on restart.
+          // It also implies that the given block must already exist in the db.
+          // just log and return
+          LOG.warn("blockCommitSequenceId " + containerBCSId
+              + " in the Container Db is greater than" + " the supplied value "
+              + bcsId + " .Ignoring it");
+          return data.getSize();
+        }
       }
+      // update the blockData as well as BlockCommitSequenceId here
+      BatchOperation batch = new BatchOperation();
+      batch.put(Longs.toByteArray(data.getLocalID()),
+          data.getProtoBufMessage().toByteArray());
+      batch.put(blockCommitSequenceIdKey,
+          Longs.toByteArray(bcsId));
+      db.getStore().writeBatch(batch);
+      container.updateBlockCommitSequenceId(bcsId);
+      // Increment keycount here
+      container.getContainerData().incrKeyCount();
+      LOG.debug(
+          "Block " + data.getBlockID() + " successfully committed with bcsId "
+              + bcsId + " chunk size " + data.getChunks().size());
+      return data.getSize();
     }
-    // update the blockData as well as BlockCommitSequenceId here
-    BatchOperation batch = new BatchOperation();
-    batch.put(Longs.toByteArray(data.getLocalID()),
-        data.getProtoBufMessage().toByteArray());
-    batch.put(blockCommitSequenceIdKey,
-        Longs.toByteArray(bcsId));
-    db.writeBatch(batch);
-    container.updateBlockCommitSequenceId(bcsId);
-    // Increment keycount here
-    container.getContainerData().incrKeyCount();
-    LOG.debug(
-        "Block " + data.getBlockID() + " successfully committed with bcsId "
-            + bcsId + " chunk size " + data.getChunks().size());
-    return data.getSize();
   }
 
   /**
@@ -146,32 +147,33 @@ public class BlockManagerImpl implements BlockManager {
 
     KeyValueContainerData containerData = (KeyValueContainerData) container
         .getContainerData();
-    MetadataStore db = BlockUtils.getDB(containerData, config);
-    // This is a post condition that acts as a hint to the user.
-    // Should never fail.
-    Preconditions.checkNotNull(db, "DB cannot be null here");
+    try(ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) {
+      // This is a post condition that acts as a hint to the user.
+      // Should never fail.
+      Preconditions.checkNotNull(db, "DB cannot be null here");
 
-    long containerBCSId = containerData.getBlockCommitSequenceId();
-    if (containerBCSId < bcsId) {
-      throw new StorageContainerException(
-          "Unable to find the block with bcsID " + bcsId + " .Container "
-              + container.getContainerData().getContainerID() + " bcsId is "
-              + containerBCSId + ".", UNKNOWN_BCSID);
-    }
-    byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID()));
-    if (kData == null) {
-      throw new StorageContainerException("Unable to find the block." + blockID,
-          NO_SUCH_BLOCK);
-    }
-    ContainerProtos.BlockData blockData =
-        ContainerProtos.BlockData.parseFrom(kData);
-    long id = blockData.getBlockID().getBlockCommitSequenceId();
-    if (id < bcsId) {
-      throw new StorageContainerException(
-          "bcsId " + bcsId + " mismatches with existing block Id "
-              + id + " for block " + blockID + ".", BCSID_MISMATCH);
+      long containerBCSId = containerData.getBlockCommitSequenceId();
+      if (containerBCSId < bcsId) {
+        throw new StorageContainerException(
+            "Unable to find the block with bcsID " + bcsId + " .Container "
+                + container.getContainerData().getContainerID() + " bcsId is "
+                + containerBCSId + ".", UNKNOWN_BCSID);
+      }
+      byte[] kData = db.getStore().get(Longs.toByteArray(blockID.getLocalID()));
+      if (kData == null) {
+        throw new StorageContainerException("Unable to find the block." + blockID,
+            NO_SUCH_BLOCK);
+      }
+      ContainerProtos.BlockData blockData =
+          ContainerProtos.BlockData.parseFrom(kData);
+      long id = blockData.getBlockID().getBlockCommitSequenceId();
+      if (id < bcsId) {
+        throw new StorageContainerException(
+            "bcsId " + bcsId + " mismatches with existing block Id "
+                + id + " for block " + blockID + ".", BCSID_MISMATCH);
+      }
+      return BlockData.getFromProtoBuf(blockData);
     }
-    return BlockData.getFromProtoBuf(blockData);
   }
 
   /**
@@ -187,18 +189,19 @@ public class BlockManagerImpl implements BlockManager {
       throws IOException {
     KeyValueContainerData containerData = (KeyValueContainerData) container
         .getContainerData();
-    MetadataStore db = BlockUtils.getDB(containerData, config);
-    // This is a post condition that acts as a hint to the user.
-    // Should never fail.
-    Preconditions.checkNotNull(db, "DB cannot be null here");
-    byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID()));
-    if (kData == null) {
-      throw new StorageContainerException("Unable to find the block.",
-          NO_SUCH_BLOCK);
+    try(ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) {
+      // This is a post condition that acts as a hint to the user.
+      // Should never fail.
+      Preconditions.checkNotNull(db, "DB cannot be null here");
+      byte[] kData = db.getStore().get(Longs.toByteArray(blockID.getLocalID()));
+      if (kData == null) {
+        throw new StorageContainerException("Unable to find the block.",
+            NO_SUCH_BLOCK);
+      }
+      ContainerProtos.BlockData blockData =
+          ContainerProtos.BlockData.parseFrom(kData);
+      return blockData.getSize();
     }
-    ContainerProtos.BlockData blockData =
-        ContainerProtos.BlockData.parseFrom(kData);
-    return blockData.getSize();
   }
 
   /**
@@ -218,24 +221,24 @@ public class BlockManagerImpl implements BlockManager {
 
     KeyValueContainerData cData = (KeyValueContainerData) container
         .getContainerData();
-    MetadataStore db = BlockUtils.getDB(cData, config);
-    // This is a post condition that acts as a hint to the user.
-    // Should never fail.
-    Preconditions.checkNotNull(db, "DB cannot be null here");
-    // Note : There is a race condition here, since get and delete
-    // are not atomic. Leaving it here since the impact is refusing
-    // to delete a Block which might have just gotten inserted after
-    // the get check.
-    byte[] kKey = Longs.toByteArray(blockID.getLocalID());
-    byte[] kData = db.get(kKey);
-    if (kData == null) {
-      throw new StorageContainerException("Unable to find the block.",
-          NO_SUCH_BLOCK);
+    try(ReferenceCountedDB db = BlockUtils.getDB(cData, config)) {
+      // This is a post condition that acts as a hint to the user.
+      // Should never fail.
+      Preconditions.checkNotNull(db, "DB cannot be null here");
+      // Note : There is a race condition here, since get and delete
+      // are not atomic. Leaving it here since the impact is refusing
+      // to delete a Block which might have just gotten inserted after
+      // the get check.
+      byte[] kKey = Longs.toByteArray(blockID.getLocalID());
+      try {
+        db.getStore().delete(kKey);
+      } catch (IOException e) {
+        throw new StorageContainerException("Unable to find the block.",
+            NO_SUCH_BLOCK);
+      }
+      // Decrement blockcount here
+      container.getContainerData().decrKeyCount();
     }
-    db.delete(kKey);
-
-    // Decrement blockcount here
-    container.getContainerData().decrKeyCount();
   }
 
   /**
@@ -258,18 +261,19 @@ public class BlockManagerImpl implements BlockManager {
     List<BlockData> result = null;
     KeyValueContainerData cData = (KeyValueContainerData) container
         .getContainerData();
-    MetadataStore db = BlockUtils.getDB(cData, config);
-    result = new ArrayList<>();
-    byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
-    List<Map.Entry<byte[], byte[]>> range =
-        db.getSequentialRangeKVs(startKeyInBytes, count,
-            MetadataKeyFilters.getNormalKeyFilter());
-    for (Map.Entry<byte[], byte[]> entry : range) {
-      BlockData value = BlockUtils.getBlockData(entry.getValue());
-      BlockData data = new BlockData(value.getBlockID());
-      result.add(data);
+    try(ReferenceCountedDB db = BlockUtils.getDB(cData, config)) {
+      result = new ArrayList<>();
+      byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
+      List<Map.Entry<byte[], byte[]>> range =
+          db.getStore().getSequentialRangeKVs(startKeyInBytes, count,
+              MetadataKeyFilters.getNormalKeyFilter());
+      for (Map.Entry<byte[], byte[]> entry : range) {
+        BlockData value = BlockUtils.getBlockData(entry.getValue());
+        BlockData data = new BlockData(value.getBlockID());
+        result.add(data);
+      }
+      return result;
     }
-    return result;
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
index 61a303f..52bef7b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.utils.BackgroundTaskQueue;
 import org.apache.hadoop.utils.BackgroundTaskResult;
 import org.apache.hadoop.utils.BatchOperation;
 import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -185,69 +185,70 @@ public class BlockDeletingService extends BackgroundService{
       ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
       long startTime = Time.monotonicNow();
       // Scan container's db and get list of under deletion blocks
-      MetadataStore meta = BlockUtils.getDB(
-          (KeyValueContainerData) containerData, conf);
-      // # of blocks to delete is throttled
-      KeyPrefixFilter filter =
-          new KeyPrefixFilter().addFilter(OzoneConsts.DELETING_KEY_PREFIX);
-      List<Map.Entry<byte[], byte[]>> toDeleteBlocks =
-          meta.getSequentialRangeKVs(null, blockLimitPerTask, filter);
-      if (toDeleteBlocks.isEmpty()) {
-        LOG.debug("No under deletion block found in container : {}",
-            containerData.getContainerID());
-      }
+      try (ReferenceCountedDB meta = BlockUtils.getDB(containerData, conf)) {
+        // # of blocks to delete is throttled
+        KeyPrefixFilter filter =
+            new KeyPrefixFilter().addFilter(OzoneConsts.DELETING_KEY_PREFIX);
+        List<Map.Entry<byte[], byte[]>> toDeleteBlocks =
+            meta.getStore().getSequentialRangeKVs(null, blockLimitPerTask,
+                filter);
+        if (toDeleteBlocks.isEmpty()) {
+          LOG.debug("No under deletion block found in container : {}",
+              containerData.getContainerID());
+        }
 
-      List<String> succeedBlocks = new LinkedList<>();
-      LOG.debug("Container : {}, To-Delete blocks : {}",
-          containerData.getContainerID(), toDeleteBlocks.size());
-      File dataDir = new File(containerData.getChunksPath());
-      if (!dataDir.exists() || !dataDir.isDirectory()) {
-        LOG.error("Invalid container data dir {} : "
-            + "does not exist or not a directory", dataDir.getAbsolutePath());
-        return crr;
-      }
+        List<String> succeedBlocks = new LinkedList<>();
+        LOG.debug("Container : {}, To-Delete blocks : {}",
+            containerData.getContainerID(), toDeleteBlocks.size());
+        File dataDir = new File(containerData.getChunksPath());
+        if (!dataDir.exists() || !dataDir.isDirectory()) {
+          LOG.error("Invalid container data dir {} : "
+              + "does not exist or not a directory", 
dataDir.getAbsolutePath());
+          return crr;
+        }
 
-      toDeleteBlocks.forEach(entry -> {
-        String blockName = DFSUtil.bytes2String(entry.getKey());
-        LOG.debug("Deleting block {}", blockName);
-        try {
-          ContainerProtos.BlockData data =
-              ContainerProtos.BlockData.parseFrom(entry.getValue());
-          for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
-            File chunkFile = dataDir.toPath()
-                .resolve(chunkInfo.getChunkName()).toFile();
-            if (FileUtils.deleteQuietly(chunkFile)) {
-              LOG.debug("block {} chunk {} deleted", blockName,
-                  chunkFile.getAbsolutePath());
+        toDeleteBlocks.forEach(entry -> {
+          String blockName = DFSUtil.bytes2String(entry.getKey());
+          LOG.debug("Deleting block {}", blockName);
+          try {
+            ContainerProtos.BlockData data =
+                ContainerProtos.BlockData.parseFrom(entry.getValue());
+            for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
+              File chunkFile = dataDir.toPath()
+                  .resolve(chunkInfo.getChunkName()).toFile();
+              if (FileUtils.deleteQuietly(chunkFile)) {
+                LOG.debug("block {} chunk {} deleted", blockName,
+                    chunkFile.getAbsolutePath());
+              }
             }
+            succeedBlocks.add(blockName);
+          } catch (InvalidProtocolBufferException e) {
+            LOG.error("Failed to parse block info for block {}", blockName, e);
           }
-          succeedBlocks.add(blockName);
-        } catch (InvalidProtocolBufferException e) {
-          LOG.error("Failed to parse block info for block {}", blockName, e);
-        }
-      });
+        });
 
-      // Once files are deleted... replace deleting entries with deleted entries
-      BatchOperation batch = new BatchOperation();
-      succeedBlocks.forEach(entry -> {
-        String blockId =
-            entry.substring(OzoneConsts.DELETING_KEY_PREFIX.length());
-        String deletedEntry = OzoneConsts.DELETED_KEY_PREFIX + blockId;
-        batch.put(DFSUtil.string2Bytes(deletedEntry),
-            DFSUtil.string2Bytes(blockId));
-        batch.delete(DFSUtil.string2Bytes(entry));
-      });
-      meta.writeBatch(batch);
-      // update count of pending deletion blocks in in-memory container status
-      containerData.decrPendingDeletionBlocks(succeedBlocks.size());
+        // Once files are deleted... replace deleting entries with deleted entries
+        BatchOperation batch = new BatchOperation();
+        succeedBlocks.forEach(entry -> {
+          String blockId =
+              entry.substring(OzoneConsts.DELETING_KEY_PREFIX.length());
+          String deletedEntry = OzoneConsts.DELETED_KEY_PREFIX + blockId;
+          batch.put(DFSUtil.string2Bytes(deletedEntry),
+              DFSUtil.string2Bytes(blockId));
+          batch.delete(DFSUtil.string2Bytes(entry));
+        });
+        meta.getStore().writeBatch(batch);
+        // update count of pending deletion blocks in in-memory container status
+        containerData.decrPendingDeletionBlocks(succeedBlocks.size());
 
-      if (!succeedBlocks.isEmpty()) {
-        LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
-            containerData.getContainerID(), succeedBlocks.size(),
-            Time.monotonicNow() - startTime);
+        if (!succeedBlocks.isEmpty()) {
+          LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
+              containerData.getContainerID(), succeedBlocks.size(),
+              Time.monotonicNow() - startTime);
+        }
+        crr.addAll(succeedBlocks);
+        return crr;
       }
-      crr.addAll(succeedBlocks);
-      return crr;
     }
 
     @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
index 0192fd5..fdddc4e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -180,28 +180,31 @@ public class ContainerReader implements Runnable {
         KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
         KeyValueContainer kvContainer = new KeyValueContainer(
             kvContainerData, config);
-        MetadataStore containerDB = BlockUtils.getDB(kvContainerData, config);
-        MetadataKeyFilters.KeyPrefixFilter filter =
-            new MetadataKeyFilters.KeyPrefixFilter()
-                .addFilter(OzoneConsts.DELETING_KEY_PREFIX);
-        int numPendingDeletionBlocks =
-            containerDB.getSequentialRangeKVs(null, Integer.MAX_VALUE, filter)
-                .size();
-        kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks);
-        byte[] delTxnId = containerDB.get(
-            DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX));
-        if (delTxnId != null) {
-          kvContainerData
-              .updateDeleteTransactionId(Longs.fromByteArray(delTxnId));
-        }
-        // sets the BlockCommitSequenceId.
-        byte[] bcsId = containerDB.get(
-            DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX));
-        if (bcsId != null) {
-          kvContainerData
-              .updateBlockCommitSequenceId(Longs.fromByteArray(bcsId));
+        try(ReferenceCountedDB containerDB = BlockUtils.getDB(kvContainerData,
+            config)) {
+          MetadataKeyFilters.KeyPrefixFilter filter =
+              new MetadataKeyFilters.KeyPrefixFilter()
+                  .addFilter(OzoneConsts.DELETING_KEY_PREFIX);
+          int numPendingDeletionBlocks =
+              containerDB.getStore().getSequentialRangeKVs(null,
+                  Integer.MAX_VALUE, filter)
+                  .size();
+          kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks);
+          byte[] delTxnId = containerDB.getStore().get(
+              DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX));
+          if (delTxnId != null) {
+            kvContainerData
+                .updateDeleteTransactionId(Longs.fromByteArray(delTxnId));
+          }
+          // sets the BlockCommitSequenceId.
+          byte[] bcsId = containerDB.getStore().get(
+              DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX));
+          if (bcsId != null) {
+            kvContainerData
+                .updateBlockCommitSequenceId(Longs.fromByteArray(bcsId));
+          }
+          containerSet.addContainer(kvContainer);
         }
-        containerSet.addContainer(kvContainer);
       } else {
         throw new StorageContainerException("Container File is corrupted. " +
             "ContainerType is KeyValueContainer but cast to " +
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
index 15d7b34..687e64e 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -109,30 +109,31 @@ public class TestKeyValueBlockIterator {
     createContainerWithBlocks(containerID, normalBlocks, deletedBlocks);
     String containerPath = new File(containerData.getMetadataPath())
         .getParent();
-    KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerID, new File(containerPath));
-
-    int counter = 0;
-    while(keyValueBlockIterator.hasNext()) {
-      BlockData blockData = keyValueBlockIterator.nextBlock();
-      assertEquals(blockData.getLocalID(), counter++);
-    }
-
-    assertFalse(keyValueBlockIterator.hasNext());
-
-    keyValueBlockIterator.seekToFirst();
-    counter = 0;
-    while(keyValueBlockIterator.hasNext()) {
-      BlockData blockData = keyValueBlockIterator.nextBlock();
-      assertEquals(blockData.getLocalID(), counter++);
-    }
-    assertFalse(keyValueBlockIterator.hasNext());
-
-    try {
-      keyValueBlockIterator.nextBlock();
-    } catch (NoSuchElementException ex) {
-      GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
-          "for ContainerID " + containerID, ex);
+    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
+        containerID, new File(containerPath))) {
+
+      int counter = 0;
+      while (keyValueBlockIterator.hasNext()) {
+        BlockData blockData = keyValueBlockIterator.nextBlock();
+        assertEquals(blockData.getLocalID(), counter++);
+      }
+
+      assertFalse(keyValueBlockIterator.hasNext());
+
+      keyValueBlockIterator.seekToFirst();
+      counter = 0;
+      while (keyValueBlockIterator.hasNext()) {
+        BlockData blockData = keyValueBlockIterator.nextBlock();
+        assertEquals(blockData.getLocalID(), counter++);
+      }
+      assertFalse(keyValueBlockIterator.hasNext());
+
+      try {
+        keyValueBlockIterator.nextBlock();
+      } catch (NoSuchElementException ex) {
+        GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
+            "for ContainerID " + containerID, ex);
+      }
     }
   }
 
@@ -142,17 +143,18 @@ public class TestKeyValueBlockIterator {
     createContainerWithBlocks(containerID, 2, 0);
     String containerPath = new File(containerData.getMetadataPath())
         .getParent();
-    KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerID, new File(containerPath));
-    long blockID = 0L;
-    assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
-    assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
-
-    try {
-      keyValueBlockIterator.nextBlock();
-    } catch (NoSuchElementException ex) {
-      GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
-          "for ContainerID " + containerID, ex);
+    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
+        containerID, new File(containerPath))) {
+      long blockID = 0L;
+      assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
+      assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
+
+      try {
+        keyValueBlockIterator.nextBlock();
+      } catch (NoSuchElementException ex) {
+        GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
+            "for ContainerID " + containerID, ex);
+      }
     }
   }
 
@@ -162,42 +164,41 @@ public class TestKeyValueBlockIterator {
     createContainerWithBlocks(containerID, 2, 0);
     String containerPath = new File(containerData.getMetadataPath())
         .getParent();
-    KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerID, new File(containerPath));
-    long blockID = 0L;
-
-    // Even calling multiple times hasNext() should not move entry forward.
-    assertTrue(keyValueBlockIterator.hasNext());
-    assertTrue(keyValueBlockIterator.hasNext());
-    assertTrue(keyValueBlockIterator.hasNext());
-    assertTrue(keyValueBlockIterator.hasNext());
-    assertTrue(keyValueBlockIterator.hasNext());
-    assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
-
-    assertTrue(keyValueBlockIterator.hasNext());
-    assertTrue(keyValueBlockIterator.hasNext());
-    assertTrue(keyValueBlockIterator.hasNext());
-    assertTrue(keyValueBlockIterator.hasNext());
-    assertTrue(keyValueBlockIterator.hasNext());
-    assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
-
-    keyValueBlockIterator.seekToLast();
-    assertTrue(keyValueBlockIterator.hasNext());
-    assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
-
-    keyValueBlockIterator.seekToFirst();
-    blockID = 0L;
-    assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
-    assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
-
-    try {
-      keyValueBlockIterator.nextBlock();
-    } catch (NoSuchElementException ex) {
-      GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
-          "for ContainerID " + containerID, ex);
+    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
+        containerID, new File(containerPath))) {
+      long blockID = 0L;
+
+      // Even calling multiple times hasNext() should not move entry forward.
+      assertTrue(keyValueBlockIterator.hasNext());
+      assertTrue(keyValueBlockIterator.hasNext());
+      assertTrue(keyValueBlockIterator.hasNext());
+      assertTrue(keyValueBlockIterator.hasNext());
+      assertTrue(keyValueBlockIterator.hasNext());
+      assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
+
+      assertTrue(keyValueBlockIterator.hasNext());
+      assertTrue(keyValueBlockIterator.hasNext());
+      assertTrue(keyValueBlockIterator.hasNext());
+      assertTrue(keyValueBlockIterator.hasNext());
+      assertTrue(keyValueBlockIterator.hasNext());
+      assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
+
+      keyValueBlockIterator.seekToLast();
+      assertTrue(keyValueBlockIterator.hasNext());
+      assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
+
+      keyValueBlockIterator.seekToFirst();
+      blockID = 0L;
+      assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
+      assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
+
+      try {
+        keyValueBlockIterator.nextBlock();
+      } catch (NoSuchElementException ex) {
+        GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
+            "for ContainerID " + containerID, ex);
+      }
     }
-
-
   }
 
   @Test
@@ -208,14 +209,15 @@ public class TestKeyValueBlockIterator {
     createContainerWithBlocks(containerId, normalBlocks, deletedBlocks);
     String containerPath = new File(containerData.getMetadataPath())
         .getParent();
-    KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
+    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
         containerId, new File(containerPath), MetadataKeyFilters
-        .getDeletingKeyFilter());
+        .getDeletingKeyFilter())) {
 
-    int counter = 5;
-    while(keyValueBlockIterator.hasNext()) {
-      BlockData blockData = keyValueBlockIterator.nextBlock();
-      assertEquals(blockData.getLocalID(), counter++);
+      int counter = 5;
+      while (keyValueBlockIterator.hasNext()) {
+        BlockData blockData = keyValueBlockIterator.nextBlock();
+        assertEquals(blockData.getLocalID(), counter++);
+      }
     }
   }
 
@@ -226,11 +228,12 @@ public class TestKeyValueBlockIterator {
     createContainerWithBlocks(containerId, 0, 5);
     String containerPath = new File(containerData.getMetadataPath())
         .getParent();
-    KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerId, new File(containerPath));
-    //As all blocks are deleted blocks, blocks does not match with normal key
-    // filter.
-    assertFalse(keyValueBlockIterator.hasNext());
+    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
+        containerId, new File(containerPath))) {
+      // As all blocks are marked for deletion, none of them match the
+      // normal key filter.
+      assertFalse(keyValueBlockIterator.hasNext());
+    }
   }
 
   /**
@@ -251,27 +254,30 @@ public class TestKeyValueBlockIterator {
     container = new KeyValueContainer(containerData, conf);
     container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), UUID
         .randomUUID().toString());
-    MetadataStore metadataStore = BlockUtils.getDB(containerData, conf);
-
-    List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
-    ChunkInfo info = new ChunkInfo("chunkfile", 0, 1024);
-    chunkList.add(info.getProtoBufMessage());
-
-    for (int i=0; i<normalBlocks; i++) {
-      BlockID blockID = new BlockID(containerId, i);
-      BlockData blockData = new BlockData(blockID);
-      blockData.setChunks(chunkList);
-      metadataStore.put(Longs.toByteArray(blockID.getLocalID()), blockData
-          .getProtoBufMessage().toByteArray());
-    }
-
-    for (int i=normalBlocks; i<deletedBlocks; i++) {
-      BlockID blockID = new BlockID(containerId, i);
-      BlockData blockData = new BlockData(blockID);
-      blockData.setChunks(chunkList);
-      metadataStore.put(DFSUtil.string2Bytes(OzoneConsts
-          .DELETING_KEY_PREFIX + blockID.getLocalID()), blockData
-          .getProtoBufMessage().toByteArray());
+    try (ReferenceCountedDB metadataStore = BlockUtils.getDB(containerData,
+        conf)) {
+
+      List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
+      ChunkInfo info = new ChunkInfo("chunkfile", 0, 1024);
+      chunkList.add(info.getProtoBufMessage());
+
+      for (int i = 0; i < normalBlocks; i++) {
+        BlockID blockID = new BlockID(containerId, i);
+        BlockData blockData = new BlockData(blockID);
+        blockData.setChunks(chunkList);
+        metadataStore.getStore().put(Longs.toByteArray(blockID.getLocalID()),
+            blockData.getProtoBufMessage().toByteArray());
+      }
+
+      for (int i = normalBlocks; i < deletedBlocks; i++) {
+        BlockID blockID = new BlockID(containerId, i);
+        BlockData blockData = new BlockData(blockID);
+        blockData.setChunks(chunkList);
+        metadataStore.getStore().put(DFSUtil.string2Bytes(OzoneConsts
+            .DELETING_KEY_PREFIX + blockID.getLocalID()), blockData
+            .getProtoBufMessage().toByteArray());
+      }
     }
   }
 
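
Note: the recurring change in this file is scoping. KeyValueBlockIterator is
Closeable and, while open, presumably pins the container's DB handle, so each
test now wraps it in try-with-resources instead of leaking it. A minimal
sketch of the shape, assuming containerID, containerPath and conf come from
the test fixture and the enclosing test method declares throws Exception:

    // Sketch only: mirrors the try-with-resources shape the patch adopts.
    try (KeyValueBlockIterator iter =
        new KeyValueBlockIterator(containerID, new File(containerPath))) {
      while (iter.hasNext()) {
        BlockData block = iter.nextBlock();
        // assertions on block.getLocalID(), block.getChunks(), ...
      }
    } // iterator (and its DB reference) is released even if an assert fails
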
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 8e2986c..c165741 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -132,23 +132,24 @@ public class TestKeyValueContainer {
   private void addBlocks(int count) throws Exception {
     long containerId = keyValueContainerData.getContainerID();
 
-    MetadataStore metadataStore = BlockUtils.getDB(keyValueContainer
-        .getContainerData(), conf);
-    for (int i=0; i < count; i++) {
-      // Creating BlockData
-      BlockID blockID = new BlockID(containerId, i);
-      BlockData blockData = new BlockData(blockID);
-      blockData.addMetadata("VOLUME", "ozone");
-      blockData.addMetadata("OWNER", "hdfs");
-      List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
-      ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID
-          .getLocalID(), 0), 0, 1024);
-      chunkList.add(info.getProtoBufMessage());
-      blockData.setChunks(chunkList);
-      metadataStore.put(Longs.toByteArray(blockID.getLocalID()), blockData
-          .getProtoBufMessage().toByteArray());
+    try (ReferenceCountedDB metadataStore = BlockUtils.getDB(keyValueContainer
+        .getContainerData(), conf)) {
+      for (int i = 0; i < count; i++) {
+        // Creating BlockData
+        BlockID blockID = new BlockID(containerId, i);
+        BlockData blockData = new BlockData(blockID);
+        blockData.addMetadata("VOLUME", "ozone");
+        blockData.addMetadata("OWNER", "hdfs");
+        List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
+        ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID
+            .getLocalID(), 0), 0, 1024);
+        chunkList.add(info.getProtoBufMessage());
+        blockData.setChunks(chunkList);
+        metadataStore.getStore().put(Longs.toByteArray(blockID.getLocalID()),
+            blockData.getProtoBufMessage().toByteArray());
+      }
     }
-
   }
 
   @SuppressWarnings("RedundantCast")
@@ -191,9 +192,12 @@ public class TestKeyValueContainer {
 
     int numberOfKeysToWrite = 12;
     // write a few keys to check the key count after import
-    MetadataStore metadataStore = BlockUtils.getDB(keyValueContainerData, conf);
-    for (int i = 0; i < numberOfKeysToWrite; i++) {
-      metadataStore.put(("test" + i).getBytes(UTF_8), "test".getBytes(UTF_8));
+    try (ReferenceCountedDB metadataStore =
+        BlockUtils.getDB(keyValueContainerData, conf)) {
+      for (int i = 0; i < numberOfKeysToWrite; i++) {
+        metadataStore.getStore().put(("test" + i).getBytes(UTF_8),
+            "test".getBytes(UTF_8));
+      }
     }
     BlockUtils.removeDB(keyValueContainerData, conf);
 
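Note: direct metadata writes follow the same scoping. ReferenceCountedDB
wraps the store rather than replacing it, so callers reach the underlying
MetadataStore through getStore() and give the reference back when the block
exits. A hedged sketch of the write path used by addBlocks above (blockID,
blockData and conf as in that loop):

    try (ReferenceCountedDB db =
        BlockUtils.getDB(keyValueContainer.getContainerData(), conf)) {
      // close() is assumed to release a cache reference, not tear down the DB
      db.getStore().put(Longs.toByteArray(blockID.getLocalID()),
          blockData.getProtoBufMessage().toByteArray());
    }
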
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
index 0bc1bbc..f1b90f1 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -149,48 +150,50 @@ import static org.junit.Assert.assertTrue;
     container = new KeyValueContainer(containerData, conf);
     container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
         UUID.randomUUID().toString());
-    MetadataStore metadataStore = BlockUtils.getDB(containerData, conf);
-    chunkManager = new ChunkManagerImpl(true);
-
-    assertTrue(containerData.getChunksPath() != null);
-    File chunksPath = new File(containerData.getChunksPath());
-    assertTrue(chunksPath.exists());
-    // Initially chunks folder should be empty.
-    assertTrue(chunksPath.listFiles().length == 0);
-
-    List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
-    for (int i = 0; i < (totalBlks); i++) {
-      BlockID blockID = new BlockID(containerId, i);
-      BlockData blockData = new BlockData(blockID);
-
-      chunkList.clear();
-      for (chunkCount = 0; chunkCount < chunksPerBlock; chunkCount++) {
-        String chunkName = strBlock + i + strChunk + chunkCount;
-        long offset = chunkCount * chunkLen;
-        ChunkInfo info = new ChunkInfo(chunkName, offset, chunkLen);
-        chunkList.add(info.getProtoBufMessage());
-        chunkManager
-            .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData),
-                new DispatcherContext.Builder()
-                    .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
-                    .build());
-        chunkManager
-            .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData),
-                new DispatcherContext.Builder()
-                    .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
-                    .build());
-      }
-      blockData.setChunks(chunkList);
-
-      if (i >= normalBlocks) {
-        // deleted key
-        metadataStore.put(DFSUtil.string2Bytes(
-            OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID()),
-            blockData.getProtoBufMessage().toByteArray());
-      } else {
-        // normal key
-        metadataStore.put(Longs.toByteArray(blockID.getLocalID()),
-            blockData.getProtoBufMessage().toByteArray());
+    try (ReferenceCountedDB metadataStore = BlockUtils.getDB(containerData,
+        conf)) {
+      chunkManager = new ChunkManagerImpl(true);
+
+      assertTrue(containerData.getChunksPath() != null);
+      File chunksPath = new File(containerData.getChunksPath());
+      assertTrue(chunksPath.exists());
+      // Initially chunks folder should be empty.
+      assertTrue(chunksPath.listFiles().length == 0);
+
+      List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
+      for (int i = 0; i < (totalBlks); i++) {
+        BlockID blockID = new BlockID(containerId, i);
+        BlockData blockData = new BlockData(blockID);
+
+        chunkList.clear();
+        for (chunkCount = 0; chunkCount < chunksPerBlock; chunkCount++) {
+          String chunkName = strBlock + i + strChunk + chunkCount;
+          long offset = chunkCount * chunkLen;
+          ChunkInfo info = new ChunkInfo(chunkName, offset, chunkLen);
+          chunkList.add(info.getProtoBufMessage());
+          chunkManager
+              .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData),
+                  new DispatcherContext.Builder()
+                      .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
+                      .build());
+          chunkManager
+              .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData),
+                  new DispatcherContext.Builder()
+                      .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
+                      .build());
+        }
+        blockData.setChunks(chunkList);
+
+        if (i >= normalBlocks) {
+          // deleted key
+          metadataStore.getStore().put(DFSUtil.string2Bytes(
+              OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID()),
+              blockData.getProtoBufMessage().toByteArray());
+        } else {
+          // normal key
+          metadataStore.getStore().put(Longs.toByteArray(blockID.getLocalID()),
+              blockData.getProtoBufMessage().toByteArray());
+        }
       }
     }
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
index 5b55119..4cfcd45 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.utils.MetadataKeyFilters;
 import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -119,16 +120,17 @@ public class TestStorageContainerManagerHelper {
   public List<String> getPendingDeletionBlocks(Long containerID)
       throws IOException {
     List<String> pendingDeletionBlocks = Lists.newArrayList();
-    MetadataStore meta = getContainerMetadata(containerID);
+    ReferenceCountedDB meta = getContainerMetadata(containerID);
     KeyPrefixFilter filter =
         new KeyPrefixFilter().addFilter(OzoneConsts.DELETING_KEY_PREFIX);
-    List<Map.Entry<byte[], byte[]>> kvs = meta
+    List<Map.Entry<byte[], byte[]>> kvs = meta.getStore()
         .getRangeKVs(null, Integer.MAX_VALUE, filter);
     kvs.forEach(entry -> {
       String key = DFSUtil.bytes2String(entry.getKey());
       pendingDeletionBlocks
           .add(key.replace(OzoneConsts.DELETING_KEY_PREFIX, ""));
     });
+    meta.close();
     return pendingDeletionBlocks;
   }
 
@@ -143,17 +145,18 @@ public class TestStorageContainerManagerHelper {
 
   public List<Long> getAllBlocks(Long containeID) throws IOException {
     List<Long> allBlocks = Lists.newArrayList();
-    MetadataStore meta = getContainerMetadata(containeID);
+    ReferenceCountedDB meta = getContainerMetadata(containeID);
     List<Map.Entry<byte[], byte[]>> kvs =
-        meta.getRangeKVs(null, Integer.MAX_VALUE,
+        meta.getStore().getRangeKVs(null, Integer.MAX_VALUE,
             MetadataKeyFilters.getNormalKeyFilter());
     kvs.forEach(entry -> {
       allBlocks.add(Longs.fromByteArray(entry.getKey()));
     });
+    meta.close();
     return allBlocks;
   }
 
-  private MetadataStore getContainerMetadata(Long containerID)
+  private ReferenceCountedDB getContainerMetadata(Long containerID)
       throws IOException {
     ContainerWithPipeline containerWithPipeline = cluster
         .getStorageContainerManager().getClientProtocolServer()
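
Note: unlike the tests above, these helpers release the handle with an
explicit meta.close() after the query, which would leak the reference if
getRangeKVs threw. A try/finally variant of the same lookup is the more
defensive shape (the finally block here is an illustration, not the patch's
code; names are as above):

    ReferenceCountedDB meta = getContainerMetadata(containerID);
    try {
      List<Map.Entry<byte[], byte[]>> kvs = meta.getStore()
          .getRangeKVs(null, Integer.MAX_VALUE, filter);
      // transform kvs into the result list as above
    } finally {
      meta.close();  // release the cache reference on every path
    }
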
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index bd496d0..7eb274c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -953,18 +953,19 @@ public abstract class TestOzoneRpcClientAbstract {
             .getContainerData());
     String containerPath = new File(containerData.getMetadataPath())
         .getParent();
-    KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerID, new File(containerPath));
-    while (keyValueBlockIterator.hasNext()) {
-      BlockData blockData = keyValueBlockIterator.nextBlock();
-      if (blockData.getBlockID().getLocalID() == localID) {
-        long length = 0;
-        List<ContainerProtos.ChunkInfo> chunks = blockData.getChunks();
-        for (ContainerProtos.ChunkInfo chunk : chunks) {
-          length += chunk.getLen();
+    try (KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
+        containerID, new File(containerPath))) {
+      while (keyValueBlockIterator.hasNext()) {
+        BlockData blockData = keyValueBlockIterator.nextBlock();
+        if (blockData.getBlockID().getLocalID() == localID) {
+          long length = 0;
+          List<ContainerProtos.ChunkInfo> chunks = blockData.getChunks();
+          for (ContainerProtos.ChunkInfo chunk : chunks) {
+            length += chunk.getLen();
+          }
+          Assert.assertEquals(length, keyValue.getBytes().length);
+          break;
         }
-        Assert.assertEquals(length, keyValue.getBytes().length);
-        break;
       }
     }
   }
@@ -1115,31 +1116,32 @@ public abstract class TestOzoneRpcClientAbstract {
         (KeyValueContainerData) container.getContainerData();
     String containerPath =
         new File(containerData.getMetadataPath()).getParent();
-    KeyValueBlockIterator keyValueBlockIterator =
-        new KeyValueBlockIterator(containerID, new File(containerPath));
-
-    // Find the block corresponding to the key we put. We use the localID of
-    // the BlockData to identify out key.
-    BlockData blockData = null;
-    while (keyValueBlockIterator.hasNext()) {
-      blockData = keyValueBlockIterator.nextBlock();
-      if (blockData.getBlockID().getLocalID() == localID) {
-        break;
+    try (KeyValueBlockIterator keyValueBlockIterator =
+        new KeyValueBlockIterator(containerID, new File(containerPath))) {
+
+      // Find the block corresponding to the key we put. We use the localID of
+      // the BlockData to identify our key.
+      BlockData blockData = null;
+      while (keyValueBlockIterator.hasNext()) {
+        blockData = keyValueBlockIterator.nextBlock();
+        if (blockData.getBlockID().getLocalID() == localID) {
+          break;
+        }
       }
+      Assert.assertNotNull("Block not found", blockData);
+
+      // Get the location of the chunk file
+      String chunkName = blockData.getChunks().get(0).getChunkName();
+      String containerBaseDir =
+          container.getContainerData().getVolume().getHddsRootDir().getPath();
+      File chunksLocationPath = KeyValueContainerLocationUtil
+          .getChunksLocationPath(containerBaseDir, scmId, containerID);
+      File chunkFile = new File(chunksLocationPath, chunkName);
+
+      // Corrupt the contents of the chunk file
+      String newData = new String("corrupted data");
+      FileUtils.writeByteArrayToFile(chunkFile, newData.getBytes());
     }
-    Assert.assertNotNull("Block not found", blockData);
-
-    // Get the location of the chunk file
-    String chunkName = blockData.getChunks().get(0).getChunkName();
-    String containreBaseDir =
-        container.getContainerData().getVolume().getHddsRootDir().getPath();
-    File chunksLocationPath = KeyValueContainerLocationUtil
-        .getChunksLocationPath(containreBaseDir, scmId, containerID);
-    File chunkFile = new File(chunksLocationPath, chunkName);
-
-    // Corrupt the contents of the chunk file
-    String newData = new String("corrupted data");
-    FileUtils.writeByteArrayToFile(chunkFile, newData.getBytes());
   }
 
   @Test
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index 27fe4ff..f7d525d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.apache.hadoop.utils.BackgroundService;
 import org.apache.hadoop.utils.MetadataKeyFilters;
 import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
@@ -119,35 +120,36 @@ public class TestBlockDeletingService {
       containerSet.addContainer(container);
       data = (KeyValueContainerData) containerSet.getContainer(
           containerID).getContainerData();
-      MetadataStore metadata = BlockUtils.getDB(data, conf);
-      for (int j = 0; j<numOfBlocksPerContainer; j++) {
-        BlockID blockID =
-            ContainerTestHelper.getTestBlockID(containerID);
-        String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX +
-            blockID.getLocalID();
-        BlockData kd = new BlockData(blockID);
-        List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
-        for (int k = 0; k<numOfChunksPerBlock; k++) {
-          // offset doesn't matter here
-          String chunkName = blockID.getLocalID() + "_chunk_" + k;
-          File chunk = new File(data.getChunksPath(), chunkName);
-          FileUtils.writeStringToFile(chunk, "a chunk",
-              Charset.defaultCharset());
-          LOG.info("Creating file {}", chunk.getAbsolutePath());
-          // make sure file exists
-          Assert.assertTrue(chunk.isFile() && chunk.exists());
-          ContainerProtos.ChunkInfo info =
-              ContainerProtos.ChunkInfo.newBuilder()
-                  .setChunkName(chunk.getAbsolutePath())
-                  .setLen(0)
-                  .setOffset(0)
-                  .setChecksumData(Checksum.getNoChecksumDataProto())
-                  .build();
-          chunks.add(info);
+      try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
+        for (int j = 0; j < numOfBlocksPerContainer; j++) {
+          BlockID blockID =
+              ContainerTestHelper.getTestBlockID(containerID);
+          String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX +
+              blockID.getLocalID();
+          BlockData kd = new BlockData(blockID);
+          List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
+          for (int k = 0; k < numOfChunksPerBlock; k++) {
+            // offset doesn't matter here
+            String chunkName = blockID.getLocalID() + "_chunk_" + k;
+            File chunk = new File(data.getChunksPath(), chunkName);
+            FileUtils.writeStringToFile(chunk, "a chunk",
+                Charset.defaultCharset());
+            LOG.info("Creating file {}", chunk.getAbsolutePath());
+            // make sure file exists
+            Assert.assertTrue(chunk.isFile() && chunk.exists());
+            ContainerProtos.ChunkInfo info =
+                ContainerProtos.ChunkInfo.newBuilder()
+                    .setChunkName(chunk.getAbsolutePath())
+                    .setLen(0)
+                    .setOffset(0)
+                    .setChecksumData(Checksum.getNoChecksumDataProto())
+                    .build();
+            chunks.add(info);
+          }
+          kd.setChunks(chunks);
+          metadata.getStore().put(DFSUtil.string2Bytes(deleteStateName),
+              kd.getProtoBufMessage().toByteArray());
         }
-        kd.setChunks(chunks);
-        metadata.put(DFSUtil.string2Bytes(deleteStateName),
-            kd.getProtoBufMessage().toByteArray());
       }
     }
   }
@@ -166,17 +168,19 @@ public class TestBlockDeletingService {
    * Get under deletion blocks count from DB,
    * note this info is parsed from container.db.
    */
-  private int getUnderDeletionBlocksCount(MetadataStore meta)
+  private int getUnderDeletionBlocksCount(ReferenceCountedDB meta)
       throws IOException {
     List<Map.Entry<byte[], byte[]>> underDeletionBlocks =
-        meta.getRangeKVs(null, 100, new MetadataKeyFilters.KeyPrefixFilter()
+        meta.getStore().getRangeKVs(null, 100,
+            new MetadataKeyFilters.KeyPrefixFilter()
             .addFilter(OzoneConsts.DELETING_KEY_PREFIX));
     return underDeletionBlocks.size();
   }
 
-  private int getDeletedBlocksCount(MetadataStore db) throws IOException {
+  private int getDeletedBlocksCount(ReferenceCountedDB db) throws IOException {
     List<Map.Entry<byte[], byte[]>> underDeletionBlocks =
-        db.getRangeKVs(null, 100, new MetadataKeyFilters.KeyPrefixFilter()
+        db.getStore().getRangeKVs(null, 100,
+            new MetadataKeyFilters.KeyPrefixFilter()
             .addFilter(OzoneConsts.DELETED_KEY_PREFIX));
     return underDeletionBlocks.size();
   }
@@ -202,37 +206,38 @@ public class TestBlockDeletingService {
     containerSet.listContainer(0L, 1, containerData);
     Assert.assertEquals(1, containerData.size());
 
-    MetadataStore meta = BlockUtils.getDB(
-        (KeyValueContainerData) containerData.get(0), conf);
-    Map<Long, Container> containerMap = containerSet.getContainerMapCopy();
-    // NOTE: this test assumes that all the container is KetValueContainer and
-    // have DeleteTransactionId in KetValueContainerData. If other
-    // types is going to be added, this test should be checked.
-    long transactionId = ((KeyValueContainerData)containerMap
-        .get(containerData.get(0).getContainerID()).getContainerData())
-        .getDeleteTransactionId();
-
-
-    // Number of deleted blocks in container should be equal to 0 before
-    // block delete
-    Assert.assertEquals(0, transactionId);
-
-    // Ensure there are 3 blocks under deletion and 0 deleted blocks
-    Assert.assertEquals(3, getUnderDeletionBlocksCount(meta));
-    Assert.assertEquals(0, getDeletedBlocksCount(meta));
-
-    // An interval will delete 1 * 2 blocks
-    deleteAndWait(svc, 1);
-    Assert.assertEquals(1, getUnderDeletionBlocksCount(meta));
-    Assert.assertEquals(2, getDeletedBlocksCount(meta));
-
-    deleteAndWait(svc, 2);
-    Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
-    Assert.assertEquals(3, getDeletedBlocksCount(meta));
-
-    deleteAndWait(svc, 3);
-    Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
-    Assert.assertEquals(3, getDeletedBlocksCount(meta));
+    try (ReferenceCountedDB meta = BlockUtils.getDB(
+        (KeyValueContainerData) containerData.get(0), conf)) {
+      Map<Long, Container> containerMap = containerSet.getContainerMapCopy();
+      // NOTE: this test assumes that all containers are KeyValueContainers and
+      // have a DeleteTransactionId in KeyValueContainerData. If other
+      // container types are added, this test should be revisited.
+      long transactionId = ((KeyValueContainerData) containerMap
+          .get(containerData.get(0).getContainerID()).getContainerData())
+          .getDeleteTransactionId();
+
+
+      // Number of deleted blocks in container should be equal to 0 before
+      // block delete
+      Assert.assertEquals(0, transactionId);
+
+      // Ensure there are 3 blocks under deletion and 0 deleted blocks
+      Assert.assertEquals(3, getUnderDeletionBlocksCount(meta));
+      Assert.assertEquals(0, getDeletedBlocksCount(meta));
+
+      // An interval will delete 1 * 2 blocks
+      deleteAndWait(svc, 1);
+      Assert.assertEquals(1, getUnderDeletionBlocksCount(meta));
+      Assert.assertEquals(2, getDeletedBlocksCount(meta));
+
+      deleteAndWait(svc, 2);
+      Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
+      Assert.assertEquals(3, getDeletedBlocksCount(meta));
+
+      deleteAndWait(svc, 3);
+      Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
+      Assert.assertEquals(3, getDeletedBlocksCount(meta));
+    }
 
     svc.shutdown();
   }
@@ -311,25 +316,26 @@ public class TestBlockDeletingService {
     // get container meta data
     List<ContainerData> containerData = Lists.newArrayList();
     containerSet.listContainer(0L, 1, containerData);
-    MetadataStore meta = BlockUtils.getDB(
-        (KeyValueContainerData) containerData.get(0), conf);
+    try (ReferenceCountedDB meta = BlockUtils.getDB(
+        (KeyValueContainerData) containerData.get(0), conf)) {
 
-    LogCapturer newLog = LogCapturer.captureLogs(BackgroundService.LOG);
-    GenericTestUtils.waitFor(() -> {
-      try {
-        if (getUnderDeletionBlocksCount(meta) == 0) {
-          return true;
+      LogCapturer newLog = LogCapturer.captureLogs(BackgroundService.LOG);
+      GenericTestUtils.waitFor(() -> {
+        try {
+          if (getUnderDeletionBlocksCount(meta) == 0) {
+            return true;
+          }
+        } catch (IOException ignored) {
         }
-      } catch (IOException ignored) {
-      }
-      return false;
-    }, 1000, 100000);
-    newLog.stopCapturing();
+        return false;
+      }, 1000, 100000);
+      newLog.stopCapturing();
 
-    // The block deleting successfully and shouldn't catch timed
-    // out warning log.
-    Assert.assertTrue(!newLog.getOutput().contains(
-        "Background task executes timed out, retrying in next interval"));
+      // Block deletion succeeded, so the timed-out retry warning should not
+      // appear in the captured log.
+      Assert.assertTrue(!newLog.getOutput().contains(
+          "Background task executes timed out, retrying in next interval"));
+    }
     svc.shutdown();
   }
 
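Note: the division of labor in this file is worth calling out. The counting
helpers now take an already-open ReferenceCountedDB, so the caller's
try-with-resources block owns the handle across the whole sequence of
assertions. A sketch of the caller side, reusing the prefix-filter query
from the helpers above:

    // The caller owns the handle; the query merely borrows it.
    try (ReferenceCountedDB meta = BlockUtils.getDB(
        (KeyValueContainerData) containerData.get(0), conf)) {
      int pending = meta.getStore().getRangeKVs(null, 100,
          new MetadataKeyFilters.KeyPrefixFilter()
              .addFilter(OzoneConsts.DELETING_KEY_PREFIX)).size();
      // assert pending against the expected under-deletion count
    }
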
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 838dd9e..1c36e8e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -191,7 +191,7 @@ public class TestContainerPersistence {
     Path meta = kvData.getDbFile().toPath().getParent();
     Assert.assertTrue(meta != null && Files.exists(meta));
 
-    MetadataStore store = null;
+    ReferenceCountedDB store = null;
     try {
       store = BlockUtils.getDB(kvData, conf);
       Assert.assertNotNull(store);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index 14db90d..8e41372 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -50,7 +50,7 @@ import org.apache.hadoop.ozone.ozShell.TestOzoneShell;
 import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -300,9 +300,11 @@ public class TestBlockDeletion {
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
             .getContainer().getContainerSet();
     OzoneTestUtils.performOperationOnKeyContainers((blockID) -> {
-      MetadataStore db = BlockUtils.getDB((KeyValueContainerData) dnContainerSet
-          .getContainer(blockID.getContainerID()).getContainerData(), conf);
-      Assert.assertNotNull(db.get(Longs.toByteArray(blockID.getLocalID())));
+      try (ReferenceCountedDB db =
+          BlockUtils.getDB((KeyValueContainerData) dnContainerSet
+          .getContainer(blockID.getContainerID()).getContainerData(), conf)) {
+        Assert.assertNotNull(db.getStore().get(Longs.toByteArray(blockID.getLocalID())));
+      }
     }, omKeyLocationInfoGroups);
   }
 
@@ -312,13 +314,15 @@ public class TestBlockDeletion {
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
             .getContainer().getContainerSet();
     OzoneTestUtils.performOperationOnKeyContainers((blockID) -> {
-      MetadataStore db = BlockUtils.getDB((KeyValueContainerData) dnContainerSet
-          .getContainer(blockID.getContainerID()).getContainerData(), conf);
-      Assert.assertNull(db.get(Longs.toByteArray(blockID.getLocalID())));
-      Assert.assertNull(db.get(DFSUtil.string2Bytes(
-          OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID())));
-      Assert.assertNotNull(DFSUtil
-          .string2Bytes(OzoneConsts.DELETED_KEY_PREFIX + blockID.getLocalID()));
+      try (ReferenceCountedDB db =
+          BlockUtils.getDB((KeyValueContainerData) dnContainerSet
+          .getContainer(blockID.getContainerID()).getContainerData(), conf)) {
+        Assert.assertNull(db.getStore().get(Longs.toByteArray(blockID.getLocalID())));
+        Assert.assertNull(db.getStore().get(DFSUtil.string2Bytes(
+            OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID())));
+        Assert.assertNotNull(DFSUtil
+            .string2Bytes(OzoneConsts.DELETED_KEY_PREFIX + blockID.getLocalID()));
+      }
       containerIdsWithDeletedBlocks.add(blockID.getContainerID());
     }, omKeyLocationInfoGroups);
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index 4a86f44..be8c84e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache.ReferenceCountedDB;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -226,7 +226,7 @@ public class TestCloseContainerByPipeline {
     List<DatanodeDetails> datanodes = pipeline.getNodes();
     Assert.assertEquals(3, datanodes.size());
 
-    List<MetadataStore> metadataStores = new ArrayList<>(datanodes.size());
+    List<ReferenceCountedDB> metadataStores = new 
ArrayList<>(datanodes.size());
     for (DatanodeDetails details : datanodes) {
       Assert.assertFalse(isContainerClosed(cluster, containerID, details));
       //send the order to close the container
@@ -237,8 +237,10 @@ public class TestCloseContainerByPipeline {
       Container dnContainer = cluster.getHddsDatanodes().get(index)
           .getDatanodeStateMachine().getContainer().getContainerSet()
           .getContainer(containerID);
-      metadataStores.add(BlockUtils.getDB((KeyValueContainerData) dnContainer
-          .getContainerData(), conf));
+      try (ReferenceCountedDB store = BlockUtils.getDB((KeyValueContainerData) dnContainer
+          .getContainerData(), conf)) {
+        metadataStores.add(store);
+      }
     }
 
     // There should be as many RocksDB instances as datanodes in the pipeline.
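
Note: in this last hunk each store is closed before being added to
metadataStores, which is coherent only if close() releases a reference held
by ContainerCache rather than closing the underlying store. A sketch of that
assumed contract follows; the class and method names below are illustrative,
not the actual ContainerCache API:

    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical reference-counted handle: close() drops one reference;
    // the real store is only closed once nobody holds it.
    class RefCountedHandleSketch implements AutoCloseable {
      private final AtomicInteger refCount = new AtomicInteger(1);

      void acquire() {
        refCount.incrementAndGet();  // the cache hands out another reference
      }

      @Override
      public void close() {
        if (refCount.decrementAndGet() == 0) {
          // underlying MetadataStore becomes eligible for close/eviction
        }
      }
    }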

