jpountz commented on code in PR #12569:
URL: https://github.com/apache/lucene/pull/12569#discussion_r1330338668
##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -64,4 +82,26 @@ final <T> List<T> invokeAll(Collection<RunnableFuture<T>> tasks) throws IOExcept
     }
     return results;
   }
+
+  final <C> Task<C> createTask(Callable<C> callable) {
+    return new Task<>(callable);
+  }
+
+  static class Task<V> extends FutureTask<V> {
+    private Task(Callable<V> callable) {
+      super(callable);
+    }
+
+    @Override
+    public void run() {
+      try {
+        Integer counter = runSameThread.get();
+        runSameThread.set(++counter);
+        super.run();
+      } finally {
+        Integer counter = runSameThread.get();
+        runSameThread.set(--counter);
Review Comment:
and likewise here
```suggestion
runSameThread.set(counter - 1);
```
##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -64,4 +82,26 @@ final <T> List<T> invokeAll(Collection<RunnableFuture<T>> tasks) throws IOExcept
     }
     return results;
   }
+
+  final <C> Task<C> createTask(Callable<C> callable) {
+    return new Task<>(callable);
+  }
+
+  static class Task<V> extends FutureTask<V> {
+    private Task(Callable<V> callable) {
+      super(callable);
+    }
+
+    @Override
+    public void run() {
+      try {
+        Integer counter = runSameThread.get();
+        runSameThread.set(++counter);
Review Comment:
nit: it feels weird to use `++counter` when you don't read the incremented value later on
```suggestion
runSameThread.set(counter + 1);
```
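For reference, with both suggestions applied (`counter + 1` on entry, `counter - 1` in the `finally` block), the counting logic in `Task.run()` would look roughly like the standalone sketch below. It is not the PR's code: `CountingTask`, `RUNNING_TASKS` (standing in for `runSameThread`) and `observedDepth()` are hypothetical names, and the sketch moves the increment before the `try` so that the `finally` block only ever undoes an increment that actually happened.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class CountingTaskDemo {
  // Hypothetical stand-in for the PR's runSameThread: how many tasks are
  // currently running on this thread.
  static final ThreadLocal<Integer> RUNNING_TASKS = ThreadLocal.withInitial(() -> 0);

  static final class CountingTask<V> extends FutureTask<V> {
    CountingTask(Callable<V> callable) {
      super(callable);
    }

    @Override
    public void run() {
      // counter + 1 instead of ++counter: the incremented value is never read again.
      RUNNING_TASKS.set(RUNNING_TASKS.get() + 1);
      try {
        super.run();
      } finally {
        // counter - 1 in finally: always undo this task's increment.
        RUNNING_TASKS.set(RUNNING_TASKS.get() - 1);
      }
    }
  }

  // Runs a task on the current thread and returns the counter value it observed.
  static int observedDepth() throws Exception {
    CountingTask<Integer> task = new CountingTask<>(RUNNING_TASKS::get);
    task.run();
    return task.get();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(observedDepth()); // 1: the task sees its own increment
    System.out.println(RUNNING_TASKS.get()); // 0: restored after run()
  }
}
```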
##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -22,18 +22,29 @@
import java.util.Collection;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.FutureTask;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
/**
* Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search
- * across segments as well as query rewrite in some cases.
+ * across segments as well as query rewrite in some cases. Exposes a {@link #createTask(Callable)}
+ * method to create tasks given a {@link Callable}, as well as the {@link #invokeAll(Collection)}
+ * method to execute a set of tasks concurrently. Once all tasks are submitted to the executor, it
+ * blocks and wait for all tasks to be completed, and then returns a list with the obtained results.
+ * Ensures that the underlying executor is only used for top-level {@link #invokeAll(Collection)}
+ * calls, and not for potential {@link #invokeAll(Collection)} calls made from one of the tasks.
+ * This is to prevent deadlock with certain types of executors, as well as to limit the level of
+ * parallelism.
*/
class TaskExecutor {
+  // a static thread local is ok as long as there is a single TaskExecutor ever created
Review Comment:
But there would usually be one TaskExecutor per IndexSearcher, right? So more
than one for users that have multiple indexes in the same JVM? Even for a
single index, you could have two point-in-time views open at the same time,
and they would have different `TaskExecutor` instances.
IMO `static` is helpful because it also covers cases where users search
another `IndexSearcher` from a collector.
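To illustrate the collector case, here is a minimal sketch assuming a simplified executor in which only top-level `invokeAll` calls go to the pool and nested calls run inline on the calling thread. `NestingAwareExecutor` and `depthSeenAcrossExecutors()` are hypothetical, not Lucene APIs. Two executor instances (think: two `IndexSearcher`s) share one single-threaded pool, and a task running on the first calls `invokeAll` on the second. Because the thread local is `static`, the second executor sees that the current thread is already running a task and runs inline; with a per-instance thread local it would instead submit to a pool whose only thread is blocked waiting for that very result, which in this sketch would deadlock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class NestedSearchDemo {
  // static: shared by every executor instance, so nesting is detected even
  // when a task of one executor calls into a different executor.
  static final ThreadLocal<Integer> RUNNING_TASKS = ThreadLocal.withInitial(() -> 0);

  // Hypothetical, simplified stand-in for TaskExecutor (one per "searcher").
  static final class NestingAwareExecutor {
    private final ExecutorService pool;

    NestingAwareExecutor(ExecutorService pool) {
      this.pool = pool;
    }

    <T> List<T> invokeAll(List<Callable<T>> callables) throws Exception {
      List<FutureTask<T>> tasks = new ArrayList<>();
      for (Callable<T> callable : callables) {
        FutureTask<T> task =
            new FutureTask<>(
                () -> {
                  RUNNING_TASKS.set(RUNNING_TASKS.get() + 1);
                  try {
                    return callable.call();
                  } finally {
                    RUNNING_TASKS.set(RUNNING_TASKS.get() - 1);
                  }
                });
        tasks.add(task);
      }
      // Only top-level calls use the pool; nested calls run inline on this thread.
      boolean nested = RUNNING_TASKS.get() > 0;
      for (FutureTask<T> task : tasks) {
        if (nested) {
          task.run();
        } else {
          pool.execute(task);
        }
      }
      List<T> results = new ArrayList<>();
      for (FutureTask<T> task : tasks) {
        results.add(task.get()); // blocks until the task has completed
      }
      return results;
    }
  }

  // Two executors sharing a single-threaded pool: a task on `outer` calls
  // `inner.invokeAll`. Returns the counter value seen by the inner task.
  static int depthSeenAcrossExecutors() throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      NestingAwareExecutor outer = new NestingAwareExecutor(pool);
      NestingAwareExecutor inner = new NestingAwareExecutor(pool);
      List<Callable<Integer>> innerTasks = List.of(RUNNING_TASKS::get);
      List<Callable<Integer>> outerTasks = List.of(() -> inner.invokeAll(innerTasks).get(0));
      return outer.invokeAll(outerTasks).get(0);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(depthSeenAcrossExecutors()); // 2
  }
}
```

`depthSeenAcrossExecutors()` returns 2: the inner task observes both its own increment and the outer task's, even though they belong to different executor instances.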
##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -22,18 +22,28 @@
import java.util.Collection;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.FutureTask;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
/**
* Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search
- * across segments as well as query rewrite in some cases.
+ * across segments as well as query rewrite in some cases. Exposes a {@link #createTask(Callable)}
+ * method to create tasks given a {@link Callable}, as well as the {@link #invokeAll(Collection)}
+ * method to execute a set of tasks concurrently. Once all tasks are submitted to the executor, it
+ * blocks and wait for all tasks to be completed, and then returns a list with the obtained results.
+ * Ensures that the underlying executor is only used for top-level {@link #invokeAll(Collection)}
+ * calls, and not for potential {@link #invokeAll(Collection)} calls made from one of the tasks.
+ * This is to prevent deadlock with certain types of executors, as well as to limit the level of
+ * parallelism.
Review Comment:
I would remove the bit about limiting the level of parallelism. I don't
think it's a goal; it's mostly a side effect of the logic that avoids deadlocks.
It's true that this might hurt executors that are not subject to deadlocks,
but I would be very surprised if many users relied on nested parallelism today,
since it only happens when running a rewrite or a search from within a rewrite
or a search, which is not typical.
##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -22,18 +22,29 @@
import java.util.Collection;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.FutureTask;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
/**
* Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search
- * across segments as well as query rewrite in some cases.
+ * across segments as well as query rewrite in some cases. Exposes a {@link #createTask(Callable)}
+ * method to create tasks given a {@link Callable}, as well as the {@link #invokeAll(Collection)}
+ * method to execute a set of tasks concurrently. Once all tasks are submitted to the executor, it
+ * blocks and wait for all tasks to be completed, and then returns a list with the obtained results.
+ * Ensures that the underlying executor is only used for top-level {@link #invokeAll(Collection)}
+ * calls, and not for potential {@link #invokeAll(Collection)} calls made from one of the tasks.
+ * This is to prevent deadlock with certain types of executors, as well as to limit the level of
+ * parallelism.
*/
class TaskExecutor {
+  // a static thread local is ok as long as there is a single TaskExecutor ever created
+  private static final ThreadLocal<Integer> runSameThread = ThreadLocal.withInitial(() -> 0);
Review Comment:
Can we make the name more descriptive, e.g.
`numberOfRunningTasksInCurrentThread` or something along these lines?
Maybe also leave a comment about why tracking counts instead of just
booleans is important in case `ThreadPoolExecutor.CallerRunsPolicy` is used?
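Counts matter because tasks can nest on the same thread: a nested `invokeAll` runs its tasks inline, and with `ThreadPoolExecutor.CallerRunsPolicy` a submitting thread may run a task itself. With a plain boolean, the first nested task to finish clears the flag while the outer task is still running, so later `invokeAll` calls from that outer task would be treated as top-level again. The minimal sketch below contrasts the two approaches on a single thread; `CountVsBooleanDemo` and its methods are hypothetical names used only for illustration.

```java
import java.util.concurrent.Callable;

class CountVsBooleanDemo {
  // Boolean tracking: "is some task running on this thread?"
  static final ThreadLocal<Boolean> IN_TASK = ThreadLocal.withInitial(() -> false);
  // Count tracking: "how many tasks are running on this thread?"
  static final ThreadLocal<Integer> RUNNING_TASKS = ThreadLocal.withInitial(() -> 0);

  static <V> V runWithFlag(Callable<V> body) throws Exception {
    IN_TASK.set(true);
    try {
      return body.call();
    } finally {
      IN_TASK.set(false); // also clears the flag set by an enclosing task
    }
  }

  static <V> V runWithCount(Callable<V> body) throws Exception {
    RUNNING_TASKS.set(RUNNING_TASKS.get() + 1);
    try {
      return body.call();
    } finally {
      RUNNING_TASKS.set(RUNNING_TASKS.get() - 1); // undoes only this task's increment
    }
  }

  // An outer task runs one nested task on the same thread, then checks
  // whether it still looks like it is running inside a task.
  static boolean flagAfterNested() throws Exception {
    return runWithFlag(
        () -> {
          runWithFlag(() -> null);
          return IN_TASK.get();
        });
  }

  static boolean countAfterNested() throws Exception {
    return runWithCount(
        () -> {
          runWithCount(() -> null);
          return RUNNING_TASKS.get() > 0;
        });
  }

  public static void main(String[] args) throws Exception {
    System.out.println(flagAfterNested()); // false
    System.out.println(countAfterNested()); // true
  }
}
```

`flagAfterNested()` returns `false` because the inner task reset the flag, while `countAfterNested()` returns `true`: each exit only undoes its own entry, so the outer task is still counted.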
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]