goiri commented on code in PR #5700:
URL: https://github.com/apache/hadoop/pull/5700#discussion_r1212047828
##
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##
@@ -284,13 +319,68 @@ private synchronized NNProxyInfo
changeProxy(NNProxyInfo initial) {
}
currentIndex = (currentIndex + 1) % nameNodeProxies.size();
currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-currentProxy.setCachedState(getHAServiceState(currentProxy));
+currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
LOG.debug("Changed current proxy from {} to {}",
initial == null ? "none" : initial.proxyInfo,
currentProxy.proxyInfo);
return currentProxy;
}
+ /**
+ * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+ * an NN becomes unresponsive to RPC requests
+ * (when a thread/heap dump is being taken, e.g.).
+ *
+ * For each getHAServiceState() call, a task is created and submitted to a
+ * threadpool for execution. We will wait for a response up to
+ * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+ *
+ * The implementation is split into two functions so that we can unit test
+ * the second function.
+ */
+ HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo proxyInfo) {
+
+Callable getHAServiceStateTask =
Review Comment:
Can this be a lambda?
##
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##
@@ -88,6 +96,16 @@
/** Observer probe retry period default to 10 min. */
static final long OBSERVER_PROBE_RETRY_PERIOD_DEFAULT = 60 * 10 * 1000;
+ /** Timeout to cancel the ha-state probe rpc request for a namenode. */
+ static final String NAMENODE_HA_STATE_PROBE_TIMEOUT =
+ "dfs.client.namenode.ha-state.probe.timeout";
+ /**
+ * Namenode ha-state probe timeout default to 25 sec.
+ * ipc.client.connect.timeout defaults to be 20 seconds. So, in 25 seconds,
+ * we can try twice to connect to an NN.
+ */
+ static final long NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT = 25;
Review Comment:
Use TimeUnit.
##
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##
@@ -155,12 +173,22 @@
*/
private long observerProbeRetryPeriodMs;
+ /**
+ * Timeout in seconds when we try to get the HA state of a namenode.
+ */
+ @VisibleForTesting
+ private long namenodeHAStateProbeTimeoutSec;
+
/**
* The previous time where zero observer were found. If there was observer,
* or it is initialization, this is set to 0.
*/
private long lastObserverProbeTime;
+ private final ExecutorService nnProbingThreadPool =
+ new ThreadPoolExecutor(1, 4, 6L, TimeUnit.MILLISECONDS,
Review Comment:
Give constant names to the magical numbers.
##
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##
@@ -284,13 +319,68 @@ private synchronized NNProxyInfo
changeProxy(NNProxyInfo initial) {
}
currentIndex = (currentIndex + 1) % nameNodeProxies.size();
currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-currentProxy.setCachedState(getHAServiceState(currentProxy));
+currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
LOG.debug("Changed current proxy from {} to {}",
initial == null ? "none" : initial.proxyInfo,
currentProxy.proxyInfo);
return currentProxy;
}
+ /**
+ * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+ * an NN becomes unresponsive to RPC requests
+ * (when a thread/heap dump is being taken, e.g.).
+ *
+ * For each getHAServiceState() call, a task is created and submitted to a
+ * threadpool for execution. We will wait for a response up to
+ * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+ *
+ * The implementation is split into two functions so that we can unit test
+ * the second function.
+ */
+ HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo proxyInfo) {
+
+Callable getHAServiceStateTask =
+new Callable() {
+ @Override
+ public HAServiceState call() {
+return getHAServiceState(proxyInfo);
+ }
+};
+
+try {
+ Future task =
+ nnProbingThreadPool.submit(getHAServiceStateTask);
+ return getHAServiceStateWithTimeout(proxyInfo, task);
+} catch (RejectedExecutionException e) {
+ LOG.debug("Run out of threads to submit the request to query HA state. "
+ + "Ok to return null and we will fallback to use active NN to serve "
+ + "this request.");
+ retur