[
https://issues.apache.org/jira/browse/HDFS-17030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728078#comment-17728078
]
ASF GitHub Bot commented on HDFS-17030:
---------------------------------------
simbadzina commented on code in PR #5700:
URL: https://github.com/apache/hadoop/pull/5700#discussion_r1212029782
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -155,12 +173,22 @@
*/
private long observerProbeRetryPeriodMs;
+ /**
+ * Timeout in seconds when we try to get the HA state of an namenode.
Review Comment:
Nit : `a namenode`
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -88,6 +96,16 @@
/** Observer probe retry period default to 10 min. */
static final long OBSERVER_PROBE_RETRY_PERIOD_DEFAULT = 60 * 10 * 1000;
+ /** Timeout to cancel the ha-state probe rpc request for an namenode. */
+ static final String NAMENODE_HA_STATE_PROBE_TIMEOUT =
+ "dfs.client.namenode.ha-state.probe.timeout";
Review Comment:
Can you use `HdfsClientConfigKeys.Failover.PREFIX` as the prefix for this
config?
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -213,6 +241,13 @@ public ObserverReadProxyProvider(
observerProbeRetryPeriodMs = conf.getTimeDuration(
OBSERVER_PROBE_RETRY_PERIOD_KEY,
OBSERVER_PROBE_RETRY_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+ namenodeHAStateProbeTimeoutSec = conf.getTimeDuration(
+ NAMENODE_HA_STATE_PROBE_TIMEOUT,
+ NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT, TimeUnit.SECONDS);
+ // Disallow negative values for namenodeHAStateProbeTimeoutSec
+ if (namenodeHAStateProbeTimeoutSec < 0) {
+ namenodeHAStateProbeTimeoutSec = NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT;
+ }
Review Comment:
Can you make a negative value mean no timeout instead? That's the typical
meaning of a negative value in other configs.
If you really want to enforce having a timeout, then it may be better to
log and throw an error here.
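As a rough illustration of the semantics the reviewer is suggesting (this is a standalone sketch, not the Hadoop code; `getWithOptionalTimeout` is a hypothetical helper name), a negative value can simply fall through to an unbounded `Future.get()`:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Sketch: a negative timeout means "wait indefinitely", matching the
// convention used by other timeout configs.
public class ProbeTimeoutSketch {
    public static <T> T getWithOptionalTimeout(Future<T> task, long timeoutMs)
            throws Exception {
        if (timeoutMs < 0) {
            // Negative value: no timeout, block until the task completes.
            return task.get();
        }
        return task.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> f = pool.submit(() -> "STANDBY");
        // -1 disables the timeout entirely.
        System.out.println(getWithOptionalTimeout(f, -1));  // prints STANDBY
        pool.shutdown();
    }
}
```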
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -284,13 +319,68 @@ private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
}
currentIndex = (currentIndex + 1) % nameNodeProxies.size();
currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
- currentProxy.setCachedState(getHAServiceState(currentProxy));
+ currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
LOG.debug("Changed current proxy from {} to {}",
initial == null ? "none" : initial.proxyInfo,
currentProxy.proxyInfo);
return currentProxy;
}
+ /**
+ * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+ * an NN becomes irresponsive to rpc requests
+ * (when a thread/heap dump is being taken, e.g.).
+ *
+ * For each getHAServiceState() call, a task is created and submitted to a
+ * threadpool for execution. We will wait for a response up to
+ * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+ *
+ * The implemention is split into two functions so that we can unit test
+ * the second function.
+ */
+ HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo) {
+
+ Callable<HAServiceState> getHAServiceStateTask =
+ new Callable<HAServiceState>() {
+ @Override
+ public HAServiceState call() {
+ return getHAServiceState(proxyInfo);
+ }
+ };
Review Comment:
Replacing with a lambda is a bit cleaner.
```
Callable<HAServiceState> getHAServiceStateTask = () ->
getHAServiceState(proxyInfo);
```
You can re-introduce the Callable when you need to backport to versions
without support for Java 8.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -284,13 +319,68 @@ private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
}
currentIndex = (currentIndex + 1) % nameNodeProxies.size();
currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
- currentProxy.setCachedState(getHAServiceState(currentProxy));
+ currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
LOG.debug("Changed current proxy from {} to {}",
initial == null ? "none" : initial.proxyInfo,
currentProxy.proxyInfo);
return currentProxy;
}
+ /**
+ * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+ * an NN becomes irresponsive to rpc requests
+ * (when a thread/heap dump is being taken, e.g.).
+ *
+ * For each getHAServiceState() call, a task is created and submitted to a
+ * threadpool for execution. We will wait for a response up to
+ * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+ *
+ * The implemention is split into two functions so that we can unit test
+ * the second function.
+ */
+ HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo) {
+
+ Callable<HAServiceState> getHAServiceStateTask =
+ new Callable<HAServiceState>() {
+ @Override
+ public HAServiceState call() {
+ return getHAServiceState(proxyInfo);
+ }
+ };
+
+ try {
+ Future<HAServiceState> task =
+ nnProbingThreadPool.submit(getHAServiceStateTask);
+ return getHAServiceStateWithTimeout(proxyInfo, task);
+ } catch (RejectedExecutionException e) {
+ LOG.debug("Run out of threads to submit the request to query HA state. "
+ + "Ok to return null and we will fallback to use active NN to serve "
+ + "this request.");
+ return null;
+ }
+ }
+
+ HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo,
+ Future<HAServiceState> task) {
+ HAServiceState state = null;
+ try {
+ state = task.get(namenodeHAStateProbeTimeoutSec, TimeUnit.SECONDS);
+ LOG.debug("HA State for " + proxyInfo.proxyInfo + " is " + state);
+ } catch (TimeoutException e) {
+ // Cancel the task on timeout
+ LOG.debug("Cancel NN probe task due to timeout for " + proxyInfo.proxyInfo);
+ if (task != null) {
+ task.cancel(true);
+ }
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted exception in NN probe task for " + proxyInfo.proxyInfo + ": " + e);
+ } catch (ExecutionException e) {
+ LOG.debug("Execution exception in NN probe task for " + proxyInfo.proxyInfo + ": " + e);
+ }
Review Comment:
I think we can combine these two catch blocks. We include the exception in
the LOG, so that should give enough information on whether it was an
InterruptedException or an ExecutionException.
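The combination the reviewer suggests is Java 7's multi-catch; a minimal standalone sketch (not the Hadoop code, names are illustrative) of the same try/catch shape:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: InterruptedException and ExecutionException share one
// multi-catch block; logging the exception itself preserves which
// concrete type was thrown.
public class MultiCatchSketch {
    public static String stateWithTimeout(Future<String> task, long timeoutSec) {
        try {
            return task.get(timeoutSec, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            task.cancel(true);  // cancel the probe on timeout
        } catch (InterruptedException | ExecutionException e) {
            // One block for both; the exception type appears in the message.
            System.out.println("Exception in NN probe task: " + e);
        }
        return null;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<String> failing = () -> {
            throw new RuntimeException("probe failed");
        };
        // The ExecutionException path returns null.
        System.out.println(stateWithTimeout(pool.submit(failing), 1));
        pool.shutdown();
    }
}
```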
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java:
##########
@@ -325,6 +357,94 @@ public void testObserverRetriableException() throws Exception {
assertHandledBy(1);
}
+ /**
+ * Happy case for GetHAServiceStateWithTimeout.
+ */
+ @Test
+ public void testGetHAServiceStateWithTimeout() throws Exception {
+ setupProxyProvider(1);
+ final HAServiceState state = HAServiceState.STANDBY;
+ NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
+ (NNProxyInfo<ClientProtocol>) mock(NNProxyInfo.class);
+ Future<HAServiceState> task = mock(Future.class);
+ when(task.get(anyLong(), any(TimeUnit.class))).thenReturn(state);
+
+ HAServiceState state2 =
+ proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
+ assertEquals(state, state2);
+ verify(task).get(anyLong(), any(TimeUnit.class));
+ verifyNoMoreInteractions(task);
+ verify(logger).debug(startsWith("HA State for"));
+ }
+
+ /**
+ * Test TimeoutException for GetHAServiceStateWithTimeout.
+ */
+ @Test
+ public void testTimeoutExceptionGetHAServiceStateWithTimeout()
+ throws Exception {
+ setupProxyProvider(1);
+ NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
+ (NNProxyInfo<ClientProtocol>) Mockito.mock(NNProxyInfo.class);
+ Future<HAServiceState> task = mock(Future.class);
+ when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(
+ new TimeoutException("Timeout"));
+
+ HAServiceState state =
+ proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
+ assertNull(state);
+ verify(task).get(anyLong(), any(TimeUnit.class));
+ verify(task).cancel(true);
+ verifyNoMoreInteractions(task);
+ verify(logger).debug(startsWith("Cancel NN probe task due to timeout for"));
+ }
+
+ /**
+ * Test InterruptedException for GetHAServiceStateWithTimeout.
+ * Tests for the other two exceptions are the same and thus left out.
+ */
+ @Test
+ public void testInterruptedExceptionGetHAServiceStateWithTimeout()
+ throws Exception {
+ setupProxyProvider(1);
+ NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
+ (NNProxyInfo<ClientProtocol>) Mockito.mock(NNProxyInfo.class);
+ Future<HAServiceState> task = mock(Future.class);
+ when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(
+ new InterruptedException("Interrupted"));
+
+ HAServiceState state =
+ proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
+ assertNull(state);
+ verify(task).get(anyLong(), any(TimeUnit.class));
+ verifyNoMoreInteractions(task);
+ verify(logger).debug(
+ startsWith("Interrupted exception in NN probe task for"));
+ }
+
+ /**
+ * Test InterruptedException for GetHAServiceStateWithTimeout.
+ * Tests for the other two exceptions are the same and thus left out.
Review Comment:
What are `the other two exceptions` you are referring to?
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -213,6 +241,13 @@ public ObserverReadProxyProvider(
observerProbeRetryPeriodMs = conf.getTimeDuration(
OBSERVER_PROBE_RETRY_PERIOD_KEY,
OBSERVER_PROBE_RETRY_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+ namenodeHAStateProbeTimeoutSec = conf.getTimeDuration(
+ NAMENODE_HA_STATE_PROBE_TIMEOUT,
+ NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT, TimeUnit.SECONDS);
Review Comment:
Can we make the configuration be in milliseconds? I'm concerned that
getTimeDuration will round down when a user sets the timeout to be less than a
second.
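The rounding the reviewer is worried about can be shown with the plain JDK `TimeUnit` conversions (this is an illustration of the truncation behavior, not Hadoop's `Configuration.getTimeDuration` itself): converting a sub-second duration to seconds truncates toward zero, so a user-specified 500ms timeout would become 0.

```java
import java.util.concurrent.TimeUnit;

// Illustration of the rounding concern: sub-second values are lost
// when a duration is stored in whole seconds.
public class RoundingSketch {
    public static void main(String[] args) {
        long ms = 500;
        // Truncates toward zero: the 500ms timeout disappears.
        System.out.println(TimeUnit.MILLISECONDS.toSeconds(ms));  // prints 0
        // Storing the config in milliseconds keeps the value intact.
        System.out.println(TimeUnit.MILLISECONDS.toMillis(ms));   // prints 500
    }
}
```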
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -284,13 +319,68 @@ private synchronized NNProxyInfo<T>
changeProxy(NNProxyInfo<T> initial) {
}
currentIndex = (currentIndex + 1) % nameNodeProxies.size();
currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
- currentProxy.setCachedState(getHAServiceState(currentProxy));
+ currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
LOG.debug("Changed current proxy from {} to {}",
initial == null ? "none" : initial.proxyInfo,
currentProxy.proxyInfo);
return currentProxy;
}
+ /**
+ * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+ * an NN becomes irresponsive to rpc requests
+ * (when a thread/heap dump is being taken, e.g.).
+ *
+ * For each getHAServiceState() call, a task is created and submitted to a
+ * threadpool for execution. We will wait for a response up to
+ * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+ *
+ * The implemention is split into two functions so that we can unit test
Review Comment:
Typo `implemention` -> `implementation`
> Limit wait time for getHAServiceState in ObserverReaderProxy
> ------------------------------------------------------------
>
> Key: HDFS-17030
> URL: https://issues.apache.org/jira/browse/HDFS-17030
> Project: Hadoop HDFS
> Issue Type: Improvement
> Components: hdfs
> Affects Versions: 3.4.0
> Reporter: Xing Lin
> Assignee: Xing Lin
> Priority: Minor
> Labels: pull-request-available
>
> When namenode HA is enabled and a standby NN is not responsive, we have
> observed it would take a long time to serve a request, even though we have a
> healthy observer or active NN.
> Basically, when a standby is down, the RPC client would (re)try to create a
> socket connection to that standby for _ipc.client.connect.timeout_ _*
> ipc.client.connect.max.retries.on.timeouts_ before giving up. When we take a
> heap dump at a standby, the NN still accepts the socket connection but it
> won't send responses to these RPC requests and we would time out after
> _ipc.client.rpc-timeout.ms._ This adds significant latency. For clusters
> at LinkedIn, we set _ipc.client.rpc-timeout.ms_ to 120 seconds, and thus a
> request takes more than 2 mins to complete when we take a heap dump at a
> standby. This has been causing user job failures.
> We could set _ipc.client.rpc-timeout.ms_ to a smaller value when sending
> getHAServiceState requests in ObserverReaderProxy (for user rpc requests, we
> would still use the original value from the config). However, that would
> double the number of socket connections between clients and the NN (which is
> a deal-breaker).
> The proposal is to add a timeout on getHAServiceState() calls in
> ObserverReaderProxy, and we will only wait up to the timeout for an NN to
> respond with its HA state. Once we pass that timeout, we will move on to
> probe the next NN.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)