keith-turner commented on code in PR #2752:
URL: https://github.com/apache/accumulo/pull/2752#discussion_r897406839
##########
core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java:
##########
@@ -49,4 +51,28 @@ public static <T> CompletableFuture<T>
merge(List<CompletableFuture<T>> futures,
return futures.get(0);
}
+ /**
+ * Asynchronously, iterate some function until a given condition is met.
+ */
+ public static <T> CompletableFuture<T>
iterateWhileAsync(Function<T,CompletableFuture<T>> step,
+ Predicate<T> isDone, T init) {
+ // We'd like to use a lambda here, but lambdas don't have
+ // `this`, so we would have to use some clumsy indirection to
+ // achieve self-reference.
+ Function<T,CompletableFuture<T>> go = new Function<>() {
+ @Override
+ public CompletableFuture<T> apply(T x) {
+ if (isDone.test(x)) {
+ return CompletableFuture.completedFuture(x);
+ }
+ // Making the recursive call with thenComposeAsync prevents
+ // stack overflows (but risks deadlock if we run out of
+ // threads).
+ //
+ // TODO should this method take an ExecutorService argument?
+ return step.apply(x).thenComposeAsync(this);
Review Comment:
May be able to drop the Async here and a few lines later.
```suggestion
return step.apply(x).thenCompose(this);
```
Elsewhere in the code there is a line like
```
futures.add(CompletableFuture
.supplyAsync(new FilesProcessor(tinfo, location, allFiles,
cancelFlag), execSrv));
```
I think conceptually the thenCompose is a follow on step and would execute
in a thread from execSrv. I was not sure about this so I pulled your branch
and wrote the following test program.
```java
package org.apache.accumulo.core.summary;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;
import java.util.function.Function;
import org.apache.accumulo.core.util.CompletableFutureUtil;
public class Test {
public static void main(String args[]) throws Exception{
ExecutorService es = Executors.newFixedThreadPool(2);
Predicate<Long> test = num -> num >=10;
Function<Long, CompletableFuture<Long>> step = num -> {
System.out.println(num+" Step thread
"+Thread.currentThread().getName());
var cf = CompletableFuture.supplyAsync(() ->{
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(num+" Supply async thread
"+Thread.currentThread().getName());
return num;
}, es);
cf = cf.thenApply(l -> {
System.out.println(l+" Then apply thread
"+Thread.currentThread().getName());
return l+1;
});
return cf;
};
CompletableFuture<Long> result =
CompletableFutureUtil.iterateWhileAsync(step, test, 5L);
System.out.println(result.get());
}
}
```
Without modifying iterateWhileAsync I see the following output from this
program.
```
5 Step thread ForkJoinPool.commonPool-worker-3
5 Supply async thread pool-1-thread-1
5 Then apply thread pool-1-thread-1
6 Step thread ForkJoinPool.commonPool-worker-3
6 Supply async thread pool-1-thread-2
6 Then apply thread pool-1-thread-2
7 Step thread ForkJoinPool.commonPool-worker-3
7 Supply async thread pool-1-thread-1
7 Then apply thread pool-1-thread-1
8 Step thread ForkJoinPool.commonPool-worker-3
8 Supply async thread pool-1-thread-2
8 Then apply thread pool-1-thread-2
9 Step thread ForkJoinPool.commonPool-worker-3
9 Supply async thread pool-1-thread-1
9 Then apply thread pool-1-thread-1
10
```
When I modify `iterateWhileAsync` to use `thenCompose` instead of
`thenComposeAsync` I see the following output.
```
5 Step thread main
5 Supply async thread pool-1-thread-1
5 Then apply thread pool-1-thread-1
6 Step thread pool-1-thread-1
6 Supply async thread pool-1-thread-2
6 Then apply thread pool-1-thread-2
7 Step thread pool-1-thread-2
7 Supply async thread pool-1-thread-2
7 Then apply thread pool-1-thread-2
8 Step thread pool-1-thread-2
8 Supply async thread pool-1-thread-2
8 Then apply thread pool-1-thread-2
9 Step thread pool-1-thread-2
9 Supply async thread pool-1-thread-2
9 Then apply thread pool-1-thread-2
10
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]