Repository: hadoop
Updated Branches:
  refs/heads/ozone-0.3 a492edee4 -> 575613ef7


Revert "HDDS-708. Validate BCSID while reading blocks from containers in 
datanodes. Contributed by Shashikant Banerjee."

This reverts commit a492edee4b84e38552f185a4227b49abf60f762d.
The commit message is wrong.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/575613ef
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/575613ef
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/575613ef

Branch: refs/heads/ozone-0.3
Commit: 575613ef7a051d3a26ef8d895ef10498cb71d667
Parents: a492ede
Author: Shashikant Banerjee <shashik...@apache.org>
Authored: Tue Oct 23 19:14:18 2018 +0530
Committer: Shashikant Banerjee <shashik...@apache.org>
Committed: Tue Oct 23 19:14:18 2018 +0530

----------------------------------------------------------------------
 .../hadoop/hdds/scm/XceiverClientGrpc.java      | 194 ++++++-------------
 .../hadoop/hdds/scm/XceiverClientManager.java   |  14 +-
 .../hadoop/hdds/scm/XceiverClientRatis.java     |   6 -
 .../scm/container/common/helpers/Pipeline.java  |   4 -
 .../ozone/client/io/ChunkGroupInputStream.java  |   9 +-
 .../ozone/client/io/ChunkGroupOutputStream.java |   4 -
 .../hadoop/ozone/TestMiniOzoneCluster.java      |   2 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java    | 107 ----------
 .../ozone/scm/TestXceiverClientManager.java     |  15 +-
 9 files changed, 70 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 9526be3..2f11872 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
@@ -41,9 +40,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.UUID;
-import java.util.Map;
-import java.util.HashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -54,9 +50,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
   private final Pipeline pipeline;
   private final Configuration config;
-  private Map<UUID, XceiverClientProtocolServiceStub> asyncStubs;
+  private XceiverClientProtocolServiceStub asyncStub;
   private XceiverClientMetrics metrics;
-  private Map<UUID, ManagedChannel> channels;
+  private ManagedChannel channel;
   private final Semaphore semaphore;
   private boolean closed = false;
 
@@ -76,62 +72,46 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     this.semaphore =
         new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
     this.metrics = XceiverClientManager.getXceiverClientMetrics();
-    this.channels = new HashMap<>();
-    this.asyncStubs = new HashMap<>();
   }
 
   @Override
   public void connect() throws Exception {
-
-    // leader by default is the 1st datanode in the datanode list of pipleline
     DatanodeDetails leader = this.pipeline.getLeader();
-    // just make a connection to the 1st datanode at the beginning
-    connectToDatanode(leader);
-  }
 
-  private void connectToDatanode(DatanodeDetails dn) {
     // read port from the data node, on failure use default configured
     // port.
-    int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
+    int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
     if (port == 0) {
       port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
           OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
     }
-    LOG.debug("Connecting to server Port : " + dn.getIpAddress());
-    ManagedChannel channel =
-        NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
-            .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
-            .build();
-    XceiverClientProtocolServiceStub asyncStub =
-        XceiverClientProtocolServiceGrpc.newStub(channel);
-    asyncStubs.put(dn.getUuid(), asyncStub);
-    channels.put(dn.getUuid(), channel);
+    LOG.debug("Connecting to server Port : " + leader.getIpAddress());
+    channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port)
+        .usePlaintext()
+        .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+        .build();
+    asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel);
   }
+
   /**
-   * Returns if the xceiver client connects to all servers in the pipeline.
+   * Returns if the xceiver client connects to a server.
    *
    * @return True if the connection is alive, false otherwise.
    */
   @VisibleForTesting
-  public boolean isConnected(DatanodeDetails details) {
-    return isConnected(channels.get(details.getUuid()));
-  }
-
-  private boolean isConnected(ManagedChannel channel) {
-    return channel != null && !channel.isTerminated() && !channel.isShutdown();
+  public boolean isConnected() {
+    return !channel.isTerminated() && !channel.isShutdown();
   }
 
   @Override
   public void close() {
     closed = true;
-    for (ManagedChannel channel : channels.values()) {
-      channel.shutdownNow();
-      try {
-        channel.awaitTermination(60, TimeUnit.MINUTES);
-      } catch (Exception e) {
-        LOG.error("Unexpected exception while waiting for channel termination",
-            e);
-      }
+    channel.shutdownNow();
+    try {
+      channel.awaitTermination(60, TimeUnit.MINUTES);
+    } catch (Exception e) {
+      LOG.error("Unexpected exception while waiting for channel termination",
+          e);
     }
   }
 
@@ -140,56 +120,6 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     return pipeline;
   }
 
-  @Override
-  public ContainerCommandResponseProto sendCommand(
-      ContainerCommandRequestProto request) throws IOException {
-    return sendCommandWithRetry(request);
-  }
-
-  public ContainerCommandResponseProto sendCommandWithRetry(
-      ContainerCommandRequestProto request) throws IOException {
-    int size = pipeline.getMachines().size();
-    ContainerCommandResponseProto responseProto = null;
-    DatanodeDetails dn = null;
-
-    // In case of an exception or an error, we will try to read from the
-    // datanodes in the pipeline in a round robin fashion.
-
-    // TODO: cache the correct leader info in here, so that any subsequent calls
-    // should first go to leader
-    for (int dnIndex = 0; dnIndex < size; dnIndex++) {
-      try {
-        dn = pipeline.getMachines().get(dnIndex);
-        LOG.debug("Executing command " + request + " on datanode " + dn);
-        // In case the command gets retried on a 2nd datanode,
-        // sendCommandAsyncCall will create a new channel and async stub
-        // in case these don't exist for the specific datanode.
-        responseProto = sendCommandAsync(request, dn).get();
-        if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) {
-          break;
-        }
-      } catch (ExecutionException | InterruptedException e) {
-        LOG.warn("Failed to execute command " + request + " on datanode " + dn
-            .getUuidString(), e);
-      }
-    }
-
-    if (responseProto != null) {
-      return responseProto;
-    } else {
-      throw new IOException(
-          "Failed to execute command " + request + " on the pipeline "
-              + pipeline.getId());
-    }
-  }
-
-  // TODO: for a true async API, once the waitable future while executing
-  // the command on one channel fails, it should be retried asynchronously
-  // on the future Task for all the remaining datanodes.
-
-  // Note: this Async api is not used currently used in any active I/O path.
-  // In case it gets used, the asynchronous retry logic needs to be plugged
-  // in here.
   /**
    * Sends a given command to server gets a waitable future back.
    *
@@ -198,25 +128,15 @@ public class XceiverClientGrpc extends XceiverClientSpi {
    * @throws IOException
    */
   @Override
-  public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
-      ContainerCommandRequestProto request)
+  public CompletableFuture<ContainerCommandResponseProto>
+      sendCommandAsync(ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException {
-    return sendCommandAsync(request, pipeline.getLeader());
-  }
-
-  private CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
-      ContainerCommandRequestProto request, DatanodeDetails dn)
-      throws IOException, ExecutionException, InterruptedException {
-    if (closed) {
+    if(closed){
       throw new IOException("This channel is not connected.");
     }
 
-    UUID dnId = dn.getUuid();
-    ManagedChannel channel = channels.get(dnId);
-    // If the channel doesn't exist for this specific datanode or the channel
-    // is closed, just reconnect
-    if (!isConnected(channel)) {
-      reconnect(dn);
+    if(channel == null || !isConnected()) {
+      reconnect();
     }
 
     final CompletableFuture<ContainerCommandResponseProto> replyFuture =
@@ -225,54 +145,48 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     long requestTime = Time.monotonicNowNanos();
     metrics.incrPendingContainerOpsMetrics(request.getCmdType());
     // create a new grpc stream for each non-async call.
-
-    // TODO: for async calls, we should reuse StreamObserver resources.
     final StreamObserver<ContainerCommandRequestProto> requestObserver =
-        asyncStubs.get(dnId)
-            .send(new StreamObserver<ContainerCommandResponseProto>() {
-              @Override
-              public void onNext(ContainerCommandResponseProto value) {
-                replyFuture.complete(value);
-                metrics.decrPendingContainerOpsMetrics(request.getCmdType());
-                metrics.addContainerOpsLatency(request.getCmdType(),
-                    Time.monotonicNowNanos() - requestTime);
-                semaphore.release();
-              }
-
-              @Override
-              public void onError(Throwable t) {
-                replyFuture.completeExceptionally(t);
-                metrics.decrPendingContainerOpsMetrics(request.getCmdType());
-                metrics.addContainerOpsLatency(request.getCmdType(),
-                    Time.monotonicNowNanos() - requestTime);
-                semaphore.release();
-              }
-
-              @Override
-              public void onCompleted() {
-                if (!replyFuture.isDone()) {
-                  replyFuture.completeExceptionally(new IOException(
-                      "Stream completed but no reply for request " + request));
-                }
-              }
-            });
+        asyncStub.send(new StreamObserver<ContainerCommandResponseProto>() {
+          @Override
+          public void onNext(ContainerCommandResponseProto value) {
+            replyFuture.complete(value);
+            metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+            metrics.addContainerOpsLatency(request.getCmdType(),
+                Time.monotonicNowNanos() - requestTime);
+            semaphore.release();
+          }
+          @Override
+          public void onError(Throwable t) {
+            replyFuture.completeExceptionally(t);
+            metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+            metrics.addContainerOpsLatency(request.getCmdType(),
+                Time.monotonicNowNanos() - requestTime);
+            semaphore.release();
+          }
+
+          @Override
+          public void onCompleted() {
+            if (!replyFuture.isDone()) {
+              replyFuture.completeExceptionally(
+                  new IOException("Stream completed but no reply for request "
+                      + request));
+            }
+          }
+        });
     requestObserver.onNext(request);
     requestObserver.onCompleted();
     return replyFuture;
   }
 
-  private void reconnect(DatanodeDetails dn)
-      throws IOException {
-    ManagedChannel channel;
+  private void reconnect() throws IOException {
     try {
-      connectToDatanode(dn);
-      channel = channels.get(dn.getUuid());
+      connect();
     } catch (Exception e) {
       LOG.error("Error while connecting: ", e);
       throw new IOException(e);
     }
 
-    if (channel == null || !isConnected(channel)) {
+    if (channel == null || !isConnected()) {
       throw new IOException("This channel is not connected.");
     }
   }

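For reference: the lines removed above had taught XceiverClientGrpc to keep per-datanode channel/stub maps and to retry sendCommand round-robin across the pipeline; this revert restores the single-channel, leader-only client. A minimal sketch of the restored connect path, assuming only the gRPC classes named in the diff ('leaderIp' and 'port' are placeholders here):

    // Sketch: one channel to the pipeline leader, as restored by this revert.
    // Exception handling and the fallback to the configured default port are
    // omitted; see connect() in the diff above for the full logic.
    ManagedChannel channel = NettyChannelBuilder.forAddress(leaderIp, port)
        .usePlaintext()
        .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
        .build();
    XceiverClientProtocolServiceStub asyncStub =
        XceiverClientProtocolServiceGrpc.newStub(channel);
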
http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index 83b5a4c..d542abc 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -27,6 +27,7 @@ import com.google.common.cache.RemovalNotification;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -58,7 +59,7 @@ public class XceiverClientManager implements Closeable {
 
   //TODO : change this to SCM configuration class
   private final Configuration conf;
-  private final Cache<String, XceiverClientSpi> clientCache;
+  private final Cache<PipelineID, XceiverClientSpi> clientCache;
   private final boolean useRatis;
 
   private static XceiverClientMetrics metrics;
@@ -82,10 +83,10 @@ public class XceiverClientManager implements Closeable {
         .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
         .maximumSize(maxSize)
         .removalListener(
-            new RemovalListener<String, XceiverClientSpi>() {
+            new RemovalListener<PipelineID, XceiverClientSpi>() {
             @Override
             public void onRemoval(
-                RemovalNotification<String, XceiverClientSpi>
+                RemovalNotification<PipelineID, XceiverClientSpi>
                   removalNotification) {
               synchronized (clientCache) {
                 // Mark the entry as evicted
@@ -97,7 +98,7 @@ public class XceiverClientManager implements Closeable {
   }
 
   @VisibleForTesting
-  public Cache<String, XceiverClientSpi> getClientCache() {
+  public Cache<PipelineID, XceiverClientSpi> getClientCache() {
     return clientCache;
   }
 
@@ -139,14 +140,13 @@ public class XceiverClientManager implements Closeable {
 
   private XceiverClientSpi getClient(Pipeline pipeline)
       throws IOException {
-    HddsProtos.ReplicationType type = pipeline.getType();
     try {
-      return clientCache.get(pipeline.getId().getId().toString() + type,
+      return clientCache.get(pipeline.getId(),
           new Callable<XceiverClientSpi>() {
           @Override
           public XceiverClientSpi call() throws Exception {
             XceiverClientSpi client = null;
-            switch (type) {
+            switch (pipeline.getType()) {
             case RATIS:
              client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
               break;

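The manager change above reverts the client-cache key from the concatenated "pipeline id + replication type" string back to the PipelineID object itself. A minimal sketch of the resulting cache usage, assuming PipelineID works as a Guava cache key ('staleThresholdMs', 'maxSize', 'pipeline' and 'conf' are placeholders; ExecutionException handling omitted):

    // Sketch: Guava cache keyed directly by PipelineID.
    Cache<PipelineID, XceiverClientSpi> clientCache = CacheBuilder.newBuilder()
        .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
        .maximumSize(maxSize)
        .build();
    // Loads a new client on a cache miss, otherwise returns the cached one.
    XceiverClientSpi client = clientCache.get(pipeline.getId(),
        () -> XceiverClientRatis.newXceiverClientRatis(pipeline, conf));
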
http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index fda30fb..4efe7ba 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm;
 
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.io.MultipleIOException;
-import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.thirdparty.com.google.protobuf
     .InvalidProtocolBufferException;
@@ -52,7 +51,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -192,10 +190,6 @@ public final class XceiverClientRatis extends XceiverClientSpi {
         getClient().sendAsync(() -> byteString);
   }
 
-  public void watchForCommit(long index, long timeout) throws Exception {
-    getClient().sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED)
-        .get(timeout, TimeUnit.MILLISECONDS);
-  }
   /**
    * Sends a given command to server gets a waitable future back.
    *

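Note that the revert also drops watchForCommit(), which blocked until a given Ratis log index was replicated to every replica. The removed call pattern, reconstructed from the deleted lines above ('index' and a millisecond 'timeout' are caller-supplied; the checked exceptions from Future.get are omitted):

    // Sketch: wait until 'index' is ALL_COMMITTED on every replica, or time out.
    getClient().sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED)
        .get(timeout, TimeUnit.MILLISECONDS);
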
http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index 7fa4c39..49c00a8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -301,10 +301,6 @@ public class Pipeline {
     return b.toString();
   }
 
-  public void setType(HddsProtos.ReplicationType type) {
-    this.type = type;
-  }
-
   /**
    * Returns a JSON string of this object.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
index 3772c59..3158334 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
@@ -22,9 +22,7 @@ import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -278,13 +276,8 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
       long containerID = blockID.getContainerID();
       ContainerWithPipeline containerWithPipeline =
           storageContainerLocationClient.getContainerWithPipeline(containerID);
-      Pipeline pipeline = containerWithPipeline.getPipeline();
-
-      // irrespective of the container state, we will always read via Standalone
-      // protocol.
-      pipeline.setType(HddsProtos.ReplicationType.STAND_ALONE);
       XceiverClientSpi xceiverClient = xceiverClientManager
-          .acquireClient(pipeline);
+          .acquireClient(containerWithPipeline.getPipeline());
       boolean success = false;
       containerKey = omKeyLocationInfo.getLocalID();
       try {

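On the read path, the revert removes the forced STAND_ALONE override, so the client is again acquired for the pipeline exactly as SCM returned it. A minimal sketch of the restored flow (names taken from the diff; error handling and the surrounding loop omitted):

    // Sketch: acquire a client for the SCM-provided pipeline without
    // overriding its replication type.
    ContainerWithPipeline containerWithPipeline =
        storageContainerLocationClient.getContainerWithPipeline(containerID);
    XceiverClientSpi xceiverClient =
        xceiverClientManager.acquireClient(containerWithPipeline.getPipeline());
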
http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 5966718..0a38a5a 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -116,10 +116,6 @@ public class ChunkGroupOutputStream extends OutputStream {
   public List<ChunkOutputStreamEntry> getStreamEntries() {
     return streamEntries;
   }
-  @VisibleForTesting
-  public XceiverClientManager getXceiverClientManager() {
-    return xceiverClientManager;
-  }
 
   public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
     List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index bf6a189..302ea46 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -102,7 +102,7 @@ public class TestMiniOzoneCluster {
       // Verify client is able to connect to the container
       try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)){
         client.connect();
-        assertTrue(client.isConnected(pipeline.getLeader()));
+        assertTrue(client.isConnected());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index 1a401d6..b60343a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -24,10 +24,6 @@ import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientRatis;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.*;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -35,7 +31,6 @@ import org.apache.hadoop.ozone.client.*;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
-import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@@ -602,108 +597,6 @@ public class TestOzoneRpcClient {
   }
 
   @Test
-  public void testPutKeyAndGetKeyThreeNodes()
-      throws Exception {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-
-    String value = "sample value";
-    store.createVolume(volumeName);
-    OzoneVolume volume = store.getVolume(volumeName);
-    volume.createBucket(bucketName);
-    OzoneBucket bucket = volume.getBucket(bucketName);
-
-    String keyName = UUID.randomUUID().toString();
-
-    OzoneOutputStream out = bucket
-        .createKey(keyName, value.getBytes().length, ReplicationType.RATIS,
-            ReplicationFactor.THREE);
-    ChunkGroupOutputStream groupOutputStream =
-        (ChunkGroupOutputStream) out.getOutputStream();
-    XceiverClientManager manager = groupOutputStream.getXceiverClientManager();
-    out.write(value.getBytes());
-    out.close();
-    // First, confirm the key info from the client matches the info in OM.
-    OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
-    builder.setVolumeName(volumeName).setBucketName(bucketName)
-        .setKeyName(keyName);
-    OmKeyLocationInfo keyInfo = ozoneManager.lookupKey(builder.build()).
-        getKeyLocationVersions().get(0).getBlocksLatestVersionOnly().get(0);
-    long containerID = keyInfo.getContainerID();
-    long localID = keyInfo.getLocalID();
-    OzoneKeyDetails keyDetails = bucket.getKey(keyName);
-    Assert.assertEquals(keyName, keyDetails.getName());
-
-    List<OzoneKeyLocation> keyLocations = keyDetails.getOzoneKeyLocations();
-    Assert.assertEquals(1, keyLocations.size());
-    Assert.assertEquals(containerID, keyLocations.get(0).getContainerID());
-    Assert.assertEquals(localID, keyLocations.get(0).getLocalID());
-
-    // Make sure that the data size matched.
-    Assert
-        .assertEquals(value.getBytes().length, keyLocations.get(0).getLength());
-
-    ContainerWithPipeline container =
-        cluster.getStorageContainerManager().getContainerManager()
-            .getContainerWithPipeline(containerID);
-    Pipeline pipeline = container.getPipeline();
-    List<DatanodeDetails> datanodes = pipeline.getMachines();
-
-    DatanodeDetails datanodeDetails = datanodes.get(0);
-    Assert.assertNotNull(datanodeDetails);
-
-    XceiverClientSpi clientSpi = manager.acquireClient(pipeline);
-    Assert.assertTrue(clientSpi instanceof XceiverClientRatis);
-    XceiverClientRatis ratisClient = (XceiverClientRatis)clientSpi;
-
-    ratisClient.watchForCommit(keyInfo.getBlockCommitSequenceId(), 5000);
-    // shutdown the datanode
-    cluster.shutdownHddsDatanode(datanodeDetails);
-
-    Assert.assertTrue(container.getContainerInfo().getState()
-        == HddsProtos.LifeCycleState.OPEN);
-    // try to read, this shouls be successful
-    readKey(bucket, keyName, value);
-
-    Assert.assertTrue(container.getContainerInfo().getState()
-        == HddsProtos.LifeCycleState.OPEN);
-    // shutdown the second datanode
-    datanodeDetails = datanodes.get(1);
-    cluster.shutdownHddsDatanode(datanodeDetails);
-    Assert.assertTrue(container.getContainerInfo().getState()
-        == HddsProtos.LifeCycleState.OPEN);
-
-    // the container is open and with loss of 2 nodes we still should be able
-    // to read via Standalone protocol
-    // try to read
-    readKey(bucket, keyName, value);
-
-    // shutdown the 3rd datanode
-    datanodeDetails = datanodes.get(2);
-    cluster.shutdownHddsDatanode(datanodeDetails);
-    try {
-      // try to read
-      readKey(bucket, keyName, value);
-      Assert.fail("Expected exception not thrown");
-    } catch (IOException e) {
-      Assert.assertTrue(e.getMessage().contains("Failed to execute command"));
-      Assert.assertTrue(
-          e.getMessage().contains("on the pipeline " + pipeline.getId()));
-    }
-    manager.releaseClient(clientSpi);
-  }
-
-  private void readKey(OzoneBucket bucket, String keyName, String data)
-      throws IOException {
-    OzoneKey key = bucket.getKey(keyName);
-    Assert.assertEquals(keyName, key.getName());
-    OzoneInputStream is = bucket.readKey(keyName);
-    byte[] fileContent = new byte[data.getBytes().length];
-    is.read(fileContent);
-    is.close();
-  }
-
-  @Test
   public void testGetKeyDetails() throws IOException, OzoneException {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
index 8b35bbb..da445bf 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.scm;
 import com.google.common.cache.Cache;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -106,7 +107,7 @@ public class TestXceiverClientManager {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
-    Cache<String, XceiverClientSpi> cache =
+    Cache<PipelineID, XceiverClientSpi> cache =
         clientManager.getClientCache();
 
     ContainerWithPipeline container1 =
@@ -129,9 +130,8 @@ public class TestXceiverClientManager {
     Assert.assertNotEquals(client1, client2);
 
     // least recent container (i.e containerName1) is evicted
-    XceiverClientSpi nonExistent1 = cache.getIfPresent(
-        container1.getContainerInfo().getPipelineID().getId().toString()
-            + container1.getContainerInfo().getReplicationType());
+    XceiverClientSpi nonExistent1 = cache
+        .getIfPresent(container1.getContainerInfo().getPipelineID());
     Assert.assertEquals(null, nonExistent1);
     // However container call should succeed because of refcount on the client.
     String traceID1 = "trace" + RandomStringUtils.randomNumeric(4);
@@ -160,7 +160,7 @@ public class TestXceiverClientManager {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
-    Cache<String, XceiverClientSpi> cache =
+    Cache<PipelineID, XceiverClientSpi> cache =
         clientManager.getClientCache();
 
     ContainerWithPipeline container1 =
@@ -183,9 +183,8 @@ public class TestXceiverClientManager {
     Assert.assertNotEquals(client1, client2);
 
     // now client 1 should be evicted
-    XceiverClientSpi nonExistent = cache.getIfPresent(
-        container1.getContainerInfo().getPipelineID().getId().toString()
-            + container1.getContainerInfo().getReplicationType());
+    XceiverClientSpi nonExistent = cache
+        .getIfPresent(container1.getContainerInfo().getPipelineID());
     Assert.assertEquals(null, nonExistent);
 
     // Any container operation should now fail

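The test updates mirror the cache-key change: eviction checks now look up the PipelineID directly instead of reassembling the old composite string key. A minimal sketch of the assertion pattern used above (assertNull substituted for assertEquals(null, ...) purely for brevity):

    // Sketch: after eviction, the PipelineID key should no longer resolve.
    XceiverClientSpi evicted =
        cache.getIfPresent(container1.getContainerInfo().getPipelineID());
    Assert.assertNull(evicted);
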
