http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
deleted file mode 100644
index 5ec1db8..0000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
-import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Handles key level commands.
- */
-public interface KeyManager {
-
-  /**
-   * Start key manager.
-   */
-  void start();
-
-  /**
-   * Stop key manager.
-   */
-  void stop() throws IOException;
-
-  /**
-   * After calling commit, the key will be made visible. There can be multiple
-   * open key writes in parallel (identified by client id). The most recently
-   * committed one will be the one visible.
-   *
-   * @param args the key to commit.
-   * @param clientID the client that is committing.
-   * @throws IOException
-   */
-  void commitKey(KsmKeyArgs args, int clientID) throws IOException;
-
-  /**
-   * A client calls this on an open key, to request to allocate a new block,
-   * and appended to the tail of current block list of the open client.
-   *
-   * @param args the key to append
-   * @param clientID the client requesting block.
-   * @return the reference to the new block.
-   * @throws IOException
-   */
-  KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
-      throws IOException;
-  /**
-   * Given the args of a key to put, write an open key entry to meta data.
-   *
-   * In case that the container creation or key write failed on
-   * DistributedStorageHandler, this key's metadata will still stay in KSM.
-   * TODO garbage collect the open keys that never get closed
-   *
-   * @param args the args of the key provided by client.
-   * @return a OpenKeySession instance client uses to talk to container.
-   * @throws Exception
-   */
-  OpenKeySession openKey(KsmKeyArgs args) throws IOException;
-
-  /**
-   * Look up an existing key. Return the info of the key to client side, which
-   * DistributedStorageHandler will use to access the data on datanode.
-   *
-   * @param args the args of the key provided by client.
-   * @return a KsmKeyInfo instance client uses to talk to container.
-   * @throws IOException
-   */
-  KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException;
-
-  /**
-   * Renames an existing key within a bucket.
-   *
-   * @param args the args of the key provided by client.
-   * @param toKeyName New name to be used for the key
-   * @throws IOException if specified key doesn't exist or
-   * some other I/O errors while renaming the key.
-   */
-  void renameKey(KsmKeyArgs args, String toKeyName) throws IOException;
-
-  /**
-   * Deletes an object by an object key. The key will be immediately removed
-   * from KSM namespace and become invisible to clients. The object data
-   * will be removed in async manner that might retain for some time.
-   *
-   * @param args the args of the key provided by client.
-   * @throws IOException if specified key doesn't exist or
-   * some other I/O errors while deleting an object.
-   */
-  void deleteKey(KsmKeyArgs args) throws IOException;
-
-  /**
-   * Returns a list of keys represented by {@link KsmKeyInfo}
-   * in the given bucket.
-   *
-   * @param volumeName
-   *   the name of the volume.
-   * @param bucketName
-   *   the name of the bucket.
-   * @param startKey
-   *   the start key name, only the keys whose name is
-   *   after this value will be included in the result.
-   *   This key is excluded from the result.
-   * @param keyPrefix
-   *   key name prefix, only the keys whose name has
-   *   this prefix will be included in the result.
-   * @param maxKeys
-   *   the maximum number of keys to return. It ensures
-   *   the size of the result will not exceed this limit.
-   * @return a list of keys.
-   * @throws IOException
-   */
-  List<KsmKeyInfo> listKeys(String volumeName,
-      String bucketName, String startKey, String keyPrefix, int maxKeys)
-      throws IOException;
-
-  /**
-   * Returns a list of pending deletion key info that ups to the given count.
-   * Each entry is a {@link BlockGroup}, which contains the info about the
-   * key name and all its associated block IDs. A pending deletion key is
-   * stored with #deleting# prefix in KSM DB.
-   *
-   * @param count max number of keys to return.
-   * @return a list of {@link BlockGroup} representing keys and blocks.
-   * @throws IOException
-   */
-  List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
-
-  /**
-   * Deletes a pending deletion key by its name. This is often called when
-   * key can be safely deleted from this layer. Once called, all footprints
-   * of the key will be purged from KSM DB.
-   *
-   * @param objectKeyName object key name with #deleting# prefix.
-   * @throws IOException if specified key doesn't exist or other I/O errors.
-   */
-  void deletePendingDeletionKey(String objectKeyName) throws IOException;
-
-  /**
-   * Returns a list of all still open key info. Which contains the info about
-   * the key name and all its associated block IDs. A pending open key has
-   * prefix #open# in KSM DB.
-   *
-   * @return a list of {@link BlockGroup} representing keys and blocks.
-   * @throws IOException
-   */
-  List<BlockGroup> getExpiredOpenKeys() throws IOException;
-
-  /**
-   * Deletes a expired open key by its name. Called when a hanging key has been
-   * lingering for too long. Once called, the open key entries gets removed
-   * from KSM mdata data.
-   *
-   * @param objectKeyName object key name with #open# prefix.
-   * @throws IOException if specified key doesn't exist or other I/O errors.
-   */
-  void deleteExpiredOpenKey(String objectKeyName) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
deleted file mode 100644
index 0d4cfda..0000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
+++ /dev/null
@@ -1,566 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
-import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.KeyInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BackgroundService;
-import org.apache.hadoop.utils.BatchOperation;
-import org.iq80.leveldb.DBException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.ozone
-    .OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
-import static org.apache.hadoop.ozone
-    .OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
-import static org.apache.hadoop.ozone
-    .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone
-    .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone
-    .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
-import static org.apache.hadoop.ozone
-    .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.ozone
-    .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE;
-import static org.apache.hadoop.ozone
-    .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT;
-import static org.apache.hadoop.ozone
-    .OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
-import static org.apache.hadoop.ozone
-    .OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB;
-import org.apache.hadoop.hdds.protocol
-    .proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol
-    .proto.HddsProtos.ReplicationFactor;
-
-
-/**
- * Implementation of keyManager.
- */
-public class KeyManagerImpl implements KeyManager {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(KeyManagerImpl.class);
-
-  /**
-   * A SCM block client, used to talk to SCM to allocate block during putKey.
-   */
-  private final ScmBlockLocationProtocol scmBlockClient;
-  private final KSMMetadataManager metadataManager;
-  private final long scmBlockSize;
-  private final boolean useRatis;
-  private final BackgroundService keyDeletingService;
-  private final BackgroundService openKeyCleanupService;
-
-  private final long preallocateMax;
-  private final Random random;
-  private final String ksmId;
-
-  public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
-      KSMMetadataManager metadataManager, OzoneConfiguration conf,
-      String ksmId) {
-    this.scmBlockClient = scmBlockClient;
-    this.metadataManager = metadataManager;
-    this.scmBlockSize = conf.getLong(OZONE_SCM_BLOCK_SIZE_IN_MB,
-        OZONE_SCM_BLOCK_SIZE_DEFAULT) * OzoneConsts.MB;
-    this.useRatis = conf.getBoolean(DFS_CONTAINER_RATIS_ENABLED_KEY,
-        DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
-    long  blockDeleteInterval = conf.getTimeDuration(
-        OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
-        OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
-        TimeUnit.MILLISECONDS);
-    long serviceTimeout = conf.getTimeDuration(
-        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
-        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
-        TimeUnit.MILLISECONDS);
-    this.preallocateMax = conf.getLong(
-        OZONE_KEY_PREALLOCATION_MAXSIZE,
-        OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT);
-    keyDeletingService = new KeyDeletingService(
-        scmBlockClient, this, blockDeleteInterval, serviceTimeout, conf);
-    int openkeyCheckInterval = conf.getInt(
-        OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS,
-        OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT);
-    openKeyCleanupService = new OpenKeyCleanupService(
-        scmBlockClient, this, openkeyCheckInterval, serviceTimeout);
-    random = new Random();
-    this.ksmId = ksmId;
-  }
-
-  @VisibleForTesting
-  public BackgroundService getOpenKeyCleanupService() {
-    return openKeyCleanupService;
-  }
-
-  @Override
-  public void start() {
-    keyDeletingService.start();
-    openKeyCleanupService.start();
-  }
-
-  @Override
-  public void stop() throws IOException {
-    keyDeletingService.shutdown();
-    openKeyCleanupService.shutdown();
-  }
-
-  private void validateBucket(String volumeName, String bucketName)
-      throws IOException {
-    byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
-    byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
-
-    //Check if the volume exists
-    if(metadataManager.get(volumeKey) == null) {
-      LOG.error("volume not found: {}", volumeName);
-      throw new KSMException("Volume not found",
-          KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
-    }
-    //Check if bucket already exists
-    if(metadataManager.get(bucketKey) == null) {
-      LOG.error("bucket not found: {}/{} ", volumeName, bucketName);
-      throw new KSMException("Bucket not found",
-          KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
-    }
-  }
-
-  @Override
-  public KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
-      throws IOException {
-    Preconditions.checkNotNull(args);
-    metadataManager.writeLock().lock();
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    String keyName = args.getKeyName();
-
-    try {
-      validateBucket(volumeName, bucketName);
-      String objectKey = metadataManager.getKeyWithDBPrefix(
-          volumeName, bucketName, keyName);
-      byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, 
clientID);
-      byte[] keyData = metadataManager.get(openKey);
-      if (keyData == null) {
-        LOG.error("Allocate block for a key not in open status in meta store " 
+
-            objectKey + " with ID " + clientID);
-        throw new KSMException("Open Key not found",
-            KSMException.ResultCodes.FAILED_KEY_NOT_FOUND);
-      }
-      KsmKeyInfo keyInfo =
-          KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(keyData));
-      AllocatedBlock allocatedBlock =
-          scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(),
-              keyInfo.getFactor(), ksmId);
-      KsmKeyLocationInfo info = new KsmKeyLocationInfo.Builder()
-          .setBlockID(allocatedBlock.getBlockID())
-          .setShouldCreateContainer(allocatedBlock.getCreateContainer())
-          .setLength(scmBlockSize)
-          .setOffset(0)
-          .build();
-      // current version not committed, so new blocks coming now are added to
-      // the same version
-      keyInfo.appendNewBlocks(Collections.singletonList(info));
-      keyInfo.updateModifcationTime();
-      metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray());
-      return info;
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  @Override
-  public OpenKeySession openKey(KsmKeyArgs args) throws IOException {
-    Preconditions.checkNotNull(args);
-    metadataManager.writeLock().lock();
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    String keyName = args.getKeyName();
-    ReplicationFactor factor = args.getFactor();
-    ReplicationType type = args.getType();
-
-    // If user does not specify a replication strategy or
-    // replication factor, KSM will use defaults.
-    if(factor == null) {
-      factor = useRatis ? ReplicationFactor.THREE: ReplicationFactor.ONE;
-    }
-
-    if(type == null) {
-      type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
-    }
-
-    try {
-      validateBucket(volumeName, bucketName);
-      long requestedSize = Math.min(preallocateMax, args.getDataSize());
-      List<KsmKeyLocationInfo> locations = new ArrayList<>();
-      String objectKey = metadataManager.getKeyWithDBPrefix(
-          volumeName, bucketName, keyName);
-      // requested size is not required but more like a optimization:
-      // SCM looks at the requested, if it 0, no block will be allocated at
-      // the point, if client needs more blocks, client can always call
-      // allocateBlock. But if requested size is not 0, KSM will preallocate
-      // some blocks and piggyback to client, to save RPC calls.
-      while (requestedSize > 0) {
-        long allocateSize = Math.min(scmBlockSize, requestedSize);
-        AllocatedBlock allocatedBlock =
-            scmBlockClient.allocateBlock(allocateSize, type, factor, ksmId);
-        KsmKeyLocationInfo subKeyInfo = new KsmKeyLocationInfo.Builder()
-            .setBlockID(allocatedBlock.getBlockID())
-            .setShouldCreateContainer(allocatedBlock.getCreateContainer())
-            .setLength(allocateSize)
-            .setOffset(0)
-            .build();
-        locations.add(subKeyInfo);
-        requestedSize -= allocateSize;
-      }
-      // NOTE size of a key is not a hard limit on anything, it is a value that
-      // client should expect, in terms of current size of key. If client sets 
a
-      // value, then this value is used, otherwise, we allocate a single block
-      // which is the current size, if read by the client.
-      long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize;
-      byte[] keyKey = metadataManager.getDBKeyBytes(
-          volumeName, bucketName, keyName);
-      byte[] value = metadataManager.get(keyKey);
-      KsmKeyInfo keyInfo;
-      long openVersion;
-      if (value != null) {
-        // the key already exist, the new blocks will be added as new version
-        keyInfo = KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(value));
-        // when locations.size = 0, the new version will have identical blocks
-        // as its previous version
-        openVersion = keyInfo.addNewVersion(locations);
-        keyInfo.setDataSize(size + keyInfo.getDataSize());
-      } else {
-        // the key does not exist, create a new object, the new blocks are the
-        // version 0
-        long currentTime = Time.now();
-        keyInfo = new KsmKeyInfo.Builder()
-            .setVolumeName(args.getVolumeName())
-            .setBucketName(args.getBucketName())
-            .setKeyName(args.getKeyName())
-            .setKsmKeyLocationInfos(Collections.singletonList(
-                new KsmKeyLocationInfoGroup(0, locations)))
-            .setCreationTime(currentTime)
-            .setModificationTime(currentTime)
-            .setDataSize(size)
-            .setReplicationType(type)
-            .setReplicationFactor(factor)
-            .build();
-        openVersion = 0;
-      }
-      // Generate a random ID which is not already in meta db.
-      int id = -1;
-      // in general this should finish in a couple times at most. putting some
-      // arbitrary large number here to avoid dead loop.
-      for (int j = 0; j < 10000; j++) {
-        id = random.nextInt();
-        byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, id);
-        if (metadataManager.get(openKey) == null) {
-          metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray());
-          break;
-        }
-      }
-      if (id == -1) {
-        throw new IOException("Failed to find a usable id for " + objectKey);
-      }
-      LOG.debug("Key {} allocated in volume {} bucket {}",
-          keyName, volumeName, bucketName);
-      return new OpenKeySession(id, keyInfo, openVersion);
-    } catch (KSMException e) {
-      throw e;
-    } catch (IOException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.error("Key open failed for volume:{} bucket:{} key:{}",
-            volumeName, bucketName, keyName, ex);
-      }
-      throw new KSMException(ex.getMessage(),
-          KSMException.ResultCodes.FAILED_KEY_ALLOCATION);
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  @Override
-  public void commitKey(KsmKeyArgs args, int clientID) throws IOException {
-    Preconditions.checkNotNull(args);
-    metadataManager.writeLock().lock();
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    String keyName = args.getKeyName();
-    try {
-      validateBucket(volumeName, bucketName);
-      String objectKey = metadataManager.getKeyWithDBPrefix(
-          volumeName, bucketName, keyName);
-      byte[] objectKeyBytes = metadataManager.getDBKeyBytes(volumeName,
-          bucketName, keyName);
-      byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, 
clientID);
-      byte[] openKeyData = metadataManager.get(openKey);
-      if (openKeyData == null) {
-        throw new KSMException("Commit a key without corresponding entry " +
-            DFSUtil.bytes2String(openKey), ResultCodes.FAILED_KEY_NOT_FOUND);
-      }
-      KsmKeyInfo keyInfo =
-          KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(openKeyData));
-      keyInfo.setDataSize(args.getDataSize());
-      keyInfo.setModificationTime(Time.now());
-      BatchOperation batch = new BatchOperation();
-      batch.delete(openKey);
-      batch.put(objectKeyBytes, keyInfo.getProtobuf().toByteArray());
-      metadataManager.writeBatch(batch);
-    } catch (KSMException e) {
-      throw e;
-    } catch (IOException ex) {
-      LOG.error("Key commit failed for volume:{} bucket:{} key:{}",
-          volumeName, bucketName, keyName, ex);
-      throw new KSMException(ex.getMessage(),
-          KSMException.ResultCodes.FAILED_KEY_ALLOCATION);
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  @Override
-  public KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException {
-    Preconditions.checkNotNull(args);
-    metadataManager.writeLock().lock();
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    String keyName = args.getKeyName();
-    try {
-      byte[] keyKey = metadataManager.getDBKeyBytes(
-          volumeName, bucketName, keyName);
-      byte[] value = metadataManager.get(keyKey);
-      if (value == null) {
-        LOG.debug("volume:{} bucket:{} Key:{} not found",
-            volumeName, bucketName, keyName);
-        throw new KSMException("Key not found",
-            KSMException.ResultCodes.FAILED_KEY_NOT_FOUND);
-      }
-      return KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(value));
-    } catch (DBException ex) {
-      LOG.error("Get key failed for volume:{} bucket:{} key:{}",
-          volumeName, bucketName, keyName, ex);
-      throw new KSMException(ex.getMessage(),
-          KSMException.ResultCodes.FAILED_KEY_NOT_FOUND);
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  @Override
-  public void renameKey(KsmKeyArgs args, String toKeyName) throws IOException {
-    Preconditions.checkNotNull(args);
-    Preconditions.checkNotNull(toKeyName);
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    String fromKeyName = args.getKeyName();
-    if (toKeyName.length() == 0 || fromKeyName.length() == 0) {
-      LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} 
toKey:{}.",
-          volumeName, bucketName, fromKeyName, toKeyName);
-      throw new KSMException("Key name is empty",
-          ResultCodes.FAILED_INVALID_KEY_NAME);
-    }
-
-    metadataManager.writeLock().lock();
-    try {
-      // fromKeyName should exist
-      byte[] fromKey = metadataManager.getDBKeyBytes(
-          volumeName, bucketName, fromKeyName);
-      byte[] fromKeyValue = metadataManager.get(fromKey);
-      if (fromKeyValue == null) {
-        // TODO: Add support for renaming open key
-        LOG.error(
-            "Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}. "
-                + "Key: {} not found.", volumeName, bucketName, fromKeyName,
-            toKeyName, fromKeyName);
-        throw new KSMException("Key not found",
-            KSMException.ResultCodes.FAILED_KEY_NOT_FOUND);
-      }
-
-      // toKeyName should not exist
-      byte[] toKey =
-          metadataManager.getDBKeyBytes(volumeName, bucketName, toKeyName);
-      byte[] toKeyValue = metadataManager.get(toKey);
-      if (toKeyValue != null) {
-        LOG.error(
-            "Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}. "
-                + "Key: {} already exists.", volumeName, bucketName,
-            fromKeyName, toKeyName, toKeyName);
-        throw new KSMException("Key not found",
-            KSMException.ResultCodes.FAILED_KEY_ALREADY_EXISTS);
-      }
-
-      if (fromKeyName.equals(toKeyName)) {
-        return;
-      }
-
-      KsmKeyInfo newKeyInfo =
-          KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(fromKeyValue));
-      newKeyInfo.setKeyName(toKeyName);
-      newKeyInfo.updateModifcationTime();
-      BatchOperation batch = new BatchOperation();
-      batch.delete(fromKey);
-      batch.put(toKey, newKeyInfo.getProtobuf().toByteArray());
-      metadataManager.writeBatch(batch);
-    } catch (DBException ex) {
-      LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} 
toKey:{}.",
-          volumeName, bucketName, fromKeyName, toKeyName, ex);
-      throw new KSMException(ex.getMessage(),
-          ResultCodes.FAILED_KEY_RENAME);
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  @Override
-  public void deleteKey(KsmKeyArgs args) throws IOException {
-    Preconditions.checkNotNull(args);
-    metadataManager.writeLock().lock();
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    String keyName = args.getKeyName();
-    try {
-      byte[] objectKey = metadataManager.getDBKeyBytes(
-          volumeName, bucketName, keyName);
-      byte[] objectValue = metadataManager.get(objectKey);
-      if (objectValue == null) {
-        throw new KSMException("Key not found",
-            KSMException.ResultCodes.FAILED_KEY_NOT_FOUND);
-      }
-      byte[] deletingKey = metadataManager.getDeletedKeyName(objectKey);
-      BatchOperation batch = new BatchOperation();
-      batch.put(deletingKey, objectValue);
-      batch.delete(objectKey);
-      metadataManager.writeBatch(batch);
-    } catch (DBException ex) {
-      LOG.error(String.format("Delete key failed for volume:%s "
-          + "bucket:%s key:%s", volumeName, bucketName, keyName), ex);
-      throw new KSMException(ex.getMessage(), ex,
-          ResultCodes.FAILED_KEY_DELETION);
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  @Override
-  public List<KsmKeyInfo> listKeys(String volumeName, String bucketName,
-      String startKey, String keyPrefix, int maxKeys) throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-
-    metadataManager.readLock().lock();
-    try {
-      return metadataManager.listKeys(volumeName, bucketName,
-          startKey, keyPrefix, maxKeys);
-    } finally {
-      metadataManager.readLock().unlock();
-    }
-  }
-
-  @Override
-  public List<BlockGroup> getPendingDeletionKeys(final int count)
-      throws IOException {
-    metadataManager.readLock().lock();
-    try {
-      return metadataManager.getPendingDeletionKeys(count);
-    } finally {
-      metadataManager.readLock().unlock();
-    }
-  }
-
-  @Override
-  public void deletePendingDeletionKey(String objectKeyName)
-      throws IOException{
-    Preconditions.checkNotNull(objectKeyName);
-    if (!objectKeyName.startsWith(OzoneConsts.DELETING_KEY_PREFIX)) {
-      throw new IllegalArgumentException("Invalid key name,"
-          + " the name should be the key name with deleting prefix");
-    }
-
-    // Simply removes the entry from KSM DB.
-    metadataManager.writeLock().lock();
-    try {
-      byte[] pendingDelKey = DFSUtil.string2Bytes(objectKeyName);
-      byte[] delKeyValue = metadataManager.get(pendingDelKey);
-      if (delKeyValue == null) {
-        throw new IOException("Failed to delete key " + objectKeyName
-            + " because it is not found in DB");
-      }
-      metadataManager.delete(pendingDelKey);
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  @Override
-  public List<BlockGroup> getExpiredOpenKeys() throws IOException {
-    metadataManager.readLock().lock();
-    try {
-      return metadataManager.getExpiredOpenKeys();
-    } finally {
-      metadataManager.readLock().unlock();
-    }
-  }
-
-  @Override
-  public void deleteExpiredOpenKey(String objectKeyName) throws IOException {
-    Preconditions.checkNotNull(objectKeyName);
-    if (!objectKeyName.startsWith(OzoneConsts.OPEN_KEY_PREFIX)) {
-      throw new IllegalArgumentException("Invalid key name,"
-          + " the name should be the key name with open key prefix");
-    }
-
-    // Simply removes the entry from KSM DB.
-    metadataManager.writeLock().lock();
-    try {
-      byte[] openKey = DFSUtil.string2Bytes(objectKeyName);
-      byte[] delKeyValue = metadataManager.get(openKey);
-      if (delKeyValue == null) {
-        throw new IOException("Failed to delete key " + objectKeyName
-            + " because it is not found in DB");
-      }
-      metadataManager.delete(openKey);
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
deleted file mode 100644
index 5fa313b..0000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
+++ /dev/null
@@ -1,912 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package org.apache.hadoop.ozone.ksm;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.BlockingService;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
-import org.apache.hadoop.ozone.common.Storage.StorageState;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
-import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
-import org.apache.hadoop.ozone.ksm.protocol.KeySpaceManagerProtocol;
-import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos
-    .ServicePort;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.OzoneAclInfo;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.ozone.protocolPB
-    .KeySpaceManagerProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.ScmInfo;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocolPB
-    .ScmBlockLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
-import org.apache.hadoop.hdds.scm.protocolPB
-    .StorageContainerLocationProtocolClientSideTranslatorPB;
-import 
org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.StringUtils;
-
-import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
-import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
-import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
-import static org.apache.hadoop.ozone.KsmUtils.getKsmAddress;
-import static org.apache.hadoop.hdds.server.ServerUtils
-    .updateRPCListenAddress;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
-    .OZONE_KSM_ADDRESS_KEY;
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
-    .OZONE_KSM_HANDLER_COUNT_DEFAULT;
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
-    .OZONE_KSM_HANDLER_COUNT_KEY;
-import static org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.KeySpaceManagerService
-    .newReflectiveBlockingService;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
-    .NodeState.HEALTHY;
-import static org.apache.hadoop.util.ExitUtil.terminate;
-
-/**
- * Ozone Keyspace manager is the metadata manager of ozone.
- */
-@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
-public final class KeySpaceManager extends ServiceRuntimeInfoImpl
-    implements KeySpaceManagerProtocol, KSMMXBean {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(KeySpaceManager.class);
-
-  private static final String USAGE =
-      "Usage: \n ozone ksm [genericOptions] " + "[ "
-          + StartupOption.CREATEOBJECTSTORE.getName() + " ]\n " + "ozone ksm [ 
"
-          + StartupOption.HELP.getName() + " ]\n";
-
-  /** Startup options. */
-  public enum StartupOption {
-    CREATEOBJECTSTORE("-createObjectStore"),
-    HELP("-help"),
-    REGULAR("-regular");
-
-    private final String name;
-
-    StartupOption(String arg) {
-      this.name = arg;
-    }
-
-    public String getName() {
-      return name;
-    }
-
-    public static StartupOption parse(String value) {
-      for (StartupOption option : StartupOption.values()) {
-        if (option.name.equalsIgnoreCase(value)) {
-          return option;
-        }
-      }
-      return null;
-    }
-  }
-
-  private final OzoneConfiguration configuration;
-  private final RPC.Server ksmRpcServer;
-  private final InetSocketAddress ksmRpcAddress;
-  private final KSMMetadataManager metadataManager;
-  private final VolumeManager volumeManager;
-  private final BucketManager bucketManager;
-  private final KeyManager keyManager;
-  private final KSMMetrics metrics;
-  private final KeySpaceManagerHttpServer httpServer;
-  private final KSMStorage ksmStorage;
-  private final ScmBlockLocationProtocol scmBlockClient;
-  private final StorageContainerLocationProtocol scmContainerClient;
-  private ObjectName ksmInfoBeanName;
-
-  private KeySpaceManager(OzoneConfiguration conf) throws IOException {
-    Preconditions.checkNotNull(conf);
-    configuration = conf;
-    ksmStorage = new KSMStorage(conf);
-    scmBlockClient = getScmBlockClient(configuration);
-    scmContainerClient = getScmContainerClient(configuration);
-    if (ksmStorage.getState() != StorageState.INITIALIZED) {
-      throw new KSMException("KSM not initialized.",
-          ResultCodes.KSM_NOT_INITIALIZED);
-    }
-
-    // verifies that the SCM info in the KSM Version file is correct.
-    ScmInfo scmInfo = scmBlockClient.getScmInfo();
-    if (!(scmInfo.getClusterId().equals(ksmStorage.getClusterID()) && scmInfo
-        .getScmId().equals(ksmStorage.getScmId()))) {
-      throw new KSMException("SCM version info mismatch.",
-          ResultCodes.SCM_VERSION_MISMATCH_ERROR);
-    }
-    final int handlerCount = conf.getInt(OZONE_KSM_HANDLER_COUNT_KEY,
-        OZONE_KSM_HANDLER_COUNT_DEFAULT);
-
-    RPC.setProtocolEngine(configuration, KeySpaceManagerProtocolPB.class,
-        ProtobufRpcEngine.class);
-
-    BlockingService ksmService = newReflectiveBlockingService(
-        new KeySpaceManagerProtocolServerSideTranslatorPB(this));
-    final InetSocketAddress ksmNodeRpcAddr =
-        getKsmAddress(configuration);
-    ksmRpcServer = startRpcServer(configuration, ksmNodeRpcAddr,
-        KeySpaceManagerProtocolPB.class, ksmService,
-        handlerCount);
-    ksmRpcAddress = updateRPCListenAddress(configuration,
-        OZONE_KSM_ADDRESS_KEY, ksmNodeRpcAddr, ksmRpcServer);
-    metadataManager = new KSMMetadataManagerImpl(configuration);
-    volumeManager = new VolumeManagerImpl(metadataManager, configuration);
-    bucketManager = new BucketManagerImpl(metadataManager);
-    metrics = KSMMetrics.create();
-    keyManager =
-        new KeyManagerImpl(scmBlockClient, metadataManager, configuration,
-            ksmStorage.getKsmId());
-    httpServer = new KeySpaceManagerHttpServer(configuration, this);
-  }
-
-  /**
-   * Create a scm block client, used by putKey() and getKey().
-   *
-   * @return {@link ScmBlockLocationProtocol}
-   * @throws IOException
-   */
-  private static ScmBlockLocationProtocol getScmBlockClient(
-      OzoneConfiguration conf) throws IOException {
-    RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
-        ProtobufRpcEngine.class);
-    long scmVersion =
-        RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
-    InetSocketAddress scmBlockAddress =
-        getScmAddressForBlockClients(conf);
-    ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
-        new ScmBlockLocationProtocolClientSideTranslatorPB(
-            RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
-                scmBlockAddress, UserGroupInformation.getCurrentUser(), conf,
-                NetUtils.getDefaultSocketFactory(conf),
-                Client.getRpcTimeout(conf)));
-    return scmBlockLocationClient;
-  }
-
-  /**
-   * Returns a scm container client.
-   *
-   * @return {@link StorageContainerLocationProtocol}
-   * @throws IOException
-   */
-  private static StorageContainerLocationProtocol getScmContainerClient(
-      OzoneConfiguration conf) throws IOException {
-    RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
-        ProtobufRpcEngine.class);
-    long scmVersion =
-        RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
-    InetSocketAddress scmAddr = getScmAddressForClients(
-        conf);
-    StorageContainerLocationProtocolClientSideTranslatorPB scmContainerClient =
-        new StorageContainerLocationProtocolClientSideTranslatorPB(
-            RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion,
-                scmAddr, UserGroupInformation.getCurrentUser(), conf,
-                NetUtils.getDefaultSocketFactory(conf),
-                Client.getRpcTimeout(conf)));
-    return scmContainerClient;
-  }
-
-  @VisibleForTesting
-  public KeyManager getKeyManager() {
-    return keyManager;
-  }
-
-  @VisibleForTesting
-  public ScmInfo getScmInfo() throws IOException {
-    return scmBlockClient.getScmInfo();
-  }
-
-  @VisibleForTesting
-  public KSMStorage getKsmStorage() {
-    return ksmStorage;
-  }
-  /**
-   * Starts an RPC server, if configured.
-   *
-   * @param conf configuration
-   * @param addr configured address of RPC server
-   * @param protocol RPC protocol provided by RPC server
-   * @param instance RPC protocol implementation instance
-   * @param handlerCount RPC server handler count
-   *
-   * @return RPC server
-   * @throws IOException if there is an I/O error while creating RPC server
-   */
-  private static RPC.Server startRpcServer(OzoneConfiguration conf,
-      InetSocketAddress addr, Class<?> protocol, BlockingService instance,
-      int handlerCount) throws IOException {
-    RPC.Server rpcServer = new RPC.Builder(conf)
-        .setProtocol(protocol)
-        .setInstance(instance)
-        .setBindAddress(addr.getHostString())
-        .setPort(addr.getPort())
-        .setNumHandlers(handlerCount)
-        .setVerbose(false)
-        .setSecretManager(null)
-        .build();
-
-    DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
-    return rpcServer;
-  }
-
-  /**
-   * Get metadata manager.
-   * @return metadata manager.
-   */
-  public KSMMetadataManager getMetadataManager() {
-    return metadataManager;
-  }
-
-  public KSMMetrics getMetrics() {
-    return metrics;
-  }
-
-  /**
-   * Main entry point for starting KeySpaceManager.
-   *
-   * @param argv arguments
-   * @throws IOException if startup fails due to I/O error
-   */
-  public static void main(String[] argv) throws IOException {
-    if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) {
-      System.exit(0);
-    }
-    try {
-      OzoneConfiguration conf = new OzoneConfiguration();
-      GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
-      if (!hParser.isParseSuccessful()) {
-        System.err.println("USAGE: " + USAGE + " \n");
-        hParser.printGenericCommandUsage(System.err);
-        System.exit(1);
-      }
-      StringUtils.startupShutdownMessage(KeySpaceManager.class, argv, LOG);
-      KeySpaceManager ksm = createKSM(hParser.getRemainingArgs(), conf);
-      if (ksm != null) {
-        ksm.start();
-        ksm.join();
-      }
-    } catch (Throwable t) {
-      LOG.error("Failed to start the KeyspaceManager.", t);
-      terminate(1, t);
-    }
-  }
-
-  private static void printUsage(PrintStream out) {
-    out.println(USAGE + "\n");
-  }
-
-  /**
-   * Constructs KSM instance based on command line arguments.
-   * @param argv Command line arguments
-   * @param conf OzoneConfiguration
-   * @return KSM instance
-   * @throws IOException in case KSM instance creation fails.
-   */
-
-  public static KeySpaceManager createKSM(String[] argv,
-      OzoneConfiguration conf) throws IOException {
-    if (!isHddsEnabled(conf)) {
-      System.err.println("KSM cannot be started in secure mode or when " +
-          OZONE_ENABLED + " is set to false");
-      System.exit(1);
-    }
-    StartupOption startOpt = parseArguments(argv);
-    if (startOpt == null) {
-      printUsage(System.err);
-      terminate(1);
-      return null;
-    }
-    switch (startOpt) {
-    case CREATEOBJECTSTORE:
-      terminate(ksmInit(conf) ? 0 : 1);
-      return null;
-    case HELP:
-      printUsage(System.err);
-      terminate(0);
-      return null;
-    default:
-      return new KeySpaceManager(conf);
-    }
-  }
-
-  /**
-   * Initializes the KSM instance.
-   * @param conf OzoneConfiguration
-   * @return true if KSM initialization succeeds , false otherwise
-   * @throws IOException in case ozone metadata directory path is not 
accessible
-   */
-
-  private static boolean ksmInit(OzoneConfiguration conf) throws IOException {
-    KSMStorage ksmStorage = new KSMStorage(conf);
-    StorageState state = ksmStorage.getState();
-    if (state != StorageState.INITIALIZED) {
-      try {
-        ScmBlockLocationProtocol scmBlockClient = getScmBlockClient(conf);
-        ScmInfo scmInfo = scmBlockClient.getScmInfo();
-        String clusterId = scmInfo.getClusterId();
-        String scmId = scmInfo.getScmId();
-        if (clusterId == null || clusterId.isEmpty()) {
-          throw new IOException("Invalid Cluster ID");
-        }
-        if (scmId == null || scmId.isEmpty()) {
-          throw new IOException("Invalid SCM ID");
-        }
-        ksmStorage.setClusterId(clusterId);
-        ksmStorage.setScmId(scmId);
-        ksmStorage.initialize();
-        System.out.println(
-            "KSM initialization succeeded.Current cluster id for sd="
-                + ksmStorage.getStorageDir() + ";cid=" + ksmStorage
-                .getClusterID());
-        return true;
-      } catch (IOException ioe) {
-        LOG.error("Could not initialize KSM version file", ioe);
-        return false;
-      }
-    } else {
-      System.out.println(
-          "KSM already initialized.Reusing existing cluster id for sd="
-              + ksmStorage.getStorageDir() + ";cid=" + ksmStorage
-              .getClusterID());
-      return true;
-    }
-  }
-
-  /**
-   * Parses the command line options for KSM initialization.
-   * @param args command line arguments
-   * @return StartupOption if options are valid, null otherwise
-   */
-  private static StartupOption parseArguments(String[] args) {
-    if (args == null || args.length == 0) {
-      return StartupOption.REGULAR;
-    } else if (args.length == 1) {
-      return StartupOption.parse(args[0]);
-    }
-    return null;
-  }
-
-  /**
-   * Builds a message for logging startup information about an RPC server.
-   *
-   * @param description RPC server description
-   * @param addr RPC server listening address
-   * @return server startup message
-   */
-  private static String buildRpcServerStartMessage(String description,
-      InetSocketAddress addr) {
-    return addr != null ? String.format("%s is listening at %s",
-        description, addr.toString()) :
-        String.format("%s not started", description);
-  }
-
-  /**
-   * Start service.
-   */
-  public void start() throws IOException {
-    LOG.info(buildRpcServerStartMessage("KeyspaceManager RPC server",
-        ksmRpcAddress));
-    DefaultMetricsSystem.initialize("KeySpaceManager");
-    metadataManager.start();
-    keyManager.start();
-    ksmRpcServer.start();
-    httpServer.start();
-    registerMXBean();
-    setStartTime();
-  }
-
-  /**
-   * Stop service.
-   */
-  public void stop() {
-    try {
-      metadataManager.stop();
-      ksmRpcServer.stop();
-      keyManager.stop();
-      httpServer.stop();
-      metrics.unRegister();
-      unregisterMXBean();
-    } catch (Exception e) {
-      LOG.error("Key Space Manager stop failed.", e);
-    }
-  }
-
-  /**
-   * Wait until service has completed shutdown.
-   */
-  public void join() {
-    try {
-      ksmRpcServer.join();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.info("Interrupted during KeyspaceManager join.", e);
-    }
-  }
-
-  /**
-   * Creates a volume.
-   *
-   * @param args - Arguments to create Volume.
-   * @throws IOException
-   */
-  @Override
-  public void createVolume(KsmVolumeArgs args) throws IOException {
-    try {
-      metrics.incNumVolumeCreates();
-      volumeManager.createVolume(args);
-    } catch (Exception ex) {
-      metrics.incNumVolumeCreateFails();
-      throw ex;
-    }
-  }
-
-  /**
-   * Changes the owner of a volume.
-   *
-   * @param volume - Name of the volume.
-   * @param owner - Name of the owner.
-   * @throws IOException
-   */
-  @Override
-  public void setOwner(String volume, String owner) throws IOException {
-    try {
-      metrics.incNumVolumeUpdates();
-      volumeManager.setOwner(volume, owner);
-    } catch (Exception ex) {
-      metrics.incNumVolumeUpdateFails();
-      throw ex;
-    }
-  }
-
-  /**
-   * Changes the Quota on a volume.
-   *
-   * @param volume - Name of the volume.
-   * @param quota - Quota in bytes.
-   * @throws IOException
-   */
-  @Override
-  public void setQuota(String volume, long quota) throws IOException {
-    try {
-      metrics.incNumVolumeUpdates();
-      volumeManager.setQuota(volume, quota);
-    } catch (Exception ex) {
-      metrics.incNumVolumeUpdateFails();
-      throw ex;
-    }
-  }
-
-  /**
-   * Checks if the specified user can access this volume.
-   *
-   * @param volume - volume
-   * @param userAcl - user acls which needs to be checked for access
-   * @return true if the user has required access for the volume,
-   *         false otherwise
-   * @throws IOException
-   */
-  @Override
-  public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
-      throws IOException {
-    try {
-      metrics.incNumVolumeCheckAccesses();
-      return volumeManager.checkVolumeAccess(volume, userAcl);
-    } catch (Exception ex) {
-      metrics.incNumVolumeCheckAccessFails();
-      throw ex;
-    }
-  }
-
-  /**
-   * Gets the volume information.
-   *
-   * @param volume - Volume name.
-   * @return VolumeArgs or exception is thrown.
-   * @throws IOException
-   */
-  @Override
-  public KsmVolumeArgs getVolumeInfo(String volume) throws IOException {
-    try {
-      metrics.incNumVolumeInfos();
-      return volumeManager.getVolumeInfo(volume);
-    } catch (Exception ex) {
-      metrics.incNumVolumeInfoFails();
-      throw ex;
-    }
-  }
-
-  /**
-   * Deletes an existing empty volume.
-   *
-   * @param volume - Name of the volume.
-   * @throws IOException
-   */
-  @Override
-  public void deleteVolume(String volume) throws IOException {
-    try {
-      metrics.incNumVolumeDeletes();
-      volumeManager.deleteVolume(volume);
-    } catch (Exception ex) {
-      metrics.incNumVolumeDeleteFails();
-      throw ex;
-    }
-  }
-
-  /**
-   * Lists volume owned by a specific user.
-   *
-   * @param userName - user name
-   * @param prefix - Filter prefix -- Return only entries that match this.
-   * @param prevKey - Previous key -- List starts from the next from the
-   * prevkey
-   * @param maxKeys - Max number of keys to return.
-   * @return List of Volumes.
-   * @throws IOException
-   */
-  @Override
-  public List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix,
-      String prevKey, int maxKeys) throws IOException {
-    try {
-      metrics.incNumVolumeLists();
-      return volumeManager.listVolumes(userName, prefix, prevKey, maxKeys);
-    } catch (Exception ex) {
-      metrics.incNumVolumeListFails();
-      throw ex;
-    }
-  }
-
-  /**
-   * Lists volume all volumes in the cluster.
-   *
-   * @param prefix - Filter prefix -- Return only entries that match this.
-   * @param prevKey - Previous key -- List starts from the next from the
-   * prevkey
-   * @param maxKeys - Max number of keys to return.
-   * @return List of Volumes.
-   * @throws IOException
-   */
-  @Override
-  public List<KsmVolumeArgs> listAllVolumes(String prefix, String prevKey, int
-      maxKeys) throws IOException {
-    try {
-      metrics.incNumVolumeLists();
-      return volumeManager.listVolumes(null, prefix, prevKey, maxKeys);
-    } catch (Exception ex) {
-      metrics.incNumVolumeListFails();
-      throw ex;
-    }
-  }
-
-  /**
-   * Creates a bucket.
-   *
-   * @param bucketInfo - BucketInfo to create bucket.
-   * @throws IOException
-   */
-  @Override
-  public void createBucket(KsmBucketInfo bucketInfo) throws IOException {
-    try {
-      metrics.incNumBucketCreates();
-      bucketManager.createBucket(bucketInfo);
-    } catch (Exception ex) {
-      metrics.incNumBucketCreateFails();
-      throw ex;
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<KsmBucketInfo> listBuckets(String volumeName,
-      String startKey, String prefix, int maxNumOfBuckets)
-      throws IOException {
-    try {
-      metrics.incNumBucketLists();
-      return bucketManager.listBuckets(volumeName,
-          startKey, prefix, maxNumOfBuckets);
-    } catch (IOException ex) {
-      metrics.incNumBucketListFails();
-      throw ex;
-    }
-  }
-
-  /**
-   * Gets the bucket information.
-   *
-   * @param volume - Volume name.
-   * @param bucket - Bucket name.
-   * @return KsmBucketInfo or exception is thrown.
-   * @throws IOException
-   */
-  @Override
-  public KsmBucketInfo getBucketInfo(String volume, String bucket)
-      throws IOException {
-    try {
-      metrics.incNumBucketInfos();
-      return bucketManager.getBucketInfo(volume, bucket);
-    } catch (Exception ex) {
-      metrics.incNumBucketInfoFails();
-      throw ex;
-    }
-  }
-
-  /**
-   * Allocate a key.
-   *
-   * @param args - attributes of the key.
-   * @return KsmKeyInfo - the info about the allocated key.
-   * @throws IOException
-   */
-  @Override
-  public OpenKeySession openKey(KsmKeyArgs args) throws IOException {
-    try {
-      metrics.incNumKeyAllocates();
-      return keyManager.openKey(args);
-    } catch (Exception ex) {
-      metrics.incNumKeyAllocateFails();
-      throw ex;
-    }
-  }
-
-  @Override
-  public void commitKey(KsmKeyArgs args, int clientID)
-      throws IOException {
-    try {
-      metrics.incNumKeyCommits();
-      keyManager.commitKey(args, clientID);
-    } catch (Exception ex) {
-      metrics.incNumKeyCommitFails();
-      throw ex;
-    }
-  }
-
-  @Override
-  public KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
-      throws IOException {
-    try {
-      metrics.incNumBlockAllocateCalls();
-      return keyManager.allocateBlock(args, clientID);
-    } catch (Exception ex) {
-      metrics.incNumBlockAllocateCallFails();
-      throw ex;
-    }
-  }
-
-  /**
-   * Lookup a key.
-   *
-   * @param args - attributes of the key.
-   * @return KsmKeyInfo - the info about the requested key.
-   * @throws IOException
-   */
-  @Override
-  public KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException {
-    try {
-      metrics.incNumKeyLookups();
-      return keyManager.lookupKey(args);
-    } catch (Exception ex) {
-      metrics.incNumKeyLookupFails();
-      throw ex;
-    }
-  }
-
-  @Override
-  public void renameKey(KsmKeyArgs args, String toKeyName) throws IOException {
-    try {
-      metrics.incNumKeyRenames();
-      keyManager.renameKey(args, toKeyName);
-    } catch (IOException e) {
-      metrics.incNumKeyRenameFails();
-      throw e;
-    }
-  }
-
-  /**
-   * Deletes an existing key.
-   *
-   * @param args - attributes of the key.
-   * @throws IOException
-   */
-  @Override
-  public void deleteKey(KsmKeyArgs args) throws IOException {
-    try {
-      metrics.incNumKeyDeletes();
-      keyManager.deleteKey(args);
-    } catch (Exception ex) {
-      metrics.incNumKeyDeleteFails();
-      throw ex;
-    }
-  }
-
-  @Override
-  public List<KsmKeyInfo> listKeys(String volumeName, String bucketName,
-      String startKey, String keyPrefix, int maxKeys) throws IOException {
-    try {
-      metrics.incNumKeyLists();
-      return keyManager.listKeys(volumeName, bucketName,
-          startKey, keyPrefix, maxKeys);
-    } catch (IOException ex) {
-      metrics.incNumKeyListFails();
-      throw ex;
-    }
-  }
-
-  /**
-   * Sets bucket property from args.
-   * @param args - BucketArgs.
-   * @throws IOException
-   */
-  @Override
-  public void setBucketProperty(KsmBucketArgs args)
-      throws IOException {
-    try {
-      metrics.incNumBucketUpdates();
-      bucketManager.setBucketProperty(args);
-    } catch (Exception ex) {
-      metrics.incNumBucketUpdateFails();
-      throw ex;
-    }
-  }
-
-
-  /**
-   * Deletes an existing empty bucket from volume.
-   * @param volume - Name of the volume.
-   * @param bucket - Name of the bucket.
-   * @throws IOException
-   */
-  public void deleteBucket(String volume, String bucket) throws IOException {
-    try {
-      metrics.incNumBucketDeletes();
-      bucketManager.deleteBucket(volume, bucket);
-    } catch (Exception ex) {
-      metrics.incNumBucketDeleteFails();
-      throw ex;
-    }
-  }
-
-  private void registerMXBean() {
-    Map<String, String> jmxProperties = new HashMap<String, String>();
-    jmxProperties.put("component", "ServerRuntime");
-    this.ksmInfoBeanName =
-        MBeans.register("KeySpaceManager",
-            "KeySpaceManagerInfo",
-            jmxProperties,
-            this);
-  }
-
-  private void unregisterMXBean() {
-    if (this.ksmInfoBeanName != null) {
-      MBeans.unregister(this.ksmInfoBeanName);
-      this.ksmInfoBeanName = null;
-    }
-  }
-
-  @Override
-  public String getRpcPort() {
-    return "" + ksmRpcAddress.getPort();
-  }
-
-  @VisibleForTesting
-  public KeySpaceManagerHttpServer getHttpServer() {
-    return httpServer;
-  }
-
-  @Override
-  public List<ServiceInfo> getServiceList() throws IOException {
-    // When we implement multi-home this call has to be handled properly.
-    List<ServiceInfo> services = new ArrayList<>();
-    ServiceInfo.Builder ksmServiceInfoBuilder = ServiceInfo.newBuilder()
-        .setNodeType(HddsProtos.NodeType.KSM)
-        .setHostname(ksmRpcAddress.getHostName())
-        .addServicePort(ServicePort.newBuilder()
-                .setType(ServicePort.Type.RPC)
-                .setValue(ksmRpcAddress.getPort())
-            .build());
-    if (httpServer.getHttpAddress() != null) {
-      ksmServiceInfoBuilder.addServicePort(ServicePort.newBuilder()
-          .setType(ServicePort.Type.HTTP)
-          .setValue(httpServer.getHttpAddress().getPort())
-          .build());
-    }
-    if (httpServer.getHttpsAddress() != null) {
-      ksmServiceInfoBuilder.addServicePort(ServicePort.newBuilder()
-          .setType(ServicePort.Type.HTTPS)
-          .setValue(httpServer.getHttpsAddress().getPort())
-          .build());
-    }
-    services.add(ksmServiceInfoBuilder.build());
-
-    // For client we have to return SCM with container protocol port,
-    // not block protocol.
-    InetSocketAddress scmAddr = getScmAddressForClients(
-        configuration);
-    ServiceInfo.Builder scmServiceInfoBuilder = ServiceInfo.newBuilder()
-        .setNodeType(HddsProtos.NodeType.SCM)
-        .setHostname(scmAddr.getHostName())
-        .addServicePort(ServicePort.newBuilder()
-            .setType(ServicePort.Type.RPC)
-            .setValue(scmAddr.getPort()).build());
-    services.add(scmServiceInfoBuilder.build());
-
-    List<HddsProtos.Node> nodes = scmContainerClient.queryNode(HEALTHY,
-        HddsProtos.QueryScope.CLUSTER, "");
-
-    for (HddsProtos.Node node : nodes) {
-      HddsProtos.DatanodeDetailsProto datanode = node.getNodeID();
-
-      ServiceInfo.Builder dnServiceInfoBuilder = ServiceInfo.newBuilder()
-          .setNodeType(HddsProtos.NodeType.DATANODE)
-          .setHostname(datanode.getHostName());
-
-      dnServiceInfoBuilder.addServicePort(ServicePort.newBuilder()
-          .setType(ServicePort.Type.HTTP)
-          .setValue(DatanodeDetails.getFromProtoBuf(datanode)
-              .getPort(DatanodeDetails.Port.Name.REST).getValue())
-          .build());
-
-      services.add(dnServiceInfoBuilder.build());
-    }
-
-    metrics.incNumGetServiceLists();
-    // For now there is no exception that can can happen in this call,
-    // so failure metrics is not handled. In future if there is any need to
-    // handle exception in this method, we need to incorporate
-    // metrics.incNumGetServiceListFails()
-    return services;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManagerHttpServer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManagerHttpServer.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManagerHttpServer.java
deleted file mode 100644
index 478804b..0000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManagerHttpServer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package org.apache.hadoop.ozone.ksm;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.hdds.server.BaseHttpServer;
-
-import java.io.IOException;
-
-/**
- * HttpServer wrapper for the KeySpaceManager.
- */
-public class KeySpaceManagerHttpServer extends BaseHttpServer {
-
-  public KeySpaceManagerHttpServer(Configuration conf, KeySpaceManager ksm)
-      throws IOException {
-    super(conf, "ksm");
-    addServlet("serviceList", "/serviceList", ServiceListJSONServlet.class);
-    getWebAppContext().setAttribute(OzoneConsts.KSM_CONTEXT_ATTRIBUTE, ksm);
-  }
-
-  @Override protected String getHttpAddressKey() {
-    return KSMConfigKeys.OZONE_KSM_HTTP_ADDRESS_KEY;
-  }
-
-  @Override protected String getHttpBindHostKey() {
-    return KSMConfigKeys.OZONE_KSM_HTTP_BIND_HOST_KEY;
-  }
-
-  @Override protected String getHttpsAddressKey() {
-    return KSMConfigKeys.OZONE_KSM_HTTPS_ADDRESS_KEY;
-  }
-
-  @Override protected String getHttpsBindHostKey() {
-    return KSMConfigKeys.OZONE_KSM_HTTPS_BIND_HOST_KEY;
-  }
-
-  @Override protected String getBindHostDefault() {
-    return KSMConfigKeys.OZONE_KSM_HTTP_BIND_HOST_DEFAULT;
-  }
-
-  @Override protected int getHttpBindPortDefault() {
-    return KSMConfigKeys.OZONE_KSM_HTTP_BIND_PORT_DEFAULT;
-  }
-
-  @Override protected int getHttpsBindPortDefault() {
-    return KSMConfigKeys.OZONE_KSM_HTTPS_BIND_PORT_DEFAULT;
-  }
-
-  @Override protected String getKeytabFile() {
-    return KSMConfigKeys.OZONE_KSM_KEYTAB_FILE;
-  }
-
-  @Override protected String getSpnegoPrincipal() {
-    return OzoneConfigKeys.OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL;
-  }
-
-  @Override protected String getEnabledKey() {
-    return KSMConfigKeys.OZONE_KSM_HTTP_ENABLED_KEY;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/OpenKeyCleanupService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/OpenKeyCleanupService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/OpenKeyCleanupService.java
deleted file mode 100644
index 8e2540a..0000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/OpenKeyCleanupService.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.ksm;
-
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.utils.BackgroundService;
-import org.apache.hadoop.utils.BackgroundTask;
-import org.apache.hadoop.utils.BackgroundTaskQueue;
-import org.apache.hadoop.utils.BackgroundTaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * This is the background service to delete hanging open keys.
- * Scan the metadata of ksm periodically to get
- * the keys with prefix "#open#" and ask scm to
- * delete metadata accordingly, if scm returns
- * success for keys, then clean up those keys.
- */
-public class OpenKeyCleanupService extends BackgroundService {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(OpenKeyCleanupService.class);
-
-  private final static int OPEN_KEY_DELETING_CORE_POOL_SIZE = 2;
-
-  private final KeyManager keyManager;
-  private final ScmBlockLocationProtocol scmClient;
-
-  public OpenKeyCleanupService(ScmBlockLocationProtocol scmClient,
-      KeyManager keyManager, int serviceInterval,
-      long serviceTimeout) {
-    super("OpenKeyCleanupService", serviceInterval, TimeUnit.SECONDS,
-        OPEN_KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
-    this.keyManager = keyManager;
-    this.scmClient = scmClient;
-  }
-
-  @Override
-  public BackgroundTaskQueue getTasks() {
-    BackgroundTaskQueue queue = new BackgroundTaskQueue();
-    queue.add(new OpenKeyDeletingTask());
-    return queue;
-  }
-
-  private class OpenKeyDeletingTask
-      implements BackgroundTask<BackgroundTaskResult> {
-
-    @Override
-    public int getPriority() {
-      return 0;
-    }
-
-    @Override
-    public BackgroundTaskResult call() throws Exception {
-      try {
-        List<BlockGroup> keyBlocksList = keyManager.getExpiredOpenKeys();
-        if (keyBlocksList.size() > 0) {
-          int toDeleteSize = keyBlocksList.size();
-          LOG.debug("Found {} to-delete open keys in KSM", toDeleteSize);
-          List<DeleteBlockGroupResult> results =
-              scmClient.deleteKeyBlocks(keyBlocksList);
-          int deletedSize = 0;
-          for (DeleteBlockGroupResult result : results) {
-            if (result.isSuccess()) {
-              try {
-                keyManager.deleteExpiredOpenKey(result.getObjectKey());
-                LOG.debug("Key {} deleted from KSM DB", result.getObjectKey());
-                deletedSize += 1;
-              } catch (IOException e) {
-                LOG.warn("Failed to delete hanging-open key {}",
-                    result.getObjectKey(), e);
-              }
-            } else {
-              LOG.warn("Deleting open Key {} failed because some of the blocks"
-                      + " were failed to delete, failed blocks: {}",
-                  result.getObjectKey(),
-                  StringUtils.join(",", result.getFailedBlocks()));
-            }
-          }
-          LOG.info("Found {} expired open key entries, successfully " +
-              "cleaned up {} entries", toDeleteSize, deletedSize);
-          return results::size;
-        } else {
-          LOG.debug("No hanging open key fond in KSM");
-        }
-      } catch (IOException e) {
-        LOG.error("Unable to get hanging open keys, retry in"
-            + " next interval", e);
-      }
-      return BackgroundTaskResult.EmptyTaskResult.newResult();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/ServiceListJSONServlet.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/ServiceListJSONServlet.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/ServiceListJSONServlet.java
deleted file mode 100644
index 34a80ce..0000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/ServiceListJSONServlet.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.ksm;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.io.PrintWriter;
-
-
-/**
- * Provides REST access to Ozone Service List.
- * <p>
- * This servlet generally will be placed under the /serviceList URL of
- * KeySpaceManager HttpServer.
- *
- * The return format is of JSON and in the form
- * <p>
- *  <code><pre>
- *  {
- *    "services" : [
- *      {
- *        "NodeType":"KSM",
- *        "Hostname" "$hostname",
- *        "ports" : {
- *          "$PortType" : "$port",
- *          ...
- *        }
- *      }
- *    ]
- *  }
- *  </pre></code>
- *  <p>
- *
- */
-public class ServiceListJSONServlet  extends HttpServlet  {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ServiceListJSONServlet.class);
-  private static final long serialVersionUID = 1L;
-
-  private KeySpaceManager ksm;
-
-  public void init() throws ServletException {
-    this.ksm = (KeySpaceManager) getServletContext()
-        .getAttribute(OzoneConsts.KSM_CONTEXT_ATTRIBUTE);
-  }
-
-  /**
-   * Process a GET request for the specified resource.
-   *
-   * @param request
-   *          The servlet request we are processing
-   * @param response
-   *          The servlet response we are creating
-   */
-  @Override
-  public void doGet(HttpServletRequest request, HttpServletResponse response) {
-    try {
-      ObjectMapper objectMapper = new ObjectMapper();
-      objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
-      response.setContentType("application/json; charset=utf8");
-      PrintWriter writer = response.getWriter();
-      try {
-        writer.write(objectMapper.writeValueAsString(ksm.getServiceList()));
-      } finally {
-        if (writer != null) {
-          writer.close();
-        }
-      }
-    } catch (IOException e) {
-      LOG.error(
-          "Caught an exception while processing ServiceList request", e);
-      response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
deleted file mode 100644
index 6ac78d6..0000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.OzoneAclInfo;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * KSM volume manager interface.
- */
-public interface VolumeManager {
-
-  /**
-   * Create a new volume.
-   * @param args - Volume args to create a volume
-   */
-  void createVolume(KsmVolumeArgs args) throws IOException;
-
-  /**
-   * Changes the owner of a volume.
-   *
-   * @param volume - Name of the volume.
-   * @param owner - Name of the owner.
-   * @throws IOException
-   */
-  void setOwner(String volume, String owner) throws IOException;
-
-  /**
-   * Changes the Quota on a volume.
-   *
-   * @param volume - Name of the volume.
-   * @param quota - Quota in bytes.
-   * @throws IOException
-   */
-  void setQuota(String volume, long quota) throws IOException;
-
-  /**
-   * Gets the volume information.
-   * @param volume - Volume name.
-   * @return VolumeArgs or exception is thrown.
-   * @throws IOException
-   */
-  KsmVolumeArgs getVolumeInfo(String volume) throws IOException;
-
-  /**
-   * Deletes an existing empty volume.
-   *
-   * @param volume - Name of the volume.
-   * @throws IOException
-   */
-  void deleteVolume(String volume) throws IOException;
-
-  /**
-   * Checks if the specified user with a role can access this volume.
-   *
-   * @param volume - volume
-   * @param userAcl - user acl which needs to be checked for access
-   * @return true if the user has access for the volume, false otherwise
-   * @throws IOException
-   */
-  boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
-      throws IOException;
-
-  /**
-   * Returns a list of volumes owned by a given user; if user is null,
-   * returns all volumes.
-   *
-   * @param userName
-   *   volume owner
-   * @param prefix
-   *   the volume prefix used to filter the listing result.
-   * @param startKey
-   *   the start volume name determines where to start listing from,
-   *   this key is excluded from the result.
-   * @param maxKeys
-   *   the maximum number of volumes to return.
-   * @return a list of {@link KsmVolumeArgs}
-   * @throws IOException
-   */
-  List<KsmVolumeArgs> listVolumes(String userName, String prefix,
-      String startKey, int maxKeys) throws IOException;
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to