[ https://issues.apache.org/jira/browse/HDFS-17543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17859682#comment-17859682 ]
ASF GitHub Bot commented on HDFS-17543: --------------------------------------- Hexiaoqiao commented on code in PR #6868: URL: https://github.com/apache/hadoop/pull/6868#discussion_r1651044329 ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncForEachRun.java: ########## @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** + * The AsyncForEachRun class is part of the asynchronous operation utilities + * within the Hadoop Distributed File System (HDFS) Federation router. + * It provides the functionality to perform asynchronous operations on each + * element of an Iterator, applying a given async function and then applying + * a final transformation function to the results. + * + * <p>This class is designed to work with other asynchronous interfaces and + * utility classes to enable complex asynchronous workflows. 
It allows for + * non-blocking execution of tasks, which can improve the performance and + * responsiveness of HDFS operations.</p> + * + * <p>The class implements the AsyncRun interface, which means it can be used + * in asynchronous task chains. It maintains an Iterator of elements to + * process, an asyncFunction to apply to each element, and a final + * transformation function (thenApply) to produce the final result.</p> + * + * <p>The run method initiates the asynchronous operation, and the doOnce + * method recursively applies the asyncFunction to each element and handles + * the results. If the satisfy flag is set, the operation is completed + * with the current result.</p> + * + * <p>AsyncForEachRun is used to implement the following semantics:</p> + * <pre> + * {@code + * for (I element : elements) { + * T res = asyncFunction(element); + * R result = thenApply(element, res); + * if (satisfyCondition(res, result)) { + * break; + * } + * } + * return result; + * } + * </pre> + * + * @param <I> the type of the elements being iterated over + * @param <T> the type of the intermediate result from the asyncFunction + * @param <R> the type of the final result after applying the thenApply function + * @see AsyncRun + * @see AsyncApplyFunction + * @see BiFunction + */ +public class AsyncForEachRun<I, T, R> implements AsyncRun<R> { + + private boolean satisfy = false; + private Iterator<I> iterator; + private I now; + private final CompletableFuture<R> result = new CompletableFuture<>(); + private AsyncApplyFunction<I, T> asyncFunction; + private BiFunction<AsyncForEachRun<I, T, R>, T, R> thenApply; + + @Override + public void run() { + try { + doOnce(null); + } catch (IOException ioe) { + result.completeExceptionally(ioe); + } + setCurCompletableFuture(result); + } + + /** + * Performs a single iteration of the asynchronous for-each operation. 
+ * + * <p>This method is called to process each element of the iterator provided to + * the {@link AsyncForEachRun} constructor. It applies the asynchronous function to + * the current element, then applies the 'then' function to the result. If the + * 'satisfy' condition is met, the iteration is halted, and the current result is + * used to complete the future. This method is recursive, so it will continue to + * call itself for the next elements until the iterator is exhausted or the satisfy + * condition is true.</p> + * + * @param ret the initial or current result to be passed into the 'then' function + * @throws IOException if an I/O error occurs while applying the asynchronous function + * @see #forEach(Iterator) + * @see #asyncDo(AsyncApplyFunction) + * @see #then(BiFunction) + */ + private void doOnce(R ret) throws IOException { + if (!iterator.hasNext()) { + result.complete(ret); + return; + } + now = iterator.next(); + CompletableFuture<T> completableFuture = asyncFunction.async(now); + completableFuture.thenApply(t -> { + R r = null; + try { + r = thenApply.apply(AsyncForEachRun.this, t); + } catch (IOException e) { + result.completeExceptionally(new CompletionException(e)); + return null; + } + if (satisfy) { + result.complete(r); + return null; + } + try { + doOnce(r); + } catch (IOException e) { + throw new CompletionException(e); + } + return null; + }).exceptionally(e -> + result.completeExceptionally(e)); + } + + /** + * Retrieves the current element being processed in the asynchronous for-each loop. + * + * <p>This method provides access to the element that is currently being + * operated on within the asynchronous iteration. It can be useful for + * inspection, logging, or other purposes that require knowledge of the + * current state of the iteration.</p> + * + * @return the current element of type {@code I} being processed in the iterator. 
+ * @see #forEach(Iterator) + * @see #run() + */ + public I getNow() {
Review Comment:
The name of this function does not match its Javadoc, or is at least ambiguous. My first impression was that it returns a timestamp, but it actually returns the current element, which is of the generic type I.
########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java: ########## @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +/** + * Represents a function that accepts a value of type T and produces a result of type R. + * This interface extends {@link Async} and provides methods to apply the function + * asynchronously using {@link CompletableFuture}. 
+ * + * <p>ApplyFunction is used to implement the following semantics:</p> + * <pre> + * {@code + * T res = doAsync(input); + * // Can use ApplyFunction + * R result = thenApply(res); + * } + * </pre> + * + * @param <T> the type of the input to the function + * @param <R> the type of the result of the function + */ +@FunctionalInterface +public interface ApplyFunction<T, R> extends Async<R>{ + + /** + * Applies this function to the given argument. + * + * @param t the function argument + * @return the function result + * @throws IOException if an I/O error occurs + */ + R apply(T t) throws IOException; + + /** + * Applies this function asynchronously to the result of the given {@link CompletableFuture}. + * The function is executed on the same thread as the completion of the given future. + * + * @param in the input future + * @return a new future that holds the result of the function application + */ + default CompletableFuture<R> apply(CompletableFuture<T> in) { + return in.thenApply(t -> { + try { + return ApplyFunction.this.apply(t); + } catch (IOException e) { + throw new CompletionException(e); + } + }); + } + + /** + * Applies this function asynchronously to the result of the given {@link CompletableFuture}, + * using the specified executor for the asynchronous computation. + * + * @param in the input future + * @param executor the executor to use for the asynchronous computation + * @return a new future that holds the result of the function application + */ + default CompletableFuture<R> apply(CompletableFuture<T> in, Executor executor) { + return in.thenApplyAsync(t -> { + try { + return ApplyFunction.this.apply(t); + } catch (IOException e) { + throw new CompletionException(e); + } + }, executor); + } + Review Comment: Please remove the extra blank line. 
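For readers following the review, the two default `apply` overloads quoted above share one pattern: a function that may throw a checked `IOException` is chained onto a `CompletableFuture`, and the checked exception is rethrown wrapped in a `CompletionException` so that the resulting future completes exceptionally. Below is a minimal, self-contained sketch of that pattern only. It is not the PR's code: `ApplyFunctionSketch`, `IoFunction`, and `describe` are hypothetical names introduced for illustration, and the PR's `Async` parent interface is left out.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ApplyFunctionSketch {

  /** Hypothetical stand-in for ApplyFunction: a function that may throw IOException. */
  @FunctionalInterface
  public interface IoFunction<T, R> {
    R apply(T t) throws IOException;

    /** Chains this function onto a future, wrapping IOException in CompletionException. */
    default CompletableFuture<R> apply(CompletableFuture<T> in) {
      return in.thenApply(t -> {
        try {
          return IoFunction.this.apply(t);
        } catch (IOException e) {
          // Lambdas passed to thenApply cannot throw checked exceptions,
          // so the IOException travels as the cause of an unchecked one.
          throw new CompletionException(e);
        }
      });
    }
  }

  /** Helper for the demo: waits for the future and reports its outcome as text. */
  public static String describe(CompletableFuture<?> future) {
    try {
      return "ok: " + future.join();
    } catch (CompletionException e) {
      return "failed: " + e.getCause().getMessage();
    }
  }

  public static void main(String[] args) {
    IoFunction<String, Integer> length = String::length;
    IoFunction<String, Integer> failing = s -> {
      throw new IOException("disk error");
    };
    System.out.println(describe(length.apply(CompletableFuture.completedFuture("hdfs"))));
    System.out.println(describe(failing.apply(CompletableFuture.completedFuture("hdfs"))));
  }
}
```

A caller that joins on the returned future finds the original `IOException` through `getCause()`, which is why the wrapping loses no information.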
########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java: ########## @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.function.Function; + +import static org.apache.hadoop.hdfs.server.federation.router.async.Async.CUR_COMPLETABLE_FUTURE; + +/** + * The AsyncUtil class provides a collection of utility methods to simplify + * the implementation of asynchronous operations using Java's CompletableFuture. + * It encapsulates common patterns such as applying functions, handling exceptions, + * and executing tasks in a non-blocking manner. This class is designed to work + * with Hadoop's asynchronous router operations in HDFS Federation. + * + * <p>The utility methods support a fluent-style API, allowing for the chaining of + * asynchronous operations. 
For example, after an asynchronous operation completes, + * a function can be applied to its result, and the process can continue with + * the new result. This is particularly useful for complex workflows that require + * multiple steps, where each step depends on the completion of the previous one.</p> + * + * <p>The class also provides methods to handle exceptions that may occur during a + * synchronous operation. This ensures that error handling is integrated smoothly + * into the workflow, allowing for robust and fault-tolerant applications.</p> + * + * @see CompletableFuture + * @see ApplyFunction + * @see AsyncApplyFunction + * @see AsyncRun + * @see AsyncForEachRun + * @see CatchFunction + * @see AsyncCatchFunction + * @see FinallyFunction + */ +public final class AsyncUtil { + private static final Boolean BOOLEAN_RESULT = false; + private static final Long LONG_RESULT = -1L; + private static final Object NULL_RESULT = null; + + private AsyncUtil(){} + + + /** + * Provides a default value based on the type specified. + * + * @param clazz The {@link Class} object representing the type of the value + * to be returned. + * @param <R> The type of the value to be returned. + * @return An object with a value determined by the type: + * <ul> + * <li>{@code false} if {@code clazz} is {@link Boolean}, + * <li>-1 if {@code clazz} is {@link Long}, + * <li>{@code null} for any other type. + * </ul> + */ + public static <R> R asyncReturn(Class<R> clazz) { + if (clazz == null) { + return null; + } + if (clazz.equals(Boolean.class)) { + return (R) BOOLEAN_RESULT; + } else if (clazz.equals(Long.class)) { + return (R) LONG_RESULT; + } + return (R) NULL_RESULT; + } + + public static <R> R syncReturn(Class<R> clazz) Review Comment: Suggest to add some javadoc here. 
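As a possible starting point for the requested Javadoc, the behaviour that the `asyncReturn` Javadoc above describes can be reproduced in isolation. This is a sketch under assumptions, not the PR's code: `AsyncReturnSketch` and `placeholderFor` are hypothetical names, and the reading that non-null placeholders exist so that callers assigning to primitive `boolean` or `long` variables do not hit a `NullPointerException` on unboxing is an inference from the quoted diff, not something the PR states.

```java
public class AsyncReturnSketch {

  /**
   * Returns a placeholder for the given type, mirroring the rules in the
   * quoted Javadoc: false for Boolean, -1 for Long, null otherwise.
   */
  @SuppressWarnings("unchecked")
  public static <R> R placeholderFor(Class<R> clazz) {
    if (clazz == null) {
      return null;
    }
    if (clazz.equals(Boolean.class)) {
      return (R) Boolean.FALSE;
    }
    if (clazz.equals(Long.class)) {
      return (R) Long.valueOf(-1L);
    }
    return null;
  }

  public static void main(String[] args) {
    // Assigning to primitives works because the placeholders are non-null.
    boolean flag = placeholderFor(Boolean.class);
    long size = placeholderFor(Long.class);
    String text = placeholderFor(String.class);
    System.out.println(flag + " " + size + " " + text);
  }
}
```

If that reading is right, the Javadoc for `syncReturn` could usefully say how the two methods relate, since only one of them is documented in the diff.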
########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncApplyFunction.java: ########## @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +/** + * The AsyncApplyFunction interface represents a function that + * asynchronously accepts a value of type T and produces a result + * of type R. This interface extends {@link ApplyFunction} and is + * designed to be used with asynchronous computation frameworks, + * such as Java's {@link java.util.concurrent.CompletableFuture}. + * + * <p>An implementation of this interface is expected to perform an + * asynchronous operation and return a result, which is typically + * represented as a {@code CompletableFuture<R>}. 
This allows for + * non-blocking execution of tasks and is particularly useful for + * I/O operations or any operation that may take a significant amount + * of time to complete.</p> + * + * <p>AsyncApplyFunction is used to implement the following semantics:</p> + * <pre> + * {@code + * T res = doAsync1(input); + * // Can use AsyncApplyFunction + * R result = doAsync2(res); + * } + * </pre> + * + * @param <T> the type of the input to the function + * @param <R> the type of the result of the function + * @see ApplyFunction + * @see java.util.concurrent.CompletableFuture + */ +@FunctionalInterface +public interface AsyncApplyFunction<T, R> extends ApplyFunction<T, R> { + + /** + * Asynchronously applies this function to the given argument. + * + * <p>This method is intended to initiate the function application + * without waiting for the result. It is typically used when the + * result of the operation is not required immediately or when the + * operation is part of a larger asynchronous workflow.</p> + * + * @param t the function argument + * @throws IOException if an I/O error occurs during the application + * of the function + */ + void applyAsync(T t) throws IOException; + + /** + * Synchronously applies this function to the given argument and + * returns the result. + * + * <p>This method waits for the asynchronous operation to complete + * and returns its result. It is useful when the result is needed + * immediately and the calling code cannot proceed without it.</p> + * + * @param t the function argument + * @return the result of applying the function to the argument + * @throws IOException if an I/O error occurs during the application + * of the function + */ + @Override + default R apply(T t) throws IOException { + applyAsync(t); + return result(); + } + + + default CompletableFuture<R> async(T t) throws IOException { Review Comment: Suggest to add some javadoc here. > [ARR] AsyncUtil makes asynchronous code more concise and easier. 
> ----------------------------------------------------------------
>
> Key: HDFS-17543
> URL: https://issues.apache.org/jira/browse/HDFS-17543
> Project: Hadoop HDFS
> Issue Type: Sub-task
> Reporter: Jian Zhang
> Assignee: Jian Zhang
> Priority: Major
> Labels: pull-request-available
> Attachments: HDFS-17543.001.patch
>
>
> *Describe*
> Implementing the asynchronous router directly on Java's CompletableFuture
> produces code that is hard to read and maintain, which is not conducive to
> the development of the open source community. Therefore, I have implemented
> a lightweight tool that encapsulates CompletableFuture.
> With AsyncUtil it is easy to write readable asynchronous code that everyone
> can understand. I have provided an example of this tool class in the unit
> tests, demonstrating how to change synchronous methods into asynchronous code.
>
> *Examples and tests*
> SyncClass provides some common synchronous methods.
> AsyncClass is the corresponding asynchronous implementation.
> TestAsyncUtil tests the corresponding methods of SyncClass and AsyncClass
> together, ensuring that the asynchronous and synchronous methods return the
> same results.
>
> *NOTE*: In HDFS-17545, I used this tool to implement RouterAsyncRpcClient,
> which extends RouterRpcClient. You can also view RouterAsyncRpcClient to
> understand the use of this tool.

--
This message was sent by Atlassian Jira (v8.20.10#820010)