javanna commented on code in PR #12183:
URL: https://github.com/apache/lucene/pull/12183#discussion_r1235163337
##########
lucene/core/src/java/org/apache/lucene/index/TermStates.java:
##########
@@ -86,19 +92,58 @@ public TermStates(
* @param needsStats if {@code true} then all leaf contexts will be visited up-front to collect
* term statistics. Otherwise, the {@link TermState} objects will be built only when requested
*/
- public static TermStates build(IndexReaderContext context, Term term, boolean needsStats)
+ public static TermStates build(
+ IndexSearcher indexSearcher, IndexReaderContext context, Term term, boolean needsStats)
throws IOException {
assert context != null && context.isTopLevel;
final TermStates perReaderTermState = new TermStates(needsStats ? null : term, context);
if (needsStats) {
- for (final LeafReaderContext ctx : context.leaves()) {
- // if (DEBUG) System.out.println(" r=" + leaves[i].reader);
- TermsEnum termsEnum = loadTermsEnum(ctx, term);
- if (termsEnum != null) {
- final TermState termState = termsEnum.termState();
- // if (DEBUG) System.out.println(" found");
- perReaderTermState.register(
- termState, ctx.ord, termsEnum.docFreq(), termsEnum.totalTermFreq());
+ Executor executor = indexSearcher.getExecutor();
+ boolean isShutdown = false;
+ if (executor instanceof ExecutorService) {
+ isShutdown = ((ExecutorService) executor).isShutdown();
+ }
+ if (executor != null && isShutdown == false) {
+ // build term states concurrently
+ List<FutureTask<Integer>> tasks =
+ context.leaves().stream()
+ .map(
+ ctx ->
+ new FutureTask<>(
+ () -> {
+ TermsEnum termsEnum = loadTermsEnum(ctx, term);
+ if (termsEnum != null) {
+ final TermState termState = termsEnum.termState();
+ perReaderTermState.register(
+ termState,
+ ctx.ord,
+ termsEnum.docFreq(),
+ termsEnum.totalTermFreq());
+ }
+ return 0;
+ }))
+ .toList();
+ for (FutureTask<Integer> task : tasks) {
+ executor.execute(task);
Review Comment:
Should we rely on slices here, and on `SliceExecutor`, instead of creating one task
per segment? I am worried that, for indices with many segments, we'd create too many
tasks and request too many threads from the executor.
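Here is a minimal sketch of the idea. It is written in plain Java rather than against Lucene's classes, and segments are represented only by their doc counts. `SliceSketch`, `slices` and `runPerSlice` are hypothetical names. The grouping is loosely modeled on the slicing `IndexSearcher` does, with both thresholds passed in as parameters; it is not a copy of Lucene's code:

- the largest segments are visited first;
- a segment above `maxDocsPerSlice` gets its own slice;
- smaller segments are packed together until either limit is reached.

The point is that the number of tasks handed to the executor is bounded by the number of slices, not the number of segments.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.function.IntUnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical sketch: one task per slice of segments instead of one task per segment. */
final class SliceSketch {
  private SliceSketch() {}

  /** Groups segment ordinals into slices, visiting the largest segments first. */
  static List<List<Integer>> slices(int[] maxDocs, int maxDocsPerSlice, int maxSegmentsPerSlice) {
    List<Integer> ords =
        IntStream.range(0, maxDocs.length)
            .boxed()
            .sorted(Comparator.comparingInt((Integer ord) -> maxDocs[ord]).reversed())
            .collect(Collectors.toList());
    List<List<Integer>> slices = new ArrayList<>();
    List<Integer> group = null;
    long docSum = 0;
    for (int ord : ords) {
      if (maxDocs[ord] > maxDocsPerSlice) {
        slices.add(List.of(ord)); // a large segment gets a slice of its own
        continue;
      }
      if (group == null) {
        group = new ArrayList<>();
        slices.add(group);
      }
      group.add(ord);
      docSum += maxDocs[ord];
      if (group.size() >= maxSegmentsPerSlice || docSum > maxDocsPerSlice) {
        group = null; // this slice is full, the next segment starts a new one
        docSum = 0;
      }
    }
    return slices;
  }

  /** Applies perSegment to every segment, submitting one task per slice to the executor. */
  static int[] runPerSlice(
      Executor executor, List<List<Integer>> slices, int segmentCount, IntUnaryOperator perSegment) {
    int[] results = new int[segmentCount];
    List<FutureTask<Void>> tasks = new ArrayList<>();
    for (List<Integer> slice : slices) {
      tasks.add(
          new FutureTask<Void>(
              () -> {
                // segments within a slice are processed sequentially by the same task
                for (int ord : slice) {
                  results[ord] = perSegment.applyAsInt(ord); // each ord is in exactly one slice
                }
                return null;
              }));
    }
    for (FutureTask<Void> task : tasks) {
      executor.execute(task);
    }
    for (FutureTask<Void> task : tasks) {
      try {
        task.get(); // get() also makes the task's writes to results visible here
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      } catch (ExecutionException e) {
        throw new RuntimeException(e.getCause());
      }
    }
    return results;
  }
}
```

As an example, take doc counts `{50, 300, 40, 30, 20, 10}` with `maxDocsPerSlice = 100` and `maxSegmentsPerSlice = 2`. The six segments end up in four slices: `[1]`, `[0, 2]`, `[3, 4]` and `[5]`. Four tasks are submitted instead of six.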
##########
lucene/core/src/java/org/apache/lucene/index/TermStates.java:
##########
@@ -86,19 +92,58 @@ public TermStates(
* @param needsStats if {@code true} then all leaf contexts will be visited up-front to collect
* term statistics. Otherwise, the {@link TermState} objects will be built only when requested
*/
- public static TermStates build(IndexReaderContext context, Term term, boolean needsStats)
+ public static TermStates build(
+ IndexSearcher indexSearcher, IndexReaderContext context, Term term, boolean needsStats)
throws IOException {
assert context != null && context.isTopLevel;
final TermStates perReaderTermState = new TermStates(needsStats ? null : term, context);
if (needsStats) {
- for (final LeafReaderContext ctx : context.leaves()) {
- // if (DEBUG) System.out.println(" r=" + leaves[i].reader);
- TermsEnum termsEnum = loadTermsEnum(ctx, term);
- if (termsEnum != null) {
- final TermState termState = termsEnum.termState();
- // if (DEBUG) System.out.println(" found");
- perReaderTermState.register(
- termState, ctx.ord, termsEnum.docFreq(), termsEnum.totalTermFreq());
+ Executor executor = indexSearcher.getExecutor();
+ boolean isShutdown = false;
+ if (executor instanceof ExecutorService) {
+ isShutdown = ((ExecutorService) executor).isShutdown();
+ }
+ if (executor != null && isShutdown == false) {
+ // build term states concurrently
+ List<FutureTask<Integer>> tasks =
+ context.leaves().stream()
+ .map(
+ ctx ->
+ new FutureTask<>(
+ () -> {
+ TermsEnum termsEnum = loadTermsEnum(ctx, term);
+ if (termsEnum != null) {
+ final TermState termState = termsEnum.termState();
+ perReaderTermState.register(
+ termState,
+ ctx.ord,
+ termsEnum.docFreq(),
+ termsEnum.totalTermFreq());
+ }
+ return 0;
+ }))
+ .toList();
+ for (FutureTask<Integer> task : tasks) {
+ executor.execute(task);
+ }
+ for (FutureTask<Integer> task : tasks) {
+ try {
+ task.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
Review Comment:
I see the same pattern (create the tasks, call `Executor#execute`, then wait for all
of them to complete) repeated in different places: in
`IndexSearcher#search(Query, CollectorManager)`, in
`AbstractKnnVectorQuery#parallelSearch`, and in the other places where this PR adds
it. I wonder if we should consolidate the execution logic, unless these places have
different requirements for how they perform concurrent execution.
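Here is one possible shape for a shared helper, sketched under assumptions. `TaskRunner.invokeAll` is a hypothetical name, not an existing Lucene API. The helper:

- keeps the fallback from this PR, running on the calling thread when there is no executor or it has been shut down;
- returns results in input order;
- rethrows the task's original exception. The PR instead builds a new `RuntimeException` from `e.getMessage()`, which drops the cause and its stack trace.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;

/** Hypothetical helper consolidating the "create tasks, execute, wait for all" pattern. */
final class TaskRunner {
  private TaskRunner() {}

  /** Runs all callables and returns their results in input order. */
  static <T> List<T> invokeAll(Executor executor, List<Callable<T>> callables) {
    List<FutureTask<T>> tasks = new ArrayList<>(callables.size());
    for (Callable<T> callable : callables) {
      tasks.add(new FutureTask<>(callable));
    }
    // Same fallback as the PR: no executor, or a shut-down one, means run sequentially.
    boolean sequential =
        executor == null
            || (executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown());
    for (FutureTask<T> task : tasks) {
      if (sequential) {
        task.run(); // runs on the calling thread
      } else {
        executor.execute(task);
      }
    }
    List<T> results = new ArrayList<>(tasks.size());
    for (FutureTask<T> task : tasks) {
      try {
        results.add(task.get()); // blocks until this task is done
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        throw new IllegalStateException("interrupted while waiting for tasks", e);
      } catch (ExecutionException e) {
        throw unwrap(e.getCause()); // keep the original exception, not just its message
      }
    }
    return results;
  }

  private static RuntimeException unwrap(Throwable cause) {
    if (cause instanceof RuntimeException) {
      return (RuntimeException) cause;
    }
    if (cause instanceof Error) {
      throw (Error) cause;
    }
    if (cause instanceof IOException) {
      return new UncheckedIOException((IOException) cause);
    }
    return new RuntimeException(cause);
  }
}
```

Wrapping a checked `IOException` in `UncheckedIOException` is just one choice for the sketch. A helper used by `TermStates#build` would more likely declare and rethrow `IOException` directly.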
##########
Review Comment:
Another aspect that gives me a bit of a headache is this blocking wait while the
tasks are completing. There is no better way today, but if the plan is to introduce
concurrency in more and more places over time, should we consider an async approach?
No thread would wait for the tasks to complete; instead, the last task to complete
would invoke a callback that does whatever needs to be done. I do realize this would
be a big change and would hurt code readability, but it would be more efficient,
since no thread would sit idle waiting...
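Here is a minimal sketch of the callback idea, under assumptions: `LastOneCompletes.runAll` and its parameters are hypothetical. Each task stores its result and decrements a shared counter. Whichever task brings the counter to zero invokes the completion callback, so no thread blocks on `get()`. Failures are recorded and reported through a separate callback, so a throwing task cannot prevent completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch: the last task to finish runs the completion callback; nobody blocks. */
final class LastOneCompletes {
  private LastOneCompletes() {}

  static <T> void runAll(
      Executor executor,
      List<Supplier<T>> work,
      Consumer<List<T>> onSuccess,
      Consumer<Throwable> onFailure) {
    int n = work.size();
    if (n == 0) {
      onSuccess.accept(List.of());
      return;
    }
    AtomicReferenceArray<T> results = new AtomicReferenceArray<>(n);
    AtomicReference<Throwable> failure = new AtomicReference<>();
    AtomicInteger remaining = new AtomicInteger(n);
    for (int i = 0; i < n; i++) {
      final int slot = i;
      executor.execute(
          () -> {
            try {
              results.set(slot, work.get(slot).get());
            } catch (Throwable t) {
              failure.compareAndSet(null, t); // keep only the first failure
            } finally {
              // Exactly one task observes zero here: the last one to finish.
              if (remaining.decrementAndGet() == 0) {
                Throwable error = failure.get();
                if (error != null) {
                  onFailure.accept(error);
                } else {
                  List<T> all = new ArrayList<>(n);
                  for (int j = 0; j < n; j++) {
                    all.add(results.get(j));
                  }
                  onSuccess.accept(all);
                }
              }
            }
          });
    }
  }
}
```

The JDK's `CompletableFuture.allOf(...)` followed by `thenRun` or `thenApply` gives the same non-blocking completion without a hand-rolled counter. Either way, the callback runs on whichever worker thread finished last, which is part of the readability cost.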