KeeProMise commented on code in PR #7108:
URL: https://github.com/apache/hadoop/pull/7108#discussion_r1795729948
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -824,6 +874,60 @@ <T> T invokeOnNs(RemoteMethod method, Class<T> clazz,
IOException ioe,
throw ioe;
}
+ /**
+ * Invoke the method sequentially on available namespaces,
+ * throw no namespace available exception, if no namespaces are available.
+ * Asynchronous version of invokeOnNs method.
+ * @param method the remote method.
+ * @param clazz Class for the return type.
+ * @param ioe IOException .
+ * @param nss List of name spaces in the federation
+ * @return the response received after invoking method.
+ * @throws IOException
+ */
+ <T> T invokeOnNsAsync(RemoteMethod method, Class<T> clazz, IOException ioe,
+ Set<FederationNamespaceInfo> nss) throws IOException {
+ if (nss.isEmpty()) {
+ throw ioe;
+ }
+
+ asyncComplete(null);
+ Iterator<FederationNamespaceInfo> nsIterator = nss.iterator();
+ asyncForEach(nsIterator, (foreach, fnInfo) -> {
+ String nsId = fnInfo.getNameserviceId();
+ LOG.debug("Invoking {} on namespace {}", method, nsId);
+ asyncTry(() -> {
+ rpcClient.invokeSingle(nsId, method, clazz);
+ asyncApply(result -> {
+ if (result != null && isExpectedClass(clazz, result)) {
+ foreach.breakNow();
+ return result;
+ }
+ return null;
+ });
+ });
+
+ asyncCatch((AsyncCatchFunction<T, IOException>)(ret, ex) -> {
Review Comment:
Should use CatchFunction.
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -791,6 +801,46 @@ <T> T invokeAtAvailableNs(RemoteMethod method, Class<T>
clazz)
return invokeOnNs(method, clazz, io, nss);
}
+ /**
+ * Invokes the method at default namespace, if default namespace is not
+ * available then at the other available namespaces.
+ * If the namespace is unavailable, retry with other namespaces.
+ * Asynchronous version of invokeAtAvailableNs method.
+ * @param <T> expected return type.
+ * @param method the remote method.
+ * @return the response received after invoking method.
+ * @throws IOException
+ */
+ <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz)
+ throws IOException {
+ String nsId = subclusterResolver.getDefaultNamespace();
+ // If default Ns is not present return result from first namespace.
+ Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+ // If no namespace is available, throw IOException.
+ IOException io = new IOException("No namespace available.");
+
+ asyncComplete(null);
+ if (!nsId.isEmpty()) {
+ asyncTry(() -> {
+ rpcClient.invokeSingle(nsId, method, clazz);
+ });
+
+ asyncCatch((AsyncCatchFunction<T, IOException>)(res, ioe) -> {
+ if (!clientProto.isUnavailableSubclusterException(ioe)) {
+ LOG.debug("{} exception cannot be retried",
+ ioe.getClass().getSimpleName());
+ throw ioe;
+ }
+ nss.removeIf(n -> n.getNameserviceId().equals(nsId));
+ invokeOnNs(method, clazz, io, nss);
Review Comment:
Hi @hfutatzhanghb should use invokeOnNsAsync.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]