Copilot commented on code in PR #18852:
URL: https://github.com/apache/pinot/pull/18852#discussion_r3478417134
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java:
##########
@@ -160,8 +160,11 @@ public boolean hasIndexFor(String column, IndexType<?, ?, ?> type) {
if (type == StandardIndexes.text() && TextIndexUtils.hasTextIndex(_segmentDirectory, column)) {
return true;
}
- if (type == StandardIndexes.vector()) {
- return VectorIndexUtils.hasVectorIndex(_segmentDirectory, column);
+ // Vector index may live either as a combined file (legacy / storeInSegmentFile=false) or as
+ // a typed entry inside columns.psf (storeInSegmentFile=true). Check both — mirror the text
+ // pattern of "combined OR _columnEntries".
+ if (type == StandardIndexes.vector() && VectorIndexUtils.hasVectorIndex(_segmentDirectory, column)) {
+ return true;
}
IndexKey key = new IndexKey(column, type);
return _columnEntries.containsKey(key);
Review Comment:
`hasIndexFor(column, StandardIndexes.vector())` currently returns true when
a legacy vector sidecar exists, even if there is no typed entry in
`columns.psf`. However, `getIndexFor(column, StandardIndexes.vector())` in
`SingleFileIndexDirectory` only serves typed entries and will throw if the
entry is absent. This breaks the `hasIndexFor`/`getIndexFor` contract and also
causes the new migration/crash-recovery logic in `VectorIndexHandler` to
mis-detect normal legacy segments as “already absorbed”.
Consider limiting `hasIndexFor(..., vector)` to `_columnEntries` presence
only (typed entry), and rely on `VectorIndexUtils.hasVectorIndex()` /
`getColumnsWithIndex()` when you need to detect legacy sidecars on disk.
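A minimal sketch of the suggested contract. It uses a hypothetical, simplified stand-in (`TypedEntryDirectory`, with string keys in place of `IndexKey`/`IndexType`) rather than the real `SingleFileIndexDirectory`: `hasIndexFor` answers only for typed entries that `getIndexFor` can actually serve, and legacy sidecar detection is kept as a separate query.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for SingleFileIndexDirectory (illustration only).
final class TypedEntryDirectory {
  // Typed entries in columns.psf, keyed by "column/indexType" -> entry size in bytes.
  private final Map<String, Long> _columnEntries = new HashMap<>();
  // Columns that (hypothetically) have a legacy sidecar file or directory on disk.
  private final Set<String> _legacySidecarColumns;

  TypedEntryDirectory(Set<String> legacySidecarColumns) {
    _legacySidecarColumns = legacySidecarColumns;
  }

  void addTypedEntry(String column, String indexType, long size) {
    _columnEntries.put(column + "/" + indexType, size);
  }

  // Suggested contract: true only when getIndexFor() below will succeed.
  boolean hasIndexFor(String column, String indexType) {
    return _columnEntries.containsKey(column + "/" + indexType);
  }

  // Serves typed entries only; throws when the entry is absent.
  long getIndexFor(String column, String indexType) {
    Long size = _columnEntries.get(column + "/" + indexType);
    if (size == null) {
      throw new IllegalStateException("No typed entry for " + column + "/" + indexType);
    }
    return size;
  }

  // Legacy detection is a separate question (the role VectorIndexUtils.hasVectorIndex plays).
  boolean hasLegacySidecar(String column) {
    return _legacySidecarColumns.contains(column);
  }
}
```

With this split, a segment that only has a legacy sidecar reports `hasIndexFor == false` and `hasLegacySidecar == true`, so callers never reach a `getIndexFor` that would throw.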
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandler.java:
##########
@@ -90,6 +122,290 @@ public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
return false;
}
+ /**
+ * Absorbs a vector index sidecar into {@code columns.psf} as a typed entry, then deletes the
+ * sibling. For IVF backends the bytes are copied verbatim; the on-disk IVF header is the
+ * contract and the reader handles version dispatch. For HNSW, if a combined-form file already
+ * exists (written by a creator run with the flag on), it is absorbed directly; if only the
+ * Lucene directory remains (operator just flipped the flag on an existing segment), it is first
+ * packed into a transient combined file, which is then absorbed and removed.
+ *
+ * <p><b>Crash recovery:</b> if a prior absorb crashed between {@code newIndexFor} (which
+ * committed bytes into {@code columns.psf} and added a {@code _columnEntries} entry) and the
+ * sidecar file deletion, the next load will see both forms. We detect that state upfront via
+ * {@code hasIndexFor}, verify the typed entry's size matches the sidecar's length (so we do not
+ * delete a sidecar that happens to coexist with an unrelated typed entry), and clean up the
+ * orphan sidecar instead of re-running the absorb.</p>
+ */
+ private void absorbCombinedIntoColumnsPsf(SegmentDirectory.Writer segmentWriter, String column,
+ VectorBackendType backendType, File indexDir)
+ throws Exception {
+ File v3Dir = SegmentDirectoryPaths.segmentDirectoryFor(indexDir,
+ _segmentDirectory.getSegmentMetadata().getVersion());
+ String segmentName = _segmentDirectory.getSegmentMetadata().getName();
+
+ if (backendType == VectorBackendType.HNSW) {
+ absorbHnswIntoColumnsPsf(segmentWriter, column, v3Dir, segmentName);
+ return;
+ }
+
+ // IVF path: prefer the combined-form file (freshly-written by a creator run with flag=on);
+ // fall back to the legacy sidecar (segments built with flag=off that the operator now wants
+ // consolidated).
+ File combinedFile = new File(v3Dir,
+ column + VectorIndexUtils.getIndexFileExtension(backendType, /* combined */ true));
+ File legacyFile = new File(v3Dir,
+ column + VectorIndexUtils.getIndexFileExtension(backendType, /* combined */ false));
+ File picked = combinedFile.exists() ? combinedFile : legacyFile;
+ if (!picked.exists()) {
+ LOGGER.warn("Expected vector index file {} not found during vector consolidation; skipping", picked);
+ return;
+ }
+ // Crash-recovery: if a prior absorb committed bytes to columns.psf but crashed before
+ // deleting the sidecar, the typed entry is already present. Detect that directly via
+ // hasIndexFor + size check, rather than catching the duplicate-key exception by message.
+ if (segmentWriter.hasIndexFor(column, StandardIndexes.vector())) {
+ long existingSize = segmentWriter.getIndexFor(column, StandardIndexes.vector()).size();
+ if (existingSize == picked.length()) {
+ LOGGER.warn("Vector index already present in columns.psf for segment: {}, column: {}; "
+ + "deleting orphan sidecar file from a previously-crashed absorb run", segmentName, column);
+ FileUtils.deleteQuietly(picked);
+ return;
+ }
+ // Size mismatch — the typed entry is from a different build than the sidecar. Refuse to
+ // proceed; an operator must reconcile manually rather than have us guess.
+ throw new IOException("Vector index already present in columns.psf for column: " + column
+ + " (size=" + existingSize + ") but sidecar file " + picked.getName()
+ + " has different size " + picked.length() + ". Refusing to overwrite — please remove "
+ + "the conflicting sidecar or rebuild the segment.");
+ }
+ }
Review Comment:
Crash-recovery in `absorbCombinedIntoColumnsPsf` uses
`segmentWriter.hasIndexFor(column, StandardIndexes.vector())` to infer that a
typed entry already exists in `columns.psf`. With the updated
`SingleFileIndexDirectory.hasIndexFor`, this can be true just because a legacy
sidecar exists on disk, even when no typed entry has been created yet. In that
case, the subsequent `getIndexFor(...).size()` will throw and block the
legacy→columns.psf migration.
Safer pattern: attempt `segmentWriter.getIndexFor(...)` and treat “missing
typed entry” as the normal absorb path; only run the size-check recovery when
`getIndexFor` succeeds.
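A sketch of the suggested "try `getIndexFor` first" pattern. `TypedEntryLookup` is a hypothetical stand-in for `segmentWriter.getIndexFor(column, StandardIndexes.vector()).size()`; the exact exception `getIndexFor` throws for a missing entry is not shown in the diff, so the sketch catches broadly. A real fix would ideally narrow that catch so genuine I/O failures are not mistaken for "no typed entry".

```java
import java.util.OptionalLong;

final class AbsorbRecovery {
  // What the absorb path should do next (names are illustrative).
  enum Action { ABSORB, DELETE_ORPHAN_SIDECAR, REFUSE_CONFLICT }

  // Hypothetical stand-in for getIndexFor(column, vector()).size(); throws when absent.
  @FunctionalInterface
  interface TypedEntryLookup {
    long sizeOf(String column) throws Exception;
  }

  // Attempt the lookup; a missing typed entry is the normal, first-time absorb path.
  static OptionalLong tryTypedEntrySize(TypedEntryLookup lookup, String column) {
    try {
      return OptionalLong.of(lookup.sizeOf(column));
    } catch (Exception e) {
      return OptionalLong.empty();
    }
  }

  // The size-check recovery only runs when the typed entry really exists.
  static Action decide(OptionalLong existingSize, long sidecarLength) {
    if (!existingSize.isPresent()) {
      return Action.ABSORB;
    }
    return existingSize.getAsLong() == sidecarLength
        ? Action.DELETE_ORPHAN_SIDECAR
        : Action.REFUSE_CONFLICT;
  }
}
```

In the handler, `ABSORB` would map to `LoaderUtils.writeIndexToV3Format(...)`, `DELETE_ORPHAN_SIDECAR` to `FileUtils.deleteQuietly(picked)`, and `REFUSE_CONFLICT` to the existing `IOException`.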
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverter.java:
##########
@@ -147,16 +148,31 @@ private void copyIndexData(File v2Directory, SegmentMetadataImpl v2Metadata, Fil
SegmentDirectory.Writer v3DataWriter = v3Segment.createWriter()) {
for (String column : v2Metadata.getAllColumns()) {
for (IndexType<?, ?, ?> indexType : IndexService.getInstance().getAllIndexes()) {
- //If Text index files are combined merge into columns.psf else no-op
+ // Text index: skip the standard copy when a legacy Lucene directory is present
+ // ({@code copyLuceneTextIndexIfExists} below handles that as a sibling copy).
+ // Combined .text.index files fall through to the standard path and get packed.
if (indexType == StandardIndexes.text()) {
if (!TextIndexUtils.hasTextIndex(v2Directory, column)) {
copyIndexIfExists(v2DataReader, v3DataWriter, column, indexType);
}
- } else {
- if (indexType != StandardIndexes.vector()) {
+ continue;
+ }
+ // Vector index mirrors text: skip the standard copy when a legacy IVF sidecar (or
+ // HNSW directory) is present ({@code copyVectorIndexIfExists} handles those as
+ // sibling copies). Combined .vector.ivfflat.combined.index / .ivfpq.combined.index
+ // files fall through to the standard path and get packed into columns.psf.
+ // Mixed state: if both legacy and combined exist (e.g. operator toggled the flag
+ // and rebuilt without cleaning the old file), combined wins — pack it and let
+ // copyVectorIndexIfExists drop the legacy sibling so the operator gets exactly one
+ // copy of the bytes in the V3 segment.
+ if (indexType == StandardIndexes.vector()) {
+ boolean hasCombined = VectorIndexUtils.hasCombinedFormVectorIndex(v2Directory, column);
+ if (hasCombined || !VectorIndexUtils.hasVectorIndex(v2Directory, column)) {
copyIndexIfExists(v2DataReader, v3DataWriter, column, indexType);
}
+ continue;
}
Review Comment:
When `hasCombinedFormVectorIndex(...)` is true, this path calls
`copyIndexIfExists(...)` for `StandardIndexes.vector()`. In V1/V2 segments, the
underlying `SegmentDirectory.Reader` is file-based and may resolve the vector
index to a *directory* (legacy HNSW) rather than the combined packed file,
which then fails when mapping as a regular file.
To avoid converter failures in mixed/crash states (legacy HNSW dir +
combined file both present), consider only taking the “standard copy into
columns.psf” path when *no* legacy vector index exists
(`!VectorIndexUtils.hasVectorIndex(...)`). That still packs combined-only
artifacts, while deferring mixed states to the sibling-copy path.
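A sketch comparing the two routing rules as pure functions of two booleans. `hasCombined` stands in for `VectorIndexUtils.hasCombinedFormVectorIndex(...)` and `hasLegacy` for `VectorIndexUtils.hasVectorIndex(...)`; the sketch assumes, as the diff's comments imply, that `hasVectorIndex` detects only legacy artifacts (IVF sidecar or HNSW directory). `VectorConverterPolicy` and `Path` are hypothetical names.

```java
final class VectorConverterPolicy {
  // STANDARD_COPY: copyIndexIfExists into columns.psf; SIBLING_COPY: copyVectorIndexIfExists.
  enum Path { STANDARD_COPY, SIBLING_COPY }

  // Rule in the current diff: a combined file always wins, even next to a legacy artifact.
  static Path current(boolean hasCombined, boolean hasLegacy) {
    return (hasCombined || !hasLegacy) ? Path.STANDARD_COPY : Path.SIBLING_COPY;
  }

  // Suggested rule: any legacy artifact defers to the sibling-copy path, so the file-based
  // V1/V2 reader is never asked to map a legacy HNSW directory as a regular file.
  static Path suggested(boolean hasCombined, boolean hasLegacy) {
    return hasLegacy ? Path.SIBLING_COPY : Path.STANDARD_COPY;
  }
}
```

The two rules agree in three of the four states and differ only in the mixed state (`hasCombined && hasLegacy`), which is exactly the case the comment flags. Combined-only segments still take the standard copy and get packed.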
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandler.java:
##########
@@ -90,6 +122,290 @@ public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
return false;
}
+ LOGGER.info("Absorbing vector sidecar file into columns.psf for segment: {}, column: {} (backend={})",
+ segmentName, column, backendType);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, column, picked, StandardIndexes.vector());
+ }
+
+ /**
+ * HNSW-specific absorb: packs the Lucene directory (or uses an existing combined-form file) into
+ * a single combined file and absorbs it into {@code columns.psf}.
+ *
+ * <p>If the combined-form file already exists (creator ran with the flag on and produced it),
+ * it is absorbed directly. If only the Lucene directory exists (operator just flipped the flag
+ * on an existing segment), the directory is first packed into a transient combined file, which is
+ * then absorbed and cleaned up.</p>
+ */
+ private void absorbHnswIntoColumnsPsf(SegmentDirectory.Writer segmentWriter, String column, File v3Dir,
+ String segmentName)
+ throws Exception {
+ File combinedFile = new File(v3Dir,
+ column + VectorIndexUtils.getIndexFileExtension(VectorBackendType.HNSW, /* combined */ true));
+ File hnswDir = new File(v3Dir,
+ column + VectorIndexUtils.getIndexFileExtension(VectorBackendType.HNSW, /* combined */ false));
+ if (!combinedFile.exists()) {
+ if (!hnswDir.exists() || !hnswDir.isDirectory()) {
+ LOGGER.warn("Expected HNSW directory or combined file for column {} not found; skipping absorb", column);
+ return;
+ }
+ // Pack the Lucene directory into a transient combined file for absorption. The directory
+ // and the combined file are both cleaned up below after a successful absorb (or by the
+ // crash-recovery branch above on a subsequent retry).
+ HnswVectorIndexCombined.combineHnswIndexFiles(hnswDir, combinedFile.getAbsolutePath(), v3Dir.getParentFile(),
+ column);
+ }
+
+ // Crash-recovery: check whether the typed entry already exists.
+ if (segmentWriter.hasIndexFor(column, StandardIndexes.vector())) {
+ long existingSize = segmentWriter.getIndexFor(column, StandardIndexes.vector()).size();
+ if (existingSize == combinedFile.length()) {
+ LOGGER.warn("HNSW vector index already present in columns.psf for segment: {}, column: {}; "
+ + "deleting orphan files from a previously-crashed absorb run", segmentName, column);
+ FileUtils.deleteQuietly(combinedFile);
+ // Clean up the Lucene directory regardless of createdTransient: if we packed the directory
+ // into a transient combined file in this very run, the directory still exists and must be
+ // removed to prevent hasCombinedFile from re-triggering on the next segment load.
+ if (hnswDir.exists()) {
+ FileUtils.deleteDirectory(hnswDir);
+ }
+ return;
+ }
+ throw new IOException("HNSW vector index already present in columns.psf for column: " + column
+ + " (size=" + existingSize + ") but combined file " + combinedFile.getName()
+ + " has different size " + combinedFile.length() + ". Refusing to overwrite — please remove "
+ + "the conflicting sidecar or rebuild the segment.");
+ }
Review Comment:
Same issue as the IVF absorb path: `absorbHnswIntoColumnsPsf` uses
`segmentWriter.hasIndexFor(column, StandardIndexes.vector())` for
crash-recovery, but that can be true solely because the legacy HNSW Lucene
directory exists on disk. That makes `getIndexFor(...).size()` throw and
prevents sidecar→columns.psf migration for existing segments when
`storeInSegmentFile` is toggled on.
Use the same “try getIndexFor; if present run size-check recovery” approach
here.
##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/VectorIndexConfig.java:
##########
@@ -139,6 +149,31 @@ public VectorIndexConfig setProperties(Map<String, String> properties) {
return this;
}
+ /**
+ * Whether the IVF vector index payload is consolidated into the segment's combined index file
+ * ({@code columns.psf}) rather than written as a combined file. Default {@code false}. Only
+ * applies to V3 segments and IVF backends; HNSW is unaffected.
+ *
Review Comment:
The Javadoc for `isStoreInSegmentFile()` says this flag “only applies … to
IVF backends; HNSW is unaffected”, but this PR wires the same flag through the
HNSW creator/reader/conversion path (combined packed file + `columns.psf` typed
entry). This is misleading for config consumers and reviewers.
Update the Javadoc to reflect that the flag applies to vector indexes in V3
segments (IVF and HNSW), with backend-specific details in the subsequent
paragraphs.
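One possible wording for the updated Javadoc, shown on a hypothetical, trimmed-down holder (`VectorIndexConfigSketch`, with an assumed `_storeInSegmentFile` field and setter) rather than the real `VectorIndexConfig`. The backend details are taken from the `VectorIndexHandler` Javadoc in this PR (IVF bytes copied verbatim; HNSW Lucene directory packed into a combined file first).

```java
// Hypothetical holder used only to illustrate the suggested Javadoc; not the real VectorIndexConfig.
final class VectorIndexConfigSketch {
  private boolean _storeInSegmentFile = false;

  /**
   * Whether the vector index payload is consolidated into the segment's combined index file
   * ({@code columns.psf}) as a typed entry, rather than written as a separate file alongside it.
   * Default {@code false}. Only applies to V3 segments; applies to both IVF and HNSW backends.
   *
   * <p>IVF: the on-disk IVF bytes are stored verbatim as the typed entry.</p>
   *
   * <p>HNSW: the Lucene index directory is first packed into a single combined file, which is
   * then stored as the typed entry.</p>
   */
  boolean isStoreInSegmentFile() {
    return _storeInSegmentFile;
  }

  // Hypothetical fluent setter, mirroring the style of setProperties(...) in the diff.
  VectorIndexConfigSketch setStoreInSegmentFile(boolean storeInSegmentFile) {
    _storeInSegmentFile = storeInSegmentFile;
    return this;
  }
}
```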
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/vector/lucene99/HnswVectorIndexCombined.java:
##########
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.vector.lucene99;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneCombinedTextIndexConstants;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class to pack a Lucene HNSW index directory (and its optional docId mapping file) into
+ * a single combined file using the {@link LuceneCombinedTextIndexConstants#MAGIC_NUMBER LUCENE_V2}
+ * layout. Mirrors {@code LuceneTextIndexCombined} — reuses the same format constants to keep a
+ * single on-disk format shared across text and HNSW vector indexes.
+ *
+ * <p>Layout (identical to the text-index LUCENE_V2 format):</p>
+ * <pre>
+ * [Header]
+ * Magic "LUCENE_V2" 9 bytes
+ * Version 4 bytes (little-endian int)
+ * Total buffer size 8 bytes (little-endian long)
+ * File count 4 bytes (little-endian int)
+ * Reserved 4 bytes
+ *
+ * [File metadata, one entry per file]
+ * Name length 2 bytes (little-endian short)
+ * Name variable
+ * File offset 8 bytes (little-endian long)
+ * File size 8 bytes (little-endian long)
+ *
+ * [File data]
+ * Raw bytes of each file concatenated in metadata order
+ * </pre>
+ */
+public final class HnswVectorIndexCombined {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HnswVectorIndexCombined.class);
+
+ private HnswVectorIndexCombined() {
+ }
+
+ /**
+ * Packs all files in {@code hnswIndexDir} (plus the optional docId mapping file) into a single
+ * combined file at {@code outputFilePath}.
+ *
+ * @param hnswIndexDir the Lucene HNSW index directory to pack
+ * @param outputFilePath destination path for the combined file
+ * @param segmentIndexDir when non-null, the segment's top-level index directory; used to
+ * locate the docId mapping file for inclusion in the packed output
+ * @param column column name; used to locate the docId mapping file when
+ * {@code segmentIndexDir} is provided
+ * @throws IOException if any file operations fail
+ */
+ public static void combineHnswIndexFiles(File hnswIndexDir, String outputFilePath,
+ @Nullable File segmentIndexDir, @Nullable String column)
+ throws IOException {
+ if (!hnswIndexDir.exists() || !hnswIndexDir.isDirectory()) {
+ throw new IllegalArgumentException(
+ "HNSW index directory does not exist or is not a directory: " + hnswIndexDir);
+ }
+
+ LOGGER.info("Combining HNSW index files from directory: {}", hnswIndexDir.getAbsolutePath());
+
+ Map<String, FileInfo> fileInfoMap = collectFiles(hnswIndexDir, segmentIndexDir, column);
+ int fileCount = fileInfoMap.size();
+
+ if (fileCount == 0) {
+ throw new IOException("No files found in HNSW index directory: " + hnswIndexDir);
+ }
+
+ long totalSize = calculateTotalBufferSize(fileInfoMap);
+ if (totalSize > Integer.MAX_VALUE) {
+ throw new IOException("Combined HNSW index size too large: " + totalSize + " bytes");
+ }
+
+ File outputFile = new File(outputFilePath);
+ try (FileChannel outputChannel = FileChannel.open(outputFile.toPath(), StandardOpenOption.CREATE,
+ StandardOpenOption.WRITE)) {
+ writeHeader(outputChannel, fileCount, (int) totalSize);
Review Comment:
`combineHnswIndexFiles` opens the output file with `CREATE, WRITE` but
without `TRUNCATE_EXISTING`. If the combined file already exists from a prior
failed attempt and the new combined payload is smaller, stale trailing bytes
will remain on disk. That can break later size-based checks (e.g.,
crash-recovery comparing `combinedFile.length()` to a typed-entry size) and
wastes space.
Open the channel with `TRUNCATE_EXISTING` (or explicitly delete the file
first).
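A sketch of the fix, applied to writing just the header described in the class Javadoc (9-byte magic, 4-byte version, 8-byte total size, 4-byte file count, 4 reserved bytes, little-endian). `CombinedHeaderWriter` is hypothetical; it assumes the magic is the ASCII string `LUCENE_V2` and that the reserved bytes are zero, which may not match `LuceneCombinedTextIndexConstants` exactly. The relevant part is the `TRUNCATE_EXISTING` option.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class CombinedHeaderWriter {
  // 9 (magic) + 4 (version) + 8 (total size) + 4 (file count) + 4 (reserved) = 29 bytes.
  static final int HEADER_SIZE = 29;

  static void writeHeader(Path output, int version, long totalBufferSize, int fileCount)
      throws IOException {
    ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN);
    header.put("LUCENE_V2".getBytes(StandardCharsets.US_ASCII)); // assumed ASCII magic, 9 bytes
    header.putInt(version);
    header.putLong(totalBufferSize);
    header.putInt(fileCount);
    header.putInt(0); // reserved, assumed zero
    header.flip();
    // TRUNCATE_EXISTING discards any bytes left over from an earlier, larger attempt, so the
    // final file length reflects only what this run writes.
    try (FileChannel channel = FileChannel.open(output, StandardOpenOption.CREATE,
        StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
      while (header.hasRemaining()) {
        channel.write(header);
      }
    }
  }
}
```

Without `TRUNCATE_EXISTING`, overwriting a 100-byte leftover file with this 29-byte header would leave the file at 100 bytes, which is exactly the stale-trailing-bytes problem that would skew the `combinedFile.length()` comparison in crash recovery. Deleting the file before opening it would work equally well.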
--
This is an automated message from the Apache Git Service.