jtibshirani commented on a change in pull request #601: URL: https://github.com/apache/lucene/pull/601#discussion_r790996819
########## File path: lucene/core/src/java/org/apache/lucene/codecs/lucene90/Lucene90HnswVectorsWriter.java ########## @@ -138,32 +129,147 @@ public void writeField(FieldInfo fieldInfo, KnnVectorsReader knnVectorsReader) fieldInfo.getVectorSimilarityFunction(), vectorIndexOffset, offsets, - count, maxConn, beamWidth); } else { throw new IllegalArgumentException( "Indexing an HNSW graph requires a random access vector values, got " + vectors); } + + long vectorDataLength = vectorData.getFilePointer() - vectorDataOffset; long vectorIndexLength = vectorIndex.getFilePointer() - vectorIndexOffset; writeMeta( fieldInfo, vectorDataOffset, vectorDataLength, vectorIndexOffset, vectorIndexLength, - count, docIds); writeGraphOffsets(meta, offsets); } + @Override + public void merge(MergeState mergeState) throws IOException { + for (int i = 0; i < mergeState.fieldInfos.length; i++) { + KnnVectorsReader reader = mergeState.knnVectorsReaders[i]; + assert reader != null || mergeState.fieldInfos[i].hasVectorValues() == false; + if (reader != null) { + reader.checkIntegrity(); + } + } + + for (FieldInfo fieldInfo : mergeState.mergeFieldInfos) { + if (fieldInfo.hasVectorValues()) { + if (mergeState.infoStream.isEnabled("VV")) { + mergeState.infoStream.message("VV", "merging " + mergeState.segmentInfo); + } + mergeField(fieldInfo, mergeState); + if (mergeState.infoStream.isEnabled("VV")) { + mergeState.infoStream.message("VV", "merge done " + mergeState.segmentInfo); + } + } + } + finish(); + } + + private void mergeField(FieldInfo fieldInfo, MergeState mergeState) throws IOException { + if (mergeState.infoStream.isEnabled("VV")) { + mergeState.infoStream.message("VV", "merging " + mergeState.segmentInfo); + } + + long vectorDataOffset = vectorData.alignFilePointer(Float.BYTES); + + VectorValues vectors = MergedVectorValues.mergeVectorValues(fieldInfo, mergeState); + IndexOutput tempVectorData = + segmentWriteState.directory.createTempOutput( + vectorData.getName(), "temp", 
segmentWriteState.context); + IndexInput vectorDataInput = null; + boolean success = false; + try { + // write the merged vector data to a temporary file + int[] docIds = writeVectorData(tempVectorData, vectors); + CodecUtil.writeFooter(tempVectorData); + IOUtils.close(tempVectorData); + + // copy the temporary file vectors to the actual data file + vectorDataInput = + segmentWriteState.directory.openInput( + tempVectorData.getName(), segmentWriteState.context); + vectorData.copyBytes(vectorDataInput, vectorDataInput.length() - CodecUtil.footerLength()); + CodecUtil.retrieveChecksum(vectorDataInput); + + // build the graph using the temporary vector data + Lucene90HnswVectorsReader.OffHeapVectorValues offHeapVectors = + new Lucene90HnswVectorsReader.OffHeapVectorValues( + vectors.dimension(), docIds, vectorDataInput); + + long[] offsets = new long[docIds.length]; + long vectorIndexOffset = vectorIndex.getFilePointer(); + writeGraph( + vectorIndex, + offHeapVectors, + fieldInfo.getVectorSimilarityFunction(), + vectorIndexOffset, + offsets, + maxConn, + beamWidth); + + long vectorDataLength = vectorData.getFilePointer() - vectorDataOffset; + long vectorIndexLength = vectorIndex.getFilePointer() - vectorIndexOffset; + writeMeta( + fieldInfo, + vectorDataOffset, + vectorDataLength, + vectorIndexOffset, + vectorIndexLength, + docIds); + writeGraphOffsets(meta, offsets); + success = true; + } finally { + IOUtils.close(vectorDataInput); + if (success) { + segmentWriteState.directory.deleteFile(tempVectorData.getName()); + } else { + IOUtils.closeWhileHandlingException(tempVectorData); + IOUtils.deleteFilesIgnoringExceptions( + segmentWriteState.directory, tempVectorData.getName()); + } + } + + if (mergeState.infoStream.isEnabled("VV")) { + mergeState.infoStream.message("VV", "merge done " + mergeState.segmentInfo); + } + } + + /** + * Writes the vector values to the output and returns a mapping from dense ordinals to document + * IDs. 
The length of the returned array matches the total number of documents with a vector + * (which excludes deleted documents), so it may be less than {@link VectorValues#size()}. + */ + private static int[] writeVectorData(IndexOutput output, VectorValues vectors) + throws IOException { + int[] docIds = new int[vectors.size()]; + int count = 0; + for (int docV = vectors.nextDoc(); docV != NO_MORE_DOCS; docV = vectors.nextDoc(), count++) { + // write vector + BytesRef binaryValue = vectors.binaryValue(); + assert binaryValue.length == vectors.dimension() * Float.BYTES; + output.writeBytes(binaryValue.bytes, binaryValue.offset, binaryValue.length); + docIds[count] = docV; + } + + if (docIds.length > count) { Review comment: My understanding: it can happen in the case of merging segments with deletions. In this PR we only check `vectors.size() == docIds.length` when flushing a segment, which is a different case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org For additional commands, e-mail: issues-help@lucene.apache.org