[ https://issues.apache.org/jira/browse/HDFS-17030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728078#comment-17728078 ]
ASF GitHub Bot commented on HDFS-17030:
---------------------------------------

simbadzina commented on code in PR #5700:
URL: https://github.com/apache/hadoop/pull/5700#discussion_r1212029782


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -155,12 +173,22 @@
    */
   private long observerProbeRetryPeriodMs;

+  /**
+   * Timeout in seconds when we try to get the HA state of an namenode.

Review Comment:
   Nit : `a namenode`


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -88,6 +96,16 @@
   /** Observer probe retry period default to 10 min. */
   static final long OBSERVER_PROBE_RETRY_PERIOD_DEFAULT = 60 * 10 * 1000;

+  /** Timeout to cancel the ha-state probe rpc request for an namenode. */
+  static final String NAMENODE_HA_STATE_PROBE_TIMEOUT =
+      "dfs.client.namenode.ha-state.probe.timeout";

Review Comment:
   Can you use `HdfsClientConfigKeys.Failover.PREFIX` as the prefix for this config.


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -213,6 +241,13 @@ public ObserverReadProxyProvider(
     observerProbeRetryPeriodMs = conf.getTimeDuration(
         OBSERVER_PROBE_RETRY_PERIOD_KEY,
         OBSERVER_PROBE_RETRY_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+    namenodeHAStateProbeTimeoutSec = conf.getTimeDuration(
+        NAMENODE_HA_STATE_PROBE_TIMEOUT,
+        NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT, TimeUnit.SECONDS);
+    // Disallow negative values for namenodeHAStateProbeTimeoutSec
+    if (namenodeHAStateProbeTimeoutSec < 0) {
+      namenodeHAStateProbeTimeoutSec = NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT;
+    }

Review Comment:
   Can you make a negative value mean no timeout instead. That's the typical meaning of a negative value in other configs.
   If you really want to enforce having a timeout, then it may be better to log and throw an error here.


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -284,13 +319,68 @@ private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
     }
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.setCachedState(getHAServiceState(currentProxy));
+    currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo, currentProxy.proxyInfo);
     return currentProxy;
   }

+  /**
+   * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+   * an NN becomes irresponsive to rpc requests
+   * (when a thread/heap dump is being taken, e.g.).
+   *
+   * For each getHAServiceState() call, a task is created and submitted to a
+   * threadpool for execution. We will wait for a response up to
+   * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+   *
+   * The implemention is split into two functions so that we can unit test
+   * the second function.
+   */
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo) {
+
+    Callable<HAServiceState> getHAServiceStateTask =
+        new Callable<HAServiceState>() {
+          @Override
+          public HAServiceState call() {
+            return getHAServiceState(proxyInfo);
+          }
+        };

Review Comment:
   Replacing with a lambda is a bit cleaner.
   ```
   Callable<HAServiceState> getHAServiceStateTask =
       () -> getHAServiceState(proxyInfo);
   ```
   You can re-introduce the Callable when you need to backport to versions without support for Java 8.
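A minimal sketch of the reviewer's "negative means no timeout" suggestion (the class and constant names here are hypothetical, not from the patch): instead of silently replacing a negative configured value with the default, map it to an effectively unbounded wait.

```java
// Hypothetical helper illustrating the reviewer's suggestion; names are
// illustrative only and do not appear in the PR.
public class ProbeTimeout {
  /**
   * Returns the wait to pass to Future.get(): a negative configured value
   * disables the timeout (wait "forever") rather than being overridden,
   * matching the convention used by other configs.
   */
  static long effectiveWaitMs(long configuredMs) {
    return configuredMs < 0 ? Long.MAX_VALUE : configuredMs;
  }

  public static void main(String[] args) {
    System.out.println(effectiveWaitMs(-1));   // negative: no timeout
    System.out.println(effectiveWaitMs(500));  // positive: used as-is
  }
}
```

The alternative the reviewer mentions, logging and throwing on a negative value, would surface misconfiguration early instead of masking it.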
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -284,13 +319,68 @@ private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
     }
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.setCachedState(getHAServiceState(currentProxy));
+    currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo, currentProxy.proxyInfo);
     return currentProxy;
   }

+  /**
+   * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+   * an NN becomes irresponsive to rpc requests
+   * (when a thread/heap dump is being taken, e.g.).
+   *
+   * For each getHAServiceState() call, a task is created and submitted to a
+   * threadpool for execution. We will wait for a response up to
+   * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+   *
+   * The implemention is split into two functions so that we can unit test
+   * the second function.
+   */
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo) {
+
+    Callable<HAServiceState> getHAServiceStateTask =
+        new Callable<HAServiceState>() {
+          @Override
+          public HAServiceState call() {
+            return getHAServiceState(proxyInfo);
+          }
+        };
+
+    try {
+      Future<HAServiceState> task =
+          nnProbingThreadPool.submit(getHAServiceStateTask);
+      return getHAServiceStateWithTimeout(proxyInfo, task);
+    } catch (RejectedExecutionException e) {
+      LOG.debug("Run out of threads to submit the request to query HA state. " +
+          "Ok to return null and we will fallback to use active NN to serve " +
+          "this request.");
+      return null;
+    }
+  }
+
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo,
+      Future<HAServiceState> task) {
+    HAServiceState state = null;
+    try {
+      state = task.get(namenodeHAStateProbeTimeoutSec, TimeUnit.SECONDS);
+      LOG.debug("HA State for " + proxyInfo.proxyInfo + " is " + state);
+    } catch (TimeoutException e) {
+      // Cancel the task on timeout
+      LOG.debug("Cancel NN probe task due to timeout for " + proxyInfo.proxyInfo);
+      if (task != null) {
+        task.cancel(true);
+      }
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted exception in NN probe task for " + proxyInfo.proxyInfo + ": " + e);
+    } catch (ExecutionException e) {
+      LOG.debug("Execution exception in NN probe task for " + proxyInfo.proxyInfo + ": " + e);
+    }

Review Comment:
   I think we can combine these two catch blocks. We include the exception in the LOG so that should give enough information on whether it was an InterruptedException or an ExecutionException.


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java:
##########
@@ -325,6 +357,94 @@ public void testObserverRetriableException() throws Exception {
     assertHandledBy(1);
   }

+  /**
+   * Happy case for GetHAServiceStateWithTimeout.
+   */
+  @Test
+  public void testGetHAServiceStateWithTimeout() throws Exception {
+    setupProxyProvider(1);
+    final HAServiceState state = HAServiceState.STANDBY;
+    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
+        (NNProxyInfo<ClientProtocol>) mock(NNProxyInfo.class);
+    Future<HAServiceState> task = mock(Future.class);
+    when(task.get(anyLong(), any(TimeUnit.class))).thenReturn(state);
+
+    HAServiceState state2 =
+        proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
+    assertEquals(state, state2);
+    verify(task).get(anyLong(), any(TimeUnit.class));
+    verifyNoMoreInteractions(task);
+    verify(logger).debug(startsWith("HA State for"));
+  }
+
+  /**
+   * Test TimeoutException for GetHAServiceStateWithTimeout.
+   */
+  @Test
+  public void testTimeoutExceptionGetHAServiceStateWithTimeout()
+      throws Exception {
+    setupProxyProvider(1);
+    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
+        (NNProxyInfo<ClientProtocol>) Mockito.mock(NNProxyInfo.class);
+    Future<HAServiceState> task = mock(Future.class);
+    when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(
+        new TimeoutException("Timeout"));
+
+    HAServiceState state =
+        proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
+    assertNull(state);
+    verify(task).get(anyLong(), any(TimeUnit.class));
+    verify(task).cancel(true);
+    verifyNoMoreInteractions(task);
+    verify(logger).debug(startsWith("Cancel NN probe task due to timeout for"));
+  }
+
+  /**
+   * Test InterruptedException for GetHAServiceStateWithTimeout.
+   * Tests for the other two exceptions are the same and thus left out.
+   */
+  @Test
+  public void testInterruptedExceptionGetHAServiceStateWithTimeout()
+      throws Exception {
+    setupProxyProvider(1);
+    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
+        (NNProxyInfo<ClientProtocol>) Mockito.mock(NNProxyInfo.class);
+    Future<HAServiceState> task = mock(Future.class);
+    when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(
+        new InterruptedException("Interrupted"));
+
+    HAServiceState state =
+        proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
+    assertNull(state);
+    verify(task).get(anyLong(), any(TimeUnit.class));
+    verifyNoMoreInteractions(task);
+    verify(logger).debug(
+        startsWith("Interrupted exception in NN probe task for"));
+  }
+
+  /**
+   * Test InterruptedException for GetHAServiceStateWithTimeout.
+   * Tests for the other two exceptions are the same and thus left out.

Review Comment:
   What are `the other two exceptions` you are referring to?


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -213,6 +241,13 @@ public ObserverReadProxyProvider(
     observerProbeRetryPeriodMs = conf.getTimeDuration(
         OBSERVER_PROBE_RETRY_PERIOD_KEY,
         OBSERVER_PROBE_RETRY_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+    namenodeHAStateProbeTimeoutSec = conf.getTimeDuration(
+        NAMENODE_HA_STATE_PROBE_TIMEOUT,
+        NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT, TimeUnit.SECONDS);

Review Comment:
   Can we make the configuration be in milliseconds? I'm concerned that getTimeDuration will round down when a user sets the timeout to be less than a second.
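The rounding the reviewer is worried about comes from `TimeUnit` conversion, which is roughly what `Configuration.getTimeDuration` relies on to convert a parsed duration into the requested unit: conversions to a coarser unit truncate toward zero, so a sub-second timeout read in SECONDS becomes 0.

```java
import java.util.concurrent.TimeUnit;

// Demonstrates why reading a sub-second timeout in SECONDS is lossy:
// TimeUnit conversions truncate toward zero.
public class TruncationDemo {
  public static void main(String[] args) {
    // A user-configured "500ms" read in SECONDS collapses to 0 ...
    System.out.println(TimeUnit.MILLISECONDS.toSeconds(500));  // 0
    // ... while reading the key in MILLISECONDS preserves it.
    System.out.println(TimeUnit.MILLISECONDS.toMillis(500));   // 500
    // Even 1999ms truncates down to a single second.
    System.out.println(TimeUnit.MILLISECONDS.toSeconds(1999)); // 1
  }
}
```

Storing the config in milliseconds, as suggested, avoids the lossy conversion entirely.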
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -284,13 +319,68 @@ private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
     }
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.setCachedState(getHAServiceState(currentProxy));
+    currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo, currentProxy.proxyInfo);
     return currentProxy;
   }

+  /**
+   * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+   * an NN becomes irresponsive to rpc requests
+   * (when a thread/heap dump is being taken, e.g.).
+   *
+   * For each getHAServiceState() call, a task is created and submitted to a
+   * threadpool for execution. We will wait for a response up to
+   * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+   *
+   * The implemention is split into two functions so that we can unit test

Review Comment:
   Typo `implemention` -> `implementation`

> Limit wait time for getHAServiceState in ObserverReaderProxy
> ------------------------------------------------------------
>
>                 Key: HDFS-17030
>                 URL: https://issues.apache.org/jira/browse/HDFS-17030
>             Project: Hadoop HDFS
>          Issue Type: Improvement
>          Components: hdfs
>    Affects Versions: 3.4.0
>            Reporter: Xing Lin
>            Assignee: Xing Lin
>            Priority: Minor
>              Labels: pull-request-available
>
> When namenode HA is enabled and a standby NN is not responsive, we have observed it would take a long time to serve a request, even though we have a healthy observer or active NN.
> Basically, when a standby is down, the RPC client would (re)try to create a socket connection to that standby for _ipc.client.connect.timeout_ * _ipc.client.connect.max.retries.on.timeouts_ before giving up. When we take a heap dump at a standby, the NN still accepts the socket connection but it won't send responses to these RPC requests and we would time out after _ipc.client.rpc-timeout.ms_. This adds significant latency. For clusters at LinkedIn, we set _ipc.client.rpc-timeout.ms_ to 120 seconds and thus a request takes more than 2 mins to complete when we take a heap dump at a standby. This has been causing user job failures.
> We could set _ipc.client.rpc-timeout.ms_ to a smaller value when sending getHAServiceState requests in ObserverReaderProxy (for user rpc requests, we still use the original value from the config). However, that would double the socket connections between clients and the NN (which is a deal-breaker).
> The proposal is to add a timeout on getHAServiceState() calls in ObserverReaderProxy and we will only wait for the timeout for an NN to respond with its HA state. Once we pass that timeout, we will move on to probe the next NN.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org