xiangfu0 commented on code in PR #18852:
URL: https://github.com/apache/pinot/pull/18852#discussion_r3483406960
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java:
##########
@@ -160,8 +160,11 @@ public boolean hasIndexFor(String column, IndexType<?, ?, ?> type) {
if (type == StandardIndexes.text() && TextIndexUtils.hasTextIndex(_segmentDirectory, column)) {
return true;
}
- if (type == StandardIndexes.vector()) {
- return VectorIndexUtils.hasVectorIndex(_segmentDirectory, column);
+ // Vector index may live either as a combined file (legacy / storeInSegmentFile=false) or as
+ // a typed entry inside columns.psf (storeInSegmentFile=true). Check both — mirror the text
+ // pattern of "combined OR _columnEntries".
+ if (type == StandardIndexes.vector() && VectorIndexUtils.hasVectorIndex(_segmentDirectory, column)) {
+ return true;
}
IndexKey key = new IndexKey(column, type);
return _columnEntries.containsKey(key);
Review Comment:
Kept `hasIndexFor(col, vector())` as "legacy sidecar OR typed entry" on
purpose: `PhysicalColumnIndexContainer` gates *all* reader creation on
`hasIndexFor`, so a legacy-sidecar segment (payload only on disk, not in
`_columnEntries`) must report `true` or its vector reader would never be built.
Limiting it to `_columnEntries` would stop legacy vector indexes from loading
(and would diverge from the existing text-index pattern, where a Lucene sidecar
directory also makes `hasIndexFor` true).
Instead, the callers that specifically need "is the payload already packed
into columns.psf" now probe the typed entry directly via a new
`VectorIndexUtils.getConsolidatedVectorEntry()`, which returns `null` (rather
than throwing) when no typed slot exists. That fixes the mis-detection in
`VectorIndexHandler` (threads below) without breaking the load path. Fixed in
c04730d.
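
To make the split between the two questions concrete, here is a minimal sketch. It is not Pinot code: `VectorIndexProbeSketch` and its methods are hypothetical stand-ins, and the typed-entry probe returns a boxed size rather than whatever `getConsolidatedVectorEntry()` really returns. It only models "can a reader be built?" versus "is the payload already packed into columns.psf?".

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of a segment directory; names are illustrative only.
final class VectorIndexProbeSketch {
  // Columns whose vector payload exists only as a legacy sidecar on disk.
  private final Set<String> sidecarColumns = new HashSet<>();
  // Columns with a typed entry in columns.psf, mapped to the entry size in bytes.
  private final Map<String, Long> typedEntries = new HashMap<>();

  void addSidecar(String column) {
    sidecarColumns.add(column);
  }

  void addTypedEntry(String column, long sizeInBytes) {
    typedEntries.put(column, sizeInBytes);
  }

  // Load-path question: true for either form, so legacy segments still get a reader.
  boolean hasIndexFor(String column) {
    return sidecarColumns.contains(column) || typedEntries.containsKey(column);
  }

  // Consolidation question: returns null (instead of throwing) when no typed slot exists.
  Long getConsolidatedEntrySize(String column) {
    return typedEntries.get(column);
  }
}
```

With only a sidecar registered for a column, `hasIndexFor` is true while the typed-entry probe is null, which is exactly the state the absorb path has to treat as "not yet absorbed".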
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandler.java:
##########
@@ -90,6 +122,290 @@ public boolean needUpdateIndices(SegmentDirectory.Reader
segmentReader) {
return false;
}
+ /**
+ * Absorbs a vector index sidecar into {@code columns.psf} as a typed entry,
then deletes the
+ * sibling. For IVF backends the bytes are copied verbatim; the on-disk IVF
header is the
+ * contract and the reader handles version dispatch. For HNSW, if a
combined-form file already
+ * exists (written by a creator run with the flag on), it is absorbed
directly; if only the
+ * Lucene directory remains (operator just flipped the flag on an existing
segment), it is first
+ * packed into a transient combined file, which is then absorbed and removed.
+ *
+ * <p><b>Crash recovery:</b> if a prior absorb crashed between {@code
newIndexFor} (which
+ * committed bytes into {@code columns.psf} and added a {@code
_columnEntries} entry) and the
+ * sidecar file deletion, the next load will see both forms. We detect that
state upfront via
+ * {@code hasIndexFor}, verify the typed entry's size matches the sidecar's
length (so we do not
+ * delete a sidecar that happens to coexist with an unrelated typed entry),
and clean up the
+ * orphan sidecar instead of re-running the absorb.</p>
+ */
+ private void absorbCombinedIntoColumnsPsf(SegmentDirectory.Writer
segmentWriter, String column,
+ VectorBackendType backendType, File indexDir)
+ throws Exception {
+ File v3Dir = SegmentDirectoryPaths.segmentDirectoryFor(indexDir,
+ _segmentDirectory.getSegmentMetadata().getVersion());
+ String segmentName = _segmentDirectory.getSegmentMetadata().getName();
+
+ if (backendType == VectorBackendType.HNSW) {
+ absorbHnswIntoColumnsPsf(segmentWriter, column, v3Dir, segmentName);
+ return;
+ }
+
+ // IVF path: prefer the combined-form file (freshly-written by a creator
run with flag=on);
+ // fall back to the legacy sidecar (segments built with flag=off that the
operator now wants
+ // consolidated).
+ File combinedFile = new File(v3Dir,
+ column + VectorIndexUtils.getIndexFileExtension(backendType, /*
combined */ true));
+ File legacyFile = new File(v3Dir,
+ column + VectorIndexUtils.getIndexFileExtension(backendType, /*
combined */ false));
+ File picked = combinedFile.exists() ? combinedFile : legacyFile;
+ if (!picked.exists()) {
+ LOGGER.warn("Expected vector index file {} not found during vector
consolidation; skipping", picked);
+ return;
+ }
+ // Crash-recovery: if a prior absorb committed bytes to columns.psf but
crashed before
+ // deleting the sidecar, the typed entry is already present. Detect that
directly via
+ // hasIndexFor + size check, rather than catching the duplicate-key
exception by message.
+ if (segmentWriter.hasIndexFor(column, StandardIndexes.vector())) {
+ long existingSize = segmentWriter.getIndexFor(column,
StandardIndexes.vector()).size();
+ if (existingSize == picked.length()) {
+ LOGGER.warn("Vector index already present in columns.psf for segment:
{}, column: {}; "
+ + "deleting orphan sidecar file from a previously-crashed absorb
run", segmentName, column);
+ FileUtils.deleteQuietly(picked);
+ return;
+ }
+ // Size mismatch — the typed entry is from a different build than the
sidecar. Refuse to
+ // proceed; an operator must reconcile manually rather than have us
guess.
+ throw new IOException("Vector index already present in columns.psf for
column: " + column
+ + " (size=" + existingSize + ") but sidecar file " + picked.getName()
+ + " has different size " + picked.length() + ". Refusing to
overwrite — please remove "
+ + "the conflicting sidecar or rebuild the segment.");
+ }
Review Comment:
Fixed in c04730d. The IVF absorb crash-recovery no longer trusts
`hasIndexFor`; it probes the typed entry via `getConsolidatedVectorEntry()` and
treats a missing slot as the normal first-absorb path (only running the
size-check recovery when a typed entry actually exists). Added a regression
test `testUpdateIndicesAbsorbsLegacySidecarWhenNoTypedEntryYet` that mocks the
real behavior (`hasIndexFor` true because the sidecar is on disk, `getIndexFor`
throws because no typed entry) and asserts the absorb proceeds and deletes the
sidecar.
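
The resulting decision logic can be sketched as below. This is an illustration under assumptions, not the handler's actual code: `AbsorbDecisionSketch`, `Action`, and `decide` are hypothetical names, and the typed entry is reduced to a nullable size.

```java
// Hypothetical model of the absorb decision; not the real VectorIndexHandler.
final class AbsorbDecisionSketch {
  enum Action { ABSORB, DELETE_ORPHAN_SIDECAR, REFUSE_SIZE_MISMATCH }

  // typedEntrySize is null when columns.psf has no typed vector slot for the column.
  static Action decide(Long typedEntrySize, long sidecarLength) {
    if (typedEntrySize == null) {
      // Normal first absorb: the sidecar is the only copy of the payload.
      return Action.ABSORB;
    }
    if (typedEntrySize.longValue() == sidecarLength) {
      // A previous absorb committed the bytes but crashed before deleting the sidecar.
      return Action.DELETE_ORPHAN_SIDECAR;
    }
    // The typed entry and the sidecar disagree; leave reconciliation to the operator.
    return Action.REFUSE_SIZE_MISMATCH;
  }
}
```

The regression test corresponds to the first branch: a sidecar on disk with no typed entry must lead to an absorb rather than to the recovery path.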
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandler.java:
##########
@@ -90,6 +122,290 @@ public boolean needUpdateIndices(SegmentDirectory.Reader
segmentReader) {
return false;
}
+ /**
+ * Absorbs a vector index sidecar into {@code columns.psf} as a typed entry,
then deletes the
+ * sibling. For IVF backends the bytes are copied verbatim; the on-disk IVF
header is the
+ * contract and the reader handles version dispatch. For HNSW, if a
combined-form file already
+ * exists (written by a creator run with the flag on), it is absorbed
directly; if only the
+ * Lucene directory remains (operator just flipped the flag on an existing
segment), it is first
+ * packed into a transient combined file, which is then absorbed and removed.
+ *
+ * <p><b>Crash recovery:</b> if a prior absorb crashed between {@code
newIndexFor} (which
+ * committed bytes into {@code columns.psf} and added a {@code
_columnEntries} entry) and the
+ * sidecar file deletion, the next load will see both forms. We detect that
state upfront via
+ * {@code hasIndexFor}, verify the typed entry's size matches the sidecar's
length (so we do not
+ * delete a sidecar that happens to coexist with an unrelated typed entry),
and clean up the
+ * orphan sidecar instead of re-running the absorb.</p>
+ */
+ private void absorbCombinedIntoColumnsPsf(SegmentDirectory.Writer
segmentWriter, String column,
+ VectorBackendType backendType, File indexDir)
+ throws Exception {
+ File v3Dir = SegmentDirectoryPaths.segmentDirectoryFor(indexDir,
+ _segmentDirectory.getSegmentMetadata().getVersion());
+ String segmentName = _segmentDirectory.getSegmentMetadata().getName();
+
+ if (backendType == VectorBackendType.HNSW) {
+ absorbHnswIntoColumnsPsf(segmentWriter, column, v3Dir, segmentName);
+ return;
+ }
+
+ // IVF path: prefer the combined-form file (freshly-written by a creator
run with flag=on);
+ // fall back to the legacy sidecar (segments built with flag=off that the
operator now wants
+ // consolidated).
+ File combinedFile = new File(v3Dir,
+ column + VectorIndexUtils.getIndexFileExtension(backendType, /*
combined */ true));
+ File legacyFile = new File(v3Dir,
+ column + VectorIndexUtils.getIndexFileExtension(backendType, /*
combined */ false));
+ File picked = combinedFile.exists() ? combinedFile : legacyFile;
+ if (!picked.exists()) {
+ LOGGER.warn("Expected vector index file {} not found during vector
consolidation; skipping", picked);
+ return;
+ }
+ // Crash-recovery: if a prior absorb committed bytes to columns.psf but
crashed before
+ // deleting the sidecar, the typed entry is already present. Detect that
directly via
+ // hasIndexFor + size check, rather than catching the duplicate-key
exception by message.
+ if (segmentWriter.hasIndexFor(column, StandardIndexes.vector())) {
+ long existingSize = segmentWriter.getIndexFor(column,
StandardIndexes.vector()).size();
+ if (existingSize == picked.length()) {
+ LOGGER.warn("Vector index already present in columns.psf for segment:
{}, column: {}; "
+ + "deleting orphan sidecar file from a previously-crashed absorb
run", segmentName, column);
+ FileUtils.deleteQuietly(picked);
+ return;
+ }
+ // Size mismatch — the typed entry is from a different build than the
sidecar. Refuse to
+ // proceed; an operator must reconcile manually rather than have us
guess.
+ throw new IOException("Vector index already present in columns.psf for
column: " + column
+ + " (size=" + existingSize + ") but sidecar file " + picked.getName()
+ + " has different size " + picked.length() + ". Refusing to
overwrite — please remove "
+ + "the conflicting sidecar or rebuild the segment.");
+ }
+ LOGGER.info("Absorbing vector sidecar file into columns.psf for segment:
{}, column: {} (backend={})",
+ segmentName, column, backendType);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, column, picked,
StandardIndexes.vector());
+ }
+
+ /**
+ * HNSW-specific absorb: packs the Lucene directory (or uses an existing
combined-form file) into
+ * a single combined file and absorbs it into {@code columns.psf}.
+ *
+ * <p>If the combined-form file already exists (creator ran with the flag on
and produced it),
+ * it is absorbed directly. If only the Lucene directory exists (operator
just flipped the flag
+ * on an existing segment), the directory is first packed into a transient
combined file, which is
+ * then absorbed and cleaned up.</p>
+ */
+ private void absorbHnswIntoColumnsPsf(SegmentDirectory.Writer segmentWriter,
String column, File v3Dir,
+ String segmentName)
+ throws Exception {
+ File combinedFile = new File(v3Dir,
+ column +
VectorIndexUtils.getIndexFileExtension(VectorBackendType.HNSW, /* combined */
true));
+ File hnswDir = new File(v3Dir,
+ column +
VectorIndexUtils.getIndexFileExtension(VectorBackendType.HNSW, /* combined */
false));
+ if (!combinedFile.exists()) {
+ if (!hnswDir.exists() || !hnswDir.isDirectory()) {
+ LOGGER.warn("Expected HNSW directory or combined file for column {}
not found; skipping absorb", column);
+ return;
+ }
+ // Pack the Lucene directory into a transient combined file for
absorption. The directory
+ // and the combined file are both cleaned up below after a successful
absorb (or by the
+ // crash-recovery branch above on a subsequent retry).
+ HnswVectorIndexCombined.combineHnswIndexFiles(hnswDir,
combinedFile.getAbsolutePath(), v3Dir.getParentFile(),
+ column);
+ }
+
+ // Crash-recovery: check whether the typed entry already exists.
+ if (segmentWriter.hasIndexFor(column, StandardIndexes.vector())) {
+ long existingSize = segmentWriter.getIndexFor(column,
StandardIndexes.vector()).size();
+ if (existingSize == combinedFile.length()) {
+ LOGGER.warn("HNSW vector index already present in columns.psf for
segment: {}, column: {}; "
+ + "deleting orphan files from a previously-crashed absorb run",
segmentName, column);
+ FileUtils.deleteQuietly(combinedFile);
+ // Clean up the Lucene directory regardless of createdTransient: if we
packed the directory
+ // into a transient combined file in this very run, the directory
still exists and must be
+ // removed to prevent hasCombinedFile from re-triggering on the next
segment load.
+ if (hnswDir.exists()) {
+ FileUtils.deleteDirectory(hnswDir);
+ }
+ return;
+ }
+ throw new IOException("HNSW vector index already present in columns.psf
for column: " + column
+ + " (size=" + existingSize + ") but combined file " +
combinedFile.getName()
+ + " has different size " + combinedFile.length() + ". Refusing to
overwrite — please remove "
+ + "the conflicting sidecar or rebuild the segment.");
+ }
Review Comment:
Same fix as the IVF path, applied to `absorbHnswIntoColumnsPsf` in c04730d —
it probes `getConsolidatedVectorEntry()` instead of `hasIndexFor` (which would
be true here just because the still-present legacy Lucene directory is on
disk), so a normal first absorb is no longer mis-read as already-absorbed.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverter.java:
##########
@@ -147,16 +148,31 @@ private void copyIndexData(File v2Directory, SegmentMetadataImpl v2Metadata, Fil
SegmentDirectory.Writer v3DataWriter = v3Segment.createWriter()) {
for (String column : v2Metadata.getAllColumns()) {
for (IndexType<?, ?, ?> indexType : IndexService.getInstance().getAllIndexes()) {
- //If Text index files are combined merge into columns.psf else no-op
+ // Text index: skip the standard copy when a legacy Lucene directory is present
+ // ({@code copyLuceneTextIndexIfExists} below handles that as a sibling copy).
+ // Combined .text.index files fall through to the standard path and get packed.
if (indexType == StandardIndexes.text()) {
if (!TextIndexUtils.hasTextIndex(v2Directory, column)) {
copyIndexIfExists(v2DataReader, v3DataWriter, column, indexType);
}
- } else {
- if (indexType != StandardIndexes.vector()) {
+ continue;
+ }
+ // Vector index mirrors text: skip the standard copy when a legacy IVF sidecar (or
+ // HNSW directory) is present ({@code copyVectorIndexIfExists} handles those as
+ // sibling copies). Combined .vector.ivfflat.combined.index / .ivfpq.combined.index
+ // files fall through to the standard path and get packed into columns.psf.
+ // Mixed state: if both legacy and combined exist (e.g. operator toggled the flag
+ // and rebuilt without cleaning the old file), combined wins — pack it and let
+ // copyVectorIndexIfExists drop the legacy sibling so the operator gets exactly one
+ // copy of the bytes in the V3 segment.
+ if (indexType == StandardIndexes.vector()) {
+ boolean hasCombined = VectorIndexUtils.hasCombinedFormVectorIndex(v2Directory, column);
+ if (hasCombined || !VectorIndexUtils.hasVectorIndex(v2Directory, column)) {
copyIndexIfExists(v2DataReader, v3DataWriter, column, indexType);
}
+ continue;
}
Review Comment:
Fixed in c04730d, with one adjustment: switching purely to
`!hasVectorIndex(...)` would silently lose the index in a mixed **IVF** state —
`copyVectorIndexIfExists` skips a legacy IVF sidecar when a combined sibling
exists, and the combined form would no longer be packed either, leaving the
column with no vector index in V3.
So the gate now packs the combined form into columns.psf only when the
resolved on-disk artifact is a regular file (`!resolved.isDirectory()`). A
legacy HNSW Lucene **directory** (which `findHnswVectorIndexFile` resolves
before the combined file) is deferred to the sibling copy — avoiding the
directory-as-buffer mapping you flagged — while a legacy IVF **file** still
packs, so the index is never lost. `VectorIndexHandler` re-absorbs the deferred
HNSW directory later if `storeInSegmentFile` stays on.
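
A rough sketch of the file-versus-directory gate, for illustration only. `ConverterGateSketch`, `Route`, and both methods are hypothetical, and the sketch assumes an artifact has already been resolved; it does not model how the converter locates that artifact.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

// Hypothetical model of the converter gate; not the real SegmentV1V2ToV3FormatConverter.
final class ConverterGateSketch {
  enum Route { PACK_INTO_COLUMNS_PSF, DEFER_TO_SIBLING_COPY }

  // A directory cannot be mapped as a single buffer, so it is left to the sibling copy.
  static Route route(File resolved) {
    return resolved.isDirectory() ? Route.DEFER_TO_SIBLING_COPY : Route.PACK_INTO_COLUMNS_PSF;
  }

  // Demo helper: creates a throwaway file or directory and reports how it would be routed.
  static Route routeForTempArtifact(boolean asDirectory) {
    try {
      File artifact = asDirectory
          ? Files.createTempDirectory("col.vector.hnsw").toFile()
          : Files.createTempFile("col.vector.ivf", ".index").toFile();
      try {
        return route(artifact);
      } finally {
        artifact.delete();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```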
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/vector/lucene99/HnswVectorIndexCombined.java:
##########
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.vector.lucene99;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneCombinedTextIndexConstants;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class to pack a Lucene HNSW index directory (and its optional docId
mapping file) into
+ * a single combined file using the {@link
LuceneCombinedTextIndexConstants#MAGIC_NUMBER LUCENE_V2}
+ * layout. Mirrors {@code LuceneTextIndexCombined} — reuses the same format
constants to keep a
+ * single on-disk format shared across text and HNSW vector indexes.
+ *
+ * <p>Layout (identical to the text-index LUCENE_V2 format):</p>
+ * <pre>
+ * [Header]
+ * Magic "LUCENE_V2" 9 bytes
+ * Version 4 bytes (little-endian int)
+ * Total buffer size 8 bytes (little-endian long)
+ * File count 4 bytes (little-endian int)
+ * Reserved 4 bytes
+ *
+ * [File metadata, one entry per file]
+ * Name length 2 bytes (little-endian short)
+ * Name variable
+ * File offset 8 bytes (little-endian long)
+ * File size 8 bytes (little-endian long)
+ *
+ * [File data]
+ * Raw bytes of each file concatenated in metadata order
+ * </pre>
+ */
+public final class HnswVectorIndexCombined {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HnswVectorIndexCombined.class);
+
+ private HnswVectorIndexCombined() {
+ }
+
+ /**
+ * Packs all files in {@code hnswIndexDir} (plus the optional docId mapping
file) into a single
+ * combined file at {@code outputFilePath}.
+ *
+ * @param hnswIndexDir the Lucene HNSW index directory to pack
+ * @param outputFilePath destination path for the combined file
+ * @param segmentIndexDir when non-null, the segment's top-level index
directory; used to
+ * locate the docId mapping file for inclusion in
the packed output
+ * @param column column name; used to locate the docId mapping
file when
+ * {@code segmentIndexDir} is provided
+ * @throws IOException if any file operations fail
+ */
+ public static void combineHnswIndexFiles(File hnswIndexDir, String
outputFilePath,
+ @Nullable File segmentIndexDir, @Nullable String column)
+ throws IOException {
+ if (!hnswIndexDir.exists() || !hnswIndexDir.isDirectory()) {
+ throw new IllegalArgumentException(
+ "HNSW index directory does not exist or is not a directory: " +
hnswIndexDir);
+ }
+
+ LOGGER.info("Combining HNSW index files from directory: {}",
hnswIndexDir.getAbsolutePath());
+
+ Map<String, FileInfo> fileInfoMap = collectFiles(hnswIndexDir,
segmentIndexDir, column);
+ int fileCount = fileInfoMap.size();
+
+ if (fileCount == 0) {
+ throw new IOException("No files found in HNSW index directory: " +
hnswIndexDir);
+ }
+
+ long totalSize = calculateTotalBufferSize(fileInfoMap);
+ if (totalSize > Integer.MAX_VALUE) {
+ throw new IOException("Combined HNSW index size too large: " + totalSize
+ " bytes");
+ }
+
+ File outputFile = new File(outputFilePath);
+ try (FileChannel outputChannel = FileChannel.open(outputFile.toPath(),
StandardOpenOption.CREATE,
+ StandardOpenOption.WRITE)) {
+ writeHeader(outputChannel, fileCount, (int) totalSize);
Review Comment:
Addressed: `combineHnswIndexFiles` now opens the output channel with
`StandardOpenOption.TRUNCATE_EXISTING`, so a larger file left over from a
previously-crashed pack cannot leave stale trailing bytes that would inflate
the length and break the size-based crash-recovery check.
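
The effect of `TRUNCATE_EXISTING` on file length can be reproduced with plain `java.nio`. The sketch below is standalone and not taken from the PR; `TruncateExistingSketch` and `rewrite` are hypothetical names. It overwrites a pre-existing file with a shorter payload, once with and once without truncation, and reports the resulting length.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Standalone illustration; not the real HnswVectorIndexCombined.
final class TruncateExistingSketch {
  // Simulates a leftover file of staleLength bytes, rewrites it with payload,
  // and returns the final file length.
  static long rewrite(int staleLength, byte[] payload, boolean truncate) {
    try {
      Path file = Files.createTempFile("combined", ".index");
      try {
        Files.write(file, new byte[staleLength]);
        StandardOpenOption[] options = truncate
            ? new StandardOpenOption[] {
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING}
            : new StandardOpenOption[] {StandardOpenOption.CREATE, StandardOpenOption.WRITE};
        try (FileChannel channel = FileChannel.open(file, options)) {
          ByteBuffer buffer = ByteBuffer.wrap(payload);
          while (buffer.hasRemaining()) {
            channel.write(buffer);
          }
        }
        return Files.size(file);
      } finally {
        Files.deleteIfExists(file);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Without truncation the file keeps the stale length even though only the first bytes were rewritten, which is why a size comparison against the typed entry would fail.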
##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/VectorIndexConfig.java:
##########
@@ -139,6 +149,31 @@ public VectorIndexConfig setProperties(Map<String, String> properties) {
return this;
}
+ /**
+ * Whether the IVF vector index payload is consolidated into the segment's combined index file
+ * ({@code columns.psf}) rather than written as a combined file. Default {@code false}. Only
+ * applies to V3 segments and IVF backends; HNSW is unaffected.
+ *
Review Comment:
Updated — the `isStoreInSegmentFile()` Javadoc now states the flag applies
to V3 segments for both IVF and HNSW backends, with the backend-specific
build/read details in the following paragraphs.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/HnswVectorIndexBufferReader.java:
##########
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.vector;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.lucene.store.Directory;
+import
org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexBufferReader;
+import
org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexHeader;
+import
org.apache.pinot.segment.local.segment.index.readers.text.PinotBufferLuceneDirectory;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility for reading an HNSW vector index from a combined buffer ({@link
PinotDataBuffer}).
+ *
+ * <p>The combined buffer uses the same {@code LUCENE_V2} layout as the text
index (written by
+ * {@code HnswVectorIndexCombined}). This class delegates header and metadata
parsing to
+ * {@link LuceneTextIndexBufferReader}, then constructs a {@link
PinotBufferLuceneDirectory} that
+ * Lucene's {@code DirectoryReader} can open without any filesystem access.</p>
+ *
+ * <p>Two entry-points are provided:</p>
+ * <ul>
+ * <li>{@link #createLuceneDirectory} — a read-only {@link Directory} over
the Lucene index
+ * files embedded in the buffer.</li>
+ * <li>{@link #extractDocIdMappingBuffer} — a sub-buffer view covering just
the docId mapping
+ * bytes, or {@code null} if the mapping was not packed.</li>
+ * </ul>
+ */
+public final class HnswVectorIndexBufferReader {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HnswVectorIndexBufferReader.class);
+
+ private HnswVectorIndexBufferReader() {
+ }
+
+ /**
+ * Creates a Lucene {@link Directory} that reads all Lucene index files from
a combined HNSW
+ * buffer. The docId mapping file (a Pinot-only artifact with the
+ * {@link
V1Constants.Indexes#VECTOR_HNSW_INDEX_DOCID_MAPPING_FILE_EXTENSION} suffix) is
+ * excluded from the directory view so that {@code DirectoryReader} does not
see an unknown file.
+ *
+ * <p>The directory does <em>not</em> own the buffer — closing it is a
no-op. The caller must
+ * keep the buffer alive for as long as the directory (and any reader opened
on it) is in use.</p>
+ *
+ * @param indexBuffer combined buffer in LUCENE_V2 format
+ * @param column column name; used to identify and exclude the mapping
file
+ * @return a read-only {@link Directory} backed by the buffer
+ * @throws IOException if the buffer is malformed or the magic/version check
fails
+ */
+ public static Directory createLuceneDirectory(PinotDataBuffer indexBuffer,
String column)
+ throws IOException {
+ List<String> fileNames =
LuceneTextIndexBufferReader.listFiles(indexBuffer);
+ Map<String, LuceneTextIndexHeader.FileInfo> fileMap = new
HashMap<>(fileNames.size());
+
+ String mappingFileName = column +
V1Constants.Indexes.VECTOR_HNSW_INDEX_DOCID_MAPPING_FILE_EXTENSION;
+ for (String name : fileNames) {
+ if (name.equals(mappingFileName)) {
+ // Skip: not a Lucene index file; presenting it to DirectoryReader
triggers an error.
+ continue;
+ }
+ LuceneTextIndexHeader.FileInfo info =
LuceneTextIndexBufferReader.getFileInfo(indexBuffer, name);
+ if (info != null) {
+ fileMap.put(name, info);
+ }
+ }
+
+ LOGGER.debug("Creating buffer-backed Lucene directory for HNSW column '{}'
with {} file(s)", column,
+ fileMap.size());
+ return new PinotBufferLuceneDirectory(indexBuffer, fileMap, column);
+ }
+
+ /**
+ * Extracts a sub-buffer covering the docId mapping bytes packed inside the
combined buffer.
+ *
+ * <p>Returns {@code null} when the mapping was not included (e.g. the index
was built from an
+ * in-memory segment that had not yet materialised a mapping file). The
caller must build the
+ * mapping from the Lucene index in that case.</p>
+ *
+ * <p>The returned sub-buffer is a {@link PinotDataBuffer#view view} of the
original buffer and
+ * shares its lifetime. The caller must not close it independently.</p>
+ *
+ * @param indexBuffer combined buffer in LUCENE_V2 format
+ * @param column column name; used to identify the mapping file entry
+ * @return sub-buffer for the mapping bytes, or {@code null} if not present
+ * @throws IOException if header parsing fails
+ */
+ @Nullable
+ public static PinotDataBuffer extractDocIdMappingBuffer(PinotDataBuffer
indexBuffer, String column)
+ throws IOException {
+ String mappingFileName = column +
V1Constants.Indexes.VECTOR_HNSW_INDEX_DOCID_MAPPING_FILE_EXTENSION;
+ LuceneTextIndexHeader.FileInfo fileInfo =
LuceneTextIndexBufferReader.getFileInfo(indexBuffer, mappingFileName);
+ if (fileInfo == null) {
+ return null;
+ }
+ return indexBuffer.view(fileInfo.getOffset(), fileInfo.getOffset() +
fileInfo.getSize());
Review Comment:
Good catch — fixed in df94a35. `extractDocIdMappingBuffer` now returns a
`LITTLE_ENDIAN` view, so `DocIdTranslator.getInt()` reads the Lucene→Pinot doc
ids unswapped regardless of the big-endian `columns.psf` order the view would
otherwise inherit. Added a regression test
(`testExtractDocIdMappingBufferIsLittleEndian`) that packs a known mapping and
reads it back through a big-endian columns.psf-style buffer, which fails
without the little-endian view.
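
The byte-order mismatch is easy to reproduce with `java.nio.ByteBuffer` alone. The sketch below uses `ByteBuffer` as a stand-in for `PinotDataBuffer`, and `DocIdMappingEndianSketch` with its methods is hypothetical; it only shows why an int written little-endian must be read through a little-endian view.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Standalone illustration with ByteBuffer standing in for PinotDataBuffer.
final class DocIdMappingEndianSketch {
  // Packs doc ids as little-endian ints, the order assumed for the mapping bytes.
  static ByteBuffer packLittleEndian(int... docIds) {
    ByteBuffer buffer = ByteBuffer.allocate(docIds.length * Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN);
    for (int docId : docIds) {
      buffer.putInt(docId);
    }
    buffer.flip();
    return buffer;
  }

  // Reads the int at the given index through a view with the requested byte order.
  static int readInt(ByteBuffer packed, int index, ByteOrder viewOrder) {
    return packed.duplicate().order(viewOrder).getInt(index * Integer.BYTES);
  }
}
```

Reading the same bytes through a big-endian view yields `Integer.reverseBytes` of the stored doc id, which is the silent corruption the little-endian view avoids.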
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]