xiangfu0 commented on code in PR #18852:
URL: https://github.com/apache/pinot/pull/18852#discussion_r3487855199


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandler.java:
##########
@@ -90,6 +122,287 @@ public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
     return false;
   }
 
+  /// Absorbs a vector index sidecar into {@code columns.psf} as a typed 
entry, then deletes the
+  /// sibling. For IVF backends the bytes are copied verbatim; the on-disk IVF 
header is the
+  /// contract and the reader handles version dispatch. For HNSW, if a 
combined-form file already
+  /// exists (written by a creator run with the flag on), it is absorbed 
directly; if only the
+  /// Lucene directory remains (operator just flipped the flag on an existing 
segment), it is first
+  /// packed into a transient combined file, which is then absorbed and 
removed.
+  ///
+  /// **Crash recovery:** if a prior absorb crashed between {@code 
newIndexFor} (which committed
+  /// bytes into {@code columns.psf} and added a {@code _columnEntries} entry) 
and the sidecar file
+  /// deletion, the next load will see both forms. We detect that state 
upfront via the columns.psf
+  /// typed entry ({@link VectorIndexUtils#getConsolidatedVectorEntry}), NOT 
via {@code hasIndexFor}
+  /// — which for vector also reports the on-disk sidecar and would therefore 
flag a normal
+  /// first-absorb as already-absorbed. We then verify the typed entry's size 
matches the sidecar's
+  /// length (so we do not delete a sidecar that happens to coexist with an 
unrelated typed entry),
+  /// and clean up the orphan sidecar instead of re-running the absorb.
+  private void absorbCombinedIntoColumnsPsf(SegmentDirectory.Writer 
segmentWriter, String column,
+      VectorBackendType backendType, File indexDir)
+      throws Exception {
+    File v3Dir = SegmentDirectoryPaths.segmentDirectoryFor(indexDir,
+        _segmentDirectory.getSegmentMetadata().getVersion());
+    String segmentName = _segmentDirectory.getSegmentMetadata().getName();
+
+    if (backendType == VectorBackendType.HNSW) {
+      absorbHnswIntoColumnsPsf(segmentWriter, column, v3Dir, segmentName);
+      return;
+    }
+
+    // IVF path: prefer the combined-form file (freshly-written by a creator 
run with flag=on);
+    // fall back to the legacy sidecar (segments built with flag=off that the 
operator now wants
+    // consolidated).
+    File combinedFile = new File(v3Dir,
+        column + VectorIndexUtils.getIndexFileExtension(backendType, /* 
combined */ true));
+    File legacyFile = new File(v3Dir,
+        column + VectorIndexUtils.getIndexFileExtension(backendType, /* 
combined */ false));
+    File picked = combinedFile.exists() ? combinedFile : legacyFile;
+    if (!picked.exists()) {
+      LOGGER.warn("Expected vector index file {} not found during vector 
consolidation; skipping", picked);
+      return;
+    }
+    // Crash-recovery: if a prior absorb committed bytes to columns.psf but 
crashed before
+    // deleting the sidecar, the typed entry is already present. Detect that 
via the columns.psf
+    // typed entry directly — NOT hasIndexFor, which for vector also reports 
the on-disk sidecar we
+    // are about to absorb. Using hasIndexFor here would make a normal 
first-absorb look
+    // already-absorbed and the subsequent getIndexFor().size() would throw.
+    PinotDataBuffer existingEntry = 
VectorIndexUtils.getConsolidatedVectorEntry(segmentWriter, column);
+    if (existingEntry != null) {
+      long existingSize = existingEntry.size();
+      if (existingSize == picked.length()) {
+        LOGGER.warn("Vector index already present in columns.psf for segment: 
{}, column: {}; "
+            + "deleting orphan sidecar file from a previously-crashed absorb 
run", segmentName, column);
+        FileUtils.deleteQuietly(picked);
+        return;
+      }
+      // Size mismatch — the typed entry is from a different build than the 
sidecar. Refuse to
+      // proceed; an operator must reconcile manually rather than have us 
guess.
+      throw new IOException("Vector index already present in columns.psf for 
column: " + column
+          + " (size=" + existingSize + ") but sidecar file " + picked.getName()
+          + " has different size " + picked.length() + ". Refusing to 
overwrite — please remove "
+          + "the conflicting sidecar or rebuild the segment.");
+    }
+    LOGGER.info("Absorbing vector sidecar file into columns.psf for segment: 
{}, column: {} (backend={})",
+        segmentName, column, backendType);
+    LoaderUtils.writeIndexToV3Format(segmentWriter, column, picked, 
StandardIndexes.vector());
+  }
+
+  /// HNSW-specific absorb: packs the Lucene directory (or uses an existing 
combined-form file) into
+  /// a single combined file and absorbs it into {@code columns.psf}.
+  ///
+  /// If the combined-form file already exists (creator ran with the flag on 
and produced it),
+  /// it is absorbed directly. If only the Lucene directory exists (operator 
just flipped the flag
+  /// on an existing segment), the directory is first packed into a transient 
combined file, which is
+  /// then absorbed and cleaned up.
+  private void absorbHnswIntoColumnsPsf(SegmentDirectory.Writer segmentWriter, 
String column, File v3Dir,
+      String segmentName)
+      throws Exception {
+    File combinedFile = new File(v3Dir,
+        column + 
VectorIndexUtils.getIndexFileExtension(VectorBackendType.HNSW, /* combined */ 
true));
+    File hnswDir = new File(v3Dir,
+        column + 
VectorIndexUtils.getIndexFileExtension(VectorBackendType.HNSW, /* combined */ 
false));
+    if (!combinedFile.exists()) {
+      if (!hnswDir.exists() || !hnswDir.isDirectory()) {
+        LOGGER.warn("Expected HNSW directory or combined file for column {} 
not found; skipping absorb", column);
+        return;
+      }
+      // Pack the Lucene directory into a transient combined file for 
absorption. The directory
+      // and the combined file are both cleaned up below after a successful 
absorb (or by the
+      // crash-recovery branch above on a subsequent retry).
+      HnswVectorIndexCombined.combineHnswIndexFiles(hnswDir, 
combinedFile.getAbsolutePath(), v3Dir.getParentFile(),
+          column);
+    }
+
+    // Crash-recovery: check whether the typed entry already exists. Detect it 
via the columns.psf
+    // typed entry directly — NOT hasIndexFor, which for vector also reports 
the still-present
+    // legacy Lucene directory (we pack but do not delete it above), so 
hasIndexFor would be true on
+    // a normal first-absorb and the subsequent getIndexFor().size() would 
throw.
+    PinotDataBuffer existingEntry = 
VectorIndexUtils.getConsolidatedVectorEntry(segmentWriter, column);
+    if (existingEntry != null) {
+      long existingSize = existingEntry.size();
+      if (existingSize == combinedFile.length()) {
+        LOGGER.warn("HNSW vector index already present in columns.psf for 
segment: {}, column: {}; "
+            + "deleting orphan files from a previously-crashed absorb run", 
segmentName, column);
+        FileUtils.deleteQuietly(combinedFile);
+        // Clean up the Lucene directory regardless of createdTransient: if we 
packed the directory
+        // into a transient combined file in this very run, the directory 
still exists and must be
+        // removed to prevent hasCombinedFile from re-triggering on the next 
segment load.
+        if (hnswDir.exists()) {
+          FileUtils.deleteDirectory(hnswDir);
+        }
+        return;
+      }
+      throw new IOException("HNSW vector index already present in columns.psf 
for column: " + column
+          + " (size=" + existingSize + ") but combined file " + 
combinedFile.getName()
+          + " has different size " + combinedFile.length() + ". Refusing to 
overwrite — please remove "
+          + "the conflicting sidecar or rebuild the segment.");
+    }
+
+    LOGGER.info("Absorbing HNSW combined file into columns.psf for segment: 
{}, column: {}", segmentName, column);
+    LoaderUtils.writeIndexToV3Format(segmentWriter, column, combinedFile, 
StandardIndexes.vector());
+    // writeIndexToV3Format already force-deletes the absorbed combined file 
(like the IVF path above),
+    // so only the Lucene directory needs explicit cleanup here.
+    if (hnswDir.exists()) {
+      FileUtils.deleteDirectory(hnswDir);
+    }
+  }
+
+  /// Extracts the consolidated vector payload from {@code columns.psf} back 
into the legacy on-disk
+  /// form and drops the typed entry. Used when the operator flips {@code 
storeInSegmentFile} from
+  /// {@code true} to {@code false}.
+  ///
+  /// For IVF backends the bytes are streamed verbatim to a sidecar file. For 
HNSW, the packed
+  /// combined file is streamed out first, then unpacked into a Lucene 
directory — the inverse of
+  /// the absorb path.
+  ///
+  /// **Ordering:** bytes are streamed to a temp file *before* {@code 
removeIndex}
+  /// is called, because {@code SingleFileIndexDirectory.removeIndex} for 
vector also runs
+  /// {@link VectorIndexUtils#cleanupVectorIndex(File, String)}, which deletes 
any file with a
+  /// recognised extension. The temp extension ({@code .vector.extract-tmp}) 
is not in that list,
+  /// so the bytes survive until the consolidated entry is safely removed.
+  private void extractConsolidatedToLegacyFile(SegmentDirectory.Writer 
segmentWriter, String column,
+      VectorBackendType backendType, File indexDir)
+      throws IOException {
+    File v3Dir = SegmentDirectoryPaths.segmentDirectoryFor(indexDir,
+        _segmentDirectory.getSegmentMetadata().getVersion());
+    String segmentName = _segmentDirectory.getSegmentMetadata().getName();
+
+    if (backendType == VectorBackendType.HNSW) {
+      extractHnswConsolidatedToDirectory(segmentWriter, column, v3Dir, 
segmentName);
+      return;
+    }
+
+    // IVF path: stream bytes to the legacy combined extension.
+    File finalCombined = new File(v3Dir, column + 
VectorIndexUtils.getIndexFileExtension(backendType));
+    File combinedFormFile = new File(v3Dir,
+        column + VectorIndexUtils.getIndexFileExtension(backendType, /* 
combined */ true));
+    // Defensive: extract is destructive (removeIndex calls 
cleanupVectorIndex, which deletes any
+    // sidecar with a recognised IVF extension). If a sidecar already exists 
on disk we'd silently
+    // clobber it. Refuse to proceed and force the operator to reconcile — 
this state should not
+    // arise from the handler's own paths (see hasCombinedFile in 
needUpdateIndices) but a
+    // manually-edited segment dir or a half-completed migration could produce 
it.
+    if (finalCombined.exists()) {
+      throw new IOException("Extract path expected no on-disk sidecar but 
found: " + finalCombined
+          + ". Refusing to proceed — please remove the conflicting file 
manually.");
+    }
+    if (combinedFormFile.exists()) {
+      throw new IOException("Extract path expected no on-disk sidecar but 
found: " + combinedFormFile
+          + ". Refusing to proceed — please remove the conflicting file 
manually.");
+    }
+    File tempCombined = new File(v3Dir, column + ".vector.extract-tmp");
+    // Clean any leftover from a previously-crashed extract.
+    FileUtils.deleteQuietly(tempCombined);
+
+    LOGGER.info("Extracting vector consolidated entry to sidecar file for 
segment: {}, column: {} (backend={})",
+        segmentName, column, backendType);
+    PinotDataBuffer buffer = segmentWriter.getIndexFor(column, 
StandardIndexes.vector());
+    long size = buffer.size();
+    streamBufferToFile(buffer, size, tempCombined);
+
+    // Remove the consolidated entry. {@code cleanupVectorIndex} runs as a 
side effect; it will
+    // not touch our temp file because the temp extension is not in its 
recognised list.
+    segmentWriter.removeIndex(column, StandardIndexes.vector());
+
+    if (!tempCombined.renameTo(finalCombined)) {
+      // Best-effort copy-and-delete fallback if rename fails (e.g. cross-FS 
in tests).
+      FileUtils.copyFile(tempCombined, finalCombined);
+      FileUtils.deleteQuietly(tempCombined);
+    }
+  }
+
+  /// HNSW-specific extract: streams the combined HNSW payload from {@code 
columns.psf} to a temp
+  /// file, unpacks it into a Lucene directory, and only then removes the 
consolidated typed entry.
+  ///
+  /// **Ordering rationale:** unpack runs *before* {@code removeIndex}. If the 
unpack
+  /// fails mid-way, the typed entry is still present in {@code columns.psf} 
and the next load
+  /// retries the extract — the operator does not lose the index. The reverse 
order would leave a
+  /// crash window in which the typed entry is gone but the Lucene directory 
does not yet exist,
+  /// leaving the segment permanently without its HNSW index until a full 
rebuild.
+  private void extractHnswConsolidatedToDirectory(SegmentDirectory.Writer segmentWriter, String column,
+      File v3Dir, String segmentName)
+      throws IOException {
+    File hnswDir = new File(v3Dir,
+        column + VectorIndexUtils.getIndexFileExtension(VectorBackendType.HNSW, /* combined */ false));
+    File combinedFormFile = new File(v3Dir,
+        column + VectorIndexUtils.getIndexFileExtension(VectorBackendType.HNSW, /* combined */ true));
+    if (hnswDir.exists()) {
+      throw new IOException("Extract path expected no on-disk HNSW directory 
but found: " + hnswDir
+          + ". Refusing to proceed — please remove the conflicting directory 
manually.");
+    }
+    if (combinedFormFile.exists()) {
+      throw new IOException("Extract path expected no on-disk combined HNSW 
file but found: " + combinedFormFile
+          + ". Refusing to proceed — please remove the conflicting file 
manually.");
+    }
+    File tempCombined = new File(v3Dir, column + ".vector.extract-tmp");
+    FileUtils.deleteQuietly(tempCombined);
+
+    LOGGER.info(
+        "Extracting HNSW consolidated entry to Lucene directory for segment: {}, column: {}", segmentName, column);
+    PinotDataBuffer buffer = segmentWriter.getIndexFor(column, StandardIndexes.vector());
+    streamBufferToFile(buffer, buffer.size(), tempCombined);
+
+    // Unpack the combined file back into a Lucene directory BEFORE removing 
the typed entry. If
+    // unpack throws, the typed entry remains in columns.psf and the next 
handler run retries —
+    // no permanent loss. Best-effort clean up the half-unpacked directory so 
the retry's
+    // pre-flight directory-exists check still fires.
+    try {
+      HnswVectorIndexCombined.extractHnswIndexFiles(tempCombined, hnswDir);
+    } catch (IOException | RuntimeException e) {
+      if (hnswDir.exists()) {
+        FileUtils.deleteQuietly(hnswDir);
+      }
+      FileUtils.deleteQuietly(tempCombined);
+      throw e;
+    }
+
+    // Unpack succeeded — the bytes are now on disk in legacy form. Remove the consolidated entry.
+    // If this throws, the segment ends up with BOTH forms; the reader factory's file-backed path
+    // (flag=false) uses the Lucene directory and ignores the orphan typed entry until the next
+    // handler run cleans it.
+    segmentWriter.removeIndex(column, StandardIndexes.vector());

Review Comment:
   `removeIndex()` now runs `VectorIndexUtils.cleanupVectorIndex()`, and that 
helper recursively `deleteQuietly()`s the legacy HNSW directory and mapping 
file. So the directory unpacked at line 349 gets deleted immediately here, and 
the segment finishes the `storeInSegmentFile=true -> false` migration without a 
usable legacy HNSW artifact. Please unpack into a temp location that vector 
cleanup will not match, drop only the typed `columns.psf` entry, then move the 
Lucene directory into place after cleanup.
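
   A minimal sketch of the ordering suggested above, written against plain 
`java.nio.file` rather than Pinot's classes. Everything named here is an 
assumption for illustration: `LEGACY_DIR_SUFFIX` and `STAGING_SUFFIX` are 
placeholder suffixes (not Pinot's real extensions), `cleanupLegacyArtifacts` 
only stands in for the side effect of `cleanupVectorIndex()` as described in 
this comment, and `Unpacker` / `IoAction` are hypothetical callbacks for the 
unpack step and the typed-entry removal.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class StagedHnswExtractSketch {
  // Placeholder suffixes, NOT Pinot's real extensions (assumption).
  static final String LEGACY_DIR_SUFFIX = ".vector.hnsw.index";
  static final String STAGING_SUFFIX = ".hnsw.extract-staging";

  /** Hypothetical callback that unpacks the combined payload into a directory. */
  interface Unpacker {
    void unpackInto(Path dir) throws IOException;
  }

  /** Hypothetical callback that drops the typed entry from columns.psf. */
  interface IoAction {
    void run() throws IOException;
  }

  /** Stand-in for the cleanup side effect: deletes only the legacy-named directory. */
  static void cleanupLegacyArtifacts(Path segmentDir, String column) throws IOException {
    deleteRecursively(segmentDir.resolve(column + LEGACY_DIR_SUFFIX));
  }

  /** Deletes a file tree depth-first; a missing root is a no-op. */
  static void deleteRecursively(Path root) throws IOException {
    if (!Files.exists(root)) {
      return;
    }
    try (Stream<Path> walk = Files.walk(root)) {
      walk.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    }
  }

  static Path extract(Path segmentDir, String column, Unpacker unpacker, IoAction removeTypedEntry)
      throws IOException {
    Path staging = segmentDir.resolve(column + STAGING_SUFFIX);
    Path finalDir = segmentDir.resolve(column + LEGACY_DIR_SUFFIX);

    // 1. Unpack into a staging directory whose name the cleanup does not match.
    deleteRecursively(staging); // leftover from a previously crashed run
    Files.createDirectories(staging);
    try {
      unpacker.unpackInto(staging);
    } catch (IOException | RuntimeException e) {
      deleteRecursively(staging);
      throw e;
    }

    // 2. Drop the typed entry. The explicit cleanup call simulates the side
    //    effect that removal would trigger; the staging directory survives it.
    removeTypedEntry.run();
    cleanupLegacyArtifacts(segmentDir, column);

    // 3. Only now move the unpacked directory to its legacy name
    //    (a rename, since both paths share the same parent directory).
    Files.move(staging, finalDir);
    return finalDir;
  }
}
```

   One trade-off in this sketch: a crash between steps 2 and 3 leaves the 
typed entry gone and only the staging directory on disk, so a real 
implementation would presumably also need the next load to recognise the 
staging directory and finish the move.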



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

