http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
index 6eb7ea6..e1e2909 100644
--- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
+++ b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
@@ -25,16 +25,16 @@ import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
 import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
 import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
 import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
-import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.scm.XceiverClientManager;
-import org.apache.hadoop.scm.XceiverClientSpi;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.AfterClass;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java
index 8d1a865..0268ccc 100644
--- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java
+++ b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java
@@ -29,7 +29,7 @@ import org.junit.Test;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 
-import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 
 /**
  * Test the resource generation of Dynamic Provisioner.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java
index 8cb57d6..d7dabe3 100644
--- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java
+++ b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.cblock.util;
 
 import org.apache.hadoop.cblock.meta.ContainerDescriptor;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
index 59c8e01..9fa76a8 100644
--- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
+++ b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
@@ -18,12 +18,12 @@
 package org.apache.hadoop.cblock.util;
 
 import org.apache.hadoop.cblock.meta.ContainerDescriptor;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerData;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ContainerData;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
-import org.apache.hadoop.scm.client.ScmClient;
-import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -88,7 +88,7 @@ public class MockStorageClient implements ScmClient {
     ContainerInfo container = new ContainerInfo.Builder()
         .setContainerName(containerDescriptor.getContainerID())
         .setPipeline(containerDescriptor.getPipeline())
-        .setState(HdslProtos.LifeCycleState.ALLOCATED)
+        .setState(HddsProtos.LifeCycleState.ALLOCATED)
         .build();
     containerList.add(container);
     return containerList;
@@ -134,8 +134,8 @@ public class MockStorageClient implements ScmClient {
   }
 
   @Override
-  public Pipeline createContainer(HdslProtos.ReplicationType type,
-      HdslProtos.ReplicationFactor replicationFactor, String containerId,
+  public Pipeline createContainer(HddsProtos.ReplicationType type,
+      HddsProtos.ReplicationFactor replicationFactor, String containerId,
       String owner) throws IOException {
     int contId = currentContainerId.getAndIncrement();
     ContainerLookUpService.addContainer(Long.toString(contId));
@@ -153,8 +153,8 @@ public class MockStorageClient implements ScmClient {
    * @throws IOException
    */
   @Override
-  public HdslProtos.NodePool queryNode(EnumSet<HdslProtos.NodeState>
-      nodeStatuses, HdslProtos.QueryScope queryScope, String poolName)
+  public HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState>
+      nodeStatuses, HddsProtos.QueryScope queryScope, String poolName)
       throws IOException {
     return null;
   }
@@ -168,8 +168,8 @@ public class MockStorageClient implements ScmClient {
    * @throws IOException
    */
   @Override
-  public Pipeline createReplicationPipeline(HdslProtos.ReplicationType type,
-      HdslProtos.ReplicationFactor factor, HdslProtos.NodePool nodePool)
+  public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
+      HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
       throws IOException {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java b/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java
index 224c908..c6c0e84 100644
--- a/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java
+++ b/hadoop-cblock/tools/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-cblock/tools/src/test/org/apache/hadoop/cblock/TestCBlockCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/tools/src/test/org/apache/hadoop/cblock/TestCBlockCLI.java b/hadoop-cblock/tools/src/test/org/apache/hadoop/cblock/TestCBlockCLI.java
index f8b05ed..a3f53aa 100644
--- a/hadoop-cblock/tools/src/test/org/apache/hadoop/cblock/TestCBlockCLI.java
+++ b/hadoop-cblock/tools/src/test/org/apache/hadoop/cblock/TestCBlockCLI.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.cblock.cli.CBlockCli;
 import org.apache.hadoop.cblock.meta.VolumeDescriptor;
 import org.apache.hadoop.cblock.util.MockStorageClient;
 import org.apache.hadoop.conf.OzoneConfiguration;
-import org.apache.hadoop.scm.client.ScmClient;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.AfterClass;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh
index c3e7113..a7e15d7 100755
--- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh
+++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh
@@ -596,8 +596,8 @@ function hadoop_bootstrap
   YARN_LIB_JARS_DIR=${YARN_LIB_JARS_DIR:-"share/hadoop/yarn/lib"}
   MAPRED_DIR=${MAPRED_DIR:-"share/hadoop/mapreduce"}
   MAPRED_LIB_JARS_DIR=${MAPRED_LIB_JARS_DIR:-"share/hadoop/mapreduce/lib"}
-  HDSL_DIR=${HDSL_DIR:-"share/hadoop/hdsl"}
-  HDSL_LIB_JARS_DIR=${HDSL_LIB_JARS_DIR:-"share/hadoop/hdsl/lib"}
+  HDDS_DIR=${HDDS_DIR:-"share/hadoop/hdds"}
+  HDDS_LIB_JARS_DIR=${HDDS_LIB_JARS_DIR:-"share/hadoop/hdds/lib"}
   OZONE_DIR=${OZONE_DIR:-"share/hadoop/ozone"}
   OZONE_LIB_JARS_DIR=${OZONE_LIB_JARS_DIR:-"share/hadoop/ozone/lib"}
   CBLOCK_DIR=${CBLOCK_DIR:-"share/hadoop/cblock"}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-dist/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-dist/pom.xml b/hadoop-dist/pom.xml
index 98a9edf..3003550 100644
--- a/hadoop-dist/pom.xml
+++ b/hadoop-dist/pom.xml
@@ -219,7 +219,7 @@
     </profile>
 
     <profile>
-      <id>hdsl</id>
+      <id>hdds</id>
       <activation>
         <activeByDefault>false</activeByDefault>
       </activation>
@@ -231,11 +231,11 @@
         </dependency>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-hdsl-server-scm</artifactId>
+          <artifactId>hadoop-hdds-server-scm</artifactId>
         </dependency>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-hdsl-tools</artifactId>
+          <artifactId>hadoop-hdds-tools</artifactId>
         </dependency>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
@@ -243,7 +243,7 @@
         </dependency>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-hdsl-container-service</artifactId>
+          <artifactId>hadoop-hdds-container-service</artifactId>
         </dependency>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
@@ -251,7 +251,7 @@
         </dependency>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-hdsl-tools</artifactId>
+          <artifactId>hadoop-hdds-tools</artifactId>
         </dependency>
         <dependency>
           <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-dist/src/main/compose/cblock/docker-config
----------------------------------------------------------------------
diff --git a/hadoop-dist/src/main/compose/cblock/docker-config b/hadoop-dist/src/main/compose/cblock/docker-config
index da0c2ac..4690de0 100644
--- a/hadoop-dist/src/main/compose/cblock/docker-config
+++ b/hadoop-dist/src/main/compose/cblock/docker-config
@@ -27,7 +27,7 @@ OZONE-SITE.XML_ozone.scm.client.address=scm
 OZONE-SITE.XML_dfs.cblock.jscsi.cblock.server.address=cblock
 OZONE-SITE.XML_dfs.cblock.scm.ipaddress=scm
 OZONE-SITE.XML_dfs.cblock.service.leveldb.path=/tmp
-HDFS-SITE.XML_dfs.datanode.plugins=org.apache.hadoop.ozone.web.ObjectStoreRestPlugin,org.apache.hadoop.ozone.HdslDatanodeService
+HDFS-SITE.XML_dfs.datanode.plugins=org.apache.hadoop.ozone.web.ObjectStoreRestPlugin,org.apache.hadoop.ozone.HddsDatanodeService
 HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:9000
 HDFS-SITE.XML_dfs.namenode.name.dir=/data/namenode
 HDFS-SITE.XML_rpc.metrics.quantile.enable=true

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-dist/src/main/compose/ozone/docker-config
----------------------------------------------------------------------
diff --git a/hadoop-dist/src/main/compose/ozone/docker-config b/hadoop-dist/src/main/compose/ozone/docker-config
index d297b19..8e5efa9 100644
--- a/hadoop-dist/src/main/compose/ozone/docker-config
+++ b/hadoop-dist/src/main/compose/ozone/docker-config
@@ -27,7 +27,7 @@ HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:9000
 HDFS-SITE.XML_dfs.namenode.name.dir=/data/namenode
 HDFS-SITE.XML_rpc.metrics.quantile.enable=true
 HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300
-HDFS-SITE.XML_dfs.datanode.plugins=org.apache.hadoop.ozone.web.ObjectStoreRestPlugin,org.apache.hadoop.ozone.HdslDatanodeService
+HDFS-SITE.XML_dfs.datanode.plugins=org.apache.hadoop.ozone.web.ObjectStoreRestPlugin,org.apache.hadoop.ozone.HddsDatanodeService
 LOG4J.PROPERTIES_log4j.rootLogger=INFO, stdout
 LOG4J.PROPERTIES_log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 LOG4J.PROPERTIES_log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml
new file mode 100644
index 0000000..95ff09a
--- /dev/null
+++ b/hadoop-hdds/client/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-hdds</artifactId>
+    <version>3.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>hadoop-hdds-client</artifactId>
+  <version>3.2.0-SNAPSHOT</version>
+  <description>Apache Hadoop Distributed Data Store Client libraries</description>
+  <name>Apache HDDS Client</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <hadoop.component>hdds</hadoop.component>
+    <is.hadoop.component>true</is.hadoop.component>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdds-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+    </dependency>
+
+  </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
new file mode 100644
index 0000000..5c702c6
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+
+/**
+ * A client for the storage container protocol.
+ */
+public class XceiverClient extends XceiverClientSpi {
+  static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
+  private final Pipeline pipeline;
+  private final Configuration config;
+  private Channel channel;
+  private Bootstrap b;
+  private EventLoopGroup group;
+  private final Semaphore semaphore;
+
+  /**
+   * Constructs a client that can communicate with the Container framework on
+   * data nodes.
+   *
+   * @param pipeline - Pipeline that defines the machines.
+   * @param config -- Ozone Config
+   */
+  public XceiverClient(Pipeline pipeline, Configuration config) {
+    super();
+    Preconditions.checkNotNull(pipeline);
+    Preconditions.checkNotNull(config);
+    this.pipeline = pipeline;
+    this.config = config;
+    this.semaphore =
+        new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
+  }
+
+  @Override
+  public void connect() throws Exception {
+    if (channel != null && channel.isActive()) {
+      throw new IOException("This client is already connected to a host.");
+    }
+
+    group = new NioEventLoopGroup();
+    b = new Bootstrap();
+    b.group(group)
+        .channel(NioSocketChannel.class)
+        .handler(new LoggingHandler(LogLevel.INFO))
+        .handler(new XceiverClientInitializer(this.pipeline, semaphore));
+    DatanodeDetails leader = this.pipeline.getLeader();
+
+    // read port from the data node, on failure use default configured
+    // port.
+    int port = leader.getContainerPort();
+    if (port == 0) {
+      port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+          OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+    }
+    LOG.debug("Connecting to server port: " + port);
+    channel = b.connect(leader.getHostName(), port).sync().channel();
+  }
+
+  /**
+   * Returns true if the xceiver client is connected to a server.
+   *
+   * @return True if the connection is alive, false otherwise.
+   */
+  @VisibleForTesting
+  public boolean isConnected() {
+    return channel.isActive();
+  }
+
+  @Override
+  public void close() {
+    if (group != null) {
+      group.shutdownGracefully().awaitUninterruptibly();
+    }
+  }
+
+  @Override
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  @Override
+  public ContainerProtos.ContainerCommandResponseProto sendCommand(
+      ContainerProtos.ContainerCommandRequestProto request) throws IOException {
+    try {
+      if ((channel == null) || (!channel.isActive())) {
+        throw new IOException("This channel is not connected.");
+      }
+      XceiverClientHandler handler =
+          channel.pipeline().get(XceiverClientHandler.class);
+
+      return handler.sendCommand(request);
+    } catch (ExecutionException | InterruptedException e) {
+      /**
+       * In case the netty channel handler throws an exception,
+       * the exception thrown will be wrapped within
+       * {@link ExecutionException}. Unwrapping here so that the original
+       * exception gets passed to the client.
+       */
+      if (e instanceof ExecutionException) {
+        Throwable cause = e.getCause();
+        if (cause instanceof IOException) {
+          throw (IOException) cause;
+        }
+      }
+      throw new IOException(
+          "Unexpected exception during execution:" + e.getMessage());
+    }
+  }
+
+  /**
+   * Sends a given command to the server and gets a waitable future back.
+   *
+   * @param request Request
+   * @return a future for the response to the command
+   * @throws IOException
+   */
+  @Override
+  public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+      sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request)
+      throws IOException, ExecutionException, InterruptedException {
+    if ((channel == null) || (!channel.isActive())) {
+      throw new IOException("This channel is not connected.");
+    }
+    XceiverClientHandler handler =
+        channel.pipeline().get(XceiverClientHandler.class);
+    return handler.sendCommandAsync(request);
+  }
+
+  /**
+   * Create a pipeline.
+   *
+   * @param pipelineID - Name of the pipeline.
+   * @param datanodes - Datanodes
+   */
+  @Override
+  public void createPipeline(String pipelineID, List<DatanodeDetails> datanodes)
+      throws IOException {
+    // For a standalone pipeline, there is no separate pipeline setup step.
+    return;
+  }
+
+  /**
+   * Returns the pipeline type.
+   *
+   * @return - Stand Alone as the type.
+   */
+  @Override
+  public HddsProtos.ReplicationType getPipelineType() {
+    return HddsProtos.ReplicationType.STAND_ALONE;
+  }
+}

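For orientation, a minimal usage sketch of this standalone client (not part of the commit): it assumes `pipeline`, `conf` and `datanodeUuid` already exist, and the request built here is illustrative rather than a complete command; only the non-empty traceID and datanode UUID validations from the handler are modeled.

// Minimal sketch, assuming `pipeline` (Pipeline), `conf` (Configuration) and
// `datanodeUuid` (String) already exist, inside a method declared to throw
// Exception; a real command also needs its command-specific sub-message.
XceiverClient client = new XceiverClient(pipeline, conf);
try {
  client.connect();
  ContainerProtos.ContainerCommandRequestProto request =
      ContainerProtos.ContainerCommandRequestProto.newBuilder()
          .setCmdType(ContainerProtos.Type.ReadContainer)
          .setTraceID(UUID.randomUUID().toString()) // must be unique, non-empty
          .setDatanodeUuid(datanodeUuid)            // hypothetical variable
          .build();
  ContainerProtos.ContainerCommandResponseProto response =
      client.sendCommand(request);
} finally {
  client.close();
}
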
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java
new file mode 100644
index 0000000..e2b55ac
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Netty client handler.
+ */
+public class XceiverClientHandler extends
+    SimpleChannelInboundHandler<ContainerCommandResponseProto> {
+
+  static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
+  private final ConcurrentMap<String, ResponseFuture> responses =
+      new ConcurrentHashMap<>();
+
+  private final Pipeline pipeline;
+  private volatile Channel channel;
+  private XceiverClientMetrics metrics;
+  private final Semaphore semaphore;
+
+  /**
+   * Constructs a client that can communicate to a container server.
+   */
+  public XceiverClientHandler(Pipeline pipeline, Semaphore semaphore) {
+    super(false);
+    Preconditions.checkNotNull(pipeline);
+    this.pipeline = pipeline;
+    this.metrics = XceiverClientManager.getXceiverClientMetrics();
+    this.semaphore = semaphore;
+  }
+
+  /**
+   * <strong>Please keep in mind that this method will be renamed to {@code
+   * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
+   * <p>
+   * Is called for each message of type {@link ContainerProtos
+   * .ContainerCommandResponseProto}.
+   *
+   * @param ctx the {@link ChannelHandlerContext} which this {@link
+   * SimpleChannelInboundHandler} belongs to
+   * @param msg the message to handle
+   * @throws Exception is thrown if an error occurred
+   */
+  @Override
+  public void channelRead0(ChannelHandlerContext ctx,
+      ContainerProtos.ContainerCommandResponseProto msg)
+      throws Exception {
+    Preconditions.checkNotNull(msg);
+    metrics.decrPendingContainerOpsMetrics(msg.getCmdType());
+
+    String key = msg.getTraceID();
+    ResponseFuture response = responses.remove(key);
+    semaphore.release();
+
+    if (response != null) {
+      response.getFuture().complete(msg);
+
+      long requestTime = response.getRequestTime();
+      metrics.addContainerOpsLatency(msg.getCmdType(),
+          Time.monotonicNowNanos() - requestTime);
+    } else {
+      LOG.error("A reply received for message that was not queued. trace " +
+          "ID: {}", msg.getTraceID());
+    }
+  }
+
+  @Override
+  public void channelRegistered(ChannelHandlerContext ctx) {
+    LOG.debug("channelRegistered: Connected to ctx");
+    channel = ctx.channel();
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    LOG.info("Exception in client " + cause.toString());
+    Iterator<String> keyIterator = responses.keySet().iterator();
+    while (keyIterator.hasNext()) {
+      ResponseFuture response = responses.remove(keyIterator.next());
+      response.getFuture().completeExceptionally(cause);
+      semaphore.release();
+    }
+    ctx.close();
+  }
+
+  /**
+   * Since netty is async, we send a work request and then wait until a
+   * response appears in the reply queue. This is a simple sync interface for
+   * clients. We should consider building async interfaces for the client if
+   * this turns out to be a performance bottleneck.
+   *
+   * @param request - request.
+   * @return -- response
+   */
+
+  public ContainerCommandResponseProto sendCommand(
+      ContainerProtos.ContainerCommandRequestProto request)
+      throws ExecutionException, InterruptedException {
+    Future<ContainerCommandResponseProto> future = sendCommandAsync(request);
+    return future.get();
+  }
+
+  /**
+   * sendCommandAsync queues a command to the netty subsystem and returns a
+   * CompletableFuture. This future is marked completed in channelRead0 when
+   * the response arrives.
+   * @param request - Request to execute
+   * @return CompletableFuture
+   */
+  public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
+      ContainerProtos.ContainerCommandRequestProto request)
+      throws InterruptedException {
+
+    // Throw an exception if the request doesn't have a traceID.
+    if (StringUtils.isEmpty(request.getTraceID())) {
+      throw new IllegalArgumentException("Invalid trace ID");
+    }
+
+    // Require a datanode ID in the commands, so that we can distinguish
+    // commands when the cluster simulator is running.
+    if(!request.hasDatanodeUuid()) {
+      throw new IllegalArgumentException("Invalid Datanode ID");
+    }
+
+    metrics.incrPendingContainerOpsMetrics(request.getCmdType());
+
+    CompletableFuture<ContainerCommandResponseProto> future
+        = new CompletableFuture<>();
+    ResponseFuture response = new ResponseFuture(future,
+        Time.monotonicNowNanos());
+    semaphore.acquire();
+    ResponseFuture previous = responses.putIfAbsent(
+        request.getTraceID(), response);
+    if (previous != null) {
+      LOG.error("Command with Trace already exists. Ignoring this command. " +
+              "{}. Previous Command: {}", request.getTraceID(),
+          previous.toString());
+      throw new IllegalStateException("Duplicate trace ID. Command with this " 
+
+          "trace ID is already executing. Please ensure that " +
+          "trace IDs are not reused. ID: " + request.getTraceID());
+    }
+
+    channel.writeAndFlush(request);
+    return response.getFuture();
+  }
+
+  /**
+   * Class wraps response future info.
+   */
+  static class ResponseFuture {
+    private final long requestTime;
+    private final CompletableFuture<ContainerCommandResponseProto> future;
+
+    ResponseFuture(CompletableFuture<ContainerCommandResponseProto> future,
+        long requestTime) {
+      this.future = future;
+      this.requestTime = requestTime;
+    }
+
+    public long getRequestTime() {
+      return requestTime;
+    }
+
+    public CompletableFuture<ContainerCommandResponseProto> getFuture() {
+      return future;
+    }
+  }
+}

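The trace ID is the sole correlation key here: sendCommandAsync parks a ResponseFuture in the `responses` map and channelRead0 completes it when the reply with the matching trace ID arrives. A hypothetical caller-side sketch, assuming `client` is a connected XceiverClient and `request` carries a unique trace ID:

// Hypothetical caller (not in this commit); reusing a trace ID throws
// IllegalStateException, and the semaphore bounds requests in flight.
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
    client.sendCommandAsync(request);
future.thenAccept(response ->
    System.out.println("command completed: " + response.getCmdType()));
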
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java
new file mode 100644
index 0000000..e10a9f6
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+
+import java.util.concurrent.Semaphore;
+
+/**
+ * Setup the netty pipeline.
+ */
+public class XceiverClientInitializer extends
+    ChannelInitializer<SocketChannel> {
+  private final Pipeline pipeline;
+  private final Semaphore semaphore;
+
+  /**
+   * Constructs an Initializer for the client pipeline.
+   * @param pipeline  - Pipeline.
+   * @param semaphore - Semaphore that bounds outstanding requests.
+   */
+  public XceiverClientInitializer(Pipeline pipeline, Semaphore semaphore) {
+    this.pipeline = pipeline;
+    this.semaphore = semaphore;
+  }
+
+  /**
+   * This method will be called once when the Channel is registered. After
+   * the method returns this instance will be removed from the
+   * ChannelPipeline of the Channel.
+   *
+   * @param ch   Channel which was registered.
+   * @throws Exception is thrown if an error occurs. In that case the
+   *                   Channel will be closed.
+   */
+  @Override
+  protected void initChannel(SocketChannel ch) throws Exception {
+    ChannelPipeline p = ch.pipeline();
+
+    p.addLast(new ProtobufVarint32FrameDecoder());
+    p.addLast(new ProtobufDecoder(ContainerProtos
+        .ContainerCommandResponseProto.getDefaultInstance()));
+
+    p.addLast(new ProtobufVarint32LengthFieldPrepender());
+    p.addLast(new ProtobufEncoder());
+
+    p.addLast(new XceiverClientHandler(this.pipeline, this.semaphore));
+
+  }
+}

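Every message on this channel is framed as a varint32 length prefix followed by the serialized protobuf, so the server end must mirror the framing. A hypothetical server-side pipeline (not part of this commit) would look like the following, decoding requests instead of responses:

// Hypothetical server-side mirror of the framing above; MyServerHandler is
// a placeholder for a handler of ContainerCommandRequestProto messages.
ChannelPipeline p = ch.pipeline();
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new ProtobufDecoder(
    ContainerProtos.ContainerCommandRequestProto.getDefaultInstance()));
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufEncoder());
p.addLast(new MyServerHandler());
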
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
new file mode 100644
index 0000000..7585104
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
+    .ReplicationType.RATIS;
+
+/**
+ * XceiverClientManager is responsible for the lifecycle of XceiverClient
+ * instances.  Callers use this class to acquire an XceiverClient instance
+ * connected to the desired container pipeline.  When done, the caller also
+ * uses this class to release the previously acquired XceiverClient instance.
+ *
+ * This class caches connections to containers for reuse, so that repeated
+ * accesses to the same container go through the same connection without
+ * re-establishing it. A cached connection is closed if it is not used for a
+ * period of time.
+ */
+public class XceiverClientManager implements Closeable {
+
+  //TODO : change this to SCM configuration class
+  private final Configuration conf;
+  private final Cache<String, XceiverClientSpi> clientCache;
+  private final boolean useRatis;
+
+  private static XceiverClientMetrics metrics;
+  /**
+   * Creates a new XceiverClientManager.
+   *
+   * @param conf configuration
+   */
+  public XceiverClientManager(Configuration conf) {
+    Preconditions.checkNotNull(conf);
+    int maxSize = conf.getInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY,
+        SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT);
+    long staleThresholdMs = conf.getTimeDuration(
+        SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY,
+        SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS);
+    this.useRatis = conf.getBoolean(
+        ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
+        ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
+    this.conf = conf;
+    this.clientCache = CacheBuilder.newBuilder()
+        .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
+        .maximumSize(maxSize)
+        .removalListener(
+            new RemovalListener<String, XceiverClientSpi>() {
+            @Override
+            public void onRemoval(
+                RemovalNotification<String, XceiverClientSpi>
+                  removalNotification) {
+              synchronized (clientCache) {
+                // Mark the entry as evicted
+                XceiverClientSpi info = removalNotification.getValue();
+                info.setEvicted();
+              }
+            }
+          }).build();
+  }
+
+  @VisibleForTesting
+  public Cache<String, XceiverClientSpi> getClientCache() {
+    return clientCache;
+  }
+
+  /**
+   * Acquires an XceiverClientSpi connected to a container capable of
+   * storing the specified key.
+   *
+   * If there is already a cached XceiverClientSpi, simply return the cached
+   * one; otherwise create a new one.
+   *
+   * @param pipeline the container pipeline for the client connection
+   * @return XceiverClientSpi connected to a container
+   * @throws IOException if a XceiverClientSpi cannot be acquired
+   */
+  public XceiverClientSpi acquireClient(Pipeline pipeline)
+      throws IOException {
+    Preconditions.checkNotNull(pipeline);
+    Preconditions.checkArgument(pipeline.getMachines() != null);
+    Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
+
+    synchronized (clientCache) {
+      XceiverClientSpi info = getClient(pipeline);
+      info.incrementReference();
+      return info;
+    }
+  }
+
+  /**
+   * Releases an XceiverClientSpi after use.
+   *
+   * @param client client to release
+   */
+  public void releaseClient(XceiverClientSpi client) {
+    Preconditions.checkNotNull(client);
+    synchronized (clientCache) {
+      client.decrementReference();
+    }
+  }
+
+  private XceiverClientSpi getClient(Pipeline pipeline)
+      throws IOException {
+    String containerName = pipeline.getContainerName();
+    try {
+      return clientCache.get(containerName,
+          new Callable<XceiverClientSpi>() {
+          @Override
+          public XceiverClientSpi call() throws Exception {
+            XceiverClientSpi client = pipeline.getType() == RATIS ?
+                    XceiverClientRatis.newXceiverClientRatis(pipeline, conf)
+                    : new XceiverClient(pipeline, conf);
+            client.connect();
+            return client;
+          }
+        });
+    } catch (Exception e) {
+      throw new IOException(
+          "Exception getting XceiverClient: " + e.toString(), e);
+    }
+  }
+
+  /**
+   * Close and remove all the cached clients.
+   */
+  public void close() {
+    //closing is done through RemovalListener
+    clientCache.invalidateAll();
+    clientCache.cleanUp();
+
+    if (metrics != null) {
+      metrics.unRegister();
+    }
+  }
+
+  /**
+   * Tells us if Ratis is enabled for this cluster.
+   * @return True if Ratis is enabled.
+   */
+  public boolean isUseRatis() {
+    return useRatis;
+  }
+
+  /**
+   * Returns the default replication factor.
+   * @return THREE if Ratis is enabled, ONE otherwise.
+   */
+  public HddsProtos.ReplicationFactor getFactor() {
+    if(isUseRatis()) {
+      return HddsProtos.ReplicationFactor.THREE;
+    }
+    return HddsProtos.ReplicationFactor.ONE;
+  }
+
+  /**
+   * Returns the default replication type.
+   * @return Ratis or Standalone
+   */
+  public HddsProtos.ReplicationType getType() {
+    // TODO : Fix me and make Ratis default before release.
+    // TODO: Remove this as replication factor and type are pipeline properties
+    if(isUseRatis()) {
+      return HddsProtos.ReplicationType.RATIS;
+    }
+    return HddsProtos.ReplicationType.STAND_ALONE;
+  }
+
+  /**
+   * Gets the xceiver client metrics.
+   */
+  public synchronized static XceiverClientMetrics getXceiverClientMetrics() {
+    if (metrics == null) {
+      metrics = XceiverClientMetrics.create();
+    }
+
+    return metrics;
+  }
+}

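The manager's contract is acquire/release bracketing; the guava cache then evicts connections that stay idle past the stale threshold. A minimal sketch, assuming `conf` and `pipeline` already exist:

// Minimal sketch, assuming `conf` and `pipeline` exist; acquireClient
// throws IOException. Always release so the cache can evict idle clients.
XceiverClientManager manager = new XceiverClientManager(conf);
XceiverClientSpi client = manager.acquireClient(pipeline);
try {
  // ... issue container commands through `client` ...
} finally {
  manager.releaseClient(client);
}
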
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
new file mode 100644
index 0000000..a61eba1
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+/**
+ * The client metrics for the Storage Container protocol.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "Storage Container Client Metrics", context = "dfs")
+public class XceiverClientMetrics {
+  public static final String SOURCE_NAME = XceiverClientMetrics.class
+      .getSimpleName();
+
+  private @Metric MutableCounterLong pendingOps;
+  private MutableCounterLong[] pendingOpsArray;
+  private MutableRate[] containerOpsLatency;
+  private MetricsRegistry registry;
+
+  public XceiverClientMetrics() {
+    int numEnumEntries = ContainerProtos.Type.values().length;
+    this.registry = new MetricsRegistry(SOURCE_NAME);
+
+    this.pendingOpsArray = new MutableCounterLong[numEnumEntries];
+    this.containerOpsLatency = new MutableRate[numEnumEntries];
+    for (int i = 0; i < numEnumEntries; i++) {
+      pendingOpsArray[i] = registry.newCounter(
+          "numPending" + ContainerProtos.Type.valueOf(i + 1),
+          "number of pending" + ContainerProtos.Type.valueOf(i + 1) + " ops",
+          (long) 0);
+
+      containerOpsLatency[i] = registry.newRate(
+          ContainerProtos.Type.valueOf(i + 1) + "Latency",
+          "latency of " + ContainerProtos.Type.valueOf(i + 1)
+          + " ops");
+    }
+  }
+
+  public static XceiverClientMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME, "Storage Container Client Metrics",
+        new XceiverClientMetrics());
+  }
+
+  public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) {
+    pendingOps.incr();
+    pendingOpsArray[type.ordinal()].incr();
+  }
+
+  public void decrPendingContainerOpsMetrics(ContainerProtos.Type type) {
+    pendingOps.incr(-1);
+    pendingOpsArray[type.ordinal()].incr(-1);
+  }
+
+  public void addContainerOpsLatency(ContainerProtos.Type type,
+      long latencyNanos) {
+    containerOpsLatency[type.ordinal()].add(latencyNanos);
+  }
+
+  public long getContainerOpsMetrics(ContainerProtos.Type type) {
+    return pendingOpsArray[type.ordinal()].value();
+  }
+
+  public void unRegister() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+}

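These counters are driven from XceiverClientHandler above; for reference, the pattern around a single op is roughly the following sketch, using only the methods defined in this class:

// Sketch of the incr/decr/latency pattern around one container op.
XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics();
long startNanos = Time.monotonicNowNanos();
metrics.incrPendingContainerOpsMetrics(ContainerProtos.Type.ReadChunk);
// ... send the request and wait for the response ...
metrics.decrPendingContainerOpsMetrics(ContainerProtos.Type.ReadChunk);
metrics.addContainerOpsLatency(ContainerProtos.Type.ReadChunk,
    Time.monotonicNowNanos() - startNanos);
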
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
new file mode 100644
index 0000000..d010c69
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.ratis.RatisHelper;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * An implementation of {@link XceiverClientSpi} using Ratis.
+ * The underlying RPC mechanism can be chosen via the constructor.
+ */
+public final class XceiverClientRatis extends XceiverClientSpi {
+  static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class);
+
+  public static XceiverClientRatis newXceiverClientRatis(
+      Pipeline pipeline, Configuration ozoneConf) {
+    final String rpcType = ozoneConf.get(
+        ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+        ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final int maxOutstandingRequests =
+        HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
+    return new XceiverClientRatis(pipeline,
+        SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests);
+  }
+
+  private final Pipeline pipeline;
+  private final RpcType rpcType;
+  private final AtomicReference<RaftClient> client = new AtomicReference<>();
+  private final int maxOutstandingRequests;
+
+  /**
+   * Constructs a client.
+   */
+  private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
+      int maxOutStandingChunks) {
+    super();
+    this.pipeline = pipeline;
+    this.rpcType = rpcType;
+    this.maxOutstandingRequests = maxOutStandingChunks;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void createPipeline(String clusterId, List<DatanodeDetails> datanodes)
+      throws IOException {
+    RaftGroup group = RatisHelper.newRaftGroup(datanodes);
+    LOG.debug("initializing pipeline:{} with nodes:{}", clusterId,
+        group.getPeers());
+    reinitialize(datanodes, group);
+  }
+
+  /**
+   * Returns Ratis as the pipeline type.
+   *
+   * @return - Ratis
+   */
+  @Override
+  public HddsProtos.ReplicationType getPipelineType() {
+    return HddsProtos.ReplicationType.RATIS;
+  }
+
+  private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup group)
+      throws IOException {
+    if (datanodes.isEmpty()) {
+      return;
+    }
+
+    IOException exception = null;
+    for (DatanodeDetails d : datanodes) {
+      try {
+        reinitialize(d, group);
+      } catch (IOException ioe) {
+        if (exception == null) {
+          exception = new IOException(
+              "Failed to reinitialize some of the RaftPeer(s)", ioe);
+        } else {
+          exception.addSuppressed(ioe);
+        }
+      }
+    }
+    if (exception != null) {
+      throw exception;
+    }
+  }
+
+  /**
+   * Adds a new peer to the Ratis ring.
+   *
+   * @param datanode - new datanode
+   * @param group    - Raft group
+   * @throws IOException - on Failure.
+   */
+  private void reinitialize(DatanodeDetails datanode, RaftGroup group)
+      throws IOException {
+    final RaftPeer p = RatisHelper.toRaftPeer(datanode);
+    try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) {
+      client.reinitialize(group, p.getId());
+    } catch (IOException ioe) {
+      LOG.error("Failed to reinitialize RaftPeer:{} datanode: {}  ",
+          p, datanode, ioe);
+      throw new IOException("Failed to reinitialize RaftPeer " + p
+          + "(datanode=" + datanode + ")", ioe);
+    }
+  }
+
+  @Override
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  @Override
+  public void connect() throws Exception {
+    LOG.debug("Connecting to pipeline:{} leader:{}",
+        getPipeline().getPipelineName(),
+        RatisHelper.toRaftPeerId(pipeline.getLeader()));
+    // TODO : XceiverClient ratis should pass the config value of
+    // maxOutstandingRequests so as to set the upper bound on the max number
+    // of async requests to be handled by the raft client
+    if (!client.compareAndSet(null,
+        RatisHelper.newRaftClient(rpcType, getPipeline()))) {
+      throw new IllegalStateException("Client is already connected.");
+    }
+  }
+
+  @Override
+  public void close() {
+    final RaftClient c = client.getAndSet(null);
+    if (c != null) {
+      try {
+        c.close();
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+  }
+
+  private RaftClient getClient() {
+    return Objects.requireNonNull(client.get(), "client is null");
+  }
+
+  private boolean isReadOnly(ContainerCommandRequestProto proto) {
+    switch (proto.getCmdType()) {
+    case ReadContainer:
+    case ReadChunk:
+    case ListKey:
+    case GetKey:
+    case GetSmallFile:
+    case ListContainer:
+    case ListChunk:
+      return true;
+    case CloseContainer:
+    case WriteChunk:
+    case UpdateContainer:
+    case CompactChunk:
+    case CreateContainer:
+    case DeleteChunk:
+    case DeleteContainer:
+    case DeleteKey:
+    case PutKey:
+    case PutSmallFile:
+    default:
+      return false;
+    }
+  }
+
+  private RaftClientReply sendRequest(ContainerCommandRequestProto request)
+      throws IOException {
+    boolean isReadOnlyRequest = isReadOnly(request);
+    ByteString byteString =
+        ShadedProtoUtil.asShadedByteString(request.toByteArray());
+    LOG.debug("sendCommand {} {}", isReadOnlyRequest, request);
+    final RaftClientReply reply =  isReadOnlyRequest ?
+        getClient().sendReadOnly(() -> byteString) :
+        getClient().send(() -> byteString);
+    LOG.debug("reply {} {}", isReadOnlyRequest, reply);
+    return reply;
+  }
+
+  private CompletableFuture<RaftClientReply> sendRequestAsync(
+      ContainerCommandRequestProto request) throws IOException {
+    boolean isReadOnlyRequest = isReadOnly(request);
+    ByteString byteString =
+        ShadedProtoUtil.asShadedByteString(request.toByteArray());
+    LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest, request);
+    return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) :
+        getClient().sendAsync(() -> byteString);
+  }
+
+  @Override
+  public ContainerCommandResponseProto sendCommand(
+      ContainerCommandRequestProto request) throws IOException {
+    final RaftClientReply reply = sendRequest(request);
+    Preconditions.checkState(reply.isSuccess());
+    return ContainerCommandResponseProto.parseFrom(
+        ShadedProtoUtil.asByteString(reply.getMessage().getContent()));
+  }
+
+  /**
+   * Sends a given command to the server and gets a waitable future back.
+   *
+   * @param request Request
+   * @return a future for the response to the command
+   * @throws IOException
+   */
+  @Override
+  public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
+      ContainerCommandRequestProto request)
+      throws IOException, ExecutionException, InterruptedException {
+    return sendRequestAsync(request).whenComplete((reply, e) ->
+          LOG.debug("received reply {} for request: {} exception: {}", request,
+              reply, e))
+        .thenApply(reply -> {
+          try {
+            return ContainerCommandResponseProto.parseFrom(
+                ShadedProtoUtil.asByteString(reply.getMessage().getContent()));
+          } catch (InvalidProtocolBufferException e) {
+            throw new CompletionException(e);
+          }
+        });
+  }
+}
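
The connect()/close()/sendCommandAsync() surface above is easiest to follow
from the caller's side. Below is a minimal sketch, not part of this patch,
assuming an already-connected XceiverClientSpi implementation (such as the
Ratis client in this hunk) and a prepared request; the class and method
names are illustrative only:

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
        .ContainerCommandRequestProto;
    import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
        .ContainerCommandResponseProto;
    import org.apache.hadoop.hdds.scm.XceiverClientSpi;

    public final class AsyncCommandSketch {
      private AsyncCommandSketch() {
      }

      /**
       * Dispatches a prepared request on the async path. Read-only command
       * types (ReadChunk, GetKey, ...) are routed to sendReadOnlyAsync by
       * the client; writes go through the Raft log, but the caller's code
       * is identical either way.
       */
      public static CompletableFuture<ContainerCommandResponseProto> submit(
          XceiverClientSpi client, ContainerCommandRequestProto request)
          throws IOException, ExecutionException, InterruptedException {
        return client.sendCommandAsync(request)
            .whenComplete((response, err) -> {
              if (err != null) {
                // Parse failures surface as CompletionException, per the
                // sendCommandAsync implementation above.
                System.err.println("command failed: " + err);
              }
            });
      }
    }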

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
new file mode 100644
index 0000000..8f30a7f
--- /dev/null
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.client;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.protocolPB
+    .StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ContainerData;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
+    .ReadContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState
+    .ALLOCATED;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState
+    .OPEN;
+
+/**
+ * This class provides the client-facing APIs of container operations.
+ */
+public class ContainerOperationClient implements ScmClient {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerOperationClient.class);
+  private static long containerSizeB = -1;
+  private final StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private final XceiverClientManager xceiverClientManager;
+
+  public ContainerOperationClient(
+      StorageContainerLocationProtocolClientSideTranslatorPB
+          storageContainerLocationClient,
+      XceiverClientManager xceiverClientManager) {
+    this.storageContainerLocationClient = storageContainerLocationClient;
+    this.xceiverClientManager = xceiverClientManager;
+  }
+
+  /**
+   * Return the capacity of containers. The current assumption is that all
+   * containers have the same capacity, so a single static field is
+   * sufficient for any container.
+   * @return The capacity of one container in number of bytes.
+   */
+  public static long getContainerSizeB() {
+    return containerSizeB;
+  }
+
+  /**
+   * Set the capacity of containers. This should be called exactly once on
+   * system start.
+   * @param size Capacity of one container in number of bytes.
+   */
+  public static void setContainerSizeB(long size) {
+    containerSizeB = size;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Pipeline createContainer(String containerId, String owner)
+      throws IOException {
+    XceiverClientSpi client = null;
+    try {
+      Pipeline pipeline =
+          storageContainerLocationClient.allocateContainer(
+              xceiverClientManager.getType(),
+              xceiverClientManager.getFactor(), containerId, owner);
+      client = xceiverClientManager.acquireClient(pipeline);
+
+      // Allocated State means that SCM has allocated this pipeline in its
+      // namespace. The client needs to create the pipeline on the machines
+      // that were chosen by the SCM.
+      Preconditions.checkState(pipeline.getLifeCycleState() == ALLOCATED ||
+          pipeline.getLifeCycleState() == OPEN, "Unexpected pipeline state");
+      if (pipeline.getLifeCycleState() == ALLOCATED) {
+        createPipeline(client, pipeline);
+      }
+      // TODO : Container Client State needs to be updated.
+      // TODO : Return ContainerInfo instead of Pipeline
+      createContainer(containerId, client, pipeline);
+      return pipeline;
+    } finally {
+      if (client != null) {
+        xceiverClientManager.releaseClient(client);
+      }
+    }
+  }
+
+  /**
+   * Create a container over the pipeline specified by the SCM.
+   *
+   * @param containerId - Container ID
+   * @param client - Client to communicate with Datanodes
+   * @param pipeline - A pipeline that is already created.
+   * @throws IOException
+   */
+  public void createContainer(String containerId, XceiverClientSpi client,
+      Pipeline pipeline) throws IOException {
+    String traceID = UUID.randomUUID().toString();
+    storageContainerLocationClient.notifyObjectStageChange(
+        ObjectStageChangeRequestProto.Type.container,
+        containerId,
+        ObjectStageChangeRequestProto.Op.create,
+        ObjectStageChangeRequestProto.Stage.begin);
+    ContainerProtocolCalls.createContainer(client, traceID);
+    storageContainerLocationClient.notifyObjectStageChange(
+        ObjectStageChangeRequestProto.Type.container,
+        containerId,
+        ObjectStageChangeRequestProto.Op.create,
+        ObjectStageChangeRequestProto.Stage.complete);
+
+    // Log this info only after SCM has been notified that the creation
+    // stage is complete.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created container " + containerId
+          + " leader:" + pipeline.getLeader()
+          + " machines:" + pipeline.getMachines());
+    }
+  }
+
+  /**
+   * Creates a pipeline over the machines chosen by the SCM.
+   *
+   * @param client - Client
+   * @param pipeline - pipeline to be created on the Datanodes.
+   * @throws IOException
+   */
+  private void createPipeline(XceiverClientSpi client, Pipeline pipeline)
+      throws IOException {
+
+    Preconditions.checkNotNull(pipeline.getPipelineName(), "Pipeline " +
+        "name cannot be null when client create flag is set.");
+
+    // Pipeline creation is a three step process.
+    //
+    // 1. Notify SCM that this client is doing a create pipeline on
+    // datanodes.
+    //
+    // 2. Talk to Datanodes to create the pipeline.
+    //
+    // 3. Update SCM that the pipeline creation was successful.
+    storageContainerLocationClient.notifyObjectStageChange(
+        ObjectStageChangeRequestProto.Type.pipeline,
+        pipeline.getPipelineName(),
+        ObjectStageChangeRequestProto.Op.create,
+        ObjectStageChangeRequestProto.Stage.begin);
+
+    client.createPipeline(pipeline.getPipelineName(),
+        pipeline.getMachines());
+
+    storageContainerLocationClient.notifyObjectStageChange(
+        ObjectStageChangeRequestProto.Type.pipeline,
+        pipeline.getPipelineName(),
+        ObjectStageChangeRequestProto.Op.create,
+        ObjectStageChangeRequestProto.Stage.complete);
+
+    // TODO : Should we change the state on the client side ??
+    // That makes sense, but it is not needed for the client to work.
+    LOG.debug("Pipeline creation successful. Pipeline: {}",
+        pipeline.toString());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Pipeline createContainer(HddsProtos.ReplicationType type,
+      HddsProtos.ReplicationFactor factor,
+      String containerId, String owner) throws IOException {
+    XceiverClientSpi client = null;
+    try {
+      // allocate container on SCM.
+      Pipeline pipeline =
+          storageContainerLocationClient.allocateContainer(type, factor,
+              containerId, owner);
+      client = xceiverClientManager.acquireClient(pipeline);
+
+      // Allocated State means that SCM has allocated this pipeline in its
+      // namespace. The client needs to create the pipeline on the machines
+      // that were chosen by the SCM.
+      if (pipeline.getLifeCycleState() == ALLOCATED) {
+        createPipeline(client, pipeline);
+      }
+
+      // TODO : Return ContainerInfo instead of Pipeline
+      // Connect to the pipeline leader and allocate the container on the
+      // leader datanode. Reuse the client acquired above; acquiring a
+      // second reference here would leak the first one.
+      createContainer(containerId, client, pipeline);
+      return pipeline;
+    } finally {
+      if (client != null) {
+        xceiverClientManager.releaseClient(client);
+      }
+    }
+  }
+
+  /**
+   * Returns a set of Nodes that meet the query criteria.
+   *
+   * @param nodeStatuses - A set of criteria that we want the node to have.
+   * @param queryScope - Query scope - Cluster or pool.
+   * @param poolName - if it is pool, a pool name is required.
+   * @return A set of nodes that meet the requested criteria.
+   * @throws IOException
+   */
+  @Override
+  public HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState>
+      nodeStatuses, HddsProtos.QueryScope queryScope, String poolName)
+      throws IOException {
+    return storageContainerLocationClient.queryNode(nodeStatuses, queryScope,
+        poolName);
+  }
+
+  /**
+   * Creates a specified replication pipeline.
+   */
+  @Override
+  public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
+      HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
+      throws IOException {
+    return storageContainerLocationClient.createReplicationPipeline(type,
+        factor, nodePool);
+  }
+
+  /**
+   * Delete the container; this releases any resources it uses.
+   * @param pipeline - Pipeline that represents the container.
+   * @param force - True to forcibly delete the container.
+   * @throws IOException
+   */
+  @Override
+  public void deleteContainer(Pipeline pipeline, boolean force)
+      throws IOException {
+    XceiverClientSpi client = null;
+    try {
+      client = xceiverClientManager.acquireClient(pipeline);
+      String traceID = UUID.randomUUID().toString();
+      ContainerProtocolCalls.deleteContainer(client, force, traceID);
+      storageContainerLocationClient
+          .deleteContainer(pipeline.getContainerName());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Deleted container {}, leader: {}, machines: {} ",
+            pipeline.getContainerName(),
+            pipeline.getLeader(),
+            pipeline.getMachines());
+      }
+    } finally {
+      if (client != null) {
+        xceiverClientManager.releaseClient(client);
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<ContainerInfo> listContainer(String startName,
+      String prefixName, int count)
+      throws IOException {
+    return storageContainerLocationClient.listContainer(
+        startName, prefixName, count);
+  }
+
+  /**
+   * Get metadata from an existing container.
+   *
+   * @param pipeline - pipeline that represents the container.
+   * @return ContainerData - a protobuf message carrying the basic info
+   * of a container.
+   * @throws IOException
+   */
+  @Override
+  public ContainerData readContainer(Pipeline pipeline) throws IOException {
+    XceiverClientSpi client = null;
+    try {
+      client = xceiverClientManager.acquireClient(pipeline);
+      String traceID = UUID.randomUUID().toString();
+      ReadContainerResponseProto response =
+          ContainerProtocolCalls.readContainer(client,
+              pipeline.getContainerName(), traceID);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Read container {}, leader: {}, machines: {} ",
+            pipeline.getContainerName(),
+            pipeline.getLeader(),
+            pipeline.getMachines());
+      }
+      return response.getContainerData();
+    } finally {
+      if (client != null) {
+        xceiverClientManager.releaseClient(client);
+      }
+    }
+  }
+
+  /**
+   * Given an id, return the pipeline associated with the container.
+   * @param containerId - String Container ID
+   * @return Pipeline of the existing container, corresponding to the given id.
+   * @throws IOException
+   */
+  @Override
+  public Pipeline getContainer(String containerId) throws
+      IOException {
+    return storageContainerLocationClient.getContainer(containerId);
+  }
+
+  /**
+   * Close a container.
+   *
+   * @param pipeline the container to be closed.
+   * @throws IOException
+   */
+  @Override
+  public void closeContainer(Pipeline pipeline) throws IOException {
+    XceiverClientSpi client = null;
+    try {
+      LOG.debug("Close container {}", pipeline);
+      /*
+      TODO: two orderings are possible here; revisit this later:
+      1. close on SCM first, then on the datanode
+      2. close on the datanode first, then on SCM
+
+      With 1: if the client fails after closing on SCM, there is a
+      container that SCM considers closed but is actually open. SCM will no
+      longer allocate blocks to it, which is fine. But SCM may later try to
+      replicate this "closed" container, which may not be safe.
+
+      With 2: if the client fails after closing on the datanode, there is a
+      container that SCM considers open but is actually closed. SCM will
+      still try to allocate blocks to it, which will fail when actually
+      doing the write. No more data can be written, but at least the
+      correctness and consistency of existing data are maintained.
+
+      For now, take approach #2.
+       */
+      // Actually close the container on Datanode
+      client = xceiverClientManager.acquireClient(pipeline);
+      String traceID = UUID.randomUUID().toString();
+
+      String containerId = pipeline.getContainerName();
+
+      storageContainerLocationClient.notifyObjectStageChange(
+          ObjectStageChangeRequestProto.Type.container,
+          containerId,
+          ObjectStageChangeRequestProto.Op.close,
+          ObjectStageChangeRequestProto.Stage.begin);
+
+      ContainerProtocolCalls.closeContainer(client, traceID);
+      // Notify SCM to close the container
+      storageContainerLocationClient.notifyObjectStageChange(
+          ObjectStageChangeRequestProto.Type.container,
+          containerId,
+          ObjectStageChangeRequestProto.Op.close,
+          ObjectStageChangeRequestProto.Stage.complete);
+    } finally {
+      if (client != null) {
+        xceiverClientManager.releaseClient(client);
+      }
+    }
+  }
+
+  /**
+   * Get the current usage information.
+   * @param pipeline - Pipeline
+   * @return the size of the given container.
+   * @throws IOException
+   */
+  @Override
+  public long getContainerSize(Pipeline pipeline) throws IOException {
+    // TODO : Pipeline can be null, handle it correctly.
+    long size = getContainerSizeB();
+    if (size == -1) {
+      throw new IOException("Container size unknown!");
+    }
+    return size;
+  }
+}
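
Taken together, the class above supports a full allocate/create, read,
close, delete cycle. A minimal sketch follows; it is not part of this patch,
the replication settings and names are illustrative, and getName() on
ContainerData is assumed from the proto definition (not shown in this hunk):

    import java.io.IOException;
    import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ContainerData;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
    import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
    import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;

    public final class ContainerLifecycleSketch {
      private ContainerLifecycleSketch() {
      }

      static void run(ContainerOperationClient scmClient) throws IOException {
        // Allocate on SCM, then create on the chosen datanodes. The pipeline
        // is created first if SCM returns it in the ALLOCATED state.
        Pipeline pipeline = scmClient.createContainer(
            HddsProtos.ReplicationType.STAND_ALONE,
            HddsProtos.ReplicationFactor.ONE,
            "demo-container", "demo-owner");

        // Read the container metadata back from the datanode.
        ContainerData data = scmClient.readContainer(pipeline);
        System.out.println("read container: " + data.getName());

        // Close on the datanode first, then notify SCM (approach #2 from
        // the ordering discussion in closeContainer), then delete.
        scmClient.closeContainer(pipeline);
        scmClient.deleteContainer(pipeline, false);
      }
    }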

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
new file mode 100644
index 0000000..bc5f8d6
--- /dev/null
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.client;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility methods for Ozone and Container Clients.
+ *
+ * The methods to retrieve SCM service endpoints assume there is a single
+ * SCM service instance. This will change when we switch to replicated service
+ * instances for redundancy.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class HddsClientUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      HddsClientUtils.class);
+
+  private static final int NO_PORT = -1;
+
+  private HddsClientUtils() {
+  }
+
+  /**
+   * Date format used in Ozone. The formatter is kept in a ThreadLocal so
+   * it is safe to use across threads.
+   */
+  private static final ThreadLocal<DateTimeFormatter> DATE_FORMAT =
+      ThreadLocal.withInitial(() -> {
+        DateTimeFormatter format =
+            DateTimeFormatter.ofPattern(OzoneConsts.OZONE_DATE_FORMAT);
+        return format.withZone(ZoneId.of(OzoneConsts.OZONE_TIME_ZONE));
+      });
+
+
+  /**
+   * Convert a time in milliseconds to the human readable format used in
+   * Ozone.
+   * @return a human readable string for the input time
+   */
+  public static String formatDateTime(long millis) {
+    ZonedDateTime dateTime = ZonedDateTime.ofInstant(
+        Instant.ofEpochMilli(millis), DATE_FORMAT.get().getZone());
+    return DATE_FORMAT.get().format(dateTime);
+  }
+
+  /**
+   * Convert a time in the Ozone date format to milliseconds.
+   * @return time in milliseconds
+   */
+  public static long formatDateTime(String date) throws ParseException {
+    Preconditions.checkNotNull(date, "Date string should not be null.");
+    return ZonedDateTime.parse(date, DATE_FORMAT.get())
+        .toInstant().toEpochMilli();
+  }
+
+
+
+  /**
+   * Verifies that a bucket or volume name is a valid DNS name.
+   *
+   * @param resName Bucket or volume name to be validated
+   *
+   * @throws IllegalArgumentException
+   */
+  public static void verifyResourceName(String resName)
+      throws IllegalArgumentException {
+
+    if (resName == null) {
+      throw new IllegalArgumentException("Bucket or Volume name is null");
+    }
+
+    if ((resName.length() < OzoneConsts.OZONE_MIN_BUCKET_NAME_LENGTH) ||
+        (resName.length() > OzoneConsts.OZONE_MAX_BUCKET_NAME_LENGTH)) {
+      throw new IllegalArgumentException(
+          "Bucket or Volume length is illegal, " +
+              "valid length is 3-63 characters");
+    }
+
+    if ((resName.charAt(0) == '.') || (resName.charAt(0) == '-')) {
+      throw new IllegalArgumentException(
+          "Bucket or Volume name cannot start with a period or dash");
+    }
+
+    if ((resName.charAt(resName.length() - 1) == '.') ||
+        (resName.charAt(resName.length() - 1) == '-')) {
+      throw new IllegalArgumentException(
+          "Bucket or Volume name cannot end with a period or dash");
+    }
+
+    boolean isIPv4 = true;
+    char prev = (char) 0;
+
+    for (int index = 0; index < resName.length(); index++) {
+      char currChar = resName.charAt(index);
+
+      if (currChar != '.') {
+        isIPv4 = ((currChar >= '0') && (currChar <= '9')) && isIPv4;
+      }
+
+      if (currChar >= 'A' && currChar <= 'Z') {
+        throw new IllegalArgumentException(
+            "Bucket or Volume name does not support uppercase characters");
+      }
+
+      if ((currChar != '.') && (currChar != '-')) {
+        if ((currChar < '0') || (currChar > '9' && currChar < 'a') ||
+            (currChar > 'z')) {
+          throw new IllegalArgumentException("Bucket or Volume name has an " +
+              "unsupported character : " +
+              currChar);
+        }
+      }
+
+      if ((prev == '.') && (currChar == '.')) {
+        throw new IllegalArgumentException("Bucket or Volume name should not "
+            + "have two contiguous periods");
+      }
+
+      if ((prev == '-') && (currChar == '.')) {
+        throw new IllegalArgumentException(
+            "Bucket or Volume name should not have period after dash");
+      }
+
+      if ((prev == '.') && (currChar == '-')) {
+        throw new IllegalArgumentException(
+            "Bucket or Volume name should not have dash after period");
+      }
+      prev = currChar;
+    }
+
+    if (isIPv4) {
+      throw new IllegalArgumentException(
+          "Bucket or Volume name cannot be an IPv4 address or all numeric");
+    }
+  }
+
+  /**
+   * Returns the cache value to be used for list calls.
+   * @param conf Configuration object
+   * @return list cache size
+   */
+  public static int getListCacheSize(Configuration conf) {
+    return conf.getInt(OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE,
+        OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE_DEFAULT);
+  }
+
+  /**
+   * @return a default instance of {@link CloseableHttpClient}.
+   */
+  public static CloseableHttpClient newHttpClient() {
+    return HddsClientUtils.newHttpClient(new Configuration());
+  }
+
+  /**
+   * Returns a {@link CloseableHttpClient} configured by given configuration.
+   * If conf is null, returns a default instance.
+   *
+   * @param conf configuration
+   * @return a {@link CloseableHttpClient} instance.
+   */
+  public static CloseableHttpClient newHttpClient(Configuration conf) {
+    long socketTimeout = OzoneConfigKeys
+        .OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT;
+    long connectionTimeout = OzoneConfigKeys
+        .OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT;
+    if (conf != null) {
+      socketTimeout = conf.getTimeDuration(
+          OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT,
+          OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT,
+          TimeUnit.MILLISECONDS);
+      connectionTimeout = conf.getTimeDuration(
+          OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT,
+          OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT,
+          TimeUnit.MILLISECONDS);
+    }
+
+    CloseableHttpClient client = HttpClients.custom()
+        .setDefaultRequestConfig(
+            RequestConfig.custom()
+                .setSocketTimeout(Math.toIntExact(socketTimeout))
+                .setConnectTimeout(Math.toIntExact(connectionTimeout))
+                .build())
+        .build();
+    return client;
+  }
+
+  /**
+   * Returns the maximum number of outstanding async requests to be handled
+   * by the standalone and Ratis clients.
+   */
+  public static int getMaxOutstandingRequests(Configuration config) {
+    return config
+        .getInt(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS,
+            ScmConfigKeys
+                .SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT);
+  }
+}
\ No newline at end of file
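
A short usage sketch for the helpers above (not part of this patch; the
resource names are arbitrary examples):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
    import org.apache.http.impl.client.CloseableHttpClient;

    public final class HddsClientUtilsSketch {
      private HddsClientUtilsSketch() {
      }

      public static void main(String[] args) throws Exception {
        // Valid DNS-style name: lowercase, 3-63 characters, no leading or
        // trailing '.' or '-', and not all numeric.
        HddsClientUtils.verifyResourceName("my-volume-01");

        try {
          // Rejected: uppercase characters and contiguous periods.
          HddsClientUtils.verifyResourceName("Bad..Name");
        } catch (IllegalArgumentException e) {
          System.out.println("rejected: " + e.getMessage());
        }

        // Socket/connection timeouts are read from the OzoneConfigKeys
        // client timeout settings when present in the Configuration.
        try (CloseableHttpClient http =
                 HddsClientUtils.newHttpClient(new Configuration())) {
          System.out.println("http client created: " + http);
        }
      }
    }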

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java
new file mode 100644
index 0000000..73ad78c
--- /dev/null
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.client;
+
+/**
+ * Client facing classes for the container operations.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java
new file mode 100644
index 0000000..9390bc1
--- /dev/null
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+/**
+ * Classes for different type of container service client.
+ */
\ No newline at end of file

