vigyasharma commented on a change in pull request #633:
URL: https://github.com/apache/lucene/pull/633#discussion_r822185980



##########
File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
##########
@@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) {
    */
   public long addIndexes(CodecReader... readers) throws IOException {
     ensureOpen();
-
-    // long so we can detect int overflow:
-    long numDocs = 0;
     long seqNo;
-    try {
-      if (infoStream.isEnabled("IW")) {
-        infoStream.message("IW", "flush at addIndexes(CodecReader...)");
-      }
-      flush(false, true);
+    long numDocs = 0;
+    final int mergeTimeoutInSeconds = 600;
 
-      String mergedName = newSegmentName();
-      int numSoftDeleted = 0;
-      for (CodecReader leaf : readers) {
-        numDocs += leaf.numDocs();
+    try {
+      // Best effort up front validations
+      for (CodecReader leaf: readers) {
         validateMergeReader(leaf);
-        if (softDeletesEnabled) {
-          Bits liveDocs = leaf.getLiveDocs();
-          numSoftDeleted +=
-              PendingSoftDeletes.countSoftDeletes(
-                  DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator(
-                      config.getSoftDeletesField(), leaf),
-                  liveDocs);
+        for (FieldInfo fi: leaf.getFieldInfos()) {
+          globalFieldNumberMap.verifyFieldInfo(fi);
         }
+        numDocs += leaf.numDocs();
       }
-
-      // Best-effort up front check:
       testReserveDocs(numDocs);
 
-      final IOContext context =
-          new IOContext(
-              new MergeInfo(Math.toIntExact(numDocs), -1, false, 
UNBOUNDED_MAX_MERGE_SEGMENTS));
+      synchronized (this) {
+        ensureOpen();
+        if (merges.areEnabled() == false) {
+          throw new UnsupportedOperationException("Merges are disabled on 
current writer. " +
+            "Cannot execute addIndexes(CodecReaders...) API");
+        }
+      }
+
+      MergePolicy mergePolicy = config.getMergePolicy();
+      MergePolicy.MergeSpecification spec = 
mergePolicy.findMerges(Arrays.asList(readers));
+      boolean mergesComplete = false;
+      if (spec != null && spec.merges.size() > 0) {
+        try {
+          spec.merges.forEach(addIndexesMergeSource::registerMerge);
+          mergeScheduler.merge(addIndexesMergeSource, 
MergeTrigger.ADD_INDEXES);
+          mergesComplete = spec.await();
+        } finally {
+          if (!mergesComplete) {
+            // nocommit -- ensure all intermediate files are deleted
+            for (MergePolicy.OneMerge merge: spec.merges) {
+              deleteNewFiles(merge.getMergeInfo().files());
+            }
+          }
+        }
+      }
 
-      // TODO: somehow we should fix this merge so it's
-      // abortable so that IW.close(false) is able to stop it
-      TrackingDirectoryWrapper trackingDir = new 
TrackingDirectoryWrapper(directory);
-      Codec codec = config.getCodec();
-      // We set the min version to null for now, it will be set later by 
SegmentMerger
-      SegmentInfo info =
-          new SegmentInfo(
-              directoryOrig,
-              Version.LATEST,
-              null,
-              mergedName,
-              -1,
-              false,
-              codec,
-              Collections.emptyMap(),
-              StringHelper.randomId(),
-              Collections.emptyMap(),
-              config.getIndexSort());
-
-      SegmentMerger merger =
-          new SegmentMerger(
-              Arrays.asList(readers), info, infoStream, trackingDir, 
globalFieldNumberMap, context);
+      if (mergesComplete) {
+        List<SegmentCommitInfo> infos = new ArrayList<>();
+        long totalMaxDoc = 0;
+        for (MergePolicy.OneMerge merge: spec.merges) {
+          totalMaxDoc += merge.totalMaxDoc;
+          if (merge.getMergeInfo() != null) {
+            infos.add(merge.getMergeInfo());
+          }
+        }
 
-      if (!merger.shouldMerge()) {
-        return docWriter.getNextSequenceNumber();
+        // nocommit -- add tests for this transactional behavior
+        synchronized (this) {
+          if (infos.isEmpty() == false) {
+            boolean success = false;
+            try {
+              ensureOpen();
+              // Reserve the docs, just before we update SIS:
+              reserveDocs(totalMaxDoc);
+              success = true;
+            } finally {
+              if (!success) {
+                for (SegmentCommitInfo sipc : infos) {
+                  // Safe: these files must exist
+                  deleteNewFiles(sipc.files());
+                }
+              }
+            }
+            segmentInfos.addAll(infos);
+            checkpoint();
+          }
+          seqNo = docWriter.getNextSequenceNumber();
+        }
+      } else {
+        // We should normally not reach here, as an earlier call should throw 
an exception.
+        throw new MergePolicy.MergeException("Could not complete merges within 
configured timeout of [" + mergeTimeoutInSeconds + "] seconds");
       }
+    } catch (VirtualMachineError tragedy) {
+      tragicEvent(tragedy, "addIndexes(CodecReader...)");
+      throw tragedy;
+    }
 
-      synchronized (this) {
-        ensureOpen();
-        assert merges.areEnabled();
-        runningAddIndexesMerges.add(merger);
+    maybeMerge();
+    return seqNo;
+  }
+
+  private class AddIndexesMergeSource implements MergeScheduler.MergeSource {

Review comment:
      +1, `MergeSource` is a neat abstraction.
   I wanted to move this out of IW into a separate class altogether, to try to
make IW leaner. However, I ran into a number of test failures and method-access
issues that digressed from the main change. Maybe I'll try again in a separate
follow-up PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org

Reply via email to