xiangfu0 commented on code in PR #18852:
URL: https://github.com/apache/pinot/pull/18852#discussion_r3487855199
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandler.java:
##########
@@ -90,6 +122,287 @@ public boolean needUpdateIndices(SegmentDirectory.Reader
segmentReader) {
return false;
}
+ /// Absorbs a vector index sidecar into {@code columns.psf} as a typed
entry, then deletes the
+ /// sibling. For IVF backends the bytes are copied verbatim; the on-disk IVF
header is the
+ /// contract and the reader handles version dispatch. For HNSW, if a
combined-form file already
+ /// exists (written by a creator run with the flag on), it is absorbed
directly; if only the
+ /// Lucene directory remains (operator just flipped the flag on an existing
segment), it is first
+ /// packed into a transient combined file, which is then absorbed and
removed.
+ ///
+ /// **Crash recovery:** if a prior absorb crashed between {@code
newIndexFor} (which committed
+ /// bytes into {@code columns.psf} and added a {@code _columnEntries} entry)
and the sidecar file
+ /// deletion, the next load will see both forms. We detect that state
upfront via the columns.psf
+ /// typed entry ({@link VectorIndexUtils#getConsolidatedVectorEntry}), NOT
via {@code hasIndexFor}
+ /// — which for vector also reports the on-disk sidecar and would therefore
flag a normal
+ /// first-absorb as already-absorbed. We then verify the typed entry's size
matches the sidecar's
+ /// length (so we do not delete a sidecar that happens to coexist with an
unrelated typed entry),
+ /// and clean up the orphan sidecar instead of re-running the absorb.
+ private void absorbCombinedIntoColumnsPsf(SegmentDirectory.Writer
segmentWriter, String column,
+ VectorBackendType backendType, File indexDir)
+ throws Exception {
+ File v3Dir = SegmentDirectoryPaths.segmentDirectoryFor(indexDir,
+ _segmentDirectory.getSegmentMetadata().getVersion());
+ String segmentName = _segmentDirectory.getSegmentMetadata().getName();
+
+ if (backendType == VectorBackendType.HNSW) {
+ absorbHnswIntoColumnsPsf(segmentWriter, column, v3Dir, segmentName);
+ return;
+ }
+
+ // IVF path: prefer the combined-form file (freshly-written by a creator
run with flag=on);
+ // fall back to the legacy sidecar (segments built with flag=off that the
operator now wants
+ // consolidated).
+ File combinedFile = new File(v3Dir,
+ column + VectorIndexUtils.getIndexFileExtension(backendType, /*
combined */ true));
+ File legacyFile = new File(v3Dir,
+ column + VectorIndexUtils.getIndexFileExtension(backendType, /*
combined */ false));
+ File picked = combinedFile.exists() ? combinedFile : legacyFile;
+ if (!picked.exists()) {
+ LOGGER.warn("Expected vector index file {} not found during vector
consolidation; skipping", picked);
+ return;
+ }
+ // Crash-recovery: if a prior absorb committed bytes to columns.psf but
crashed before
+ // deleting the sidecar, the typed entry is already present. Detect that
via the columns.psf
+ // typed entry directly — NOT hasIndexFor, which for vector also reports
the on-disk sidecar we
+ // are about to absorb. Using hasIndexFor here would make a normal
first-absorb look
+ // already-absorbed and the subsequent getIndexFor().size() would throw.
+ PinotDataBuffer existingEntry =
VectorIndexUtils.getConsolidatedVectorEntry(segmentWriter, column);
+ if (existingEntry != null) {
+ long existingSize = existingEntry.size();
+ if (existingSize == picked.length()) {
+ LOGGER.warn("Vector index already present in columns.psf for segment:
{}, column: {}; "
+ + "deleting orphan sidecar file from a previously-crashed absorb
run", segmentName, column);
+ FileUtils.deleteQuietly(picked);
+ return;
+ }
+ // Size mismatch — the typed entry is from a different build than the
sidecar. Refuse to
+ // proceed; an operator must reconcile manually rather than have us
guess.
+ throw new IOException("Vector index already present in columns.psf for
column: " + column
+ + " (size=" + existingSize + ") but sidecar file " + picked.getName()
+ + " has different size " + picked.length() + ". Refusing to
overwrite — please remove "
+ + "the conflicting sidecar or rebuild the segment.");
+ }
+ LOGGER.info("Absorbing vector sidecar file into columns.psf for segment:
{}, column: {} (backend={})",
+ segmentName, column, backendType);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, column, picked,
StandardIndexes.vector());
+ }
+
+ /// HNSW-specific absorb: packs the Lucene directory (or uses an existing
combined-form file) into
+ /// a single combined file and absorbs it into {@code columns.psf}.
+ ///
+ /// If the combined-form file already exists (creator ran with the flag on
and produced it),
+ /// it is absorbed directly. If only the Lucene directory exists (operator
just flipped the flag
+ /// on an existing segment), the directory is first packed into a transient
combined file, which is
+ /// then absorbed and cleaned up.
+ private void absorbHnswIntoColumnsPsf(SegmentDirectory.Writer segmentWriter,
String column, File v3Dir,
+ String segmentName)
+ throws Exception {
+ File combinedFile = new File(v3Dir,
+ column +
VectorIndexUtils.getIndexFileExtension(VectorBackendType.HNSW, /* combined */
true));
+ File hnswDir = new File(v3Dir,
+ column +
VectorIndexUtils.getIndexFileExtension(VectorBackendType.HNSW, /* combined */
false));
+ if (!combinedFile.exists()) {
+ if (!hnswDir.exists() || !hnswDir.isDirectory()) {
+ LOGGER.warn("Expected HNSW directory or combined file for column {}
not found; skipping absorb", column);
+ return;
+ }
+ // Pack the Lucene directory into a transient combined file for
absorption. The directory
+ // and the combined file are both cleaned up below after a successful
absorb (or by the
+ // crash-recovery branch above on a subsequent retry).
+ HnswVectorIndexCombined.combineHnswIndexFiles(hnswDir,
combinedFile.getAbsolutePath(), v3Dir.getParentFile(),
+ column);
+ }
+
+ // Crash-recovery: check whether the typed entry already exists. Detect it
via the columns.psf
+ // typed entry directly — NOT hasIndexFor, which for vector also reports
the still-present
+ // legacy Lucene directory (we pack but do not delete it above), so
hasIndexFor would be true on
+ // a normal first-absorb and the subsequent getIndexFor().size() would
throw.
+ PinotDataBuffer existingEntry =
VectorIndexUtils.getConsolidatedVectorEntry(segmentWriter, column);
+ if (existingEntry != null) {
+ long existingSize = existingEntry.size();
+ if (existingSize == combinedFile.length()) {
+ LOGGER.warn("HNSW vector index already present in columns.psf for
segment: {}, column: {}; "
+ + "deleting orphan files from a previously-crashed absorb run",
segmentName, column);
+ FileUtils.deleteQuietly(combinedFile);
+ // Clean up the Lucene directory regardless of createdTransient: if we
packed the directory
+ // into a transient combined file in this very run, the directory
still exists and must be
+ // removed to prevent hasCombinedFile from re-triggering on the next
segment load.
+ if (hnswDir.exists()) {
+ FileUtils.deleteDirectory(hnswDir);
+ }
+ return;
+ }
+ throw new IOException("HNSW vector index already present in columns.psf
for column: " + column
+ + " (size=" + existingSize + ") but combined file " +
combinedFile.getName()
+ + " has different size " + combinedFile.length() + ". Refusing to
overwrite — please remove "
+ + "the conflicting sidecar or rebuild the segment.");
+ }
+
+ LOGGER.info("Absorbing HNSW combined file into columns.psf for segment:
{}, column: {}", segmentName, column);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, column, combinedFile,
StandardIndexes.vector());
+ // writeIndexToV3Format already force-deletes the absorbed combined file
(like the IVF path above),
+ // so only the Lucene directory needs explicit cleanup here.
+ if (hnswDir.exists()) {
+ FileUtils.deleteDirectory(hnswDir);
+ }
+ }
+
+ /// Extracts the consolidated vector payload from {@code columns.psf} back
into the legacy on-disk
+ /// form and drops the typed entry. Used when the operator flips {@code
storeInSegmentFile} from
+ /// {@code true} to {@code false}.
+ ///
+ /// For IVF backends the bytes are streamed verbatim to a sidecar file. For
HNSW, the packed
+ /// combined file is streamed out first, then unpacked into a Lucene
directory — the inverse of
+ /// the absorb path.
+ ///
+ /// **Ordering:** bytes are streamed to a temp file *before* {@code
removeIndex}
+ /// is called, because {@code SingleFileIndexDirectory.removeIndex} for
vector also runs
+ /// {@link VectorIndexUtils#cleanupVectorIndex(File, String)}, which deletes
any file with a
+ /// recognised extension. The temp extension ({@code .vector.extract-tmp})
is not in that list,
+ /// so the bytes survive until the consolidated entry is safely removed.
+ private void extractConsolidatedToLegacyFile(SegmentDirectory.Writer
segmentWriter, String column,
+ VectorBackendType backendType, File indexDir)
+ throws IOException {
+ File v3Dir = SegmentDirectoryPaths.segmentDirectoryFor(indexDir,
+ _segmentDirectory.getSegmentMetadata().getVersion());
+ String segmentName = _segmentDirectory.getSegmentMetadata().getName();
+
+ if (backendType == VectorBackendType.HNSW) {
+ extractHnswConsolidatedToDirectory(segmentWriter, column, v3Dir,
segmentName);
+ return;
+ }
+
+ // IVF path: stream bytes to the legacy combined extension.
+ File finalCombined = new File(v3Dir, column +
VectorIndexUtils.getIndexFileExtension(backendType));
+ File combinedFormFile = new File(v3Dir,
+ column + VectorIndexUtils.getIndexFileExtension(backendType, /*
combined */ true));
+ // Defensive: extract is destructive (removeIndex calls
cleanupVectorIndex, which deletes any
+ // sidecar with a recognised IVF extension). If a sidecar already exists
on disk we'd silently
+ // clobber it. Refuse to proceed and force the operator to reconcile —
this state should not
+ // arise from the handler's own paths (see hasCombinedFile in
needUpdateIndices) but a
+ // manually-edited segment dir or a half-completed migration could produce
it.
+ if (finalCombined.exists()) {
+ throw new IOException("Extract path expected no on-disk sidecar but
found: " + finalCombined
+ + ". Refusing to proceed — please remove the conflicting file
manually.");
+ }
+ if (combinedFormFile.exists()) {
+ throw new IOException("Extract path expected no on-disk sidecar but
found: " + combinedFormFile
+ + ". Refusing to proceed — please remove the conflicting file
manually.");
+ }
+ File tempCombined = new File(v3Dir, column + ".vector.extract-tmp");
+ // Clean any leftover from a previously-crashed extract.
+ FileUtils.deleteQuietly(tempCombined);
+
+ LOGGER.info("Extracting vector consolidated entry to sidecar file for
segment: {}, column: {} (backend={})",
+ segmentName, column, backendType);
+ PinotDataBuffer buffer = segmentWriter.getIndexFor(column,
StandardIndexes.vector());
+ long size = buffer.size();
+ streamBufferToFile(buffer, size, tempCombined);
+
+ // Remove the consolidated entry. {@code cleanupVectorIndex} runs as a
side effect; it will
+ // not touch our temp file because the temp extension is not in its
recognised list.
+ segmentWriter.removeIndex(column, StandardIndexes.vector());
+
+ if (!tempCombined.renameTo(finalCombined)) {
+ // Best-effort copy-and-delete fallback if rename fails (e.g. cross-FS
in tests).
+ FileUtils.copyFile(tempCombined, finalCombined);
+ FileUtils.deleteQuietly(tempCombined);
+ }
+ }
+
+ /// HNSW-specific extract: streams the combined HNSW payload from {@code
columns.psf} to a temp
+ /// file, unpacks it into a Lucene directory, and only then removes the
consolidated typed entry.
+ ///
+ /// **Ordering rationale:** unpack runs *before* {@code removeIndex}. If the
unpack
+ /// fails mid-way, the typed entry is still present in {@code columns.psf}
and the next load
+ /// retries the extract — the operator does not lose the index. The reverse
order would leave a
+ /// crash window in which the typed entry is gone but the Lucene directory
does not yet exist,
+ /// leaving the segment permanently without its HNSW index until a full
rebuild.
+ private void extractHnswConsolidatedToDirectory(SegmentDirectory.Writer
segmentWriter, String column,
+ File v3Dir, String segmentName)
+ throws IOException {
+ File hnswDir = new File(v3Dir,
+ column +
VectorIndexUtils.getIndexFileExtension(VectorBackendType.HNSW, /* combined */
false));
+ File combinedFormFile = new File(v3Dir,
+ column +
VectorIndexUtils.getIndexFileExtension(VectorBackendType.HNSW, /* combined */
true));
+ if (hnswDir.exists()) {
+ throw new IOException("Extract path expected no on-disk HNSW directory
but found: " + hnswDir
+ + ". Refusing to proceed — please remove the conflicting directory
manually.");
+ }
+ if (combinedFormFile.exists()) {
+ throw new IOException("Extract path expected no on-disk combined HNSW
file but found: " + combinedFormFile
+ + ". Refusing to proceed — please remove the conflicting file
manually.");
+ }
+ File tempCombined = new File(v3Dir, column + ".vector.extract-tmp");
+ FileUtils.deleteQuietly(tempCombined);
+
+ LOGGER.info(
+ "Extracting HNSW consolidated entry to Lucene directory for segment:
{}, column: {}", segmentName, column);
+ PinotDataBuffer buffer = segmentWriter.getIndexFor(column,
StandardIndexes.vector());
+ streamBufferToFile(buffer, buffer.size(), tempCombined);
+
+ // Unpack the combined file back into a Lucene directory BEFORE removing
the typed entry. If
+ // unpack throws, the typed entry remains in columns.psf and the next
handler run retries —
+ // no permanent loss. Best-effort clean up the half-unpacked directory so
the retry's
+ // pre-flight directory-exists check still fires.
+ try {
+ HnswVectorIndexCombined.extractHnswIndexFiles(tempCombined, hnswDir);
+ } catch (IOException | RuntimeException e) {
+ if (hnswDir.exists()) {
+ FileUtils.deleteQuietly(hnswDir);
+ }
+ FileUtils.deleteQuietly(tempCombined);
+ throw e;
+ }
+
+ // Unpack succeeded — the bytes are now on disk in legacy form. Remove the
consolidated entry.
+ // If this throws, the segment ends up with BOTH forms; the reader
factory's file-backed path
+ // (flag=false) uses the Lucene directory and ignores the orphan typed
entry until the next
+ // handler run cleans it.
+ segmentWriter.removeIndex(column, StandardIndexes.vector());
Review Comment:
`removeIndex()` now runs `VectorIndexUtils.cleanupVectorIndex()`, and that
helper recursively `deleteQuietly()`s the legacy HNSW directory and mapping
file. The directory unpacked at line 349 is therefore deleted immediately here,
so the `storeInSegmentFile=true -> false` migration finishes with the typed
`columns.psf` entry removed and no usable legacy HNSW artifact on disk. Please
unpack into a temp location that vector cleanup will not match, drop only the
typed `columns.psf` entry, then move the Lucene directory into place after
cleanup.
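
A rough sketch of the ordering I have in mind, using only `java.nio.file` so it
stands alone. Everything named here is hypothetical: `Unpacker` stands in for
`HnswVectorIndexCombined.extractHnswIndexFiles`, `TypedEntryRemover` stands in
for `segmentWriter.removeIndex` (including its cleanup side effect), and the
`.vector.extract-tmp-dir` staging suffix is an assumption that would need to be
checked against what `cleanupVectorIndex` actually matches.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class HnswExtractOrderingSketch {
  /** Hypothetical stand-in for the real unpack call. */
  interface Unpacker {
    void unpackTo(Path targetDir) throws IOException;
  }

  /** Hypothetical stand-in for removeIndex plus its vector cleanup side effect. */
  interface TypedEntryRemover {
    void remove() throws IOException;
  }

  static void extract(Path v3Dir, String column, String legacyDirSuffix,
      Unpacker unpacker, TypedEntryRemover remover) throws IOException {
    Path finalDir = v3Dir.resolve(column + legacyDirSuffix);
    // Assumed staging name; it must not be matched by the vector cleanup.
    Path stagingDir = v3Dir.resolve(column + ".vector.extract-tmp-dir");

    // Discard leftovers from an earlier failed unpack, then start clean.
    deleteRecursively(stagingDir);
    Files.createDirectories(stagingDir);

    // 1. Unpack into staging. On failure the typed entry is still intact.
    try {
      unpacker.unpackTo(stagingDir);
    } catch (IOException | RuntimeException e) {
      deleteRecursively(stagingDir);
      throw e;
    }

    // 2. Drop the typed entry. Cleanup may delete finalDir, but not staging.
    remover.remove();

    // 3. Only now move the unpacked directory to its legacy location.
    Files.move(stagingDir, finalDir);
  }

  static void deleteRecursively(Path root) throws IOException {
    if (!Files.exists(root)) {
      return;
    }
    List<Path> paths;
    try (Stream<Path> walk = Files.walk(root)) {
      // Reverse order so children are deleted before their parents.
      paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
    }
    for (Path p : paths) {
      Files.delete(p);
    }
  }
}
```

One caveat with this sketch: a crash between steps 2 and 3 leaves the only copy
of the index in the staging directory, and the sketch as written would discard
it on the next run. A real fix would probably need some completion marker so a
retry can promote a fully unpacked staging directory instead of deleting it.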
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]