Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 d50c8de46 -> 7f50a3674


HDFS-11822. Block Storage: Fix TestCBlockCLI, failing because of "Address
already in use". Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f50a367
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f50a367
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f50a367

Branch: refs/heads/HDFS-7240
Commit: 7f50a367499f4e39cf5badbf246c98a091b77682
Parents: d50c8de
Author: Chen Liang <cli...@apache.org>
Authored: Mon Jun 5 10:33:00 2017 -0700
Committer: Chen Liang <cli...@apache.org>
Committed: Mon Jun 5 10:33:00 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/cblock/CBlockManager.java |  81 ++--
 .../cblock/client/CBlockVolumeClient.java       |  13 +-
 .../apache/hadoop/ozone/OzoneClientUtils.java   |  72 ++++
 .../hadoop/ozone/ksm/KeySpaceManager.java       |  23 +-
 .../ozone/scm/StorageContainerManager.java      |  27 +-
 .../apache/hadoop/cblock/TestBufferManager.java |   4 +-
 .../org/apache/hadoop/cblock/TestCBlockCLI.java |  20 +-
 .../hadoop/cblock/TestCBlockReadWrite.java      | 366 +++++++++++++++++++
 .../apache/hadoop/cblock/TestCBlockServer.java  |   6 +
 .../cblock/TestCBlockServerPersistence.java     |  11 +-
 .../hadoop/cblock/TestLocalBlockCache.java      | 249 +------------
 11 files changed, 532 insertions(+), 340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f50a367/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java
index 30563c3..6d5d441 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java
@@ -24,13 +24,17 @@ import org.apache.hadoop.cblock.meta.VolumeInfo;
 import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
 import org.apache.hadoop.cblock.proto.CBlockServiceProtocol;
 import org.apache.hadoop.cblock.proto.MountVolumeResponse;
-import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
+import org.apache.hadoop.cblock.protocol.proto
+    .CBlockClientServerProtocolProtos;
 import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos;
 import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
-import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolServerSideTranslatorPB;
+import org.apache.hadoop.cblock.protocolPB
+    .CBlockClientServerProtocolServerSideTranslatorPB;
 import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
-import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolServerSideTranslatorPB;
+import org.apache.hadoop.cblock.protocolPB
+    .CBlockServiceProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ozone.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.scm.XceiverClientManager;
@@ -42,7 +46,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneConfiguration;
-import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.protocolPB
+    .StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.utils.LevelDBStore;
@@ -58,22 +63,34 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_IPADDRESS_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_IPADDRESS_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_PORT_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_PORT_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CONTAINER_SIZE_GB_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SCM_IPADDRESS_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SCM_IPADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SCM_PORT_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SCM_PORT_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
 
 /**
  * The main entry point of CBlock operations, ALL the CBlock operations
@@ -118,10 +135,8 @@ public class CBlockManager implements CBlockServiceProtocol,
     RPC.setProtocolEngine(conf, CBlockClientServerProtocolPB.class,
         ProtobufRpcEngine.class);
     // start service for client command-to-cblock server service
-    InetSocketAddress serviceRpcAddr = NetUtils.createSocketAddr(
-        conf.getTrimmed(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY,
-            DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT), -1,
-        DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
+    InetSocketAddress serviceRpcAddr =
+        OzoneClientUtils.getCblockServiceRpcAddr(conf);
     BlockingService cblockProto =
         CBlockServiceProtocolProtos
             .CBlockServiceProtocolService
@@ -133,14 +148,15 @@ public class CBlockManager implements CBlockServiceProtocol,
         DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY,
         DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
         DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
-
+    InetSocketAddress cblockServiceRpcAddress =
+        OzoneClientUtils.updateListenAddress(conf,
+            DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, serviceRpcAddr, cblockService);
     LOG.info("CBlock manager listening for client commands on: {}",
-        serviceRpcAddr);
+        cblockServiceRpcAddress);
     // now start service for cblock client-to-cblock server communication
-    InetSocketAddress serverRpcAddr = NetUtils.createSocketAddr(
-        conf.get(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY,
-            DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT), -1,
-        DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
+
+    InetSocketAddress serverRpcAddr =
+        OzoneClientUtils.getCblockServerRpcAddr(conf);
     BlockingService serverProto =
         CBlockClientServerProtocolProtos
             .CBlockClientServerProtocolService
@@ -153,8 +169,11 @@ public class CBlockManager implements CBlockServiceProtocol,
         DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY,
         DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
         DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
+    InetSocketAddress cblockServerRpcAddress =
+        OzoneClientUtils.updateListenAddress(conf,
+            DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, serverRpcAddr, cblockServer);
     LOG.info("CBlock server listening for client commands on: {}",
-        serverRpcAddr);
+        cblockServerRpcAddress);
   }
 
   public void start() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f50a367/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
index c783c67..f70e8a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -30,11 +31,6 @@ import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HOSTNAME_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_PORT_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_PORT_KEY;
-
 /**
  * Implementation of client used by CBlock command line tool.
  */
@@ -45,12 +41,7 @@ public class CBlockVolumeClient {
   public CBlockVolumeClient(OzoneConfiguration conf) throws IOException {
     this.conf = conf;
     long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
-    String serverAddress = conf.get(DFS_CBLOCK_SERVICERPC_HOSTNAME_KEY,
-        DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT);
-    int serverPort = conf.getInt(DFS_CBLOCK_SERVICERPC_PORT_KEY,
-        DFS_CBLOCK_SERVICERPC_PORT_DEFAULT);
-    InetSocketAddress address = new InetSocketAddress(
-        serverAddress, serverPort);
+    InetSocketAddress address = OzoneClientUtils.getCblockServiceRpcAddr(conf);
     // currently the largest supported volume is about 8TB, which might take
     // > 20 seconds to finish creating containers. thus set timeout to 30 sec.
     cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f50a367/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
index db3c14f..f38dd7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
@@ -23,6 +23,7 @@ import com.google.common.net.HostAndPort;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.http.client.config.RequestConfig;
@@ -39,6 +40,17 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_PORT_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_JSCSI_PORT_DEFAULT;
+
 import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
     .OZONE_KSM_BIND_HOST_DEFAULT;
@@ -300,6 +312,44 @@ public final class OzoneClientUtils {
   }
 
   /**
+   * Retrieve the socket address that is used by CBlock Service.
+   * @param conf configuration consulted for the service RPC address key.
+   * @return Target InetSocketAddress for the CBlock Service endpoint.
+   */
+  public static InetSocketAddress getCblockServiceRpcAddr(
+      Configuration conf) {
+    final Optional<String> host = getHostNameFromConfigKeys(conf,
+        DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
+
+    // If the key carries no port, DFS_CBLOCK_SERVICERPC_PORT_DEFAULT is used.
+    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
+        DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
+    // A missing host falls back to DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT.
+    return NetUtils.createSocketAddr(
+        host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" +
+            port.or(DFS_CBLOCK_SERVICERPC_PORT_DEFAULT));
+  }
+
+  /**
+   * Retrieve the socket address that is used by CBlock Server.
+   * @param conf configuration consulted for the JSCSI RPC address key.
+   * @return Target InetSocketAddress for the CBlock Server endpoint.
+   */
+  public static InetSocketAddress getCblockServerRpcAddr(
+      Configuration conf) {
+    final Optional<String> host = getHostNameFromConfigKeys(conf,
+        DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
+
+    // If the key carries no port, DFS_CBLOCK_JSCSI_PORT_DEFAULT is used.
+    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
+        DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
+    // NOTE(review): host falls back to the *service* RPC hostname default.
+    return NetUtils.createSocketAddr(
+        host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" +
+            port.or(DFS_CBLOCK_JSCSI_PORT_DEFAULT));
+  }
+
+  /**
    * Retrieve the hostname, trying the supplied config keys in order.
    * Each config value may be absent, or if present in the format
    * host:port (the :port part is optional).
@@ -561,6 +611,28 @@ public final class OzoneClientUtils {
   }
 
   /**
+   * Records the actual listening address of a started RPC server in the
+   * configuration. It may differ from the configured address if, for example,
+   * the configured address uses port 0 to request an ephemeral port.
+   *
+   * @param conf configuration to update
+   * @param rpcAddressKey configuration key for RPC server address
+   * @param addr configured address
+   * @param rpcServer started RPC server.
+   * @return the configured host combined with the actual listening port.
+   */
+  public static InetSocketAddress updateListenAddress(
+      OzoneConfiguration conf, String rpcAddressKey,
+      InetSocketAddress addr, RPC.Server rpcServer) {
+    InetSocketAddress listenAddr = rpcServer.getListenerAddress();
+    InetSocketAddress updatedAddr = new InetSocketAddress(
+        addr.getHostString(), listenAddr.getPort());
+    conf.set(rpcAddressKey,
+        listenAddr.getHostString() + ":" + listenAddr.getPort());
+    return updatedAddr;
+  }
+
+  /**
    * Releases a http connection if the request is not null.
    * @param request
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f50a367/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
index 3cf1fb3..f4da2bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
@@ -91,7 +91,7 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
     ksmRpcServer = startRpcServer(conf, ksmNodeRpcAddr,
         KeySpaceManagerProtocolPB.class, ksmService,
         handlerCount);
-    ksmRpcAddress = updateListenAddress(conf,
+    ksmRpcAddress = OzoneClientUtils.updateListenAddress(conf,
         OZONE_KSM_ADDRESS_KEY, ksmNodeRpcAddr, ksmRpcServer);
     metadataManager = new MetadataManagerImpl(conf);
     volumeManager = new VolumeManagerImpl(metadataManager, conf);
@@ -191,27 +191,6 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
   }
 
   /**
-   * After starting an RPC server, updates configuration with the actual
-   * listening address of that server. The listening address may be different
-   * from the configured address if, for example, the configured address uses
-   * port 0 to request use of an ephemeral port.
-   *
-   * @param conf configuration to update
-   * @param rpcAddressKey configuration key for RPC server address
-   * @param addr configured address
-   * @param rpcServer started RPC server.
-   */
-  private static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
-      String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) {
-    InetSocketAddress listenAddr = rpcServer.getListenerAddress();
-    InetSocketAddress updatedAddr = new InetSocketAddress(
-        addr.getHostString(), listenAddr.getPort());
-    conf.set(rpcAddressKey,
-        listenAddr.getHostString() + ":" + listenAddr.getPort());
-    return updatedAddr;
-  }
-
-  /**
    * Start service.
    */
   public void start() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f50a367/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index f54a6a0..3f0f6d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -209,7 +209,7 @@ public class StorageContainerManager
     datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr,
         StorageContainerDatanodeProtocolPB.class, dnProtoPbService,
         handlerCount);
-    datanodeRpcAddress = updateListenAddress(conf,
+    datanodeRpcAddress = OzoneClientUtils.updateListenAddress(conf,
         OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
 
     // SCM Container Service RPC
@@ -224,7 +224,7 @@ public class StorageContainerManager
     clientRpcServer = startRpcServer(conf, scmAddress,
         StorageContainerLocationProtocolPB.class, storageProtoPbService,
         handlerCount);
-    clientRpcAddress = updateListenAddress(conf,
+    clientRpcAddress = OzoneClientUtils.updateListenAddress(conf,
         OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer);
 
 
@@ -240,7 +240,7 @@ public class StorageContainerManager
     blockRpcServer = startRpcServer(conf, scmBlockAddress,
         ScmBlockLocationProtocolPB.class, blockProtoPbService,
         handlerCount);
-    blockRpcAddress = updateListenAddress(conf,
+    blockRpcAddress = OzoneClientUtils.updateListenAddress(conf,
         OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress, blockRpcServer);
 
     registerMXBean();
@@ -303,27 +303,6 @@ public class StorageContainerManager
   }
 
   /**
-   * After starting an RPC server, updates configuration with the actual
-   * listening address of that server. The listening address may be different
-   * from the configured address if, for example, the configured address uses
-   * port 0 to request use of an ephemeral port.
-   *
-   * @param conf configuration to update
-   * @param rpcAddressKey configuration key for RPC server address
-   * @param addr configured address
-   * @param rpcServer started RPC server.
-   */
-  private static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
-      String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) {
-    InetSocketAddress listenAddr = rpcServer.getListenerAddress();
-    InetSocketAddress updatedAddr = new InetSocketAddress(
-        addr.getHostString(), listenAddr.getPort());
-    conf.set(rpcAddressKey,
-        listenAddr.getHostString() + ":" + listenAddr.getPort());
-    return updatedAddr;
-  }
-
-  /**
    * Main entry point for starting StorageContainerManager.
    *
    * @param argv arguments

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f50a367/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
index 90fb5b3..ec188dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
@@ -55,7 +55,7 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys.
     DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
 
 /**
- * Tests for Tests for local cache.
+ * Tests for Local Cache Buffer Manager.
  */
 public class TestBufferManager {
   private final static long GB = 1024 * 1024 * 1024;
@@ -310,6 +310,8 @@ public class TestBufferManager {
     flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
     flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
     flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    flushTestConfig
+        .setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 120);
 
     String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
     String userName = "user" + RandomStringUtils.randomNumeric(4);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f50a367/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockCLI.java
index 46e1f64..ebf00af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockCLI.java
@@ -22,17 +22,24 @@ import org.apache.hadoop.cblock.meta.VolumeDescriptor;
 import org.apache.hadoop.cblock.util.MockStorageClient;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.scm.client.ScmClient;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.List;
 
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -53,7 +60,16 @@ public class TestCBlockCLI {
     outContent = new ByteArrayOutputStream();
     ScmClient storageClient = new MockStorageClient();
     conf = new OzoneConfiguration();
-    conf.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, "/tmp/testCblockCli.dat");
+    String path = GenericTestUtils
+        .getTempPath(TestCBlockCLI.class.getSimpleName());
+    File filePath = new File(path);
+    if (!filePath.exists() && !filePath.mkdirs()) {
+      throw new IOException("Unable to create test DB dir");
+    }
+    conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat(
+        "/testCblockCli.dat"));
     cBlockManager = new CBlockManager(conf, storageClient);
     cBlockManager.start();
     testPrintOut = new PrintStream(outContent);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f50a367/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java
new file mode 100644
index 0000000..94fe4cb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.cblock;
+
+import com.google.common.primitives.Longs;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
+import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB
+    .StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_DISK_CACHE_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_TRACE_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
+
+/**
+ * Tests for Cblock read write functionality.
+ */
+public class TestCBlockReadWrite {
+  private final static long GB = 1024 * 1024 * 1024;
+  private final static int KB = 1024;
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration config;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private static XceiverClientManager xceiverClientManager;
+
+  /**
+   * Prepares the fixtures shared by all tests in this class: the
+   * configuration, a one-datanode MiniOzoneCluster, the storage container
+   * location client and the XceiverClientManager.
+   *
+   * @throws IOException propagated from cluster or client construction.
+   */
+  @BeforeClass
+  public static void init() throws IOException {
+    config = new OzoneConfiguration();
+    // The disk cache directory is derived from the resource location of the
+    // configuration class.
+    String cacheDir = config.getClass().getResource("").getPath()
+        + TestOzoneContainer.class.getSimpleName();
+    config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, cacheDir);
+
+    MiniOzoneCluster.Builder clusterBuilder =
+        new MiniOzoneCluster.Builder(config)
+            .numDataNodes(1)
+            .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    cluster = clusterBuilder.build();
+    storageContainerLocationClient =
+        cluster.createStorageContainerLocationClient();
+    xceiverClientManager = new XceiverClientManager(config);
+  }
+
+  /**
+   * Shuts down the shared cluster, if one was started, and then closes the
+   * storage container location client and the cluster.
+   */
+  @AfterClass
+  public static void shutdown() throws InterruptedException {
+    final MiniOzoneCluster started = cluster;
+    if (started != null) {
+      started.shutdown();
+    }
+    // No logger is supplied to the cleanup helper.
+    IOUtils.cleanup(null, storageContainerLocationClient, cluster);
+  }
+
+  /**
+   * getContainerPipeline creates a set of containers and returns the
+   * Pipelines that define those containers.
+   *
+   * @param count - Number of containers to create.
+   * @return - List of Pipelines, one per created container.
+   * @throws IOException if a container cannot be allocated or created.
+   */
+  private List<Pipeline> getContainerPipeline(int count) throws IOException {
+    List<Pipeline> containerPipelines = new LinkedList<>();
+    for (int x = 0; x < count; x++) {
+      String traceID = "trace" + RandomStringUtils.randomNumeric(4);
+      String containerName = "container" + RandomStringUtils.randomNumeric(10);
+      Pipeline pipeline =
+          storageContainerLocationClient.allocateContainer(containerName);
+      XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
+      try {
+        ContainerProtocolCalls.createContainer(client, traceID);
+      } finally {
+        // Every acquired client is handed back to the manager, even when
+        // container creation fails, so no client is left checked out.
+        xceiverClientManager.releaseClient(client);
+      }
+      // This step is needed since we set private data on pipelines, when we
+      // read the list from CBlockServer. So we mimic that action here.
+      pipeline.setData(Longs.toByteArray(x));
+      containerPipelines.add(pipeline);
+    }
+    return containerPipelines;
+  }
+
+  /**
+   * Writes and reads two blocks through a cache that has short circuit IO
+   * disabled, so the operations bypass the cache, and verifies the metrics
+   * after each step as well as the content of the last block read.
+   *
+   * @throws IOException declared by the cache and container operations.
+   * @throws InterruptedException declared by the wait for a clean cache.
+   * @throws TimeoutException declared by the wait for a clean cache.
+   */
+  @Test
+  public void testDirectIO() throws IOException,
+      InterruptedException, TimeoutException {
+    final long firstBlock = 0;
+    final long secondBlock = firstBlock + 1;
+
+    OzoneConfiguration directConf = new OzoneConfiguration();
+    directConf.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    directConf.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
+
+    String volume = "volume" + RandomStringUtils.randomNumeric(4);
+    String user = "user" + RandomStringUtils.randomNumeric(4);
+    String payload = RandomStringUtils.random(4 * KB);
+    String expectedHash = DigestUtils.sha256Hex(payload);
+
+    CBlockTargetMetrics stats = CBlockTargetMetrics.create();
+    ContainerCacheFlusher cacheFlusher = new ContainerCacheFlusher(
+        directConf, xceiverClientManager, stats);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(directConf)
+        .setVolumeName(volume)
+        .setUserName(user)
+        .setPipelines(getContainerPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(cacheFlusher)
+        .setCBlockTargetMetrics(stats)
+        .build();
+    cache.start();
+    Assert.assertFalse(cache.isShortCircuitIOEnabled());
+
+    // First block: one direct write followed by one read.
+    cache.put(firstBlock, payload.getBytes(StandardCharsets.UTF_8));
+    Assert.assertEquals(1, stats.getNumDirectBlockWrites());
+    Assert.assertEquals(1, stats.getNumWriteOps());
+    // Please note that this read is directly from remote container
+    LogicalBlock readBack = cache.get(firstBlock);
+    Assert.assertEquals(1, stats.getNumReadOps());
+    Assert.assertEquals(0, stats.getNumReadCacheHits());
+    Assert.assertEquals(1, stats.getNumReadCacheMiss());
+    Assert.assertEquals(0, stats.getNumReadLostBlocks());
+    Assert.assertEquals(0, stats.getNumFailedDirectBlockWrites());
+
+    // Second block: the counters advance by one write and one read.
+    cache.put(secondBlock, payload.getBytes(StandardCharsets.UTF_8));
+    Assert.assertEquals(2, stats.getNumDirectBlockWrites());
+    Assert.assertEquals(2, stats.getNumWriteOps());
+    Assert.assertEquals(0, stats.getNumFailedDirectBlockWrites());
+    // Please note that this read is directly from remote container
+    readBack = cache.get(secondBlock);
+    Assert.assertEquals(2, stats.getNumReadOps());
+    Assert.assertEquals(0, stats.getNumReadCacheHits());
+    Assert.assertEquals(2, stats.getNumReadCacheMiss());
+    Assert.assertEquals(0, stats.getNumReadLostBlocks());
+
+    String actualHash = DigestUtils.sha256Hex(readBack.getData().array());
+    Assert.assertEquals("File content does not match.", expectedHash,
+        actualHash);
+    GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
+    cache.close();
+  }
+
+  /**
+   * This test writes 512 blocks to a cache that has short circuit IO
+   * enabled and then closes the cache. A second cache is then started on
+   * the same volume and pipelines with "short.circuit.io" disabled to check
+   * that the blocks are read correctly from the container.
+   *
+   * @throws IOException declared by the cache and container operations.
+   * @throws InterruptedException declared by the sleep that waits for the
+   *                              flusher.
+   * @throws TimeoutException declared by the test signature.
+   */
+  @Test
+  public void testContainerWrites() throws IOException,
+      InterruptedException, TimeoutException {
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    URL p = flushTestConfig.getClass().getResource("");
+    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
+    // This test uses its own client manager instead of the shared one.
+    XceiverClientManager xcm = new XceiverClientManager(flushTestConfig);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+
+    // Four distinct payloads (and their hashes) are reused across all blocks.
+    int numUniqueBlocks = 4;
+    String[] data = new String[numUniqueBlocks];
+    String[] dataHash = new String[numUniqueBlocks];
+    for (int i = 0; i < numUniqueBlocks; i++) {
+      data[i] = RandomStringUtils.random(4 * KB);
+      dataHash[i] = DigestUtils.sha256Hex(data[i]);
+    }
+
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xcm, metrics);
+    // The pipelines are kept so the restarted cache can reuse them.
+    List<Pipeline> pipelines = getContainerPipeline(10);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(pipelines)
+        .setClientManager(xcm)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    // Run the flusher on a daemon thread.
+    Thread flushListenerThread = new Thread(flusher);
+    flushListenerThread.setDaemon(true);
+    flushListenerThread.start();
+    Assert.assertTrue(cache.isShortCircuitIOEnabled());
+    // Write data to the cache
+    for (int i = 0; i < 512; i++) {
+      cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8));
+    }
+    // Close the cache and flush the data to the containers
+    cache.close();
+    Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
+    Assert.assertEquals(512, metrics.getNumWriteOps());
+    // Presumably gives the background flusher time to finish before the
+    // flush metrics are checked - the duration matches the 3 second flush
+    // interval configured above.
+    Thread.sleep(3000);
+    flusher.shutdown();
+    Assert.assertTrue(metrics.getNumBlockBufferFlushTriggered() > 1);
+    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
+    Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
+    Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
+    // Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
+    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher newFlusher =
+        new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics);
+    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(pipelines)
+        .setClientManager(xcm)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(newFlusher)
+        .setCBlockTargetMetrics(newMetrics)
+        .build();
+    newCache.start();
+    Assert.assertFalse(newCache.isShortCircuitIOEnabled());
+    // this read will be from the container, also match the hash
+    for (int i = 0; i < 512; i++) {
+      LogicalBlock block = newCache.get(i);
+      String readHash = DigestUtils.sha256Hex(block.getData().array());
+      Assert.assertEquals("File content does not match, for index:"
+          + i, dataHash[i % numUniqueBlocks], readHash);
+    }
+    Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
+    Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks());
+    newCache.close();
+    newFlusher.shutdown();
+  }
+
+  @Test
+  public void testRetryLog() throws IOException,
+      InterruptedException, TimeoutException {
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    URL p = flushTestConfig.getClass().getResource("");
+    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
+
+    int numblocks = 10;
+    flushTestConfig.setInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, numblocks);
+
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+
+    List<Pipeline> fakeContainerPipelines = new LinkedList<>();
+    Pipeline fakePipeline = new Pipeline("fake");
+    fakePipeline.setData(Longs.toByteArray(1));
+    fakeContainerPipelines.add(fakePipeline);
+
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(fakeContainerPipelines)
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    Thread flushListenerThread = new Thread(flusher);
+    flushListenerThread.setDaemon(true);
+    flushListenerThread.start();
+
+    for (int i = 0; i < numblocks; i++) {
+      cache.put(i, data.getBytes(StandardCharsets.UTF_8));
+    }
+    Assert.assertEquals(numblocks, metrics.getNumWriteOps());
+    Thread.sleep(3000);
+
+    // all the writes to the container will fail because of fake pipelines
+    Assert.assertEquals(numblocks, metrics.getNumDirtyLogBlockRead());
+    Assert.assertTrue(
+        metrics.getNumWriteGenericExceptionRetryBlocks() >= numblocks);
+    Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, metrics.getNumFailedRetryLogFileWrites());
+    Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
+    cache.close();
+    flusher.shutdown();
+
+    // restart cache with correct pipelines, now blocks should be uploaded
+    // correctly
+    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher newFlusher =
+        new ContainerCacheFlusher(flushTestConfig,
+            xceiverClientManager, newMetrics);
+    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(getContainerPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(newFlusher)
+        .setCBlockTargetMetrics(newMetrics)
+        .build();
+    newCache.start();
+    Thread newFlushListenerThread = new Thread(newFlusher);
+    newFlushListenerThread.setDaemon(true);
+    newFlushListenerThread.start();
+    Thread.sleep(3000);
+    Assert.assertTrue(newMetrics.getNumRetryLogBlockRead() >= numblocks);
+    Assert.assertEquals(0, 
newMetrics.getNumWriteGenericExceptionRetryBlocks());
+    Assert.assertEquals(0, newMetrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, newMetrics.getNumFailedReleaseLevelDB());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f50a367/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java
index 1efef3e..6733304 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java
@@ -29,6 +29,10 @@ import org.junit.Test;
 import java.util.HashSet;
 import java.util.List;
 
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -45,6 +49,8 @@ public class TestCBlockServer {
   public static void setup() throws Exception {
     ScmClient storageClient = new MockStorageClient();
     conf = new OzoneConfiguration();
+    conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0");
     cBlockManager = new CBlockManager(conf, storageClient);
     cBlockManager.start();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f50a367/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java
index f4f927f..7bcaae4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java
@@ -28,7 +28,12 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -63,6 +68,8 @@ public class TestCBlockServerPersistence {
         "/testCblockPersistence.dat"));
     try {
       ScmClient storageClient = new MockStorageClient();
+      conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0");
+      conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0");
       cBlockManager = new CBlockManager(conf, storageClient);
       cBlockManager.start();
       cBlockManager.createVolume(userName, volumeName1, volumeSize1, blockSize);
@@ -84,6 +91,8 @@ public class TestCBlockServerPersistence {
       OzoneConfiguration conf1 = new OzoneConfiguration();
       conf1.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat(
           "/testCblockPersistence.dat"));
+      conf1.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0");
+      conf1.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0");
       cBlockManager1 = new CBlockManager(conf1, storageClient1);
       cBlockManager1.start();
       List<VolumeDescriptor> allVolumes1 = cBlockManager1.getAllVolumes();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f50a367/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
index b03fb22..c54a214 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
@@ -62,13 +62,9 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys.
     DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
 import static org.apache.hadoop.cblock.CBlockConfigKeys.
     DFS_CBLOCK_TRACE_IO;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.
-    DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.
-    DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
 
 /**
- * Tests for Tests for local cache.
+ * Tests for local cache.
  */
 public class TestLocalBlockCache {
   private static final Logger LOG =
@@ -444,247 +440,4 @@ public class TestLocalBlockCache {
         100, 20 * 1000);
     ozoneStore.close();
   }
-
-  /**
-   * This test creates a cache and performs a simple write / read.
-   * The operations are done by bypassing the cache.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testDirectIO() throws IOException,
-      InterruptedException, TimeoutException {
-    OzoneConfiguration cConfig = new OzoneConfiguration();
-    cConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
-    cConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
-    final long blockID = 0;
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
-    String userName = "user" + RandomStringUtils.randomNumeric(4);
-    String data = RandomStringUtils.random(4 * KB);
-    String dataHash = DigestUtils.sha256Hex(data);
-    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
-    ContainerCacheFlusher flusher = new ContainerCacheFlusher(cConfig,
-        xceiverClientManager, metrics);
-    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
-        .setConfiguration(cConfig)
-        .setVolumeName(volumeName)
-        .setUserName(userName)
-        .setPipelines(getContainerPipeline(10))
-        .setClientManager(xceiverClientManager)
-        .setBlockSize(4 * KB)
-        .setVolumeSize(50 * GB)
-        .setFlusher(flusher)
-        .setCBlockTargetMetrics(metrics)
-        .build();
-    cache.start();
-    Assert.assertFalse(cache.isShortCircuitIOEnabled());
-    cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
-    Assert.assertEquals(1, metrics.getNumDirectBlockWrites());
-    Assert.assertEquals(1, metrics.getNumWriteOps());
-    // Please note that this read is directly from remote container
-    LogicalBlock block = cache.get(blockID);
-    Assert.assertEquals(1, metrics.getNumReadOps());
-    Assert.assertEquals(0, metrics.getNumReadCacheHits());
-    Assert.assertEquals(1, metrics.getNumReadCacheMiss());
-    Assert.assertEquals(0, metrics.getNumReadLostBlocks());
-    Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());
-
-    cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8));
-    Assert.assertEquals(2, metrics.getNumDirectBlockWrites());
-    Assert.assertEquals(2, metrics.getNumWriteOps());
-    Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());
-    // Please note that this read is directly from remote container
-    block = cache.get(blockID + 1);
-    Assert.assertEquals(2, metrics.getNumReadOps());
-    Assert.assertEquals(0, metrics.getNumReadCacheHits());
-    Assert.assertEquals(2, metrics.getNumReadCacheMiss());
-    Assert.assertEquals(0, metrics.getNumReadLostBlocks());
-    String readHash = DigestUtils.sha256Hex(block.getData().array());
-    Assert.assertEquals("File content does not match.", dataHash, readHash);
-    GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
-    cache.close();
-  }
-
-  /**
-   * This test writes some block to the cache and then shuts down the cache
-   * The cache is then restarted with "short.circuit.io" disable to check
-   * that the blocks are read correctly from the container.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testContainerWrites() throws IOException,
-      InterruptedException, TimeoutException {
-    // Create a new config so that this tests write metafile to new location
-    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
-    URL p = flushTestConfig.getClass().getResource("");
-    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
-    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
-    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
-    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
-    flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
-    XceiverClientManager xcm = new XceiverClientManager(flushTestConfig);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
-    String userName = "user" + RandomStringUtils.randomNumeric(4);
-
-    int numUniqueBlocks = 4;
-    String[] data = new String[numUniqueBlocks];
-    String[] dataHash = new String[numUniqueBlocks];
-    for (int i = 0; i < numUniqueBlocks; i++) {
-      data[i] = RandomStringUtils.random(4 * KB);
-      dataHash[i] = DigestUtils.sha256Hex(data[i]);
-    }
-
-    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
-    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
-        xcm, metrics);
-    List<Pipeline> pipelines = getContainerPipeline(10);
-    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
-        .setConfiguration(flushTestConfig)
-        .setVolumeName(volumeName)
-        .setUserName(userName)
-        .setPipelines(pipelines)
-        .setClientManager(xcm)
-        .setBlockSize(4 * KB)
-        .setVolumeSize(50 * GB)
-        .setFlusher(flusher)
-        .setCBlockTargetMetrics(metrics)
-        .build();
-    cache.start();
-    Thread flushListenerThread = new Thread(flusher);
-    flushListenerThread.setDaemon(true);
-    flushListenerThread.start();
-    Assert.assertTrue(cache.isShortCircuitIOEnabled());
-    // Write data to the cache
-    for (int i = 0; i < 512; i++) {
-      cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8));
-    }
-    // Close the cache and flush the data to the containers
-    cache.close();
-    Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
-    Assert.assertEquals(512, metrics.getNumWriteOps());
-    Thread.sleep(5000);
-    flusher.shutdown();
-    Assert.assertTrue(metrics.getNumBlockBufferFlushTriggered() > 1);
-    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
-    Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
-    Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
-    Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
-    // Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache
-    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
-    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
-    ContainerCacheFlusher newFlusher =
-        new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics);
-    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
-        .setConfiguration(flushTestConfig)
-        .setVolumeName(volumeName)
-        .setUserName(userName)
-        .setPipelines(pipelines)
-        .setClientManager(xcm)
-        .setBlockSize(4 * KB)
-        .setVolumeSize(50 * GB)
-        .setFlusher(newFlusher)
-        .setCBlockTargetMetrics(newMetrics)
-        .build();
-    newCache.start();
-    Assert.assertFalse(newCache.isShortCircuitIOEnabled());
-    // this read will be from the container, also match the hash
-    for (int i = 0; i < 512; i++) {
-      LogicalBlock block = newCache.get(i);
-      String readHash = DigestUtils.sha256Hex(block.getData().array());
-      Assert.assertEquals("File content does not match, for index:"
-          + i, dataHash[i % numUniqueBlocks], readHash);
-    }
-    Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
-    Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks());
-    newCache.close();
-    newFlusher.shutdown();
-  }
-
-  @Test
-  public void testRetryLog() throws IOException,
-      InterruptedException, TimeoutException {
-    // Create a new config so that this tests write metafile to new location
-    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
-    URL p = flushTestConfig.getClass().getResource("");
-    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
-    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
-    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
-    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
-    flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
-
-    int numblocks = 10;
-    flushTestConfig.setInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, numblocks);
-
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
-    String userName = "user" + RandomStringUtils.randomNumeric(4);
-    String data = RandomStringUtils.random(4 * KB);
-
-    List<Pipeline> fakeContainerPipelines = new LinkedList<>();
-    Pipeline fakePipeline = new Pipeline("fake");
-    fakePipeline.setData(Longs.toByteArray(1));
-    fakeContainerPipelines.add(fakePipeline);
-
-    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
-    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
-        xceiverClientManager, metrics);
-    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
-        .setConfiguration(flushTestConfig)
-        .setVolumeName(volumeName)
-        .setUserName(userName)
-        .setPipelines(fakeContainerPipelines)
-        .setClientManager(xceiverClientManager)
-        .setBlockSize(4 * KB)
-        .setVolumeSize(50 * GB)
-        .setFlusher(flusher)
-        .setCBlockTargetMetrics(metrics)
-        .build();
-    cache.start();
-    Thread flushListenerThread = new Thread(flusher);
-    flushListenerThread.setDaemon(true);
-    flushListenerThread.start();
-
-    for (int i = 0; i < numblocks; i++) {
-      cache.put(i, data.getBytes(StandardCharsets.UTF_8));
-    }
-    Assert.assertEquals(numblocks, metrics.getNumWriteOps());
-    Thread.sleep(3000);
-
-    // all the writes to the container will fail because of fake pipelines
-    Assert.assertEquals(numblocks, metrics.getNumDirtyLogBlockRead());
-    Assert.assertTrue(
-        metrics.getNumWriteGenericExceptionRetryBlocks() >= numblocks);
-    Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
-    Assert.assertEquals(0, metrics.getNumFailedRetryLogFileWrites());
-    Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
-    cache.close();
-    flusher.shutdown();
-
-    // restart cache with correct pipelines, now blocks should be uploaded
-    // correctly
-    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
-    ContainerCacheFlusher newFlusher =
-        new ContainerCacheFlusher(flushTestConfig,
-            xceiverClientManager, newMetrics);
-    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
-        .setConfiguration(flushTestConfig)
-        .setVolumeName(volumeName)
-        .setUserName(userName)
-        .setPipelines(getContainerPipeline(10))
-        .setClientManager(xceiverClientManager)
-        .setBlockSize(4 * KB)
-        .setVolumeSize(50 * GB)
-        .setFlusher(newFlusher)
-        .setCBlockTargetMetrics(newMetrics)
-        .build();
-    newCache.start();
-    Thread newFlushListenerThread = new Thread(newFlusher);
-    newFlushListenerThread.setDaemon(true);
-    newFlushListenerThread.start();
-    Thread.sleep(3000);
-    Assert.assertTrue(newMetrics.getNumRetryLogBlockRead() >= numblocks);
-    Assert.assertEquals(0, newMetrics.getNumWriteGenericExceptionRetryBlocks());
-    Assert.assertEquals(0, newMetrics.getNumWriteIOExceptionRetryBlocks());
-    Assert.assertEquals(0, newMetrics.getNumFailedReleaseLevelDB());
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to