http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocol/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocol/package-info.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocol/package-info.java
deleted file mode 100644
index 274f859..0000000
--- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocol/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.scm.protocol;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
deleted file mode 100644
index 0de759f..0000000
--- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.scm.protocolPB;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.ipc.ProtobufHelper;
-import org.apache.hadoop.ipc.ProtocolTranslator;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
-import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.GetScmBlockLocationsRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.GetScmBlockLocationsResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.ScmLocatedBlockProto;
-import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks;
-import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.scm.ScmInfo;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * This class is the client-side translator to translate the requests made on
- * the {@link ScmBlockLocationProtocol} interface to the RPC server
- * implementing {@link ScmBlockLocationProtocolPB}.
- */
-@InterfaceAudience.Private
-public final class ScmBlockLocationProtocolClientSideTranslatorPB
-    implements ScmBlockLocationProtocol, ProtocolTranslator, Closeable {
-
-  /**
-   * RpcController is not used and hence is set to null.
-   */
-  private static final RpcController NULL_RPC_CONTROLLER = null;
-
-  private final ScmBlockLocationProtocolPB rpcProxy;
-
-  /**
-   * Creates a new StorageContainerLocationProtocolClientSideTranslatorPB.
-   *
-   * @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy
-   */
-  public ScmBlockLocationProtocolClientSideTranslatorPB(
-      ScmBlockLocationProtocolPB rpcProxy) {
-    this.rpcProxy = rpcProxy;
-  }
-
-  /**
-   * Find the set of nodes to read/write a block, as
-   * identified by the block key.  This method supports batch lookup by
-   * passing multiple keys.
-   *
-   * @param keys batch of block keys to find
-   * @return allocated blocks for each block key
-   * @throws IOException if there is any failure
-   */
-  @Override
-  public Set<AllocatedBlock> getBlockLocations(Set<String> keys)
-      throws IOException {
-    GetScmBlockLocationsRequestProto.Builder req =
-        GetScmBlockLocationsRequestProto.newBuilder();
-    for (String key : keys) {
-      req.addKeys(key);
-    }
-    final GetScmBlockLocationsResponseProto resp;
-    try {
-      resp = rpcProxy.getScmBlockLocations(NULL_RPC_CONTROLLER,
-          req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    Set<AllocatedBlock> locatedBlocks =
-        Sets.newLinkedHashSetWithExpectedSize(resp.getLocatedBlocksCount());
-    for (ScmLocatedBlockProto locatedBlock : resp.getLocatedBlocksList()) {
-      locatedBlocks.add(new AllocatedBlock.Builder()
-          .setKey(locatedBlock.getKey())
-          .setPipeline(Pipeline.getFromProtoBuf(locatedBlock.getPipeline()))
-          .build());
-    }
-    return locatedBlocks;
-  }
-
-  /**
-   * Asks SCM where a block should be allocated. SCM responds with the
-   * set of datanodes that should be used creating this block.
-   * @param size - size of the block.
-   * @return allocated block accessing info (key, pipeline).
-   * @throws IOException
-   */
-  @Override
-  public AllocatedBlock allocateBlock(long size,
-      HdslProtos.ReplicationType type, HdslProtos.ReplicationFactor factor,
-      String owner) throws IOException {
-    Preconditions.checkArgument(size > 0, "block size must be greater than 0");
-
-    AllocateScmBlockRequestProto request =
-        AllocateScmBlockRequestProto.newBuilder().setSize(size).setType(type)
-            .setFactor(factor).setOwner(owner).build();
-    final AllocateScmBlockResponseProto response;
-    try {
-      response = rpcProxy.allocateScmBlock(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    if (response.getErrorCode() !=
-        AllocateScmBlockResponseProto.Error.success) {
-      throw new IOException(response.hasErrorMessage() ?
-          response.getErrorMessage() : "Allocate block failed.");
-    }
-    AllocatedBlock.Builder builder = new AllocatedBlock.Builder()
-        .setKey(response.getKey())
-        .setPipeline(Pipeline.getFromProtoBuf(response.getPipeline()))
-        .setShouldCreateContainer(response.getCreateContainer());
-    return builder.build();
-  }
-
-  /**
-   * Delete the set of keys specified.
-   *
-   * @param keyBlocksInfoList batch of block keys to delete.
-   * @return list of block deletion results.
-   * @throws IOException if there is any failure.
-   *
-   */
-  @Override
-  public List<DeleteBlockGroupResult> deleteKeyBlocks(
-      List<BlockGroup> keyBlocksInfoList) throws IOException {
-    List<KeyBlocks> keyBlocksProto = keyBlocksInfoList.stream()
-        .map(BlockGroup::getProto).collect(Collectors.toList());
-    DeleteScmKeyBlocksRequestProto request = DeleteScmKeyBlocksRequestProto
-        .newBuilder().addAllKeyBlocks(keyBlocksProto).build();
-
-    final DeleteScmKeyBlocksResponseProto resp;
-    try {
-      resp = rpcProxy.deleteScmKeyBlocks(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    List<DeleteBlockGroupResult> results =
-        new ArrayList<>(resp.getResultsCount());
-    results.addAll(resp.getResultsList().stream().map(
-        result -> new DeleteBlockGroupResult(result.getObjectKey(),
-            DeleteBlockGroupResult
-                .convertBlockResultProto(result.getBlockResultsList())))
-        .collect(Collectors.toList()));
-    return results;
-  }
-
-  /**
-   * Gets the cluster Id and Scm Id from SCM.
-   * @return ScmInfo
-   * @throws IOException
-   */
-  @Override
-  public ScmInfo getScmInfo() throws IOException {
-    HdslProtos.GetScmInfoRequestProto request =
-        HdslProtos.GetScmInfoRequestProto.getDefaultInstance();
-    HdslProtos.GetScmInfoRespsonseProto resp;
-    try {
-      resp = rpcProxy.getScmInfo(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    ScmInfo.Builder builder = new ScmInfo.Builder()
-        .setClusterId(resp.getClusterId())
-        .setScmId(resp.getScmId());
-    return builder.build();
-  }
-
-  @Override
-  public Object getUnderlyingProxyObject() {
-    return rpcProxy;
-  }
-
-  @Override
-  public void close() {
-    RPC.stopProxy(rpcProxy);
-  }
-}

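For context, the deleted class above is the client-side stub for ScmBlockLocationProtocol. A minimal usage sketch, assuming an ScmBlockLocationProtocolPB RPC proxy is obtained elsewhere and using only the constructor and methods visible in the deleted code (the block size, owner string, and replication enum constants are illustrative assumptions):

    import java.io.IOException;

    import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
    import org.apache.hadoop.scm.ScmInfo;
    import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
    import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
    import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;

    public class ScmBlockClientSketch {
      // The rpcProxy is assumed to be created elsewhere through Hadoop RPC.
      public static void demo(ScmBlockLocationProtocolPB rpcProxy) throws IOException {
        ScmBlockLocationProtocolClientSideTranslatorPB client =
            new ScmBlockLocationProtocolClientSideTranslatorPB(rpcProxy);
        try {
          // Ask SCM for a 4 MB block; RATIS/THREE are assumed enum constants.
          AllocatedBlock block = client.allocateBlock(4L * 1024 * 1024,
              HdslProtos.ReplicationType.RATIS,
              HdslProtos.ReplicationFactor.THREE, "demo-owner");
          // The same translator also exposes cluster identity.
          ScmInfo scmInfo = client.getScmInfo();
        } finally {
          client.close(); // stops the underlying RPC proxy
        }
      }
    }
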
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolPB.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolPB.java
deleted file mode 100644
index 019aeeb..0000000
--- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolPB.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.scm.protocolPB;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.ipc.ProtocolInfo;
-import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos
-    .ScmBlockLocationProtocolService;
-
-/**
- * Protocol used from an HDFS node to StorageContainerManager.  This extends the
- * Protocol Buffers service interface to add Hadoop-specific annotations.
- */
-@ProtocolInfo(protocolName =
-    "org.apache.hadoop.ozone.protocol.ScmBlockLocationProtocol",
-    protocolVersion = 1)
-@InterfaceAudience.Private
-public interface ScmBlockLocationProtocolPB
-    extends ScmBlockLocationProtocolService.BlockingInterface {
-}

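The PB interface above only binds the protobuf service to Hadoop RPC via @ProtocolInfo. A hedged sketch of how a proxy for it could be built with org.apache.hadoop.ipc.RPC; the socket address and port are placeholders, and the particular getProxy overload is an assumption chosen for brevity:

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.ProtobufRpcEngine;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;

    public class ScmBlockProxySketch {
      public static ScmBlockLocationProtocolPB createProxy(Configuration conf)
          throws IOException {
        // Register the protobuf engine for this protocol before creating the proxy.
        RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
            ProtobufRpcEngine.class);
        // Placeholder address; a real client would read it from configuration.
        InetSocketAddress scmAddress = new InetSocketAddress("scm.example.com", 9863);
        return RPC.getProxy(ScmBlockLocationProtocolPB.class,
            RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class),
            scmAddress, conf);
      }
    }
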
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
deleted file mode 100644
index 348b266..0000000
--- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.scm.protocolPB;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.ipc.ProtobufHelper;
-import org.apache.hadoop.ipc.ProtocolTranslator;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
-import org.apache.hadoop.scm.ScmInfo;
-import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-
-/**
- * This class is the client-side translator to translate the requests made on
- * the {@link StorageContainerLocationProtocol} interface to the RPC server
- * implementing {@link StorageContainerLocationProtocolPB}.
- */
-@InterfaceAudience.Private
-public final class StorageContainerLocationProtocolClientSideTranslatorPB
-    implements StorageContainerLocationProtocol, ProtocolTranslator, Closeable {
-
-  /**
-   * RpcController is not used and hence is set to null.
-   */
-  private static final RpcController NULL_RPC_CONTROLLER = null;
-
-  private final StorageContainerLocationProtocolPB rpcProxy;
-
-  /**
-   * Creates a new StorageContainerLocationProtocolClientSideTranslatorPB.
-   *
-   * @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy
-   */
-  public StorageContainerLocationProtocolClientSideTranslatorPB(
-      StorageContainerLocationProtocolPB rpcProxy) {
-    this.rpcProxy = rpcProxy;
-  }
-
-  /**
-   * Asks SCM where a container should be allocated. SCM responds with the set
-   * of datanodes that should be used creating this container. Ozone/SCM only
-   * supports replication factor of either 1 or 3.
-   * @param type - Replication Type
-   * @param factor - Replication Count
-   * @param containerName - Name
-   * @return
-   * @throws IOException
-   */
-  @Override
-  public Pipeline allocateContainer(HdslProtos.ReplicationType type,
-      HdslProtos.ReplicationFactor factor, String
-      containerName, String owner) throws IOException {
-
-    Preconditions.checkNotNull(containerName, "Container Name cannot be Null");
-    Preconditions.checkState(!containerName.isEmpty(), "Container name cannot" +
-        " be empty");
-    ContainerRequestProto request = ContainerRequestProto.newBuilder()
-        .setContainerName(containerName)
-        .setReplicationFactor(factor)
-        .setReplicationType(type)
-        .setOwner(owner)
-        .build();
-
-    final ContainerResponseProto response;
-    try {
-      response = rpcProxy.allocateContainer(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    if (response.getErrorCode() != ContainerResponseProto.Error.success) {
-      throw new IOException(response.hasErrorMessage() ?
-          response.getErrorMessage() : "Allocate container failed.");
-    }
-    return Pipeline.getFromProtoBuf(response.getPipeline());
-  }
-
-  public Pipeline getContainer(String containerName) throws IOException {
-    Preconditions.checkNotNull(containerName,
-        "Container Name cannot be Null");
-    Preconditions.checkState(!containerName.isEmpty(),
-        "Container name cannot be empty");
-    GetContainerRequestProto request = GetContainerRequestProto
-        .newBuilder()
-        .setContainerName(containerName)
-        .build();
-    try {
-      GetContainerResponseProto response =
-          rpcProxy.getContainer(NULL_RPC_CONTROLLER, request);
-      return Pipeline.getFromProtoBuf(response.getPipeline());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<ContainerInfo> listContainer(String startName, String prefixName,
-      int count) throws IOException {
-    SCMListContainerRequestProto.Builder builder = SCMListContainerRequestProto
-        .newBuilder();
-    if (prefixName != null) {
-      builder.setPrefixName(prefixName);
-    }
-    if (startName != null) {
-      builder.setStartName(startName);
-    }
-    builder.setCount(count);
-    SCMListContainerRequestProto request = builder.build();
-
-    try {
-      SCMListContainerResponseProto response =
-          rpcProxy.listContainer(NULL_RPC_CONTROLLER, request);
-      List<ContainerInfo> containerList = new ArrayList<>();
-      for (HdslProtos.SCMContainerInfo containerInfoProto : response
-          .getContainersList()) {
-        containerList.add(ContainerInfo.fromProtobuf(containerInfoProto));
-      }
-      return containerList;
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-  }
-
-  /**
-   * Ask SCM to delete a container by name. SCM will remove
-   * the container mapping in its database.
-   *
-   * @param containerName
-   * @throws IOException
-   */
-  @Override
-  public void deleteContainer(String containerName)
-      throws IOException {
-    Preconditions.checkState(!Strings.isNullOrEmpty(containerName),
-        "Container name cannot be null or empty");
-    SCMDeleteContainerRequestProto request = SCMDeleteContainerRequestProto
-        .newBuilder()
-        .setContainerName(containerName)
-        .build();
-    try {
-      rpcProxy.deleteContainer(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-  }
-
-  /**
-   * Queries a list of Node Statuses.
-   *
-   * @param nodeStatuses
-   * @return List of Datanodes.
-   */
-  @Override
-  public HdslProtos.NodePool queryNode(EnumSet<HdslProtos.NodeState>
-      nodeStatuses, HdslProtos.QueryScope queryScope, String poolName)
-      throws IOException {
-    // TODO : We support only cluster wide query right now. So ignoring checking
-    // queryScope and poolName
-    Preconditions.checkNotNull(nodeStatuses);
-    Preconditions.checkState(nodeStatuses.size() > 0);
-    NodeQueryRequestProto request = NodeQueryRequestProto.newBuilder()
-        .addAllQuery(nodeStatuses)
-        .setScope(queryScope).setPoolName(poolName).build();
-    try {
-      NodeQueryResponseProto response =
-          rpcProxy.queryNode(NULL_RPC_CONTROLLER, request);
-      return response.getDatanodes();
-    } catch (ServiceException e) {
-      throw  ProtobufHelper.getRemoteException(e);
-    }
-
-  }
-
-  /**
-   * Notify from client that creates object on datanodes.
-   * @param type object type
-   * @param name object name
-   * @param op operation type (e.g., create, close, delete)
-   * @param stage object creation stage : begin/complete
-   */
-  @Override
-  public void notifyObjectStageChange(
-      ObjectStageChangeRequestProto.Type type, String name,
-      ObjectStageChangeRequestProto.Op op,
-      ObjectStageChangeRequestProto.Stage stage) throws IOException {
-    Preconditions.checkState(!Strings.isNullOrEmpty(name),
-        "Object name cannot be null or empty");
-    ObjectStageChangeRequestProto request =
-        ObjectStageChangeRequestProto.newBuilder()
-            .setType(type)
-            .setName(name)
-            .setOp(op)
-            .setStage(stage)
-            .build();
-    try {
-      rpcProxy.notifyObjectStageChange(NULL_RPC_CONTROLLER, request);
-    } catch(ServiceException e){
-      throw ProtobufHelper.getRemoteException(e);
-    }
-  }
-
-  /**
-   * Creates a replication pipeline of a specified type.
-   *
-   * @param replicationType - replication type
-   * @param factor - factor 1 or 3
-   * @param nodePool - optional machine list to build a pipeline.
-   * @throws IOException
-   */
-  @Override
-  public Pipeline createReplicationPipeline(HdslProtos.ReplicationType
-      replicationType, HdslProtos.ReplicationFactor factor, HdslProtos
-      .NodePool nodePool) throws IOException {
-    PipelineRequestProto request = PipelineRequestProto.newBuilder()
-        .setNodePool(nodePool)
-        .setReplicationFactor(factor)
-        .setReplicationType(replicationType)
-        .build();
-    try {
-      PipelineResponseProto response =
-          rpcProxy.allocatePipeline(NULL_RPC_CONTROLLER, request);
-      if (response.getErrorCode() ==
-          PipelineResponseProto.Error.success) {
-        Preconditions.checkState(response.hasPipeline(), "With success, " +
-            "must come a pipeline");
-        return Pipeline.getFromProtoBuf(response.getPipeline());
-      } else {
-        String errorMessage = String.format("create replication pipeline " +
-                "failed. code : %s Message: %s", response.getErrorCode(),
-            response.hasErrorMessage() ? response.getErrorMessage() : "");
-        throw new IOException(errorMessage);
-      }
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-  }
-
-  @Override
-  public ScmInfo getScmInfo() throws IOException {
-    HdslProtos.GetScmInfoRequestProto request =
-        HdslProtos.GetScmInfoRequestProto.getDefaultInstance();
-    try {
-      HdslProtos.GetScmInfoRespsonseProto resp = rpcProxy.getScmInfo(
-          NULL_RPC_CONTROLLER, request);
-      ScmInfo.Builder builder = new ScmInfo.Builder()
-          .setClusterId(resp.getClusterId())
-          .setScmId(resp.getScmId());
-      return builder.build();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-
-  }
-
-  @Override
-  public Object getUnderlyingProxyObject() {
-    return rpcProxy;
-  }
-
-  @Override
-  public void close() {
-    RPC.stopProxy(rpcProxy);
-  }
-}

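A minimal usage sketch of the deleted container-location translator, restricted to calls that appear in the code above; the container name, owner, and replication settings are placeholders, and the rpcProxy is assumed to be created elsewhere through Hadoop RPC:

    import java.io.IOException;

    import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
    import org.apache.hadoop.scm.container.common.helpers.Pipeline;
    import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
    import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;

    public class ScmContainerClientSketch {
      // The rpcProxy is assumed to be created elsewhere through Hadoop RPC.
      public static void demo(StorageContainerLocationProtocolPB rpcProxy)
          throws IOException {
        StorageContainerLocationProtocolClientSideTranslatorPB client =
            new StorageContainerLocationProtocolClientSideTranslatorPB(rpcProxy);
        try {
          // Allocate a container; the returned pipeline lists the chosen datanodes.
          Pipeline created = client.allocateContainer(
              HdslProtos.ReplicationType.STAND_ALONE,
              HdslProtos.ReplicationFactor.ONE, "demo-container", "demo-owner");
          // Look the container up again by name, then delete it.
          Pipeline found = client.getContainer("demo-container");
          client.deleteContainer("demo-container");
        } finally {
          client.close();
        }
      }
    }
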
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolPB.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolPB.java
deleted file mode 100644
index b8c2958..0000000
--- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolPB.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.scm.protocolPB;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.ipc.ProtocolInfo;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.StorageContainerLocationProtocolService;
-
-/**
- * Protocol used from an HDFS node to StorageContainerManager.  This extends the
- * Protocol Buffers service interface to add Hadoop-specific annotations.
- */
-@ProtocolInfo(protocolName =
-    "org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol",
-    protocolVersion = 1)
-@InterfaceAudience.Private
-public interface StorageContainerLocationProtocolPB
-    extends StorageContainerLocationProtocolService.BlockingInterface {
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/package-info.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/package-info.java
deleted file mode 100644
index f9a2c09..0000000
--- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/protocolPB/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.scm.protocolPB;
-
-/**
- * This package contains classes for the client of the storage container
- * protocol.
- */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
deleted file mode 100644
index 174f1c1..0000000
--- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.scm.storage;
-
-import com.google.protobuf.ByteString;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .GetKeyRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .GetKeyResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .GetSmallFileRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .GetSmallFileResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.KeyData;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .PutKeyRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .PutSmallFileRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .ReadChunkRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .ReadChunkResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.Type;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .WriteChunkRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .ReadContainerResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .ReadContainerRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.KeyValue;
-import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
-
-import java.io.IOException;
-import org.apache.hadoop.scm.XceiverClientSpi;
-
-/**
- * Implementation of all container protocol calls performed by Container
- * clients.
- */
-public final class ContainerProtocolCalls  {
-
-  /**
-   * There is no need to instantiate this class.
-   */
-  private ContainerProtocolCalls() {
-  }
-
-  /**
-   * Calls the container protocol to get a container key.
-   *
-   * @param xceiverClient client to perform call
-   * @param containerKeyData key data to identify container
-   * @param traceID container protocol call args
-   * @return container protocol get key response
-   * @throws IOException if there is an I/O error while performing the call
-   */
-  public static GetKeyResponseProto getKey(XceiverClientSpi xceiverClient,
-      KeyData containerKeyData, String traceID) throws IOException {
-    GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto
-        .newBuilder()
-        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
-        .setKeyData(containerKeyData);
-    String id = xceiverClient.getPipeline().getLeader().getUuidString();
-    ContainerCommandRequestProto request = ContainerCommandRequestProto
-        .newBuilder()
-        .setCmdType(Type.GetKey)
-        .setTraceID(traceID)
-        .setDatanodeUuid(id)
-        .setGetKey(readKeyRequest)
-        .build();
-    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
-    validateContainerResponse(response);
-    return response.getGetKey();
-  }
-
-  /**
-   * Calls the container protocol to put a container key.
-   *
-   * @param xceiverClient client to perform call
-   * @param containerKeyData key data to identify container
-   * @param traceID container protocol call args
-   * @throws IOException if there is an I/O error while performing the call
-   */
-  public static void putKey(XceiverClientSpi xceiverClient,
-      KeyData containerKeyData, String traceID) throws IOException {
-    PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto
-        .newBuilder()
-        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
-        .setKeyData(containerKeyData);
-    String id = xceiverClient.getPipeline().getLeader().getUuidString();
-    ContainerCommandRequestProto request = ContainerCommandRequestProto
-        .newBuilder()
-        .setCmdType(Type.PutKey)
-        .setTraceID(traceID)
-        .setDatanodeUuid(id)
-        .setPutKey(createKeyRequest)
-        .build();
-    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
-    validateContainerResponse(response);
-  }
-
-  /**
-   * Calls the container protocol to read a chunk.
-   *
-   * @param xceiverClient client to perform call
-   * @param chunk information about chunk to read
-   * @param key the key name
-   * @param traceID container protocol call args
-   * @return container protocol read chunk response
-   * @throws IOException if there is an I/O error while performing the call
-   */
-  public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient,
-      ChunkInfo chunk, String key, String traceID)
-      throws IOException {
-    ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
-        .newBuilder()
-        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
-        .setKeyName(key)
-        .setChunkData(chunk);
-    String id = xceiverClient.getPipeline().getLeader().getUuidString();
-    ContainerCommandRequestProto request = ContainerCommandRequestProto
-        .newBuilder()
-        .setCmdType(Type.ReadChunk)
-        .setTraceID(traceID)
-        .setDatanodeUuid(id)
-        .setReadChunk(readChunkRequest)
-        .build();
-    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
-    validateContainerResponse(response);
-    return response.getReadChunk();
-  }
-
-  /**
-   * Calls the container protocol to write a chunk.
-   *
-   * @param xceiverClient client to perform call
-   * @param chunk information about chunk to write
-   * @param key the key name
-   * @param data the data of the chunk to write
-   * @param traceID container protocol call args
-   * @throws IOException if there is an I/O error while performing the call
-   */
-  public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
-      String key, ByteString data, String traceID)
-      throws IOException {
-    WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
-        .newBuilder()
-        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
-        .setKeyName(key)
-        .setChunkData(chunk)
-        .setData(data);
-    String id = xceiverClient.getPipeline().getLeader().getUuidString();
-    ContainerCommandRequestProto request = ContainerCommandRequestProto
-        .newBuilder()
-        .setCmdType(Type.WriteChunk)
-        .setTraceID(traceID)
-        .setDatanodeUuid(id)
-        .setWriteChunk(writeChunkRequest)
-        .build();
-    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
-    validateContainerResponse(response);
-  }
-
-  /**
-   * Allows writing a small file using single RPC. This takes the container
-   * name, key name and data to write sends all that data to the container using
-   * a single RPC. This API is designed to be used for files which are smaller
-   * than 1 MB.
-   *
-   * @param client - client that communicates with the container.
-   * @param containerName - Name of the container
-   * @param key - Name of the Key
-   * @param data - Data to be written into the container.
-   * @param traceID - Trace ID for logging purpose.
-   * @throws IOException
-   */
-  public static void writeSmallFile(XceiverClientSpi client,
-      String containerName, String key, byte[] data, String traceID)
-      throws IOException {
-
-    KeyData containerKeyData =
-        KeyData.newBuilder().setContainerName(containerName).setName(key)
-            .build();
-    PutKeyRequestProto.Builder createKeyRequest =
-        PutKeyRequestProto.newBuilder()
-            .setPipeline(client.getPipeline().getProtobufMessage())
-            .setKeyData(containerKeyData);
-
-    KeyValue keyValue =
-        KeyValue.newBuilder().setKey("OverWriteRequested").setValue("true")
-            .build();
-    ChunkInfo chunk =
-        ChunkInfo.newBuilder().setChunkName(key + "_chunk").setOffset(0)
-            .setLen(data.length).addMetadata(keyValue).build();
-
-    PutSmallFileRequestProto putSmallFileRequest =
-        PutSmallFileRequestProto.newBuilder().setChunkInfo(chunk)
-            .setKey(createKeyRequest).setData(ByteString.copyFrom(data))
-            .build();
-
-    String id = client.getPipeline().getLeader().getUuidString();
-    ContainerCommandRequestProto request =
-        ContainerCommandRequestProto.newBuilder()
-            .setCmdType(Type.PutSmallFile)
-            .setTraceID(traceID)
-            .setDatanodeUuid(id)
-            .setPutSmallFile(putSmallFileRequest)
-            .build();
-    ContainerCommandResponseProto response = client.sendCommand(request);
-    validateContainerResponse(response);
-  }
-
-  /**
-   * createContainer call that creates a container on the datanode.
-   * @param client  - client
-   * @param traceID - traceID
-   * @throws IOException
-   */
-  public static void createContainer(XceiverClientSpi client, String traceID)
-      throws IOException {
-    ContainerProtos.CreateContainerRequestProto.Builder createRequest =
-        ContainerProtos.CreateContainerRequestProto
-            .newBuilder();
-    ContainerProtos.ContainerData.Builder containerData = ContainerProtos
-        .ContainerData.newBuilder();
-    containerData.setName(client.getPipeline().getContainerName());
-    createRequest.setPipeline(client.getPipeline().getProtobufMessage());
-    createRequest.setContainerData(containerData.build());
-
-    String id = client.getPipeline().getLeader().getUuidString();
-    ContainerCommandRequestProto.Builder request =
-        ContainerCommandRequestProto.newBuilder();
-    request.setCmdType(ContainerProtos.Type.CreateContainer);
-    request.setCreateContainer(createRequest);
-    request.setDatanodeUuid(id);
-    request.setTraceID(traceID);
-    ContainerCommandResponseProto response = client.sendCommand(
-        request.build());
-    validateContainerResponse(response);
-  }
-
-  /**
-   * Deletes a container from a pipeline.
-   *
-   * @param client
-   * @param force whether or not to forcibly delete the container.
-   * @param traceID
-   * @throws IOException
-   */
-  public static void deleteContainer(XceiverClientSpi client,
-      boolean force, String traceID) throws IOException {
-    ContainerProtos.DeleteContainerRequestProto.Builder deleteRequest =
-        ContainerProtos.DeleteContainerRequestProto.newBuilder();
-    deleteRequest.setName(client.getPipeline().getContainerName());
-    deleteRequest.setPipeline(client.getPipeline().getProtobufMessage());
-    deleteRequest.setForceDelete(force);
-    String id = client.getPipeline().getLeader().getUuidString();
-    ContainerCommandRequestProto.Builder request =
-        ContainerCommandRequestProto.newBuilder();
-    request.setCmdType(ContainerProtos.Type.DeleteContainer);
-    request.setDeleteContainer(deleteRequest);
-    request.setTraceID(traceID);
-    request.setDatanodeUuid(id);
-    ContainerCommandResponseProto response =
-        client.sendCommand(request.build());
-    validateContainerResponse(response);
-  }
-
-  /**
-   * Close a container.
-   *
-   * @param client
-   * @param traceID
-   * @throws IOException
-   */
-  public static void closeContainer(XceiverClientSpi client, String traceID)
-      throws IOException {
-    ContainerProtos.CloseContainerRequestProto.Builder closeRequest =
-        ContainerProtos.CloseContainerRequestProto.newBuilder();
-    closeRequest.setPipeline(client.getPipeline().getProtobufMessage());
-
-    String id = client.getPipeline().getLeader().getUuidString();
-    ContainerCommandRequestProto.Builder request =
-        ContainerCommandRequestProto.newBuilder();
-    request.setCmdType(Type.CloseContainer);
-    request.setCloseContainer(closeRequest);
-    request.setTraceID(traceID);
-    request.setDatanodeUuid(id);
-    ContainerCommandResponseProto response =
-        client.sendCommand(request.build());
-    validateContainerResponse(response);
-  }
-
-  /**
-   * readContainer call that gets meta data from an existing container.
-   *
-   * @param client - client
-   * @param traceID - trace ID
-   * @throws IOException
-   */
-  public static ReadContainerResponseProto readContainer(
-      XceiverClientSpi client, String containerName,
-      String traceID) throws IOException {
-    ReadContainerRequestProto.Builder readRequest =
-        ReadContainerRequestProto.newBuilder();
-    readRequest.setName(containerName);
-    readRequest.setPipeline(client.getPipeline().getProtobufMessage());
-    String id = client.getPipeline().getLeader().getUuidString();
-    ContainerCommandRequestProto.Builder request =
-        ContainerCommandRequestProto.newBuilder();
-    request.setCmdType(Type.ReadContainer);
-    request.setReadContainer(readRequest);
-    request.setDatanodeUuid(id);
-    request.setTraceID(traceID);
-    ContainerCommandResponseProto response =
-        client.sendCommand(request.build());
-    validateContainerResponse(response);
-    return response.getReadContainer();
-  }
-
-  /**
-   * Reads the data given the container name and key.
-   *
-   * @param client
-   * @param containerName - name of the container
-   * @param key - key
-   * @param traceID - trace ID
-   * @return GetSmallFileResponseProto
-   * @throws IOException
-   */
-  public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
-      String containerName, String key, String traceID) throws IOException {
-    KeyData containerKeyData = KeyData
-        .newBuilder()
-        .setContainerName(containerName)
-        .setName(key).build();
-
-    GetKeyRequestProto.Builder getKey = GetKeyRequestProto
-        .newBuilder()
-        .setPipeline(client.getPipeline().getProtobufMessage())
-        .setKeyData(containerKeyData);
-    ContainerProtos.GetSmallFileRequestProto getSmallFileRequest =
-        GetSmallFileRequestProto
-            .newBuilder().setKey(getKey)
-            .build();
-    String id = client.getPipeline().getLeader().getUuidString();
-    ContainerCommandRequestProto request = ContainerCommandRequestProto
-        .newBuilder()
-        .setCmdType(Type.GetSmallFile)
-        .setTraceID(traceID)
-        .setDatanodeUuid(id)
-        .setGetSmallFile(getSmallFileRequest)
-        .build();
-    ContainerCommandResponseProto response = client.sendCommand(request);
-    validateContainerResponse(response);
-    return response.getGetSmallFile();
-  }
-
-  /**
-   * Validates a response from a container protocol call.  Any non-successful
-   * return code is mapped to a corresponding exception and thrown.
-   *
-   * @param response container protocol call response
-   * @throws IOException if the container protocol call failed
-   */
-  private static void validateContainerResponse(
-      ContainerCommandResponseProto response
-  ) throws StorageContainerException {
-    if (response.getResult() == ContainerProtos.Result.SUCCESS) {
-      return;
-    }
-    throw new StorageContainerException(
-        response.getMessage(), response.getResult());
-  }
-}

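The static helpers above wrap single container RPCs. A small sketch of the create/write/read flow for the small-file path, assuming a connected XceiverClientSpi whose pipeline already points at the target container (obtaining the client, for example from an XceiverClientManager, is an assumption left out of scope here):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.GetSmallFileResponseProto;
    import org.apache.hadoop.scm.XceiverClientSpi;
    import org.apache.hadoop.scm.storage.ContainerProtocolCalls;

    public class SmallFileSketch {
      // The client is assumed to be already connected and bound to the target pipeline.
      public static void demo(XceiverClientSpi client) throws IOException {
        String containerName = client.getPipeline().getContainerName();
        String traceID = "demo-trace-1";

        // Create the container on the datanodes in the pipeline.
        ContainerProtocolCalls.createContainer(client, traceID);

        // Write and read back a small (<1 MB) value with single-RPC calls.
        byte[] data = "hello ozone".getBytes(StandardCharsets.UTF_8);
        ContainerProtocolCalls.writeSmallFile(client, containerName, "demo-key",
            data, traceID);
        GetSmallFileResponseProto smallFile = ContainerProtocolCalls.readSmallFile(
            client, containerName, "demo-key", traceID);
      }
    }
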
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/package-info.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/package-info.java
deleted file mode 100644
index aa89af0..0000000
--- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/scm/storage/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.scm.storage;
-
-/**
- * This package contains StorageContainerManager classes.
- */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java
deleted file mode 100644
index 2ff4e55..0000000
--- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.utils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ExecutionException;
-
-/**
- * An abstract class for a background service in ozone.
- * A background service schedules multiple child tasks in parallel
- * in a certain period. In each interval, it waits until all the tasks
- * finish execution and then schedule next interval.
- */
-public abstract class BackgroundService {
-
-  @VisibleForTesting
-  public static final Logger LOG =
-      LoggerFactory.getLogger(BackgroundService.class);
-
-  // Executor to launch child tasks
-  private final ScheduledExecutorService exec;
-  private final ThreadGroup threadGroup;
-  private final ThreadFactory threadFactory;
-  private final String serviceName;
-  private final long interval;
-  private final long serviceTimeout;
-  private final TimeUnit unit;
-  private final PeriodicalTask service;
-
-  public BackgroundService(String serviceName, long interval,
-      TimeUnit unit, int threadPoolSize, long serviceTimeout) {
-    this.interval = interval;
-    this.unit = unit;
-    this.serviceName = serviceName;
-    this.serviceTimeout = serviceTimeout;
-    threadGroup = new ThreadGroup(serviceName);
-    ThreadFactory tf = r -> new Thread(threadGroup, r);
-    threadFactory = new ThreadFactoryBuilder()
-        .setThreadFactory(tf)
-        .setDaemon(true)
-        .setNameFormat(serviceName + "#%d")
-        .build();
-    exec = Executors.newScheduledThreadPool(threadPoolSize, threadFactory);
-    service = new PeriodicalTask();
-  }
-
-  protected ExecutorService getExecutorService() {
-    return this.exec;
-  }
-
-  @VisibleForTesting
-  public int getThreadCount() {
-    return threadGroup.activeCount();
-  }
-
-  @VisibleForTesting
-  public void triggerBackgroundTaskForTesting() {
-    service.run();
-  }
-
-  // start service
-  public void start() {
-    exec.scheduleWithFixedDelay(service, 0, interval, unit);
-  }
-
-  public abstract BackgroundTaskQueue getTasks();
-
-  /**
-   * Run one or more background tasks concurrently.
-   * Wait until all tasks to return the result.
-   */
-  public class PeriodicalTask implements Runnable {
-    @Override
-    public synchronized void run() {
-      LOG.debug("Running background service : {}", serviceName);
-      BackgroundTaskQueue tasks = getTasks();
-      if (tasks.isEmpty()) {
-        // No task found, or some problems to init tasks
-        // return and retry in next interval.
-        return;
-      }
-
-      LOG.debug("Number of background tasks to execute : {}", tasks.size());
-      CompletionService<BackgroundTaskResult> taskCompletionService =
-          new ExecutorCompletionService<>(exec);
-
-      List<Future<BackgroundTaskResult>> results = Lists.newArrayList();
-      while (tasks.size() > 0) {
-        BackgroundTask task = tasks.poll();
-        Future<BackgroundTaskResult> result =
-            taskCompletionService.submit(task);
-        results.add(result);
-      }
-
-      results.parallelStream().forEach(taskResultFuture -> {
-        try {
-          // Collect task results
-          BackgroundTaskResult result = serviceTimeout > 0
-              ? taskResultFuture.get(serviceTimeout, TimeUnit.MILLISECONDS)
-              : taskResultFuture.get();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("task execution result size {}", result.getSize());
-          }
-        } catch (InterruptedException | ExecutionException e) {
-          LOG.warn(
-              "Background task fails to execute, "
-                  + "retrying in next interval", e);
-        } catch (TimeoutException e) {
-          LOG.warn("Background task executes timed out, "
-              + "retrying in next interval", e);
-        }
-      });
-    }
-  }
-
-  // shutdown and make sure all threads are properly released.
-  public void shutdown() {
-    LOG.info("Shutting down service {}", this.serviceName);
-    exec.shutdown();
-    try {
-      if (!exec.awaitTermination(60, TimeUnit.SECONDS)) {
-        exec.shutdownNow();
-      }
-    } catch (InterruptedException e) {
-      exec.shutdownNow();
-    }
-    if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) {
-      threadGroup.destroy();
-    }
-  }
-}

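A minimal sketch of a concrete subclass of the deleted BackgroundService, built only from the constructor and abstract method shown above and the BackgroundTask, BackgroundTaskQueue, and BackgroundTaskResult types removed in the diffs that follow; the service name, intervals, and the no-op task body are placeholders. A caller would invoke start() to begin periodic scheduling and shutdown() to stop it:

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.utils.BackgroundService;
    import org.apache.hadoop.utils.BackgroundTask;
    import org.apache.hadoop.utils.BackgroundTaskQueue;
    import org.apache.hadoop.utils.BackgroundTaskResult;
    import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;

    public class DemoCleanupService extends BackgroundService {

      public DemoCleanupService() {
        // serviceName, interval, unit, thread pool size, per-task timeout (ms)
        super("DemoCleanupService", 60_000, TimeUnit.MILLISECONDS, 4, 30_000);
      }

      @Override
      public BackgroundTaskQueue getTasks() {
        BackgroundTaskQueue queue = new BackgroundTaskQueue();
        queue.add(new BackgroundTask<BackgroundTaskResult>() {
          @Override
          public int getPriority() {
            return 0; // lower values are polled first from the priority queue
          }

          @Override
          public BackgroundTaskResult call() throws Exception {
            // A real service would do its periodic work here.
            return EmptyTaskResult.newResult();
          }
        });
        return queue;
      }
    }
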
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java
deleted file mode 100644
index 47e8ebc..0000000
--- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.utils;
-
-import java.util.concurrent.Callable;
-
-/**
- * A task thread to run by {@link BackgroundService}.
- */
-public interface BackgroundTask<T> extends Callable<T> {
-
-  int getPriority();
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java
deleted file mode 100644
index b56ef0c..0000000
--- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.utils;
-
-import java.util.PriorityQueue;
-
-/**
- * A priority queue that stores a number of {@link BackgroundTask}.
- */
-public class BackgroundTaskQueue {
-
-  private final PriorityQueue<BackgroundTask> tasks;
-
-  public BackgroundTaskQueue() {
-    tasks = new PriorityQueue<>((task1, task2)
-        -> task1.getPriority() - task2.getPriority());
-  }
-
-  /**
-   * @return the head task in this queue, or null if the queue is empty.
-   */
-  public synchronized BackgroundTask poll() {
-    return tasks.poll();
-  }
-
-  /**
-   * Adds a {@link BackgroundTask} to the queue;
-   * tasks are ordered by their priority.
-   *
-   * @param task the task to add
-   */
-  public synchronized void add(BackgroundTask task) {
-    tasks.add(task);
-  }
-
-  /**
-   * @return true if the queue contains no task, false otherwise.
-   */
-  public synchronized boolean isEmpty() {
-    return tasks.isEmpty();
-  }
-
-  /**
-   * @return the size of the queue.
-   */
-  public synchronized int size() {
-    return tasks.size();
-  }
-}
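
The queue above orders tasks with a subtraction-based comparator (task1.getPriority() - task2.getPriority()), which can overflow for extreme priority values; Integer.compare is the overflow-safe spelling. A minimal sketch of the same ordering behaviour (the Prioritized type is an illustrative stand-in, not the removed class):

import java.util.PriorityQueue;

public final class PriorityQueueSketch {

  interface Prioritized { int getPriority(); }  // stand-in for BackgroundTask

  public static void main(String[] args) {
    // Integer.compare avoids the overflow risk of subtracting priorities directly.
    PriorityQueue<Prioritized> queue =
        new PriorityQueue<>((a, b) -> Integer.compare(a.getPriority(), b.getPriority()));
    queue.add(() -> 5);
    queue.add(() -> 1);
    queue.add(() -> 3);
    while (!queue.isEmpty()) {
      System.out.println("polled priority " + queue.poll().getPriority());  // prints 1, 3, 5
    }
  }
}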

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
 
b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
deleted file mode 100644
index 198300f..0000000
--- 
a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package org.apache.hadoop.utils;
-
-/**
- * Result of a {@link BackgroundTask}.
- */
-public interface BackgroundTaskResult {
-
-  /**
-   * Returns the number of entries included in this result.
-   */
-  int getSize();
-
-  /**
-   * An empty task result implementation.
-   */
-  class EmptyTaskResult implements BackgroundTaskResult {
-
-    public static EmptyTaskResult newResult() {
-      return new EmptyTaskResult();
-    }
-
-    @Override
-    public int getSize() {
-      return 0;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java 
b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java
deleted file mode 100644
index 47699eb..0000000
--- 
a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.utils;
-
-import com.google.common.collect.Lists;
-
-import java.util.List;
-
-/**
- * A utility class to store a batch of DB write operations.
- */
-public class BatchOperation {
-
-  /**
-   * Enum for write operations.
-   */
-  public enum Operation {
-    DELETE, PUT
-  }
-
-  private List<SingleOperation> operations =
-      Lists.newArrayList();
-
-  /**
-   * Add a PUT operation into the batch.
-   */
-  public void put(byte[] key, byte[] value) {
-    operations.add(new SingleOperation(Operation.PUT, key, value));
-  }
-
-  /**
-   * Add a DELETE operation into the batch.
-   */
-  public void delete(byte[] key) {
-    operations.add(new SingleOperation(Operation.DELETE, key, null));
-  }
-
-  public List<SingleOperation> getOperations() {
-    return operations;
-  }
-
-  /**
-   * A SingleOperation represents a PUT or DELETE operation
-   * and the data the operation manipulates.
-   */
-  public static class SingleOperation {
-
-    private Operation opt;
-    private byte[] key;
-    private byte[] value;
-
-    public SingleOperation(Operation opt, byte[] key, byte[] value) {
-      this.opt = opt;
-      if (key == null) {
-        throw new IllegalArgumentException("key cannot be null");
-      }
-      this.key = key.clone();
-      this.value = value == null ? null : value.clone();
-    }
-
-    public Operation getOpt() {
-      return opt;
-    }
-
-    public byte[] getKey() {
-      return key.clone();
-    }
-
-    public byte[] getValue() {
-      return value == null ? null : value.clone();
-    }
-  }
-}
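
For readers checking call sites affected by this move, the removed class is used by queueing PUT/DELETE entries and handing the batch to MetadataStore#writeBatch. A usage sketch, assuming the class is still available under the package shown in the diff:

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.utils.BatchOperation;

public final class BatchOperationUsageSketch {
  public static void main(String[] args) {
    BatchOperation batch = new BatchOperation();
    // Queue one PUT and one DELETE; nothing is written until the batch is
    // passed to MetadataStore#writeBatch.
    batch.put("key1".getBytes(StandardCharsets.UTF_8),
        "value1".getBytes(StandardCharsets.UTF_8));
    batch.delete("key2".getBytes(StandardCharsets.UTF_8));
    System.out.println("queued operations: " + batch.getOperations().size());  // 2
  }
}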

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java 
b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java
deleted file mode 100644
index c407398..0000000
--- 
a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.utils;
-
-import java.io.IOException;
-
-/**
- * A consumer for metadata store key-value entries.
- * Used by the {@link MetadataStore} class.
- */
-@FunctionalInterface
-public interface EntryConsumer {
-
-  /**
-   * Consumes a key and value and produces a boolean result.
-   * @param key key
-   * @param value value
-   * @return a boolean value produced by the consumer
-   * @throws IOException
-   */
-  boolean consume(byte[] key, byte[] value) throws IOException;
-}
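
Since the removed interface is annotated @FunctionalInterface, call sites typically pass a lambda whose boolean return value decides whether iteration continues. A self-contained sketch (the interface is re-declared locally only so the example compiles on its own):

import java.io.IOException;

public final class EntryConsumerSketch {

  // Local copy of the removed functional contract, for illustration only.
  @FunctionalInterface
  interface EntryConsumer {
    boolean consume(byte[] key, byte[] value) throws IOException;
  }

  public static void main(String[] args) throws IOException {
    // Keep consuming entries only while the value is at most 4 bytes long.
    EntryConsumer consumer = (key, value) -> value.length <= 4;
    System.out.println(consumer.consume(new byte[]{1}, new byte[]{1, 2}));           // true: continue
    System.out.println(consumer.consume(new byte[]{2}, new byte[]{1, 2, 3, 4, 5}));  // false: stop
  }
}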

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java 
b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
deleted file mode 100644
index 72ac8d1..0000000
--- a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.utils;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
-import org.fusesource.leveldbjni.JniDBFactory;
-import org.iq80.leveldb.WriteBatch;
-import org.iq80.leveldb.DB;
-import org.iq80.leveldb.Options;
-import org.iq80.leveldb.WriteOptions;
-import org.iq80.leveldb.DBIterator;
-import org.iq80.leveldb.Snapshot;
-import org.iq80.leveldb.ReadOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * LevelDB implementation of the {@link MetadataStore} interface.
- */
-public class LevelDBStore implements MetadataStore {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(LevelDBStore.class);
-
-  private DB db;
-  private final File dbFile;
-  private final Options dbOptions;
-  private final WriteOptions writeOptions;
-
-  public LevelDBStore(File dbPath, boolean createIfMissing)
-      throws IOException {
-    dbOptions = new Options();
-    dbOptions.createIfMissing(createIfMissing);
-    this.dbFile = dbPath;
-    this.writeOptions = new WriteOptions().sync(true);
-    openDB(dbPath, dbOptions);
-  }
-
-  /**
-   * Opens a DB file.
-   *
-   * @param dbPath  - DB file path
-   * @param options - DB options
-   * @throws IOException
-   */
-  public LevelDBStore(File dbPath, Options options)
-      throws IOException {
-    dbOptions = options;
-    this.dbFile = dbPath;
-    this.writeOptions = new WriteOptions().sync(true);
-    openDB(dbPath, dbOptions);
-  }
-
-  private void openDB(File dbPath, Options options) throws IOException {
-    dbPath.getParentFile().mkdirs();
-    db = JniDBFactory.factory.open(dbPath, options);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("LevelDB successfully opened");
-      LOG.debug("[Option] cacheSize = " + options.cacheSize());
-      LOG.debug("[Option] createIfMissing = " + options.createIfMissing());
-      LOG.debug("[Option] blockSize = " + options.blockSize());
-      LOG.debug("[Option] compressionType= " + options.compressionType());
-      LOG.debug("[Option] maxOpenFiles= " + options.maxOpenFiles());
-      LOG.debug("[Option] writeBufferSize= "+ options.writeBufferSize());
-    }
-  }
-
-  /**
-   * Puts a key-value pair into the DB.
-   *
-   * @param key   - key
-   * @param value - value
-   */
-  @Override
-  public void put(byte[] key, byte[] value) {
-    db.put(key, value, writeOptions);
-  }
-
-  /**
-   * Get Key.
-   *
-   * @param key key
-   * @return value
-   */
-  @Override
-  public byte[] get(byte[] key) {
-    return db.get(key);
-  }
-
-  /**
-   * Delete Key.
-   *
-   * @param key - Key
-   */
-  @Override
-  public void delete(byte[] key) {
-    db.delete(key);
-  }
-
-  /**
-   * Closes the DB.
-   *
-   * @throws IOException
-   */
-  @Override
-  public void close() throws IOException {
-    if (db != null){
-      db.close();
-    }
-  }
-
-  /**
-   * Returns true if the DB is empty.
-   *
-   * @return boolean
-   * @throws IOException
-   */
-  @Override
-  public boolean isEmpty() throws IOException {
-    try (DBIterator iter = db.iterator()) {
-      iter.seekToFirst();
-      return !iter.hasNext();
-    }
-  }
-
-  /**
-   * Returns the actual levelDB object.
-   * @return DB handle.
-   */
-  public DB getDB() {
-    return db;
-  }
-
-  /**
-   * Returns an iterator on all the key-value pairs in the DB.
-   * @return an iterator on DB entries.
-   */
-  public DBIterator getIterator() {
-    return db.iterator();
-  }
-
-
-  @Override
-  public void destroy() throws IOException {
-    close();
-    JniDBFactory.factory.destroy(dbFile, dbOptions);
-  }
-
-  @Override
-  public ImmutablePair<byte[], byte[]> peekAround(int offset,
-      byte[] from) throws IOException, IllegalArgumentException {
-    try (DBIterator it = db.iterator()) {
-      if (from == null) {
-        it.seekToFirst();
-      } else {
-        it.seek(from);
-      }
-      if (!it.hasNext()) {
-        return null;
-      }
-      switch (offset) {
-      case 0:
-        Entry<byte[], byte[]> current = it.next();
-        return new ImmutablePair<>(current.getKey(), current.getValue());
-      case 1:
-        if (it.next() != null && it.hasNext()) {
-          Entry<byte[], byte[]> next = it.peekNext();
-          return new ImmutablePair<>(next.getKey(), next.getValue());
-        }
-        break;
-      case -1:
-        if (it.hasPrev()) {
-          Entry<byte[], byte[]> prev = it.peekPrev();
-          return new ImmutablePair<>(prev.getKey(), prev.getValue());
-        }
-        break;
-      default:
-        throw new IllegalArgumentException(
-            "Position can only be -1, 0 " + "or 1, but found " + offset);
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public void iterate(byte[] from, EntryConsumer consumer)
-      throws IOException {
-    try (DBIterator iter = db.iterator()) {
-      if (from != null) {
-        iter.seek(from);
-      } else {
-        iter.seekToFirst();
-      }
-      while (iter.hasNext()) {
-        Entry<byte[], byte[]> current = iter.next();
-        if (!consumer.consume(current.getKey(),
-            current.getValue())) {
-          break;
-        }
-      }
-    }
-  }
-
-  /**
-   * Compacts the DB by removing deleted keys etc.
-   * @throws IOException if there is an error.
-   */
-  @Override
-  public void compactDB() throws IOException {
-    if (db != null) {
-      // From LevelDB docs : begin == null and end == null means the whole DB.
-      db.compactRange(null, null);
-    }
-  }
-
-  @Override
-  public void writeBatch(BatchOperation operation) throws IOException {
-    List<BatchOperation.SingleOperation> operations =
-        operation.getOperations();
-    if (!operations.isEmpty()) {
-      try (WriteBatch writeBatch = db.createWriteBatch()) {
-        for (BatchOperation.SingleOperation opt : operations) {
-          switch (opt.getOpt()) {
-          case DELETE:
-            writeBatch.delete(opt.getKey());
-            break;
-          case PUT:
-            writeBatch.put(opt.getKey(), opt.getValue());
-            break;
-          default:
-            throw new IllegalArgumentException("Invalid operation "
-                + opt.getOpt());
-          }
-        }
-        db.write(writeBatch);
-      }
-    }
-  }
-
-  @Override
-  public List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
-      int count, MetadataKeyFilters.MetadataKeyFilter... filters)
-      throws IOException, IllegalArgumentException {
-    return getRangeKVs(startKey, count, false, filters);
-  }
-
-  @Override
-  public List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey,
-      int count, MetadataKeyFilters.MetadataKeyFilter... filters)
-      throws IOException, IllegalArgumentException {
-    return getRangeKVs(startKey, count, true, filters);
-  }
-
-  /**
-   * Returns a certain range of key-value pairs as a list, based on a
-   * startKey and count. Additionally, a {@link MetadataKeyFilter} can be added to
-   * filter keys if necessary. To prevent race conditions while listing
-   * entries, this implementation takes a snapshot and lists the entries from
-   * the snapshot. As a side effect, the returned range may differ slightly
-   * from the actual data if the data is being updated concurrently.
-   * <p>
-   * If the startKey is specified and found in levelDB, this key and the keys
-   * after this key will be included in the result. If the startKey is null
-   * all entries will be included as long as other conditions are satisfied.
-   * If the given startKey doesn't exist, an empty list will be returned.
-   * <p>
-   * The count argument limits the total number of entries to return;
-   * the value of count must not be negative.
-   * <p>
-   * This method allows the caller to specify one or more {@link MetadataKeyFilter}
-   * to filter keys by a certain condition. Once given, only the entries
-   * whose keys pass all the filters will be included in the result.
-   *
-   * @param startKey a start key.
-   * @param count max number of entries to return.
-   * @param filters one or more custom {@link MetadataKeyFilter}.
-   * @return a list of entries found in the database or an empty list if the
-   * startKey is invalid.
-   * @throws IOException if there are I/O errors.
-   * @throws IllegalArgumentException if count is less than 0.
-   */
-  private List<Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
-      int count, boolean sequential, MetadataKeyFilter... filters)
-      throws IOException {
-    List<Entry<byte[], byte[]>> result = new ArrayList<>();
-    long start = System.currentTimeMillis();
-    if (count < 0) {
-      throw new IllegalArgumentException(
-          "Invalid count given " + count + ", count must be greater than 0");
-    }
-    Snapshot snapShot = null;
-    DBIterator dbIter = null;
-    try {
-      snapShot = db.getSnapshot();
-      ReadOptions readOptions = new ReadOptions().snapshot(snapShot);
-      dbIter = db.iterator(readOptions);
-      if (startKey == null) {
-        dbIter.seekToFirst();
-      } else {
-        if (db.get(startKey) == null) {
-          // Key not found, return empty list
-          return result;
-        }
-        dbIter.seek(startKey);
-      }
-      while (dbIter.hasNext() && result.size() < count) {
-        byte[] preKey = dbIter.hasPrev() ? dbIter.peekPrev().getKey() : null;
-        byte[] nextKey = dbIter.hasNext() ? dbIter.peekNext().getKey() : null;
-        Entry<byte[], byte[]> current = dbIter.next();
-
-        if (filters == null) {
-          result.add(current);
-        } else {
-          if (Arrays.asList(filters).stream().allMatch(
-              entry -> entry.filterKey(preKey, current.getKey(), nextKey))) {
-            result.add(current);
-          } else {
-            if (result.size() > 0 && sequential) {
-              // If the caller asks for a sequential range of results and we
-              // hit a mismatch, abort the iteration here. If the result is
-              // still empty, keep looking for the first match.
-              break;
-            }
-          }
-        }
-      }
-    } finally {
-      if (snapShot != null) {
-        snapShot.close();
-      }
-      if (dbIter != null) {
-        dbIter.close();
-      }
-      if (LOG.isDebugEnabled()) {
-        if (filters != null) {
-          for (MetadataKeyFilters.MetadataKeyFilter filter : filters) {
-            int scanned = filter.getKeysScannedNum();
-            int hinted = filter.getKeysHintedNum();
-            if (scanned > 0 || hinted > 0) {
-              LOG.debug(
-                  "getRangeKVs ({}) numOfKeysScanned={}, numOfKeysHinted={}",
-                  filter.getClass().getSimpleName(), filter.getKeysScannedNum(),
-                  filter.getKeysHintedNum());
-            }
-          }
-        }
-        long end = System.currentTimeMillis();
-        long timeConsumed = end - start;
-        LOG.debug("Time consumed for getRangeKVs() is {}ms,"
-            + " result length is {}.", timeConsumed, result.size());
-      }
-    }
-    return result;
-  }
-}
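
The store above wraps the leveldbjni/iq80 LevelDB bindings. A minimal sketch of the underlying library calls it relies on (open, put, get, close), assuming the leveldbjni-all dependency is on the classpath; the database path below is illustrative:

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;

public final class LevelDbSketch {
  public static void main(String[] args) throws IOException {
    Options options = new Options().createIfMissing(true);
    File dir = new File("/tmp/leveldb-sketch");  // illustrative location
    try (DB db = JniDBFactory.factory.open(dir, options)) {
      db.put("a".getBytes(StandardCharsets.UTF_8), "1".getBytes(StandardCharsets.UTF_8));
      byte[] value = db.get("a".getBytes(StandardCharsets.UTF_8));
      System.out.println(new String(value, StandardCharsets.UTF_8));  // prints 1
    }
  }
}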

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
 
b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
deleted file mode 100644
index 3ff0a94..0000000
--- 
a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.utils;
-
-import com.google.common.base.Strings;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.OzoneConsts;
-
-/**
- * A utility class to filter levelDB keys.
- */
-public final class MetadataKeyFilters {
-
-  private static KeyPrefixFilter deletingKeyFilter =
-      new MetadataKeyFilters.KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX);
-
-  private static KeyPrefixFilter normalKeyFilter =
-      new MetadataKeyFilters.KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX,
-          true);
-
-  private MetadataKeyFilters() {
-  }
-
-  public static KeyPrefixFilter getDeletingKeyFilter() {
-    return deletingKeyFilter;
-  }
-
-  public static KeyPrefixFilter getNormalKeyFilter() {
-    return normalKeyFilter;
-  }
-  /**
-   * Interface for levelDB key filters.
-   */
-  public interface MetadataKeyFilter {
-    /**
-     * Filter levelDB key with a certain condition.
-     *
-     * @param preKey     previous key.
-     * @param currentKey current key.
-     * @param nextKey    next key.
-     * @return true if the filter condition is satisfied, false otherwise.
-     */
-    boolean filterKey(byte[] preKey, byte[] currentKey, byte[] nextKey);
-
-    default int getKeysScannedNum() {
-      return 0;
-    }
-
-    default int getKeysHintedNum() {
-      return 0;
-    }
-  }
-
-  /**
-   * Utility class to filter keys by a string prefix. This filter
-   * assumes keys can be parsed as strings.
-   */
-  public static class KeyPrefixFilter implements MetadataKeyFilter {
-
-    private String keyPrefix = null;
-    private int keysScanned = 0;
-    private int keysHinted = 0;
-    private boolean negative;
-
-    public KeyPrefixFilter(String keyPrefix) {
-      this(keyPrefix, false);
-    }
-
-    public KeyPrefixFilter(String keyPrefix, boolean negative) {
-      this.keyPrefix = keyPrefix;
-      this.negative = negative;
-    }
-
-    @Override
-    public boolean filterKey(byte[] preKey, byte[] currentKey,
-        byte[] nextKey) {
-      keysScanned++;
-      boolean accept = false;
-      if (Strings.isNullOrEmpty(keyPrefix)) {
-        accept = true;
-      } else {
-        if (currentKey != null &&
-            DFSUtil.bytes2String(currentKey).startsWith(keyPrefix)) {
-          keysHinted++;
-          accept = true;
-        } else {
-          accept = false;
-        }
-      }
-      return (negative) ? !accept : accept;
-    }
-
-    @Override
-    public int getKeysScannedNum() {
-      return keysScanned;
-    }
-
-    @Override
-    public int getKeysHintedNum() {
-      return keysHinted;
-    }
-  }
-}
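
The prefix filter above decodes each key to a string and checks startsWith, optionally negating the result so the same code can select either deleting keys or normal keys. A self-contained sketch of that matching logic (the "#deleting#" prefix below is illustrative; the real value comes from OzoneConsts.DELETING_KEY_PREFIX):

import java.nio.charset.StandardCharsets;

public final class KeyPrefixFilterSketch {

  // Accept keys starting with the prefix, or the complement when 'negative' is set.
  static boolean accept(byte[] key, String prefix, boolean negative) {
    boolean matches = key != null
        && new String(key, StandardCharsets.UTF_8).startsWith(prefix);
    return negative ? !matches : matches;
  }

  public static void main(String[] args) {
    byte[] deleting = "#deleting#block1".getBytes(StandardCharsets.UTF_8);
    byte[] normal = "block2".getBytes(StandardCharsets.UTF_8);
    System.out.println(accept(deleting, "#deleting#", false));  // true  (deleting-key filter)
    System.out.println(accept(normal, "#deleting#", true));     // true  (normal-key filter)
  }
}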

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java 
b/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java
deleted file mode 100644
index b90b08f..0000000
--- 
a/hadoop-hdsl/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.utils;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Interface for a key-value store that stores Ozone metadata.
- * Ozone metadata is stored as key-value pairs; both keys and values
- * are arbitrary byte arrays.
- */
-@InterfaceStability.Evolving
-public interface MetadataStore extends Closeable {
-
-  /**
-   * Puts a key-value pair into the store.
-   *
-   * @param key metadata key
-   * @param value metadata value
-   */
-  void put(byte[] key, byte[] value) throws IOException;
-
-  /**
-   * @return true if the metadata store is empty.
-   *
-   * @throws IOException
-   */
-  boolean isEmpty() throws IOException;
-
-  /**
-   * Returns the value mapped to the given key in byte array.
-   *
-   * @param key metadata key
-   * @return value in byte array
-   * @throws IOException
-   */
-  byte[] get(byte[] key) throws IOException;
-
-  /**
-   * Deletes a key from the metadata store.
-   *
-   * @param key metadata key
-   * @throws IOException
-   */
-  void delete(byte[] key) throws IOException;
-
-  /**
-   * Returns a certain range of key-value pairs as a list, based on a
-   * startKey and count. Additionally, a {@link MetadataKeyFilter} can be added to
-   * filter keys if necessary. To prevent race conditions while listing
-   * entries, this implementation takes a snapshot and lists the entries from
-   * the snapshot. As a side effect, the returned range may differ slightly
-   * from the actual data if the data is being updated concurrently.
-   * <p>
-   * If the startKey is specified and found in levelDB, this key and the keys
-   * after this key will be included in the result. If the startKey is null
-   * all entries will be included as long as other conditions are satisfied.
-   * If the given startKey doesn't exist, an empty list will be returned.
-   * <p>
-   * The count argument limits the total number of entries to return;
-   * the value of count must not be negative.
-   * <p>
-   * This method allows the caller to specify one or more {@link MetadataKeyFilter}
-   * to filter keys by a certain condition. Once given, only the entries
-   * whose keys pass all the filters will be included in the result.
-   *
-   * @param startKey a start key.
-   * @param count max number of entries to return.
-   * @param filters one or more custom {@link MetadataKeyFilter}.
-   * @return a list of entries found in the database or an empty list if the
-   * startKey is invalid.
-   * @throws IOException if there are I/O errors.
-   * @throws IllegalArgumentException if count is less than 0.
-   */
-  List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
-      int count, MetadataKeyFilter... filters)
-      throws IOException, IllegalArgumentException;
-
-  /**
-   * This method is very similar to {@link #getRangeKVs}; the only
-   * difference is that this method returns a sequential range
-   * of elements based on the filters. While iterating the elements,
-   * if an entry that does not pass the filters is met, iteration stops
-   * at that point without looking for the next match. If no filter is given,
-   * this method behaves just like {@link #getRangeKVs}.
-   *
-   * @param startKey a start key.
-   * @param count max number of entries to return.
-   * @param filters customized one or more {@link MetadataKeyFilter}.
-   * @return a list of entries found in the database.
-   * @throws IOException
-   * @throws IllegalArgumentException
-   */
-  List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey,
-      int count, MetadataKeyFilter... filters)
-      throws IOException, IllegalArgumentException;
-
-  /**
-   * A batch of PUT and DELETE operations handled as a single atomic write.
-   *
-   * @throws IOException if the write fails
-   */
-  void writeBatch(BatchOperation operation) throws IOException;
-
-  /**
-   * Compact the entire database.
-   * @throws IOException
-   */
-  void compactDB() throws IOException;
-
-  /**
-   * Destroys the content of the specified database;
-   * a destroyed database cannot be loaded again.
-   * Be very careful with this method.
-   *
-   * @throws IOException if I/O error happens
-   */
-  void destroy() throws IOException;
-
-  /**
-   * Seeks the database to a certain key and returns the key-value
-   * pair around this key based on the given offset. Note that this method
-   * only supports offsets -1 (left), 0 (current) and 1 (right);
-   * any other offset will cause an {@link IllegalArgumentException}.
-   *
-   * @param offset offset to the key
-   * @param from from which key
-   * @return a key-value pair
-   * @throws IOException
-   */
-  ImmutablePair<byte[], byte[]> peekAround(int offset, byte[] from)
-      throws IOException, IllegalArgumentException;
-
-  /**
-   * Iterates entries in the database from a certain key.
-   * Applies the given {@link EntryConsumer} to the key and value of
-   * each entry; the function produces a boolean result which is used
-   * as the criterion for exiting the iteration.
-   *
-   * @param from the start key
-   * @param consumer
-   *   an {@link EntryConsumer} applied to each key and value. If the consumer
-   *   returns true, iteration continues to the next entry; otherwise iteration
-   *   stops.
-   * @throws IOException
-   */
-  void iterate(byte[] from, EntryConsumer consumer)
-      throws IOException;
-}
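
To see how the interface above is typically consumed, here is a compile-only caller-side sketch that lists up to 100 normal (non-deleting) entries; it assumes an already-constructed store such as the LevelDBStore earlier in this diff, and that these classes remain available under the packages shown:

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataStore;

public final class MetadataStoreUsageSketch {

  // List up to 100 entries from the beginning of the store, skipping keys
  // carrying the deleting prefix.
  static void listNormalEntries(MetadataStore store) throws IOException {
    List<Map.Entry<byte[], byte[]>> range =
        store.getRangeKVs(null, 100, MetadataKeyFilters.getNormalKeyFilter());
    System.out.println("entries returned: " + range.size());
  }
}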


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
