vigyasharma commented on a change in pull request #633:
URL: https://github.com/apache/lucene/pull/633#discussion_r822185980
########## File path: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java ##########
@@ -3121,147 +3125,265 @@ private void validateMergeReader(CodecReader leaf) {
   */
  public long addIndexes(CodecReader... readers) throws IOException {
    ensureOpen();
-
-    // long so we can detect int overflow:
-    long numDocs = 0;
    long seqNo;
-    try {
-      if (infoStream.isEnabled("IW")) {
-        infoStream.message("IW", "flush at addIndexes(CodecReader...)");
-      }
-      flush(false, true);
+    long numDocs = 0;
+    final int mergeTimeoutInSeconds = 600;
-      String mergedName = newSegmentName();
-      int numSoftDeleted = 0;
-      for (CodecReader leaf : readers) {
-        numDocs += leaf.numDocs();
+    try {
+      // Best effort up front validations
+      for (CodecReader leaf: readers) {
        validateMergeReader(leaf);
-        if (softDeletesEnabled) {
-          Bits liveDocs = leaf.getLiveDocs();
-          numSoftDeleted +=
-              PendingSoftDeletes.countSoftDeletes(
-                  DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator(
-                      config.getSoftDeletesField(), leaf),
-                  liveDocs);
+        for (FieldInfo fi: leaf.getFieldInfos()) {
+          globalFieldNumberMap.verifyFieldInfo(fi);
        }
+        numDocs += leaf.numDocs();
      }
-
-      // Best-effort up front check:
      testReserveDocs(numDocs);
-      final IOContext context =
-          new IOContext(
-              new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS));
+      synchronized (this) {
+        ensureOpen();
+        if (merges.areEnabled() == false) {
+          throw new UnsupportedOperationException("Merges are disabled on current writer. " +
+              "Cannot execute addIndexes(CodecReaders...) API");
+        }
+      }
+
+      MergePolicy mergePolicy = config.getMergePolicy();
+      MergePolicy.MergeSpecification spec = mergePolicy.findMerges(Arrays.asList(readers));
+      boolean mergesComplete = false;
+      if (spec != null && spec.merges.size() > 0) {
+        try {
+          spec.merges.forEach(addIndexesMergeSource::registerMerge);
+          mergeScheduler.merge(addIndexesMergeSource, MergeTrigger.ADD_INDEXES);
+          mergesComplete = spec.await();
+        } finally {
+          if (!mergesComplete) {
+            // nocommit -- ensure all intermediate files are deleted
+            for (MergePolicy.OneMerge merge: spec.merges) {
+              deleteNewFiles(merge.getMergeInfo().files());
+            }
+          }
+        }
+      }
-      // TODO: somehow we should fix this merge so it's
-      // abortable so that IW.close(false) is able to stop it
-      TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory);
-      Codec codec = config.getCodec();
-      // We set the min version to null for now, it will be set later by SegmentMerger
-      SegmentInfo info =
-          new SegmentInfo(
-              directoryOrig,
-              Version.LATEST,
-              null,
-              mergedName,
-              -1,
-              false,
-              codec,
-              Collections.emptyMap(),
-              StringHelper.randomId(),
-              Collections.emptyMap(),
-              config.getIndexSort());
-
-      SegmentMerger merger =
-          new SegmentMerger(
-              Arrays.asList(readers), info, infoStream, trackingDir, globalFieldNumberMap, context);
+      if (mergesComplete) {
+        List<SegmentCommitInfo> infos = new ArrayList<>();
+        long totalMaxDoc = 0;
+        for (MergePolicy.OneMerge merge: spec.merges) {
+          totalMaxDoc += merge.totalMaxDoc;
+          if (merge.getMergeInfo() != null) {
+            infos.add(merge.getMergeInfo());
+          }
+        }
-      if (!merger.shouldMerge()) {
-        return docWriter.getNextSequenceNumber();
+        // nocommit -- add tests for this transactional behavior
+        synchronized (this) {
+          if (infos.isEmpty() == false) {
+            boolean success = false;
+            try {
+              ensureOpen();
+              // Reserve the docs, just before we update SIS:
+              reserveDocs(totalMaxDoc);
+              success = true;
+            } finally {
+              if (!success) {
+                for (SegmentCommitInfo sipc : infos) {
+                  // Safe: these files must exist
+                  deleteNewFiles(sipc.files());
+                }
+              }
+            }
+            segmentInfos.addAll(infos);
+            checkpoint();
+          }
+          seqNo = docWriter.getNextSequenceNumber();
+        }
+      } else {
+        // We should normally not reach here, as an earlier call should throw an exception.
+        throw new MergePolicy.MergeException("Could not complete merges within configured timeout of [" + mergeTimeoutInSeconds + "] seconds");
      }
+    } catch (VirtualMachineError tragedy) {
+      tragicEvent(tragedy, "addIndexes(CodecReader...)");
+      throw tragedy;
+    }
-    synchronized (this) {
-      ensureOpen();
-      assert merges.areEnabled();
-      runningAddIndexesMerges.add(merger);
+    maybeMerge();
+    return seqNo;
+  }
+
+  private class AddIndexesMergeSource implements MergeScheduler.MergeSource {

Review comment:
       +1, `MergeSource` is a neat abstraction. I wanted to move this out of IW into a different class altogether, to make IW leaner, but I ran into a bunch of test failures and method-access issues that were tangential to the main change. Maybe I'll try it again in a separate follow-up PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org
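[Editor's note] As context for the refactor discussed in the review comment above, here is a minimal sketch of what an addIndexes merge source decoupled from IndexWriter could look like. The class name `StandaloneAddIndexesMergeSource`, the `MergeRunner` callback, and `runAddIndexesMerge` are hypothetical names introduced only for illustration; in the PR itself, `AddIndexesMergeSource` remains an inner class of IndexWriter with direct access to its internals.

    import java.io.IOException;
    import java.util.ArrayDeque;
    import java.util.Queue;
    import org.apache.lucene.index.MergePolicy;
    import org.apache.lucene.index.MergeScheduler;

    // Hypothetical standalone MergeSource; in the PR this logic lives inside IndexWriter.
    final class StandaloneAddIndexesMergeSource implements MergeScheduler.MergeSource {

      // Callback back into the writer for the work that needs IndexWriter-internal access.
      interface MergeRunner {
        void runAddIndexesMerge(MergePolicy.OneMerge merge) throws IOException;
      }

      private final Queue<MergePolicy.OneMerge> pending = new ArrayDeque<>();
      private final MergeRunner runner;

      StandaloneAddIndexesMergeSource(MergeRunner runner) {
        this.runner = runner;
      }

      // Called by addIndexes(CodecReader...) for each OneMerge in the MergeSpecification.
      synchronized void registerMerge(MergePolicy.OneMerge merge) {
        pending.add(merge);
      }

      @Override
      public synchronized MergePolicy.OneMerge getNextMerge() {
        return pending.poll();
      }

      @Override
      public void onMergeFinished(MergePolicy.OneMerge merge) {
        // Completion is observed through the OneMerge itself (e.g. spec.await() in the PR),
        // so no extra bookkeeping is needed here.
      }

      @Override
      public synchronized boolean hasPendingMerges() {
        return pending.isEmpty() == false;
      }

      @Override
      public void merge(MergePolicy.OneMerge merge) throws IOException {
        // Delegate the actual merge work back to the writer.
        runner.runAddIndexesMerge(merge);
      }
    }

The awkward part, as the comment hints, is that registering merged segments, deleting intermediate files, and reserving docs all touch package-private IndexWriter state, which is presumably why a clean extraction is deferred to a follow-up PR.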