This is an automated email from the ASF dual-hosted git repository.

xyao pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/HDDS-2823 by this push:
     new 138d33e  HDDS-3188 Add failover proxy for SCM block location. (#1340)
138d33e is described below

commit 138d33ec0c6a0385f6a59f42fadaaec079ddb6c0
Author: Li Cheng <bloodhell2...@gmail.com>
AuthorDate: Thu Oct 1 00:27:09 2020 +0800

    HDDS-3188 Add failover proxy for SCM block location. (#1340)
---
 .../hadoop/hdds/scm/exceptions/SCMException.java   |   3 +-
 ...lockLocationProtocolClientSideTranslatorPB.java |  25 +-
 .../SCMBlockLocationFailoverProxyProvider.java     | 280 +++++++++++++++++++++
 .../hadoop/hdds/scm/proxy/SCMClientConfig.java     | 103 ++++++++
 .../apache/hadoop/hdds/scm/proxy/SCMProxyInfo.java |  73 ++++++
 .../apache/hadoop/hdds/scm/proxy/package-info.java |  22 ++
 .../src/main/proto/ScmServerProtocol.proto         |   3 +
 ...lockLocationProtocolServerSideTranslatorPB.java |  18 ++
 .../hdds/scm/server/SCMBlockProtocolServer.java    |   4 +
 .../hdds/scm/server/StorageContainerManager.java   |  19 ++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  11 +-
 11 files changed, 545 insertions(+), 16 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
index db1f82a..11b7b3c 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -122,6 +122,7 @@ public class SCMException extends IOException {
     FAILED_TO_FIND_ACTIVE_PIPELINE,
     FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY,
     FAILED_TO_ALLOCATE_ENOUGH_BLOCKS,
-    INTERNAL_ERROR
+    INTERNAL_ERROR,
+    SCM_NOT_LEADER
   }
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index e86ee81..12c51f6 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest;
 import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse;
 import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Type;
@@ -45,10 +46,11 @@ import 
org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
 
@@ -73,15 +75,21 @@ public final class 
ScmBlockLocationProtocolClientSideTranslatorPB
   private static final RpcController NULL_RPC_CONTROLLER = null;
 
   private final ScmBlockLocationProtocolPB rpcProxy;
+  private SCMBlockLocationFailoverProxyProvider failoverProxyProvider;
 
   /**
    * Creates a new StorageContainerLocationProtocolClientSideTranslatorPB.
    *
-   * @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy
+   * @param proxyProvider {@link SCMBlockLocationFailoverProxyProvider}
+   * failover proxy provider.
    */
   public ScmBlockLocationProtocolClientSideTranslatorPB(
-      ScmBlockLocationProtocolPB rpcProxy) {
-    this.rpcProxy = rpcProxy;
+      SCMBlockLocationFailoverProxyProvider proxyProvider) {
+    Preconditions.checkState(proxyProvider != null);
+    this.failoverProxyProvider = proxyProvider;
+    this.rpcProxy = (ScmBlockLocationProtocolPB) RetryProxy.create(
+        ScmBlockLocationProtocolPB.class, failoverProxyProvider,
+        failoverProxyProvider.getSCMBlockLocationRetryPolicy(null));
   }
 
   /**
@@ -105,6 +113,11 @@ public final class 
ScmBlockLocationProtocolClientSideTranslatorPB
     try {
       SCMBlockLocationResponse response =
           rpcProxy.send(NULL_RPC_CONTROLLER, req);
+      if (response.getStatus() ==
+          ScmBlockLocationProtocolProtos.Status.SCM_NOT_LEADER) {
+        failoverProxyProvider
+            .performFailoverToAssignedLeader(response.getLeaderSCMNodeId());
+      }
       return response;
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
@@ -267,7 +280,7 @@ public final class 
ScmBlockLocationProtocolClientSideTranslatorPB
   }
 
   @Override
-  public void close() {
-    RPC.stopProxy(rpcProxy);
+  public void close() throws IOException {
+    failoverProxyProvider.close();
   }
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
new file mode 100644
index 0000000..1beb69e
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.proxy;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY;
+import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
+import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
+import static org.apache.hadoop.hdds.HddsUtils.getHostName;
+
+/**
+ * Failover proxy provider for SCM.
+ */
+public class SCMBlockLocationFailoverProxyProvider implements
+    FailoverProxyProvider<ScmBlockLocationProtocolPB>, Closeable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMBlockLocationFailoverProxyProvider.class);
+
+  private Map<String, ProxyInfo<ScmBlockLocationProtocolPB>> scmProxies;
+  private Map<String, SCMProxyInfo> scmProxyInfoMap;
+  private List<String> scmNodeIDList;
+
+  private String currentProxySCMNodeId;
+  private int currentProxyIndex;
+
+  private final ConfigurationSource conf;
+  private final long scmVersion;
+
+  private final String scmServiceId;
+
+  private String lastAttemptedLeader;
+
+  private final int maxRetryCount;
+  private final long retryInterval;
+
+  public static final String SCM_DUMMY_NODEID_PREFIX = "scm";
+
+  public SCMBlockLocationFailoverProxyProvider(ConfigurationSource conf) {
+    this.conf = conf;
+    this.scmVersion = RPC.getProtocolVersion(ScmBlockLocationProtocol.class);
+    this.scmServiceId = conf.getTrimmed(OZONE_SCM_SERVICE_IDS_KEY);
+    this.scmProxies = new HashMap<>();
+    this.scmProxyInfoMap = new HashMap<>();
+    this.scmNodeIDList = new ArrayList<>();
+    loadConfigs();
+
+
+    this.currentProxyIndex = 0;
+    currentProxySCMNodeId = scmNodeIDList.get(currentProxyIndex);
+
+    SCMClientConfig config = conf.getObject(SCMClientConfig.class);
+    this.maxRetryCount = config.getRetryCount();
+    this.retryInterval = config.getRetryInterval();
+  }
+
+  /**
+   * Builds the block-client socket address for every SCM listed under
+   * {@code OZONE_SCM_NAMES}, pairing each host name with the block client
+   * port (configured or default). When that yields nothing, the single
+   * address from {@code getScmAddressForBlockClients} is used instead.
+   *
+   * @return collection holding at least one SCM block-client address
+   */
+  @VisibleForTesting
+  protected Collection<InetSocketAddress> getSCMAddressList() {
+    final Collection<String> configuredNames =
+        conf.getTrimmedStringCollection(OZONE_SCM_NAMES);
+    final Collection<InetSocketAddress> addresses = new ArrayList<>();
+
+    if (!configuredNames.isEmpty()) {
+      final int port = getPortNumberFromConfigKeys(conf,
+          ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY)
+          .orElse(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT);
+      for (String name : configuredNames) {
+        LOG.info("SCM Address for proxy is {}", name);
+
+        // Entries without a parsable host name are skipped.
+        getHostName(name).ifPresent(host ->
+            addresses.add(NetUtils.createSocketAddr(host + ":" + port)));
+      }
+    }
+
+    if (addresses.isEmpty()) {
+      // fall back
+      addresses.add(getScmAddressForBlockClients(conf));
+    }
+    return addresses;
+  }
+
+  private void loadConfigs() {
+    Collection<InetSocketAddress> scmAddressList = getSCMAddressList();
+    int scmNodeIndex = 1;
+    for (InetSocketAddress scmAddress : scmAddressList) {
+      String nodeId = SCM_DUMMY_NODEID_PREFIX + scmNodeIndex;
+      if (scmAddress == null) {
+        LOG.error("Failed to create SCM proxy for {}.", nodeId);
+        continue;
+      }
+      scmNodeIndex++;
+      SCMProxyInfo scmProxyInfo = new SCMProxyInfo(
+          scmServiceId, nodeId, scmAddress);
+      ProxyInfo<ScmBlockLocationProtocolPB> proxy = new ProxyInfo<>(
+          null, scmProxyInfo.toString());
+      scmProxies.put(nodeId, proxy);
+      scmProxyInfoMap.put(nodeId, scmProxyInfo);
+      scmNodeIDList.add(nodeId);
+    }
+
+    if (scmProxies.isEmpty()) {
+      throw new IllegalArgumentException("Could not find any configured " +
+          "addresses for SCM. Please configure the system with "
+          + OZONE_SCM_NAMES);
+    }
+  }
+
+  /**
+   * Returns the node id of the SCM the provider currently points at.
+   * NOTE(review): the "OM" in the method name looks like a leftover from an
+   * OM failover provider; the value returned is an SCM node id.
+   *
+   * @return current SCM node id
+   */
+  @VisibleForTesting
+  public synchronized String getCurrentProxyOMNodeId() {
+    return currentProxySCMNodeId;
+  }
+
+  /**
+   * Returns the proxy for the currently selected SCM node, creating the RPC
+   * proxy on first use.
+   *
+   * @return proxy info for the current SCM node
+   */
+  @Override
+  public synchronized ProxyInfo getProxy() {
+    final String nodeId = currentProxySCMNodeId;
+    createSCMProxyIfNeeded(scmProxies.get(nodeId), nodeId);
+    // Re-read the map: createSCMProxyIfNeeded may have replaced the entry
+    // with a new ProxyInfo instead of filling in the existing one.
+    return scmProxies.get(nodeId);
+  }
+
+  /**
+   * Intentionally does not switch proxies; it only logs the current node id.
+   * The current proxy is changed by
+   * {@link #performFailoverToAssignedLeader(String)}.
+   *
+   * @param newLeader unused
+   */
+  @Override
+  public void performFailover(ScmBlockLocationProtocolPB newLeader) {
+    // Should do nothing here.
+    LOG.debug("Failing over to next proxy. {}", getCurrentProxyOMNodeId());
+  }
+
+  /**
+   * Points the provider at the suggested leader when it is a known node
+   * different from the current one; otherwise rotates to the next
+   * configured proxy.
+   *
+   * @param newLeader node id of the suggested leader, or {@code null} when
+   *                  no leader was suggested
+   */
+  public void performFailoverToAssignedLeader(String newLeader) {
+    if (newLeader != null && assignLeaderToNode(newLeader)) {
+      LOG.debug("Failing over SCM proxy to nodeId: {}", newLeader);
+      return;
+    }
+    // No suggestion, an unknown node id, or the suggestion is the node we
+    // are already using: fall back to round robin.
+    LOG.debug("Suggested SCM leader {} is not usable, failing over to the " +
+        "next proxy.", newLeader);
+    nextProxyIndex();
+  }
+
+  /**
+   * @return the protocol interface served by the proxies of this provider
+   */
+  @Override
+  public Class<ScmBlockLocationProtocolPB> getInterface() {
+    return ScmBlockLocationProtocolPB.class;
+  }
+
+  /**
+   * Stops every RPC proxy that has been created so far; slots whose proxy
+   * was never created are skipped.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    for (ProxyInfo<ScmBlockLocationProtocolPB> entry : scmProxies.values()) {
+      if (entry.proxy != null) {
+        RPC.stopProxy(entry.proxy);
+      }
+    }
+  }
+
+  /**
+   * Decides whether another failover attempt is allowed.
+   *
+   * @param failovers number of failovers performed so far
+   * @return a {@code FAILOVER_AND_RETRY} action delayed by the retry
+   *         interval while {@code failovers} is below the max retry count,
+   *         otherwise {@link RetryAction#FAIL}
+   */
+  public RetryAction getRetryAction(int failovers) {
+    if (failovers >= maxRetryCount) {
+      return RetryAction.FAIL;
+    }
+    return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY,
+        getRetryInterval());
+  }
+
+  /**
+   * @return the fixed delay between failover attempts, as read from
+   *         {@link SCMClientConfig}
+   */
+  private synchronized long getRetryInterval() {
+    // TODO add exponential backoff
+    return retryInterval;
+  }
+
+  private synchronized int nextProxyIndex() {
+    lastAttemptedLeader = currentProxySCMNodeId;
+
+    // round robin the next proxy
+    currentProxyIndex = (currentProxyIndex + 1) % scmProxies.size();
+    currentProxySCMNodeId =  scmNodeIDList.get(currentProxyIndex);
+    return currentProxyIndex;
+  }
+
+  /**
+   * Switches the current proxy to the given node when it is a known node
+   * other than the current one.
+   *
+   * @param newLeaderNodeId node id of the suggested leader
+   * @return {@code true} if the current proxy was switched, {@code false}
+   *         if the node is already current or is not a known node
+   */
+  private synchronized boolean assignLeaderToNode(String newLeaderNodeId) {
+    if (currentProxySCMNodeId.equals(newLeaderNodeId)) {
+      // Already pointing at the suggested node; just record the attempt.
+      lastAttemptedLeader = currentProxySCMNodeId;
+      return false;
+    }
+    if (!scmProxies.containsKey(newLeaderNodeId)) {
+      return false;
+    }
+    lastAttemptedLeader = currentProxySCMNodeId;
+    currentProxySCMNodeId = newLeaderNodeId;
+    currentProxyIndex = scmNodeIDList.indexOf(newLeaderNodeId);
+    return true;
+  }
+
+  /**
+   * Creates proxy object if it does not already exist.
+   */
+  private void createSCMProxyIfNeeded(ProxyInfo proxyInfo,
+                                     String nodeId) {
+    if (proxyInfo.proxy == null) {
+      InetSocketAddress address = scmProxyInfoMap.get(nodeId).getAddress();
+      try {
+        ScmBlockLocationProtocolPB proxy = createSCMProxy(address);
+        try {
+          proxyInfo.proxy = proxy;
+        } catch (IllegalAccessError iae) {
+          scmProxies.put(nodeId,
+              new ProxyInfo<>(proxy, proxyInfo.proxyInfo));
+        }
+      } catch (IOException ioe) {
+        LOG.error("{} Failed to create RPC proxy to SCM at {}",
+            this.getClass().getSimpleName(), address, ioe);
+        throw new RuntimeException(ioe);
+      }
+    }
+  }
+
+  /**
+   * Creates an RPC proxy to the SCM block location service at the given
+   * address, using the RPC timeout from {@link SCMClientConfig}.
+   *
+   * @param scmAddress address of the SCM to connect to
+   * @return the newly created RPC proxy
+   * @throws IOException if the proxy cannot be created
+   */
+  private ScmBlockLocationProtocolPB createSCMProxy(
+      InetSocketAddress scmAddress) throws IOException {
+    Configuration hadoopConf =
+        LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
+    // The engine must be registered for the interface that is actually
+    // proxied below, i.e. the PB interface.
+    RPC.setProtocolEngine(hadoopConf, ScmBlockLocationProtocolPB.class,
+        ProtobufRpcEngine.class);
+    // RPC.getProxy takes the timeout as an int; clamp instead of letting a
+    // plain cast wrap a large long into a negative value.
+    long rpcTimeOut = conf.getObject(SCMClientConfig.class).getRpcTimeOut();
+    return RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
+        scmAddress, UserGroupInformation.getCurrentUser(), hadoopConf,
+        NetUtils.getDefaultSocketFactory(hadoopConf),
+        (int) Math.min(rpcTimeOut, Integer.MAX_VALUE));
+  }
+
+  /**
+   * Builds a retry policy that, on every failure, first moves the provider
+   * to {@code newLeader} (or to the next proxy) and then decides via
+   * {@link #getRetryAction(int)} whether to retry.
+   *
+   * @param newLeader node id to fail over to on failure; may be
+   *                  {@code null}
+   * @return the retry policy
+   */
+  public RetryPolicy getSCMBlockLocationRetryPolicy(String newLeader) {
+    return new RetryPolicy() {
+      @Override
+      public RetryAction shouldRetry(Exception cause, int retries,
+                                     int failovers, boolean idempotent) {
+        performFailoverToAssignedLeader(newLeader);
+        return getRetryAction(failovers);
+      }
+    };
+  }
+}
+
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMClientConfig.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMClientConfig.java
new file mode 100644
index 0000000..99dc446
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMClientConfig.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.proxy;
+
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigType;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.CLIENT;
+import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+
+/**
+ * Config for SCM Block Client.
+ *
+ * <p>Keys are declared relative to the {@code hdds.scmclient} group prefix.
+ */
+@ConfigGroup(prefix = "hdds.scmclient")
+public class SCMClientConfig {
+  public static final String SCM_CLIENT_RPC_TIME_OUT = "rpc.timeout";
+  public static final String SCM_CLIENT_FAILOVER_MAX_RETRY =
+      "failover.max.retry";
+  public static final String SCM_CLIENT_RETRY_INTERVAL =
+      "failover.retry.interval";
+
+  @Config(key = SCM_CLIENT_RPC_TIME_OUT,
+      defaultValue = "15m",
+      type = ConfigType.TIME,
+      tags = {OZONE, SCM, CLIENT},
+      timeUnit = TimeUnit.MILLISECONDS,
+      description = "RpcClient timeout on waiting for the response from " +
+          "SCM. The default value is set to 15 minutes. " +
+          "If ipc.client.ping is set to true and this rpc-timeout " +
+          "is greater than the value of ipc.ping.interval, the effective " +
+          "value of the rpc-timeout is rounded up to multiple of " +
+          "ipc.ping.interval."
+  )
+  private long rpcTimeOut = 15 * 60 * 1000;
+
+  @Config(key = SCM_CLIENT_FAILOVER_MAX_RETRY,
+      defaultValue = "15",
+      type = ConfigType.INT,
+      tags = {OZONE, SCM, CLIENT},
+      description = "Max retry count for SCM Client when failover happens."
+  )
+  private int retryCount = 15;
+
+  @Config(key = SCM_CLIENT_RETRY_INTERVAL,
+      defaultValue = "2s",
+      type = ConfigType.TIME,
+      tags = {OZONE, SCM, CLIENT},
+      timeUnit = TimeUnit.MILLISECONDS,
+      description = "SCM Client timeout on waiting for the next connection " +
+          "retry to other SCM IP. The default value is set to 2 seconds. "
+  )
+  private long retryInterval = 2 * 1000;
+
+  /**
+   * @return RPC timeout in milliseconds
+   */
+  public long getRpcTimeOut() {
+    return rpcTimeOut;
+  }
+
+  /**
+   * Sets the RPC timeout, capped at {@link Integer#MAX_VALUE}.
+   *
+   * @param timeOut RPC timeout in milliseconds
+   */
+  public void setRpcTimeOut(long timeOut) {
+    // The underlying Rpc layer SocketTimeout parameter is an int, so the
+    // stored value must not exceed Integer.MAX_VALUE.
+    this.rpcTimeOut = Math.min(timeOut, Integer.MAX_VALUE);
+  }
+
+  /**
+   * @return maximum number of failover retries
+   */
+  public int getRetryCount() {
+    return retryCount;
+  }
+
+  /**
+   * @param retryCount maximum number of failover retries
+   */
+  public void setRetryCount(int retryCount) {
+    this.retryCount = retryCount;
+  }
+
+  /**
+   * @return delay between failover retries in milliseconds
+   */
+  public long getRetryInterval() {
+    return retryInterval;
+  }
+
+  /**
+   * @param retryInterval delay between failover retries in milliseconds
+   */
+  public void setRetryInterval(long retryInterval) {
+    this.retryInterval = retryInterval;
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMProxyInfo.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMProxyInfo.java
new file mode 100644
index 0000000..ec2a5b0
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMProxyInfo.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.proxy;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Class to store SCM proxy info: the service id, node id and RPC address
+ * of one SCM node. Instances are immutable.
+ */
+public class SCMProxyInfo {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMProxyInfo.class);
+
+  private final String serviceId;
+  private final String nodeId;
+  private final String rpcAddrStr;
+  private final InetSocketAddress rpcAddr;
+
+  /**
+   * Creates the proxy info and logs a warning when the address is still
+   * unresolved.
+   *
+   * @param serviceID  SCM service id
+   * @param nodeID     SCM node id
+   * @param rpcAddress RPC address of the node; must not be {@code null}
+   */
+  public SCMProxyInfo(String serviceID, String nodeID,
+                      InetSocketAddress rpcAddress) {
+    Preconditions.checkNotNull(rpcAddress);
+    this.serviceId = serviceID;
+    this.nodeId = nodeID;
+    this.rpcAddrStr = rpcAddress.toString();
+    this.rpcAddr = rpcAddress;
+    if (rpcAddr.isUnresolved()) {
+      LOG.warn("SCM address {} for serviceID {} remains unresolved " +
+              "for node ID {} Check your ozone-site.xml file to ensure scm " +
+              "addresses are configured properly.",
+          rpcAddress, serviceId, nodeId);
+    }
+  }
+
+  /**
+   * @return {@code nodeId=<id>,nodeAddress=<address>}
+   */
+  @Override
+  public String toString() {
+    return "nodeId=" + nodeId + ",nodeAddress=" + rpcAddrStr;
+  }
+
+  /**
+   * @return RPC address of the node
+   */
+  public InetSocketAddress getAddress() {
+    return rpcAddr;
+  }
+
+  /**
+   * @return SCM service id
+   */
+  public String getServiceId() {
+    return serviceId;
+  }
+
+  /**
+   * @return SCM node id
+   */
+  public String getNodeId() {
+    return nodeId;
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/package-info.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/package-info.java
new file mode 100644
index 0000000..e3bb058
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.proxy;
+
+/**
+ * This package contains classes related to scm proxy.
+ */
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index fc7a598..06f9c31 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -70,6 +70,8 @@ message SCMBlockLocationResponse {
 
   optional string leaderOMNodeId = 6;
 
+  optional string leaderSCMNodeId = 7;
+
   optional AllocateScmBlockResponseProto       allocateScmBlockResponse   = 11;
   optional DeleteScmKeyBlocksResponseProto     deleteScmKeyBlocksResponse = 12;
   optional hadoop.hdds.GetScmInfoResponseProto getScmInfoResponse         = 13;
@@ -114,6 +116,7 @@ enum Status {
   FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY = 26;
   FAILED_TO_ALLOCATE_ENOUGH_BLOCKS = 27;
   INTERNAL_ERROR = 29;
+  SCM_NOT_LEADER = 30;
 }
 
 /**
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index fb07351..eec0718 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -42,6 +42,7 @@ import 
org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
 import 
org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.scm.server.SCMBlockProtocolServer;
 import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
@@ -94,9 +95,26 @@ public final class 
ScmBlockLocationProtocolServerSideTranslatorPB
         .setTraceID(traceID);
   }
 
+  private boolean isLeader() throws ServiceException {
+    if (!(impl instanceof SCMBlockProtocolServer)) {
+      throw new ServiceException("Should be SCMBlockProtocolServer");
+    } else {
+      return ((SCMBlockProtocolServer) impl).getScm().checkLeader();
+    }
+  }
+
   @Override
   public SCMBlockLocationResponse send(RpcController controller,
       SCMBlockLocationRequest request) throws ServiceException {
+    if (!isLeader()) {
+      SCMBlockLocationResponse.Builder response = createSCMBlockResponse(
+          request.getCmdType(),
+          request.getTraceID());
+      response.setSuccess(false);
+      response.setStatus(Status.SCM_NOT_LEADER);
+      response.setLeaderSCMNodeId(null);
+      return response.build();
+    }
     return dispatcher.processRequest(
         request,
         this::processMessage,
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 99f873f..e334b73 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -297,6 +297,10 @@ public class SCMBlockProtocolServer implements
     }
   }
 
+  /**
+   * @return the {@link StorageContainerManager} backing this server
+   */
+  public StorageContainerManager getScm() {
+    return scm;
+  }
+
   @Override
   public List<DatanodeDetails> sortDatanodes(List<String> nodes,
       String clientMachine) throws IOException {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 768ca09..44e133a 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -1027,6 +1027,25 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     return replicationManager;
   }
 
+  /**
+   * Check if the current scm is the leader, as reported by the SCM HA
+   * manager.
+   * @return {@code true} if the current scm is the leader.
+   */
+  public boolean checkLeader() {
+    return scmHAManager.isLeader();
+  }
+
+  /**
+   * Get suggested leader from the SCM HA manager.
+   * @return suggested leader address, or {@code null} if no leader is
+   *         suggested.
+   */
+  public String getSuggestedLeader() {
+    // Query the HA manager only once so the suggestion cannot change
+    // between the null check and the dereference.
+    return java.util.Optional.ofNullable(scmHAManager.getSuggestedLeader())
+        .map(leader -> leader.getAddress())
+        .orElse(null);
+  }
+
   public void checkAdminAccess(String remoteUser) throws IOException {
     if (remoteUser != null && !scmAdminUsernames.contains(remoteUser)) {
       throw new IOException(
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 01340cd..3129dee 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -70,6 +70,7 @@ import 
org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideT
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
 import 
org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import 
org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.OMCertificateClient;
@@ -186,7 +187,6 @@ import org.apache.commons.lang3.StringUtils;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
-import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
 import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
 import static 
org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
 import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
@@ -807,16 +807,9 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       OzoneConfiguration conf) throws IOException {
     RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
         ProtobufRpcEngine.class);
-    long scmVersion =
-        RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
-    InetSocketAddress scmBlockAddress =
-        getScmAddressForBlockClients(conf);
     ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
         new ScmBlockLocationProtocolClientSideTranslatorPB(
-            RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
-                scmBlockAddress, UserGroupInformation.getCurrentUser(), conf,
-                NetUtils.getDefaultSocketFactory(conf),
-                Client.getRpcTimeout(conf)));
+            new SCMBlockLocationFailoverProxyProvider(conf));
     return TracingUtil
         .createProxy(scmBlockLocationClient, ScmBlockLocationProtocol.class,
             conf);


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org

Reply via email to