Repository: hadoop Updated Branches: refs/heads/ozone-0.3 a492edee4 -> 575613ef7
Revert "HDDS-708. Validate BCSID while reading blocks from containers in datanodes. Contributed by Shashikant Banerjee." This reverts commit a492edee4b84e38552f185a4227b49abf60f762d. The commit msg is wrong. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/575613ef Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/575613ef Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/575613ef Branch: refs/heads/ozone-0.3 Commit: 575613ef7a051d3a26ef8d895ef10498cb71d667 Parents: a492ede Author: Shashikant Banerjee <shashik...@apache.org> Authored: Tue Oct 23 19:14:18 2018 +0530 Committer: Shashikant Banerjee <shashik...@apache.org> Committed: Tue Oct 23 19:14:18 2018 +0530 ---------------------------------------------------------------------- .../hadoop/hdds/scm/XceiverClientGrpc.java | 194 ++++++------------- .../hadoop/hdds/scm/XceiverClientManager.java | 14 +- .../hadoop/hdds/scm/XceiverClientRatis.java | 6 - .../scm/container/common/helpers/Pipeline.java | 4 - .../ozone/client/io/ChunkGroupInputStream.java | 9 +- .../ozone/client/io/ChunkGroupOutputStream.java | 4 - .../hadoop/ozone/TestMiniOzoneCluster.java | 2 +- .../ozone/client/rpc/TestOzoneRpcClient.java | 107 ---------- .../ozone/scm/TestXceiverClientManager.java | 15 +- 9 files changed, 70 insertions(+), 285 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 9526be3..2f11872 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc; @@ -41,9 +40,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.UUID; -import java.util.Map; -import java.util.HashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -54,9 +50,9 @@ public class XceiverClientGrpc extends XceiverClientSpi { static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class); private final Pipeline pipeline; private final Configuration config; - private Map<UUID, XceiverClientProtocolServiceStub> asyncStubs; + private XceiverClientProtocolServiceStub asyncStub; private XceiverClientMetrics metrics; - private Map<UUID, ManagedChannel> channels; + private ManagedChannel channel; private final Semaphore semaphore; private boolean closed = false; @@ -76,62 +72,46 @@ public class XceiverClientGrpc extends XceiverClientSpi { this.semaphore = new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); this.metrics = XceiverClientManager.getXceiverClientMetrics(); - this.channels = new HashMap<>(); - this.asyncStubs = new HashMap<>(); } @Override public void connect() throws Exception { - - // leader by default is the 1st datanode in the datanode list of pipleline DatanodeDetails leader = this.pipeline.getLeader(); - // just make a connection to the 1st datanode at the beginning - connectToDatanode(leader); - } - private void connectToDatanode(DatanodeDetails dn) { // read port from the data node, on failure use default configured // port. - int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue(); + int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue(); if (port == 0) { port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); } - LOG.debug("Connecting to server Port : " + dn.getIpAddress()); - ManagedChannel channel = - NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext() - .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) - .build(); - XceiverClientProtocolServiceStub asyncStub = - XceiverClientProtocolServiceGrpc.newStub(channel); - asyncStubs.put(dn.getUuid(), asyncStub); - channels.put(dn.getUuid(), channel); + LOG.debug("Connecting to server Port : " + leader.getIpAddress()); + channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port) + .usePlaintext() + .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) + .build(); + asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel); } + /** - * Returns if the xceiver client connects to all servers in the pipeline. + * Returns if the xceiver client connects to a server. * * @return True if the connection is alive, false otherwise. */ @VisibleForTesting - public boolean isConnected(DatanodeDetails details) { - return isConnected(channels.get(details.getUuid())); - } - - private boolean isConnected(ManagedChannel channel) { - return channel != null && !channel.isTerminated() && !channel.isShutdown(); + public boolean isConnected() { + return !channel.isTerminated() && !channel.isShutdown(); } @Override public void close() { closed = true; - for (ManagedChannel channel : channels.values()) { - channel.shutdownNow(); - try { - channel.awaitTermination(60, TimeUnit.MINUTES); - } catch (Exception e) { - LOG.error("Unexpected exception while waiting for channel termination", - e); - } + channel.shutdownNow(); + try { + channel.awaitTermination(60, TimeUnit.MINUTES); + } catch (Exception e) { + LOG.error("Unexpected exception while waiting for channel termination", + e); } } @@ -140,56 +120,6 @@ public class XceiverClientGrpc extends XceiverClientSpi { return pipeline; } - @Override - public ContainerCommandResponseProto sendCommand( - ContainerCommandRequestProto request) throws IOException { - return sendCommandWithRetry(request); - } - - public ContainerCommandResponseProto sendCommandWithRetry( - ContainerCommandRequestProto request) throws IOException { - int size = pipeline.getMachines().size(); - ContainerCommandResponseProto responseProto = null; - DatanodeDetails dn = null; - - // In case of an exception or an error, we will try to read from the - // datanodes in the pipeline in a round robin fashion. - - // TODO: cache the correct leader info in here, so that any subsequent calls - // should first go to leader - for (int dnIndex = 0; dnIndex < size; dnIndex++) { - try { - dn = pipeline.getMachines().get(dnIndex); - LOG.debug("Executing command " + request + " on datanode " + dn); - // In case the command gets retried on a 2nd datanode, - // sendCommandAsyncCall will create a new channel and async stub - // in case these don't exist for the specific datanode. - responseProto = sendCommandAsync(request, dn).get(); - if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) { - break; - } - } catch (ExecutionException | InterruptedException e) { - LOG.warn("Failed to execute command " + request + " on datanode " + dn - .getUuidString(), e); - } - } - - if (responseProto != null) { - return responseProto; - } else { - throw new IOException( - "Failed to execute command " + request + " on the pipeline " - + pipeline.getId()); - } - } - - // TODO: for a true async API, once the waitable future while executing - // the command on one channel fails, it should be retried asynchronously - // on the future Task for all the remaining datanodes. - - // Note: this Async api is not used currently used in any active I/O path. - // In case it gets used, the asynchronous retry logic needs to be plugged - // in here. /** * Sends a given command to server gets a waitable future back. * @@ -198,25 +128,15 @@ public class XceiverClientGrpc extends XceiverClientSpi { * @throws IOException */ @Override - public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync( - ContainerCommandRequestProto request) + public CompletableFuture<ContainerCommandResponseProto> + sendCommandAsync(ContainerCommandRequestProto request) throws IOException, ExecutionException, InterruptedException { - return sendCommandAsync(request, pipeline.getLeader()); - } - - private CompletableFuture<ContainerCommandResponseProto> sendCommandAsync( - ContainerCommandRequestProto request, DatanodeDetails dn) - throws IOException, ExecutionException, InterruptedException { - if (closed) { + if(closed){ throw new IOException("This channel is not connected."); } - UUID dnId = dn.getUuid(); - ManagedChannel channel = channels.get(dnId); - // If the channel doesn't exist for this specific datanode or the channel - // is closed, just reconnect - if (!isConnected(channel)) { - reconnect(dn); + if(channel == null || !isConnected()) { + reconnect(); } final CompletableFuture<ContainerCommandResponseProto> replyFuture = @@ -225,54 +145,48 @@ public class XceiverClientGrpc extends XceiverClientSpi { long requestTime = Time.monotonicNowNanos(); metrics.incrPendingContainerOpsMetrics(request.getCmdType()); // create a new grpc stream for each non-async call. - - // TODO: for async calls, we should reuse StreamObserver resources. final StreamObserver<ContainerCommandRequestProto> requestObserver = - asyncStubs.get(dnId) - .send(new StreamObserver<ContainerCommandResponseProto>() { - @Override - public void onNext(ContainerCommandResponseProto value) { - replyFuture.complete(value); - metrics.decrPendingContainerOpsMetrics(request.getCmdType()); - metrics.addContainerOpsLatency(request.getCmdType(), - Time.monotonicNowNanos() - requestTime); - semaphore.release(); - } - - @Override - public void onError(Throwable t) { - replyFuture.completeExceptionally(t); - metrics.decrPendingContainerOpsMetrics(request.getCmdType()); - metrics.addContainerOpsLatency(request.getCmdType(), - Time.monotonicNowNanos() - requestTime); - semaphore.release(); - } - - @Override - public void onCompleted() { - if (!replyFuture.isDone()) { - replyFuture.completeExceptionally(new IOException( - "Stream completed but no reply for request " + request)); - } - } - }); + asyncStub.send(new StreamObserver<ContainerCommandResponseProto>() { + @Override + public void onNext(ContainerCommandResponseProto value) { + replyFuture.complete(value); + metrics.decrPendingContainerOpsMetrics(request.getCmdType()); + metrics.addContainerOpsLatency(request.getCmdType(), + Time.monotonicNowNanos() - requestTime); + semaphore.release(); + } + @Override + public void onError(Throwable t) { + replyFuture.completeExceptionally(t); + metrics.decrPendingContainerOpsMetrics(request.getCmdType()); + metrics.addContainerOpsLatency(request.getCmdType(), + Time.monotonicNowNanos() - requestTime); + semaphore.release(); + } + + @Override + public void onCompleted() { + if (!replyFuture.isDone()) { + replyFuture.completeExceptionally( + new IOException("Stream completed but no reply for request " + + request)); + } + } + }); requestObserver.onNext(request); requestObserver.onCompleted(); return replyFuture; } - private void reconnect(DatanodeDetails dn) - throws IOException { - ManagedChannel channel; + private void reconnect() throws IOException { try { - connectToDatanode(dn); - channel = channels.get(dn.getUuid()); + connect(); } catch (Exception e) { LOG.error("Error while connecting: ", e); throw new IOException(e); } - if (channel == null || !isConnected(channel)) { + if (channel == null || !isConnected()) { throw new IOException("This channel is not connected."); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index 83b5a4c..d542abc 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -27,6 +27,7 @@ import com.google.common.cache.RemovalNotification; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import java.io.Closeable; import java.io.IOException; @@ -58,7 +59,7 @@ public class XceiverClientManager implements Closeable { //TODO : change this to SCM configuration class private final Configuration conf; - private final Cache<String, XceiverClientSpi> clientCache; + private final Cache<PipelineID, XceiverClientSpi> clientCache; private final boolean useRatis; private static XceiverClientMetrics metrics; @@ -82,10 +83,10 @@ public class XceiverClientManager implements Closeable { .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS) .maximumSize(maxSize) .removalListener( - new RemovalListener<String, XceiverClientSpi>() { + new RemovalListener<PipelineID, XceiverClientSpi>() { @Override public void onRemoval( - RemovalNotification<String, XceiverClientSpi> + RemovalNotification<PipelineID, XceiverClientSpi> removalNotification) { synchronized (clientCache) { // Mark the entry as evicted @@ -97,7 +98,7 @@ public class XceiverClientManager implements Closeable { } @VisibleForTesting - public Cache<String, XceiverClientSpi> getClientCache() { + public Cache<PipelineID, XceiverClientSpi> getClientCache() { return clientCache; } @@ -139,14 +140,13 @@ public class XceiverClientManager implements Closeable { private XceiverClientSpi getClient(Pipeline pipeline) throws IOException { - HddsProtos.ReplicationType type = pipeline.getType(); try { - return clientCache.get(pipeline.getId().getId().toString() + type, + return clientCache.get(pipeline.getId(), new Callable<XceiverClientSpi>() { @Override public XceiverClientSpi call() throws Exception { XceiverClientSpi client = null; - switch (type) { + switch (pipeline.getType()) { case RATIS: client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf); break; http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index fda30fb..4efe7ba 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.io.MultipleIOException; -import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.thirdparty.com.google.protobuf .InvalidProtocolBufferException; @@ -52,7 +51,6 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -192,10 +190,6 @@ public final class XceiverClientRatis extends XceiverClientSpi { getClient().sendAsync(() -> byteString); } - public void watchForCommit(long index, long timeout) throws Exception { - getClient().sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED) - .get(timeout, TimeUnit.MILLISECONDS); - } /** * Sends a given command to server gets a waitable future back. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java index 7fa4c39..49c00a8 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java @@ -301,10 +301,6 @@ public class Pipeline { return b.toString(); } - public void setType(HddsProtos.ReplicationType type) { - this.type = type; - } - /** * Returns a JSON string of this object. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java index 3772c59..3158334 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java @@ -22,9 +22,7 @@ import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.hdds.scm.XceiverClientManager; @@ -278,13 +276,8 @@ public class ChunkGroupInputStream extends InputStream implements Seekable { long containerID = blockID.getContainerID(); ContainerWithPipeline containerWithPipeline = storageContainerLocationClient.getContainerWithPipeline(containerID); - Pipeline pipeline = containerWithPipeline.getPipeline(); - - // irrespective of the container state, we will always read via Standalone - // protocol. - pipeline.setType(HddsProtos.ReplicationType.STAND_ALONE); XceiverClientSpi xceiverClient = xceiverClientManager - .acquireClient(pipeline); + .acquireClient(containerWithPipeline.getPipeline()); boolean success = false; containerKey = omKeyLocationInfo.getLocalID(); try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index 5966718..0a38a5a 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -116,10 +116,6 @@ public class ChunkGroupOutputStream extends OutputStream { public List<ChunkOutputStreamEntry> getStreamEntries() { return streamEntries; } - @VisibleForTesting - public XceiverClientManager getXceiverClientManager() { - return xceiverClientManager; - } public List<OmKeyLocationInfo> getLocationInfoList() throws IOException { List<OmKeyLocationInfo> locationInfoList = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java index bf6a189..302ea46 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java @@ -102,7 +102,7 @@ public class TestMiniOzoneCluster { // Verify client is able to connect to the container try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)){ client.connect(); - assertTrue(client.isConnected(pipeline.getLeader())); + assertTrue(client.isConnected()); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java index 1a401d6..b60343a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java @@ -24,10 +24,6 @@ import org.apache.hadoop.hdds.protocol.StorageType; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientRatis; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.ozone.*; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -35,7 +31,6 @@ import org.apache.hadoop.ozone.client.*; import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; -import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.common.helpers.BlockData; @@ -602,108 +597,6 @@ public class TestOzoneRpcClient { } @Test - public void testPutKeyAndGetKeyThreeNodes() - throws Exception { - String volumeName = UUID.randomUUID().toString(); - String bucketName = UUID.randomUUID().toString(); - - String value = "sample value"; - store.createVolume(volumeName); - OzoneVolume volume = store.getVolume(volumeName); - volume.createBucket(bucketName); - OzoneBucket bucket = volume.getBucket(bucketName); - - String keyName = UUID.randomUUID().toString(); - - OzoneOutputStream out = bucket - .createKey(keyName, value.getBytes().length, ReplicationType.RATIS, - ReplicationFactor.THREE); - ChunkGroupOutputStream groupOutputStream = - (ChunkGroupOutputStream) out.getOutputStream(); - XceiverClientManager manager = groupOutputStream.getXceiverClientManager(); - out.write(value.getBytes()); - out.close(); - // First, confirm the key info from the client matches the info in OM. - OmKeyArgs.Builder builder = new OmKeyArgs.Builder(); - builder.setVolumeName(volumeName).setBucketName(bucketName) - .setKeyName(keyName); - OmKeyLocationInfo keyInfo = ozoneManager.lookupKey(builder.build()). - getKeyLocationVersions().get(0).getBlocksLatestVersionOnly().get(0); - long containerID = keyInfo.getContainerID(); - long localID = keyInfo.getLocalID(); - OzoneKeyDetails keyDetails = bucket.getKey(keyName); - Assert.assertEquals(keyName, keyDetails.getName()); - - List<OzoneKeyLocation> keyLocations = keyDetails.getOzoneKeyLocations(); - Assert.assertEquals(1, keyLocations.size()); - Assert.assertEquals(containerID, keyLocations.get(0).getContainerID()); - Assert.assertEquals(localID, keyLocations.get(0).getLocalID()); - - // Make sure that the data size matched. - Assert - .assertEquals(value.getBytes().length, keyLocations.get(0).getLength()); - - ContainerWithPipeline container = - cluster.getStorageContainerManager().getContainerManager() - .getContainerWithPipeline(containerID); - Pipeline pipeline = container.getPipeline(); - List<DatanodeDetails> datanodes = pipeline.getMachines(); - - DatanodeDetails datanodeDetails = datanodes.get(0); - Assert.assertNotNull(datanodeDetails); - - XceiverClientSpi clientSpi = manager.acquireClient(pipeline); - Assert.assertTrue(clientSpi instanceof XceiverClientRatis); - XceiverClientRatis ratisClient = (XceiverClientRatis)clientSpi; - - ratisClient.watchForCommit(keyInfo.getBlockCommitSequenceId(), 5000); - // shutdown the datanode - cluster.shutdownHddsDatanode(datanodeDetails); - - Assert.assertTrue(container.getContainerInfo().getState() - == HddsProtos.LifeCycleState.OPEN); - // try to read, this shouls be successful - readKey(bucket, keyName, value); - - Assert.assertTrue(container.getContainerInfo().getState() - == HddsProtos.LifeCycleState.OPEN); - // shutdown the second datanode - datanodeDetails = datanodes.get(1); - cluster.shutdownHddsDatanode(datanodeDetails); - Assert.assertTrue(container.getContainerInfo().getState() - == HddsProtos.LifeCycleState.OPEN); - - // the container is open and with loss of 2 nodes we still should be able - // to read via Standalone protocol - // try to read - readKey(bucket, keyName, value); - - // shutdown the 3rd datanode - datanodeDetails = datanodes.get(2); - cluster.shutdownHddsDatanode(datanodeDetails); - try { - // try to read - readKey(bucket, keyName, value); - Assert.fail("Expected exception not thrown"); - } catch (IOException e) { - Assert.assertTrue(e.getMessage().contains("Failed to execute command")); - Assert.assertTrue( - e.getMessage().contains("on the pipeline " + pipeline.getId())); - } - manager.releaseClient(clientSpi); - } - - private void readKey(OzoneBucket bucket, String keyName, String data) - throws IOException { - OzoneKey key = bucket.getKey(keyName); - Assert.assertEquals(keyName, key.getName()); - OzoneInputStream is = bucket.readKey(keyName); - byte[] fileContent = new byte[data.getBytes().length]; - is.read(fileContent); - is.close(); - } - - @Test public void testGetKeyDetails() throws IOException, OzoneException { String volumeName = UUID.randomUUID().toString(); String bucketName = UUID.randomUUID().toString(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/575613ef/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java index 8b35bbb..da445bf 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java @@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.scm; import com.google.common.cache.Cache; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -106,7 +107,7 @@ public class TestXceiverClientManager { OzoneConfiguration conf = new OzoneConfiguration(); conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); XceiverClientManager clientManager = new XceiverClientManager(conf); - Cache<String, XceiverClientSpi> cache = + Cache<PipelineID, XceiverClientSpi> cache = clientManager.getClientCache(); ContainerWithPipeline container1 = @@ -129,9 +130,8 @@ public class TestXceiverClientManager { Assert.assertNotEquals(client1, client2); // least recent container (i.e containerName1) is evicted - XceiverClientSpi nonExistent1 = cache.getIfPresent( - container1.getContainerInfo().getPipelineID().getId().toString() - + container1.getContainerInfo().getReplicationType()); + XceiverClientSpi nonExistent1 = cache + .getIfPresent(container1.getContainerInfo().getPipelineID()); Assert.assertEquals(null, nonExistent1); // However container call should succeed because of refcount on the client. String traceID1 = "trace" + RandomStringUtils.randomNumeric(4); @@ -160,7 +160,7 @@ public class TestXceiverClientManager { OzoneConfiguration conf = new OzoneConfiguration(); conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); XceiverClientManager clientManager = new XceiverClientManager(conf); - Cache<String, XceiverClientSpi> cache = + Cache<PipelineID, XceiverClientSpi> cache = clientManager.getClientCache(); ContainerWithPipeline container1 = @@ -183,9 +183,8 @@ public class TestXceiverClientManager { Assert.assertNotEquals(client1, client2); // now client 1 should be evicted - XceiverClientSpi nonExistent = cache.getIfPresent( - container1.getContainerInfo().getPipelineID().getId().toString() - + container1.getContainerInfo().getReplicationType()); + XceiverClientSpi nonExistent = cache + .getIfPresent(container1.getContainerInfo().getPipelineID()); Assert.assertEquals(null, nonExistent); // Any container operation should now fail --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org