This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push: new 8fab5f2 HDDS-2922. Balance ratis leader distribution in datanodes (#1371) 8fab5f2 is described below commit 8fab5f21c5e9bcfb1ce9b55a4179b8c958a53b25 Author: runzhiwang <51938049+runzhiw...@users.noreply.github.com> AuthorDate: Mon Oct 19 11:44:10 2020 +0800 HDDS-2922. Balance ratis leader distribution in datanodes (#1371) --- .../org/apache/hadoop/hdds/ratis/RatisHelper.java | 19 ++ .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 3 + .../hadoop/hdds/scm/exceptions/SCMException.java | 3 +- .../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 42 +++- .../common/src/main/resources/ozone-default.xml | 18 ++ .../CreatePipelineCommandHandler.java | 10 +- .../common/transport/server/XceiverServerSpi.java | 10 +- .../transport/server/ratis/XceiverServerRatis.java | 27 ++- .../protocol/commands/CreatePipelineCommand.java | 46 ++++- .../TestCreatePipelineCommandHandler.java | 7 +- .../interface-client/src/main/proto/hdds.proto | 1 + .../proto/ScmServerDatanodeHeartbeatProtocol.proto | 1 + .../src/main/proto/ScmServerProtocol.proto | 1 + .../hdds/scm/pipeline/PipelineStateManager.java | 3 +- .../hdds/scm/pipeline/RatisPipelineProvider.java | 29 ++- .../algorithms/DefaultLeaderChoosePolicy.java | 42 ++++ .../choose/algorithms/LeaderChoosePolicy.java | 55 ++++++ .../algorithms/LeaderChoosePolicyFactory.java | 75 +++++++ .../algorithms/MinLeaderCountChoosePolicy.java | 91 +++++++++ .../leader/choose/algorithms/package-info.java | 19 ++ .../choose/algorithms/TestLeaderChoosePolicy.java | 74 +++++++ .../hdds/scm/pipeline/TestLeaderChoosePolicy.java | 216 +++++++++++++++++++++ .../TestRatisPipelineCreateAndDestroy.java | 2 +- .../apache/hadoop/ozone/TestMiniOzoneCluster.java | 4 +- .../ozone/om/TestOzoneManagerRocksDBLogging.java | 2 +- 25 files changed, 778 insertions(+), 22 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java index 8325f09..324774d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.ratis; import java.io.IOException; import java.security.cert.X509Certificate; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -106,6 +107,11 @@ public final class RatisHelper { return new RaftPeer(toRaftPeerId(id), toRaftPeerAddressString(id)); } + public static RaftPeer toRaftPeer(DatanodeDetails id, int priority) { + return new RaftPeer( + toRaftPeerId(id), toRaftPeerAddressString(id), priority); + } + private static List<RaftPeer> toRaftPeers(Pipeline pipeline) { return toRaftPeers(pipeline.getNodes()); } @@ -126,6 +132,19 @@ public final class RatisHelper { } public static RaftGroup newRaftGroup(RaftGroupId groupId, + List<DatanodeDetails> peers, List<Integer> priorityList) { + assert peers.size() == priorityList.size(); + + final List<RaftPeer> newPeers = new ArrayList<>(); + for (int i = 0; i < peers.size(); i++) { + RaftPeer peer = RatisHelper.toRaftPeer(peers.get(i), priorityList.get(i)); + newPeers.add(peer); + } + return peers.isEmpty() ? 
RaftGroup.valueOf(groupId, Collections.emptyList()) + : RaftGroup.valueOf(groupId, newPeers); + } + + public static RaftGroup newRaftGroup(RaftGroupId groupId, Collection<DatanodeDetails> peers) { final List<RaftPeer> newPeers = peers.stream() .map(RatisHelper::toRaftPeer) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 540d2c0..7b01e07 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -321,6 +321,9 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT = "ozone.scm.pipeline.allocated.timeout"; + public static final String OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY = + "ozone.scm.pipeline.leader-choose.policy"; + public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT = "5m"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java index 0146eae..48a8e05 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java @@ -123,6 +123,7 @@ public class SCMException extends IOException { FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY, FAILED_TO_ALLOCATE_ENOUGH_BLOCKS, INTERNAL_ERROR, - FAILED_TO_INIT_PIPELINE_CHOOSE_POLICY + FAILED_TO_INIT_PIPELINE_CHOOSE_POLICY, + FAILED_TO_INIT_LEADER_CHOOSE_POLICY } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index d5c1024..a4787ef 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -61,6 +61,8 @@ public final class Pipeline { private UUID leaderId; // Timestamp for pipeline upon creation private Instant creationTimestamp; + // suggested leader id with high priority + private final UUID suggestedLeaderId; /** * The immutable properties of pipeline object is used in @@ -69,13 +71,14 @@ public final class Pipeline { */ private Pipeline(PipelineID id, ReplicationType type, ReplicationFactor factor, PipelineState state, - Map<DatanodeDetails, Long> nodeStatus) { + Map<DatanodeDetails, Long> nodeStatus, UUID suggestedLeaderId) { this.id = id; this.type = type; this.factor = factor; this.state = state; this.nodeStatus = nodeStatus; this.creationTimestamp = Instant.now(); + this.suggestedLeaderId = suggestedLeaderId; } /** @@ -124,6 +127,16 @@ public final class Pipeline { } /** + * Return the suggested leaderId which has a high priority among DNs of the + * pipeline. + * + * @return Suggested LeaderId + */ + public UUID getSuggestedLeaderId() { + return suggestedLeaderId; + } + + /** * Set the creation timestamp. Only for protobuf now. 
* * @param creationTimestamp @@ -278,6 +291,14 @@ public final class Pipeline { builder.setLeaderID128(uuid128); } + if (suggestedLeaderId != null) { + HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder() + .setMostSigBits(suggestedLeaderId.getMostSignificantBits()) + .setLeastSigBits(suggestedLeaderId.getLeastSignificantBits()) + .build(); + builder.setSuggestedLeaderID(uuid128); + } + // To save the message size on wire, only transfer the node order based on // network topology List<DatanodeDetails> nodes = nodesInOrder.get(); @@ -315,12 +336,20 @@ public final class Pipeline { leaderId = UUID.fromString(pipeline.getLeaderID()); } + UUID suggestedLeaderId = null; + if (pipeline.hasSuggestedLeaderID()) { + HddsProtos.UUID uuid = pipeline.getSuggestedLeaderID(); + suggestedLeaderId = + new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()); + } + return new Builder().setId(PipelineID.getFromProtobuf(pipeline.getId())) .setFactor(pipeline.getFactor()) .setType(pipeline.getType()) .setState(PipelineState.fromProtobuf(pipeline.getState())) .setNodes(nodes) .setLeaderId(leaderId) + .setSuggestedLeaderId(suggestedLeaderId) .setNodesInOrder(pipeline.getMemberOrdersList()) .setCreateTimestamp(pipeline.getCreationTimeStamp()) .build(); @@ -392,6 +421,7 @@ public final class Pipeline { private List<DatanodeDetails> nodesInOrder = null; private UUID leaderId = null; private Instant creationTimestamp = null; + private UUID suggestedLeaderId = null; public Builder() {} @@ -404,6 +434,7 @@ public final class Pipeline { this.nodesInOrder = pipeline.nodesInOrder.get(); this.leaderId = pipeline.getLeaderId(); this.creationTimestamp = pipeline.getCreationTimestamp(); + this.suggestedLeaderId = pipeline.getSuggestedLeaderId(); } public Builder setId(PipelineID id1) { @@ -447,13 +478,19 @@ public final class Pipeline { return this; } + public Builder setSuggestedLeaderId(UUID uuid) { + this.suggestedLeaderId = uuid; + return this; + } + public Pipeline build() { Preconditions.checkNotNull(id); Preconditions.checkNotNull(type); Preconditions.checkNotNull(factor); Preconditions.checkNotNull(state); Preconditions.checkNotNull(nodeStatus); - Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus); + Pipeline pipeline = + new Pipeline(id, type, factor, state, nodeStatus, suggestedLeaderId); pipeline.setLeaderId(leaderId); // overwrite with original creationTimestamp if (creationTimestamp != null) { @@ -484,6 +521,7 @@ public final class Pipeline { // This branch is for pipeline clone pipeline.setNodesInOrder(nodesInOrder); } + return pipeline; } } diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 026c0a8..4853978 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -858,6 +858,24 @@ </description> </property> <property> + <name>ozone.scm.pipeline.leader-choose.policy</name> + <value> + org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.DefaultLeaderChoosePolicy + </value> + <tag>OZONE, SCM, PIPELINE</tag> + <description> + The policy used for choosing desired leader for pipeline creation. + There are two policies supporting now: DefaultLeaderChoosePolicy, MinLeaderCountChoosePolicy. + org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.DefaultLeaderChoosePolicy + implements a policy that choose leader without depending on priority. 
+ org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.MinLeaderCountChoosePolicy + implements a policy that choose leader which has the minimum exist leader count. + In the future, we need to add policies which consider: + 1. resource, the datanode with the most abundant cpu and memory can be made the leader + 2. topology, the datanode nearest to the client can be made the leader + </description> + </property> + <property> <name>ozone.scm.container.size</name> <value>5GB</value> <tag>OZONE, PERFORMANCE, MANAGEMENT</tag> diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java index e435f7b..8f41fe9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java @@ -17,8 +17,8 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; import java.io.IOException; -import java.util.Collection; import java.util.concurrent.atomic.AtomicLong; +import java.util.List; import java.util.stream.Collectors; import org.apache.hadoop.hdds.conf.ConfigurationSource; @@ -82,18 +82,20 @@ public class CreatePipelineCommandHandler implements CommandHandler { final CreatePipelineCommandProto createCommand = ((CreatePipelineCommand)command).getProto(); final HddsProtos.PipelineID pipelineID = createCommand.getPipelineID(); - final Collection<DatanodeDetails> peers = + final List<DatanodeDetails> peers = createCommand.getDatanodeList().stream() .map(DatanodeDetails::getFromProtoBuf) .collect(Collectors.toList()); + final List<Integer> priorityList = createCommand.getPriorityList(); try { XceiverServerSpi server = ozoneContainer.getWriteChannel(); if (!server.isExist(pipelineID)) { final RaftGroupId groupId = RaftGroupId.valueOf( PipelineID.getFromProtobuf(pipelineID).getId()); - final RaftGroup group = RatisHelper.newRaftGroup(groupId, peers); - server.addGroup(pipelineID, peers); + final RaftGroup group = + RatisHelper.newRaftGroup(groupId, peers, priorityList); + server.addGroup(pipelineID, peers, priorityList); peers.stream().filter( d -> !d.getUuid().equals(dn.getUuid())) .forEach(d -> { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java index d8dfefd..4805612 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto; import java.io.IOException; -import java.util.Collection; import java.util.List; /** A server endpoint that acts as the communication layer for Ozone @@ -68,9 +67,16 @@ public interface XceiverServerSpi { * Join a new pipeline. 
*/ default void addGroup(HddsProtos.PipelineID pipelineId, - Collection<DatanodeDetails> peers) throws IOException { + List<DatanodeDetails> peers) throws IOException { } + /** + * Join a new pipeline with priority. + */ + default void addGroup(HddsProtos.PipelineID pipelineId, + List<DatanodeDetails> peers, + List<Integer> priorityList) throws IOException { + } /** * Exit a pipeline. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 573e681..0b0756c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -115,6 +116,9 @@ public final class XceiverServerRatis implements XceiverServerSpi { private static final Logger LOG = LoggerFactory .getLogger(XceiverServerRatis.class); private static final AtomicLong CALL_ID_COUNTER = new AtomicLong(); + private static final List<Integer> DEFAULT_PRIORITY_LIST = + new ArrayList<>( + Collections.nCopies(HddsProtos.ReplicationFactor.THREE_VALUE, 0)); private static long nextCallId() { return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; @@ -711,10 +715,23 @@ public final class XceiverServerRatis implements XceiverServerSpi { @Override public void addGroup(HddsProtos.PipelineID pipelineId, - Collection<DatanodeDetails> peers) throws IOException { + List<DatanodeDetails> peers) throws IOException { + if (peers.size() == getDefaultPriorityList().size()) { + addGroup(pipelineId, peers, getDefaultPriorityList()); + } else { + addGroup(pipelineId, peers, + new ArrayList<>(Collections.nCopies(peers.size(), 0))); + } + } + + @Override + public void addGroup(HddsProtos.PipelineID pipelineId, + List<DatanodeDetails> peers, + List<Integer> priorityList) throws IOException { final PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineId); final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId()); - final RaftGroup group = RatisHelper.newRaftGroup(groupId, peers); + final RaftGroup group = + RatisHelper.newRaftGroup(groupId, peers, priorityList); GroupManagementRequest request = GroupManagementRequest.newAdd( clientId, server.getId(), nextCallId(), group); @@ -864,4 +881,10 @@ public final class XceiverServerRatis implements XceiverServerSpi { return ImmutableList.copyOf(executors); } + /** + * @return list of default priority + */ + public static List<Integer> getDefaultPriorityList() { + return DEFAULT_PRIORITY_LIST; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java index 9e22cbc..6fdb4ce 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java @@ -25,7 +25,10 @@ import 
org.apache.hadoop.hdds.protocol.proto. import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -35,10 +38,14 @@ import java.util.stream.Collectors; public class CreatePipelineCommand extends SCMCommand<CreatePipelineCommandProto> { + private static final Integer HIGH_PRIORITY = 1; + private static final Integer LOW_PRIORITY = 0; + private final PipelineID pipelineID; private final ReplicationFactor factor; private final ReplicationType type; private final List<DatanodeDetails> nodelist; + private final List<Integer> priorityList; public CreatePipelineCommand(final PipelineID pipelineID, final ReplicationType type, final ReplicationFactor factor, @@ -48,16 +55,49 @@ public class CreatePipelineCommand this.factor = factor; this.type = type; this.nodelist = datanodeList; + if (datanodeList.size() == + XceiverServerRatis.getDefaultPriorityList().size()) { + this.priorityList = XceiverServerRatis.getDefaultPriorityList(); + } else { + this.priorityList = + new ArrayList<>(Collections.nCopies(datanodeList.size(), 0)); + } + } + + public CreatePipelineCommand(final PipelineID pipelineID, + final ReplicationType type, final ReplicationFactor factor, + final List<DatanodeDetails> datanodeList, + final DatanodeDetails suggestedLeader) { + super(); + this.pipelineID = pipelineID; + this.factor = factor; + this.type = type; + this.nodelist = datanodeList; + this.priorityList = new ArrayList<>(); + initPriorityList(datanodeList, suggestedLeader); + } + + private void initPriorityList( + List<DatanodeDetails> dns, DatanodeDetails suggestedLeader) { + for (DatanodeDetails dn : dns) { + if (dn.equals(suggestedLeader)) { + priorityList.add(HIGH_PRIORITY); + } else { + priorityList.add(LOW_PRIORITY); + } + } } public CreatePipelineCommand(long cmdId, final PipelineID pipelineID, final ReplicationType type, final ReplicationFactor factor, - final List<DatanodeDetails> datanodeList) { + final List<DatanodeDetails> datanodeList, + final List<Integer> priorityList) { super(cmdId); this.pipelineID = pipelineID; this.factor = factor; this.type = type; this.nodelist = datanodeList; + this.priorityList = priorityList; } /** @@ -80,6 +120,7 @@ public class CreatePipelineCommand .addAllDatanode(nodelist.stream() .map(DatanodeDetails::getProtoBufMessage) .collect(Collectors.toList())) + .addAllPriority(priorityList) .build(); } @@ -91,7 +132,8 @@ public class CreatePipelineCommand createPipelineProto.getType(), createPipelineProto.getFactor(), createPipelineProto.getDatanodeList().stream() .map(DatanodeDetails::getFromProtoBuf) - .collect(Collectors.toList())); + .collect(Collectors.toList()), + createPipelineProto.getPriorityList()); } public PipelineID getPipelineID() { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java index 8ee6ac7..ede0b94 100644 --- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java @@ -48,7 +48,9 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; /** @@ -113,8 +115,11 @@ public class TestCreatePipelineCommandHandler { commandHandler.handle(command, ozoneContainer, stateContext, connectionManager); + List<Integer> priorityList = + new ArrayList<>(Collections.nCopies(datanodes.size(), 0)); + Mockito.verify(writeChanel, Mockito.times(1)) - .addGroup(pipelineID.getProtobuf(), datanodes); + .addGroup(pipelineID.getProtobuf(), datanodes, priorityList); Mockito.verify(raftClient, Mockito.times(2)) .groupAdd(Mockito.any(RaftGroup.class), Mockito.any(RaftPeerId.class)); diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index 0c9b261..b43a74c 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -100,6 +100,7 @@ message Pipeline { optional string leaderID = 6; repeated uint32 memberOrders = 7; optional uint64 creationTimeStamp = 8; + optional UUID suggestedLeaderID = 9; // TODO(runzhiwang): when leaderID is gone, specify 6 as the index of leaderID128 optional UUID leaderID128 = 100; } diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto index 1dc4bcd..4f610ff 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto @@ -380,6 +380,7 @@ message CreatePipelineCommandProto { required ReplicationFactor factor = 3; repeated DatanodeDetailsProto datanode = 4; required int64 cmdId = 5; + repeated int32 priority = 6; } /** diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto index 682d4d9..7d59bd7 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto @@ -115,6 +115,7 @@ enum Status { FAILED_TO_ALLOCATE_ENOUGH_BLOCKS = 27; INTERNAL_ERROR = 29; FAILED_TO_INIT_PIPELINE_CHOOSE_POLICY = 30; + FAILED_TO_INIT_LEADER_CHOOSE_POLICY = 31; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java index bb56a03..8bc5bd5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java @@ -59,7 +59,8 @@ public class PipelineStateManager { pipelineStateMap.addContainerToPipeline(pipelineId, containerID); } - Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException { + public Pipeline getPipeline(PipelineID pipelineID) + throws PipelineNotFoundException { return 
pipelineStateMap.getPipeline(pipelineID); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index e39f141..830db18 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -31,11 +31,14 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; +import org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.LeaderChoosePolicy; +import org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.LeaderChoosePolicyFactory; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand; +import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,8 +55,10 @@ public class RatisPipelineProvider extends PipelineProvider { private final PipelinePlacementPolicy placementPolicy; private int pipelineNumberLimit; private int maxPipelinePerDatanode; + private final LeaderChoosePolicy leaderChoosePolicy; - RatisPipelineProvider(NodeManager nodeManager, + @VisibleForTesting + public RatisPipelineProvider(NodeManager nodeManager, PipelineStateManager stateManager, ConfigurationSource conf, EventPublisher eventPublisher) { super(nodeManager, stateManager); @@ -67,6 +72,12 @@ public class RatisPipelineProvider extends PipelineProvider { String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT); this.maxPipelinePerDatanode = dnLimit == null ? 0 : Integer.parseInt(dnLimit); + try { + leaderChoosePolicy = LeaderChoosePolicyFactory + .getPolicy(conf, nodeManager, stateManager); + } catch (Exception e) { + throw new RuntimeException(e); + } } private boolean exceedPipelineNumberLimit(ReplicationFactor factor) { @@ -98,8 +109,14 @@ public class RatisPipelineProvider extends PipelineProvider { return false; } + @VisibleForTesting + public LeaderChoosePolicy getLeaderChoosePolicy() { + return leaderChoosePolicy; + } + @Override - public Pipeline create(ReplicationFactor factor) throws IOException { + public synchronized Pipeline create(ReplicationFactor factor) + throws IOException { if (exceedPipelineNumberLimit(factor)) { throw new SCMException("Ratis pipeline number meets the limit: " + pipelineNumberLimit + " factor : " + @@ -121,16 +138,22 @@ public class RatisPipelineProvider extends PipelineProvider { throw new IllegalStateException("Unknown factor: " + factor.name()); } + DatanodeDetails suggestedLeader = leaderChoosePolicy.chooseLeader(dns); + Pipeline pipeline = Pipeline.newBuilder() .setId(PipelineID.randomId()) .setState(PipelineState.ALLOCATED) .setType(ReplicationType.RATIS) .setFactor(factor) .setNodes(dns) + .setSuggestedLeaderId( + suggestedLeader != null ? suggestedLeader.getUuid() : null) .build(); // Send command to datanodes to create pipeline - final CreatePipelineCommand createCommand = + final CreatePipelineCommand createCommand = suggestedLeader != null ? 
+ new CreatePipelineCommand(pipeline.getId(), pipeline.getType(), + factor, dns, suggestedLeader) : new CreatePipelineCommand(pipeline.getId(), pipeline.getType(), factor, dns); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java new file mode 100644 index 0000000..415cf10 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager; + +import java.util.List; + +/** + * The default leader choose policy. + * Do not choose leader here, so that all nodes have the same priority + * and ratis elects leader without depending on priority. + */ +public class DefaultLeaderChoosePolicy extends LeaderChoosePolicy { + + public DefaultLeaderChoosePolicy( + NodeManager nodeManager, PipelineStateManager pipelineStateManager) { + super(nodeManager, pipelineStateManager); + } + + @Override + public DatanodeDetails chooseLeader(List<DatanodeDetails> dns) { + return null; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java new file mode 100644 index 0000000..04c155b --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager; + +import java.util.List; + +/** + * A {@link LeaderChoosePolicy} support choosing leader from datanode list. + */ +public abstract class LeaderChoosePolicy { + + private final NodeManager nodeManager; + private final PipelineStateManager pipelineStateManager; + + public LeaderChoosePolicy( + NodeManager nodeManager, PipelineStateManager pipelineStateManager) { + this.nodeManager = nodeManager; + this.pipelineStateManager = pipelineStateManager; + } + + /** + * Given an initial list of datanodes, return one of the datanodes. + * + * @param dns list of datanodes. + * @return one of the datanodes. + */ + public abstract DatanodeDetails chooseLeader(List<DatanodeDetails> dns); + + protected NodeManager getNodeManager() { + return nodeManager; + } + + protected PipelineStateManager getPipelineStateManager() { + return pipelineStateManager; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java new file mode 100644 index 0000000..8e1a0ff --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms; + +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; + +/** + * A factory to create leader choose policy instance based on configuration + * property {@link ScmConfigKeys#OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY}. + */ +public final class LeaderChoosePolicyFactory { + private static final Logger LOG = + LoggerFactory.getLogger(LeaderChoosePolicyFactory.class); + + private static final Class<? 
extends LeaderChoosePolicy> + OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY_DEFAULT = + MinLeaderCountChoosePolicy.class; + + private LeaderChoosePolicyFactory() { + } + + + public static LeaderChoosePolicy getPolicy( + ConfigurationSource conf, final NodeManager nodeManager, + final PipelineStateManager pipelineStateManager) throws SCMException { + final Class<? extends LeaderChoosePolicy> policyClass = conf + .getClass(ScmConfigKeys.OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY, + OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY_DEFAULT, + LeaderChoosePolicy.class); + Constructor<? extends LeaderChoosePolicy> constructor; + try { + constructor = policyClass.getDeclaredConstructor(NodeManager.class, + PipelineStateManager.class); + LOG.info("Create leader choose policy of type {}", + policyClass.getCanonicalName()); + } catch (NoSuchMethodException e) { + String msg = "Failed to find constructor(NodeManager, " + + "PipelineStateManager) for class " + + policyClass.getCanonicalName(); + LOG.error(msg); + throw new SCMException(msg, + SCMException.ResultCodes.FAILED_TO_INIT_LEADER_CHOOSE_POLICY); + } + + try { + return constructor.newInstance(nodeManager, pipelineStateManager); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate class " + + policyClass.getCanonicalName() + " for " + e.getMessage()); + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java new file mode 100644 index 0000000..d4068b9 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; +import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * The minimum leader count choose policy that chooses leader + * which has the minimum exist leader count. 
+ */ +public class MinLeaderCountChoosePolicy extends LeaderChoosePolicy { + + private static final Logger LOG = + LoggerFactory.getLogger(MinLeaderCountChoosePolicy.class); + + public MinLeaderCountChoosePolicy( + NodeManager nodeManager, PipelineStateManager pipelineStateManager) { + super(nodeManager, pipelineStateManager); + } + + @Override + public DatanodeDetails chooseLeader(List<DatanodeDetails> dns) { + Map<DatanodeDetails, Integer> suggestedLeaderCount = + getSuggestedLeaderCount( + dns, getNodeManager(), getPipelineStateManager()); + int minLeaderCount = Integer.MAX_VALUE; + DatanodeDetails suggestedLeader = null; + + for (Map.Entry<DatanodeDetails, Integer> entry : + suggestedLeaderCount.entrySet()) { + if (entry.getValue() < minLeaderCount) { + minLeaderCount = entry.getValue(); + suggestedLeader = entry.getKey(); + } + } + + return suggestedLeader; + } + + private Map<DatanodeDetails, Integer> getSuggestedLeaderCount( + List<DatanodeDetails> dns, NodeManager nodeManager, + PipelineStateManager pipelineStateManager) { + Map<DatanodeDetails, Integer> suggestedLeaderCount = new HashMap<>(); + for (DatanodeDetails dn : dns) { + suggestedLeaderCount.put(dn, 0); + + Set<PipelineID> pipelineIDSet = nodeManager.getPipelines(dn); + for (PipelineID pipelineID : pipelineIDSet) { + try { + Pipeline pipeline = pipelineStateManager.getPipeline(pipelineID); + if (!pipeline.isClosed() + && dn.getUuid().equals(pipeline.getSuggestedLeaderId())) { + suggestedLeaderCount.put(dn, suggestedLeaderCount.get(dn) + 1); + } + } catch (PipelineNotFoundException e) { + LOG.debug("Pipeline not found in pipeline state manager : {}", + pipelineID, e); + } + } + } + + return suggestedLeaderCount; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/package-info.java new file mode 100644 index 0000000..e29369a --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms; +// Various leader choosing algorithms. 
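[Editor's note] The policy classes above only pick a *suggested* leader on the SCM side; the preference reaches the datanodes as a per-peer priority list carried in CreatePipelineCommand and handed to Ratis through RatisHelper.newRaftGroup(groupId, peers, priorityList). A minimal, self-contained sketch of that mapping follows; it is illustrative only (plain UUIDs stand in for DatanodeDetails, and the class name is invented for the example), not part of the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * Illustrative sketch only (not part of this patch): how a suggested leader
 * chosen by SCM maps to the per-peer priority list that datanodes pass to
 * Ratis. Plain UUIDs stand in for DatanodeDetails to keep it self-contained.
 */
public final class SuggestedLeaderPrioritySketch {

  // Mirrors CreatePipelineCommand: the suggested leader gets priority 1,
  // every other pipeline member gets priority 0.
  static List<Integer> priorityList(List<UUID> members, UUID suggestedLeader) {
    List<Integer> priorities = new ArrayList<>(members.size());
    for (UUID member : members) {
      priorities.add(member.equals(suggestedLeader) ? 1 : 0);
    }
    return priorities;
  }

  public static void main(String[] args) {
    List<UUID> members = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      members.add(UUID.randomUUID());
    }
    // Pretend the configured policy picked the second member as leader.
    UUID suggestedLeader = members.get(1);

    // Prints e.g. [0, 1, 0]; Ratis leader election then favors the
    // higher-priority peer when the pipeline's RaftGroup is created.
    System.out.println(priorityList(members, suggestedLeader));
  }
}

With DefaultLeaderChoosePolicy no leader is suggested, so all peers keep priority 0 and Ratis elects a leader without any priority hint, matching the pre-patch behaviour.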
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java new file mode 100644 index 0000000..53905e7 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.ScmConfig; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager; +import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineProvider; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +/** + * Unit tests for {@link LeaderChoosePolicy}. + */ +public class TestLeaderChoosePolicy { + private OzoneConfiguration conf; + + private ScmConfig scmConfig; + + @Before + public void setup() { + //initialize network topology instance + conf = new OzoneConfiguration(); + scmConfig = conf.getObject(ScmConfig.class); + } + + @Test + public void testDefaultPolicy() { + RatisPipelineProvider ratisPipelineProvider = new RatisPipelineProvider( + mock(NodeManager.class), + mock(PipelineStateManager.class), + conf, + mock(EventPublisher.class)); + Assert.assertSame( + ratisPipelineProvider.getLeaderChoosePolicy().getClass(), + DefaultLeaderChoosePolicy.class); + } + + @Test(expected = RuntimeException.class) + public void testClassNotImplemented() { + // set a class not implemented + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY, + "org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms" + + ".HelloWorld"); + new RatisPipelineProvider( + mock(NodeManager.class), + mock(PipelineStateManager.class), + conf, + mock(EventPublisher.class)); + + // expecting exception + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestLeaderChoosePolicy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestLeaderChoosePolicy.java new file mode 100644 index 0000000..ecf1c2f --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestLeaderChoosePolicy.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT; + +/** + * Tests for LeaderChoosePolicy. + */ +@Ignore +public class TestLeaderChoosePolicy { + + private static MiniOzoneCluster cluster; + private OzoneConfiguration conf = new OzoneConfiguration(); + private static PipelineManager pipelineManager; + + public void init(int numDatanodes, int datanodePipelineLimit) + throws Exception { + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, + GenericTestUtils.getRandomizedTempPath()); + conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, datanodePipelineLimit); + + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(numDatanodes) + .setTotalPipelineNumLimit(numDatanodes + numDatanodes/3) + .setHbInterval(2000) + .setHbProcessorInterval(1000) + .build(); + cluster.waitForClusterToBeReady(); + StorageContainerManager scm = cluster.getStorageContainerManager(); + pipelineManager = scm.getPipelineManager(); + } + + @After + public void cleanup() { + cluster.shutdown(); + } + + private void checkLeaderBalance(int dnNum, int leaderNumOfEachDn) + throws Exception { + List<Pipeline> pipelines = pipelineManager + .getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN); + + for (Pipeline pipeline : pipelines) { + LambdaTestUtils.await(30000, 500, () -> + pipeline.getLeaderId().equals(pipeline.getSuggestedLeaderId())); + } + + Map<UUID, Integer> leaderCount = new HashMap<>(); + for (Pipeline pipeline : pipelines) { + UUID leader = pipeline.getLeaderId(); + if (!leaderCount.containsKey(leader)) { + leaderCount.put(leader, 0); + } + + leaderCount.put(leader, leaderCount.get(leader) + 1); + } + + Assert.assertTrue(leaderCount.size() == dnNum); + for (UUID key : leaderCount.keySet()) { + Assert.assertTrue(leaderCount.get(key) == leaderNumOfEachDn); + } + } + + @Test(timeout = 360000) + public void testRestoreSuggestedLeader() throws Exception { + 
conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false); + conf.set(OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY, + "org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms" + + ".MinLeaderCountChoosePolicy"); + int dnNum = 3; + int dnPipelineLimit = 3; + int leaderNumOfEachDn = dnPipelineLimit / dnNum; + int pipelineNum = 3; + + init(dnNum, dnPipelineLimit); + // make sure two pipelines are created + waitForPipelines(pipelineNum); + // No Factor ONE pipeline is auto created. + Assert.assertEquals(0, pipelineManager.getPipelines( + HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE).size()); + + // pipelineNum pipelines in 3 datanodes, + // each datanode has leaderNumOfEachDn leaders after balance + checkLeaderBalance(dnNum, leaderNumOfEachDn); + List<Pipeline> pipelinesBeforeRestart = + cluster.getStorageContainerManager().getPipelineManager() + .getPipelines(); + + cluster.restartStorageContainerManager(true); + + checkLeaderBalance(dnNum, leaderNumOfEachDn); + List<Pipeline> pipelinesAfterRestart = + cluster.getStorageContainerManager().getPipelineManager() + .getPipelines(); + + Assert.assertEquals( + pipelinesBeforeRestart.size(), pipelinesAfterRestart.size()); + + for (Pipeline p : pipelinesBeforeRestart) { + boolean equal = false; + for (Pipeline q : pipelinesAfterRestart) { + if (p.getId().equals(q.getId()) + && p.getSuggestedLeaderId().equals(q.getSuggestedLeaderId())) { + equal = true; + } + } + + Assert.assertTrue(equal); + } + } + + @Test(timeout = 360000) + public void testMinLeaderCountChoosePolicy() throws Exception { + conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false); + conf.set(OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY, + "org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms" + + ".MinLeaderCountChoosePolicy"); + int dnNum = 3; + int dnPipelineLimit = 3; + int leaderNumOfEachDn = dnPipelineLimit / dnNum; + int pipelineNum = 3; + + init(dnNum, dnPipelineLimit); + // make sure pipelines are created + waitForPipelines(pipelineNum); + // No Factor ONE pipeline is auto created. 
+ Assert.assertEquals(0, pipelineManager.getPipelines( + HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE).size()); + + // pipelineNum pipelines in 3 datanodes, + // each datanode has leaderNumOfEachDn leaders after balance + checkLeaderBalance(dnNum, leaderNumOfEachDn); + + Random r = new Random(0); + for (int i = 0; i < 10; i++) { + // destroy some pipelines, wait new pipelines created, + // then check leader balance + + List<Pipeline> pipelines = pipelineManager + .getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN); + + int destroyNum = r.nextInt(pipelines.size()); + for (int k = 0; k <= destroyNum; k++) { + pipelineManager.finalizeAndDestroyPipeline(pipelines.get(k), false); + } + + waitForPipelines(pipelineNum); + + checkLeaderBalance(dnNum, leaderNumOfEachDn); + } + } + + @Test(timeout = 60000) + public void testDefaultLeaderChoosePolicy() throws Exception { + conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false); + conf.set(OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY, + "org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms" + + ".DefaultLeaderChoosePolicy"); + int dnNum = 3; + int dnPipelineLimit = 3; + int pipelineNum = 3; + + init(dnNum, dnPipelineLimit); + // make sure pipelines are created + waitForPipelines(pipelineNum); + } + + private void waitForPipelines(int numPipelines) + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(() -> pipelineManager + .getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN) + .size() >= numPipelines, 100, 60000); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java index bd677db..6236900 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java @@ -169,6 +169,6 @@ public class TestRatisPipelineCreateAndDestroy { GenericTestUtils.waitFor(() -> pipelineManager .getPipelines(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN) - .size() >= numPipelines, 100, 40000); + .size() >= numPipelines, 100, 60000); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java index 9a9f0c7..46e3d67 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java @@ -102,7 +102,7 @@ public class TestMiniOzoneCluster { FileUtils.deleteQuietly(READ_TMP); } - @Test(timeout = 30000) + @Test(timeout = 60000) public void testStartMultipleDatanodes() throws Exception { final int numberOfNodes = 3; cluster = MiniOzoneCluster.newBuilder(conf) @@ -290,7 +290,7 @@ public class TestMiniOzoneCluster { * Test that a DN can register with SCM even if it was started before the SCM. 
* @throws Exception */ - @Test (timeout = 60000) + @Test (timeout = 100000) public void testDNstartAfterSCM() throws Exception { // Start a cluster with 3 DN cluster = MiniOzoneCluster.newBuilder(conf) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRocksDBLogging.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRocksDBLogging.java index 57c7061..5ddde8a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRocksDBLogging.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRocksDBLogging.java @@ -42,7 +42,7 @@ public class TestOzoneManagerRocksDBLogging { private RocksDBConfiguration dbConf; @Rule - public Timeout timeout = new Timeout(60000); + public Timeout timeout = new Timeout(100000); private static GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.captureLogs(DBStoreBuilder.ROCKS_DB_LOGGER); --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org
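[Editor's note] Switching between the two policies is done through the ozone.scm.pipeline.leader-choose.policy key introduced by this patch. A minimal sketch of selecting MinLeaderCountChoosePolicy before starting SCM is shown below; the key and class names come from the patch, while the surrounding setup class is assumed context for illustration (the integration test in this commit sets the same key on its MiniOzoneCluster configuration).

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;

// Illustrative sketch: pick the leader-choose policy via configuration.
// SCM resolves the class through LeaderChoosePolicyFactory when the
// RatisPipelineProvider is constructed.
public final class LeaderPolicyConfigSketch {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY,
        "org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms"
            + ".MinLeaderCountChoosePolicy");
    // Pass 'conf' to the SCM / MiniOzoneCluster being started; an operator
    // would set the same key in ozone-site.xml instead.
  }
}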