This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new c40a6bd46a5 HDFS-17166. RBF: Throwing NoNamenodesAvailableException
for a long time, when failover (#5990)
c40a6bd46a5 is described below
commit c40a6bd46a5c2438f8dc53c9076438ef1cf1b098
Author: Jian Zhang <[email protected]>
AuthorDate: Wed Sep 6 08:48:27 2023 +0800
HDFS-17166. RBF: Throwing NoNamenodesAvailableException for a long time,
when failover (#5990)
---
.../resolver/ActiveNamenodeResolver.java | 10 +++
.../resolver/MembershipNamenodeResolver.java | 41 +++++++++++
.../server/federation/router/RouterRpcClient.java | 2 +
.../server/federation/MiniRouterDFSCluster.java | 9 +++
.../hdfs/server/federation/MockResolver.java | 5 ++
.../router/TestRouterClientRejectOverload.java | 81 ++++++++++++++++++++++
6 files changed, 148 insertions(+)
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java
index cae1f478604..de89a152c2b 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java
@@ -146,4 +146,14 @@ public interface ActiveNamenodeResolver {
* @param routerId Unique string identifier for the router.
*/
void setRouterId(String routerId);
+
+  /**
+   * Rotate the cached namenode list so that the given namenode gets the lowest
+   * priority, ensuring that it will not be the first one accessed next time.
+   *
+   * @param nsId name service id
+   * @param namenode context of the namenode whose priority is lowered
+   * @param listObserversFirst Observer read case, observer NN will be ranked first
+   */
+  void rotateCache(String nsId, FederationNamenodeContext namenode, boolean listObserversFirst);
}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
index db1dcdf1818..c0e800e0430 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
@@ -478,4 +478,45 @@ public class MembershipNamenodeResolver
public void setRouterId(String router) {
this.routerId = router;
}
+
+  /**
+   * Rotate the cached namenode list so that the given namenode gets the lowest
+   * priority, ensuring that it will not be the first one accessed next time.
+   *
+   * @param nsId name service id
+   * @param namenode context of the namenode whose priority is lowered
+   * @param listObserversFirst Observer read case, observer NN will be ranked first
+   */
+  @Override
+  public void rotateCache(
+      String nsId, FederationNamenodeContext namenode, boolean listObserversFirst) {
+    cacheNS.compute(Pair.of(nsId, listObserversFirst), (ns, namenodeContexts) -> {
+      if (namenodeContexts == null || namenodeContexts.size() <= 1) {
+        return namenodeContexts;
+      }
+      FederationNamenodeContext firstNamenodeContext = namenodeContexts.get(0);
+      /*
+       * If the first nn in the cache is active, the active nn priority cannot be lowered.
+       * This happens when other threads have already updated the cache.
+       */
+      if (ACTIVE.equals(firstNamenodeContext.getState())) {
+        return namenodeContexts;
+      }
+      /*
+       * Rotate only if the first nn in the cache is still the nn to be lowered in
+       * priority; otherwise another thread has already rotated the cache.
+       * A rotated copy is returned, so the previously cached list is not modified.
+       */
+      if (firstNamenodeContext.getRpcAddress().equals(namenode.getRpcAddress())) {
+        List<FederationNamenodeContext> rotatedNnContexts = new ArrayList<>(namenodeContexts);
+        Collections.rotate(rotatedNnContexts, -1);
+        String newFirstNamenodeId = rotatedNnContexts.get(0).getNamenodeId();
+        LOG.info("Rotate cache of pair <ns: {}, observer first: {}>, put namenode: {} in the " +
+            "first position of the cache and namenode: {} in the last position of the cache",
+            nsId, listObserversFirst, newFirstNamenodeId, namenode.getNamenodeId());
+        return rotatedNnContexts;
+      }
+      return namenodeContexts;
+    });
+  }
}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 321d97e5dac..b38900c3bc2 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -599,6 +599,8 @@ public class RouterRpcClient {
}
LOG.error("Cannot get available namenode for {} {} error: {}",
nsId, rpcAddress, ioe.getMessage());
+ // Rotate cache so that client can retry the next namenode in the cache
+ this.namenodeResolver.rotateCache(nsId, namenode, shouldUseObserver);
// Throw RetriableException so that client can retry
throw new RetriableException(ioe);
} else {
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
index 2c703958704..bf22cf01148 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
@@ -1202,4 +1202,13 @@ public class MiniRouterDFSCluster {
throw new IOException("Cannot wait for the namenodes", e);
}
}
+
+  /**
+   * Returns the cache flush interval of this cluster.
+   *
+   * @return the cache flush interval, in milliseconds
+   */
+  public long getCacheFlushInterval() {
+    return this.cacheFlushInterval;
+  }
}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
index 4aaa8e7569e..554879856ac 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
@@ -397,6 +397,11 @@ public class MockResolver
public void setRouterId(String router) {
}
+  @Override
+  public void rotateCache(String nsId, FederationNamenodeContext namenode,
+      boolean listObserversFirst) { // Intentionally left empty.
+  }
+
/**
* Mocks the availability of default namespace.
* @param b if true default namespace is unset.
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
index 8d776546801..176ac4b0782 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
+import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE;
+import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.STANDBY;
import static
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
import static
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateThrowExceptionRouterRpcServer;
import static
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.transitionClusterNSToStandby;
@@ -42,16 +44,19 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
@@ -357,6 +362,82 @@ public class TestRouterClientRejectOverload {
assertEquals(originalRouter0Failures, rpcMetrics0.getProxyOpNoNamenodes());
}
+  /**
+   * When failover occurs, the router may record that the ns has no active namenode.
+   * Only when the router updates the cache next time can the memory status be updated,
+   * causing the router to report NoNamenodesAvailableException for a long time.
+   */
+  @Test
+  public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception {
+    setupCluster(false, true);
+    transitionClusterNSToStandby(cluster);
+    for (RouterContext routerContext : cluster.getRouters()) {
+      // Manually trigger the heartbeat
+      Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
+          .getRouter().getNamenodeHeartbeatServices();
+      for (NamenodeHeartbeatService service : heartbeatServices) {
+        service.periodicInvoke();
+      }
+      // Update service cache
+      routerContext.getRouter().getStateStore().refreshCaches(true);
+    }
+    // Record the time after the router first updated the cache
+    long firstLoadTime = Time.now();
+    List<MiniRouterDFSCluster.NamenodeContext> namenodes = cluster.getNamenodes();
+
+    // Make sure all namenodes are in standby state
+    for (MiniRouterDFSCluster.NamenodeContext namenodeContext : namenodes) {
+      assertEquals(STANDBY.ordinal(), namenodeContext.getNamenode().getNameNodeState());
+    }
+
+    Configuration conf = cluster.getRouterClientConf();
+    // Set dfs.client.failover.random.order false, to pick 1st router at first
+    conf.setBoolean("dfs.client.failover.random.order", false);
+
+    for (RouterContext routerContext : cluster.getRouters()) {
+      // Get the second namenode in the router cache and make it active
+      List<? extends FederationNamenodeContext> ns0 = routerContext.getRouter()
+          .getNamenodeResolver()
+          .getNamenodesForNameserviceId("ns0", false);
+      String nnId = ns0.get(1).getNamenodeId();
+      cluster.switchToActive("ns0", nnId);
+      // Manually trigger the heartbeat, but the router does not manually load the cache
+      Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
+          .getRouter().getNamenodeHeartbeatServices();
+      for (NamenodeHeartbeatService service : heartbeatServices) {
+        service.periodicInvoke();
+      }
+      assertEquals(ACTIVE.ordinal(),
+          cluster.getNamenode("ns0", nnId).getNamenode().getNameNodeState());
+    }
+
+    // Get router0 metrics
+    FederationRPCMetrics rpcMetrics0 = cluster.getRouters().get(0)
+        .getRouter().getRpcServer().getRPCMetrics();
+    // Original failures
+    long originalRouter0Failures = rpcMetrics0.getProxyOpNoNamenodes();
+
+    /*
+     * At this time, the router has recorded 2 standby namenodes in memory,
+     * and the first accessed namenode is indeed standby,
+     * then a NoNamenodesAvailableException will be reported for the first access,
+     * and the next access will be successful.
+     */
+    // The client is closed automatically, even if an assertion below fails
+    try (DFSClient routerClient = new DFSClient(new URI("hdfs://fed"), conf)) {
+      routerClient.getFileInfo("/");
+      long successReadTime = Time.now();
+      assertEquals(originalRouter0Failures + 1, rpcMetrics0.getProxyOpNoNamenodes());
+
+      /*
+       * Access the active namenode without waiting for the router to update the cache,
+       * even if there are 2 standby states recorded in the router memory.
+       */
+      assertTrue(successReadTime - firstLoadTime < cluster.getCacheFlushInterval());
+    }
+  }
+
+
@Test
public void testAsyncCallerPoolMetrics() throws Exception {
setupCluster(true, false);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]