[ 
https://issues.apache.org/jira/browse/HDFS-17030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751779#comment-17751779
 ] 

ASF GitHub Bot commented on HDFS-17030:
---------------------------------------

goiri commented on code in PR #5878:
URL: https://github.com/apache/hadoop/pull/5878#discussion_r1286275737


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -285,13 +323,67 @@ private synchronized NNProxyInfo<T> 
changeProxy(NNProxyInfo<T> initial) {
     }
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.setCachedState(getHAServiceState(currentProxy));
+    currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);
     return currentProxy;
   }
 
+  /**
+   * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+   * an NN becomes irresponsive to rpc requests
+   * (when a thread/heap dump is being taken, e.g.).
+   *
+   * For each getHAServiceState() call, a task is created and submitted to a
+   * threadpool for execution. We will wait for a response up to
+   * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+   *
+   * The implementation is split into two functions so that we can unit test
+   * the second function.
+   */
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo) {
+    Callable<HAServiceState> getHAServiceStateTask = () -> 
getHAServiceState(proxyInfo);
+
+    try {
+      Future<HAServiceState> task =
+          nnProbingThreadPool.submit(getHAServiceStateTask);

Review Comment:
   I would put the lamda directly here and do the following split:
   ```
   Future<HAServiceState> task = nnProbingThreadPool.submit(
       () -> getHAServiceState(proxyInfo));
   ```



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -285,13 +323,67 @@ private synchronized NNProxyInfo<T> 
changeProxy(NNProxyInfo<T> initial) {
     }
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.setCachedState(getHAServiceState(currentProxy));
+    currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);
     return currentProxy;
   }
 
+  /**
+   * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+   * an NN becomes irresponsive to rpc requests
+   * (when a thread/heap dump is being taken, e.g.).
+   *
+   * For each getHAServiceState() call, a task is created and submitted to a
+   * threadpool for execution. We will wait for a response up to
+   * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+   *
+   * The implementation is split into two functions so that we can unit test
+   * the second function.
+   */
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo) {
+    Callable<HAServiceState> getHAServiceStateTask = () -> 
getHAServiceState(proxyInfo);
+
+    try {
+      Future<HAServiceState> task =
+          nnProbingThreadPool.submit(getHAServiceStateTask);
+      return getHAServiceStateWithTimeout(proxyInfo, task);
+    } catch (RejectedExecutionException e) {
+      LOG.warn("Run out of threads to submit the request to query HA state. "
+          + "Ok to return null and we will fallback to use active NN to serve "
+          + "this request.");
+      return null;
+    }
+  }
+
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo,
+      Future<HAServiceState> task) {
+    HAServiceState state = null;
+    try {
+      if (namenodeHAStateProbeTimeoutMs > 0) {
+        state = task.get(namenodeHAStateProbeTimeoutMs, TimeUnit.MILLISECONDS);
+      } else {
+        // Disable timeout by waiting indefinitely when 
namenodeHAStateProbeTimeoutSec is set to 0
+        // or a negative value.
+        state = task.get();
+      }
+      LOG.debug("HA State for {} is {}", proxyInfo.proxyInfo, state);
+    } catch (TimeoutException e) {
+      // Cancel the task on timeout
+      String msg = String.format("Cancel NN probe task due to timeout for %s", 
proxyInfo.proxyInfo);
+      LOG.warn(msg, e);
+      if (task != null) {

Review Comment:
   Can the task become null? We should have issues before.





> Limit wait time for getHAServiceState in ObserverReaderProxy
> ------------------------------------------------------------
>
>                 Key: HDFS-17030
>                 URL: https://issues.apache.org/jira/browse/HDFS-17030
>             Project: Hadoop HDFS
>          Issue Type: Improvement
>          Components: hdfs
>    Affects Versions: 3.4.0
>            Reporter: Xing Lin
>            Assignee: Xing Lin
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 3.4.0
>
>
> When namenode HA is enabled and a standby NN is not responsible, we have 
> observed it would take a long time to serve a request, even though we have a 
> healthy observer or active NN. 
> Basically, when a standby is down, the RPC client would (re)try to create 
> socket connection to that standby for _ipc.client.connect.timeout_ _* 
> ipc.client.connect.max.retries.on.timeouts_ before giving up. When we take a 
> heap dump at a standby, the NN still accepts the socket connection but it 
> won't send responses to these RPC requests and we would timeout after 
> _ipc.client.rpc-timeout.ms._ This adds a significantly latency. For clusters 
> at Linkedin, we set _ipc.client.rpc-timeout.ms_ to 120 seconds and thus a 
> request takes more than 2 mins to complete when we take a heap dump at a 
> standby. This has been causing user job failures. 
> We could set _ipc.client.rpc-timeout.ms to_ a smaller value when sending 
> getHAServiceState requests in ObserverReaderProxy (for user rpc requests, we 
> still use the original value from the config). However, that would double the 
> socket connection between clients and the NN (which is a deal-breaker). 
> The proposal is to add a timeout on getHAServiceState() calls in 
> ObserverReaderProxy and we will only wait for the timeout for an NN to 
> respond its HA state. Once we pass that timeout, we will move on to probe the 
> next NN. 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to