This is an automated email from the ASF dual-hosted git repository. nanda pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push: new eeb4ac7 HDDS-2242. Avoid unnecessary rpc needed to discover the pipeline leader. (#313) eeb4ac7 is described below commit eeb4ac70607500675037b57b88fd463b22918781 Author: Siddharth <swa...@apache.org> AuthorDate: Fri Dec 13 13:31:08 2019 +0530 HDDS-2242. Avoid unnecessary rpc needed to discover the pipeline leader. (#313) --- .../org/apache/hadoop/hdds/ratis/RatisHelper.java | 13 +-- .../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 20 ++++ .../hadoop/hdds/scm/TestRatisPipelineLeader.java | 129 +++++++++++++++++++++ 3 files changed, 154 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java index 38f01ce..7c36c26 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java @@ -21,8 +21,6 @@ package org.apache.hadoop.hdds.ratis; import java.io.IOException; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -143,7 +141,8 @@ public interface RatisHelper { static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline, RetryPolicy retryPolicy, int maxOutStandingRequest, GrpcTlsConfig tlsConfig, TimeDuration timeout) throws IOException { - return newRaftClient(rpcType, toRaftPeerId(pipeline.getFirstNode()), + return newRaftClient(rpcType, + toRaftPeerId(pipeline.getLeaderNode()), newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()), pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig, timeout); @@ -158,16 +157,14 @@ public interface RatisHelper { OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY, OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT .getDuration(), timeUnit); - final TimeDuration clientRequestTimeout = - TimeDuration.valueOf(duration, timeUnit); - return clientRequestTimeout; + return TimeDuration.valueOf(duration, timeUnit); } static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader, RetryPolicy retryPolicy, int maxOutstandingRequests, GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout) { return newRaftClient(rpcType, leader.getId(), - newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy, + newRaftGroup(Collections.singletonList(leader)), retryPolicy, maxOutstandingRequests, tlsConfig, clientRequestTimeout); } @@ -175,7 +172,7 @@ public interface RatisHelper { RetryPolicy retryPolicy, int maxOutstandingRequests, TimeDuration clientRequestTimeout) { return newRaftClient(rpcType, leader.getId(), - newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy, + newRaftGroup(Collections.singletonList(leader)), retryPolicy, maxOutstandingRequests, null, clientRequestTimeout); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index 66b1efa..bceb647 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; @@ -132,6 +133,25 @@ public final class Pipeline { return new ArrayList<>(nodeStatus.keySet()); } + /** + * Returns the leader if found else defaults to closest node. + * + * @return {@link DatanodeDetails} + */ + public DatanodeDetails getLeaderNode() throws IOException { + if (nodeStatus.isEmpty()) { + throw new IOException(String.format("Pipeline=%s is empty", id)); + } + Optional<DatanodeDetails> datanodeDetails = + nodeStatus.keySet().stream().filter(d -> + d.getUuid().equals(leaderId)).findFirst(); + if (datanodeDetails.isPresent()) { + return datanodeDetails.get(); + } else { + return getClosestNode(); + } + } + public DatanodeDetails getFirstNode() throws IOException { if (nodeStatus.isEmpty()) { throw new IOException(String.format("Pipeline=%s is empty", id)); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestRatisPipelineLeader.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestRatisPipelineLeader.java new file mode 100644 index 0000000..f5b7eaf --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestRatisPipelineLeader.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm; + +import java.util.List; +import java.util.Optional; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.ozone.HddsDatanodeService; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.ratis.grpc.client.GrpcClientProtocolService; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.GroupInfoReply; +import org.apache.ratis.protocol.GroupInfoRequest; +import org.apache.ratis.protocol.RaftGroupId; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test pipeline leader information is correctly used. + */ +public class TestRatisPipelineLeader { + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + + @BeforeClass + public static void setup() throws Exception { + conf = new OzoneConfiguration(); + cluster = MiniOzoneCluster + .newBuilder(conf) + .setNumDatanodes(3) + .build(); + cluster.waitForClusterToBeReady(); + } + + @AfterClass + public static void shutdown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test(timeout = 120000) + public void testLeaderIdUsedOnFirstCall() throws Exception { + List<Pipeline> pipelines = cluster.getStorageContainerManager() + .getPipelineManager().getPipelines(); + Optional<Pipeline> ratisPipeline = pipelines.stream().filter(p -> + p.getType().equals(HddsProtos.ReplicationType.RATIS)).findFirst(); + Assert.assertTrue(ratisPipeline.isPresent()); + Assert.assertTrue(ratisPipeline.get().isHealthy()); + // Verify correct leader info populated + verifyLeaderInfo(ratisPipeline.get()); + // Verify client connects to Leader without NotLeaderException + XceiverClientRatis xceiverClientRatis = + XceiverClientRatis.newXceiverClientRatis(ratisPipeline.get(), conf); + Logger.getLogger(GrpcClientProtocolService.class).setLevel(Level.DEBUG); + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(GrpcClientProtocolService.LOG); + xceiverClientRatis.connect(); + ContainerProtocolCalls.createContainer(xceiverClientRatis, 1L, null); + logCapturer.stopCapturing(); + Assert.assertFalse("Client should connect to pipeline leader on first try.", + logCapturer.getOutput().contains( + "org.apache.ratis.protocol.NotLeaderException")); + } + + @Test(timeout = 120000) + public void testLeaderIdAfterLeaderChange() throws Exception { + List<Pipeline> pipelines = cluster.getStorageContainerManager() + .getPipelineManager().getPipelines(); + Optional<Pipeline> ratisPipeline = pipelines.stream().filter(p -> + p.getType().equals(HddsProtos.ReplicationType.RATIS)).findFirst(); + Assert.assertTrue(ratisPipeline.isPresent()); + Assert.assertTrue(ratisPipeline.get().isHealthy()); + Optional<HddsDatanodeService> dnToStop = + cluster.getHddsDatanodes().stream().filter(s -> + !s.getDatanodeStateMachine().getDatanodeDetails().getUuid().equals( + ratisPipeline.get().getLeaderId())).findAny(); + Assert.assertTrue(dnToStop.isPresent()); + dnToStop.get().stop(); + GenericTestUtils.waitFor(() -> ratisPipeline.get().isHealthy(), 300, 5000); + verifyLeaderInfo(ratisPipeline.get()); + } + + private void verifyLeaderInfo(Pipeline ratisPipeline) throws Exception { + Optional<HddsDatanodeService> hddsDatanodeService = + cluster.getHddsDatanodes().stream().filter(s -> + s.getDatanodeStateMachine().getDatanodeDetails().getUuid() + .equals(ratisPipeline.getLeaderId())).findFirst(); + Assert.assertTrue(hddsDatanodeService.isPresent()); + + XceiverServerRatis serverRatis = + (XceiverServerRatis) hddsDatanodeService.get() + .getDatanodeStateMachine().getContainer().getWriteChannel(); + + GroupInfoRequest groupInfoRequest = new GroupInfoRequest( + ClientId.randomId(), serverRatis.getServer().getId(), + RaftGroupId.valueOf(ratisPipeline.getId().getId()), 100); + GroupInfoReply reply = + serverRatis.getServer().getGroupInfo(groupInfoRequest); + Assert.assertTrue(reply.getRoleInfoProto().hasLeaderInfo()); + Assert.assertEquals(ratisPipeline.getLeaderId().toString(), + reply.getRoleInfoProto().getSelf().getId().toStringUtf8()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org