Repository: hadoop Updated Branches: refs/heads/HDFS-7240 6e74039a1 -> 013c36f3c
HDFS-12720. Ozone: Ratis options are not passed from KSM Client protobuf helper correctly. Contributed by Mukul Kumar Singh. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/013c36f3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/013c36f3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/013c36f3 Branch: refs/heads/HDFS-7240 Commit: 013c36f3cc2995506ad108017c3e9d20aef1a0d2 Parents: 6e74039 Author: Mukul Kumar Singh <msi...@apache.org> Authored: Fri Nov 3 09:59:25 2017 +0530 Committer: Mukul Kumar Singh <msi...@apache.org> Committed: Fri Nov 3 09:59:25 2017 +0530 ---------------------------------------------------------------------- .../ozone/client/io/ChunkGroupOutputStream.java | 21 +++- .../hadoop/ozone/client/rpc/RpcClient.java | 2 + .../hadoop/ozone/ksm/helpers/KsmKeyArgs.java | 8 +- ...ceManagerProtocolClientSideTranslatorPB.java | 4 + .../org/apache/hadoop/scm/XceiverClient.java | 27 ++-- .../apache/hadoop/scm/XceiverClientManager.java | 1 + .../apache/hadoop/scm/XceiverClientRatis.java | 22 ++-- .../common/helpers/BlockContainerInfo.java | 8 ++ .../main/java/org/apache/ratis/RatisHelper.java | 17 ++- .../scm/container/ContainerStateManager.java | 15 ++- .../scm/pipelines/ratis/RatisManagerImpl.java | 21 ++-- .../web/storage/DistributedStorageHandler.java | 2 + .../apache/hadoop/ozone/MiniOzoneCluster.java | 29 +++++ .../ozone/client/rpc/TestOzoneRpcClient.java | 124 ++++++++++++++++++- .../apache/hadoop/ozone/tools/TestCorona.java | 34 ++++- 15 files changed, 278 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/013c36f3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index b63596f..d0975a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.client.io; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; @@ -100,7 +102,8 @@ public class ChunkGroupOutputStream extends OutputStream { OpenKeySession handler, XceiverClientManager xceiverClientManager, StorageContainerLocationProtocolClientSideTranslatorPB scmClient, KeySpaceManagerProtocolClientSideTranslatorPB ksmClient, - int chunkSize, String requestId) throws IOException { + int chunkSize, String requestId, ReplicationFactor factor, + ReplicationType type) throws IOException { this.streamEntries = new ArrayList<>(); this.currentStreamIndex = 0; this.byteOffset = 0; @@ -111,6 +114,8 @@ public class ChunkGroupOutputStream extends OutputStream { .setVolumeName(info.getVolumeName()) .setBucketName(info.getBucketName()) .setKeyName(info.getKeyName()) + .setType(type) + .setFactor(factor) .setDataSize(info.getDataSize()).build(); this.openID = handler.getId(); this.xceiverClientManager = xceiverClientManager; @@ -292,6 +297,8 @@ public class ChunkGroupOutputStream extends 
OutputStream { private KeySpaceManagerProtocolClientSideTranslatorPB ksmClient; private int chunkSize; private String requestID; + private ReplicationType type; + private ReplicationFactor factor; public Builder setHandler(OpenKeySession handler) { this.openHandler = handler; @@ -325,9 +332,19 @@ public class ChunkGroupOutputStream extends OutputStream { return this; } + public Builder setType(ReplicationType type) { + this.type = type; + return this; + } + + public Builder setFactor(ReplicationFactor replicationFactor) { + this.factor = replicationFactor; + return this; + } + public ChunkGroupOutputStream build() throws IOException { return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient, - ksmClient, chunkSize, requestID); + ksmClient, chunkSize, requestID, factor, type); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/013c36f3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 2604030..0dda10b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -461,6 +461,8 @@ public class RpcClient implements ClientProtocol { .setKsmClient(keySpaceManagerClient) .setChunkSize(chunkSize) .setRequestID(requestId) + .setType(OzoneProtos.ReplicationType.valueOf(type.toString())) + .setFactor(OzoneProtos.ReplicationFactor.valueOf(factor.getValue())) .build(); return new OzoneOutputStream(groupOutputStream); } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/013c36f3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyArgs.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyArgs.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyArgs.java index bfc4a56..abeac10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyArgs.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyArgs.java @@ -101,13 +101,13 @@ public final class KsmKeyArgs { return this; } - public Builder setType(ReplicationType type) { - this.type = type; + public Builder setType(ReplicationType replicationType) { + this.type = replicationType; return this; } - public Builder setFactor(ReplicationFactor factor) { - this.factor = factor; + public Builder setFactor(ReplicationFactor replicationFactor) { + this.factor = replicationFactor; return this; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/013c36f3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java index 7d3c039..bee3da2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java @@ -520,6 +520,8 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB KeyArgs.Builder keyArgs = KeyArgs.newBuilder() .setVolumeName(args.getVolumeName()) .setBucketName(args.getBucketName()) + .setFactor(args.getFactor()) + .setType(args.getType()) .setKeyName(args.getKeyName()); if (args.getDataSize() > 0) { keyArgs.setDataSize(args.getDataSize()); @@ -547,6 +549,8 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB .setVolumeName(args.getVolumeName()) .setBucketName(args.getBucketName()) .setKeyName(args.getKeyName()) + .setFactor(args.getFactor()) + .setType(args.getType()) .setDataSize(args.getDataSize()).build(); req.setKeyArgs(keyArgs); req.setClientID(clientID); http://git-wip-us.apache.org/repos/asf/hadoop/blob/013c36f3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java index 9749331..60f0998 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java @@ -21,7 +21,7 @@ package org.apache.hadoop.scm; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.ChannelFuture; +import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; @@ -39,7 +39,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import 
java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.List; /** @@ -49,7 +48,7 @@ public class XceiverClient extends XceiverClientSpi { static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class); private final Pipeline pipeline; private final Configuration config; - private ChannelFuture channelFuture; + private Channel channel; private Bootstrap b; private EventLoopGroup group; @@ -70,9 +69,7 @@ public class XceiverClient extends XceiverClientSpi { @Override public void connect() throws Exception { - if (channelFuture != null - && channelFuture.channel() != null - && channelFuture.channel().isActive()) { + if (channel != null && channel.isActive()) { throw new IOException("This client is already connected to a host."); } @@ -92,7 +89,7 @@ public class XceiverClient extends XceiverClientSpi { OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); } LOG.debug("Connecting to server Port : " + port); - channelFuture = b.connect(leader.getHostName(), port).sync(); + channel = b.connect(leader.getHostName(), port).sync().channel(); } /** @@ -102,17 +99,13 @@ public class XceiverClient extends XceiverClientSpi { */ @VisibleForTesting public boolean isConnected() { - return channelFuture.channel().isActive(); + return channel.isActive(); } @Override public void close() { if (group != null) { - group.shutdownGracefully(0, 0, TimeUnit.SECONDS); - } - - if (channelFuture != null) { - channelFuture.channel().close(); + group.shutdownGracefully().awaitUninterruptibly(); } } @@ -126,11 +119,11 @@ public class XceiverClient extends XceiverClientSpi { ContainerProtos.ContainerCommandRequestProto request) throws IOException { try { - if ((channelFuture == null) || (!channelFuture.channel().isActive())) { + if ((channel == null) || (!channel.isActive())) { throw new IOException("This channel is not connected."); } XceiverClientHandler handler = - 
channelFuture.channel().pipeline().get(XceiverClientHandler.class); + channel.pipeline().get(XceiverClientHandler.class); return handler.sendCommand(request); } catch (ExecutionException | InterruptedException e) { @@ -149,11 +142,11 @@ public class XceiverClient extends XceiverClientSpi { public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request) throws IOException, ExecutionException, InterruptedException { - if ((channelFuture == null) || (!channelFuture.channel().isActive())) { + if ((channel == null) || (!channel.isActive())) { throw new IOException("This channel is not connected."); } XceiverClientHandler handler = - channelFuture.channel().pipeline().get(XceiverClientHandler.class); + channel.pipeline().get(XceiverClientHandler.class); return handler.sendCommandAsync(request); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/013c36f3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java index af8d19c..161cdce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java @@ -199,6 +199,7 @@ public class XceiverClientManager implements Closeable { */ public OzoneProtos.ReplicationType getType() { // TODO : Fix me and make Ratis default before release. 
+ // TODO: Remove this as replication factor and type are pipeline properties if(isUseRatis()) { return OzoneProtos.ReplicationType.RATIS; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/013c36f3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java index 0513b2e..fd82845 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java @@ -28,6 +28,7 @@ import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.ratis.RatisHelper; import org.apache.ratis.client.RaftClient; import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; @@ -37,13 +38,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; /** * An abstract implementation of {@link XceiverClientSpi} using Ratis. 
@@ -77,11 +76,10 @@ public final class XceiverClientRatis extends XceiverClientSpi { */ public void createPipeline(String clusterId, List<DatanodeID> datanodes) throws IOException { - final List<RaftPeer> newPeers = datanodes.stream() - .map(RatisHelper::toRaftPeer) - .collect(Collectors.toList()); - LOG.debug("initializing pipeline:{} with nodes:{}", clusterId, newPeers); - reinitialize(datanodes, newPeers); + RaftGroup group = RatisHelper.newRaftGroup(datanodes); + LOG.debug("initializing pipeline:{} with nodes:{}", clusterId, + group.getPeers()); + reinitialize(datanodes, group); } /** @@ -94,7 +92,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { } private void reinitialize( - List<DatanodeID> datanodes, Collection<RaftPeer> newPeers) + List<DatanodeID> datanodes, RaftGroup group) throws IOException { if (datanodes.isEmpty()) { return; @@ -103,7 +101,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { IOException exception = null; for (DatanodeID d : datanodes) { try { - reinitialize(d, newPeers); + reinitialize(d, group); } catch (IOException ioe) { if (exception == null) { exception = new IOException( @@ -121,14 +119,14 @@ public final class XceiverClientRatis extends XceiverClientSpi { /** * Adds a new peers to the Ratis Ring. * @param datanode - new datanode - * @param newPeers - Raft machines + * @param group - Raft group * @throws IOException - on Failure. 
*/ - private void reinitialize(DatanodeID datanode, Collection<RaftPeer> newPeers) + private void reinitialize(DatanodeID datanode, RaftGroup group) throws IOException { final RaftPeer p = RatisHelper.toRaftPeer(datanode); try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) { - client.reinitialize(RatisHelper.newRaftGroup(newPeers), p.getId()); + client.reinitialize(group, p.getId()); } catch (IOException ioe) { LOG.error("Failed to reinitialize RaftPeer:{} datanode: {} ", p, datanode, ioe); http://git-wip-us.apache.org/repos/asf/hadoop/blob/013c36f3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java index d0e1423..8e33996 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java @@ -20,6 +20,7 @@ package org.apache.hadoop.scm.container.common.helpers; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.util.Time; import java.io.Serializable; @@ -136,4 +137,11 @@ public class BlockContainerInfo extends ContainerInfo public int compareTo(BlockContainerInfo o) { return this.compare(this, o); } + + public boolean canAllocate(long size, long containerSize) { + //TODO: move container size inside Container Info + return ((getState() == OzoneProtos.LifeCycleState.ALLOCATED || + getState() == 
OzoneProtos.LifeCycleState.OPEN) && + (getAllocated() + size <= containerSize)); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/013c36f3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java index 0638b6c..f7830a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java @@ -28,10 +28,15 @@ import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.List; +import java.util.Collections; +import java.util.Collection; +import java.util.ArrayList; +import java.util.Arrays; import java.util.stream.Collectors; /** @@ -68,7 +73,8 @@ public interface RatisHelper { /* TODO: use a dummy id for all groups for the moment. * It should be changed to a unique id for each group. 
*/ - RaftGroupId DUMMY_GROUP_ID = RaftGroupId.randomId(); + RaftGroupId DUMMY_GROUP_ID = + RaftGroupId.valueOf(ByteString.copyFromUtf8("AOZONERATISGROUP")); RaftGroup EMPTY_GROUP = new RaftGroup(DUMMY_GROUP_ID, Collections.emptyList()); @@ -77,6 +83,13 @@ public interface RatisHelper { return EMPTY_GROUP; } + static RaftGroup newRaftGroup(List<DatanodeID> datanodes) { + final List<RaftPeer> newPeers = datanodes.stream() + .map(RatisHelper::toRaftPeer) + .collect(Collectors.toList()); + return RatisHelper.newRaftGroup(newPeers); + } + static RaftGroup newRaftGroup(Collection<RaftPeer> peers) { return peers.isEmpty()? emptyRaftGroup() : new RaftGroup(DUMMY_GROUP_ID, peers); http://git-wip-us.apache.org/repos/asf/hadoop/blob/013c36f3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java index cd6df9a..55d2d2c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java @@ -410,8 +410,7 @@ public class ContainerStateManager { while (iter.hasNext()) { BlockContainerInfo info = iter.next(); - if (info.getAllocated() + size <= this.containerSize) { - + if (info.canAllocate(size, this.containerSize)) { queue.remove(info); info.addAllocated(size); info.setLastUsed(Time.monotonicNow()); @@ -419,10 +418,14 @@ public class ContainerStateManager { return info; } else { - // We should close this container. 
- LOG.info("Moving {} to containerCloseQueue.", info.toString()); - containerCloseQueue.add(info); - //TODO: Next JIRA will handle these containers to close. + if (info.getState() != LifeCycleState.CLOSED) { + // We should close this container. + LOG.info("Moving {} to containerCloseQueue.", info.toString()); + info.setState(LifeCycleState.CLOSED); + containerCloseQueue.add(info); + //TODO: Next JIRA will handle these containers to close. + //TODO: move container to right queue + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/013c36f3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java index 125580e..fb02172 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java @@ -39,8 +39,6 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos - .LifeCycleState.ALLOCATED; -import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos .LifeCycleState.OPEN; @@ -121,7 +119,12 @@ public class RatisManagerImpl implements PipelineManager { .createPipeline(pipeline.getPipelineName(), pipeline.getMachines()); } } else { - pipeline = findOpenPipeline(); + Pipeline openPipeline = findOpenPipeline(replicationFactor); + if (openPipeline != null) { + // if an open pipeline is found use the same machines + pipeline = allocateRatisPipeline(openPipeline.getMachines(), + containerName, replicationFactor); + } } if (pipeline == null) { LOG.error("Get 
pipeline call failed. We are not able to find free nodes" + @@ -135,7 +138,7 @@ public class RatisManagerImpl implements PipelineManager { * * @return - Pipeline or null */ - Pipeline findOpenPipeline() { + Pipeline findOpenPipeline(OzoneProtos.ReplicationFactor factor) { Pipeline pipeline = null; final int sentinal = -1; if (activePipelines.size() == 0) { @@ -149,7 +152,7 @@ public class RatisManagerImpl implements PipelineManager { Pipeline temp = activePipelines.get(nextIndex != sentinal ? nextIndex : startIndex); // if we find an operational pipeline just return that. - if (temp.getLifeCycleState() == OPEN) { + if ((temp.getLifeCycleState() == OPEN) && (temp.getFactor() == factor)) { pipeline = temp; break; } @@ -173,7 +176,7 @@ public class RatisManagerImpl implements PipelineManager { String pipelineName = PREFIX + UUID.randomUUID().toString().substring(PREFIX.length()); pipeline.setType(OzoneProtos.ReplicationType.RATIS); - pipeline.setLifeCycleState(ALLOCATED); + pipeline.setLifeCycleState(OPEN); pipeline.setFactor(factor); pipeline.setPipelineName(pipelineName); pipeline.setContainerName(containerName); @@ -210,10 +213,10 @@ public class RatisManagerImpl implements PipelineManager { Preconditions.checkNotNull(datanode); if (!ratisMembers.contains(datanode)) { newNodesList.add(datanode); - // once a datanode has been added to a pipeline, exclude it from - // further allocations - ratisMembers.add(datanode); if (newNodesList.size() == count) { + // once a datanode has been added to a pipeline, exclude it from + // further allocations + ratisMembers.addAll(newNodesList); LOG.info("Allocating a new pipeline of size: {}", count); return newNodesList; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/013c36f3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index cacd03c..dbbc01f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -416,6 +416,8 @@ public final class DistributedStorageHandler implements StorageHandler { .setKsmClient(keySpaceManagerClient) .setChunkSize(chunkSize) .setRequestID(args.getRequestID()) + .setType(xceiverClientManager.getType()) + .setFactor(xceiverClientManager.getFactor()) .build(); return new OzoneOutputStream(groupOutputStream); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/013c36f3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 628d0eb..8a4a819 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -26,13 +26,18 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.OzoneConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.container.common .statemachine.DatanodeStateMachine; import 
org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.ksm.KSMConfigKeys; import org.apache.hadoop.ozone.ksm.KeySpaceManager; +import org.apache.hadoop.ozone.ksm.protocolPB + .KeySpaceManagerProtocolClientSideTranslatorPB; +import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB; import org.apache.hadoop.ozone.web.client.OzoneRestClient; import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.scm.protocolPB @@ -253,6 +258,30 @@ public final class MiniOzoneCluster extends MiniDFSCluster } /** + * Creates an RPC proxy connected to this cluster's KeySpaceManager + * for accessing Key Space Manager information. Callers take ownership of + * the proxy and must close it when done. + * + * @return RPC proxy for accessing Key Space Manager information + * @throws IOException if there is an I/O error + */ + public KeySpaceManagerProtocolClientSideTranslatorPB + createKeySpaceManagerClient() throws IOException { + long ksmVersion = RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class); + InetSocketAddress ksmAddress = OzoneClientUtils + .getKsmAddressForClients(conf); + LOG.info("Creating KeySpaceManager RPC client with address {}", + ksmAddress); + RPC.setProtocolEngine(conf, KeySpaceManagerProtocolPB.class, + ProtobufRpcEngine.class); + return new KeySpaceManagerProtocolClientSideTranslatorPB( + RPC.getProxy(KeySpaceManagerProtocolPB.class, ksmVersion, + ksmAddress, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), + Client.getRpcTimeout(conf))); + } + + /** * Waits for the Ozone cluster to be ready for processing requests. 
*/ public void waitOzoneReady() throws TimeoutException, InterruptedException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/013c36f3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java index ad1bcad..2b99841 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java @@ -38,7 +38,16 @@ import org.apache.hadoop.ozone.client.ReplicationType; import org.apache.hadoop.ozone.client.VolumeArgs; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; +import org.apache.hadoop.ozone.ksm.protocolPB. + KeySpaceManagerProtocolClientSideTranslatorPB; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.web.exceptions.OzoneException; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.protocolPB. 
+ StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.util.Time; import org.junit.AfterClass; import org.junit.Assert; @@ -64,6 +73,10 @@ public class TestOzoneRpcClient { private static MiniOzoneCluster cluster = null; private static OzoneClient ozClient = null; private static ObjectStore store = null; + private static KeySpaceManagerProtocolClientSideTranslatorPB + keySpaceManagerClient; + private static StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; /** * Create a MiniOzoneCluster for testing. @@ -78,13 +91,16 @@ public class TestOzoneRpcClient { OzoneConfiguration conf = new OzoneConfiguration(); conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY, OzoneConsts.OZONE_HANDLER_DISTRIBUTED); - cluster = new MiniOzoneCluster.Builder(conf) + cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(5) .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); conf.set("ozone.client.protocol", "org.apache.hadoop.ozone.client.rpc.RpcClient"); OzoneClientFactory.setConfiguration(conf); ozClient = OzoneClientFactory.getClient(); store = ozClient.getObjectStore(); + storageContainerLocationClient = + cluster.createStorageContainerLocationClient(); + keySpaceManagerClient = cluster.createKeySpaceManagerClient(); } @Test @@ -360,6 +376,29 @@ public class TestOzoneRpcClient { volume.getBucket(bucketName); } + private boolean verifyRatisReplication(String volumeName, String bucketName, + String keyName, ReplicationType type, ReplicationFactor factor) + throws IOException { + KsmKeyArgs keyArgs = new KsmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .build(); + OzoneProtos.ReplicationType replicationType = + OzoneProtos.ReplicationType.valueOf(type.toString()); + OzoneProtos.ReplicationFactor replicationFactor = + OzoneProtos.ReplicationFactor.valueOf(factor.getValue()); + KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs); + for 
(KsmKeyLocationInfo info: keyInfo.getKeyLocationList()) { + Pipeline pipeline = + storageContainerLocationClient.getContainer(info.getContainerName()); + if ((pipeline.getFactor() != replicationFactor) || + (pipeline.getType() != replicationType)) { + return false; + } + } + return true; + } @Test public void testPutKey() @@ -387,6 +426,80 @@ public class TestOzoneRpcClient { OzoneInputStream is = bucket.readKey(keyName); byte[] fileContent = new byte[value.getBytes().length]; is.read(fileContent); + Assert.assertTrue(verifyRatisReplication(volumeName, bucketName, + keyName, ReplicationType.STAND_ALONE, + ReplicationFactor.ONE)); + Assert.assertEquals(value, new String(fileContent)); + Assert.assertTrue(key.getCreationTime() >= currentTime); + Assert.assertTrue(key.getModificationTime() >= currentTime); + } + } + + @Test + public void testPutKeyRatisOneNode() + throws IOException, OzoneException { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + long currentTime = Time.now(); + + String value = "sample value"; + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + for (int i = 0; i < 10; i++) { + String keyName = UUID.randomUUID().toString(); + + OzoneOutputStream out = bucket.createKey(keyName, + value.getBytes().length, ReplicationType.RATIS, + ReplicationFactor.ONE); + out.write(value.getBytes()); + out.close(); + OzoneKey key = bucket.getKey(keyName); + Assert.assertEquals(keyName, key.getName()); + OzoneInputStream is = bucket.readKey(keyName); + byte[] fileContent = new byte[value.getBytes().length]; + is.read(fileContent); + is.close(); + Assert.assertTrue(verifyRatisReplication(volumeName, bucketName, + keyName, ReplicationType.RATIS, ReplicationFactor.ONE)); + Assert.assertEquals(value, new String(fileContent)); + Assert.assertTrue(key.getCreationTime() >= currentTime); + 
Assert.assertTrue(key.getModificationTime() >= currentTime); + } + } + + @Test + public void testPutKeyRatisThreeNodes() + throws IOException, OzoneException { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + long currentTime = Time.now(); + + String value = "sample value"; + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + for (int i = 0; i < 10; i++) { + String keyName = UUID.randomUUID().toString(); + + OzoneOutputStream out = bucket.createKey(keyName, + value.getBytes().length, ReplicationType.RATIS, + ReplicationFactor.THREE); + out.write(value.getBytes()); + out.close(); + OzoneKey key = bucket.getKey(keyName); + Assert.assertEquals(keyName, key.getName()); + OzoneInputStream is = bucket.readKey(keyName); + byte[] fileContent = new byte[value.getBytes().length]; + is.read(fileContent); + is.close(); + Assert.assertTrue(verifyRatisReplication(volumeName, bucketName, + keyName, ReplicationType.RATIS, + ReplicationFactor.THREE)); Assert.assertEquals(value, new String(fileContent)); Assert.assertTrue(key.getCreationTime() >= currentTime); Assert.assertTrue(key.getModificationTime() >= currentTime); @@ -691,6 +804,15 @@ public class TestOzoneRpcClient { if(ozClient != null) { ozClient.close(); } + + if (storageContainerLocationClient != null) { + storageContainerLocationClient.close(); + } + + if (keySpaceManagerClient != null) { + keySpaceManagerClient.close(); + } + if (cluster != null) { cluster.shutdown(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/013c36f3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestCorona.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestCorona.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestCorona.java index 43f2af7..2ca7f8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestCorona.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestCorona.java @@ -117,22 +117,48 @@ public class TestCorona { } @Test - public void ratisTest() throws Exception { + public void multiThread() throws Exception { List<String> args = new ArrayList<>(); args.add("-numOfVolumes"); + args.add("10"); + args.add("-numOfBuckets"); args.add("1"); + args.add("-numOfKeys"); + args.add("10"); + args.add("-numOfThread"); + args.add("10"); + args.add("-keySize"); + args.add("10240"); + Corona corona = new Corona(conf); + int res = ToolRunner.run(conf, corona, + args.toArray(new String[0])); + Assert.assertEquals(10, corona.getNumberOfVolumesCreated()); + Assert.assertEquals(10, corona.getNumberOfBucketsCreated()); + Assert.assertEquals(100, corona.getNumberOfKeysAdded()); + Assert.assertEquals(0, res); + } + + @Test + public void ratisTest3() throws Exception { + List<String> args = new ArrayList<>(); + args.add("-numOfVolumes"); + args.add("10"); args.add("-numOfBuckets"); args.add("1"); args.add("-numOfKeys"); args.add("10"); args.add("-ratis"); args.add("3"); + args.add("-numOfThread"); + args.add("10"); + args.add("-keySize"); + args.add("10240"); Corona corona = new Corona(conf); int res = ToolRunner.run(conf, corona, args.toArray(new String[0])); - Assert.assertEquals(1, corona.getNumberOfVolumesCreated()); - Assert.assertEquals(1, corona.getNumberOfBucketsCreated()); - Assert.assertEquals(10, corona.getNumberOfKeysAdded()); + Assert.assertEquals(10, corona.getNumberOfVolumesCreated()); + Assert.assertEquals(10, corona.getNumberOfBucketsCreated()); + Assert.assertEquals(100, corona.getNumberOfKeysAdded()); Assert.assertEquals(0, res); } } --------------------------------------------------------------------- To 
unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org