http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff036e49/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 21d2411..151fddf 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -19,77 +19,178 @@ package org.apache.hadoop.ozone.om;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
-import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
-
 import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.apache.hadoop.utils.db.DBStore;
+import org.apache.hadoop.utils.db.DBStoreBuilder;
+import org.apache.hadoop.utils.db.Table;
+import org.apache.hadoop.utils.db.TableIterator;
+import org.eclipse.jetty.util.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
-import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_ID_DELIMINATOR;
-import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_PREFIX;
-import static org.apache.hadoop.ozone.om.OMConfigKeys
-    .OZONE_OM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.ozone.om.OMConfigKeys
-    .OZONE_OM_DB_CACHE_SIZE_MB;
-import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 
 /**
  * Ozone metadata manager interface.
  */
 public class OmMetadataManagerImpl implements OMMetadataManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OmMetadataManagerImpl.class);
+
+  /**
+   * OM RocksDB Structure.
+   * <p>
+   * OM DB stores metadata as KV pairs in different column families.
+   * <p>
+   * OM DB Schema:
+   * |-------------------------------------------------------------------|
+   * |  Column Family     |        VALUE                                 |
+   * |-------------------------------------------------------------------|
+   * | userTable          |     user->VolumeList                         |
+   * |-------------------------------------------------------------------|
+   * | volumeTable        |     /volume->VolumeInfo                      |
+   * |-------------------------------------------------------------------|
+   * | bucketTable        |     /volume/bucket-> BucketInfo              |
+   * |-------------------------------------------------------------------|
+   * | keyTable           | /volumeName/bucketName/keyName->KeyInfo      |
+   * |-------------------------------------------------------------------|
+   * | deletedTable       | /volumeName/bucketName/keyName->KeyInfo      |
+   * |-------------------------------------------------------------------|
+   * | openKeyTable       | /volumeName/bucketName/keyName/id->KeyInfo   |
+   * |-------------------------------------------------------------------|
+   */
+
+  private static final String USER_TABLE = "userTable";
+  private static final String VOLUME_TABLE = "volumeTable";
+  private static final String BUCKET_TABLE = "bucketTable";
+  private static final String KEY_TABLE = "keyTable";
+  private static final String DELETED_TABLE = "deletedTable";
+  private static final String OPEN_KEY_TABLE = "openKeyTable";
 
-  private final MetadataStore store;
+  private final DBStore store;
+
+  // TODO: Move this lock into Table instead of keeping *ONE* lock for the
+  // whole DB.
   private final ReadWriteLock lock;
   private final long openKeyExpireThresholdMS;
 
+  private final Table userTable;
+  private final Table volumeTable;
+  private final Table bucketTable;
+  private final Table keyTable;
+  private final Table deletedTable;
+  private final Table openKeyTable;
+
   public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
     File metaDir = getOzoneMetaDirPath(conf);
-    final int cacheSize = conf.getInt(OZONE_OM_DB_CACHE_SIZE_MB,
-        OZONE_OM_DB_CACHE_SIZE_DEFAULT);
-    File omDBFile = new File(metaDir.getPath(), OM_DB_NAME);
-    this.store = MetadataStoreBuilder.newBuilder()
-        .setConf(conf)
-        .setDbFile(omDBFile)
-        .setCacheSize(cacheSize * OzoneConsts.MB)
-        .build();
     this.lock = new ReentrantReadWriteLock();
     this.openKeyExpireThresholdMS = 1000 * conf.getInt(
         OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS,
         OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT);
+
+    this.store = DBStoreBuilder.newBuilder(conf)
+        .setName(OM_DB_NAME)
+        .setPath(Paths.get(metaDir.getPath()))
+        .addTable(USER_TABLE)
+        .addTable(VOLUME_TABLE)
+        .addTable(BUCKET_TABLE)
+        .addTable(KEY_TABLE)
+        .addTable(DELETED_TABLE)
+        .addTable(OPEN_KEY_TABLE)
+        .build();
+
+    userTable = this.store.getTable(USER_TABLE);
+    checkTableStatus(userTable, USER_TABLE);
+
+    volumeTable = this.store.getTable(VOLUME_TABLE);
+    checkTableStatus(volumeTable, VOLUME_TABLE);
+
+    bucketTable = this.store.getTable(BUCKET_TABLE);
+    checkTableStatus(bucketTable, BUCKET_TABLE);
+
+    keyTable = this.store.getTable(KEY_TABLE);
+    checkTableStatus(keyTable, KEY_TABLE);
+
+    deletedTable = this.store.getTable(DELETED_TABLE);
+    checkTableStatus(deletedTable, DELETED_TABLE);
+
+    openKeyTable = this.store.getTable(OPEN_KEY_TABLE);
+    checkTableStatus(openKeyTable, OPEN_KEY_TABLE);
+
+  }
+
+  @Override
+  public Table getUserTable() {
+    return userTable;
+  }
+
+  @Override
+  public Table getVolumeTable() {
+    return volumeTable;
+  }
+
+  @Override
+  public Table getBucketTable() {
+    return bucketTable;
+  }
+
+  @Override
+  public Table getKeyTable() {
+    return keyTable;
+  }
+
+  @Override
+  public Table getDeletedTable() {
+    return deletedTable;
+  }
+
+  @Override
+  public Table getOpenKeyTable() {
+    return openKeyTable;
+  }
+
+  private void checkTableStatus(Table table, String name) throws IOException {
+    String logMessage = "Unable to get a reference to %s table. Cannot " +
+        "continue.";
+    String errMsg = "Inconsistent DB state, Table - %s. Please check the " +
+        "logs for more info.";
+    if (table == null) {
+      LOG.error(String.format(logMessage, name));
+      throw new IOException(String.format(errMsg, name));
+    }
   }
 
   /**
@@ -104,7 +205,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
    * Stop metadata manager.
    */
   @Override
-  public void stop() throws IOException {
+  public void stop() throws Exception {
     if (store != null) {
       store.close();
     }
@@ -112,86 +213,75 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
 
   /**
    * Get metadata store.
+   *
    * @return store - metadata store.
    */
   @VisibleForTesting
   @Override
-  public MetadataStore getStore() {
+  public DBStore getStore() {
     return store;
   }
 
   /**
    * Given a volume return the corresponding DB key.
+   *
    * @param volume - Volume name
    */
+  @Override
   public byte[] getVolumeKey(String volume) {
-    String dbVolumeName = OzoneConsts.OM_VOLUME_PREFIX + volume;
-    return DFSUtil.string2Bytes(dbVolumeName);
+    return DFSUtil.string2Bytes(OzoneConsts.OM_KEY_PREFIX + volume);
   }
 
   /**
    * Given a user return the corresponding DB key.
+   *
    * @param user - User name
    */
+  @Override
   public byte[] getUserKey(String user) {
-    String dbUserName = OzoneConsts.OM_USER_PREFIX + user;
-    return DFSUtil.string2Bytes(dbUserName);
+    return DFSUtil.string2Bytes(user);
   }
 
   /**
    * Given a volume and bucket, return the corresponding DB key.
+   *
    * @param volume - User name
    * @param bucket - Bucket name
    */
+  @Override
   public byte[] getBucketKey(String volume, String bucket) {
-    String bucketKeyString = OzoneConsts.OM_VOLUME_PREFIX + volume
-        + OzoneConsts.OM_BUCKET_PREFIX + bucket;
-    return DFSUtil.string2Bytes(bucketKeyString);
-  }
+    StringBuilder builder =
+        new StringBuilder().append(OM_KEY_PREFIX).append(volume);
 
-  /**
-   * @param volume
-   * @param bucket
-   * @return
-   */
-  private String getBucketWithDBPrefix(String volume, String bucket) {
-    StringBuffer sb = new StringBuffer();
-    sb.append(OzoneConsts.OM_VOLUME_PREFIX)
-        .append(volume)
-        .append(OzoneConsts.OM_BUCKET_PREFIX);
-    if (!Strings.isNullOrEmpty(bucket)) {
-      sb.append(bucket);
+    if (StringUtils.isNotBlank(bucket)) {
+      builder.append(OM_KEY_PREFIX).append(bucket);
     }
-    return sb.toString();
-  }
-
-  @Override
-  public String getKeyWithDBPrefix(String volume, String bucket, String key) {
-    String keyVB = OzoneConsts.OM_KEY_PREFIX + volume
-        + OzoneConsts.OM_KEY_PREFIX + bucket
-        + OzoneConsts.OM_KEY_PREFIX;
-    return Strings.isNullOrEmpty(key) ? keyVB : keyVB + key;
-  }
-
-  @Override
-  public byte[] getDBKeyBytes(String volume, String bucket, String key) {
-    return DFSUtil.string2Bytes(getKeyWithDBPrefix(volume, bucket, key));
+    return DFSUtil.string2Bytes(builder.toString());
   }
 
   @Override
-  public byte[] getDeletedKeyName(byte[] keyName) {
-    return DFSUtil.string2Bytes(
-        DELETING_KEY_PREFIX + DFSUtil.bytes2String(keyName));
+  public byte[] getOzoneKeyBytes(String volume, String bucket, String key) {
+    StringBuilder builder = new StringBuilder()
+        .append(OM_KEY_PREFIX).append(volume);
+    // TODO: Throw if the bucket is null?
+    builder.append(OM_KEY_PREFIX).append(bucket);
+    if (StringUtil.isNotBlank(key)) {
+      builder.append(OM_KEY_PREFIX).append(key);
+    }
+    return DFSUtil.string2Bytes(builder.toString());
   }
 
   @Override
-  public byte[] getOpenKeyNameBytes(String keyName, int id) {
-    return DFSUtil.string2Bytes(OPEN_KEY_PREFIX + id +
-        OPEN_KEY_ID_DELIMINATOR + keyName);
+  public byte[] getOpenKeyBytes(String volume, String bucket,
+                                    String key, long id) {
+    String openKey = OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket +
+        OM_KEY_PREFIX + key + OM_KEY_PREFIX + id;
+    return DFSUtil.string2Bytes(openKey);
   }
 
   /**
    * Returns the read lock used on Metadata DB.
+   *
    * @return readLock
    */
   @Override
@@ -201,6 +291,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
 
   /**
    * Returns the write lock used on Metadata DB.
+   *
    * @return writeLock
    */
   @Override
@@ -209,71 +300,79 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   }
 
   /**
-   * Returns the value associated with this key.
-   * @param key - key
-   * @return value
+   * Returns true if firstArray starts with the bytes of secondArray.
+   *
+   * @param firstArray - Byte array
+   * @param secondArray - Byte array
+   * @return true if firstArray begins with the bytes of secondArray.
    */
-  @Override
-  public byte[] get(byte[] key) throws IOException {
-    return store.get(key);
-  }
+  private boolean startsWith(byte[] firstArray, byte[] secondArray) {
 
-  /**
-   * Puts a Key into Metadata DB.
-   * @param key   - key
-   * @param value - value
-   */
-  @Override
-  public void put(byte[] key, byte[] value) throws IOException {
-    store.put(key, value);
-  }
+    if (firstArray == null) {
+      // if both are null, then the arrays match, else if first is null and
+      // second is not, then this function returns false.
+      return secondArray == null;
+    }
 
-  /**
-   * Deletes a Key from Metadata DB.
-   * @param key   - key
-   */
-  public void delete(byte[] key) throws IOException {
-    store.delete(key);
-  }
 
-  @Override
-  public void writeBatch(BatchOperation batch) throws IOException {
-    this.store.writeBatch(batch);
+    if (secondArray != null) {
+      // If the second array is longer then first array cannot be starting with
+      // the bytes of second array.
+      if (secondArray.length > firstArray.length) {
+        return false;
+      }
+
+      for (int ndx = 0; ndx < secondArray.length; ndx++) {
+        if (firstArray[ndx] != secondArray[ndx]) {
+          return false;
+        }
+      }
+      return true; // every byte of secondArray matched.
+    }
+    return false; // first array is non-null and second is null; we define
+    // that as not a prefix match.
   }
 
   /**
    * Given a volume, check if it is empty, i.e there are no buckets inside it.
+   * We iterate over the bucket table and check whether any key starts with
+   * the volume prefix. We actually look for /volume/, since without the
+   * trailing slash we might match some other volume.
+   * <p>
+   * For example, vol1 and vol122 might both match; to avoid that, we look
+   * for /vol1/.
+   *
    * @param volume - Volume name
    * @return true if the volume is empty
    */
+  @Override
   public boolean isVolumeEmpty(String volume) throws IOException {
-    String dbVolumeRootName = OzoneConsts.OM_VOLUME_PREFIX + volume
-        + OzoneConsts.OM_BUCKET_PREFIX;
-    byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
-    ImmutablePair<byte[], byte[]> volumeRoot =
-        store.peekAround(0, dbVolumeRootKey);
-    if (volumeRoot != null) {
-      return !DFSUtil.bytes2String(volumeRoot.getKey())
-          .startsWith(dbVolumeRootName);
+    byte[] volumePrefix = getVolumeKey(volume + OM_KEY_PREFIX);
+    try (TableIterator<Table.KeyValue> bucketIter = bucketTable.iterator()) {
+      Table.KeyValue kv = bucketIter.seek(volumePrefix);
+      if (kv != null && startsWith(kv.getKey(), volumePrefix)) {
+        return false; // we found at least one bucket with this volume prefix.
+      }
     }
     return true;
   }
 
   /**
-   * Given a volume/bucket, check if it is empty,
-   * i.e there are no keys inside it.
+   * Given a volume/bucket, check if it is empty, i.e. there are no keys
+   * inside it. The prefix is /volume/bucket/, and we look it up in the
+   * keyTable.
+   *
    * @param volume - Volume name
    * @param bucket - Bucket name
    * @return true if the bucket is empty
    */
+  @Override
   public boolean isBucketEmpty(String volume, String bucket)
       throws IOException {
-    String keyRootName = getKeyWithDBPrefix(volume, bucket, null);
-    byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
-    ImmutablePair<byte[], byte[]> firstKey = store.peekAround(0, keyRoot);
-    if (firstKey != null) {
-      return !DFSUtil.bytes2String(firstKey.getKey())
-          .startsWith(keyRootName);
+    byte[] keyPrefix = getBucketKey(volume, bucket + OM_KEY_PREFIX);
+    try (TableIterator<Table.KeyValue> keyIter = keyTable.iterator()) {
+      Table.KeyValue kv = keyIter.seek(keyPrefix);
+      if (kv != null && startsWith(kv.getKey(), keyPrefix)) {
+        return false; // we found at least one key with this vol/bucket prefix.
+      }
     }
     return true;
   }
@@ -283,8 +382,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
    */
   @Override
   public List<OmBucketInfo> listBuckets(final String volumeName,
-                                        final String startBucket, final String bucketPrefix,
-                                        final int maxNumOfBuckets) throws IOException {
+      final String startBucket, final String bucketPrefix,
+      final int maxNumOfBuckets) throws IOException {
     List<OmBucketInfo> result = new ArrayList<>();
     if (Strings.isNullOrEmpty(volumeName)) {
       throw new OMException("Volume name is required.",
@@ -292,49 +391,61 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
     }
 
     byte[] volumeNameBytes = getVolumeKey(volumeName);
-    if (store.get(volumeNameBytes) == null) {
+    if (volumeTable.get(volumeNameBytes) == null) {
       throw new OMException("Volume " + volumeName + " not found.",
           ResultCodes.FAILED_VOLUME_NOT_FOUND);
     }
 
 
-    // A bucket starts with /#volume/#bucket_prefix
-    MetadataKeyFilter filter = (preKey, currentKey, nextKey) -> {
-      if (currentKey != null) {
-        String bucketNamePrefix =
-                getBucketWithDBPrefix(volumeName, bucketPrefix);
-        String bucket = DFSUtil.bytes2String(currentKey);
-        return bucket.startsWith(bucketNamePrefix);
-      }
-      return false;
-    };
-
-    List<Map.Entry<byte[], byte[]>> rangeResult;
-    if (!Strings.isNullOrEmpty(startBucket)) {
-      // Since we are excluding start key from the result,
-      // the maxNumOfBuckets is incremented.
-      rangeResult = store.getSequentialRangeKVs(
-          getBucketKey(volumeName, startBucket),
-          maxNumOfBuckets + 1, filter);
-      if (!rangeResult.isEmpty()) {
-        //Remove start key from result.
-        rangeResult.remove(0);
-      }
+    byte[] startKey;
+    boolean skipStartKey = false;
+    if (StringUtil.isNotBlank(startBucket)) {
+      // if the user has specified a start key, we need to seek to that key
+      // and avoid that key in the response set.
+      startKey = getBucketKey(volumeName, startBucket);
+      skipStartKey = true;
     } else {
-      rangeResult = store.getSequentialRangeKVs(null, maxNumOfBuckets, filter);
+      // If the user has specified a prefix key, we need to get to the first
+      // of the keys with the prefix match. We can leverage RocksDB to do that.
+      // However, if the user has specified only a prefix, we cannot skip
+      // the first prefix key we see, the boolean skipStartKey allows us to
+      // skip the startkey or not depending on what patterns are specified.
+      startKey = getBucketKey(volumeName, bucketPrefix);
     }
 
-    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
-      OmBucketInfo info = OmBucketInfo.getFromProtobuf(
-          BucketInfo.parseFrom(entry.getValue()));
-      result.add(info);
+    byte[] seekPrefix;
+    if (StringUtil.isNotBlank(bucketPrefix)) {
+      seekPrefix = getBucketKey(volumeName, bucketPrefix);
+    } else {
+      seekPrefix = getVolumeKey(volumeName + OM_KEY_PREFIX);
+    }
+    int currentCount = 0;
+    try (TableIterator<Table.KeyValue> bucketIter = bucketTable.iterator()) {
+      Table.KeyValue kv = bucketIter.seek(startKey);
+      while (currentCount < maxNumOfBuckets && bucketIter.hasNext()) {
+        kv = bucketIter.next();
+        // Skip the Start Bucket if needed.
+        if (kv != null && skipStartKey &&
+            Arrays.equals(kv.getKey(), startKey)) {
+          continue;
+        }
+        if (kv != null && startsWith(kv.getKey(), seekPrefix)) {
+          result.add(OmBucketInfo.getFromProtobuf(
+              BucketInfo.parseFrom(kv.getValue())));
+          currentCount++;
+        } else {
+          // The SeekPrefix does not match any more, we can break out of the
+          // loop.
+          break;
+        }
+      }
     }
     return result;
   }
 
   @Override
   public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
-                                  String startKey, String keyPrefix, int maxKeys) throws IOException {
+      String startKey, String keyPrefix, int maxKeys) throws IOException {
     List<OmKeyInfo> result = new ArrayList<>();
     if (Strings.isNullOrEmpty(volumeName)) {
       throw new OMException("Volume name is required.",
@@ -347,47 +458,61 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
     }
 
     byte[] bucketNameBytes = getBucketKey(volumeName, bucketName);
-    if (store.get(bucketNameBytes) == null) {
+    if (getBucketTable().get(bucketNameBytes) == null) {
       throw new OMException("Bucket " + bucketName + " not found.",
           ResultCodes.FAILED_BUCKET_NOT_FOUND);
     }
 
-    MetadataKeyFilter filter = new KeyPrefixFilter()
-        .addFilter(getKeyWithDBPrefix(volumeName, bucketName, keyPrefix));
-
-    List<Map.Entry<byte[], byte[]>> rangeResult;
-    if (!Strings.isNullOrEmpty(startKey)) {
-      //Since we are excluding start key from the result,
-      // the maxNumOfBuckets is incremented.
-      rangeResult = store.getSequentialRangeKVs(
-          getDBKeyBytes(volumeName, bucketName, startKey),
-          maxKeys + 1, filter);
-      if (!rangeResult.isEmpty()) {
-        //Remove start key from result.
-        rangeResult.remove(0);
-      }
+    byte[] seekKey;
+    boolean skipStartKey = false;
+    if (StringUtil.isNotBlank(startKey)) {
+      // Seek to the specified key.
+      seekKey = getOzoneKeyBytes(volumeName, bucketName, startKey);
+      skipStartKey = true;
     } else {
-      rangeResult = store.getSequentialRangeKVs(null, maxKeys, filter);
+      // This allows us to seek directly to the first key with the right prefix.
+      seekKey = getOzoneKeyBytes(volumeName, bucketName, keyPrefix);
     }
 
-    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
-      OmKeyInfo info = OmKeyInfo.getFromProtobuf(
-          KeyInfo.parseFrom(entry.getValue()));
-      result.add(info);
+    byte[] seekPrefix;
+    if (StringUtil.isNotBlank(keyPrefix)) {
+      seekPrefix = getOzoneKeyBytes(volumeName, bucketName, keyPrefix);
+    } else {
+      seekPrefix = getBucketKey(volumeName, bucketName + OM_KEY_PREFIX);
+    }
+    int currentCount = 0;
+    try (TableIterator<Table.KeyValue> keyIter = getKeyTable().iterator()) {
+      Table.KeyValue kv = keyIter.seek(seekKey);
+      while (currentCount < maxKeys && keyIter.hasNext()) {
+        kv = keyIter.next();
+        // Skip the Start key if needed.
+        if (kv != null && skipStartKey && Arrays.equals(kv.getKey(), seekKey)) {
+          continue;
+        }
+        if (kv != null && startsWith(kv.getKey(), seekPrefix)) {
+          result.add(OmKeyInfo.getFromProtobuf(
+              KeyInfo.parseFrom(kv.getValue())));
+          currentCount++;
+        } else {
+          // The SeekPrefix does not match any more, we can break out of the
+          // loop.
+          break;
+        }
+      }
     }
     return result;
   }
 
   @Override
   public List<OmVolumeArgs> listVolumes(String userName,
-                                        String prefix, String startKey, int maxKeys) throws IOException {
+      String prefix, String startKey, int maxKeys) throws IOException {
     List<OmVolumeArgs> result = Lists.newArrayList();
     VolumeList volumes;
-    if (Strings.isNullOrEmpty(userName)) {
-      volumes = getAllVolumes();
-    } else {
-      volumes = getVolumesByUser(userName);
+    if (StringUtil.isBlank(userName)) {
+      throw new OMException("User name is required to list Volumes.",
+          ResultCodes.FAILED_USER_NOT_FOUND);
     }
+    volumes = getVolumesByUser(userName);
 
     if (volumes == null || volumes.getVolumeNamesCount() == 0) {
       return result;
@@ -406,7 +531,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
         continue;
       }
       if (startKeyFound && result.size() < maxKeys) {
-        byte[] volumeInfo = store.get(this.getVolumeKey(volumeName));
+        byte[] volumeInfo = getVolumeTable().get(this.getVolumeKey(volumeName));
         if (volumeInfo == null) {
           // Could not get volume info by given volume name,
           // since the volume name is loaded from db,
@@ -433,7 +558,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
       throws OMException {
     VolumeList volumes = null;
     try {
-      byte[] volumesInBytes = store.get(userNameKey);
+      byte[] volumesInBytes = getUserTable().get(userNameKey);
       if (volumesInBytes == null) {
         // No volume found for this user, return an empty list
         return VolumeList.newBuilder().build();
@@ -447,32 +572,12 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
     return volumes;
   }
 
-  private VolumeList getAllVolumes() throws IOException {
-    // Scan all users in database
-    KeyPrefixFilter filter =
-        new KeyPrefixFilter().addFilter(OzoneConsts.OM_USER_PREFIX);
-    // We are not expecting a huge number of users per cluster,
-    // it should be fine to scan all users in db and return us a
-    // list of volume names in string per user.
-    List<Map.Entry<byte[], byte[]>> rangeKVs = store
-        .getSequentialRangeKVs(null, Integer.MAX_VALUE, filter);
-
-    VolumeList.Builder builder = VolumeList.newBuilder();
-    for (Map.Entry<byte[], byte[]> entry : rangeKVs) {
-      VolumeList volumes = this.getVolumesByUser(entry.getKey());
-      builder.addAllVolumeNames(volumes.getVolumeNamesList());
-    }
-
-    return builder.build();
-  }
-
   @Override
   public List<BlockGroup> getPendingDeletionKeys(final int count)
       throws IOException {
     List<BlockGroup> keyBlocksList = Lists.newArrayList();
-    List<Map.Entry<byte[], byte[]>> rangeResult =
-        store.getRangeKVs(null, count,
-            MetadataKeyFilters.getDeletingKeyFilter());
+    // TODO: Fix this later; not part of this patch.
+    List<Map.Entry<byte[], byte[]>> rangeResult = Collections.emptyList();
     for (Map.Entry<byte[], byte[]> entry : rangeResult) {
       OmKeyInfo info =
           OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
@@ -482,7 +587,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
         return Collections.emptyList();
       }
       List<BlockID> item = latest.getLocationList().stream()
-          .map(b->new BlockID(b.getContainerID(), b.getLocalID()))
+          .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
           .collect(Collectors.toList());
       BlockGroup keyBlocks = BlockGroup.newBuilder()
           .setKeyName(DFSUtil.bytes2String(entry.getKey()))
@@ -497,11 +602,9 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   public List<BlockGroup> getExpiredOpenKeys() throws IOException {
     List<BlockGroup> keyBlocksList = Lists.newArrayList();
     long now = Time.now();
-    final MetadataKeyFilter openKeyFilter =
-        new KeyPrefixFilter().addFilter(OPEN_KEY_PREFIX);
-    List<Map.Entry<byte[], byte[]>> rangeResult =
-        store.getSequentialRangeKVs(null, Integer.MAX_VALUE,
-            openKeyFilter);
+    // TODO: Fix getExpiredOpenKeys; not part of this patch.
+    List<Map.Entry<byte[], byte[]>> rangeResult = Collections.emptyList();
+
     for (Map.Entry<byte[], byte[]> entry : rangeResult) {
       OmKeyInfo info =
           OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
@@ -513,7 +616,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
       // Get block keys as a list.
       List<BlockID> item = info.getLatestVersionLocations()
           .getBlocksLatestVersionOnly().stream()
-          .map(b->new BlockID(b.getContainerID(), b.getLocalID()))
+          .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
           .collect(Collectors.toList());
       BlockGroup keyBlocks = BlockGroup.newBuilder()
           .setKeyName(DFSUtil.bytes2String(entry.getKey()))

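The heart of this file's change is the move from filter-based range scans to a "seek, then compare prefixes byte-by-byte" pattern over RocksDB column families. Below is a minimal, self-contained sketch of that pattern (plain JDK, Java 9+; the class name is made up for illustration, a TreeMap stands in for the RocksDB iterator, and the null handling of the patch's private startsWith() is simplified):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.TreeMap;

public final class PrefixScanSketch {

  // Compact variant of the patch's private startsWith(byte[], byte[]);
  // the patch's null handling (both null => true) is simplified away here.
  static boolean startsWith(byte[] first, byte[] second) {
    if (first == null || second == null || second.length > first.length) {
      return false;
    }
    for (int i = 0; i < second.length; i++) {
      if (first[i] != second[i]) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // A sorted map stands in for the bucketTable column family iterator.
    TreeMap<byte[], String> bucketTable =
        new TreeMap<>((a, b) -> Arrays.compare(a, b));
    bucketTable.put("/vol1/b1".getBytes(StandardCharsets.UTF_8), "bucket-1");
    bucketTable.put("/vol122/b1".getBytes(StandardCharsets.UTF_8), "bucket-2");

    // isVolumeEmpty-style check: seek to "/vol1/" and test whether the first
    // key at or after that position still carries the prefix.
    byte[] prefix = "/vol1/".getBytes(StandardCharsets.UTF_8);
    byte[] candidate = bucketTable.ceilingKey(prefix); // ~ iterator.seek()
    boolean volumeEmpty = candidate == null || !startsWith(candidate, prefix);
    System.out.println("vol1 empty? " + volumeEmpty); // false: /vol1/b1 exists
  }
}

The trailing OM_KEY_PREFIX in the seek key is what keeps vol1 from matching vol122, exactly as the isVolumeEmpty() javadoc describes. One caveat: RocksDB's default comparator orders bytes unsigned while Arrays.compare(byte[], byte[]) is signed; for ASCII volume and bucket names the two agree.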
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff036e49/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 71fa921..c06508d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -21,14 +21,27 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.BlockingService;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.common.Storage.StorageState;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -39,36 +52,12 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
-import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .ServicePort;
-import org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.OzoneAclInfo;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
 import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.ScmInfo;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocolPB
-    .ScmBlockLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
-import org.apache.hadoop.hdds.scm.protocolPB
-    .StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.StringUtils;
-
-import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
-import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
-import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
-import static org.apache.hadoop.ozone.OmUtils.getOmAddress;
-import static org.apache.hadoop.hdds.server.ServerUtils
-    .updateRPCListenAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,18 +70,17 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
+import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
+import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
+import static org.apache.hadoop.ozone.OmUtils.getOmAddress;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
-import static org.apache.hadoop.ozone.om.OMConfigKeys
-    .OZONE_OM_ADDRESS_KEY;
-import static org.apache.hadoop.ozone.om.OMConfigKeys
-    .OZONE_OM_HANDLER_COUNT_DEFAULT;
-import static org.apache.hadoop.ozone.om.OMConfigKeys
-    .OZONE_OM_HANDLER_COUNT_KEY;
-import static org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.OzoneManagerService
-    .newReflectiveBlockingService;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
-    .NodeState.HEALTHY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService.newReflectiveBlockingService;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 /**
@@ -108,33 +96,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       "Usage: \n ozone om [genericOptions] " + "[ "
           + StartupOption.CREATEOBJECTSTORE.getName() + " ]\n " + "ozone om [ "
           + StartupOption.HELP.getName() + " ]\n";
-
-  /** Startup options. */
-  public enum StartupOption {
-    CREATEOBJECTSTORE("-createObjectStore"),
-    HELP("-help"),
-    REGULAR("-regular");
-
-    private final String name;
-
-    StartupOption(String arg) {
-      this.name = arg;
-    }
-
-    public String getName() {
-      return name;
-    }
-
-    public static StartupOption parse(String value) {
-      for (StartupOption option : StartupOption.values()) {
-        if (option.name.equalsIgnoreCase(value)) {
-          return option;
-        }
-      }
-      return null;
-    }
-  }
-
   private final OzoneConfiguration configuration;
   private final RPC.Server omRpcServer;
   private final InetSocketAddress omRpcAddress;
@@ -238,20 +199,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     return scmContainerClient;
   }
 
-  @VisibleForTesting
-  public KeyManager getKeyManager() {
-    return keyManager;
-  }
-
-  @VisibleForTesting
-  public ScmInfo getScmInfo() throws IOException {
-    return scmBlockClient.getScmInfo();
-  }
-
-  @VisibleForTesting
-  public OMStorage getOmStorage() {
-    return omStorage;
-  }
   /**
    * Starts an RPC server, if configured.
    *
@@ -260,7 +207,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    * @param protocol RPC protocol provided by RPC server
    * @param instance RPC protocol implementation instance
    * @param handlerCount RPC server handler count
-   *
    * @return RPC server
    * @throws IOException if there is an I/O error while creating RPC server
    */
@@ -282,18 +228,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   }
 
   /**
-   * Get metadata manager.
-   * @return metadata manager.
-   */
-  public OMMetadataManager getMetadataManager() {
-    return metadataManager;
-  }
-
-  public OMMetrics getMetrics() {
-    return metrics;
-  }
-
-  /**
    * Main entry point for starting OzoneManager.
    *
    * @param argv arguments
@@ -329,6 +263,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
   /**
    * Constructs OM instance based on command line arguments.
+   *
    * @param argv Command line arguments
    * @param conf OzoneConfiguration
    * @return OM instance
@@ -336,7 +271,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
 
   public static OzoneManager createOm(String[] argv,
-                                      OzoneConfiguration conf) throws IOException {
+      OzoneConfiguration conf) throws IOException {
     if (!isHddsEnabled(conf)) {
       System.err.println("OM cannot be started in secure mode or when " +
           OZONE_ENABLED + " is set to false");
@@ -363,9 +298,11 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
   /**
    * Initializes the OM instance.
+   *
    * @param conf OzoneConfiguration
    * @return true if OM initialization succeeds, false otherwise
-   * @throws IOException in case ozone metadata directory path is not accessible
+   * @throws IOException in case ozone metadata directory path is not
+   *                     accessible
    */
 
   private static boolean omInit(OzoneConfiguration conf) throws IOException {
@@ -406,14 +343,17 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
   /**
    * Parses the command line options for OM initialization.
+   *
    * @param args command line arguments
    * @return StartupOption if options are valid, null otherwise
    */
   private static StartupOption parseArguments(String[] args) {
     if (args == null || args.length == 0) {
       return StartupOption.REGULAR;
-    } else if (args.length == 1) {
-      return StartupOption.parse(args[0]);
+    } else {
+      if (args.length == 1) {
+        return StartupOption.parse(args[0]);
+      }
     }
     return null;
   }
@@ -432,6 +372,34 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         String.format("%s not started", description);
   }
 
+  @VisibleForTesting
+  public KeyManager getKeyManager() {
+    return keyManager;
+  }
+
+  @VisibleForTesting
+  public ScmInfo getScmInfo() throws IOException {
+    return scmBlockClient.getScmInfo();
+  }
+
+  @VisibleForTesting
+  public OMStorage getOmStorage() {
+    return omStorage;
+  }
+
+  /**
+   * Get metadata manager.
+   *
+   * @return metadata manager.
+   */
+  public OMMetadataManager getMetadataManager() {
+    return metadataManager;
+  }
+
+  public OMMetrics getMetrics() {
+    return metrics;
+  }
+
   /**
    * Start service.
    */
@@ -533,8 +501,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    *
    * @param volume - volume
    * @param userAcl - user acls which needs to be checked for access
-   * @return true if the user has required access for the volume,
-   *         false otherwise
+   * @return true if the user has required access for the volume, false
+   * otherwise
    * @throws IOException
    */
   @Override
@@ -597,7 +565,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @Override
   public List<OmVolumeArgs> listVolumeByUser(String userName, String prefix,
-                                             String prevKey, int maxKeys) 
throws IOException {
+      String prevKey, int maxKeys) throws IOException {
     try {
       metrics.incNumVolumeLists();
       return volumeManager.listVolumes(userName, prefix, prevKey, maxKeys);
@@ -651,7 +619,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @Override
   public List<OmBucketInfo> listBuckets(String volumeName,
-                                        String startKey, String prefix, int maxNumOfBuckets)
+      String startKey, String prefix, int maxNumOfBuckets)
       throws IOException {
     try {
       metrics.incNumBucketLists();
@@ -702,7 +670,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   }
 
   @Override
-  public void commitKey(OmKeyArgs args, int clientID)
+  public void commitKey(OmKeyArgs args, long clientID)
       throws IOException {
     try {
       metrics.incNumKeyCommits();
@@ -714,7 +682,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   }
 
   @Override
-  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID)
+  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
       throws IOException {
     try {
       metrics.incNumBlockAllocateCalls();
@@ -773,7 +741,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
   @Override
   public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
-                                  String startKey, String keyPrefix, int maxKeys) throws IOException {
+      String startKey, String keyPrefix, int maxKeys) throws IOException {
     try {
       metrics.incNumKeyLists();
       return keyManager.listKeys(volumeName, bucketName,
@@ -786,6 +754,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
   /**
    * Sets bucket property from args.
+   *
    * @param args - BucketArgs.
    * @throws IOException
    */
@@ -801,9 +770,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
-
   /**
    * Deletes an existing empty bucket from volume.
+   *
    * @param volume - Name of the volume.
    * @param bucket - Name of the bucket.
    * @throws IOException
@@ -853,8 +822,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         .setNodeType(HddsProtos.NodeType.OM)
         .setHostname(omRpcAddress.getHostName())
         .addServicePort(ServicePort.newBuilder()
-                .setType(ServicePort.Type.RPC)
-                .setValue(omRpcAddress.getPort())
+            .setType(ServicePort.Type.RPC)
+            .setValue(omRpcAddress.getPort())
             .build());
     if (httpServer.getHttpAddress() != null) {
       omServiceInfoBuilder.addServicePort(ServicePort.newBuilder()
@@ -908,4 +877,32 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     // metrics.incNumGetServiceListFails()
     return services;
   }
+
+  /**
+   * Startup options.
+   */
+  public enum StartupOption {
+    CREATEOBJECTSTORE("-createObjectStore"),
+    HELP("-help"),
+    REGULAR("-regular");
+
+    private final String name;
+
+    StartupOption(String arg) {
+      this.name = arg;
+    }
+
+    public static StartupOption parse(String value) {
+      for (StartupOption option : StartupOption.values()) {
+        if (option.name.equalsIgnoreCase(value)) {
+          return option;
+        }
+      }
+      return null;
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
 }

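Most of the OzoneManager.java hunks are import ordering and member moves; StartupOption keeps its behavior and only moves to the bottom of the class. As a quick illustration of the parse() contract that parseArguments() relies on (case-insensitive matching, null for unknown flags), here is a hedged usage sketch that assumes the class is on the classpath:

import org.apache.hadoop.ozone.om.OzoneManager;

public final class StartupOptionSketch {
  public static void main(String[] args) {
    // Mirrors parseArguments(): zero args -> REGULAR, exactly one arg ->
    // parse(arg), anything else -> null (invalid usage).
    System.out.println(
        OzoneManager.StartupOption.parse("-createObjectStore")); // CREATEOBJECTSTORE
    System.out.println(OzoneManager.StartupOption.parse("-HELP")); // HELP
    System.out.println(OzoneManager.StartupOption.parse("-bogus")); // null
  }
}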
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff036e49/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
index e50145d..419b0aa 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
@@ -28,7 +28,9 @@ import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.VolumeInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.RocksDBStore;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,10 +69,10 @@ public class VolumeManagerImpl implements VolumeManager {
 
   // Helpers to add and delete volume from user list
   private void addVolumeToOwnerList(String volume, String owner,
-      BatchOperation batchOperation) throws IOException {
+      WriteBatch batchOperation) throws RocksDBException, IOException {
     // Get the volume list
     byte[] dbUserKey = metadataManager.getUserKey(owner);
-    byte[] volumeList  = metadataManager.get(dbUserKey);
+    byte[] volumeList  = metadataManager.getUserTable().get(dbUserKey);
     List<String> prevVolList = new LinkedList<>();
     if (volumeList != null) {
       VolumeList vlist = VolumeList.parseFrom(volumeList);
@@ -87,15 +89,15 @@ public class VolumeManagerImpl implements VolumeManager {
     prevVolList.add(volume);
     VolumeList newVolList = VolumeList.newBuilder()
         .addAllVolumeNames(prevVolList).build();
-    batchOperation.put(dbUserKey, newVolList.toByteArray());
+    batchOperation.put(metadataManager.getUserTable().getHandle(),
+        dbUserKey, newVolList.toByteArray());
   }
 
   private void delVolumeFromOwnerList(String volume, String owner,
-                                      BatchOperation batchOperation)
-      throws IOException {
+      WriteBatch batch) throws RocksDBException, IOException {
     // Get the volume list
     byte[] dbUserKey = metadataManager.getUserKey(owner);
-    byte[] volumeList  = metadataManager.get(dbUserKey);
+    byte[] volumeList  = metadataManager.getUserTable().get(dbUserKey);
     List<String> prevVolList = new LinkedList<>();
     if (volumeList != null) {
       VolumeList vlist = VolumeList.parseFrom(volumeList);
@@ -108,11 +110,12 @@ public class VolumeManagerImpl implements VolumeManager {
     // Remove the volume from the list
     prevVolList.remove(volume);
     if (prevVolList.size() == 0) {
-      batchOperation.delete(dbUserKey);
+      batch.delete(dbUserKey);
     } else {
       VolumeList newVolList = VolumeList.newBuilder()
           .addAllVolumeNames(prevVolList).build();
-      batchOperation.put(dbUserKey, newVolList.toByteArray());
+      batch.put(metadataManager.getUserTable().getHandle(),
+          dbUserKey, newVolList.toByteArray());
     }
   }
 
@@ -126,7 +129,7 @@ public class VolumeManagerImpl implements VolumeManager {
     metadataManager.writeLock().lock();
     try {
       byte[] dbVolumeKey = metadataManager.getVolumeKey(args.getVolume());
-      byte[] volumeInfo = metadataManager.get(dbVolumeKey);
+      byte[] volumeInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
 
       // Check of the volume already exists
       if (volumeInfo != null) {
@@ -134,37 +137,45 @@ public class VolumeManagerImpl implements VolumeManager {
         throw new OMException(ResultCodes.FAILED_VOLUME_ALREADY_EXISTS);
       }
 
-      BatchOperation batch = new BatchOperation();
-      // Write the vol info
-      List<HddsProtos.KeyValue> metadataList = new LinkedList<>();
-      for (Map.Entry<String, String> entry : args.getKeyValueMap().entrySet()) {
-        metadataList.add(HddsProtos.KeyValue.newBuilder()
-            .setKey(entry.getKey()).setValue(entry.getValue()).build());
+      try (WriteBatch batch = new WriteBatch()) {
+        // Write the vol info
+        List<HddsProtos.KeyValue> metadataList = new LinkedList<>();
+        for (Map.Entry<String, String> entry :
+            args.getKeyValueMap().entrySet()) {
+          metadataList.add(HddsProtos.KeyValue.newBuilder()
+              .setKey(entry.getKey()).setValue(entry.getValue()).build());
+        }
+        List<OzoneAclInfo> aclList = args.getAclMap().ozoneAclGetProtobuf();
+
+        VolumeInfo newVolumeInfo = VolumeInfo.newBuilder()
+            .setAdminName(args.getAdminName())
+            .setOwnerName(args.getOwnerName())
+            .setVolume(args.getVolume())
+            .setQuotaInBytes(args.getQuotaInBytes())
+            .addAllMetadata(metadataList)
+            .addAllVolumeAcls(aclList)
+            .setCreationTime(Time.now())
+            .build();
+        batch.put(metadataManager.getVolumeTable().getHandle(),
+            dbVolumeKey, newVolumeInfo.toByteArray());
+
+        // Add volume to user list
+        addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch);
+        metadataManager.getStore().write(batch);
       }
-      List<OzoneAclInfo> aclList = args.getAclMap().ozoneAclGetProtobuf();
-
-      VolumeInfo newVolumeInfo = VolumeInfo.newBuilder()
-          .setAdminName(args.getAdminName())
-          .setOwnerName(args.getOwnerName())
-          .setVolume(args.getVolume())
-          .setQuotaInBytes(args.getQuotaInBytes())
-          .addAllMetadata(metadataList)
-          .addAllVolumeAcls(aclList)
-          .setCreationTime(Time.now())
-          .build();
-      batch.put(dbVolumeKey, newVolumeInfo.toByteArray());
-
-      // Add volume to user list
-      addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch);
-      metadataManager.writeBatch(batch);
       LOG.debug("created volume:{} user:{}", args.getVolume(),
           args.getOwnerName());
-    } catch (IOException ex) {
+    } catch (RocksDBException | IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Volume creation failed for user:{} volume:{}",
             args.getOwnerName(), args.getVolume(), ex);
       }
-      throw ex;
+      if (ex instanceof RocksDBException) {
+        throw RocksDBStore.toIOException("Volume creation failed.",
+            (RocksDBException) ex);
+      } else {
+        throw (IOException) ex;
+      }
     } finally {
       metadataManager.writeLock().unlock();
     }
@@ -184,7 +195,7 @@ public class VolumeManagerImpl implements VolumeManager {
     metadataManager.writeLock().lock();
     try {
       byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
-      byte[] volInfo = metadataManager.get(dbVolumeKey);
+      byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
       if (volInfo == null) {
         LOG.debug("Changing volume ownership failed for user:{} volume:{}",
             owner, volume);
@@ -195,28 +206,34 @@ public class VolumeManagerImpl implements VolumeManager {
       OmVolumeArgs volumeArgs = OmVolumeArgs.getFromProtobuf(volumeInfo);
       Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
 
-      BatchOperation batch = new BatchOperation();
-      delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch);
-      addVolumeToOwnerList(volume, owner, batch);
-
-      OmVolumeArgs newVolumeArgs =
-          OmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
-              .setAdminName(volumeArgs.getAdminName())
-              .setOwnerName(owner)
-              .setQuotaInBytes(volumeArgs.getQuotaInBytes())
-              .setCreationTime(volumeArgs.getCreationTime())
-              .build();
-
-      VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
-      batch.put(dbVolumeKey, newVolumeInfo.toByteArray());
-
-      metadataManager.writeBatch(batch);
-    } catch (IOException ex) {
+      try (WriteBatch batch = new WriteBatch()) {
+        delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch);
+        addVolumeToOwnerList(volume, owner, batch);
+
+        OmVolumeArgs newVolumeArgs =
+            OmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
+                .setAdminName(volumeArgs.getAdminName())
+                .setOwnerName(owner)
+                .setQuotaInBytes(volumeArgs.getQuotaInBytes())
+                .setCreationTime(volumeArgs.getCreationTime())
+                .build();
+
+        VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
+        batch.put(metadataManager.getVolumeTable().getHandle(),
+            dbVolumeKey, newVolumeInfo.toByteArray());
+        metadataManager.getStore().write(batch);
+      }
+    } catch (RocksDBException | IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Changing volume ownership failed for user:{} volume:{}",
             owner, volume, ex);
       }
-      throw ex;
+      if (ex instanceof RocksDBException) {
+        throw RocksDBStore.toIOException("Volume ownership change failed.",
+            (RocksDBException) ex);
+      } else {
+        throw (IOException) ex;
+      }
     } finally {
       metadataManager.writeLock().unlock();
     }
@@ -234,7 +251,7 @@ public class VolumeManagerImpl implements VolumeManager {
     metadataManager.writeLock().lock();
     try {
       byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
-      byte[] volInfo = metadataManager.get(dbVolumeKey);
+      byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
       if (volInfo == null) {
         LOG.debug("volume:{} does not exist", volume);
         throw new OMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
@@ -253,7 +270,8 @@ public class VolumeManagerImpl implements VolumeManager {
               .setCreationTime(volumeArgs.getCreationTime()).build();
 
       VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
-      metadataManager.put(dbVolumeKey, newVolumeInfo.toByteArray());
+      metadataManager.getVolumeTable().put(dbVolumeKey,
+          newVolumeInfo.toByteArray());
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Changing volume quota failed for volume:{} quota:{}", 
volume,
@@ -276,7 +294,7 @@ public class VolumeManagerImpl implements VolumeManager {
     metadataManager.readLock().lock();
     try {
       byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
-      byte[] volInfo = metadataManager.get(dbVolumeKey);
+      byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
       if (volInfo == null) {
         LOG.debug("volume:{} does not exist", volume);
         throw new OMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
@@ -307,9 +325,9 @@ public class VolumeManagerImpl implements VolumeManager {
     Preconditions.checkNotNull(volume);
     metadataManager.writeLock().lock();
     try {
-      BatchOperation batch = new BatchOperation();
+
       byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
-      byte[] volInfo = metadataManager.get(dbVolumeKey);
+      byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
       if (volInfo == null) {
         LOG.debug("volume:{} does not exist", volume);
         throw new OMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
@@ -324,14 +342,22 @@ public class VolumeManagerImpl implements VolumeManager {
       Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
       // delete the volume from the owner list
       // as well as delete the volume entry
-      delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(), batch);
-      batch.delete(dbVolumeKey);
-      metadataManager.writeBatch(batch);
-    } catch (IOException ex) {
+      try (WriteBatch batch = new WriteBatch()) {
+        delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(), batch);
+        batch.delete(metadataManager.getVolumeTable().getHandle(),
+            dbVolumeKey);
+        metadataManager.getStore().write(batch);
+      }
+    } catch (RocksDBException | IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Delete volume failed for volume:{}", volume, ex);
       }
-      throw ex;
+      if (ex instanceof RocksDBException) {
+        throw RocksDBStore.toIOException("Volume deletion failed.",
+            (RocksDBException) ex);
+      } else {
+        throw (IOException) ex;
+      }
     } finally {
       metadataManager.writeLock().unlock();
     }
@@ -352,7 +378,7 @@ public class VolumeManagerImpl implements VolumeManager {
     metadataManager.readLock().lock();
     try {
       byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
-      byte[] volInfo = metadataManager.get(dbVolumeKey);
+      byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
       if (volInfo == null) {
         LOG.debug("volume:{} does not exist", volume);
         throw  new OMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
@@ -378,7 +404,7 @@ public class VolumeManagerImpl implements VolumeManager {
    */
   @Override
   public List<OmVolumeArgs> listVolumes(String userName,
-                                        String prefix, String startKey, int maxKeys) throws IOException {
+      String prefix, String startKey, int maxKeys) throws IOException {
     metadataManager.readLock().lock();
     try {
       return metadataManager.listVolumes(

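For reviewers tracking the MetadataStore-to-DBStore migration, the volume
write path above now follows one pattern throughout: read through the typed
volume table, mutate, and commit via a RocksDB WriteBatch, translating
RocksDBException into IOException at the OM boundary. A minimal sketch of
that pattern, assuming the same metadataManager collaborators the diff uses
(the method itself is illustrative, not part of this patch):

  private void updateVolumeEntry(String volume, byte[] newValue)
      throws IOException {
    metadataManager.writeLock().lock();
    try (WriteBatch batch = new WriteBatch()) {
      byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
      // Reads now go through the typed table, not the flat key space.
      byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
      if (volInfo == null) {
        throw new OMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
      }
      // All mutations for one operation share a batch so the writes to the
      // volume column family commit atomically.
      batch.put(metadataManager.getVolumeTable().getHandle(),
          dbVolumeKey, newValue);
      metadataManager.getStore().write(batch);
    } catch (RocksDBException e) {
      // The OM API surface is IOException-based, so RocksDB errors are
      // wrapped before they escape.
      throw RocksDBStore.toIOException("Volume update failed.", e);
    } finally {
      metadataManager.writeLock().unlock();
    }
  }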
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff036e49/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 45ec2d0..06d782b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -526,8 +526,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
           .setFactor(factor)
           .setDataSize(keyArgs.getDataSize())
           .build();
-      int id = request.getClientID();
-      impl.commitKey(omKeyArgs, id);
+      impl.commitKey(omKeyArgs, request.getClientID());
       resp.setStatus(Status.OK);
     } catch (IOException e) {
       resp.setStatus(exceptionToResponseStatus(e));
@@ -547,8 +546,8 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
           .setBucketName(keyArgs.getBucketName())
           .setKeyName(keyArgs.getKeyName())
           .build();
-      int id = request.getClientID();
-      OmKeyLocationInfo newLocation = impl.allocateBlock(omKeyArgs, id);
+      OmKeyLocationInfo newLocation = impl.allocateBlock(omKeyArgs,
+          request.getClientID());
       resp.setKeyLocation(newLocation.getProtobuf());
       resp.setStatus(Status.OK);
     } catch (IOException e) {

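Both translator hunks above are the same mechanical cleanup: the clientID
local is inlined into the call. For context, the resulting handler body has
roughly this shape (a sketch only, stitched from names visible in the
surrounding diff; the request/response builder details are elided and exact
signatures may differ):

  try {
    KeyArgs keyArgs = request.getKeyArgs();
    OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
        .setVolumeName(keyArgs.getVolumeName())
        .setBucketName(keyArgs.getBucketName())
        .setKeyName(keyArgs.getKeyName())
        .build();
    // The client ID is read from the request at the call site instead of
    // being copied into a local first.
    OmKeyLocationInfo newLocation =
        impl.allocateBlock(omKeyArgs, request.getClientID());
    resp.setKeyLocation(newLocation.getProtobuf());
    resp.setStatus(Status.OK);
  } catch (IOException e) {
    resp.setStatus(exceptionToResponseStatus(e));
  }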
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff036e49/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java
index 1ecac7f..9684a1f 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java
@@ -17,33 +17,26 @@
 package org.apache.hadoop.ozone.om;
 
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
-import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.runners.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
 
+import java.io.File;
 import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.LinkedList;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.mockito.Mockito.any;
+import java.util.List;
 
 /**
  * Tests BucketManagerImpl, mocks OMMetadataManager for testing.
@@ -53,86 +46,35 @@ public class TestBucketManagerImpl {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
-  private OMMetadataManager getMetadataManagerMock(String... volumesToCreate)
-      throws IOException {
-    OMMetadataManager metadataManager = Mockito.mock(OMMetadataManager.class);
-    Map<String, byte[]> metadataDB = new HashMap<>();
-    ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    Mockito.when(metadataManager.writeLock()).thenReturn(lock.writeLock());
-    Mockito.when(metadataManager.readLock()).thenReturn(lock.readLock());
-    Mockito.when(metadataManager.getVolumeKey(any(String.class))).thenAnswer(
-        (InvocationOnMock invocation) ->
-            DFSUtil.string2Bytes(
-                OzoneConsts.OM_VOLUME_PREFIX + invocation.getArguments()[0]));
-    Mockito.when(metadataManager
-        .getBucketKey(any(String.class), any(String.class))).thenAnswer(
-            (InvocationOnMock invocation) ->
-                DFSUtil.string2Bytes(
-                    OzoneConsts.OM_VOLUME_PREFIX
-                        + invocation.getArguments()[0]
-                        + OzoneConsts.OM_BUCKET_PREFIX
-                        + invocation.getArguments()[1]));
-
-    Mockito.doAnswer(
-        new Answer<Boolean>() {
-          @Override
-          public Boolean answer(InvocationOnMock invocation)
-              throws Throwable {
-            String keyRootName =  OzoneConsts.OM_KEY_PREFIX
-                + invocation.getArguments()[0]
-                + OzoneConsts.OM_KEY_PREFIX
-                + invocation.getArguments()[1]
-                + OzoneConsts.OM_KEY_PREFIX;
-            Iterator<String> keyIterator = metadataDB.keySet().iterator();
-            while(keyIterator.hasNext()) {
-              if(keyIterator.next().startsWith(keyRootName)) {
-                return false;
-              }
-            }
-            return true;
-          }
-        }).when(metadataManager).isBucketEmpty(any(String.class),
-        any(String.class));
-
-    Mockito.doAnswer(
-        new Answer<Void>() {
-          @Override
-          public Void answer(InvocationOnMock invocation) throws Throwable {
-            metadataDB.put(DFSUtil.bytes2String(
-                (byte[])invocation.getArguments()[0]),
-                (byte[])invocation.getArguments()[1]);
-            return null;
-          }
-        }).when(metadataManager).put(any(byte[].class), any(byte[].class));
-
-    Mockito.when(metadataManager.get(any(byte[].class))).thenAnswer(
-        (InvocationOnMock invocation) ->
-            metadataDB.get(DFSUtil.bytes2String(
-                (byte[])invocation.getArguments()[0]))
-    );
-    Mockito.doAnswer(
-        new Answer<Void>() {
-          @Override
-          public Void answer(InvocationOnMock invocation) throws Throwable {
-            metadataDB.remove(DFSUtil.bytes2String(
-                (byte[])invocation.getArguments()[0]));
-            return null;
-          }
-        }).when(metadataManager).delete(any(byte[].class));
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
 
-    for(String volumeName : volumesToCreate) {
-      byte[] dummyVolumeInfo = DFSUtil.string2Bytes(volumeName);
-      metadataDB.put(OzoneConsts.OM_VOLUME_PREFIX + volumeName,
-                     dummyVolumeInfo);
+  private OzoneConfiguration createNewTestPath() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    File newFolder = folder.newFolder();
+    if (!newFolder.exists()) {
+      Assert.assertTrue(newFolder.mkdirs());
     }
-    return metadataManager;
+    ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
+    return conf;
+  }
+
+  private OmMetadataManagerImpl createSampleVol() throws IOException {
+    OzoneConfiguration conf = createNewTestPath();
+    OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(conf);
+    byte[] volumeKey = metaMgr.getVolumeKey("sampleVol");
+    // This is a simple hack for testing: we only check volume existence via
+    // a null check and never parse the value, so a dummy value is enough.
+    metaMgr.getVolumeTable().put(volumeKey, volumeKey);
+    return metaMgr;
   }
 
   @Test
-  public void testCreateBucketWithoutVolume() throws IOException {
+  public void testCreateBucketWithoutVolume() throws Exception {
     thrown.expectMessage("Volume doesn't exist");
-    OMMetadataManager metaMgr = getMetadataManagerMock();
+    OzoneConfiguration conf = createNewTestPath();
+    OmMetadataManagerImpl metaMgr =
+        new OmMetadataManagerImpl(conf);
     try {
       BucketManager bucketManager = new BucketManagerImpl(metaMgr);
       OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
@@ -140,29 +82,35 @@ public class TestBucketManagerImpl {
           .setBucketName("bucketOne")
           .build();
       bucketManager.createBucket(bucketInfo);
-    } catch(OMException omEx) {
+    } catch (OMException omEx) {
       Assert.assertEquals(ResultCodes.FAILED_VOLUME_NOT_FOUND,
           omEx.getResult());
       throw omEx;
+    } finally {
+      metaMgr.getStore().close();
     }
   }
 
   @Test
-  public void testCreateBucket() throws IOException {
-    OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+  public void testCreateBucket() throws Exception {
+    OmMetadataManagerImpl metaMgr = createSampleVol();
+
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);
     OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
         .setVolumeName("sampleVol")
         .setBucketName("bucketOne")
         .build();
     bucketManager.createBucket(bucketInfo);
-    Assert.assertNotNull(bucketManager.getBucketInfo("sampleVol", "bucketOne"));
+    Assert.assertNotNull(bucketManager.getBucketInfo("sampleVol",
+        "bucketOne"));
+    metaMgr.getStore().close();
   }
 
   @Test
-  public void testCreateAlreadyExistingBucket() throws IOException {
+  public void testCreateAlreadyExistingBucket() throws Exception {
     thrown.expectMessage("Bucket already exist");
-    OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    OmMetadataManagerImpl metaMgr = createSampleVol();
+
     try {
       BucketManager bucketManager = new BucketManagerImpl(metaMgr);
       OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
@@ -171,30 +119,37 @@ public class TestBucketManagerImpl {
           .build();
       bucketManager.createBucket(bucketInfo);
       bucketManager.createBucket(bucketInfo);
-    } catch(OMException omEx) {
+    } catch (OMException omEx) {
       Assert.assertEquals(ResultCodes.FAILED_BUCKET_ALREADY_EXISTS,
           omEx.getResult());
       throw omEx;
+    } finally {
+      metaMgr.getStore().close();
     }
   }
 
   @Test
-  public void testGetBucketInfoForInvalidBucket() throws IOException {
+  public void testGetBucketInfoForInvalidBucket() throws Exception {
     thrown.expectMessage("Bucket not found");
+    OmMetadataManagerImpl metaMgr = createSampleVol();
     try {
-      OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+
+
       BucketManager bucketManager = new BucketManagerImpl(metaMgr);
       bucketManager.getBucketInfo("sampleVol", "bucketOne");
-    } catch(OMException omEx) {
+    } catch (OMException omEx) {
       Assert.assertEquals(ResultCodes.FAILED_BUCKET_NOT_FOUND,
           omEx.getResult());
       throw omEx;
+    } finally {
+      metaMgr.getStore().close();
     }
   }
 
   @Test
-  public void testGetBucketInfo() throws IOException {
-    OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+  public void testGetBucketInfo() throws Exception {
+    OmMetadataManagerImpl metaMgr = createSampleVol();
+
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);
     OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
         .setVolumeName("sampleVol")
@@ -210,11 +165,13 @@ public class TestBucketManagerImpl {
     Assert.assertEquals(StorageType.DISK,
         result.getStorageType());
     Assert.assertEquals(false, result.getIsVersionEnabled());
+    metaMgr.getStore().close();
   }
 
   @Test
-  public void testSetBucketPropertyAddACL() throws IOException {
-    OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+  public void testSetBucketPropertyAddACL() throws Exception {
+    OmMetadataManagerImpl metaMgr = createSampleVol();
+
     List<OzoneAcl> acls = new LinkedList<>();
     OzoneAcl ozoneAcl = new OzoneAcl(OzoneAcl.OzoneACLType.USER,
         "root", OzoneAcl.OzoneACLRights.READ);
@@ -247,11 +204,13 @@ public class TestBucketManagerImpl {
         "sampleVol", "bucketOne");
     Assert.assertEquals(2, updatedResult.getAcls().size());
     Assert.assertTrue(updatedResult.getAcls().contains(newAcl));
+    metaMgr.getStore().close();
   }
 
   @Test
-  public void testSetBucketPropertyRemoveACL() throws IOException {
-    OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+  public void testSetBucketPropertyRemoveACL() throws Exception {
+    OmMetadataManagerImpl metaMgr = createSampleVol();
+
     List<OzoneAcl> acls = new LinkedList<>();
     OzoneAcl aclOne = new OzoneAcl(OzoneAcl.OzoneACLType.USER,
         "root", OzoneAcl.OzoneACLRights.READ);
@@ -283,11 +242,13 @@ public class TestBucketManagerImpl {
         "sampleVol", "bucketOne");
     Assert.assertEquals(1, updatedResult.getAcls().size());
     Assert.assertFalse(updatedResult.getAcls().contains(aclTwo));
+    metaMgr.getStore().close();
   }
 
   @Test
-  public void testSetBucketPropertyChangeStorageType() throws IOException {
-    OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+  public void testSetBucketPropertyChangeStorageType() throws Exception {
+    OmMetadataManagerImpl metaMgr = createSampleVol();
+
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);
     OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
         .setVolumeName("sampleVol")
@@ -309,11 +270,13 @@ public class TestBucketManagerImpl {
         "sampleVol", "bucketOne");
     Assert.assertEquals(StorageType.SSD,
         updatedResult.getStorageType());
+    metaMgr.getStore().close();
   }
 
   @Test
-  public void testSetBucketPropertyChangeVersioning() throws IOException {
-    OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+  public void testSetBucketPropertyChangeVersioning() throws Exception {
+    OmMetadataManagerImpl metaMgr = createSampleVol();
+
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);
     OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
         .setVolumeName("sampleVol")
@@ -333,21 +296,22 @@ public class TestBucketManagerImpl {
     OmBucketInfo updatedResult = bucketManager.getBucketInfo(
         "sampleVol", "bucketOne");
     Assert.assertTrue(updatedResult.getIsVersionEnabled());
+    metaMgr.getStore().close();
   }
 
   @Test
-  public void testDeleteBucket() throws IOException {
+  public void testDeleteBucket() throws Exception {
     thrown.expectMessage("Bucket not found");
-    OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    OmMetadataManagerImpl metaMgr = createSampleVol();
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);
-    for(int i = 0; i < 5; i++) {
+    for (int i = 0; i < 5; i++) {
       OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
           .setVolumeName("sampleVol")
           .setBucketName("bucket_" + i)
           .build();
       bucketManager.createBucket(bucketInfo);
     }
-    for(int i = 0; i < 5; i++) {
+    for (int i = 0; i < 5; i++) {
       Assert.assertEquals("bucket_" + i,
           bucketManager.getBucketInfo(
               "sampleVol", "bucket_" + i).getBucketName());
@@ -356,22 +320,23 @@ public class TestBucketManagerImpl {
       bucketManager.deleteBucket("sampleVol", "bucket_1");
       Assert.assertNotNull(bucketManager.getBucketInfo(
           "sampleVol", "bucket_2"));
-    } catch(IOException ex) {
+    } catch (IOException ex) {
       Assert.fail(ex.getMessage());
     }
     try {
       bucketManager.getBucketInfo("sampleVol", "bucket_1");
-    } catch(OMException omEx) {
+    } catch (OMException omEx) {
       Assert.assertEquals(ResultCodes.FAILED_BUCKET_NOT_FOUND,
           omEx.getResult());
       throw omEx;
     }
+    metaMgr.getStore().close();
   }
 
   @Test
-  public void testDeleteNonEmptyBucket() throws IOException {
+  public void testDeleteNonEmptyBucket() throws Exception {
     thrown.expectMessage("Bucket is not empty");
-    OMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    OmMetadataManagerImpl metaMgr = createSampleVol();
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);
     OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
         .setVolumeName("sampleVol")
@@ -379,16 +344,19 @@ public class TestBucketManagerImpl {
         .build();
     bucketManager.createBucket(bucketInfo);
     //Create keys in bucket
-    metaMgr.put(DFSUtil.string2Bytes("/sampleVol/bucketOne/key_one"),
+    metaMgr.getKeyTable().put(DFSUtil.string2Bytes("/sampleVol/bucketOne" +
+            "/key_one"),
         DFSUtil.string2Bytes("value_one"));
-    metaMgr.put(DFSUtil.string2Bytes("/sampleVol/bucketOne/key_two"),
+    metaMgr.getKeyTable().put(DFSUtil.string2Bytes("/sampleVol/bucketOne" +
+            "/key_two"),
         DFSUtil.string2Bytes("value_two"));
     try {
       bucketManager.deleteBucket("sampleVol", "bucketOne");
-    } catch(OMException omEx) {
+    } catch (OMException omEx) {
       Assert.assertEquals(ResultCodes.FAILED_BUCKET_NOT_EMPTY,
           omEx.getResult());
       throw omEx;
     }
+    metaMgr.getStore().close();
   }
 }

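The rewritten tests above drop the hand-rolled Mockito metadata store in
favor of a real OmMetadataManagerImpl rooted in a JUnit TemporaryFolder. A
condensed sketch of that setup, with the explicit store close each test now
performs (a hypothetical test body, assuming the same helpers the diff uses):

  @Rule
  public TemporaryFolder folder = new TemporaryFolder();

  @Test
  public void testAgainstRealStore() throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Point the OM metadata directory at a throwaway folder so every test
    // opens its own RocksDB instance.
    ServerUtils.setOzoneMetaDirPath(conf, folder.newFolder().toString());
    OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(conf);
    try {
      BucketManager bucketManager = new BucketManagerImpl(metaMgr);
      // ... exercise bucketManager against the real tables ...
    } finally {
      // Unlike the old mock, the RocksDB-backed store holds native
      // resources and must be closed when the test is done.
      metaMgr.getStore().close();
    }
  }

One design note: several tests call close() only on the happy path, so a
test that throws earlier would leak the handle; an @After hook would make
the cleanup unconditional.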
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff036e49/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
index 51018a1..080840a 100644
--- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
@@ -57,9 +57,8 @@ import java.sql.Statement;
 
 import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_SUFFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_USER_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_BUCKET_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_VOLUME_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OPEN_CONTAINERS_DB;
 
 /**
@@ -412,12 +411,15 @@ public class SQLCLI  extends Configured implements Tool {
     }
   }
 
+  // TODO: This has to be fixed.
+  // We no longer have key prefixes; each key type is now written to its own
+  // table, so this classification logic has to be changed.
   private KeyType getKeyType(String key) {
     if (key.startsWith(OM_USER_PREFIX)) {
       return KeyType.USER;
-    } else if (key.startsWith(OM_VOLUME_PREFIX)) {
-      return key.replaceFirst(OM_VOLUME_PREFIX, "")
-          .contains(OM_BUCKET_PREFIX) ? KeyType.BUCKET : KeyType.VOLUME;
+    } else if (key.startsWith(OM_KEY_PREFIX)) {
+      return key.replaceFirst(OM_KEY_PREFIX, "")
+          .contains(OM_KEY_PREFIX) ? KeyType.BUCKET : KeyType.VOLUME;
     }else {
       return KeyType.KEY;
     }

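On the getKeyType() TODO above: with per-type tables there is no longer a
single prefixed key space to classify, so one plausible replacement is to
ask the tables themselves which one holds the key. A hypothetical, untested
sketch (getUserTable() and getBucketTable() are assumed accessors mirroring
the getVolumeTable()/getKeyTable() ones this patch already uses):

  // Hypothetical: classify a key by which table contains it, since
  // prefix-based classification no longer applies.
  private KeyType getKeyType(OmMetadataManagerImpl metaMgr, byte[] key)
      throws IOException {
    if (metaMgr.getUserTable().get(key) != null) {
      return KeyType.USER;
    } else if (metaMgr.getVolumeTable().get(key) != null) {
      return KeyType.VOLUME;
    } else if (metaMgr.getBucketTable().get(key) != null) {
      return KeyType.BUCKET;
    }
    return KeyType.KEY;
  }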
