[ https://issues.apache.org/jira/browse/HDFS-17543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17859682#comment-17859682 ]

ASF GitHub Bot commented on HDFS-17543:
---------------------------------------

Hexiaoqiao commented on code in PR #6868:
URL: https://github.com/apache/hadoop/pull/6868#discussion_r1651044329


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncForEachRun.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * The AsyncForEachRun class is part of the asynchronous operation utilities
+ * within the Hadoop Distributed File System (HDFS) Federation router.
+ * It provides the functionality to perform asynchronous operations on each
+ * element of an Iterator, applying a given async function and then applying
+ * a final transformation function to the results.
+ *
+ * <p>This class is designed to work with other asynchronous interfaces and
+ * utility classes to enable complex asynchronous workflows. It allows for
+ * non-blocking execution of tasks, which can improve the performance and
+ * responsiveness of HDFS operations.</p>
+ *
+ * <p>The class implements the AsyncRun interface, which means it can be used
+ * in asynchronous task chains. It maintains an Iterator of elements to
+ * process, an asyncFunction to apply to each element, and a final
+ * transformation function (thenApply) to produce the final result.</p>
+ *
+ * <p>The run method initiates the asynchronous operation, and the doOnce
+ * method recursively applies the asyncFunction to each element and handles
+ * the results. If the satisfy flag is set, the operation is completed
+ * with the current result.</p>
+ *
+ * <p>AsyncForEachRun is used to implement the following semantics:</p>
+ * <pre>
+ * {@code
+ * for (I element : elements) {
+ *     T res = asyncFunction(element);
+ *     R result = thenApply(element, res);
+ *     if (satisfyCondition(res, result)) {
+ *         break;
+ *     }
+ * }
+ * return result;
+ * }
+ * </pre>
+ *
+ * @param <I> the type of the elements being iterated over
+ * @param <T> the type of the intermediate result from the asyncFunction
+ * @param <R> the type of the final result after applying the thenApply function
+ * @see AsyncRun
+ * @see AsyncApplyFunction
+ * @see BiFunction
+ */
+public class AsyncForEachRun<I, T, R> implements AsyncRun<R> {
+
+  private boolean satisfy = false;
+  private Iterator<I> iterator;
+  private I now;
+  private final CompletableFuture<R> result = new CompletableFuture<>();
+  private AsyncApplyFunction<I, T> asyncFunction;
+  private BiFunction<AsyncForEachRun<I, T, R>, T, R> thenApply;
+
+  @Override
+  public void run() {
+    try {
+      doOnce(null);
+    } catch (IOException ioe) {
+      result.completeExceptionally(ioe);
+    }
+    setCurCompletableFuture(result);
+  }
+
+  /**
+   * Performs a single iteration of the asynchronous for-each operation.
+   *
+   * <p>This method is called to process each element of the iterator provided to
+   * the {@link AsyncForEachRun} constructor. It applies the asynchronous function to
+   * the current element, then applies the 'then' function to the result. If the
+   * 'satisfy' condition is met, the iteration is halted, and the current result is
+   * used to complete the future. This method is recursive, so it will continue to
+   * call itself for the next elements until the iterator is exhausted or the satisfy
+   * condition is true.</p>
+   *
+   * @param ret the initial or current result to be passed into the 'then' function
+   * @throws IOException if an I/O error occurs while applying the asynchronous function
+   * @see #forEach(Iterator)
+   * @see #asyncDo(AsyncApplyFunction)
+   * @see #then(BiFunction)
+   */
+  private void doOnce(R ret) throws IOException {
+    if (!iterator.hasNext()) {
+      result.complete(ret);
+      return;
+    }
+    now = iterator.next();
+    CompletableFuture<T> completableFuture = asyncFunction.async(now);
+    completableFuture.thenApply(t -> {
+      R r = null;
+      try {
+        r = thenApply.apply(AsyncForEachRun.this, t);
+      } catch (IOException e) {
+        result.completeExceptionally(new CompletionException(e));
+        return null;
+      }
+      if (satisfy) {
+        result.complete(r);
+        return null;
+      }
+      try {
+        doOnce(r);
+      } catch (IOException e) {
+        throw new CompletionException(e);
+      }
+      return null;
+    }).exceptionally(e ->
+        result.completeExceptionally(e));
+  }
+
+  /**
+   * Retrieves the current element being processed in the asynchronous for-each loop.
+   *
+   * <p>This method provides access to the element that is currently being
+   * operated on within the asynchronous iteration. It can be useful for
+   * inspection, logging, or other purposes that require knowledge of the
+   * current state of the iteration.</p>
+   *
+   * @return the current element of type {@code I} being processed in the iterator.
+   * @see #forEach(Iterator)
+   * @see #run()
+   */
+  public I getNow() {

Review Comment:
   Is the name of this method consistent with its Javadoc? It seems ambiguous: my first impression was that it returns a timestamp, but it actually returns the current element, of generic type `I`.
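For reference, the for-each semantics described in the `AsyncForEachRun` Javadoc (process one element at a time, start the next element only after the previous future completes, stop early when a condition holds) can be sketched with plain `CompletableFuture`. This is a hypothetical standalone illustration, not the class under review: `AsyncForEachSketch`, `forEach`, `step`, and the `stop` predicate are made-up names, and the sketch uses `java.util.function` types rather than the PR's IOException-throwing interfaces.

```java
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;

/** Hypothetical sketch of a sequential async for-each with early exit; not the PR's AsyncForEachRun. */
final class AsyncForEachSketch {
  private AsyncForEachSketch() {}

  /**
   * Applies asyncFunction to each element in order, starting the next element only after the
   * previous future completes. Stops early when stop(res, result) is true; otherwise completes
   * with the last result, or null if the iterator is empty.
   */
  static <I, T, R> CompletableFuture<R> forEach(
      Iterator<I> it,
      Function<I, CompletableFuture<T>> asyncFunction,
      BiFunction<I, T, R> thenApply,
      BiPredicate<T, R> stop) {
    CompletableFuture<R> result = new CompletableFuture<>();
    step(it, asyncFunction, thenApply, stop, null, result);
    return result;
  }

  private static <I, T, R> void step(
      Iterator<I> it,
      Function<I, CompletableFuture<T>> asyncFunction,
      BiFunction<I, T, R> thenApply,
      BiPredicate<T, R> stop,
      R last,
      CompletableFuture<R> result) {
    if (!it.hasNext()) {
      result.complete(last); // iterator exhausted: finish with the last result
      return;
    }
    I element = it.next();
    CompletableFuture<T> pending;
    try {
      pending = asyncFunction.apply(element);
    } catch (RuntimeException e) {
      result.completeExceptionally(e); // failure while starting the async call
      return;
    }
    pending.whenComplete((res, err) -> {
      if (err != null) {
        result.completeExceptionally(err); // async call failed: stop iterating
        return;
      }
      try {
        R r = thenApply.apply(element, res);
        if (stop.test(res, r)) {
          result.complete(r); // equivalent of 'break'
        } else {
          step(it, asyncFunction, thenApply, stop, r, result); // continue with the next element
        }
      } catch (RuntimeException e) {
        result.completeExceptionally(e);
      }
    });
  }
}
```

Since each step recurses from the completion callback, futures that are already complete add one stack frame per element, which is the same trade-off as the recursive `doOnce` in the diff above.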



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java:
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * Represents a function that accepts a value of type T and produces a result of type R.
+ * This interface extends {@link Async} and provides methods to apply the function
+ * asynchronously using {@link CompletableFuture}.
+ *
+ * <p>ApplyFunction is used to implement the following semantics:</p>
+ * <pre>
+ * {@code
+ *    T res = doAsync(input);
+ *    // Can use ApplyFunction
+ *    R result = thenApply(res);
+ * }
+ * </pre>
+ *
+ * @param <T> the type of the input to the function
+ * @param <R> the type of the result of the function
+ */
+@FunctionalInterface
+public interface ApplyFunction<T, R> extends Async<R>{
+
+  /**
+   * Applies this function to the given argument.
+   *
+   * @param t the function argument
+   * @return the function result
+   * @throws IOException if an I/O error occurs
+   */
+  R apply(T t) throws IOException;
+
+  /**
+   * Applies this function asynchronously to the result of the given {@link CompletableFuture}.
+   * The function is executed on the same thread as the completion of the given future.
+   *
+   * @param in the input future
+   * @return a new future that holds the result of the function application
+   */
+  default CompletableFuture<R> apply(CompletableFuture<T> in) {
+    return in.thenApply(t -> {
+      try {
+        return ApplyFunction.this.apply(t);
+      } catch (IOException e) {
+        throw new CompletionException(e);
+      }
+    });
+  }
+
+  /**
+   * Applies this function asynchronously to the result of the given {@link CompletableFuture},
+   * using the specified executor for the asynchronous computation.
+   *
+   * @param in the input future
+   * @param executor the executor to use for the asynchronous computation
+   * @return a new future that holds the result of the function application
+   */
+  default CompletableFuture<R> apply(CompletableFuture<T> in, Executor executor) {
+    return in.thenApplyAsync(t -> {
+      try {
+        return ApplyFunction.this.apply(t);
+      } catch (IOException e) {
+        throw new CompletionException(e);
+      }
+    }, executor);
+  }
+

Review Comment:
   Please remove the extra blank line.
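Both default `apply` overloads in `ApplyFunction` rely on the same pattern: a checked `IOException` cannot escape a `java.util.function` lambda, so it is wrapped in `CompletionException`, and whoever waits on the future has to unwrap it again. A minimal standalone sketch of both directions, using only the JDK; `IoApplySketch`, `IoFunction`, `thenApplyIo`, and `joinIo` are hypothetical names, not part of the PR:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical sketch of the IOException wrapping used by ApplyFunction's default methods. */
final class IoApplySketch {
  /** A function that may throw a checked IOException (stand-in for ApplyFunction#apply(T)). */
  @FunctionalInterface
  interface IoFunction<T, R> {
    R apply(T t) throws IOException;
  }

  private IoApplySketch() {}

  /** Chains fn onto in; an IOException is wrapped so it can cross the lambda boundary. */
  static <T, R> CompletableFuture<R> thenApplyIo(CompletableFuture<T> in, IoFunction<T, R> fn) {
    return in.thenApply(t -> {
      try {
        return fn.apply(t);
      } catch (IOException e) {
        throw new CompletionException(e);
      }
    });
  }

  /** Waits for f and rethrows the original IOException if that caused the failure. */
  static <R> R joinIo(CompletableFuture<R> f) throws IOException {
    try {
      return f.join();
    } catch (CompletionException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw e;
    }
  }
}
```

With this wrapping, `getCause()` on the exception thrown by `join()` is the original `IOException`, which is what `joinIo` relies on.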



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java:
##########
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.function.Function;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.Async.CUR_COMPLETABLE_FUTURE;
+
+/**
+ * The AsyncUtil class provides a collection of utility methods to simplify
+ * the implementation of asynchronous operations using Java's CompletableFuture.
+ * It encapsulates common patterns such as applying functions, handling exceptions,
+ * and executing tasks in a non-blocking manner. This class is designed to work
+ * with Hadoop's asynchronous router operations in HDFS Federation.
+ *
+ * <p>The utility methods support a fluent-style API, allowing for the chaining of
+ * asynchronous operations. For example, after an asynchronous operation completes,
+ * a function can be applied to its result, and the process can continue with
+ * the new result. This is particularly useful for complex workflows that require
+ * multiple steps, where each step depends on the completion of the previous one.</p>
+ *
+ * <p>The class also provides methods to handle exceptions that may occur during an
+ * asynchronous operation. This ensures that error handling is integrated smoothly
+ * into the workflow, allowing for robust and fault-tolerant applications.</p>
+ *
+ * @see CompletableFuture
+ * @see ApplyFunction
+ * @see AsyncApplyFunction
+ * @see AsyncRun
+ * @see AsyncForEachRun
+ * @see CatchFunction
+ * @see AsyncCatchFunction
+ * @see FinallyFunction
+ */
+public final class AsyncUtil {
+  private static final Boolean BOOLEAN_RESULT = false;
+  private static final Long LONG_RESULT = -1L;
+  private static final Object NULL_RESULT = null;
+
+  private AsyncUtil(){}
+
+
+  /**
+   * Provides a default value based on the type specified.
+   *
+   * @param clazz The {@link Class} object representing the type of the value
+   *              to be returned.
+   * @param <R>   The type of the value to be returned.
+   * @return An object with a value determined by the type:
+   *         <ul>
+   *           <li>{@code false} if {@code clazz} is {@link Boolean},
+   *           <li>-1 if {@code clazz} is {@link Long},
+   *           <li>{@code null} for any other type.
+   *         </ul>
+   */
+  public static <R> R asyncReturn(Class<R> clazz) {
+    if (clazz == null) {
+      return null;
+    }
+    if (clazz.equals(Boolean.class)) {
+      return (R) BOOLEAN_RESULT;
+    } else if (clazz.equals(Long.class)) {
+      return (R) LONG_RESULT;
+    }
+    return (R) NULL_RESULT;
+  }
+
+  public static <R> R syncReturn(Class<R> clazz)

Review Comment:
   Suggest adding some Javadoc here.
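The mapping in `asyncReturn` reads like a placeholder-return pattern: a method keeps its synchronous signature, hands the real result over through a future, and returns a dummy value. The sketch below assumes, based only on the `CUR_COMPLETABLE_FUTURE` import and the `setCurCompletableFuture` call in the diff, that the pending future is kept in a per-thread slot. `PlaceholderSketch`, `CURRENT`, `placeholder`, and `pathLength` are hypothetical and do not describe how `syncReturn` works.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of the placeholder-return pattern; not the PR's AsyncUtil. */
final class PlaceholderSketch {
  /** Assumed per-thread slot for the pending future (stand-in for CUR_COMPLETABLE_FUTURE). */
  static final ThreadLocal<CompletableFuture<Object>> CURRENT = new ThreadLocal<>();

  private PlaceholderSketch() {}

  /** Same mapping as asyncReturn, but Class.cast avoids unchecked casts. */
  static <R> R placeholder(Class<R> clazz) {
    if (Boolean.class.equals(clazz)) {
      return clazz.cast(Boolean.FALSE);
    }
    if (Long.class.equals(clazz)) {
      return clazz.cast(-1L);
    }
    return null; // any other type, including a null clazz
  }

  /** Keeps a synchronous signature: starts the work, parks the future, returns a dummy. */
  static Long pathLength(String path) {
    CompletableFuture<Object> pending = CompletableFuture.supplyAsync(() -> (long) path.length());
    CURRENT.set(pending);
    return placeholder(Long.class);
  }
}
```

Using `Class.cast` instead of the unchecked `(R)` casts gives the same results while avoiding the unchecked-cast warnings.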



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncApplyFunction.java:
##########
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * The AsyncApplyFunction interface represents a function that
+ * asynchronously accepts a value of type T and produces a result
+ * of type R. This interface extends {@link ApplyFunction} and is
+ * designed to be used with asynchronous computation frameworks,
+ * such as Java's {@link java.util.concurrent.CompletableFuture}.
+ *
+ * <p>An implementation of this interface is expected to perform an
+ * asynchronous operation and return a result, which is typically
+ * represented as a {@code CompletableFuture<R>}. This allows for
+ * non-blocking execution of tasks and is particularly useful for
+ * I/O operations or any operation that may take a significant amount
+ * of time to complete.</p>
+ *
+ * <p>AsyncApplyFunction is used to implement the following semantics:</p>
+ * <pre>
+ * {@code
+ *    T res = doAsync1(input);
+ *    // Can use AsyncApplyFunction
+ *    R result = doAsync2(res);
+ * }
+ * </pre>
+ *
+ * @param <T> the type of the input to the function
+ * @param <R> the type of the result of the function
+ * @see ApplyFunction
+ * @see java.util.concurrent.CompletableFuture
+ */
+@FunctionalInterface
+public interface AsyncApplyFunction<T, R> extends ApplyFunction<T, R> {
+
+  /**
+   * Asynchronously applies this function to the given argument.
+   *
+   * <p>This method is intended to initiate the function application
+   * without waiting for the result. It is typically used when the
+   * result of the operation is not required immediately or when the
+   * operation is part of a larger asynchronous workflow.</p>
+   *
+   * @param t the function argument
+   * @throws IOException if an I/O error occurs during the application
+   *                     of the function
+   */
+  void applyAsync(T t) throws IOException;
+
+  /**
+   * Synchronously applies this function to the given argument and
+   * returns the result.
+   *
+   * <p>This method waits for the asynchronous operation to complete
+   * and returns its result. It is useful when the result is needed
+   * immediately and the calling code cannot proceed without it.</p>
+   *
+   * @param t the function argument
+   * @return the result of applying the function to the argument
+   * @throws IOException if an I/O error occurs during the application
+   *                     of the function
+   */
+  @Override
+  default R apply(T t) throws IOException {
+    applyAsync(t);
+    return result();
+  }
+
+
+  default CompletableFuture<R> async(T t) throws IOException {

Review Comment:
   Suggest adding some Javadoc here.
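In plain `CompletableFuture` terms, the difference between `ApplyFunction` and `AsyncApplyFunction` resembles the difference between `thenApply` and `thenCompose`: when the next step (`doAsync2` in the Javadoc) returns its own future, the result has to be flattened rather than mapped. A hypothetical sketch, not code from the PR; `ComposeSketch`, `lengthOf`, and `lengthOfAsync` are made-up names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical sketch contrasting a plain next step with an asynchronous next step. */
final class ComposeSketch {
  private ComposeSketch() {}

  /** ApplyFunction-style: the next step is a plain transformation, so thenApply suffices. */
  static CompletableFuture<Integer> lengthOf(CompletableFuture<String> in) {
    return in.thenApply(String::length);
  }

  /**
   * AsyncApplyFunction-style: the next step returns its own future, so thenCompose
   * flattens it instead of producing CompletableFuture<CompletableFuture<Integer>>.
   */
  static CompletableFuture<Integer> lengthOfAsync(CompletableFuture<String> in, Executor executor) {
    return in.thenCompose(s -> CompletableFuture.supplyAsync(s::length, executor));
  }
}
```

The blocking default `apply(T)`, which calls `applyAsync(t)` and then `result()`, roughly plays the role that `join()` plays for these futures.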





> [ARR] AsyncUtil makes asynchronous code more concise and easier.
> ----------------------------------------------------------------
>
>                 Key: HDFS-17543
>                 URL: https://issues.apache.org/jira/browse/HDFS-17543
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>            Reporter: Jian Zhang
>            Assignee: Jian Zhang
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HDFS-17543.001.patch
>
>
> *Describe*
> Implementing an asynchronous router directly with Java's CompletableFuture
> results in poor code readability and maintainability, which is not conducive
> to the development of the open source community. Therefore, I have implemented
> a lightweight tool that encapsulates CompletableFuture.
> With AsyncUtil, it is easy to write readable asynchronous code that everyone
> can understand. The unit tests (UT) include an example of this utility class,
> demonstrating how to convert synchronous methods to asynchronous code.
>  
> *Examples and tests*
> SyncClass provides some common synchronous methods.
> AsyncClass is the corresponding asynchronous implementation.
> TestAsyncUtil tests the corresponding methods of SyncClass and AsyncClass
> side by side, ensuring that the asynchronous and synchronous methods return
> the same results.
>  
> {*}NOTE{*}: In HDFS-17545, I used this tool to implement RouterAsyncRpcClient,
> which extends RouterRpcClient. You can also look at RouterAsyncRpcClient to
> understand how this tool is used.
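The conversion the description mentions (a synchronous method, its asynchronous counterpart, and a test that both return the same result) can be illustrated with `CompletableFuture` alone. `SyncVsAsyncSketch`, `totalLength`, `totalLengthAsync`, and `lengthAsync` are hypothetical and are not the `SyncClass`/`AsyncClass` from the patch; the hand-written `thenCompose` chain is the kind of code the description says is hard to read and maintain.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical sync/async pair; not the SyncClass/AsyncClass from the patch. */
final class SyncVsAsyncSketch {
  private SyncVsAsyncSketch() {}

  /** Synchronous version: sums name lengths and fails on an empty name. */
  static int totalLength(String... names) throws IOException {
    int total = 0;
    for (String name : names) {
      if (name.isEmpty()) {
        throw new IOException("empty name");
      }
      total += name.length();
    }
    return total;
  }

  /** Asynchronous version written directly with CompletableFuture; steps run one after another. */
  static CompletableFuture<Integer> totalLengthAsync(String... names) {
    CompletableFuture<Integer> acc = CompletableFuture.completedFuture(0);
    for (String name : names) {
      // Each step waits for the running total, then adds this name's length.
      acc = acc.thenCompose(total -> lengthAsync(name).thenApply(len -> total + len));
    }
    return acc;
  }

  /** Asynchronous per-item step; the checked IOException travels inside CompletionException. */
  private static CompletableFuture<Integer> lengthAsync(String name) {
    return CompletableFuture.supplyAsync(() -> {
      if (name.isEmpty()) {
        throw new CompletionException(new IOException("empty name"));
      }
      return name.length();
    });
  }
}
```

A test in the spirit of TestAsyncUtil would call both versions with the same inputs and compare the results, including the failure case.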



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
