This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new eeb4ac7  HDDS-2242. Avoid unnecessary rpc needed to discover the pipeline leader. (#313)
eeb4ac7 is described below

commit eeb4ac70607500675037b57b88fd463b22918781
Author: Siddharth <swa...@apache.org>
AuthorDate: Fri Dec 13 13:31:08 2019 +0530

    HDDS-2242. Avoid unnecessary rpc needed to discover the pipeline leader. (#313)
---
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  |  13 +--
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |  20 ++++
 .../hadoop/hdds/scm/TestRatisPipelineLeader.java   | 129 +++++++++++++++++++++
 3 files changed, 154 insertions(+), 8 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index 38f01ce..7c36c26 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -21,8 +21,6 @@ package org.apache.hadoop.hdds.ratis;
 import java.io.IOException;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -143,7 +141,8 @@ public interface RatisHelper {
   static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,
       RetryPolicy retryPolicy, int maxOutStandingRequest,
       GrpcTlsConfig tlsConfig, TimeDuration timeout) throws IOException {
-    return newRaftClient(rpcType, toRaftPeerId(pipeline.getFirstNode()),
+    return newRaftClient(rpcType,
+        toRaftPeerId(pipeline.getLeaderNode()),
         newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
             pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig,
         timeout);
@@ -158,16 +157,14 @@ public interface RatisHelper {
         OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
         OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
             .getDuration(), timeUnit);
-    final TimeDuration clientRequestTimeout =
-        TimeDuration.valueOf(duration, timeUnit);
-    return clientRequestTimeout;
+    return TimeDuration.valueOf(duration, timeUnit);
   }
 
   static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
       RetryPolicy retryPolicy, int maxOutstandingRequests,
       GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout) {
     return newRaftClient(rpcType, leader.getId(),
-        newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
+        newRaftGroup(Collections.singletonList(leader)), retryPolicy,
         maxOutstandingRequests, tlsConfig, clientRequestTimeout);
   }
 
@@ -175,7 +172,7 @@ public interface RatisHelper {
       RetryPolicy retryPolicy, int maxOutstandingRequests,
       TimeDuration clientRequestTimeout) {
     return newRaftClient(rpcType, leader.getId(),
-        newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
+        newRaftGroup(Collections.singletonList(leader)), retryPolicy,
         maxOutstandingRequests, null, clientRequestTimeout);
   }
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 66b1efa..bceb647 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -132,6 +133,25 @@ public final class Pipeline {
     return new ArrayList<>(nodeStatus.keySet());
   }
 
+  /**
+   * Returns the leader if found else defaults to closest node.
+   *
+   * @return {@link DatanodeDetails}
+   */
+  public DatanodeDetails getLeaderNode() throws IOException {
+    if (nodeStatus.isEmpty()) {
+      throw new IOException(String.format("Pipeline=%s is empty", id));
+    }
+    Optional<DatanodeDetails> datanodeDetails =
+        nodeStatus.keySet().stream().filter(d ->
+            d.getUuid().equals(leaderId)).findFirst();
+    if (datanodeDetails.isPresent()) {
+      return datanodeDetails.get();
+    } else {
+      return getClosestNode();
+    }
+  }
+
   public DatanodeDetails getFirstNode() throws IOException {
     if (nodeStatus.isEmpty()) {
       throw new IOException(String.format("Pipeline=%s is empty", id));
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestRatisPipelineLeader.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestRatisPipelineLeader.java
new file mode 100644
index 0000000..f5b7eaf
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestRatisPipelineLeader.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.ratis.grpc.client.GrpcClientProtocolService;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.GroupInfoRequest;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test pipeline leader information is correctly used.
+ */
+public class TestRatisPipelineLeader {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster
+        .newBuilder(conf)
+        .setNumDatanodes(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testLeaderIdUsedOnFirstCall() throws Exception {
+    List<Pipeline> pipelines = cluster.getStorageContainerManager()
+        .getPipelineManager().getPipelines();
+    Optional<Pipeline> ratisPipeline = pipelines.stream().filter(p ->
+        p.getType().equals(HddsProtos.ReplicationType.RATIS)).findFirst();
+    Assert.assertTrue(ratisPipeline.isPresent());
+    Assert.assertTrue(ratisPipeline.get().isHealthy());
+    // Verify correct leader info populated
+    verifyLeaderInfo(ratisPipeline.get());
+    // Verify client connects to Leader without NotLeaderException
+    XceiverClientRatis xceiverClientRatis =
+        XceiverClientRatis.newXceiverClientRatis(ratisPipeline.get(), conf);
+    Logger.getLogger(GrpcClientProtocolService.class).setLevel(Level.DEBUG);
+    GenericTestUtils.LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(GrpcClientProtocolService.LOG);
+    xceiverClientRatis.connect();
+    ContainerProtocolCalls.createContainer(xceiverClientRatis, 1L, null);
+    logCapturer.stopCapturing();
+    Assert.assertFalse("Client should connect to pipeline leader on first try.",
+        logCapturer.getOutput().contains(
+            "org.apache.ratis.protocol.NotLeaderException"));
+  }
+
+  @Test(timeout = 120000)
+  public void testLeaderIdAfterLeaderChange() throws Exception {
+    List<Pipeline> pipelines = cluster.getStorageContainerManager()
+        .getPipelineManager().getPipelines();
+    Optional<Pipeline> ratisPipeline = pipelines.stream().filter(p ->
+        p.getType().equals(HddsProtos.ReplicationType.RATIS)).findFirst();
+    Assert.assertTrue(ratisPipeline.isPresent());
+    Assert.assertTrue(ratisPipeline.get().isHealthy());
+    Optional<HddsDatanodeService> dnToStop =
+        cluster.getHddsDatanodes().stream().filter(s ->
+            !s.getDatanodeStateMachine().getDatanodeDetails().getUuid().equals(
+                ratisPipeline.get().getLeaderId())).findAny();
+    Assert.assertTrue(dnToStop.isPresent());
+    dnToStop.get().stop();
+    GenericTestUtils.waitFor(() -> ratisPipeline.get().isHealthy(), 300, 5000);
+    verifyLeaderInfo(ratisPipeline.get());
+  }
+
+  private void verifyLeaderInfo(Pipeline ratisPipeline) throws Exception {
+    Optional<HddsDatanodeService> hddsDatanodeService =
+        cluster.getHddsDatanodes().stream().filter(s ->
+            s.getDatanodeStateMachine().getDatanodeDetails().getUuid()
+                .equals(ratisPipeline.getLeaderId())).findFirst();
+    Assert.assertTrue(hddsDatanodeService.isPresent());
+
+    XceiverServerRatis serverRatis =
+        (XceiverServerRatis) hddsDatanodeService.get()
+            .getDatanodeStateMachine().getContainer().getWriteChannel();
+
+    GroupInfoRequest groupInfoRequest = new GroupInfoRequest(
+        ClientId.randomId(), serverRatis.getServer().getId(),
+        RaftGroupId.valueOf(ratisPipeline.getId().getId()), 100);
+    GroupInfoReply reply =
+        serverRatis.getServer().getGroupInfo(groupInfoRequest);
+    Assert.assertTrue(reply.getRoleInfoProto().hasLeaderInfo());
+    Assert.assertEquals(ratisPipeline.getLeaderId().toString(),
+        reply.getRoleInfoProto().getSelf().getId().toStringUtf8());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org

Reply via email to