simbadzina commented on code in PR #5700:
URL: https://github.com/apache/hadoop/pull/5700#discussion_r1212029782


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -155,12 +173,22 @@
    */
   private long observerProbeRetryPeriodMs;
 
+  /**
+   * Timeout in seconds when we try to get the HA state of an namenode.

Review Comment:
   Nit: `a namenode`



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -88,6 +96,16 @@
   /** Observer probe retry period default to 10 min. */
   static final long OBSERVER_PROBE_RETRY_PERIOD_DEFAULT = 60 * 10 * 1000;
 
+  /** Timeout to cancel the ha-state probe rpc request for an namenode. */
+  static final String NAMENODE_HA_STATE_PROBE_TIMEOUT =
+      "dfs.client.namenode.ha-state.probe.timeout";

Review Comment:
   Can you use `HdfsClientConfigKeys.Failover.PREFIX` as the prefix for this 
config.
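   For illustration, a minimal sketch of the suggested key construction. It assumes `HdfsClientConfigKeys.Failover.PREFIX` resolves to `"dfs.client.failover."` and mirrors it as a local constant so the snippet compiles without Hadoop on the classpath; the `namenode.ha-state.probe.timeout` suffix is a hypothetical choice, not a settled name.

```java
// Standalone sketch: FAILOVER_PREFIX mirrors the assumed value of
// HdfsClientConfigKeys.Failover.PREFIX so this compiles without Hadoop.
public final class ProbeTimeoutKeys {
  static final String FAILOVER_PREFIX = "dfs.client.failover.";

  /** Hypothetical key name; only the prefix comes from the review. */
  static final String NAMENODE_HA_STATE_PROBE_TIMEOUT =
      FAILOVER_PREFIX + "namenode.ha-state.probe.timeout";

  private ProbeTimeoutKeys() {
  }

  public static void main(String[] args) {
    // Prints dfs.client.failover.namenode.ha-state.probe.timeout
    System.out.println(NAMENODE_HA_STATE_PROBE_TIMEOUT);
  }
}
```

   In the patch itself the constant would be built from `HdfsClientConfigKeys.Failover.PREFIX` directly, which keeps the key grouped with the other client failover settings.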



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -213,6 +241,13 @@ public ObserverReadProxyProvider(
     observerProbeRetryPeriodMs = conf.getTimeDuration(
         OBSERVER_PROBE_RETRY_PERIOD_KEY,
         OBSERVER_PROBE_RETRY_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+    namenodeHAStateProbeTimeoutSec = conf.getTimeDuration(
+        NAMENODE_HA_STATE_PROBE_TIMEOUT,
+        NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT, TimeUnit.SECONDS);
+    // Disallow negative values for namenodeHAStateProbeTimeoutSec
+    if (namenodeHAStateProbeTimeoutSec < 0) {
+      namenodeHAStateProbeTimeoutSec = NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT;
+    }

Review Comment:
   Can you make a negative value mean no timeout instead? That's the typical 
meaning of a negative value in other configs.
   
   If you really want to enforce having a timeout, then it may be better to 
log and throw an error here.
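   A minimal sketch of the "negative means no timeout" convention, written against a plain `java.util.concurrent.Future`. The class and method names are hypothetical, and the timeout is taken in milliseconds as an assumption; a negative value makes the wait unbounded instead of being silently replaced by the default.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: a negative timeout means "wait without a deadline". */
public final class NegativeMeansNoTimeout {
  private NegativeMeansNoTimeout() {
  }

  static <V> V await(Future<V> task, long timeoutMs)
      throws InterruptedException, ExecutionException, TimeoutException {
    if (timeoutMs < 0) {
      // Negative: block until the task finishes, as other configs interpret it.
      return task.get();
    }
    // Non-negative: bounded wait; throws TimeoutException if it expires.
    return task.get(timeoutMs, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws Exception {
    Future<String> done = CompletableFuture.completedFuture("STANDBY");
    System.out.println(await(done, -1));  // no deadline
    System.out.println(await(done, 100)); // bounded wait
  }
}
```

   If the second option is preferred instead, the negative branch would log and throw (for example an `IllegalArgumentException`) at construction time rather than waiting without a deadline.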



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -284,13 +319,68 @@ private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
     }
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.setCachedState(getHAServiceState(currentProxy));
+    currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);
     return currentProxy;
   }
 
+  /**
+   * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+   * an NN becomes irresponsive to rpc requests
+   * (when a thread/heap dump is being taken, e.g.).
+   *
+   * For each getHAServiceState() call, a task is created and submitted to a
+   * threadpool for execution. We will wait for a response up to
+   * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+   *
+   * The implemention is split into two functions so that we can unit test
+   * the second function.
+   */
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo) {
+
+    Callable<HAServiceState> getHAServiceStateTask =
+        new Callable<HAServiceState>() {
+          @Override
+          public HAServiceState call() {
+            return getHAServiceState(proxyInfo);
+          }
+        };

Review Comment:
   Replacing the anonymous class with a lambda is a bit cleaner.
   ```
   Callable<HAServiceState> getHAServiceStateTask = () -> getHAServiceState(proxyInfo);
   ```
   
   You can reintroduce the anonymous `Callable` if you need to backport to a 
version that doesn't support Java 8.
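   A standalone sketch showing that both forms build an equivalent `Callable`. The enum is a simplified local stand-in for `HAServiceState`, and `getHAServiceState(String)` is a hypothetical stub for the provider method; both are assumptions so the snippet runs on its own.

```java
import java.util.concurrent.Callable;

public final class LambdaCallableSketch {
  /** Simplified stand-in for HAServiceState (subset of values). */
  enum HAServiceState { ACTIVE, STANDBY, OBSERVER }

  private LambdaCallableSketch() {
  }

  // Hypothetical stub for ObserverReadProxyProvider#getHAServiceState.
  static HAServiceState getHAServiceState(String proxyInfo) {
    return HAServiceState.OBSERVER;
  }

  // Pre-Java 8 form, as written in the patch.
  static Callable<HAServiceState> anonymousTask(final String proxyInfo) {
    return new Callable<HAServiceState>() {
      @Override
      public HAServiceState call() {
        return getHAServiceState(proxyInfo);
      }
    };
  }

  // Java 8+ form suggested in the review; proxyInfo is effectively final.
  static Callable<HAServiceState> lambdaTask(String proxyInfo) {
    return () -> getHAServiceState(proxyInfo);
  }

  public static void main(String[] args) throws Exception {
    System.out.println(anonymousTask("nn1").call());
    System.out.println(lambdaTask("nn1").call());
  }
}
```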



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -284,13 +319,68 @@ private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
     }
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.setCachedState(getHAServiceState(currentProxy));
+    currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);
     return currentProxy;
   }
 
+  /**
+   * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+   * an NN becomes irresponsive to rpc requests
+   * (when a thread/heap dump is being taken, e.g.).
+   *
+   * For each getHAServiceState() call, a task is created and submitted to a
+   * threadpool for execution. We will wait for a response up to
+   * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+   *
+   * The implemention is split into two functions so that we can unit test
+   * the second function.
+   */
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo) {
+
+    Callable<HAServiceState> getHAServiceStateTask =
+        new Callable<HAServiceState>() {
+          @Override
+          public HAServiceState call() {
+            return getHAServiceState(proxyInfo);
+          }
+        };
+
+    try {
+      Future<HAServiceState> task =
+          nnProbingThreadPool.submit(getHAServiceStateTask);
+      return getHAServiceStateWithTimeout(proxyInfo, task);
+    } catch (RejectedExecutionException e) {
+      LOG.debug("Run out of threads to submit the request to query HA state. "
+          + "Ok to return null and we will fallback to use active NN to serve "
+          + "this request.");
+      return null;
+    }
+  }
+
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo,
+      Future<HAServiceState> task) {
+    HAServiceState state = null;
+    try {
+      state = task.get(namenodeHAStateProbeTimeoutSec, TimeUnit.SECONDS);
+      LOG.debug("HA State for " + proxyInfo.proxyInfo + " is " + state);
+    } catch (TimeoutException e) {
+      // Cancel the task on timeout
+      LOG.debug("Cancel NN probe task due to timeout for " + proxyInfo.proxyInfo);
+      if (task != null) {
+        task.cancel(true);
+      }
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted exception in NN probe task for " + proxyInfo.proxyInfo + ": " + e);
+    } catch (ExecutionException e) {
+      LOG.debug("Execution exception in NN probe task for " + proxyInfo.proxyInfo + ": " + e);
+    }

Review Comment:
   I think we can combine these two catch blocks. We include the exception in 
the log message, so that should give enough information on whether it was an 
InterruptedException or an ExecutionException.
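   A minimal standalone sketch of the probe with the two handlers merged into a single multi-catch. For illustration only: a generic `V` stands in for `HAServiceState`, a `String` target stands in for `NNProxyInfo`, `java.util.logging` stands in for the class's logger, and the timeout is taken in milliseconds; the class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

public final class ProbeWithTimeout {
  private static final Logger LOG =
      Logger.getLogger(ProbeWithTimeout.class.getName());

  private ProbeWithTimeout() {
  }

  /** Submits the probe; returns null if the pool rejects it. */
  static <V> V submitAndWait(ExecutorService pool, Callable<V> probe,
      long timeoutMs, String target) {
    try {
      return waitFor(pool.submit(probe), timeoutMs, target);
    } catch (RejectedExecutionException e) {
      LOG.fine("No thread available to probe " + target);
      return null;
    }
  }

  /** Waits up to timeoutMs; returns null on timeout or failure. */
  static <V> V waitFor(Future<V> task, long timeoutMs, String target) {
    try {
      return task.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      LOG.fine("Cancel probe task due to timeout for " + target);
      task.cancel(true);
    } catch (InterruptedException | ExecutionException e) {
      // One handler for both; the logged exception records which one it was.
      LOG.fine("Exception in probe task for " + target + ": " + e);
    }
    return null;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    System.out.println(submitAndWait(pool, () -> "STANDBY", 1000, "nn1"));
    pool.shutdownNow();
  }
}
```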



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java:
##########
@@ -325,6 +357,94 @@ public void testObserverRetriableException() throws Exception {
     assertHandledBy(1);
   }
 
+  /**
+   * Happy case for GetHAServiceStateWithTimeout.
+   */
+  @Test
+  public void testGetHAServiceStateWithTimeout() throws Exception {
+    setupProxyProvider(1);
+    final HAServiceState state = HAServiceState.STANDBY;
+    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
+        (NNProxyInfo<ClientProtocol>) mock(NNProxyInfo.class);
+    Future<HAServiceState> task = mock(Future.class);
+    when(task.get(anyLong(), any(TimeUnit.class))).thenReturn(state);
+
+    HAServiceState state2 =
+        proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
+    assertEquals(state, state2);
+    verify(task).get(anyLong(), any(TimeUnit.class));
+    verifyNoMoreInteractions(task);
+    verify(logger).debug(startsWith("HA State for"));
+  }
+
+  /**
+   * Test TimeoutException for GetHAServiceStateWithTimeout.
+   */
+  @Test
+  public void testTimeoutExceptionGetHAServiceStateWithTimeout()
+      throws Exception {
+    setupProxyProvider(1);
+    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
+        (NNProxyInfo<ClientProtocol>) Mockito.mock(NNProxyInfo.class);
+    Future<HAServiceState> task = mock(Future.class);
+    when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(
+        new TimeoutException("Timeout"));
+
+    HAServiceState state =
+        proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
+    assertNull(state);
+    verify(task).get(anyLong(), any(TimeUnit.class));
+    verify(task).cancel(true);
+    verifyNoMoreInteractions(task);
+    verify(logger).debug(startsWith("Cancel NN probe task due to timeout for"));
+  }
+
+  /**
+   * Test InterruptedException for GetHAServiceStateWithTimeout.
+   * Tests for the other two exceptions are the same and thus left out.
+   */
+  @Test
+  public void testInterruptedExceptionGetHAServiceStateWithTimeout()
+      throws Exception {
+    setupProxyProvider(1);
+    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
+        (NNProxyInfo<ClientProtocol>) Mockito.mock(NNProxyInfo.class);
+    Future<HAServiceState> task = mock(Future.class);
+    when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(
+        new InterruptedException("Interrupted"));
+
+    HAServiceState state =
+        proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
+    assertNull(state);
+    verify(task).get(anyLong(), any(TimeUnit.class));
+    verifyNoMoreInteractions(task);
+    verify(logger).debug(
+        startsWith("Interrupted exception in NN probe task for"));
+  }
+
+  /**
+   * Test InterruptedException for GetHAServiceStateWithTimeout.
+   * Tests for the other two exceptions are the same and thus left out.

Review Comment:
   What are `the other two exceptions` you are referring to?



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -213,6 +241,13 @@ public ObserverReadProxyProvider(
     observerProbeRetryPeriodMs = conf.getTimeDuration(
         OBSERVER_PROBE_RETRY_PERIOD_KEY,
         OBSERVER_PROBE_RETRY_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+    namenodeHAStateProbeTimeoutSec = conf.getTimeDuration(
+        NAMENODE_HA_STATE_PROBE_TIMEOUT,
+        NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT, TimeUnit.SECONDS);

Review Comment:
   Can we make the configuration be in milliseconds? I'm concerned that 
getTimeDuration will round down when a user sets the timeout to be less than a 
second.
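   The rounding concern can be illustrated with plain `java.util.concurrent.TimeUnit` arithmetic, assuming `Configuration#getTimeDuration` converts to the requested unit with the same truncating semantics. The class and method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;

public final class ProbeTimeoutUnits {
  private ProbeTimeoutUnits() {
  }

  /** What a seconds-based read of a millisecond value would yield. */
  static long asSeconds(long configuredMs) {
    // TimeUnit conversion truncates: 500 ms becomes 0 s, 1500 ms becomes 1 s.
    return TimeUnit.MILLISECONDS.toSeconds(configuredMs);
  }

  public static void main(String[] args) {
    long configuredMs = 500;
    System.out.println(configuredMs + " ms read in seconds: "
        + asSeconds(configuredMs));
    System.out.println(configuredMs + " ms read in milliseconds: "
        + configuredMs);
  }
}
```

   Reading the value in milliseconds and passing it to `Future#get(timeout, TimeUnit.MILLISECONDS)` avoids the truncation; with a seconds-based read, a sub-second setting would turn into a zero timeout, so probes would generally time out unless the task had already completed.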



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##########
@@ -284,13 +319,68 @@ private synchronized NNProxyInfo<T> changeProxy(NNProxyInfo<T> initial) {
     }
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.setCachedState(getHAServiceState(currentProxy));
+    currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);
     return currentProxy;
   }
 
+  /**
+   * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+   * an NN becomes irresponsive to rpc requests
+   * (when a thread/heap dump is being taken, e.g.).
+   *
+   * For each getHAServiceState() call, a task is created and submitted to a
+   * threadpool for execution. We will wait for a response up to
+   * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+   *
+   * The implemention is split into two functions so that we can unit test

Review Comment:
   Typo `implemention` -> `implementation`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

