http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index fedc0f0..ec33990 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -22,14 +22,13 @@ import com.google.common.base.Strings;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
-import org.apache.hadoop.ozone.ksm.protocolPB
-    .KeySpaceManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -37,9 +36,9 @@ import org.apache.hadoop.ozone.OzoneConsts.Versioning;
 import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
 import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos;
-import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
-import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.web.request.OzoneQuota;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -77,8 +76,8 @@ public final class DistributedStorageHandler implements StorageHandler {
 
   private final StorageContainerLocationProtocolClientSideTranslatorPB
       storageContainerLocationClient;
-  private final KeySpaceManagerProtocolClientSideTranslatorPB
-      keySpaceManagerClient;
+  private final OzoneManagerProtocolClientSideTranslatorPB
+      ozoneManagerClient;
   private final XceiverClientManager xceiverClientManager;
   private final OzoneAcl.OzoneACLRights userRights;
   private final OzoneAcl.OzoneACLRights groupRights;
@@ -92,14 +91,14 @@ public final class DistributedStorageHandler implements StorageHandler {
    *
    * @param conf configuration
    * @param storageContainerLocation StorageContainerLocationProtocol proxy
-   * @param keySpaceManagerClient KeySpaceManager proxy
+   * @param ozoneManagerClient OzoneManager proxy
    */
   public DistributedStorageHandler(OzoneConfiguration conf,
       StorageContainerLocationProtocolClientSideTranslatorPB
           storageContainerLocation,
-      KeySpaceManagerProtocolClientSideTranslatorPB
-          keySpaceManagerClient) {
-    this.keySpaceManagerClient = keySpaceManagerClient;
+      OzoneManagerProtocolClientSideTranslatorPB
+                                       ozoneManagerClient) {
+    this.ozoneManagerClient = ozoneManagerClient;
     this.storageContainerLocationClient = storageContainerLocation;
     this.xceiverClientManager = new XceiverClientManager(conf);
     this.useRatis = conf.getBoolean(
@@ -116,10 +115,10 @@ public final class DistributedStorageHandler implements StorageHandler {
 
     chunkSize = conf.getInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
         ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT);
-    userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
-        KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
-    groupRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS,
-        KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS_DEFAULT);
+    userRights = conf.getEnum(OMConfigKeys.OZONE_OM_USER_RIGHTS,
+        OMConfigKeys.OZONE_OM_USER_RIGHTS_DEFAULT);
+    groupRights = conf.getEnum(OMConfigKeys.OZONE_OM_GROUP_RIGHTS,
+        OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT);
     if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) {
       LOG.warn("The chunk size ({}) is not allowed to be more than"
               + " the maximum size ({}),"
@@ -136,26 +135,26 @@ public final class DistributedStorageHandler implements StorageHandler {
     OzoneAcl userAcl =
         new OzoneAcl(OzoneAcl.OzoneACLType.USER,
             args.getUserName(), userRights);
-    KsmVolumeArgs.Builder builder = KsmVolumeArgs.newBuilder();
+    OmVolumeArgs.Builder builder = OmVolumeArgs.newBuilder();
     builder.setAdminName(args.getAdminName())
         .setOwnerName(args.getUserName())
         .setVolume(args.getVolumeName())
         .setQuotaInBytes(quota)
-        .addOzoneAcls(KSMPBHelper.convertOzoneAcl(userAcl));
+        .addOzoneAcls(OMPBHelper.convertOzoneAcl(userAcl));
     if (args.getGroups() != null) {
       for (String group : args.getGroups()) {
         OzoneAcl groupAcl =
             new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, group, groupRights);
-        builder.addOzoneAcls(KSMPBHelper.convertOzoneAcl(groupAcl));
+        builder.addOzoneAcls(OMPBHelper.convertOzoneAcl(groupAcl));
       }
     }
-    keySpaceManagerClient.createVolume(builder.build());
+    ozoneManagerClient.createVolume(builder.build());
   }
 
   @Override
   public void setVolumeOwner(VolumeArgs args) throws
       IOException, OzoneException {
-    keySpaceManagerClient.setOwner(args.getVolumeName(), args.getUserName());
+    ozoneManagerClient.setOwner(args.getVolumeName(), args.getUserName());
   }
 
   @Override
@@ -163,14 +162,14 @@ public final class DistributedStorageHandler implements StorageHandler {
       throws IOException, OzoneException {
     long quota = remove ? OzoneConsts.MAX_QUOTA_IN_BYTES :
         args.getQuota().sizeInBytes();
-    keySpaceManagerClient.setQuota(args.getVolumeName(), quota);
+    ozoneManagerClient.setQuota(args.getVolumeName(), quota);
   }
 
   @Override
   public boolean checkVolumeAccess(String volume, OzoneAcl acl)
       throws IOException, OzoneException {
-    return keySpaceManagerClient
-        .checkVolumeAccess(volume, KSMPBHelper.convertOzoneAcl(acl));
+    return ozoneManagerClient
+        .checkVolumeAccess(volume, OMPBHelper.convertOzoneAcl(acl));
   }
 
   @Override
@@ -185,9 +184,9 @@ public final class DistributedStorageHandler implements StorageHandler {
               OzoneConsts.MAX_LISTVOLUMES_SIZE, maxNumOfKeys));
     }
 
-    List<KsmVolumeArgs> listResult;
+    List<OmVolumeArgs> listResult;
     if (args.isRootScan()) {
-      listResult = keySpaceManagerClient.listAllVolumes(args.getPrefix(),
+      listResult = ozoneManagerClient.listAllVolumes(args.getPrefix(),
           args.getPrevKey(), args.getMaxKeys());
     } else {
       UserArgs userArgs = args.getArgs();
@@ -195,16 +194,16 @@ public final class DistributedStorageHandler implements StorageHandler {
         throw new IllegalArgumentException("Illegal argument,"
             + " missing user argument.");
       }
-      listResult = keySpaceManagerClient.listVolumeByUser(
+      listResult = ozoneManagerClient.listVolumeByUser(
           args.getArgs().getUserName(), args.getPrefix(), args.getPrevKey(),
           args.getMaxKeys());
     }
 
     // TODO Add missing fields createdBy, bucketCount and bytesUsed
     ListVolumes result = new ListVolumes();
-    for (KsmVolumeArgs volumeArgs : listResult) {
+    for (OmVolumeArgs volumeArgs : listResult) {
       VolumeInfo info = new VolumeInfo();
-      KeySpaceManagerProtocolProtos.VolumeInfo
+      OzoneManagerProtocolProtos.VolumeInfo
           infoProto = volumeArgs.getProtobuf();
       info.setOwner(new VolumeOwner(infoProto.getOwnerName()));
       info.setQuota(OzoneQuota.getOzoneQuota(infoProto.getQuotaInBytes()));
@@ -220,14 +219,14 @@ public final class DistributedStorageHandler implements StorageHandler {
   @Override
   public void deleteVolume(VolumeArgs args)
       throws IOException, OzoneException {
-    keySpaceManagerClient.deleteVolume(args.getVolumeName());
+    ozoneManagerClient.deleteVolume(args.getVolumeName());
   }
 
   @Override
   public VolumeInfo getVolumeInfo(VolumeArgs args)
       throws IOException, OzoneException {
-    KsmVolumeArgs volumeArgs =
-        keySpaceManagerClient.getVolumeInfo(args.getVolumeName());
+    OmVolumeArgs volumeArgs =
+        ozoneManagerClient.getVolumeInfo(args.getVolumeName());
     //TODO: add support for createdOn and other fields in getVolumeInfo
     VolumeInfo volInfo =
         new VolumeInfo(volumeArgs.getVolume(), null,
@@ -242,7 +241,7 @@ public final class DistributedStorageHandler implements StorageHandler {
   @Override
   public void createBucket(final BucketArgs args)
       throws IOException, OzoneException {
-    KsmBucketInfo.Builder builder = KsmBucketInfo.newBuilder();
+    OmBucketInfo.Builder builder = OmBucketInfo.newBuilder();
     builder.setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName());
     if(args.getAddAcls() != null) {
@@ -255,7 +254,7 @@ public final class DistributedStorageHandler implements StorageHandler {
       builder.setIsVersionEnabled(getBucketVersioningProtobuf(
           args.getVersioning()));
     }
-    keySpaceManagerClient.createBucket(builder.build());
+    ozoneManagerClient.createBucket(builder.build());
   }
 
   /**
@@ -285,7 +284,7 @@ public final class DistributedStorageHandler implements StorageHandler {
     List<OzoneAcl> removeAcls = args.getRemoveAcls();
     List<OzoneAcl> addAcls = args.getAddAcls();
     if(removeAcls != null || addAcls != null) {
-      KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
+      OmBucketArgs.Builder builder = OmBucketArgs.newBuilder();
       builder.setVolumeName(args.getVolumeName())
           .setBucketName(args.getBucketName());
       if(removeAcls != null && !removeAcls.isEmpty()) {
@@ -294,35 +293,35 @@ public final class DistributedStorageHandler implements StorageHandler {
       if(addAcls != null && !addAcls.isEmpty()) {
         builder.setAddAcls(args.getAddAcls());
       }
-      keySpaceManagerClient.setBucketProperty(builder.build());
+      ozoneManagerClient.setBucketProperty(builder.build());
     }
   }
 
   @Override
   public void setBucketVersioning(BucketArgs args)
       throws IOException, OzoneException {
-    KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
+    OmBucketArgs.Builder builder = OmBucketArgs.newBuilder();
     builder.setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
         .setIsVersionEnabled(getBucketVersioningProtobuf(
             args.getVersioning()));
-    keySpaceManagerClient.setBucketProperty(builder.build());
+    ozoneManagerClient.setBucketProperty(builder.build());
   }
 
   @Override
   public void setBucketStorageClass(BucketArgs args)
       throws IOException, OzoneException {
-    KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
+    OmBucketArgs.Builder builder = OmBucketArgs.newBuilder();
     builder.setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
         .setStorageType(args.getStorageType());
-    keySpaceManagerClient.setBucketProperty(builder.build());
+    ozoneManagerClient.setBucketProperty(builder.build());
   }
 
   @Override
   public void deleteBucket(BucketArgs args)
       throws IOException, OzoneException {
-    keySpaceManagerClient.deleteBucket(args.getVolumeName(),
+    ozoneManagerClient.deleteBucket(args.getVolumeName(),
         args.getBucketName());
   }
 
@@ -354,12 +353,12 @@ public final class DistributedStorageHandler implements StorageHandler {
                 OzoneConsts.MAX_LISTBUCKETS_SIZE, maxNumOfKeys));
       }
 
-      List<KsmBucketInfo> buckets =
-          keySpaceManagerClient.listBuckets(va.getVolumeName(),
+      List<OmBucketInfo> buckets =
+          ozoneManagerClient.listBuckets(va.getVolumeName(),
               args.getPrevKey(), args.getPrefix(), args.getMaxKeys());
 
       // Convert the result for the web layer.
-      for (KsmBucketInfo bucketInfo : buckets) {
+      for (OmBucketInfo bucketInfo : buckets) {
         BucketInfo bk = new BucketInfo();
         bk.setVolumeName(bucketInfo.getVolumeName());
         bk.setBucketName(bucketInfo.getBucketName());
@@ -382,26 +381,26 @@ public final class DistributedStorageHandler implements StorageHandler {
       throws IOException {
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
-    KsmBucketInfo ksmBucketInfo = keySpaceManagerClient.getBucketInfo(
+    OmBucketInfo omBucketInfo = ozoneManagerClient.getBucketInfo(
         volumeName, bucketName);
-    BucketInfo bucketInfo = new BucketInfo(ksmBucketInfo.getVolumeName(),
-        ksmBucketInfo.getBucketName());
-    if(ksmBucketInfo.getIsVersionEnabled()) {
+    BucketInfo bucketInfo = new BucketInfo(omBucketInfo.getVolumeName(),
+        omBucketInfo.getBucketName());
+    if(omBucketInfo.getIsVersionEnabled()) {
       bucketInfo.setVersioning(Versioning.ENABLED);
     } else {
       bucketInfo.setVersioning(Versioning.DISABLED);
     }
-    bucketInfo.setStorageType(ksmBucketInfo.getStorageType());
-    bucketInfo.setAcls(ksmBucketInfo.getAcls());
+    bucketInfo.setStorageType(omBucketInfo.getStorageType());
+    bucketInfo.setAcls(omBucketInfo.getAcls());
     bucketInfo.setCreatedOn(
-        HddsClientUtils.formatDateTime(ksmBucketInfo.getCreationTime()));
+        HddsClientUtils.formatDateTime(omBucketInfo.getCreationTime()));
     return bucketInfo;
   }
 
   @Override
   public OutputStream newKeyWriter(KeyArgs args) throws IOException,
       OzoneException {
-    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
         .setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
         .setKeyName(args.getKeyName())
@@ -409,14 +408,14 @@ public final class DistributedStorageHandler implements StorageHandler {
         .setType(xceiverClientManager.getType())
         .setFactor(xceiverClientManager.getFactor())
         .build();
-    // contact KSM to allocate a block for key.
-    OpenKeySession openKey = keySpaceManagerClient.openKey(keyArgs);
+    // contact OM to allocate a block for key.
+    OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
     ChunkGroupOutputStream groupOutputStream =
         new ChunkGroupOutputStream.Builder()
             .setHandler(openKey)
             .setXceiverClientManager(xceiverClientManager)
             .setScmClient(storageContainerLocationClient)
-            .setKsmClient(keySpaceManagerClient)
+            .setOmClient(ozoneManagerClient)
             .setChunkSize(chunkSize)
             .setRequestID(args.getRequestID())
             .setType(xceiverClientManager.getType())
@@ -437,56 +436,56 @@ public final class DistributedStorageHandler implements StorageHandler {
   @Override
   public LengthInputStream newKeyReader(KeyArgs args) throws IOException,
       OzoneException {
-    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
         .setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
         .setKeyName(args.getKeyName())
         .setDataSize(args.getSize())
         .build();
-    KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs);
-    return ChunkGroupInputStream.getFromKsmKeyInfo(
+    OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
+    return ChunkGroupInputStream.getFromOmKeyInfo(
         keyInfo, xceiverClientManager, storageContainerLocationClient,
         args.getRequestID());
   }
 
   @Override
   public void deleteKey(KeyArgs args) throws IOException, OzoneException {
-    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
         .setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
         .setKeyName(args.getKeyName())
         .build();
-    keySpaceManagerClient.deleteKey(keyArgs);
+    ozoneManagerClient.deleteKey(keyArgs);
   }
 
   @Override
   public void renameKey(KeyArgs args, String toKeyName)
       throws IOException, OzoneException {
-    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
         .setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
         .setKeyName(args.getKeyName())
         .build();
-    keySpaceManagerClient.renameKey(keyArgs, toKeyName);
+    ozoneManagerClient.renameKey(keyArgs, toKeyName);
   }
 
   @Override
   public KeyInfo getKeyInfo(KeyArgs args) throws IOException, OzoneException {
-    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
         .setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
         .setKeyName(args.getKeyName())
         .build();
 
-    KsmKeyInfo ksmKeyInfo = keySpaceManagerClient.lookupKey(keyArgs);
+    OmKeyInfo omKeyInfo = ozoneManagerClient.lookupKey(keyArgs);
     KeyInfo keyInfo = new KeyInfo();
     keyInfo.setVersion(0);
-    keyInfo.setKeyName(ksmKeyInfo.getKeyName());
-    keyInfo.setSize(ksmKeyInfo.getDataSize());
+    keyInfo.setKeyName(omKeyInfo.getKeyName());
+    keyInfo.setSize(omKeyInfo.getDataSize());
     keyInfo.setCreatedOn(
-        HddsClientUtils.formatDateTime(ksmKeyInfo.getCreationTime()));
+        HddsClientUtils.formatDateTime(omKeyInfo.getCreationTime()));
     keyInfo.setModifiedOn(
-        HddsClientUtils.formatDateTime(ksmKeyInfo.getModificationTime()));
+        HddsClientUtils.formatDateTime(omKeyInfo.getModificationTime()));
     return keyInfo;
   }
 
@@ -515,13 +514,13 @@ public final class DistributedStorageHandler implements StorageHandler {
                 OzoneConsts.MAX_LISTKEYS_SIZE, maxNumOfKeys));
       }
 
-      List<KsmKeyInfo> keys=
-          keySpaceManagerClient.listKeys(bucketArgs.getVolumeName(),
+      List<OmKeyInfo> keys=
+          ozoneManagerClient.listKeys(bucketArgs.getVolumeName(),
               bucketArgs.getBucketName(),
               args.getPrevKey(), args.getPrefix(), args.getMaxKeys());
 
       // Convert the result for the web layer.
-      for (KsmKeyInfo info : keys) {
+      for (OmKeyInfo info : keys) {
         KeyInfo tempInfo = new KeyInfo();
         tempInfo.setVersion(0);
         tempInfo.setKeyName(info.getKeyName());
@@ -547,7 +546,7 @@ public final class DistributedStorageHandler implements StorageHandler {
   @Override
   public void close() {
     IOUtils.cleanupWithLogger(LOG, xceiverClientManager);
-    IOUtils.cleanupWithLogger(LOG, keySpaceManagerClient);
+    IOUtils.cleanupWithLogger(LOG, ozoneManagerClient);
     IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/BucketManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/BucketManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/BucketManager.java
deleted file mode 100644
index 6c75691..0000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/BucketManager.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * BucketManager handles all the bucket level operations.
- */
-public interface BucketManager {
-  /**
-   * Creates a bucket.
-   * @param bucketInfo - KsmBucketInfo for creating bucket.
-   */
-  void createBucket(KsmBucketInfo bucketInfo) throws IOException;
-  /**
-   * Returns Bucket Information.
-   * @param volumeName - Name of the Volume.
-   * @param bucketName - Name of the Bucket.
-   */
-  KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
-      throws IOException;
-
-  /**
-   * Sets bucket property from args.
-   * @param args - BucketArgs.
-   * @throws IOException
-   */
-  void setBucketProperty(KsmBucketArgs args) throws IOException;
-
-  /**
-   * Deletes an existing empty bucket from volume.
-   * @param volumeName - Name of the volume.
-   * @param bucketName - Name of the bucket.
-   * @throws IOException
-   */
-  void deleteBucket(String volumeName, String bucketName) throws IOException;
-
-  /**
-   * Returns a list of buckets represented by {@link KsmBucketInfo}
-   * in the given volume.
-   *
-   * @param volumeName
-   *   Required parameter volume name determines buckets in which volume
-   *   to return.
-   * @param startBucket
-   *   Optional start bucket name parameter indicating where to start
-   *   the bucket listing from, this key is excluded from the result.
-   * @param bucketPrefix
-   *   Optional start key parameter, restricting the response to buckets
-   *   that begin with the specified name.
-   * @param maxNumOfBuckets
-   *   The maximum number of buckets to return. It ensures
-   *   the size of the result will not exceed this limit.
-   * @return a list of buckets.
-   * @throws IOException
-   */
-  List<KsmBucketInfo> listBuckets(String volumeName,
-      String startBucket, String bucketPrefix, int maxNumOfBuckets)
-      throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
deleted file mode 100644
index 957a6d9..0000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.BucketInfo;
-import org.apache.hadoop.ozone.OzoneAcl;
-import org.apache.hadoop.util.Time;
-import org.iq80.leveldb.DBException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * KSM bucket manager.
- */
-public class BucketManagerImpl implements BucketManager {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(BucketManagerImpl.class);
-
-  /**
-   * KSMMetadataManager is used for accessing KSM MetadataDB and ReadWriteLock.
-   */
-  private final KSMMetadataManager metadataManager;
-
-  /**
-   * Constructs BucketManager.
-   * @param metadataManager
-   */
-  public BucketManagerImpl(KSMMetadataManager metadataManager){
-    this.metadataManager = metadataManager;
-  }
-
-  /**
-   * MetadataDB is maintained in MetadataManager and shared between
-   * BucketManager and VolumeManager. (and also by KeyManager)
-   *
-   * BucketManager uses MetadataDB to store bucket level information.
-   *
-   * Keys used in BucketManager for storing data into MetadataDB
-   * for BucketInfo:
-   * {volume/bucket} -> bucketInfo
-   *
-   * Work flow of create bucket:
-   *
-   * -> Check if the Volume exists in metadataDB, if not throw
-   * VolumeNotFoundException.
-   * -> Else check if the Bucket exists in metadataDB, if so throw
-   * BucketExistException
-   * -> Else update MetadataDB with VolumeInfo.
-   */
-
-  /**
-   * Creates a bucket.
-   * @param bucketInfo - KsmBucketInfo.
-   */
-  @Override
-  public void createBucket(KsmBucketInfo bucketInfo) throws IOException {
-    Preconditions.checkNotNull(bucketInfo);
-    metadataManager.writeLock().lock();
-    String volumeName = bucketInfo.getVolumeName();
-    String bucketName = bucketInfo.getBucketName();
-    try {
-      byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
-      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
-
-      //Check if the volume exists
-      if (metadataManager.get(volumeKey) == null) {
-        LOG.debug("volume: {} not found ", volumeName);
-        throw new KSMException("Volume doesn't exist",
-            KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
-      }
-      //Check if bucket already exists
-      if (metadataManager.get(bucketKey) != null) {
-        LOG.debug("bucket: {} already exists ", bucketName);
-        throw new KSMException("Bucket already exist",
-            KSMException.ResultCodes.FAILED_BUCKET_ALREADY_EXISTS);
-      }
-
-      KsmBucketInfo ksmBucketInfo = KsmBucketInfo.newBuilder()
-          .setVolumeName(bucketInfo.getVolumeName())
-          .setBucketName(bucketInfo.getBucketName())
-          .setAcls(bucketInfo.getAcls())
-          .setStorageType(bucketInfo.getStorageType())
-          .setIsVersionEnabled(bucketInfo.getIsVersionEnabled())
-          .setCreationTime(Time.now())
-          .build();
-      metadataManager.put(bucketKey, ksmBucketInfo.getProtobuf().toByteArray());
-
-      LOG.debug("created bucket: {} in volume: {}", bucketName, volumeName);
-    } catch (IOException | DBException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.error("Bucket creation failed for bucket:{} in volume:{}",
-            bucketName, volumeName, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Returns Bucket Information.
-   *
-   * @param volumeName - Name of the Volume.
-   * @param bucketName - Name of the Bucket.
-   */
-  @Override
-  public KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    metadataManager.readLock().lock();
-    try {
-      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
-      byte[] value = metadataManager.get(bucketKey);
-      if (value == null) {
-        LOG.debug("bucket: {} not found in volume: {}.", bucketName,
-            volumeName);
-        throw new KSMException("Bucket not found",
-            KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
-      }
-      return KsmBucketInfo.getFromProtobuf(BucketInfo.parseFrom(value));
-    } catch (IOException | DBException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.error("Exception while getting bucket info for bucket: {}",
-            bucketName, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.readLock().unlock();
-    }
-  }
-
-  /**
-   * Sets bucket property from args.
-   * @param args - BucketArgs.
-   * @throws IOException
-   */
-  @Override
-  public void setBucketProperty(KsmBucketArgs args) throws IOException {
-    Preconditions.checkNotNull(args);
-    metadataManager.writeLock().lock();
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    try {
-      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
-      //Check if volume exists
-      if(metadataManager.get(metadataManager.getVolumeKey(volumeName)) ==
-          null) {
-        LOG.debug("volume: {} not found ", volumeName);
-        throw new KSMException("Volume doesn't exist",
-            KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
-      }
-      byte[] value = metadataManager.get(bucketKey);
-      //Check if bucket exist
-      if(value == null) {
-        LOG.debug("bucket: {} not found ", bucketName);
-        throw new KSMException("Bucket doesn't exist",
-            KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
-      }
-      KsmBucketInfo oldBucketInfo = KsmBucketInfo.getFromProtobuf(
-          BucketInfo.parseFrom(value));
-      KsmBucketInfo.Builder bucketInfoBuilder = KsmBucketInfo.newBuilder();
-      bucketInfoBuilder.setVolumeName(oldBucketInfo.getVolumeName())
-          .setBucketName(oldBucketInfo.getBucketName());
-
-      //Check ACLs to update
-      if(args.getAddAcls() != null || args.getRemoveAcls() != null) {
-        bucketInfoBuilder.setAcls(getUpdatedAclList(oldBucketInfo.getAcls(),
-            args.getRemoveAcls(), args.getAddAcls()));
-        LOG.debug("Updating ACLs for bucket: {} in volume: {}",
-            bucketName, volumeName);
-      } else {
-        bucketInfoBuilder.setAcls(oldBucketInfo.getAcls());
-      }
-
-      //Check StorageType to update
-      StorageType storageType = args.getStorageType();
-      if (storageType != null) {
-        bucketInfoBuilder.setStorageType(storageType);
-        LOG.debug("Updating bucket storage type for bucket: {} in volume: {}",
-            bucketName, volumeName);
-      } else {
-        bucketInfoBuilder.setStorageType(oldBucketInfo.getStorageType());
-      }
-
-      //Check Versioning to update
-      Boolean versioning = args.getIsVersionEnabled();
-      if (versioning != null) {
-        bucketInfoBuilder.setIsVersionEnabled(versioning);
-        LOG.debug("Updating bucket versioning for bucket: {} in volume: {}",
-            bucketName, volumeName);
-      } else {
-        bucketInfoBuilder
-            .setIsVersionEnabled(oldBucketInfo.getIsVersionEnabled());
-      }
-      bucketInfoBuilder.setCreationTime(oldBucketInfo.getCreationTime());
-
-      metadataManager.put(bucketKey,
-          bucketInfoBuilder.build().getProtobuf().toByteArray());
-    } catch (IOException | DBException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.error("Setting bucket property failed for bucket:{} in volume:{}",
-            bucketName, volumeName, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Updates the existing ACL list with remove and add ACLs that are passed.
-   * Remove is done before Add.
-   *
-   * @param existingAcls - old ACL list.
-   * @param removeAcls - ACLs to be removed.
-   * @param addAcls - ACLs to be added.
-   * @return updated ACL list.
-   */
-  private List<OzoneAcl> getUpdatedAclList(List<OzoneAcl> existingAcls,
-      List<OzoneAcl> removeAcls, List<OzoneAcl> addAcls) {
-    if(removeAcls != null && !removeAcls.isEmpty()) {
-      existingAcls.removeAll(removeAcls);
-    }
-    if(addAcls != null && !addAcls.isEmpty()) {
-      addAcls.stream().filter(acl -> !existingAcls.contains(acl)).forEach(
-          existingAcls::add);
-    }
-    return existingAcls;
-  }
-
-  /**
-   * Deletes an existing empty bucket from volume.
-   * @param volumeName - Name of the volume.
-   * @param bucketName - Name of the bucket.
-   * @throws IOException
-   */
-  public void deleteBucket(String volumeName, String bucketName)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    metadataManager.writeLock().lock();
-    try {
-      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
-      //Check if volume exists
-      if (metadataManager.get(metadataManager.getVolumeKey(volumeName))
-          == null) {
-        LOG.debug("volume: {} not found ", volumeName);
-        throw new KSMException("Volume doesn't exist",
-            KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
-      }
-      //Check if bucket exist
-      if (metadataManager.get(bucketKey) == null) {
-        LOG.debug("bucket: {} not found ", bucketName);
-        throw new KSMException("Bucket doesn't exist",
-            KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
-      }
-      //Check if bucket is empty
-      if (!metadataManager.isBucketEmpty(volumeName, bucketName)) {
-        LOG.debug("bucket: {} is not empty ", bucketName);
-        throw new KSMException("Bucket is not empty",
-            KSMException.ResultCodes.FAILED_BUCKET_NOT_EMPTY);
-      }
-      metadataManager.delete(bucketKey);
-    } catch (IOException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.error("Delete bucket failed for bucket:{} in volume:{}", bucketName,
-            volumeName, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<KsmBucketInfo> listBuckets(String volumeName,
-      String startBucket, String bucketPrefix, int maxNumOfBuckets)
-      throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    metadataManager.readLock().lock();
-    try {
-      return metadataManager.listBuckets(
-          volumeName, startBucket, bucketPrefix, maxNumOfBuckets);
-    } finally {
-      metadataManager.readLock().unlock();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMXBean.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMXBean.java
deleted file mode 100644
index bf22332..0000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMXBean.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.ksm;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdds.server.ServiceRuntimeInfo;
-
-/**
- * This is the JMX management interface for ksm information.
- */
-@InterfaceAudience.Private
-public interface KSMMXBean extends ServiceRuntimeInfo {
-
-  String getRpcPort();
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
deleted file mode 100644
index f5a2d5b..0000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-
-/**
- * KSM metadata manager interface.
- */
-public interface KSMMetadataManager {
-  /**
-   * Start metadata manager.
-   */
-  void start();
-
-  /**
-   * Stop metadata manager.
-   */
-  void stop() throws IOException;
-
-  /**
-   * Get metadata store.
-   * @return metadata store.
-   */
-  @VisibleForTesting
-  MetadataStore getStore();
-
-  /**
-   * Returns the read lock used on Metadata DB.
-   * @return readLock
-   */
-  Lock readLock();
-
-  /**
-   * Returns the write lock used on Metadata DB.
-   * @return writeLock
-   */
-  Lock writeLock();
-
-  /**
-   * Returns the value associated with this key.
-   * @param key - key
-   * @return value
-   */
-  byte[] get(byte[] key) throws IOException;
-
-  /**
-   * Puts a Key into Metadata DB.
-   * @param key   - key
-   * @param value - value
-   */
-  void put(byte[] key, byte[] value) throws IOException;
-
-  /**
-   * Deletes a Key from Metadata DB.
-   * @param key   - key
-   */
-  void delete(byte[] key) throws IOException;
-
-  /**
-   * Atomic write a batch of operations.
-   * @param batch
-   * @throws IOException
-   */
-  void writeBatch(BatchOperation batch) throws IOException;
-
-  /**
-   * Given a volume return the corresponding DB key.
-   * @param volume - Volume name
-   */
-  byte[] getVolumeKey(String volume);
-
-  /**
-   * Given a user return the corresponding DB key.
-   * @param user - User name
-   */
-  byte[] getUserKey(String user);
-
-  /**
-   * Given a volume and bucket, return the corresponding DB key.
-   * @param volume - User name
-   * @param bucket - Bucket name
-   */
-  byte[] getBucketKey(String volume, String bucket);
-
-  /**
-   * Given a volume, bucket and a key, return the corresponding DB key.
-   * @param volume - volume name
-   * @param bucket - bucket name
-   * @param key - key name
-   * @return bytes of DB key.
-   */
-  byte[] getDBKeyBytes(String volume, String bucket, String key);
-
-  /**
-   * Returns the DB key name of a deleted key in KSM metadata store.
-   * The name for a deleted key has prefix #deleting# followed by
-   * the actual key name.
-   * @param keyName - key name
-   * @return bytes of DB key.
-   */
-  byte[] getDeletedKeyName(byte[] keyName);
-
-  /**
-   * Returns the DB key name of a open key in KSM metadata store.
-   * Should be #open# prefix followed by actual key name.
-   * @param keyName - key name
-   * @param id - the id for this open
-   * @return bytes of DB key.
-   */
-  byte[] getOpenKeyNameBytes(String keyName, int id);
-
-  /**
-   * Returns the full name of a key given volume name, bucket name and key name.
-   * Generally done by padding certain delimiters.
-   *
-   * @param volumeName - volume name
-   * @param bucketName - bucket name
-   * @param keyName - key name
-   * @return the full key name.
-   */
-  String getKeyWithDBPrefix(String volumeName, String bucketName,
-      String keyName);
-
-  /**
-   * Given a volume, check if it is empty,
-   * i.e there are no buckets inside it.
-   * @param volume - Volume name
-   */
-  boolean isVolumeEmpty(String volume) throws IOException;
-
-  /**
-   * Given a volume/bucket, check if it is empty,
-   * i.e there are no keys inside it.
-   * @param volume - Volume name
-   * @param  bucket - Bucket name
-   * @return true if the bucket is empty
-   */
-  boolean isBucketEmpty(String volume, String bucket) throws IOException;
-
-  /**
-   * Returns a list of buckets represented by {@link KsmBucketInfo}
-   * in the given volume.
-   *
-   * @param volumeName
-   *   the name of the volume. This argument is required,
-   *   this method returns buckets in this given volume.
-   * @param startBucket
-   *   the start bucket name. Only the buckets whose name is
-   *   after this value will be included in the result.
-   *   This key is excluded from the result.
-   * @param bucketPrefix
-   *   bucket name prefix. Only the buckets whose name has
-   *   this prefix will be included in the result.
-   * @param maxNumOfBuckets
-   *   the maximum number of buckets to return. It ensures
-   *   the size of the result will not exceed this limit.
-   * @return a list of buckets.
-   * @throws IOException
-   */
-  List<KsmBucketInfo> listBuckets(String volumeName, String startBucket,
-      String bucketPrefix, int maxNumOfBuckets) throws IOException;
-
-  /**
-   * Returns a list of keys represented by {@link KsmKeyInfo}
-   * in the given bucket.
-   *
-   * @param volumeName
-   *   the name of the volume.
-   * @param bucketName
-   *   the name of the bucket.
-   * @param startKey
-   *   the start key name, only the keys whose name is
-   *   after this value will be included in the result.
-   *   This key is excluded from the result.
-   * @param keyPrefix
-   *   key name prefix, only the keys whose name has
-   *   this prefix will be included in the result.
-   * @param maxKeys
-   *   the maximum number of keys to return. It ensures
-   *   the size of the result will not exceed this limit.
-   * @return a list of keys.
-   * @throws IOException
-   */
-  List<KsmKeyInfo> listKeys(String volumeName,
-      String bucketName, String startKey, String keyPrefix, int maxKeys)
-      throws IOException;
-
-  /**
-   * Returns a list of volumes owned by a given user; if user is null,
-   * returns all volumes.
-   *
-   * @param userName
-   *   volume owner
-   * @param prefix
-   *   the volume prefix used to filter the listing result.
-   * @param startKey
-   *   the start volume name determines where to start listing from,
-   *   this key is excluded from the result.
-   * @param maxKeys
-   *   the maximum number of volumes to return.
-   * @return a list of {@link KsmVolumeArgs}
-   * @throws IOException
-   */
-  List<KsmVolumeArgs> listVolumes(String userName, String prefix,
-      String startKey, int maxKeys) throws IOException;
-
-  /**
-   * Returns a list of pending deletion key info that ups to the given count.
-   * Each entry is a {@link BlockGroup}, which contains the info about the
-   * key name and all its associated block IDs. A pending deletion key is
-   * stored with #deleting# prefix in KSM DB.
-   *
-   * @param count max number of keys to return.
-   * @return a list of {@link BlockGroup} represent keys and blocks.
-   * @throws IOException
-   */
-  List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
-
-  /**
-   * Returns a list of all still open key info. Which contains the info about
-   * the key name and all its associated block IDs. A pending open key has
-   * prefix #open# in KSM DB.
-   *
-   * @return a list of {@link BlockGroup} representing keys and blocks.
-   * @throws IOException
-   */
-  List<BlockGroup> getExpiredOpenKeys() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
deleted file mode 100644
index 6664a32..0000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
+++ /dev/null
@@ -1,526 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
-import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.BucketInfo;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
-
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
-import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_ID_DELIMINATOR;
-import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_PREFIX;
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
-    .OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
-    .OZONE_KSM_DB_CACHE_SIZE_MB;
-import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
-
-/**
- * KSM metadata manager interface.
- */
-public class KSMMetadataManagerImpl implements KSMMetadataManager {
-
-  private final MetadataStore store;
-  private final ReadWriteLock lock;
-  private final long openKeyExpireThresholdMS;
-
-  public KSMMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
-    File metaDir = getOzoneMetaDirPath(conf);
-    final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
-        OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
-    File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);
-    this.store = MetadataStoreBuilder.newBuilder()
-        .setConf(conf)
-        .setDbFile(ksmDBFile)
-        .setCacheSize(cacheSize * OzoneConsts.MB)
-        .build();
-    this.lock = new ReentrantReadWriteLock();
-    this.openKeyExpireThresholdMS = 1000 * conf.getInt(
-        OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS,
-        OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT);
-  }
-
-  /**
-   * Start metadata manager.
-   */
-  @Override
-  public void start() {
-
-  }
-
-  /**
-   * Stop metadata manager.
-   */
-  @Override
-  public void stop() throws IOException {
-    if (store != null) {
-      store.close();
-    }
-  }
-
-  /**
-   * Get metadata store.
-   * @return store - metadata store.
-   */
-  @VisibleForTesting
-  @Override
-  public MetadataStore getStore() {
-    return store;
-  }
-
-  /**
-   * Given a volume return the corresponding DB key.
-   * @param volume - Volume name
-   */
-  public byte[] getVolumeKey(String volume) {
-    String dbVolumeName = OzoneConsts.KSM_VOLUME_PREFIX + volume;
-    return DFSUtil.string2Bytes(dbVolumeName);
-  }
-
-  /**
-   * Given a user return the corresponding DB key.
-   * @param user - User name
-   */
-  public byte[] getUserKey(String user) {
-    String dbUserName = OzoneConsts.KSM_USER_PREFIX + user;
-    return DFSUtil.string2Bytes(dbUserName);
-  }
-
-  /**
-   * Given a volume and bucket, return the corresponding DB key.
-   * @param volume - User name
-   * @param bucket - Bucket name
-   */
-  public byte[] getBucketKey(String volume, String bucket) {
-    String bucketKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume
-        + OzoneConsts.KSM_BUCKET_PREFIX + bucket;
-    return DFSUtil.string2Bytes(bucketKeyString);
-  }
-
-  /**
-   * @param volume
-   * @param bucket
-   * @return
-   */
-  private String getBucketWithDBPrefix(String volume, String bucket) {
-    StringBuffer sb = new StringBuffer();
-    sb.append(OzoneConsts.KSM_VOLUME_PREFIX)
-        .append(volume)
-        .append(OzoneConsts.KSM_BUCKET_PREFIX);
-    if (!Strings.isNullOrEmpty(bucket)) {
-      sb.append(bucket);
-    }
-    return sb.toString();
-  }
-
-  @Override
-  public String getKeyWithDBPrefix(String volume, String bucket, String key) {
-    String keyVB = OzoneConsts.KSM_KEY_PREFIX + volume
-        + OzoneConsts.KSM_KEY_PREFIX + bucket
-        + OzoneConsts.KSM_KEY_PREFIX;
-    return Strings.isNullOrEmpty(key) ? keyVB : keyVB + key;
-  }
-
-  @Override
-  public byte[] getDBKeyBytes(String volume, String bucket, String key) {
-    return DFSUtil.string2Bytes(getKeyWithDBPrefix(volume, bucket, key));
-  }
-
-  @Override
-  public byte[] getDeletedKeyName(byte[] keyName) {
-    return DFSUtil.string2Bytes(
-        DELETING_KEY_PREFIX + DFSUtil.bytes2String(keyName));
-  }
-
-  @Override
-  public byte[] getOpenKeyNameBytes(String keyName, int id) {
-    return DFSUtil.string2Bytes(OPEN_KEY_PREFIX + id +
-        OPEN_KEY_ID_DELIMINATOR + keyName);
-  }
-
-  /**
-   * Returns the read lock used on Metadata DB.
-   * @return readLock
-   */
-  @Override
-  public Lock readLock() {
-    return lock.readLock();
-  }
-
-  /**
-   * Returns the write lock used on Metadata DB.
-   * @return writeLock
-   */
-  @Override
-  public Lock writeLock() {
-    return lock.writeLock();
-  }
-
-  /**
-   * Returns the value associated with this key.
-   * @param key - key
-   * @return value
-   */
-  @Override
-  public byte[] get(byte[] key) throws IOException {
-    return store.get(key);
-  }
-
-  /**
-   * Puts a Key into Metadata DB.
-   * @param key   - key
-   * @param value - value
-   */
-  @Override
-  public void put(byte[] key, byte[] value) throws IOException {
-    store.put(key, value);
-  }
-
-  /**
-   * Deletes a Key from Metadata DB.
-   * @param key   - key
-   */
-  public void delete(byte[] key) throws IOException {
-    store.delete(key);
-  }
-
-  @Override
-  public void writeBatch(BatchOperation batch) throws IOException {
-    this.store.writeBatch(batch);
-  }
-
-  /**
-   * Given a volume, check if it is empty, i.e there are no buckets inside it.
-   * @param volume - Volume name
-   * @return true if the volume is empty
-   */
-  public boolean isVolumeEmpty(String volume) throws IOException {
-    String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
-        + OzoneConsts.KSM_BUCKET_PREFIX;
-    byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
-    ImmutablePair<byte[], byte[]> volumeRoot =
-        store.peekAround(0, dbVolumeRootKey);
-    if (volumeRoot != null) {
-      return !DFSUtil.bytes2String(volumeRoot.getKey())
-          .startsWith(dbVolumeRootName);
-    }
-    return true;
-  }
-
-  /**
-   * Given a volume/bucket, check if it is empty,
-   * i.e there are no keys inside it.
-   * @param volume - Volume name
-   * @param bucket - Bucket name
-   * @return true if the bucket is empty
-   */
-  public boolean isBucketEmpty(String volume, String bucket)
-      throws IOException {
-    String keyRootName = getKeyWithDBPrefix(volume, bucket, null);
-    byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
-    ImmutablePair<byte[], byte[]> firstKey = store.peekAround(0, keyRoot);
-    if (firstKey != null) {
-      return !DFSUtil.bytes2String(firstKey.getKey())
-          .startsWith(keyRootName);
-    }
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<KsmBucketInfo> listBuckets(final String volumeName,
-      final String startBucket, final String bucketPrefix,
-      final int maxNumOfBuckets) throws IOException {
-    List<KsmBucketInfo> result = new ArrayList<>();
-    if (Strings.isNullOrEmpty(volumeName)) {
-      throw new KSMException("Volume name is required.",
-          ResultCodes.FAILED_VOLUME_NOT_FOUND);
-    }
-
-    byte[] volumeNameBytes = getVolumeKey(volumeName);
-    if (store.get(volumeNameBytes) == null) {
-      throw new KSMException("Volume " + volumeName + " not found.",
-          ResultCodes.FAILED_VOLUME_NOT_FOUND);
-    }
-
-
-    // A bucket starts with /#volume/#bucket_prefix
-    MetadataKeyFilter filter = (preKey, currentKey, nextKey) -> {
-      if (currentKey != null) {
-        String bucketNamePrefix =
-                getBucketWithDBPrefix(volumeName, bucketPrefix);
-        String bucket = DFSUtil.bytes2String(currentKey);
-        return bucket.startsWith(bucketNamePrefix);
-      }
-      return false;
-    };
-
-    List<Map.Entry<byte[], byte[]>> rangeResult;
-    if (!Strings.isNullOrEmpty(startBucket)) {
-      // Since we are excluding start key from the result,
-      // the maxNumOfBuckets is incremented.
-      rangeResult = store.getSequentialRangeKVs(
-          getBucketKey(volumeName, startBucket),
-          maxNumOfBuckets + 1, filter);
-      if (!rangeResult.isEmpty()) {
-        //Remove start key from result.
-        rangeResult.remove(0);
-      }
-    } else {
-      rangeResult = store.getSequentialRangeKVs(null, maxNumOfBuckets, filter);
-    }
-
-    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
-      KsmBucketInfo info = KsmBucketInfo.getFromProtobuf(
-          BucketInfo.parseFrom(entry.getValue()));
-      result.add(info);
-    }
-    return result;
-  }
-
-  @Override
-  public List<KsmKeyInfo> listKeys(String volumeName, String bucketName,
-      String startKey, String keyPrefix, int maxKeys) throws IOException {
-    List<KsmKeyInfo> result = new ArrayList<>();
-    if (Strings.isNullOrEmpty(volumeName)) {
-      throw new KSMException("Volume name is required.",
-          ResultCodes.FAILED_VOLUME_NOT_FOUND);
-    }
-
-    if (Strings.isNullOrEmpty(bucketName)) {
-      throw new KSMException("Bucket name is required.",
-          ResultCodes.FAILED_BUCKET_NOT_FOUND);
-    }
-
-    byte[] bucketNameBytes = getBucketKey(volumeName, bucketName);
-    if (store.get(bucketNameBytes) == null) {
-      throw new KSMException("Bucket " + bucketName + " not found.",
-          ResultCodes.FAILED_BUCKET_NOT_FOUND);
-    }
-
-    MetadataKeyFilter filter = new KeyPrefixFilter()
-        .addFilter(getKeyWithDBPrefix(volumeName, bucketName, keyPrefix));
-
-    List<Map.Entry<byte[], byte[]>> rangeResult;
-    if (!Strings.isNullOrEmpty(startKey)) {
-      //Since we are excluding start key from the result,
-      // the maxNumOfBuckets is incremented.
-      rangeResult = store.getSequentialRangeKVs(
-          getDBKeyBytes(volumeName, bucketName, startKey),
-          maxKeys + 1, filter);
-      if (!rangeResult.isEmpty()) {
-        //Remove start key from result.
-        rangeResult.remove(0);
-      }
-    } else {
-      rangeResult = store.getSequentialRangeKVs(null, maxKeys, filter);
-    }
-
-    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
-      KsmKeyInfo info = KsmKeyInfo.getFromProtobuf(
-          KeyInfo.parseFrom(entry.getValue()));
-      result.add(info);
-    }
-    return result;
-  }
-
-  @Override
-  public List<KsmVolumeArgs> listVolumes(String userName,
-      String prefix, String startKey, int maxKeys) throws IOException {
-    List<KsmVolumeArgs> result = Lists.newArrayList();
-    VolumeList volumes;
-    if (Strings.isNullOrEmpty(userName)) {
-      volumes = getAllVolumes();
-    } else {
-      volumes = getVolumesByUser(userName);
-    }
-
-    if (volumes == null || volumes.getVolumeNamesCount() == 0) {
-      return result;
-    }
-
-    boolean startKeyFound = Strings.isNullOrEmpty(startKey);
-    for (String volumeName : volumes.getVolumeNamesList()) {
-      if (!Strings.isNullOrEmpty(prefix)) {
-        if (!volumeName.startsWith(prefix)) {
-          continue;
-        }
-      }
-
-      if (!startKeyFound && volumeName.equals(startKey)) {
-        startKeyFound = true;
-        continue;
-      }
-      if (startKeyFound && result.size() < maxKeys) {
-        byte[] volumeInfo = store.get(this.getVolumeKey(volumeName));
-        if (volumeInfo == null) {
-          // Could not get volume info by given volume name,
-          // since the volume name is loaded from db,
-          // this probably means ksm db is corrupted or some entries are
-          // accidentally removed.
-          throw new KSMException("Volume info not found for " + volumeName,
-              ResultCodes.FAILED_VOLUME_NOT_FOUND);
-        }
-        VolumeInfo info = VolumeInfo.parseFrom(volumeInfo);
-        KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(info);
-        result.add(volumeArgs);
-      }
-    }
-
-    return result;
-  }
-
-  private VolumeList getVolumesByUser(String userName)
-      throws KSMException {
-    return getVolumesByUser(getUserKey(userName));
-  }
-
-  private VolumeList getVolumesByUser(byte[] userNameKey)
-      throws KSMException {
-    VolumeList volumes = null;
-    try {
-      byte[] volumesInBytes = store.get(userNameKey);
-      if (volumesInBytes == null) {
-        // No volume found for this user, return an empty list
-        return VolumeList.newBuilder().build();
-      }
-      volumes = VolumeList.parseFrom(volumesInBytes);
-    } catch (IOException e) {
-      throw new KSMException("Unable to get volumes info by the given user, "
-          + "metadata might be corrupted", e,
-          ResultCodes.FAILED_METADATA_ERROR);
-    }
-    return volumes;
-  }
-
-  private VolumeList getAllVolumes() throws IOException {
-    // Scan all users in database
-    KeyPrefixFilter filter =
-        new KeyPrefixFilter().addFilter(OzoneConsts.KSM_USER_PREFIX);
-    // We are not expecting a huge number of users per cluster,
-    // it should be fine to scan all users in db and return us a
-    // list of volume names in string per user.
-    List<Map.Entry<byte[], byte[]>> rangeKVs = store
-        .getSequentialRangeKVs(null, Integer.MAX_VALUE, filter);
-
-    VolumeList.Builder builder = VolumeList.newBuilder();
-    for (Map.Entry<byte[], byte[]> entry : rangeKVs) {
-      VolumeList volumes = this.getVolumesByUser(entry.getKey());
-      builder.addAllVolumeNames(volumes.getVolumeNamesList());
-    }
-
-    return builder.build();
-  }
-
-  @Override
-  public List<BlockGroup> getPendingDeletionKeys(final int count)
-      throws IOException {
-    List<BlockGroup> keyBlocksList = Lists.newArrayList();
-    List<Map.Entry<byte[], byte[]>> rangeResult =
-        store.getRangeKVs(null, count,
-            MetadataKeyFilters.getDeletingKeyFilter());
-    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
-      KsmKeyInfo info =
-          KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
-      // Get block keys as a list.
-      KsmKeyLocationInfoGroup latest = info.getLatestVersionLocations();
-      if (latest == null) {
-        return Collections.emptyList();
-      }
-      List<BlockID> item = latest.getLocationList().stream()
-          .map(b->new BlockID(b.getContainerID(), b.getLocalID()))
-          .collect(Collectors.toList());
-      BlockGroup keyBlocks = BlockGroup.newBuilder()
-          .setKeyName(DFSUtil.bytes2String(entry.getKey()))
-          .addAllBlockIDs(item)
-          .build();
-      keyBlocksList.add(keyBlocks);
-    }
-    return keyBlocksList;
-  }
-
-  @Override
-  public List<BlockGroup> getExpiredOpenKeys() throws IOException {
-    List<BlockGroup> keyBlocksList = Lists.newArrayList();
-    long now = Time.now();
-    final MetadataKeyFilter openKeyFilter =
-        new KeyPrefixFilter().addFilter(OPEN_KEY_PREFIX);
-    List<Map.Entry<byte[], byte[]>> rangeResult =
-        store.getSequentialRangeKVs(null, Integer.MAX_VALUE,
-            openKeyFilter);
-    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
-      KsmKeyInfo info =
-          KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
-      long lastModify = info.getModificationTime();
-      if (now - lastModify < this.openKeyExpireThresholdMS) {
-        // consider as may still be active, not hanging.
-        continue;
-      }
-      // Get block keys as a list.
-      List<BlockID> item = info.getLatestVersionLocations()
-          .getBlocksLatestVersionOnly().stream()
-          .map(b->new BlockID(b.getContainerID(), b.getLocalID()))
-          .collect(Collectors.toList());
-      BlockGroup keyBlocks = BlockGroup.newBuilder()
-          .setKeyName(DFSUtil.bytes2String(entry.getKey()))
-          .addAllBlockIDs(item)
-          .build();
-      keyBlocksList.add(keyBlocks);
-    }
-    return keyBlocksList;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
deleted file mode 100644
index 8ee67c3..0000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
+++ /dev/null
@@ -1,459 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-
-/**
- * This class is for maintaining KeySpaceManager statistics.
- */
-@InterfaceAudience.Private
-@Metrics(about="Key Space Manager Metrics", context="dfs")
-public class KSMMetrics {
-  private static final String SOURCE_NAME =
-      KSMMetrics.class.getSimpleName();
-
-  // KSM request type op metrics
-  private @Metric MutableCounterLong numVolumeOps;
-  private @Metric MutableCounterLong numBucketOps;
-  private @Metric MutableCounterLong numKeyOps;
-
-  // KSM op metrics
-  private @Metric MutableCounterLong numVolumeCreates;
-  private @Metric MutableCounterLong numVolumeUpdates;
-  private @Metric MutableCounterLong numVolumeInfos;
-  private @Metric MutableCounterLong numVolumeCheckAccesses;
-  private @Metric MutableCounterLong numBucketCreates;
-  private @Metric MutableCounterLong numVolumeDeletes;
-  private @Metric MutableCounterLong numBucketInfos;
-  private @Metric MutableCounterLong numBucketUpdates;
-  private @Metric MutableCounterLong numBucketDeletes;
-  private @Metric MutableCounterLong numKeyAllocate;
-  private @Metric MutableCounterLong numKeyLookup;
-  private @Metric MutableCounterLong numKeyRenames;
-  private @Metric MutableCounterLong numKeyDeletes;
-  private @Metric MutableCounterLong numBucketLists;
-  private @Metric MutableCounterLong numKeyLists;
-  private @Metric MutableCounterLong numVolumeLists;
-  private @Metric MutableCounterLong numKeyCommits;
-  private @Metric MutableCounterLong numAllocateBlockCalls;
-  private @Metric MutableCounterLong numGetServiceLists;
-
-  // Failure Metrics
-  private @Metric MutableCounterLong numVolumeCreateFails;
-  private @Metric MutableCounterLong numVolumeUpdateFails;
-  private @Metric MutableCounterLong numVolumeInfoFails;
-  private @Metric MutableCounterLong numVolumeDeleteFails;
-  private @Metric MutableCounterLong numBucketCreateFails;
-  private @Metric MutableCounterLong numVolumeCheckAccessFails;
-  private @Metric MutableCounterLong numBucketInfoFails;
-  private @Metric MutableCounterLong numBucketUpdateFails;
-  private @Metric MutableCounterLong numBucketDeleteFails;
-  private @Metric MutableCounterLong numKeyAllocateFails;
-  private @Metric MutableCounterLong numKeyLookupFails;
-  private @Metric MutableCounterLong numKeyRenameFails;
-  private @Metric MutableCounterLong numKeyDeleteFails;
-  private @Metric MutableCounterLong numBucketListFails;
-  private @Metric MutableCounterLong numKeyListFails;
-  private @Metric MutableCounterLong numVolumeListFails;
-  private @Metric MutableCounterLong numKeyCommitFails;
-  private @Metric MutableCounterLong numBlockAllocateCallFails;
-  private @Metric MutableCounterLong numGetServiceListFails;
-
-  public KSMMetrics() {
-  }
-
-  public static KSMMetrics create() {
-    MetricsSystem ms = DefaultMetricsSystem.instance();
-    return ms.register(SOURCE_NAME,
-        "Key Space Manager Metrics",
-        new KSMMetrics());
-  }
-
-  public void incNumVolumeCreates() {
-    numVolumeOps.incr();
-    numVolumeCreates.incr();
-  }
-
-  public void incNumVolumeUpdates() {
-    numVolumeOps.incr();
-    numVolumeUpdates.incr();
-  }
-
-  public void incNumVolumeInfos() {
-    numVolumeOps.incr();
-    numVolumeInfos.incr();
-  }
-
-  public void incNumVolumeDeletes() {
-    numVolumeOps.incr();
-    numVolumeDeletes.incr();
-  }
-
-  public void incNumVolumeCheckAccesses() {
-    numVolumeOps.incr();
-    numVolumeCheckAccesses.incr();
-  }
-
-  public void incNumBucketCreates() {
-    numBucketOps.incr();
-    numBucketCreates.incr();
-  }
-
-  public void incNumBucketInfos() {
-    numBucketOps.incr();
-    numBucketInfos.incr();
-  }
-
-  public void incNumBucketUpdates() {
-    numBucketOps.incr();
-    numBucketUpdates.incr();
-  }
-
-  public void incNumBucketDeletes() {
-    numBucketOps.incr();
-    numBucketDeletes.incr();
-  }
-
-  public void incNumBucketLists() {
-    numBucketOps.incr();
-    numBucketLists.incr();
-  }
-
-  public void incNumKeyLists() {
-    numKeyOps.incr();
-    numKeyLists.incr();
-  }
-
-  public void incNumVolumeLists() {
-    numVolumeOps.incr();
-    numVolumeLists.incr();
-  }
-
-  public void incNumGetServiceLists() {
-    numGetServiceLists.incr();
-  }
-
-  public void incNumVolumeCreateFails() {
-    numVolumeCreateFails.incr();
-  }
-
-  public void incNumVolumeUpdateFails() {
-    numVolumeUpdateFails.incr();
-  }
-
-  public void incNumVolumeInfoFails() {
-    numVolumeInfoFails.incr();
-  }
-
-  public void incNumVolumeDeleteFails() {
-    numVolumeDeleteFails.incr();
-  }
-
-  public void incNumVolumeCheckAccessFails() {
-    numVolumeCheckAccessFails.incr();
-  }
-
-  public void incNumBucketCreateFails() {
-    numBucketCreateFails.incr();
-  }
-
-  public void incNumBucketInfoFails() {
-    numBucketInfoFails.incr();
-  }
-
-  public void incNumBucketUpdateFails() {
-    numBucketUpdateFails.incr();
-  }
-
-  public void incNumBucketDeleteFails() {
-    numBucketDeleteFails.incr();
-  }
-
-  public void incNumKeyAllocates() {
-    numKeyOps.incr();
-    numKeyAllocate.incr();
-  }
-
-  public void incNumKeyAllocateFails() {
-    numKeyAllocateFails.incr();
-  }
-
-  public void incNumKeyLookups() {
-    numKeyOps.incr();
-    numKeyLookup.incr();
-  }
-
-  public void incNumKeyLookupFails() {
-    numKeyLookupFails.incr();
-  }
-
-  public void incNumKeyRenames() {
-    numKeyOps.incr();
-    numKeyRenames.incr();
-  }
-
-  public void incNumKeyRenameFails() {
-    numKeyOps.incr();
-    numKeyRenameFails.incr();
-  }
-
-  public void incNumKeyDeleteFails() {
-    numKeyDeleteFails.incr();
-  }
-
-  public void incNumKeyDeletes() {
-    numKeyOps.incr();
-    numKeyDeletes.incr();
-  }
-
-  public void incNumKeyCommits() {
-    numKeyOps.incr();
-    numKeyCommits.incr();
-  }
-
-  public void incNumKeyCommitFails() {
-    numKeyCommitFails.incr();
-  }
-
-  public void incNumBlockAllocateCalls() {
-    numAllocateBlockCalls.incr();
-  }
-
-  public void incNumBlockAllocateCallFails() {
-    numBlockAllocateCallFails.incr();
-  }
-
-  public void incNumBucketListFails() {
-    numBucketListFails.incr();
-  }
-
-  public void incNumKeyListFails() {
-    numKeyListFails.incr();
-  }
-
-  public void incNumVolumeListFails() {
-    numVolumeListFails.incr();
-  }
-
-  public void incNumGetServiceListFails() {
-    numGetServiceListFails.incr();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeCreates() {
-    return numVolumeCreates.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeUpdates() {
-    return numVolumeUpdates.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeInfos() {
-    return numVolumeInfos.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeDeletes() {
-    return numVolumeDeletes.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeCheckAccesses() {
-    return numVolumeCheckAccesses.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketCreates() {
-    return numBucketCreates.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketInfos() {
-    return numBucketInfos.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketUpdates() {
-    return numBucketUpdates.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketDeletes() {
-    return numBucketDeletes.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketLists() {
-    return numBucketLists.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeLists() {
-    return numVolumeLists.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyLists() {
-    return numKeyLists.value();
-  }
-
-  @VisibleForTesting
-  public long getNumGetServiceLists() {
-    return numGetServiceLists.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeCreateFails() {
-    return numVolumeCreateFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeUpdateFails() {
-    return numVolumeUpdateFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeInfoFails() {
-    return numVolumeInfoFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeDeleteFails() {
-    return numVolumeDeleteFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeCheckAccessFails() {
-    return numVolumeCheckAccessFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketCreateFails() {
-    return numBucketCreateFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketInfoFails() {
-    return numBucketInfoFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketUpdateFails() {
-    return numBucketUpdateFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketDeleteFails() {
-    return numBucketDeleteFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyAllocates() {
-    return numKeyAllocate.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyAllocateFails() {
-    return numKeyAllocateFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyLookups() {
-    return numKeyLookup.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyLookupFails() {
-    return numKeyLookupFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyRenames() {
-    return numKeyRenames.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyRenameFails() {
-    return numKeyRenameFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyDeletes() {
-    return numKeyDeletes.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyDeletesFails() {
-    return numKeyDeleteFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBucketListFails() {
-    return numBucketListFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyListFails() {
-    return numKeyListFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumVolumeListFails() {
-    return numVolumeListFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyCommits() {
-    return numKeyCommits.value();
-  }
-
-  @VisibleForTesting
-  public long getNumKeyCommitFails() {
-    return numKeyCommitFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBlockAllocates() {
-    return numAllocateBlockCalls.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBlockAllocateFails() {
-    return numBlockAllocateCallFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumGetServiceListFails() {
-    return numGetServiceListFails.value();
-  }
-
-  public void unRegister() {
-    MetricsSystem ms = DefaultMetricsSystem.instance();
-    ms.unregisterSource(SOURCE_NAME);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMStorage.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMStorage.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMStorage.java
deleted file mode 100644
index 015bed6..0000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KSMStorage.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import java.io.IOException;
-import java.util.Properties;
-import java.util.UUID;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.common.Storage;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
-
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID;
-import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
-
-/**
- * KSMStorage is responsible for management of the StorageDirectories used by
- * the KSM.
- */
-public class KSMStorage extends Storage {
-
-  public static final String STORAGE_DIR = "ksm";
-  public static final String KSM_ID = "ksmUuid";
-
-  /**
-   * Construct KSMStorage.
-   * @throws IOException if any directories are inaccessible.
-   */
-  public KSMStorage(OzoneConfiguration conf) throws IOException {
-    super(NodeType.KSM, getOzoneMetaDirPath(conf), STORAGE_DIR);
-  }
-
-  public void setScmId(String scmId) throws IOException {
-    if (getState() == StorageState.INITIALIZED) {
-      throw new IOException("KSM is already initialized.");
-    } else {
-      getStorageInfo().setProperty(SCM_ID, scmId);
-    }
-  }
-
-  public void setKsmId(String ksmId) throws IOException {
-    if (getState() == StorageState.INITIALIZED) {
-      throw new IOException("KSM is already initialized.");
-    } else {
-      getStorageInfo().setProperty(KSM_ID, ksmId);
-    }
-  }
-
-  /**
-   * Retrieves the SCM ID from the version file.
-   * @return SCM_ID
-   */
-  public String getScmId() {
-    return getStorageInfo().getProperty(SCM_ID);
-  }
-
-  /**
-   * Retrieves the KSM ID from the version file.
-   * @return KSM_ID
-   */
-  public String getKsmId() {
-    return getStorageInfo().getProperty(KSM_ID);
-  }
-
-  @Override
-  protected Properties getNodeProperties() {
-    String ksmId = getKsmId();
-    if (ksmId == null) {
-      ksmId = UUID.randomUUID().toString();
-    }
-    Properties ksmProperties = new Properties();
-    ksmProperties.setProperty(KSM_ID, ksmId);
-    return ksmProperties;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java
deleted file mode 100644
index e51ab28..0000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BackgroundService;
-import org.apache.hadoop.utils.BackgroundTask;
-import org.apache.hadoop.utils.BackgroundTaskQueue;
-import org.apache.hadoop.utils.BackgroundTaskResult;
-import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
-import static 
org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
-
-/**
- * This is the background service to delete keys.
- * Scan the metadata of ksm periodically to get
- * the keys with prefix "#deleting" and ask scm to
- * delete metadata accordingly, if scm returns
- * success for keys, then clean up those keys.
- */
-public class KeyDeletingService extends BackgroundService {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(KeyDeletingService.class);
-
-  // The thread pool size for key deleting service.
-  private final static int KEY_DELETING_CORE_POOL_SIZE = 2;
-
-  private final ScmBlockLocationProtocol scmClient;
-  private final KeyManager manager;
-  private final int keyLimitPerTask;
-
-  public KeyDeletingService(ScmBlockLocationProtocol scmClient,
-      KeyManager manager, long serviceInterval,
-      long serviceTimeout, Configuration conf) {
-    super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS,
-        KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
-    this.scmClient = scmClient;
-    this.manager = manager;
-    this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
-        OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
-  }
-
-  @Override
-  public BackgroundTaskQueue getTasks() {
-    BackgroundTaskQueue queue = new BackgroundTaskQueue();
-    queue.add(new KeyDeletingTask());
-    return queue;
-  }
-
-  /**
-   * A key deleting task scans KSM DB and looking for a certain number
-   * of pending-deletion keys, sends these keys along with their associated
-   * blocks to SCM for deletion. Once SCM confirms keys are deleted (once
-   * SCM persisted the blocks info in its deletedBlockLog), it removes
-   * these keys from the DB.
-   */
-  private class KeyDeletingTask implements
-      BackgroundTask<BackgroundTaskResult> {
-
-    @Override
-    public int getPriority() {
-      return 0;
-    }
-
-    @Override
-    public BackgroundTaskResult call() throws Exception {
-      try {
-        long startTime = Time.monotonicNow();
-        List<BlockGroup> keyBlocksList = manager
-            .getPendingDeletionKeys(keyLimitPerTask);
-        if (keyBlocksList.size() > 0) {
-          LOG.info("Found {} to-delete keys in KSM", keyBlocksList.size());
-          List<DeleteBlockGroupResult> results =
-              scmClient.deleteKeyBlocks(keyBlocksList);
-          for (DeleteBlockGroupResult result : results) {
-            if (result.isSuccess()) {
-              try {
-                // Purge key from KSM DB.
-                manager.deletePendingDeletionKey(result.getObjectKey());
-                LOG.debug("Key {} deleted from KSM DB", result.getObjectKey());
-              } catch (IOException e) {
-                // if a pending deletion key is failed to delete,
-                // print a warning here and retain it in this state,
-                // so that it can be attempt to delete next time.
-                LOG.warn("Failed to delete pending-deletion key {}",
-                    result.getObjectKey(), e);
-              }
-            } else {
-              // Key deletion failed, retry in next interval.
-              LOG.warn("Key {} deletion failed because some of the blocks"
-                  + " were failed to delete, failed blocks: {}",
-                  result.getObjectKey(),
-                  StringUtils.join(",", result.getFailedBlocks()));
-            }
-          }
-
-          if (!results.isEmpty()) {
-            LOG.info("Number of key deleted from KSM DB: {},"
-                + " task elapsed time: {}ms",
-                results.size(), Time.monotonicNow() - startTime);
-          }
-
-          return results::size;
-        } else {
-          LOG.debug("No pending deletion key found in KSM");
-        }
-      } catch (IOException e) {
-        LOG.error("Unable to get pending deletion keys, retry in"
-            + " next interval", e);
-      }
-      return EmptyTaskResult.newResult();
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to