[GitHub] [hadoop] goiri commented on a diff in pull request #5700: HDFS-17030. Limit wait time for getHAServiceState in ObserverReaderProxy

2023-06-13 Thread via GitHub


goiri commented on code in PR #5700:
URL: https://github.com/apache/hadoop/pull/5700#discussion_r1228400308


##
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##
@@ -155,12 +174,21 @@
*/
   private long observerProbeRetryPeriodMs;
 
+  /**
+   * Timeout in ms when we try to get the HA state of a namenode.
+   */
+  private long namenodeHAStateProbeTimeoutMs;
+
   /**
* The previous time where zero observer were found. If there was observer,
* or it is initialization, this is set to 0.
*/
   private long lastObserverProbeTime;
 
+  private final ExecutorService nnProbingThreadPool =
+  new ThreadPoolExecutor(1, 4, 1L, TimeUnit.MINUTES,

Review Comment:
   Lots of magic numbers here.
   They should be constants with meaningful names, or at least have comments.
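
   A minimal sketch of what that could look like. The constant names are hypothetical, and the queue type is an assumption because the diff line above is cut off before that argument; only the values 1, 4, and 1 minute come from the diff.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ProbePoolSketch {
  // Hypothetical names; the values mirror the literals in the diff.
  static final int PROBE_POOL_CORE_THREADS = 1;
  static final int PROBE_POOL_MAX_THREADS = 4;
  static final long PROBE_POOL_KEEP_ALIVE_MINUTES = 1L;

  static ThreadPoolExecutor newProbePool() {
    // Assumption: a SynchronousQueue stands in for the work queue, since the
    // original constructor call is truncated before that argument.
    return new ThreadPoolExecutor(
        PROBE_POOL_CORE_THREADS,
        PROBE_POOL_MAX_THREADS,
        PROBE_POOL_KEEP_ALIVE_MINUTES, TimeUnit.MINUTES,
        new SynchronousQueue<>());
  }

  public static void main(String[] args) {
    ThreadPoolExecutor pool = newProbePool();
    System.out.println("core=" + pool.getCorePoolSize()
        + " max=" + pool.getMaximumPoolSize());
    pool.shutdown();
  }
}
```

   With names like these, the constructor call documents itself and the values can be tuned in one place.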



##
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java:
##
@@ -58,30 +74,53 @@
  * NameNode to communicate with.
  */
 public class TestObserverReadProxyProvider {
+  private final static int SLOW_RESPONSE_SLEEP_TIME = 5000; // 5 s

Review Comment:
   Add the unit.
   Probably also use `TimeUnit.SECONDS.toMillis(5)` or whatever the right way is.
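
   Roughly along these lines, as a sketch only. The `_MS` suffix is a hypothetical rename, and the type becomes `long` because `toMillis` returns a `long`.

```java
import java.util.concurrent.TimeUnit;

class SleepTimeSketch {
  // The unit is part of the name, and the value is derived from TimeUnit
  // rather than written as a hand-multiplied literal.
  static final long SLOW_RESPONSE_SLEEP_TIME_MS = TimeUnit.SECONDS.toMillis(5);

  public static void main(String[] args) {
    System.out.println(SLOW_RESPONSE_SLEEP_TIME_MS);
  }
}
```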



##
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java:
##
@@ -58,30 +74,53 @@
  * NameNode to communicate with.
  */
 public class TestObserverReadProxyProvider {
+  private final static int SLOW_RESPONSE_SLEEP_TIME = 5000; // 5 s
+  private final static int NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT = 2000; // 2s
+  private final static int NAMENODE_HA_STATE_PROBE_TIMEOUT_LONG = 25000; // 25s
 
   private static final LocatedBlock[] EMPTY_BLOCKS = new LocatedBlock[0];
   private String ns;
   private URI nnURI;
-  private Configuration conf;
 
   private ObserverReadProxyProvider proxyProvider;
+  @Mock private Logger logger;
+
   private NameNodeAnswer[] namenodeAnswers;
   private String[] namenodeAddrs;
 
   @Before
   public void setup() throws Exception {
 ns = "testcluster";
 nnURI = URI.create("hdfs://" + ns);
-conf = new Configuration();
-conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns);
-// Set observer probe retry period to 0. Required by the tests that
-// transition observer back and forth
-conf.setTimeDuration(
-OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS);
-conf.setBoolean(HdfsClientConfigKeys.Failover.RANDOM_ORDER, false);
+
+MockitoAnnotations.initMocks(this);
+  }
+
+  /**
+   * Replace LOG in ObserverReadProxy with a mocked logger.
+   */
+  private void setupMockLoggerForProxyProvider()

Review Comment:
   Wasn't there a utility to capture logs?
   LogCapturer?
   ```
   LogCapturer logs =
       GenericTestUtils.LogCapturer.captureLogs(LoggerFactory.getLogger(DataStreamer.class));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [hadoop] goiri commented on a diff in pull request #5700: HDFS-17030. Limit wait time for getHAServiceState in ObserverReaderProxy

2023-05-31 Thread via GitHub


goiri commented on code in PR #5700:
URL: https://github.com/apache/hadoop/pull/5700#discussion_r1212047828


##
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##
@@ -284,13 +319,68 @@ private synchronized NNProxyInfo changeProxy(NNProxyInfo initial) {
 }
 currentIndex = (currentIndex + 1) % nameNodeProxies.size();
 currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-currentProxy.setCachedState(getHAServiceState(currentProxy));
+currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
 LOG.debug("Changed current proxy from {} to {}",
 initial == null ? "none" : initial.proxyInfo,
 currentProxy.proxyInfo);
 return currentProxy;
   }
 
+  /**
+   * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+   * an NN becomes irresponsive to rpc requests
+   * (when a thread/heap dump is being taken, e.g.).
+   *
+   * For each getHAServiceState() call, a task is created and submitted to a
+   * threadpool for execution. We will wait for a response up to
+   * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+   *
+   * The implemention is split into two functions so that we can unit test
+   * the second function.
+   */
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo proxyInfo) {
+
+Callable getHAServiceStateTask =

Review Comment:
   Can this be a lambda?
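
   For illustration, a self-contained sketch of the same pattern with a lambda in place of the anonymous `Callable`. The class and method names are hypothetical, a plain `String` stands in for `HAServiceState`, and returning `null` on timeout is an assumption based on the debug message in the PR.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class LambdaProbeSketch {
  /** Runs the probe on the pool; returns null if it fails or times out. */
  static <T> T callWithTimeout(ExecutorService pool, Callable<T> probe,
      long timeoutMs) {
    Future<T> task = pool.submit(probe);
    try {
      return task.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      task.cancel(true); // interrupt the probe that is still running
      return null;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag
      task.cancel(true);
      return null;
    } catch (ExecutionException e) {
      return null; // the probe itself threw
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      // The lambda replaces the anonymous Callable class.
      String fast = callWithTimeout(pool, () -> "ACTIVE", 1000);
      // This probe sleeps past the timeout, so it is cancelled.
      String slow = callWithTimeout(pool, () -> {
        Thread.sleep(5000);
        return "OBSERVER";
      }, 100);
      System.out.println(fast + " " + slow);
    } finally {
      pool.shutdownNow();
    }
  }
}
```

   In the PR itself the task could presumably shrink to a one-line lambda that calls `getHAServiceState(proxyInfo)`.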



##
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##
@@ -88,6 +96,16 @@
   /** Observer probe retry period default to 10 min. */
   static final long OBSERVER_PROBE_RETRY_PERIOD_DEFAULT = 60 * 10 * 1000;
 
+  /** Timeout to cancel the ha-state probe rpc request for an namenode. */
+  static final String NAMENODE_HA_STATE_PROBE_TIMEOUT =
+  "dfs.client.namenode.ha-state.probe.timeout";
+  /**
+   * Namenode ha-state probe timeout default to 25 sec.
+   * ipc.client.connect.timeout defaults to be 20 seconds. So, in 25 seconds,
+   * we can try twice to connect to an NN.
+   */
+  static final long NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT = 25;

Review Comment:
   Use TimeUnit.



##
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##
@@ -155,12 +173,22 @@
*/
   private long observerProbeRetryPeriodMs;
 
+  /**
+   * Timeout in seconds when we try to get the HA state of an namenode.
+   */
+  @VisibleForTesting
+  private long namenodeHAStateProbeTimeoutSec;
+
   /**
* The previous time where zero observer were found. If there was observer,
* or it is initialization, this is set to 0.
*/
   private long lastObserverProbeTime;
 
+  private final ExecutorService nnProbingThreadPool =
+  new ThreadPoolExecutor(1, 4, 6L, TimeUnit.MILLISECONDS,

Review Comment:
   Give the magic numbers named constants.



##
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java:
##
@@ -284,13 +319,68 @@ private synchronized NNProxyInfo changeProxy(NNProxyInfo initial) {
 }
 currentIndex = (currentIndex + 1) % nameNodeProxies.size();
 currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-currentProxy.setCachedState(getHAServiceState(currentProxy));
+currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
 LOG.debug("Changed current proxy from {} to {}",
 initial == null ? "none" : initial.proxyInfo,
 currentProxy.proxyInfo);
 return currentProxy;
   }
 
+  /**
+   * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+   * an NN becomes irresponsive to rpc requests
+   * (when a thread/heap dump is being taken, e.g.).
+   *
+   * For each getHAServiceState() call, a task is created and submitted to a
+   * threadpool for execution. We will wait for a response up to
+   * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+   *
+   * The implemention is split into two functions so that we can unit test
+   * the second function.
+   */
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo proxyInfo) {
+
+Callable getHAServiceStateTask =
+new Callable() {
+  @Override
+  public HAServiceState call() {
+return getHAServiceState(proxyInfo);
+  }
+};
+
+try {
+  Future task =
+  nnProbingThreadPool.submit(getHAServiceStateTask);
+  return getHAServiceStateWithTimeout(proxyInfo, task);
+} catch (RejectedExecutionException e) {
+  LOG.debug("Run out of threads to submit the request to query HA state. "
+  + "Ok to return null and we will fallback to use active NN to serve "
+  + "this request.");
+  retur