javanna commented on code in PR #15936:
URL: https://github.com/apache/lucene/pull/15936#discussion_r3316222121


##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/TopGroups.java:
##########
@@ -290,4 +296,146 @@ public static <T> TopGroups<T> merge(
           totalMaxScore);
     }
   }
+
+  private record MergedBlockGroup(Object[] topValues, int shardIndex, int 
groupIndex) {}
+
+  private static class GroupComparator implements Comparator<MergedBlockGroup> 
{
+    @SuppressWarnings("rawtypes")
+    public final FieldComparator[] comparators;
+
+    public final int[] reversed;
+
+    @SuppressWarnings({"rawtypes"})
+    public GroupComparator(Sort groupSort) {
+      final SortField[] sortFields = groupSort.getSort();
+      comparators = new FieldComparator[sortFields.length];
+      reversed = new int[sortFields.length];
+      for (int compIDX = 0; compIDX < sortFields.length; compIDX++) {
+        final SortField sortField = sortFields[compIDX];
+        comparators[compIDX] = sortField.getComparator(1, Pruning.NONE);
+        reversed[compIDX] = sortField.getReverse() ? -1 : 1;
+      }
+    }
+
+    @Override
+    @SuppressWarnings({"unchecked"})
+    public int compare(MergedBlockGroup group, MergedBlockGroup other) {
+      if (group == other) {
+        return 0;
+      }
+      final Object[] groupValues = group.topValues;
+      final Object[] otherValues = other.topValues;
+      for (int compIDX = 0; compIDX < comparators.length; compIDX++) {
+        final int c =
+            reversed[compIDX]
+                * comparators[compIDX].compareValues(groupValues[compIDX], 
otherValues[compIDX]);
+        if (c != 0) {
+          return c;
+        }
+      }
+
+      assert group.shardIndex != other.shardIndex;
+      return group.shardIndex - other.shardIndex;
+    }
+  }
+
+  /**
+   * Merge TopGroups that are partitioned into blocks per shard. This method 
assumes that within
+   * each shard, the groups are sorted according to the groupSort.
+   *
+   * @param shardGroups list of TopGroups, one per shard.
+   * @param groupSort The {@link Sort} used to sort the groups. The top sorted 
document within each
+   *     * group according to groupSort, determines how that group sorts 
against other groups. This
+   *     * must be non-null, ie, if you want to groupSort by relevance use 
Sort.RELEVANCE.
+   * @param groupOffset Which group to start from.
+   * @param topNGroups How many top groups to keep.
+   * @param docSort The sort to use within each group
+   * @return TopGroups instance or null if there are no groups.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> TopGroups<T> mergeBlockGroups(
+      List<TopGroups<T>> shardGroups,
+      Sort groupSort,
+      int groupOffset,
+      int topNGroups,
+      Sort docSort) {
+    if (shardGroups.isEmpty()) {
+      return new TopGroups<>(
+          groupSort.getSort(),
+          docSort.getSort(),
+          0,
+          0,
+          (GroupDocs<T>[]) new GroupDocs<?>[0],
+          Float.NaN);
+    }
+
+    int totalGroupCount = 0;
+    int totalHitCount = 0;
+    int totalGroupedHitCount = 0;
+    for (TopGroups<T> sg : shardGroups) {
+      totalGroupCount += sg.totalGroupCount;
+      totalHitCount += sg.totalHitCount;
+    }
+
+    // k-way merge
+    GroupComparator groupComp = new GroupComparator(groupSort);
+    NavigableSet<MergedBlockGroup> queue = new TreeSet<>(groupComp);
+
+    float totalMaxScore = Float.NaN;
+    final boolean groupSortByRelevance = groupSort.equals(Sort.RELEVANCE);
+    // init queue
+    for (int idx = 0; idx < shardGroups.size(); idx++) {
+      TopGroups<T> topGroups = shardGroups.get(idx);
+      if (topGroups.groups.length == 0) {
+        continue;
+      }
+      if (!groupSortByRelevance) {
+        totalMaxScore = nonNANmax(totalMaxScore, topGroups.maxScore);
+      }
+      GroupDocs<T> firstGroupDocs = topGroups.groups[0];
+      queue.add(new MergedBlockGroup(firstGroupDocs.groupSortValues(), idx, 
0));
+    }
+
+    if (groupSortByRelevance) {
+      totalMaxScore = shardGroups.get(queue.first().shardIndex).maxScore;

Review Comment:
   Could it happen that the queue is empty and this throws exception?



##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/BlockGroupingCollectorManager.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search.grouping;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.lucene.search.CollectorManager;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.Weight;
+
+/**
+ * A {@link CollectorManager} for {@link BlockGroupingCollector} that merges 
results from multiple
+ * collectors into a single {@link TopGroups}. This is intended for use with 
concurrent search,
+ * where each segment is searched by a separate {@link BlockGroupingCollector}.

Review Comment:
   each slice is searched by a separate collector. A slice is made of one or 
more segments. Actually, a slice could be a partition of a segment too.



##########
lucene/grouping/src/test/org/apache/lucene/search/grouping/TestBlockGrouping.java:
##########
@@ -77,6 +78,66 @@ public void testSimple() throws IOException {
     shard.close();
   }
 
+  public void testShardedBlockGrouping() throws IOException {
+    Shard shardControl = new Shard();
+    int shardCount = random().nextInt(3) + 2;
+    Shard[] shards = new Shard[shardCount];
+    for (int shardIdx = 0; shardIdx < shardCount; shardIdx++) {
+      shards[shardIdx] = new Shard();
+      // int bookCount = atLeast(20);
+      for (int bookIdx = 0; bookIdx < 5; bookIdx++) {
+        List<Document> block = new ArrayList<>();
+        String bookName = "book" + shardIdx + bookIdx;
+        // int chapterCount = atLeast(10);
+        int chapterCount = 2;
+        for (int j = 0; j < 2; j++) {
+          Document doc = new Document();
+          String chapterName = "chapter" + j;
+          String chapterText = randomText();
+          doc.add(new TextField("book", bookName, Field.Store.YES));
+          doc.add(new TextField("chapter", chapterName, Field.Store.YES));
+          doc.add(new TextField("text", chapterText, Field.Store.NO));
+          doc.add(new NumericDocValuesField("length", chapterText.length()));
+          doc.add(new SortedDocValuesField("book", new BytesRef(bookName)));
+          if (j == chapterCount - 1) {
+            doc.add(new TextField("blockEnd", "true", Field.Store.NO));
+          }
+          block.add(doc);
+        }
+        shards[shardIdx].writer.addDocuments(block);
+        shardControl.writer.addDocuments(block);
+      }
+    }
+
+    IndexSearcher shardControlIndexSearcher = shardControl.getIndexSearcher();
+
+    Query blockEndQuery = new TermQuery(new Term("blockEnd", "true"));
+    GroupingSearch grouper = new GroupingSearch(blockEndQuery);
+    grouper.setGroupDocsLimit(10);
+
+    Query topLevel = new TermQuery(new Term("text", "grandmother"));
+    TopGroups<BytesRef> singleShardTopGroups =
+        grouper.search(shardControlIndexSearcher, topLevel, 0, 5);
+
+    List<TopGroups<BytesRef>> shardTopGroups = new ArrayList<>();
+    for (int shardIdx = 0; shardIdx < shardCount; shardIdx++) {
+      shardTopGroups.add(grouper.search(shards[shardIdx].getIndexSearcher(), 
topLevel, 0, 5));
+    }
+
+    TopGroups<BytesRef> mergedTopGroups =
+        TopGroups.mergeBlockGroups(shardTopGroups, Sort.RELEVANCE, 0, 5, 
Sort.RELEVANCE);
+    assertNotNull(mergedTopGroups);
+
+    assertEquals(singleShardTopGroups.totalHitCount, 
mergedTopGroups.totalHitCount);
+    assertEquals(singleShardTopGroups.totalGroupCount, 
mergedTopGroups.totalGroupCount);
+    assertEquals(singleShardTopGroups.groups.length, 
mergedTopGroups.groups.length);

Review Comment:
   do we need to also verify ordering and add a bit more assertions here?



##########
lucene/grouping/src/test/org/apache/lucene/search/grouping/TestBlockGrouping.java:
##########
@@ -77,6 +77,64 @@ public void testSimple() throws IOException {
     shard.close();
   }
 
+  public void testShardedBlockGrouping() throws IOException {

Review Comment:
   ok thanks for clarifying, could you add some javadocs perhaps to this test 
method to clarify its purpose?



##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/BlockGroupingCollectorManager.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search.grouping;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.lucene.search.CollectorManager;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.Weight;
+
+/**
+ * A {@link CollectorManager} for {@link BlockGroupingCollector} that merges 
results from multiple
+ * collectors into a single {@link TopGroups}. This is intended for use with 
concurrent search,
+ * where each segment is searched by a separate {@link BlockGroupingCollector}.
+ *
+ * <p>Documents must be indexed as blocks using {@link
+ * org.apache.lucene.index.IndexWriter#addDocuments 
IndexWriter.addDocuments()} or {@link
+ * org.apache.lucene.index.IndexWriter#updateDocuments 
IndexWriter.updateDocuments()}.
+ *
+ * <p>See {@link BlockGroupingCollector} for more details.
+ *
+ * <p>Example usage:
+ *
+ * <pre class="prettyprint">
+ * IndexSearcher searcher = new IndexSearcher(reader);
+ * Query lastDocPerGroupQuery = new TermQuery(new Term("groupEnd", "true"));
+ * Weight lastDocPerGroup = searcher.createWeight(
+ *     searcher.rewrite(lastDocPerGroupQuery), ScoreMode.COMPLETE_NO_SCORES, 
1);
+ *
+ * BlockGroupingCollectorManager&lt;BytesRef&gt; manager = new 
BlockGroupingCollectorManager&lt;&gt;(
+ *     Sort.RELEVANCE,   // groupSort
+ *     0,                // groupOffset
+ *     10,               // topNGroups
+ *     true,             // needsScores
+ *     lastDocPerGroup,
+ *     Sort.RELEVANCE,   // withinGroupSort
+ *     0,                // withinGroupOffset
+ *     5);               // maxDocsPerGroup
+ *
+ * TopGroups&lt;BytesRef&gt; result = searcher.search(query, manager);
+ * </pre>
+ *
+ * @lucene.experimental
+ */
+public class BlockGroupingCollectorManager<T>
+    implements CollectorManager<BlockGroupingCollector, TopGroups<T>> {
+
+  private final Sort groupSort;
+  private final int groupOffset;
+  private final int topNGroups;
+  private final boolean needsScores;
+  private final Weight lastDocPerGroup;
+
+  private final Sort withinGroupSort;
+  private final int withinGroupOffset;
+  private final int maxDocsPerGroup;
+
+  /**
+   * Creates a new BlockGroupingCollectorManager.
+   *
+   * @param groupSort the sort used to rank groups
+   * @param groupOffset the offset into the groups to start returning from
+   * @param topNGroups the number of top groups to collect
+   * @param needsScores whether scores are needed (must be true if groupSort 
or withinGroupSort uses
+   *     scores)
+   * @param lastDocPerGroup a {@link Weight} that matches the last document in 
each group block
+   * @param withinGroupSort the sort used to rank documents within each group
+   * @param withinGroupOffset the offset into each group's documents to start 
returning from
+   * @param maxDocsPerGroup the maximum number of documents to return per group
+   */
+  public BlockGroupingCollectorManager(
+      Sort groupSort,
+      int groupOffset,
+      int topNGroups,
+      boolean needsScores,
+      Weight lastDocPerGroup,
+      Sort withinGroupSort,
+      int withinGroupOffset,
+      int maxDocsPerGroup) {
+    this.groupSort = groupSort;
+    this.groupOffset = groupOffset;
+    this.topNGroups = topNGroups;
+    this.needsScores = needsScores;
+    this.lastDocPerGroup = lastDocPerGroup;
+    this.withinGroupSort = withinGroupSort;
+    this.withinGroupOffset = withinGroupOffset;
+    this.maxDocsPerGroup = maxDocsPerGroup;
+  }
+
+  @Override
+  public BlockGroupingCollector newCollector() throws IOException {
+    return new BlockGroupingCollector(groupSort, topNGroups, needsScores, 
lastDocPerGroup);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public TopGroups<T> reduce(Collection<BlockGroupingCollector> collectors) 
throws IOException {
+    // Merge results from multiple collectors

Review Comment:
   I am not sure that this comment adds value, should we remove it?



##########
lucene/grouping/src/test/org/apache/lucene/search/grouping/TestBlockGrouping.java:
##########
@@ -77,6 +77,64 @@ public void testSimple() throws IOException {
     shard.close();
   }
 
+  public void testShardedBlockGrouping() throws IOException {

Review Comment:
   On exercising concurrency: Ii I understand correctly, the searcher is 
created via `shard.getIndexSearcher`, which does `searcher = new 
IndexSearcher(this.writer.getReader());` without passing an executor. This way 
there is no concurrency, a single collector goes sequentially through all the 
segments?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to