uschindler commented on code in PR #12569:
URL: https://github.com/apache/lucene/pull/12569#discussion_r1330062036
##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -22,18 +22,28 @@
import java.util.Collection;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.FutureTask;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
/**
* Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search
- * across segments as well as query rewrite in some cases.
+ * across segments as well as query rewrite in some cases. Exposes a {@link #createTask(Callable)}
+ * method to create tasks given a {@link Callable}, as well as the {@link #invokeAll(Collection)}
+ * method to execute a set of tasks concurrently. Once all tasks are submitted to the executor, it
+ * blocks and wait for all tasks to be completed, and then returns a list with the obtained results.
+ * Ensures that the underlying executor is only used for top-level {@link #invokeAll(Collection)}
+ * calls, and not for potential {@link #invokeAll(Collection)} calls made from one of the tasks.
+ * This is to prevent deadlock with certain types of executors, as well as to limit the level of
+ * parallelism.
*/
class TaskExecutor {
Review Comment:
This class is package-private, so how can one use it? IndexSearcher seems to
have a method returning TaskExecutor, but the class itself is non-public, which
prevents usage in code outside Lucene. See `TestTaskExecutor` below.
##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -22,18 +22,28 @@
import java.util.Collection;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.FutureTask;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
/**
* Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search
- * across segments as well as query rewrite in some cases.
+ * across segments as well as query rewrite in some cases. Exposes a {@link #createTask(Callable)}
+ * method to create tasks given a {@link Callable}, as well as the {@link #invokeAll(Collection)}
+ * method to execute a set of tasks concurrently. Once all tasks are submitted to the executor, it
+ * blocks and wait for all tasks to be completed, and then returns a list with the obtained results.
+ * Ensures that the underlying executor is only used for top-level {@link #invokeAll(Collection)}
+ * calls, and not for potential {@link #invokeAll(Collection)} calls made from one of the tasks.
+ * This is to prevent deadlock with certain types of executors, as well as to limit the level of
+ * parallelism.
*/
class TaskExecutor {
+ private static final ThreadLocal<Boolean> isConcurrentTask =
ThreadLocal.withInitial(() -> false);
Review Comment:
I have a few problems:
- It is a static ThreadLocal, which is fine as long as it is unlikely that
several TaskExecutors use the same thread pool. But there could be problems if
you create two different TaskExecutors on the same thread pool. In that case
the TaskExecutors are no longer decoupled (one affects the other). It might not
be a problem at all, but keep that in mind.
- With different TaskExecutors, one task would set the thread local to false
in its finally block; this may cause a deadlock in the other TaskExecutor using
the same thread pool.
- In addition, the name `isConcurrentTask` is misleading, as the idea is to
prevent more concurrent tasks from being executed in the thread pool. It should
maybe be called "runSameThread".
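One reading of the second bullet can be sketched as follows. This is a simplified, single-threaded model and not the code from the PR: `SharedFlagDemo` and `runTask` are hypothetical names, and the nested call stands in for a task from a second executor that ends up running inline on a thread that is already inside a task.

```java
// Hypothetical, simplified model of a shared static flag; not Lucene's TaskExecutor.
final class SharedFlagDemo {
  // Stand-in for the static flag shown in the diff above.
  private static final ThreadLocal<Boolean> isConcurrentTask =
      ThreadLocal.withInitial(() -> false);

  // Models one task body: set the flag, do the work, then clear the flag unconditionally.
  static void runTask(Runnable work) {
    isConcurrentTask.set(true);
    try {
      work.run();
    } finally {
      isConcurrentTask.set(false);
    }
  }

  // Returns the flag value the outer task observes after a nested task has finished.
  static boolean flagSeenByOuterTaskAfterNestedTask() {
    boolean[] seen = new boolean[1];
    runTask(
        () -> {
          runTask(() -> {}); // nested task, run inline on the same thread
          seen[0] = isConcurrentTask.get(); // the outer task is still running here
        });
    return seen[0];
  }
}
```

In this model the outer task observes `false` once the nested task returns, even though the outer task has not finished, because the nested `finally` block resets the flag to a fixed value instead of restoring the previous one.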
To also support higher parallelism than 1, I'd change this to
`ThreadLocal<Integer>` and increment on starting a task and decrement in the
finally block. Then you could have logic like "run in same thread if current
value >= parallelism". This would also prevent the issues above: the counter is
incremented when entering the run method and decremented when exiting, so
different executors can't confuse each other.
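A minimal sketch of the counter idea, under stated assumptions: `CountingTaskRunner`, its `parallelism` parameter and the demo class are hypothetical names and not part of Lucene, and the demo uses a direct executor (`Runnable::run`) only to keep the behaviour deterministic, where real code would pass a thread pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

// Hypothetical sketch of the counter idea; this is not Lucene's TaskExecutor.
final class CountingTaskRunner {
  // Number of CountingTaskRunner tasks currently on this thread's call stack.
  private static final ThreadLocal<Integer> depth = ThreadLocal.withInitial(() -> 0);

  private final Executor executor;
  private final int parallelism; // assumed knob, named after the review comment

  CountingTaskRunner(Executor executor, int parallelism) {
    this.executor = executor;
    this.parallelism = parallelism;
  }

  static int currentDepth() {
    return depth.get();
  }

  <T> List<T> invokeAll(List<Callable<T>> callables) {
    // Decided on the calling thread: once nested deeply enough, stop offloading.
    boolean runSameThread = depth.get() >= parallelism;
    List<FutureTask<T>> tasks = new ArrayList<>();
    for (Callable<T> callable : callables) {
      FutureTask<T> task =
          new FutureTask<>(
              () -> {
                depth.set(depth.get() + 1); // increment on entering the task
                try {
                  return callable.call();
                } finally {
                  depth.set(depth.get() - 1); // restore the previous value on exit
                }
              });
      tasks.add(task);
      if (runSameThread) {
        task.run();
      } else {
        executor.execute(task);
      }
    }
    List<T> results = new ArrayList<>();
    for (FutureTask<T> task : tasks) {
      try {
        results.add(task.get());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
        throw new RuntimeException(e.getCause());
      }
    }
    return results;
  }
}

final class CountingTaskRunnerDemo {
  // Depth observed before, during and after a nested invokeAll call.
  static List<Integer> nestedDepths() {
    CountingTaskRunner runner = new CountingTaskRunner(Runnable::run, 1);
    Callable<Integer> inner = CountingTaskRunner::currentDepth;
    Callable<List<Integer>> outer =
        () -> {
          int before = CountingTaskRunner.currentDepth();
          int during = runner.invokeAll(List.of(inner)).get(0);
          int after = CountingTaskRunner.currentDepth();
          return List.of(before, during, after);
        };
    return runner.invokeAll(List.of(outer)).get(0);
  }
}
```

Because every task restores the counter to the value it found, a nested task that finishes leaves the outer task's count intact, which a fixed `set(false)` cannot do.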
In general I am not fully happy with using a ThreadLocal here at all. Would
it not be better to pass around the `Task` instance and give the task instance
a method to spawn a subtask? This would be similar to the fork/join framework,
where `RecursiveTask` is used for exactly that.
IMHO, we should really switch to fork/join, as we need work-stealing
algorithms to prevent deadlocks.
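For reference, a small illustration of the `RecursiveTask` pattern from `java.util.concurrent`. The range-summing workload, the class name `RangeSumTask` and the threshold are hypothetical stand-ins for per-segment search work; this is not a proposal for how Lucene would structure it.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: summing a range stands in for splittable search work.
final class RangeSumTask extends RecursiveTask<Long> {
  private static final int THRESHOLD = 1_000; // arbitrary cut-off for this sketch

  private final int from; // inclusive
  private final int to; // exclusive

  RangeSumTask(int from, int to) {
    this.from = from;
    this.to = to;
  }

  @Override
  protected Long compute() {
    if (to - from <= THRESHOLD) {
      // Small enough: do the work directly on the current worker thread.
      long sum = 0;
      for (int i = from; i < to; i++) {
        sum += i;
      }
      return sum;
    }
    int mid = from + (to - from) / 2;
    RangeSumTask left = new RangeSumTask(from, mid);
    RangeSumTask right = new RangeSumTask(mid, to);
    left.fork(); // the task spawns its own subtask; idle workers may steal it
    long rightSum = right.compute(); // keep the current worker busy with the other half
    return left.join() + rightSum;
  }

  // Convenience entry point that runs the task on a small dedicated pool.
  static long sum(int from, int to) {
    ForkJoinPool pool = new ForkJoinPool(2);
    try {
      return pool.invoke(new RangeSumTask(from, to));
    } finally {
      pool.shutdown();
    }
  }
}
```

Here the task instance itself spawns the subtask via `fork()`, so no thread-local state is needed to tell nested work apart from top-level work.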
##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
Review Comment:
IndexSearcher should own the TaskExecutor and all queries/collectors can use
it.
As this would change public methods, why not move to work-stealing fork/join
here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]