This is an automated email from the ASF dual-hosted git repository. hexiaoqiao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit d2442eeaddc78e870f21ce65e43cb35ba6bb559e Author: Jian Zhang <1361320...@qq.com> AuthorDate: Mon Jul 1 12:56:05 2024 +0800 HDFS-17543. [ARR] AsyncUtil makes asynchronous code more concise and easier. (#6868). Contributed by Jian Zhang. Reviewed-by: hfutatzhanghb <hfutzhan...@163.com> Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org> --- .../federation/router/async/ApplyFunction.java | 89 +++++ .../hdfs/server/federation/router/async/Async.java | 115 +++++++ .../router/async/AsyncApplyFunction.java | 162 +++++++++ .../federation/router/async/AsyncBiFunction.java | 83 +++++ .../router/async/AsyncCatchFunction.java | 174 ++++++++++ .../federation/router/async/AsyncForEachRun.java | 185 ++++++++++ .../server/federation/router/async/AsyncRun.java | 74 ++++ .../server/federation/router/async/AsyncUtil.java | 380 +++++++++++++++++++++ .../federation/router/async/CatchFunction.java | 120 +++++++ .../federation/router/async/FinallyFunction.java | 96 ++++++ .../federation/router/async/package-info.java | 35 ++ .../server/federation/router/async/AsyncClass.java | 249 ++++++++++++++ .../server/federation/router/async/BaseClass.java | 66 ++++ .../server/federation/router/async/SyncClass.java | 194 +++++++++++ .../federation/router/async/TestAsyncUtil.java | 277 +++++++++++++++ 15 files changed, 2299 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java new file mode 100644 index 00000000000..ac3d5edb635 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; + +/** + * Represents a function that accepts a value of type T and produces a result of type R. + * This interface extends {@link Async} and provides methods to apply the function + * asynchronously using {@link CompletableFuture}. + * + * <p>ApplyFunction is used to implement the following semantics:</p> + * <pre> + * {@code + * T res = doAsync(input); + * // Can use ApplyFunction + * R result = thenApply(res); + * } + * </pre> + * + * @param <T> the type of the input to the function + * @param <R> the type of the result of the function + */ +@FunctionalInterface +public interface ApplyFunction<T, R> extends Async<R>{ + + /** + * Applies this function to the given argument. + * + * @param t the function argument + * @return the function result + * @throws IOException if an I/O error occurs + */ + R apply(T t) throws IOException; + + /** + * Applies this function asynchronously to the result of the given {@link CompletableFuture}. 
+ * The function is executed on the same thread as the completion of the given future. + * + * @param in the input future + * @return a new future that holds the result of the function application + */ + default CompletableFuture<R> apply(CompletableFuture<T> in) { + return in.thenApply(t -> { + try { + return ApplyFunction.this.apply(t); + } catch (IOException e) { + throw warpCompletionException(e); + } + }); + } + + /** + * Applies this function asynchronously to the result of the given {@link CompletableFuture}, + * using the specified executor for the asynchronous computation. + * + * @param in the input future + * @param executor the executor to use for the asynchronous computation + * @return a new future that holds the result of the function application + */ + default CompletableFuture<R> apply(CompletableFuture<T> in, Executor executor) { + return in.thenApplyAsync(t -> { + try { + return ApplyFunction.this.apply(t); + } catch (IOException e) { + throw warpCompletionException(e); + } + }, executor); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/Async.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/Async.java new file mode 100644 index 00000000000..e184bffaeef --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/Async.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Base interface for the asynchronous helpers in this package. It owns the
 * thread-local slot through which an asynchronous method hands its
 * {@link CompletableFuture} back to the caller, and provides helpers for
 * wrapping and unwrapping {@link CompletionException}.
 *
 * @param <R> The type of the result of the asynchronous operation
 */
public interface Async<R> {

  /**
   * A thread-local variable to store the {@link CompletableFuture} instance for the
   * current thread.
   * <p>
   * <b>Note:</b> After executing an asynchronous method, the thread stores the
   * CompletableFuture of the asynchronous method in the thread's local variable.
   */
  ThreadLocal<CompletableFuture<Object>> CUR_COMPLETABLE_FUTURE
      = new ThreadLocal<>();

  /**
   * Sets the {@link CompletableFuture} instance for the current thread.
   *
   * @param completableFuture The {@link CompletableFuture} instance to be set
   * @param <T> The type of the result in the CompletableFuture
   */
  @SuppressWarnings("unchecked")
  default <T> void setCurCompletableFuture(CompletableFuture<T> completableFuture) {
    // Unchecked: the slot is shared by every result type, so the element type
    // is erased to Object here and restored by the reader's cast.
    CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) completableFuture);
  }

  /**
   * Gets the {@link CompletableFuture} instance for the current thread.
   *
   * @return The {@link CompletableFuture} instance for the current thread,
   *         or {@code null} if not set
   */
  @SuppressWarnings("unchecked")
  default CompletableFuture<R> getCurCompletableFuture() {
    // Unchecked: the caller is trusted to read back the type it (or the
    // asynchronous method it just invoked) stored.
    return (CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get();
  }

  /**
   * Blocks and retrieves the result of the {@link CompletableFuture} instance
   * for the current thread.
   *
   * <p>If the thread is interrupted while waiting, {@code null} is returned and
   * the thread's interrupt status is set again, so that callers further up the
   * stack can still detect the interruption.</p>
   *
   * @return The result of the CompletableFuture, or {@code null} if the thread
   *         was interrupted
   * @throws IOException If the future completed exceptionally. An
   *         {@code IOException} cause is rethrown as is; any other failure is
   *         reported as an {@code IOException} wrapping the
   *         {@link ExecutionException}
   */
  default R result() throws IOException {
    try {
      @SuppressWarnings("unchecked")
      CompletableFuture<R> completableFuture =
          (CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get();
      assert completableFuture != null;
      return completableFuture.get();
    } catch (InterruptedException e) {
      // Catching InterruptedException clears the interrupt flag; restore it
      // rather than silently swallowing the interruption.
      Thread.currentThread().interrupt();
      return null;
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause;
      }
      throw new IOException(e);
    }
  }

  /**
   * Extracts the real cause of an exception wrapped by CompletionException.
   *
   * @param e The incoming exception, which may be a CompletionException.
   * @return The cause of {@code e} when {@code e} is a CompletionException with
   *         a non-null cause; otherwise {@code e} itself.
   */
  static Throwable unWarpCompletionException(Throwable e) {
    if (e instanceof CompletionException && e.getCause() != null) {
      return e.getCause();
    }
    return e;
  }

  /**
   * Ensures the incoming exception is a CompletionException.
   *
   * @param e The incoming exception, which may be any type of Throwable.
   * @return {@code e} itself when it already is a CompletionException (it is
   *         not wrapped a second time); otherwise a new CompletionException
   *         with {@code e} as its cause.
   */
  static CompletionException warpCompletionException(Throwable e) {
    if (e instanceof CompletionException) {
      return (CompletionException) e;
    }
    return new CompletionException(e);
  }
}
This interface extends {@link ApplyFunction} and is + * designed to be used with asynchronous computation frameworks, + * such as Java's {@link java.util.concurrent.CompletableFuture}. + * + * <p>An implementation of this interface is expected to perform an + * asynchronous operation and return a result, which is typically + * represented as a {@code CompletableFuture<R>}. This allows for + * non-blocking execution of tasks and is particularly useful for + * I/O operations or any operation that may take a significant amount + * of time to complete.</p> + * + * <p>AsyncApplyFunction is used to implement the following semantics:</p> + * <pre> + * {@code + * T res = doAsync1(input); + * // Can use AsyncApplyFunction + * R result = doAsync2(res); + * } + * </pre> + * + * @param <T> the type of the input to the function + * @param <R> the type of the result of the function + * @see ApplyFunction + * @see java.util.concurrent.CompletableFuture + */ +@FunctionalInterface +public interface AsyncApplyFunction<T, R> extends ApplyFunction<T, R> { + + /** + * Asynchronously applies this function to the given argument. + * + * <p>This method is intended to initiate the function application + * without waiting for the result. It is typically used when the + * result of the operation is not required immediately or when the + * operation is part of a larger asynchronous workflow.</p> + * + * @param t the function argument + * @throws IOException if an I/O error occurs during the application + * of the function + */ + void applyAsync(T t) throws IOException; + + /** + * Synchronously applies this function to the given argument and + * returns the result. + * + * <p>This method waits for the asynchronous operation to complete + * and returns its result. 
It is useful when the result is needed + * immediately and the calling code cannot proceed without it.</p> + * + * @param t the function argument + * @return the result of applying the function to the argument + * @throws IOException if an I/O error occurs during the application + * of the function + */ + @Override + default R apply(T t) throws IOException { + applyAsync(t); + return result(); + } + + /** + * Initiates the asynchronous application of this function to the given result. + * <p> + * This method calls applyAsync to start the asynchronous operation and then retrieves + * the current thread's CompletableFuture using getCurCompletableFuture. + * It returns this CompletableFuture, which will be completed with the result of the + * asynchronous operation once it is finished. + * <p> + * This method is useful for chaining with other asynchronous operations, as it allows the + * current operation to be part of a larger asynchronous workflow. + * + * @param t the function argument + * @return a CompletableFuture that will be completed with the result of the + * asynchronous operation + * @throws IOException if an I/O error occurs during the initiation of the asynchronous operation + */ + default CompletableFuture<R> async(T t) throws IOException { + applyAsync(t); + CompletableFuture<R> completableFuture = getCurCompletableFuture(); + assert completableFuture != null; + return completableFuture; + } + + /** + * Asynchronously applies this function to the result of the given + * CompletableFuture. + * + * <p>This method chains the function application to the completion + * of the input future. 
It returns a new CompletableFuture that + * completes with the function's result when the input future + * completes.</p> + * + * @param in the input future + * @return a new CompletableFuture that holds the result of the + * function application + */ + @Override + default CompletableFuture<R> apply(CompletableFuture<T> in) { + return in.thenCompose(t -> { + try { + return async(t); + } catch (IOException e) { + throw warpCompletionException(e); + } + }); + } + + /** + * Asynchronously applies this function to the result of the given + * CompletableFuture, using the specified executor for the + * asynchronous computation. + * + * <p>This method allows for more control over the execution + * context of the asynchronous operation, such as running the + * operation in a separate thread or thread pool.</p> + * + * @param in the input future + * @param executor the executor to use for the asynchronous + * computation + * @return a new CompletableFuture that holds the result of the + * function application + */ + @Override + default CompletableFuture<R> apply(CompletableFuture<T> in, Executor executor) { + return in.thenComposeAsync(t -> { + try { + return async(t); + } catch (IOException e) { + throw warpCompletionException(e); + } + }, executor); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncBiFunction.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncBiFunction.java new file mode 100644 index 00000000000..3e94736e7f1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncBiFunction.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +/** + * The {@code AsyncBiFunction} interface represents a bi-function that + * asynchronously accepts two arguments and produces a result. This interface + * extends the {@link Async} interface and provides a method to apply the + * function asynchronously. + * + * <p>Implementations of this interface are expected to perform an + * asynchronous operation that takes two parameters of types T and P, and + * returns a result of type R. The asynchronous operation is typically + * represented as a {@link CompletableFuture} of the result type R.</p> + * + * <p>For example, an implementation of this interface might perform an + * asynchronous computation using the two input parameters and return a + * future representing the result of that computation.</p> + * + * @param <T> the type of the first argument to the function + * @param <P> the type of the second argument to the function + * @param <R> the type of the result of the function + * @see Async + */ +@FunctionalInterface +public interface AsyncBiFunction<T, P, R> extends Async<R>{ + + /** + * Asynchronously applies this function to the given arguments. + * + * <p>This method is intended to initiate the function application + * without waiting for the result. 
It should be used when the + * operation can be performed in the background, and the result + * is not required immediately.</p> + * + * @param t the first argument to the function + * @param p the second argument to the function + * @throws IOException if an I/O error occurs during the application of the function + */ + void applyAsync(T t, P p) throws IOException; + + /** + * Initiates the asynchronous application of this function to the given result. + * <p> + * This method calls applyAsync to start the asynchronous operation and then retrieves + * the current thread's CompletableFuture using getCurCompletableFuture. + * It returns this CompletableFuture, which will be completed with the result of the + * asynchronous operation once it is finished. + * <p> + * This method is useful for chaining with other asynchronous operations, as it allows the + * current operation to be part of a larger asynchronous workflow. + * + * @param t the first argument to the function + * @param p the second argument to the function + * @return a CompletableFuture that will be completed with the result of the + * asynchronous operation + * @throws IOException if an I/O error occurs during the initiation of the asynchronous operation + */ + default CompletableFuture<R> async(T t, P p) throws IOException { + applyAsync(t, p); + CompletableFuture<R> completableFuture = getCurCompletableFuture(); + assert completableFuture != null; + return completableFuture; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncCatchFunction.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncCatchFunction.java new file mode 100644 index 00000000000..01fcc44a19f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncCatchFunction.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import static org.apache.hadoop.hdfs.server.federation.router.async.Async.unWarpCompletionException; +import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; + +/** + * The AsyncCatchFunction interface represents a function that handles exceptions + * occurring within an asynchronous operation. It extends the CatchFunction + * interface and adds the capability to perform asynchronous exception handling. + * + * <p>This interface is part of the asynchronous utilities provided by the Hadoop + * Distributed File System (HDFS) Federation router. It is used in conjunction + * with other asynchronous interfaces such as AsyncRun to build complex, + * non-blocking operations.</p> + * + * <p>An implementation of this interface should define how to handle a caught + * exception asynchronously. It takes two parameters: the result of the + * asynchronous operation (if any) and the caught exception. 
The function can then + * initiate an asynchronous process to handle the exception, which may involve + * logging the error, performing a recovery operation, or any other custom logic.</p> + * + * <p>For example, the applyAsync method is intended to be called when an exception + * is caught during an asynchronous operation. It should initiate the asynchronous + * process to handle the exception without blocking the main execution thread.</p> + * + * <p>The default method apply is provided to allow synchronous operation in case + * the asynchronous handling is not required. It simply calls applyAsync and waits + * for the result.</p> + * + * <p>The default method async is provided to allow chaining with other asynchronous + * operations. It calls applyAsync and returns a CompletableFuture that completes + * with the result of the operation.</p> + * + * <p>AsyncCatchFunction is used to implement the following semantics:</p> + * <pre> + * {@code + * try{ + * R res = doAsync1(input); + * } catch(E e) { + * // Can use AsyncCatchFunction + * R result = doAsync2(res, e); + * } + * } + * </pre> + * + * @param <R> the type of the result of the asynchronous operation + * @param <E> the type of the exception to catch, extending Throwable + * @see CatchFunction + * @see AsyncRun + */ +@FunctionalInterface +public interface AsyncCatchFunction<R, E extends Throwable> + extends CatchFunction<R, E> { + + /** + * Asynchronously applies this function to the given exception. + * + * <p>This method is intended to be called when an exception is caught + * during an asynchronous operation. It should initiate the asynchronous process + * to handle the exception without blocking the main execution thread. 
+ * The actual handling of the exception, such as logging the error, performing + * a recovery operation, or any other custom logic, should be implemented in this + * method.</p> + * + * @param r the result of the asynchronous operation, if any; may be null + * @param e the caught exception + * @throws IOException if an I/O error occurs during the application of the function + */ + void applyAsync(R r, E e) throws IOException; + + /** + * Synchronously applies this function to the given result and exception. + * <p> + * This method first calls {@code applyAsync} to initiate the asynchronous handling + * of the exception. Then, it waits for the asynchronous operation to complete + * by calling {@code result}, which retrieves the result of the current + * thread's {@link CompletableFuture}. + * <p> + * + * @param r the result of the asynchronous operation, if any; may be null + * @param e the caught exception + * @return the result after applying the function + * @throws IOException if an I/O error occurs during the application of the function + */ + @Override + default R apply(R r, E e) throws IOException { + applyAsync(r, e); + return result(); + } + + /** + * Initiates the asynchronous application of this function to the given result and exception. + * <p> + * This method calls applyAsync to start the asynchronous operation and then retrieves + * the current thread's CompletableFuture using getCurCompletableFuture. + * It returns this CompletableFuture, which will be completed with the result of the + * asynchronous operation once it is finished. + * <p> + * This method is useful for chaining with other asynchronous operations, as it allows the + * current operation to be part of a larger asynchronous workflow. 
+ * + * @param r the result of the asynchronous operation, if any; may be null + * @param e the caught exception + * @return a CompletableFuture that will be completed with the result of the + * asynchronous operation + * @throws IOException if an I/O error occurs during the initiation of the asynchronous operation + */ + default CompletableFuture<R> async(R r, E e) throws IOException { + applyAsync(r, e); + CompletableFuture<R> completableFuture = getCurCompletableFuture(); + assert completableFuture != null; + return completableFuture; + } + + /** + * Applies the catch function to a {@link CompletableFuture}, handling exceptions of a + * specified type. + * <p> + * This method is a default implementation that provides a way to integrate exception + * handling into a chain of asynchronous operations. It takes a {@code CompletableFuture} + * and a class object representing the exception type to catch. The method then completes + * the future with the result of applying this catch function to the input future and the + * specified exception type. + * <p> + * If the input future completes exceptionally with an instance of the specified exception + * type, the catch function is applied to the exception. Otherwise, if the future + * completes with a different type of exception or normally, the original result or + * exception is propagated. 
+ * + * @param in the input {@code CompletableFuture} to which the catch function is applied + * @param eClazz the class object representing the exception type to catch + * @return a new {@code CompletableFuture} that completes with the result of applying + * the catch function, or propagates the original exception if it does not match + * the specified type + */ + @Override + default CompletableFuture<R> apply( + CompletableFuture<R> in, Class<E> eClazz) { + return in.handle((r, e) -> { + if (e == null) { + return in; + } + Throwable readException = unWarpCompletionException(e); + if (eClazz.isInstance(readException)) { + try { + return async(r, (E) readException); + } catch (IOException ex) { + throw warpCompletionException(ex); + } + } + throw warpCompletionException(e); + }).thenCompose(result -> result); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncForEachRun.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncForEachRun.java new file mode 100644 index 00000000000..322242d1c49 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncForEachRun.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; + +/** + * The AsyncForEachRun class is part of the asynchronous operation utilities + * within the Hadoop Distributed File System (HDFS) Federation router. + * It provides the functionality to perform asynchronous operations on each + * element of an Iterator, applying a given async function. + * + * <p>This class is designed to work with other asynchronous interfaces and + * utility classes to enable complex asynchronous workflows. It allows for + * non-blocking execution of tasks, which can improve the performance and + * responsiveness of HDFS operations.</p> + * + * <p>The class implements the AsyncRun interface, which means it can be used + * in asynchronous task chains. It maintains an Iterator of elements to + * process, an asyncDoOnce to apply to each element.</p> + * + * <p>The run method initiates the asynchronous operation, and the doOnce + * method recursively applies the asyncDoOnce to each element and handles + * the results. 
If the shouldBreak flag is set, the operation is completed + * with the current result.</p> + * + * <p>AsyncForEachRun is used to implement the following semantics:</p> + * <pre> + * {@code + * for (I element : elements) { + * R result = asyncDoOnce(element); + * } + * return result; + * } + * </pre> + * + * @param <I> the type of the elements being iterated over + * @param <R> the type of the final result after applying the thenApply function + * @see AsyncRun + * @see AsyncBiFunction + */ +public class AsyncForEachRun<I, R> implements AsyncRun<R> { + + // Indicates whether the iteration should be broken immediately + // after the next asynchronous operation is completed. + private boolean shouldBreak = false; + // The Iterator over the elements to process asynchronously. + private Iterator<I> iterator; + // The async function to apply to each element from the iterator. + private AsyncBiFunction<AsyncForEachRun<I, R>, I, R> asyncDoOnce; + + /** + * Initiates the asynchronous foreach operation by starting the iteration process + * over the elements provided by the iterator. This method sets up the initial + * call to doOnce(R) with a null result, which begins the recursive + * application of the async function to each element of the iterator. + * + * <p>This method is an implementation of the {@link AsyncRun} interface's + * {@code run} method, allowing it to be used in a chain of asynchronous + * operations. 
It is responsible for starting the asynchronous processing and + * handling the completion of the operation through the internal + * {@link CompletableFuture}.</p> + * + * <p>If an exception occurs during the first call to {@code doOnce}, the + * exception is caught and the internal CompletableFuture is completed + * exceptionally with a {@link CompletionException} wrapping the original + * IOException.</p> + * + * <p>After initiating the operation, the method sets the current thread's + * {@link Async} {@link CompletableFuture} by calling + * {@link #setCurCompletableFuture(CompletableFuture)} with the internal result + * CompletableFuture. This allows other parts of the asynchronous workflow to + * chain further operations or handle the final result once the foreach loop + * completes.</p> + * + * @see AsyncRun + * @see Async#setCurCompletableFuture(CompletableFuture) + */ + @Override + public void run() { + if (iterator == null || !iterator.hasNext()) { + setCurCompletableFuture(CompletableFuture.completedFuture(null)); + return; + } + CompletableFuture<R> result; + try { + result = doOnce(iterator.next()); + } catch (IOException ioe) { + result = new CompletableFuture<>(); + result.completeExceptionally(warpCompletionException(ioe)); + } + setCurCompletableFuture(result); + } + + /** + * Recursively applies the async function to the next element of the iterator + * and handles the result. This method is called for each iteration of the + * asynchronous foreach loop, applying the async function to each element + * and chaining the results. + * + * <p>If the iterator has no more elements, the CompletableFuture held by this + * class is completed with the last result. 
If an exception occurs during + * the application of the async function, it is propagated to the + * CompletableFuture, which completes exceptionally.</p> + * + * <p>This method is designed to be called by the {@link #run()} method and + * handles the iteration logic, including breaking the loop if the + * {@link #shouldBreak} flag is set to true.</p> + * + * @param element The element to which the async function is applied. + * @return a CompletableFuture that completes with the result of the last element processed. + * @throws IOException if an I/O error occurs during the application of the async function. + */ + private CompletableFuture<R> doOnce(I element) throws IOException { + CompletableFuture<R> completableFuture = asyncDoOnce.async(AsyncForEachRun.this, element); + return completableFuture.thenCompose(res -> { + if (shouldBreak || !iterator.hasNext()) { + return completableFuture; + } + try { + return doOnce(iterator.next()); + } catch (IOException e) { + throw warpCompletionException(e); + } + }); + } + + /** + * Triggers the termination of the current asynchronous iteration. + * + * <p>This method is used to break out of the asynchronous for-each loop + * prematurely. It sets a flag that indicates the iteration should be + * terminated at the earliest opportunity. This is particularly useful when + * the processing logic determines that further iteration is unnecessary + * or when a specific condition has been met.</p> + * + * <p>Once this method is called, the next time the loop is about to process + * a new element, it will check the flag and cease operation, allowing the + * application to move on to the next step or complete the task.</p> + */ + public void breakNow() { + shouldBreak = true; + } + + /** + * Sets the Iterator for the elements to be processed in the asynchronous operation. + * + * @param forEach The Iterator over the elements. + * @return The current AsyncForEachRun instance for chaining. 
+ */ + public AsyncForEachRun<I, R> forEach(Iterator<I> forEach) { + this.iterator = forEach; + return this; + } + + /** + * Sets the async function to apply to each element from the iterator. + * + * @param asyncDo The async function. + * @return The current AsyncForEachRun instance for chaining. + */ + public AsyncForEachRun<I, R> asyncDo(AsyncBiFunction<AsyncForEachRun<I, R>, I, R> asyncDo) { + this.asyncDoOnce = asyncDo; + return this; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncRun.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncRun.java new file mode 100644 index 00000000000..03d39f36d7d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncRun.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +/** + * The AsyncRun interface represents an asynchronous operation that can be + * executed in the context of the Hadoop Distributed File System (HDFS) + * Federation router. Implementations of this interface are responsible for + * performing a task and completing a {@link CompletableFuture} with the + * result of the operation. + * + * <p> + * The {@code run} method of this interface is intended to be used in an + * asynchronous workflow, where the operation may involve I/O tasks or + * other long-running processes. By implementing this interface, classes + * can define custom asynchronous behavior that can be chained with other + * asynchronous operations using utility methods provided by the + * {@link AsyncUtil} class.</p> + * + * <p> + * For example, an implementation of AsyncRun could perform a non-blocking + * read or write operation to HDFS, and upon completion, it could use + * AsyncUtil methods to handle the result or propagate any exceptions that + * occurred during the operation.</p> + * + * @param <R> the type of the result produced by the asynchronous operation + * @see AsyncUtil + */ +@FunctionalInterface +public interface AsyncRun<R> extends Async<R> { + + /** + * Executes the asynchronous operation represented by this AsyncRun instance. + * This method is expected to perform the operation and, upon completion, + * complete the current thread's {@link CompletableFuture} with the result. + * + * @throws IOException if an I/O error occurs during the execution of the operation + */ + void run() throws IOException; + + /** + * Provides an asynchronous version of the {@code run} method, which returns a + * {@link CompletableFuture} representing the result of the operation. + * This method is typically used in an asynchronous workflow to initiate the + * operation without waiting for its completion. 
+ * + * @return a CompletableFuture that completes with the result of the operation + * @throws IOException if an I/O error occurs during the initiation of the operation + */ + default CompletableFuture<R> async() throws IOException { + run(); + CompletableFuture<R> completableFuture = getCurCompletableFuture(); + assert completableFuture != null; + return completableFuture; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java new file mode 100644 index 00000000000..ec2f7360b30 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java @@ -0,0 +1,380 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.function.Function; + +import static org.apache.hadoop.hdfs.server.federation.router.async.Async.CUR_COMPLETABLE_FUTURE; +import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; + +/** + * The AsyncUtil class provides a collection of utility methods to simplify + * the implementation of asynchronous operations using Java's CompletableFuture. + * It encapsulates common patterns such as applying functions, handling exceptions, + * and executing tasks in a non-blocking manner. This class is designed to work + * with Hadoop's asynchronous router operations in HDFS Federation. + * + * <p>The utility methods support a fluent-style API, allowing for the chaining of + * asynchronous operations. For example, after an asynchronous operation completes, + * a function can be applied to its result, and the process can continue with + * the new result. This is particularly useful for complex workflows that require + * multiple steps, where each step depends on the completion of the previous one.</p> + * + * <p>The class also provides methods to handle exceptions that may occur during an + * asynchronous operation. 
This ensures that error handling is integrated smoothly + * into the workflow, allowing for robust and fault-tolerant applications.</p> + * + * @see CompletableFuture + * @see ApplyFunction + * @see AsyncApplyFunction + * @see AsyncRun + * @see AsyncForEachRun + * @see CatchFunction + * @see AsyncCatchFunction + * @see FinallyFunction + * @see AsyncBiFunction + */ +public final class AsyncUtil { + private static final Boolean BOOLEAN_RESULT = false; + private static final Long LONG_RESULT = -1L; + private static final Object NULL_RESULT = null; + + private AsyncUtil(){} + + /** + * Provides a default value based on the type specified. + * + * @param clazz The {@link Class} object representing the type of the value + * to be returned. + * @param <R> The type of the value to be returned. + * @return An object with a value determined by the type: + * <ul> + * <li>{@code false} if {@code clazz} is {@link Boolean}, + * <li>-1 if {@code clazz} is {@link Long}, + * <li>{@code null} for any other type. + * </ul> + */ + public static <R> R asyncReturn(Class<R> clazz) { + if (clazz == null) { + return null; + } + if (clazz.equals(Boolean.class)) { + return (R) BOOLEAN_RESULT; + } else if (clazz.equals(Long.class)) { + return (R) LONG_RESULT; + } + return (R) NULL_RESULT; + } + + /** + * Synchronously returns the result of the current asynchronous operation. + * This method is designed to be used in scenarios where the result of an + * asynchronous operation is needed synchronously, and it is known that + * the operation has completed. + * + * <p>The method retrieves the current thread's {@link CompletableFuture} and + * attempts to get the result. If the future is not yet complete, this + * method will block until the result is available. 
If the future completed + * exceptionally, the {@link ExecutionException} is unwrapped and its cause is + * rethrown, cast to {@link Exception}.</p> + * + * <p>This method is typically used after an asynchronous operation has been + * initiated and the caller needs to obtain the result in a synchronous + * manner, for example, when bridging between asynchronous and synchronous + * code paths.</p> + * + * @param <R> the type of the result to be returned + * @param clazz the {@link Class} object representing the type of the value + * to be returned; it only determines the type parameter {@code R} + * @return the result of the asynchronous operation as an object of the + * specified class + * @throws Exception if an error occurs during the synchronous retrieval of + * the result, including the original exception if the + * future completed exceptionally + */ + public static <R> R syncReturn(Class<R> clazz) + throws Exception { + CompletableFuture<Object> completableFuture = CUR_COMPLETABLE_FUTURE.get(); + assert completableFuture != null; + try { + return (R) completableFuture.get(); + } catch (ExecutionException e) { + throw (Exception)e.getCause(); + } + } + + /** + * Completes the current asynchronous operation with the specified value. + * This method sets the result of the current thread's {@link CompletableFuture} + * to the provided value, effectively completing the asynchronous operation. + * + * @param value The value to complete the future with. + * @param <R> The type of the value to be completed. + */ + public static <R> void asyncComplete(R value) { + CUR_COMPLETABLE_FUTURE.set( + CompletableFuture.completedFuture(value)); + } + + /** + * Completes the current asynchronous operation with an exception. + * This method sets the result of the current thread's {@link CompletableFuture} + * to an exceptional completion, using the provided {@link Throwable} as the cause. + * This is typically used to handle errors in asynchronous operations. 
+ * + * @param e The exception to complete the future exceptionally with. + */ + public static void asyncThrowException(Throwable e) { + CompletableFuture<Object> result = new CompletableFuture<>(); + result.completeExceptionally(warpCompletionException(e)); + CUR_COMPLETABLE_FUTURE.set(result); + } + + /** + * Applies an asynchronous function to the current {@link CompletableFuture}. + * This method retrieves the current thread's {@link CompletableFuture} and applies + * the provided {@link ApplyFunction} to it. It is used to chain asynchronous + * operations, where the result of one operation is used as the input for the next. + * + * @param function The asynchronous function to apply, which takes a type T and + * produces a type R. + * @param <T> The type of the input to the function. + * @param <R> The type of the result of the function. + * @see CompletableFuture + * @see ApplyFunction + */ + public static <T, R> void asyncApply(ApplyFunction<T, R> function) { + CompletableFuture<T> completableFuture = + (CompletableFuture<T>) CUR_COMPLETABLE_FUTURE.get(); + assert completableFuture != null; + CompletableFuture<R> result = function.apply(completableFuture); + CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result); + } + + /** + * Applies an asynchronous function to the current {@link CompletableFuture} + * using the specified executor. This method retrieves the current thread's + * {@link CompletableFuture} and applies the provided {@link ApplyFunction} to + * it with the given executor service. It allows for more control over the + * execution context, such as running the operation in a separate thread or + * thread pool. + * + * <p>This is particularly useful when you need to perform blocking I/O operations + * or other long-running tasks without blocking the main thread or + * when you want to manage the thread resources more efficiently.</p> + * + * @param function The asynchronous function to apply, which takes a type T and + * produces a type R. 
+ * @param executor The executor service used to run the asynchronous function. + * @param <T> The type of the input to the function. + * @param <R> The type of the result of the function. + * @see CompletableFuture + * @see ApplyFunction + */ + public static <T, R> void asyncApplyUseExecutor( + ApplyFunction<T, R> function, Executor executor) { + CompletableFuture<T> completableFuture = + (CompletableFuture<T>) CUR_COMPLETABLE_FUTURE.get(); + assert completableFuture != null; + CompletableFuture<R> result = function.apply(completableFuture, executor); + CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result); + } + + /** + * Attempts to execute an asynchronous task defined by the provided + * {@link AsyncRun} and associates it with the current thread's + * {@link CompletableFuture}. This method is useful for trying operations + * that may throw exceptions and handling them asynchronously. + * + * <p>The provided {@code asyncRun} is a functional interface that + * encapsulates the logic to be executed asynchronously. It is executed in + * the context of the current CompletableFuture, allowing for chaining further + * asynchronous operations based on the result or exception of this try.</p> + * + * <p>If the operation completes successfully, the result is propagated to the + * next operation in the chain. If an exception occurs, it can be caught and + * handled using the {@link #asyncCatch(CatchFunction, Class)} method, + * allowing for error recovery or alternative processing.</p> + * + * @param asyncRun The asynchronous task to be executed, defined by + * an {@link AsyncRun} instance. + * @param <R> The type of the result produced by the asynchronous task. 
+ * @see AsyncRun + * @see #asyncCatch(CatchFunction, Class) + */ + public static <R> void asyncTry(AsyncRun<R> asyncRun) { + try { + CompletableFuture<R> result = asyncRun.async(); + CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result); + } catch (Throwable e) { + asyncThrowException(e); + } + } + + /** + * Handles exceptions to a specified type that may occur during + * an asynchronous operation. This method is used to catch and deal + * with exceptions in a non-blocking manner, allowing the application + * to continue processing even when errors occur. + * + * <p>The provided {@code function} is a {@link CatchFunction} that + * defines how to handle the caught exception. It takes the result of + * the asynchronous operation (if any) and the caught exception, and + * returns a new result or modified result to continue the asynchronous + * processing.</p> + * + * <p>The {@code eClass} parameter specifies the type of exceptions to + * catch. Only exceptions that are instances of this type (or its + * subclasses) will be caught and handled by the provided function.</p> + * + * @param function The {@link CatchFunction} that defines how to + * handle the caught exception. + * @param eClass The class of the exception type to catch. + * @param <R> The type of the result of the asynchronous operation. + * @param <E> The type of the exception to catch. + * @see CatchFunction + */ + public static <R, E extends Throwable> void asyncCatch( + CatchFunction<R, E> function, Class<E> eClass) { + CompletableFuture<R> completableFuture = + (CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get(); + assert completableFuture != null; + CompletableFuture<R> result = function.apply(completableFuture, eClass); + CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result); + } + + /** + * Executes a final action after an asynchronous operation + * completes, regardless of whether the operation was successful + * or resulted in an exception. 
This method provides a way to + * perform cleanup or finalization tasks in an asynchronous + * workflow. + * + * <p>The provided {@code function} is a {@link FinallyFunction} + * that encapsulates the logic to be executed after the + * asynchronous operation. It takes the result of the operation + * and returns a new result, which can be used to continue the + * asynchronous processing or to handle the final output of + * the workflow.</p> + * + * <p>This method is particularly useful for releasing resources, + * closing connections, or performing other cleanup actions that + * need to occur after all other operations have completed.</p> + * + * @param function The {@link FinallyFunction} that defines + * the final action to be executed. + * @param <R> The type of the result of the asynchronous + * operation. + * @see FinallyFunction + */ + public static <R> void asyncFinally(FinallyFunction<R> function) { + CompletableFuture<R> completableFuture = + (CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get(); + assert completableFuture != null; + CompletableFuture<R> result = function.apply(completableFuture); + CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result); + } + + /** + * Executes an asynchronous operation for each element in an Iterator, applying + * a given async function to each element. This method is part of the asynchronous + * utilities provided to facilitate non-blocking operations on collections of elements. + * + * <p>The provided {@code asyncDo} is an {@link AsyncBiFunction} that encapsulates + * the logic to be executed asynchronously for each element. 
It is executed in + * the context of the current CompletableFuture, allowing for chaining further + * asynchronous operations based on the result or exception of each iteration.</p> + * + * <p>The method is particularly useful for performing asynchronous iterations + * over collections where the processing of each element is independent.</p> + * + * @param forEach the Iterator over which to iterate and apply the async function + * @param asyncDo the asynchronous function to apply to each element of the Iterator, + * implemented as an {@link AsyncBiFunction} + * @param <I> the type of the elements being iterated over + * @param <R> the type of the result produced by the asynchronous task applied to each element + * @see AsyncBiFunction + * @see AsyncForEachRun + */ + public static <I, R> void asyncForEach( + Iterator<I> forEach, AsyncBiFunction<AsyncForEachRun<I, R>, I, R> asyncDo) { + AsyncForEachRun<I, R> asyncForEachRun = new AsyncForEachRun<>(); + asyncForEachRun.forEach(forEach).asyncDo(asyncDo).run(); + } + + /** + * Applies an asynchronous operation to each element of a collection + * and aggregates the results. This method is designed to process a + * collection of elements concurrently using asynchronous tasks, and + * then combine the results into a single aggregated result. + * + * <p>The operation defined by {@code asyncDo} is applied to each + * element of the collection. This operation is expected to return a + * {@link CompletableFuture} representing the asynchronous task. + * Once all tasks have been started, the method (async) waits for all of + * them to complete and then uses the {@code then} function to + * process and aggregate the results.</p> + * + * <p>The {@code then} function takes an array of {@link CompletableFuture} + * instances, each representing the future result of an individual + * asynchronous operation. It should return a new aggregated result + * based on these futures. 
This allows for various forms of result + * aggregation, such as collecting all results into a list, + * reducing them to a single value, or performing any other custom + * aggregation logic.</p> + * + * @param collection the collection of elements to process. + * @param asyncDo the asynchronous operation to apply to each + * element. It must return a {@link CompletableFuture} + * representing the operation. + * @param then a function that takes an array of futures + * representing the results of the asynchronous + * operations and returns an aggregated result. + * @param <I> the type of the elements in the collection. + * @param <R> the type of the intermediate result from the + * asynchronous operations. + * @param <P> the type of the final aggregated result. + * @see CompletableFuture + */ + public static <I, R, P> void asyncCurrent( + Collection<I> collection, AsyncApplyFunction<I, R> asyncDo, + Function<CompletableFuture<R>[], P> then) { + CompletableFuture<R>[] completableFutures = + new CompletableFuture[collection.size()]; + int i = 0; + for(I entry : collection) { + CompletableFuture<R> future = null; + try { + future = asyncDo.async(entry); + } catch (IOException e) { + future = new CompletableFuture<>(); + future.completeExceptionally(warpCompletionException(e)); + } + completableFutures[i++] = future; + } + CompletableFuture<P> result = CompletableFuture.allOf(completableFutures) + .handle((unused, throwable) -> then.apply(completableFutures)); + CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/CatchFunction.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/CatchFunction.java new file mode 100644 index 00000000000..fbb0af56ce0 --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/CatchFunction.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import static org.apache.hadoop.hdfs.server.federation.router.async.Async.unWarpCompletionException; +import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; + +/** + * The {@code CatchFunction} interface represents a function that handles exceptions + * occurring within an asynchronous operation. It provides a mechanism to catch and + * process exceptions to a specific type, allowing for error recovery or alternative + * processing paths within an asynchronous workflow. + * + * <p>This interface is part of the asynchronous utilities provided by the Hadoop + * Distributed File System (HDFS) Federation router. 
It is used in conjunction with + * other asynchronous interfaces such as {@link AsyncRun} and + * {@link FinallyFunction} to build complex, non-blocking operations.</p> + * + * <p>An implementation of this interface should define how to handle a caught + * exception. It takes two parameters: the result of the asynchronous operation (if + * any) and the caught exception. The function can then return a new result or modify + * the existing result to continue the asynchronous processing.</p> + * + * <p>CatchFunction is used to implement the following semantics:</p> + * <pre> + * {@code + * try{ + * R res = doAsync(input); + * } catch(E e) { + * // Can use CatchFunction + * R result = thenApply(res, e); + * } + * } + * </pre> + * + * @param <R> the type of the result of the asynchronous operation + * @param <E> the type of the exception to catch, extending {@link Throwable} + * @see AsyncRun + * @see FinallyFunction + */ +@FunctionalInterface +public interface CatchFunction<R, E extends Throwable> + extends Async<R>{ + + /** + * Applies this catch function to the given result and exception. + * <p> + * This method is called to process an exception that occurred during an asynchronous + * operation. The implementation of this method should define how to handle the + * caught exception. It may involve logging the error, performing a recovery operation, + * or any other custom error handling logic. + * <p> + * The method takes two parameters: the result of the asynchronous operation (if any), + * and the caught exception. Depending on the implementation, the method may return a + * new result, modify the existing result, or throw a new exception. 
+ * + * @param r the result of the asynchronous operation, which may be null if the operation + * did not complete successfully + * @param e the caught exception, which the function should handle + * @return the result after applying the catch function, which may be a new result or a + * modified version of the input result + * @throws IOException if an I/O error occurs during the application of the catch function + */ + R apply(R r, E e) throws IOException; + + /** + * Applies the catch function to a {@code CompletableFuture}, handling exceptions of a + * specified type. + * <p> + * This default method provides a way to integrate exception handling into a chain of + * asynchronous operations. It takes a {@code CompletableFuture} and a class object + * representing the type of exception to catch. The method uses the handle method of the + * {@code CompletableFuture} to apply the catch function. + * <p> + * If the input future completes exceptionally with an instance of the specified exception + * type, the catch function is applied to the exception. If the future completes with a + * different type of exception or normally, the original result or exception is propagated. 
+ * + * @param in the input {@code CompletableFuture} to which the catch function is applied + * @param eClazz the class object representing the exception type to catch + * @return a new {@code CompletableFuture} that completes with the result of applying + * the catch function, or propagates the original exception if it does not match + * the specified type + */ + default CompletableFuture<R> apply( + CompletableFuture<R> in, Class<E> eClazz) { + return in.handle((r, e) -> { + if (e == null) { + return r; + } + Throwable readException = unWarpCompletionException(e); + if (eClazz.isInstance(readException)) { + try { + return CatchFunction.this.apply(r, (E) readException); + } catch (IOException ioe) { + throw warpCompletionException(ioe); + } + } + throw warpCompletionException(e); + }); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/FinallyFunction.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/FinallyFunction.java new file mode 100644 index 00000000000..0243f0a0a1a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/FinallyFunction.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; + +/** + * The {@code FinallyFunction} interface represents a function that is used to perform + * final actions after an asynchronous operation completes, regardless of whether the + * operation was successful or resulted in an exception. This interface is part of + * the asynchronous utilities provided by the Hadoop Distributed File System (HDFS) + * Federation router. + * + * <p>A {@code FinallyFunction} is typically used for cleanup or finalization tasks, + * such as releasing resources, closing connections, or performing other actions that + * need to occur after all other operations have completed.</p> + * + * <p>An implementation of this interface should define what the final action is. It + * takes the result of the asynchronous operation as an argument and returns a new + * result, which can be the same as the input result or a modified version of it.</p> + * + * <p>FinallyFunction is used to implement the following semantics:</p> + * <pre> + * {@code + * try{ + * R res = doAsync(input); + * } catch(...) { + * ... + * } finally { + * // Can use FinallyFunction + * R result = thenApply(res); + * } + * } + * </pre> + * + * @param <R> the type of the result of the asynchronous operation + */ +@FunctionalInterface +public interface FinallyFunction<R> { + + /** + * Applies this final action function to the result of an asynchronous operation. 
+ * + * @param r the result of the asynchronous operation, which may be null if the + * operation did not complete successfully + * @return the result after applying the final action, which may be a new result or a + * modified version of the input result + * @throws IOException if an I/O error occurs during the application of the final action + */ + R apply(R r) throws IOException; + + /** + * Applies this final action function to a {@code CompletableFuture}, which is expected + * to be the result of an asynchronous operation. + * <p> + * This method is a convenience that simplifies the use of {@code FinallyFunction} + * with asynchronous operations. It handles the completion of the future and applies + * the {@code FinallyFunction} to the result. + * <p> + * The final action is invoked first, whether or not {@code in} failed. If {@code in} + * failed, the value returned by the final action is discarded and the failure of + * {@code in} is passed to {@code warpCompletionException} and thrown. An + * {@code IOException} thrown by the final action itself is passed to + * {@code warpCompletionException} and thrown instead. + * + * @param in the {@code CompletableFuture} representing the asynchronous operation + * @return a new {@code CompletableFuture} that completes with the result of applying + * the final action function + */ + default CompletableFuture<R> apply(CompletableFuture<R> in) { + return in.handle((r, e) -> { + try { + /* Run the final action before looking at e, so it also runs when the upstream failed. */ R ret = apply(r); + if (e != null) { + throw warpCompletionException(e); + } else { + return ret; + } + } catch (IOException ioe) { + throw warpCompletionException(ioe); + } + }); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/package-info.java new file mode 100644 index 00000000000..48fd0ad89ab --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/package-info.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes that facilitate asynchronous operations within the Hadoop + * Distributed File System (HDFS) Federation router. These classes are designed to work with + * the Hadoop ecosystem, providing utilities and interfaces to perform non-blocking tasks that + * can improve the performance and responsiveness of HDFS operations. + * + * <p>These classes work together to enable complex asynchronous workflows, making it easier to + * write code that can handle long-running tasks without blocking, thus improving the overall + * efficiency and scalability of HDFS operations.</p> + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving + +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncClass.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncClass.java new file mode 100644 index 00000000000..e5bf7ce08e8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncClass.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.function.Function; + +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCurrent; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncFinally; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncThrowException; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry; + +/** + * AsyncClass demonstrates 
the conversion of synchronous methods + * from SyncClass into asynchronous operations using AsyncUtil. + * This class overrides methods with asynchronous logic, enhancing + * the performance by allowing non-blocking task execution. + * + * <p> + * By utilizing AsyncUtil's utility methods, such as asyncApply, + * asyncForEach, and others, each method in AsyncClass can perform + * time-consuming tasks on a separate thread, thus not blocking + * the main execution thread. + * </p> + * + * <p> + * For example, the applyMethod in AsyncClass is an async version of + * the same method in SyncClass. It uses asyncApply to schedule + * the timeConsumingMethod to run asynchronously and returns a + * CompletableFuture that will be completed with the result of + * the operation. + * </p> + * + * <p> + * This class serves as an example of how to transform synchronous + * operations into asynchronous ones using the AsyncUtil tools, + * which can be applied to other parts of the HDFS Federation + * router or similar systems to improve concurrency and + * performance. 
+ * </p> + * + * @see SyncClass + * @see AsyncUtil + * @see CompletableFuture + */ +public class AsyncClass extends SyncClass{ + private static final Logger LOG = + LoggerFactory.getLogger(AsyncClass.class); + /* NOTE(review): assigned only in the constructor; no shutdown call appears in this class. */ private ExecutorService executorService; + private final static String ASYNC_WORKER = "Async Worker"; + + /** + * Creates the single-thread pool that {@code timeConsumingMethod} submits to; its + * thread is a daemon named {@code ASYNC_WORKER}. + */ public AsyncClass(long timeConsuming) { + super(timeConsuming); + executorService = Executors.newFixedThreadPool(1, r -> { + Thread asyncWork = new Thread(r); + asyncWork.setDaemon(true); + asyncWork.setName(ASYNC_WORKER); + return asyncWork; + }); + } + + @Override + public String applyMethod(int input) { + timeConsumingMethod(input); + asyncApply(res -> { + return "applyMethod" + res; + }); + return asyncReturn(String.class); + } + + @Override + public String applyMethod(int input, boolean canException) { + timeConsumingMethod(input); + asyncApply(res -> { + if (canException) { + if (res.equals("[2]")) { + throw new IOException("input 2 exception"); + } else if (res.equals("[3]")) { + throw new RuntimeException("input 3 exception"); + } + } + return res; + }); + return asyncReturn(String.class); + } + + @Override + public String exceptionMethod(int input) { + if (input == 2) { + asyncThrowException(new IOException("input 2 exception")); + return null; + } else if (input == 3) { + asyncThrowException(new RuntimeException("input 3 exception")); + return null; + } + return applyMethod(input); + } + + @Override + public String forEachMethod(List<Integer> list) { + StringBuilder result = new StringBuilder(); + asyncForEach(list.iterator(), + (forEach, input) -> { + timeConsumingMethod(input); + asyncApply(res -> { + result.append("forEach" + res + ","); + return result.toString(); + }); + }); + return asyncReturn(String.class); + } + + @Override + public String forEachBreakMethod(List<Integer> list) { + StringBuilder result = new StringBuilder(); + asyncForEach(list.iterator(), + (forEach, input) -> { + timeConsumingMethod(input); + asyncApply(res -> { + if 
(res.equals("[2]")) { + forEach.breakNow(); + } else { + result.append("forEach" + res + ","); + } + return result.toString(); + }); + }); + return asyncReturn(String.class); + } + + @Override + public String forEachBreakByExceptionMethod(List<Integer> list) { + StringBuilder result = new StringBuilder(); + asyncForEach(list.iterator(), + (forEach, input) -> { + asyncTry(() -> { + applyMethod(input, true); + asyncApply(res -> { + result.append("forEach" + res + ","); + return result.toString(); + }); + }); + asyncCatch((res, e) -> { + /* An IOException is appended to the result; a RuntimeException breaks the loop. */ if (e instanceof IOException) { + result.append(e + ","); + } else if (e instanceof RuntimeException) { + forEach.breakNow(); + } + return result.toString(); + }, Exception.class); + }); + return asyncReturn(String.class); + } + + @Override + public String applyThenApplyMethod(int input) { + timeConsumingMethod(input); + asyncApply((AsyncApplyFunction<String, String>) res -> { + if (res.equals("[1]")) { + timeConsumingMethod(2); + } else { + asyncComplete(res); + } + }); + return asyncReturn(String.class); + } + + @Override + public String applyCatchThenApplyMethod(int input) { + asyncTry(() -> applyMethod(input, true)); + asyncCatch((AsyncCatchFunction<String, IOException>) (res, ioe) -> { + applyMethod(1); + }, IOException.class); + return asyncReturn(String.class); + } + + @Override + public String applyCatchFinallyMethod( + int input, List<String> resource) { + asyncTry(() -> applyMethod(input, true)); + asyncCatch((res, e) -> { + throw new IOException("Catch " + e.getMessage()); + }, IOException.class); + asyncFinally((FinallyFunction<String>) res -> { + resource.clear(); + return res; + }); + return asyncReturn(String.class); + } + + @Override + public String currentMethod(List<Integer> list) { + asyncCurrent(list, + input -> applyMethod(input, true), + (Function<CompletableFuture<String>[], String>) futures -> { + StringBuilder result = new StringBuilder(); + for (Future<String> future : futures) { + try { + String res = 
future.get(); + result.append(res + ","); + } catch (Exception e) { + result.append(e.getMessage() + ","); + } + } + return result.toString(); + }); + return asyncReturn(String.class); + } + + /** + * Submits the superclass {@code timeConsumingMethod} to {@code executorService}, + * stores the resulting future in {@code Async.CUR_COMPLETABLE_FUTURE} and returns + * {@code null} without waiting for it. + */ @Override + public String timeConsumingMethod(int input) { + CompletableFuture<Object> result = CompletableFuture + .supplyAsync(() -> { + LOG.info("[{} thread] invoke consumingMethod for parameter: {}", + Thread.currentThread().getName(), input); + return AsyncClass.super.timeConsumingMethod(input); + }, executorService); + Async.CUR_COMPLETABLE_FUTURE.set(result); + return null; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/BaseClass.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/BaseClass.java new file mode 100644 index 00000000000..8d5b5b1dc82 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/BaseClass.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import java.io.IOException; +import java.util.List; + +/** + * BaseClass defines a set of methods that can be executed either synchronously + * or asynchronously, depending on the implementation. + * + * <p> + * This interface is designed to abstract the common operations that need + * to be performed in a time-consuming manner, such as processing a list + * of items or applying a method that involves I/O operations. By defining + * these methods in an interface, it allows for both synchronous and + * asynchronous implementations, providing flexibility and the ability to + * improve performance without changing the external API. + * </p> + * + * <p> + * Implementations of this interface are expected to provide concrete + * implementations of the defined methods, either by performing the + * operations synchronously in a blocking manner or by performing them + * asynchronously in a non-blocking manner. + * </p> + * + * @see SyncClass + * @see AsyncClass + */ +public interface BaseClass { + /** Apply scenario: one time-consuming step followed by a transformation of its result. */ String applyMethod(int input); + + /** + * Apply scenario that may fail: with {@code canException} set, the implementations in + * this change raise an IOException for the step result "[2]" and a RuntimeException for "[3]". + */ String applyMethod(int input, boolean canException) throws IOException; + + /** Fails up front for input 2 (IOException) or 3 (RuntimeException); otherwise uses {@code applyMethod(input)}. */ String exceptionMethod(int input) throws IOException; + + /** Loop scenario: one time-consuming step per list element, with the results concatenated. */ String forEachMethod(List<Integer> list); + + /** Loop scenario that stops at the first element whose step result is "[2]". */ String forEachBreakMethod(List<Integer> list); + + /** Loop scenario in which an IOException is recorded and a RuntimeException ends the loop. */ String forEachBreakByExceptionMethod(List<Integer> list); + + /** Chained scenario: a step result of "[1]" triggers a second time-consuming step with input 2. */ String applyThenApplyMethod(int input); + + /** Try/catch scenario: on IOException the result of {@code applyMethod(1)} is used instead. */ String applyCatchThenApplyMethod(int input); + + /** + * Try/catch/finally scenario: an IOException is rethrown with a "Catch " message prefix + * and {@code resource} is cleared in either case. + */ String applyCatchFinallyMethod(int input, List<String> resource) throws IOException; + + /** + * Concurrent scenario: runs {@code applyMethod(input, true)} for every element and joins + * the results or failure messages with commas. + */ String currentMethod(List<Integer> list); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/SyncClass.java 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/SyncClass.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * SyncClass implements BaseClass, providing a synchronous + * version of the methods. All operations are performed in a + * blocking manner, waiting for completion before proceeding. + * + * This class is the foundation for the AsyncClass, which + * provides asynchronous implementations. 
+ * + * @see BaseClass + * @see AsyncClass + */ +public class SyncClass implements BaseClass{ + /* Value handed to Thread.sleep by timeConsumingMethod. */ private long timeConsuming; + + public SyncClass(long timeConsuming) { + this.timeConsuming = timeConsuming; + } + + @Override + public String applyMethod(int input) { + String res = timeConsumingMethod(input); + return "applyMethod" + res; + } + + @Override + public String applyMethod(int input, boolean canException) throws IOException { + String res = timeConsumingMethod(input); + if (canException) { + if (res.equals("[2]")) { + throw new IOException("input 2 exception"); + } else if (res.equals("[3]")) { + throw new RuntimeException("input 3 exception"); + } + } + return res; + } + + @Override + public String exceptionMethod(int input) throws IOException { + if (input == 2) { + throw new IOException("input 2 exception"); + } else if (input == 3) { + throw new RuntimeException("input 3 exception"); + } + return applyMethod(input); + } + + @Override + public String forEachMethod(List<Integer> list) { + StringBuilder result = new StringBuilder(); + for (int input : list) { + String res = timeConsumingMethod(input); + result.append("forEach" + res + ","); + } + return result.toString(); + } + + @Override + public String forEachBreakMethod(List<Integer> list) { + StringBuilder result = new StringBuilder(); + for (int input : list) { + String res = timeConsumingMethod(input); + if (res.equals("[2]")) { + break; + } + result.append("forEach" + res + ","); + } + return result.toString(); + } + + @Override + public String forEachBreakByExceptionMethod(List<Integer> list) { + StringBuilder result = new StringBuilder(); + for (int input : list) { + try { + String res = applyMethod(input, true); + result.append("forEach" + res + ","); + } catch (IOException e) { + /* Record the IOException and keep iterating. */ result.append(e + ","); + } catch (RuntimeException e) { + /* A RuntimeException ends the loop without being recorded. */ break; + } + } + return result.toString(); + } + + @Override + public String applyThenApplyMethod(int input) { + String res = timeConsumingMethod(input); + if 
(res.equals("[1]")) { + res = timeConsumingMethod(2); + } + return res; + } + + @Override + public String applyCatchThenApplyMethod(int input) { + String res = null; + try { + res = applyMethod(input, true); + } catch (IOException e) { + res = applyMethod(1); + } + return res; + } + + @Override + public String applyCatchFinallyMethod( + int input, List<String> resource) throws IOException { + String res = null; + try { + res = applyMethod(input, true); + } catch (IOException e) { + /* NOTE(review): only the message is carried over; e is not passed as the cause. */ throw new IOException("Catch " + e.getMessage()); + } finally { + resource.clear(); + } + return res; + } + + @Override + public String currentMethod(List<Integer> list) { + /* NOTE(review): a new pool is obtained on every call and no shutdown call appears in this method. */ ExecutorService executor = getExecutorService(); + List<Future<String>> futures = new ArrayList<>(); + for (int input : list) { + Future<String> future = executor.submit( + () -> applyMethod(input, true)); + futures.add(future); + } + + StringBuilder result = new StringBuilder(); + for (Future<String> future : futures) { + try { + String res = future.get(); + result.append(res + ","); + } catch (Exception e) { + result.append(e.getMessage() + ","); + } + } + return result.toString(); + } + + + /** + * Simulates a synchronous method that performs + * a time-consuming task and returns a result. + * + * @param input The input parameter for the method. + * @return A string that represents the result of the method: + * {@code "[" + input + "]"} after the sleep, or {@code "Error:"} followed by the + * exception message if the sleep is interrupted (the interrupt flag is set again). 
+ */ + public String timeConsumingMethod(int input) { + try { + Thread.sleep(timeConsuming); + return "[" + input + "]"; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return "Error:" + e.getMessage(); + } + } + + /** Builds a new fixed pool of two daemon threads each time it is called. */ private ExecutorService getExecutorService() { + return Executors.newFixedThreadPool(2, r -> { + Thread t = new Thread(r); + t.setDaemon(true); + return t; + }); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestAsyncUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestAsyncUtil.java new file mode 100644 index 00000000000..c540af612b9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestAsyncUtil.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.Time; +import org.junit.After; +import org.junit.Before; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * The TestAsyncUtil class provides a suite of test cases for the + * asynchronous utility class AsyncUtil. It utilizes the JUnit testing + * framework to verify that asynchronous operations are performed as + * expected. + * + * <p> + * This class contains multiple test methods designed to test various + * asynchronous operation scenarios, including: + * <ul> + * <li>testApply - Tests the asynchronous application of a method.</li> + * <li>testApplyException - Tests exception handling in + * asynchronous methods.</li> + * <li>testApplyThenApplyMethod - Tests the chaining of + * asynchronous method calls.</li> + * <li>testCatchThenApplyMethod - Tests the invocation of + * asynchronous methods after exception catching.</li> + * <li>testForEach - Tests asynchronous iteration operations.</li> + * <li>testForEachBreak - Tests asynchronous iteration with break + * conditions.</li> + * <li>testForEachBreakByException - Tests the interruption of + * asynchronous iteration due to exceptions.</li> + * </ul> + * </p> + * + * The tests cover both synchronous (Sync) and asynchronous (Async) + * configurations to ensure consistent behavior under different + * execution modes. 
+ * + * @see AsyncUtil + * @see BaseClass + * @see SyncClass + * @see AsyncClass + */ +public class TestAsyncUtil { + private static final Logger LOG = + LoggerFactory.getLogger(TestAsyncUtil.class); + /* Passed to the SyncClass/AsyncClass constructors and used as the timing threshold in checkResult. */ private static final long TIME_CONSUMING = 100; + private BaseClass baseClass; + private boolean enableAsync; + + public enum ExecutionMode { + SYNC, + ASYNC + } + + /* NOTE(review): every test calls setUp(mode) explicitly; confirm that the Before/After annotations on setUp and after have any effect for these ParameterizedTest methods. */ @Before + public void setUp(ExecutionMode mode) { + if (mode.equals(ExecutionMode.ASYNC)) { + baseClass = new AsyncClass(TIME_CONSUMING); + enableAsync = true; + } else { + baseClass = new SyncClass(TIME_CONSUMING); + } + } + + @After + public void after() { + baseClass = null; + enableAsync = false; + } + + @EnumSource(ExecutionMode.class) + @ParameterizedTest + public void testApply(ExecutionMode mode) + throws Exception { + setUp(mode); + long start = Time.monotonicNow(); + String result = baseClass.applyMethod(1); + long cost = Time.monotonicNow() - start; + LOG.info("[{}] main thread cost: {} ms", mode, cost); + checkResult("applyMethod[1]", result, TIME_CONSUMING, cost); + } + + @EnumSource(ExecutionMode.class) + @ParameterizedTest + public void testApplyException(ExecutionMode mode) throws Exception { + setUp(mode); + checkException( + () -> baseClass.applyMethod(2, true), + IOException.class, "input 2 exception"); + + checkException( + () -> baseClass.applyMethod(3, true), + RuntimeException.class, "input 3 exception"); + } + + @EnumSource(ExecutionMode.class) + @ParameterizedTest + public void testExceptionMethod(ExecutionMode mode) throws Exception { + setUp(mode); + checkException( + () -> baseClass.exceptionMethod(2), + IOException.class, "input 2 exception"); + + checkException( + () -> baseClass.exceptionMethod(3), + RuntimeException.class, "input 3 exception"); + + long start = Time.monotonicNow(); + String result = baseClass.exceptionMethod(1); + long cost = Time.monotonicNow() - start; + LOG.info("[{}] main thread cost: {} ms", mode, cost); + checkResult("applyMethod[1]", result, 
TIME_CONSUMING, cost); + } + + @EnumSource(ExecutionMode.class) + @ParameterizedTest + public void testApplyThenApplyMethod(ExecutionMode mode) throws Exception { + setUp(mode); + long start = Time.monotonicNow(); + String result = baseClass.applyThenApplyMethod(1); + long cost = Time.monotonicNow() - start; + checkResult("[2]", result, TIME_CONSUMING, cost); + LOG.info("[{}] main thread cost: {} ms", mode, cost); + + start = Time.monotonicNow(); + result = baseClass.applyThenApplyMethod(3); + cost = Time.monotonicNow() - start; + checkResult("[3]", result, TIME_CONSUMING, cost); + LOG.info("[{}] main thread cost: {} ms", mode, cost); + } + + @EnumSource(ExecutionMode.class) + @ParameterizedTest + public void testCatchThenApplyMethod(ExecutionMode mode) throws Exception { + setUp(mode); + long start = Time.monotonicNow(); + String result = baseClass.applyCatchThenApplyMethod(2); + long cost = Time.monotonicNow() - start; + checkResult("applyMethod[1]", result, TIME_CONSUMING, cost); + LOG.info("[{}] main thread cost: {} ms", mode, cost); + + start = Time.monotonicNow(); + result = baseClass.applyCatchThenApplyMethod(0); + cost = Time.monotonicNow() - start; + checkResult("[0]", result, TIME_CONSUMING, cost); + LOG.info("[{}] main thread cost: {} ms", mode, cost); + } + + @EnumSource(ExecutionMode.class) + @ParameterizedTest + public void testCatchFinallyMethod(ExecutionMode mode) throws Exception { + setUp(mode); + List<String> resource = new ArrayList<>(); + resource.add("resource1"); + checkException( + () -> baseClass.applyCatchFinallyMethod(2, resource), + IOException.class, "input 2 exception"); + assertTrue(resource.size() == 0); + + long start = Time.monotonicNow(); + String result = baseClass.applyCatchFinallyMethod(0, resource); + long cost = Time.monotonicNow() - start; + checkResult("[0]", result, TIME_CONSUMING, cost); + /* NOTE(review): nothing is added to resource after the first check, so it is already empty here and this assertion cannot detect a skipped finally step. */ assertTrue(resource.size() == 0); + LOG.info("[{}] main thread cost: {} ms", mode, cost); + } + + @EnumSource(ExecutionMode.class) + 
@ParameterizedTest + public void testForEach(ExecutionMode mode) throws Exception { + setUp(mode); + long start = Time.monotonicNow(); + String result = baseClass.forEachMethod(Arrays.asList(1, 2, 3)); + long cost = Time.monotonicNow() - start; + LOG.info("[{}] main thread cost: {} ms", mode, cost); + checkResult("forEach[1],forEach[2],forEach[3],", result, + TIME_CONSUMING, cost); + } + + @EnumSource(ExecutionMode.class) + @ParameterizedTest + public void testForEachBreak(ExecutionMode mode) throws Exception { + setUp(mode); + long start = Time.monotonicNow(); + String result = baseClass.forEachBreakMethod(Arrays.asList(1, 2, 3)); + long cost = Time.monotonicNow() - start; + LOG.info("[{}] main thread cost: {} ms", mode, cost); + checkResult("forEach[1],", result, TIME_CONSUMING, cost); + } + + @EnumSource(ExecutionMode.class) + @ParameterizedTest + public void testForEachBreakByException(ExecutionMode mode) + throws Exception { + setUp(mode); + long start = Time.monotonicNow(); + String result = baseClass.forEachBreakByExceptionMethod(Arrays.asList(1, 2, 3)); + long cost = Time.monotonicNow() - start; + LOG.info("[{}] main thread cost: {} ms", mode, cost); + checkResult("forEach[1],java.io.IOException: input 2 exception,", + result, TIME_CONSUMING, cost); + } + + @EnumSource(ExecutionMode.class) + @ParameterizedTest + public void testCurrentMethod(ExecutionMode mode) + throws Exception { + setUp(mode); + long start = Time.monotonicNow(); + String result = baseClass.currentMethod(Arrays.asList(1, 2, 3)); + long cost = Time.monotonicNow() - start; + LOG.info("[{}] main thread cost: {} ms", mode, cost); + checkResult("[1],java.io.IOException: input 2 exception," + + "java.lang.RuntimeException: input 3 exception,", + result, TIME_CONSUMING, cost); + } + + /** + * Verifies one call. {@code result} is the expected value and {@code actualResult} the + * value the call returned; {@code cost} is the time threshold and {@code actualCost} the + * measured time. In async mode the returned value must be null, the real value is then + * fetched with {@code AsyncUtil.syncReturn}, and the call must have taken less than + * {@code cost}; in sync mode it must not have taken less than {@code cost}. + */ private void checkResult( + String result, String actualResult, long cost, long actualCost) + throws Exception { + if (enableAsync) { + Assertions.assertNull(actualResult); + actualResult = 
AsyncUtil.syncReturn(String.class); + assertNotNull(actualResult); + assertTrue(actualCost < cost); + } else { + assertFalse(actualCost < cost); + } + assertEquals(result, actualResult); + } + + /** + * Passes {@code clazz} and {@code contained} to {@code LambdaTestUtils.intercept} around + * {@code eval}. In async mode the intercepted code is {@code eval.call()} followed by + * {@code AsyncUtil.syncReturn}; in sync mode it is {@code eval.call()} alone. + */ private < E extends Throwable> void checkException( + Callable<String> eval, Class<E> clazz, String contained) throws Exception { + if (enableAsync) { + LambdaTestUtils.intercept(clazz, contained, + () -> { + eval.call(); + return AsyncUtil.syncReturn(String.class); + }); + } else { + LambdaTestUtils.intercept(clazz, contained, () -> { + String res = eval.call(); + return res; + }); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org