jtibshirani commented on a change in pull request #601:
URL: https://github.com/apache/lucene/pull/601#discussion_r790996819



##########
File path: 
lucene/core/src/java/org/apache/lucene/codecs/lucene90/Lucene90HnswVectorsWriter.java
##########
@@ -138,32 +129,147 @@ public void writeField(FieldInfo fieldInfo, 
KnnVectorsReader knnVectorsReader)
           fieldInfo.getVectorSimilarityFunction(),
           vectorIndexOffset,
           offsets,
-          count,
           maxConn,
           beamWidth);
     } else {
       throw new IllegalArgumentException(
           "Indexing an HNSW graph requires a random access vector values, got 
" + vectors);
     }
+
+    long vectorDataLength = vectorData.getFilePointer() - vectorDataOffset;
     long vectorIndexLength = vectorIndex.getFilePointer() - vectorIndexOffset;
     writeMeta(
         fieldInfo,
         vectorDataOffset,
         vectorDataLength,
         vectorIndexOffset,
         vectorIndexLength,
-        count,
         docIds);
     writeGraphOffsets(meta, offsets);
   }
 
+  @Override
+  public void merge(MergeState mergeState) throws IOException {
+    for (int i = 0; i < mergeState.fieldInfos.length; i++) {
+      KnnVectorsReader reader = mergeState.knnVectorsReaders[i];
+      assert reader != null || mergeState.fieldInfos[i].hasVectorValues() == 
false;
+      if (reader != null) {
+        reader.checkIntegrity();
+      }
+    }
+
+    for (FieldInfo fieldInfo : mergeState.mergeFieldInfos) {
+      if (fieldInfo.hasVectorValues()) {
+        if (mergeState.infoStream.isEnabled("VV")) {
+          mergeState.infoStream.message("VV", "merging " + 
mergeState.segmentInfo);
+        }
+        mergeField(fieldInfo, mergeState);
+        if (mergeState.infoStream.isEnabled("VV")) {
+          mergeState.infoStream.message("VV", "merge done " + 
mergeState.segmentInfo);
+        }
+      }
+    }
+    finish();
+  }
+
+  private void mergeField(FieldInfo fieldInfo, MergeState mergeState) throws 
IOException {
+    if (mergeState.infoStream.isEnabled("VV")) {
+      mergeState.infoStream.message("VV", "merging " + mergeState.segmentInfo);
+    }
+
+    long vectorDataOffset = vectorData.alignFilePointer(Float.BYTES);
+
+    VectorValues vectors = MergedVectorValues.mergeVectorValues(fieldInfo, 
mergeState);
+    IndexOutput tempVectorData =
+        segmentWriteState.directory.createTempOutput(
+            vectorData.getName(), "temp", segmentWriteState.context);
+    IndexInput vectorDataInput = null;
+    boolean success = false;
+    try {
+      // write the merged vector data to a temporary file
+      int[] docIds = writeVectorData(tempVectorData, vectors);
+      CodecUtil.writeFooter(tempVectorData);
+      IOUtils.close(tempVectorData);
+
+      // copy the temporary file vectors to the actual data file
+      vectorDataInput =
+          segmentWriteState.directory.openInput(
+              tempVectorData.getName(), segmentWriteState.context);
+      vectorData.copyBytes(vectorDataInput, vectorDataInput.length() - 
CodecUtil.footerLength());
+      CodecUtil.retrieveChecksum(vectorDataInput);
+
+      // build the graph using the temporary vector data
+      Lucene90HnswVectorsReader.OffHeapVectorValues offHeapVectors =
+          new Lucene90HnswVectorsReader.OffHeapVectorValues(
+              vectors.dimension(), docIds, vectorDataInput);
+
+      long[] offsets = new long[docIds.length];
+      long vectorIndexOffset = vectorIndex.getFilePointer();
+      writeGraph(
+          vectorIndex,
+          offHeapVectors,
+          fieldInfo.getVectorSimilarityFunction(),
+          vectorIndexOffset,
+          offsets,
+          maxConn,
+          beamWidth);
+
+      long vectorDataLength = vectorData.getFilePointer() - vectorDataOffset;
+      long vectorIndexLength = vectorIndex.getFilePointer() - 
vectorIndexOffset;
+      writeMeta(
+          fieldInfo,
+          vectorDataOffset,
+          vectorDataLength,
+          vectorIndexOffset,
+          vectorIndexLength,
+          docIds);
+      writeGraphOffsets(meta, offsets);
+      success = true;
+    } finally {
+      IOUtils.close(vectorDataInput);
+      if (success) {
+        segmentWriteState.directory.deleteFile(tempVectorData.getName());
+      } else {
+        IOUtils.closeWhileHandlingException(tempVectorData);
+        IOUtils.deleteFilesIgnoringExceptions(
+            segmentWriteState.directory, tempVectorData.getName());
+      }
+    }
+
+    if (mergeState.infoStream.isEnabled("VV")) {
+      mergeState.infoStream.message("VV", "merge done " + 
mergeState.segmentInfo);
+    }
+  }
+
+  /**
+   * Writes the vector values to the output and returns a mapping from dense 
ordinals to document
+   * IDs. The length of the returned array matches the total number of 
documents with a vector
+   * (which excludes deleted documents), so it may be less than {@link 
VectorValues#size()}.
+   */
+  private static int[] writeVectorData(IndexOutput output, VectorValues 
vectors)
+      throws IOException {
+    int[] docIds = new int[vectors.size()];
+    int count = 0;
+    for (int docV = vectors.nextDoc(); docV != NO_MORE_DOCS; docV = 
vectors.nextDoc(), count++) {
+      // write vector
+      BytesRef binaryValue = vectors.binaryValue();
+      assert binaryValue.length == vectors.dimension() * Float.BYTES;
+      output.writeBytes(binaryValue.bytes, binaryValue.offset, 
binaryValue.length);
+      docIds[count] = docV;
+    }
+
+    if (docIds.length > count) {

Review comment:
       My understanding: this condition (`docIds.length > count`) can happen 
when merging segments with deletions. In this PR we only check 
`vectors.size() == docIds.length` when flushing a segment, which is a 
different case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org

Reply via email to