xiangfu0 commented on code in PR #18852:
URL: https://github.com/apache/pinot/pull/18852#discussion_r3483406960


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java:
##########
@@ -160,8 +160,11 @@ public boolean hasIndexFor(String column, IndexType<?, ?, ?> type) {
     if (type == StandardIndexes.text() && TextIndexUtils.hasTextIndex(_segmentDirectory, column)) {
       return true;
     }
-    if (type == StandardIndexes.vector()) {
-      return VectorIndexUtils.hasVectorIndex(_segmentDirectory, column);
+    // Vector index may live either as a combined file (legacy / storeInSegmentFile=false) or as
+    // a typed entry inside columns.psf (storeInSegmentFile=true). Check both 
— mirror the text
+    // pattern of "combined OR _columnEntries".
+    if (type == StandardIndexes.vector() && VectorIndexUtils.hasVectorIndex(_segmentDirectory, column)) {
+      return true;
     }
     IndexKey key = new IndexKey(column, type);
     return _columnEntries.containsKey(key);

Review Comment:
   Kept `hasIndexFor(col, vector())` as "legacy sidecar OR typed entry" on 
purpose: `PhysicalColumnIndexContainer` gates *all* reader creation on 
`hasIndexFor`, so a legacy-sidecar segment (payload only on disk, not in 
`_columnEntries`) must report `true` or its vector reader would never be built. 
Limiting it to `_columnEntries` would stop legacy vector indexes from loading 
(and would diverge from the existing text-index pattern, where a Lucene sidecar 
directory also makes `hasIndexFor` true).
   
   Instead, the callers that specifically need "is the payload already packed 
into columns.psf" now probe the typed entry directly via a new 
`VectorIndexUtils.getConsolidatedVectorEntry()`, which returns `null` (rather 
than throwing) when no typed slot exists. That fixes the mis-detection in 
`VectorIndexHandler` (threads below) without breaking the load path. Fixed in 
c04730d.
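
The distinction between the two questions can be sketched as follows. This is a minimal illustration, not Pinot code: `VectorPresenceSketch`, its fields, and `getConsolidatedEntrySize` are hypothetical stand-ins for `SingleFileIndexDirectory`, `_columnEntries`, and `getConsolidatedVectorEntry()`, whose real signatures are not shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a segment index directory; not Pinot's real classes.
final class VectorPresenceSketch {
  // Typed entries packed into the single segment file: column -> payload size in bytes.
  private final Map<String, Long> _typedEntries = new HashMap<>();
  // Columns whose vector index still lives as a legacy sidecar on disk.
  private final Set<String> _legacySidecars;

  VectorPresenceSketch(Set<String> legacySidecars) {
    _legacySidecars = legacySidecars;
  }

  void addTypedEntry(String column, long size) {
    _typedEntries.put(column, size);
  }

  // Load-path question: "can a reader be built?" True for either storage form.
  boolean hasIndexFor(String column) {
    return _legacySidecars.contains(column) || _typedEntries.containsKey(column);
  }

  // Consolidation question: "is the payload already packed?" Returns null, rather
  // than throwing, when no typed slot exists.
  Long getConsolidatedEntrySize(String column) {
    return _typedEntries.get(column);
  }

  public static void main(String[] args) {
    VectorPresenceSketch dir = new VectorPresenceSketch(Set.of("embedding"));
    System.out.println(dir.hasIndexFor("embedding"));              // true
    System.out.println(dir.getConsolidatedEntrySize("embedding")); // null
  }
}
```

With only a legacy sidecar on disk, the load-path check answers `true` while the consolidation probe answers `null`, which is the case the handler previously mis-read.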
   



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandler.java:
##########
@@ -90,6 +122,290 @@ public boolean needUpdateIndices(SegmentDirectory.Reader 
segmentReader) {
     return false;
   }
 
+  /**
+   * Absorbs a vector index sidecar into {@code columns.psf} as a typed entry, 
then deletes the
+   * sibling. For IVF backends the bytes are copied verbatim; the on-disk IVF 
header is the
+   * contract and the reader handles version dispatch. For HNSW, if a 
combined-form file already
+   * exists (written by a creator run with the flag on), it is absorbed 
directly; if only the
+   * Lucene directory remains (operator just flipped the flag on an existing 
segment), it is first
+   * packed into a transient combined file, which is then absorbed and removed.
+   *
+   * <p><b>Crash recovery:</b> if a prior absorb crashed between {@code 
newIndexFor} (which
+   * committed bytes into {@code columns.psf} and added a {@code 
_columnEntries} entry) and the
+   * sidecar file deletion, the next load will see both forms. We detect that 
state upfront via
+   * {@code hasIndexFor}, verify the typed entry's size matches the sidecar's 
length (so we do not
+   * delete a sidecar that happens to coexist with an unrelated typed entry), 
and clean up the
+   * orphan sidecar instead of re-running the absorb.</p>
+   */
+  private void absorbCombinedIntoColumnsPsf(SegmentDirectory.Writer 
segmentWriter, String column,
+      VectorBackendType backendType, File indexDir)
+      throws Exception {
+    File v3Dir = SegmentDirectoryPaths.segmentDirectoryFor(indexDir,
+        _segmentDirectory.getSegmentMetadata().getVersion());
+    String segmentName = _segmentDirectory.getSegmentMetadata().getName();
+
+    if (backendType == VectorBackendType.HNSW) {
+      absorbHnswIntoColumnsPsf(segmentWriter, column, v3Dir, segmentName);
+      return;
+    }
+
+    // IVF path: prefer the combined-form file (freshly-written by a creator 
run with flag=on);
+    // fall back to the legacy sidecar (segments built with flag=off that the 
operator now wants
+    // consolidated).
+    File combinedFile = new File(v3Dir,
+        column + VectorIndexUtils.getIndexFileExtension(backendType, /* 
combined */ true));
+    File legacyFile = new File(v3Dir,
+        column + VectorIndexUtils.getIndexFileExtension(backendType, /* 
combined */ false));
+    File picked = combinedFile.exists() ? combinedFile : legacyFile;
+    if (!picked.exists()) {
+      LOGGER.warn("Expected vector index file {} not found during vector 
consolidation; skipping", picked);
+      return;
+    }
+    // Crash-recovery: if a prior absorb committed bytes to columns.psf but 
crashed before
+    // deleting the sidecar, the typed entry is already present. Detect that 
directly via
+    // hasIndexFor + size check, rather than catching the duplicate-key 
exception by message.
+    if (segmentWriter.hasIndexFor(column, StandardIndexes.vector())) {
+      long existingSize = segmentWriter.getIndexFor(column, 
StandardIndexes.vector()).size();
+      if (existingSize == picked.length()) {
+        LOGGER.warn("Vector index already present in columns.psf for segment: 
{}, column: {}; "
+            + "deleting orphan sidecar file from a previously-crashed absorb 
run", segmentName, column);
+        FileUtils.deleteQuietly(picked);
+        return;
+      }
+      // Size mismatch — the typed entry is from a different build than the 
sidecar. Refuse to
+      // proceed; an operator must reconcile manually rather than have us 
guess.
+      throw new IOException("Vector index already present in columns.psf for 
column: " + column
+          + " (size=" + existingSize + ") but sidecar file " + picked.getName()
+          + " has different size " + picked.length() + ". Refusing to 
overwrite — please remove "
+          + "the conflicting sidecar or rebuild the segment.");
+    }

Review Comment:
   Fixed in c04730d. The IVF absorb crash-recovery no longer trusts 
`hasIndexFor`; it probes the typed entry via `getConsolidatedVectorEntry()` and 
treats a missing slot as the normal first-absorb path (only running the 
size-check recovery when a typed entry actually exists). Added a regression 
test `testUpdateIndicesAbsorbsLegacySidecarWhenNoTypedEntryYet` that mocks the 
real behavior (`hasIndexFor` true because the sidecar is on disk, `getIndexFor` 
throws because no typed entry) and asserts the absorb proceeds and deletes the 
sidecar.
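
A minimal sketch of the resulting three-way decision, assuming the probe yields a nullable size. `AbsorbDecisionSketch`, `Action`, and `decide` are hypothetical names used only for illustration; the real handler works on `SegmentDirectory.Writer` and files rather than plain numbers.

```java
import java.io.IOException;

// Hypothetical decision helper; mirrors the described flow, not the actual handler code.
final class AbsorbDecisionSketch {
  enum Action { ABSORB, DELETE_ORPHAN_SIDECAR }

  // typedEntrySize is null when no typed slot exists yet (the normal first absorb).
  static Action decide(Long typedEntrySize, long sidecarLength) throws IOException {
    if (typedEntrySize == null) {
      return Action.ABSORB;
    }
    if (typedEntrySize == sidecarLength) {
      // A prior run committed the bytes but crashed before deleting the sidecar.
      return Action.DELETE_ORPHAN_SIDECAR;
    }
    // Sizes differ: refuse to guess which copy is authoritative.
    throw new IOException("Typed entry size " + typedEntrySize
        + " does not match sidecar size " + sidecarLength);
  }

  public static void main(String[] args) throws IOException {
    System.out.println(decide(null, 128L)); // ABSORB
    System.out.println(decide(128L, 128L)); // DELETE_ORPHAN_SIDECAR
  }
}
```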
   



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandler.java:
##########
@@ -90,6 +122,290 @@ public boolean needUpdateIndices(SegmentDirectory.Reader 
segmentReader) {
     return false;
   }
 
+  /**
+   * Absorbs a vector index sidecar into {@code columns.psf} as a typed entry, 
then deletes the
+   * sibling. For IVF backends the bytes are copied verbatim; the on-disk IVF 
header is the
+   * contract and the reader handles version dispatch. For HNSW, if a 
combined-form file already
+   * exists (written by a creator run with the flag on), it is absorbed 
directly; if only the
+   * Lucene directory remains (operator just flipped the flag on an existing 
segment), it is first
+   * packed into a transient combined file, which is then absorbed and removed.
+   *
+   * <p><b>Crash recovery:</b> if a prior absorb crashed between {@code 
newIndexFor} (which
+   * committed bytes into {@code columns.psf} and added a {@code 
_columnEntries} entry) and the
+   * sidecar file deletion, the next load will see both forms. We detect that 
state upfront via
+   * {@code hasIndexFor}, verify the typed entry's size matches the sidecar's 
length (so we do not
+   * delete a sidecar that happens to coexist with an unrelated typed entry), 
and clean up the
+   * orphan sidecar instead of re-running the absorb.</p>
+   */
+  private void absorbCombinedIntoColumnsPsf(SegmentDirectory.Writer 
segmentWriter, String column,
+      VectorBackendType backendType, File indexDir)
+      throws Exception {
+    File v3Dir = SegmentDirectoryPaths.segmentDirectoryFor(indexDir,
+        _segmentDirectory.getSegmentMetadata().getVersion());
+    String segmentName = _segmentDirectory.getSegmentMetadata().getName();
+
+    if (backendType == VectorBackendType.HNSW) {
+      absorbHnswIntoColumnsPsf(segmentWriter, column, v3Dir, segmentName);
+      return;
+    }
+
+    // IVF path: prefer the combined-form file (freshly-written by a creator 
run with flag=on);
+    // fall back to the legacy sidecar (segments built with flag=off that the 
operator now wants
+    // consolidated).
+    File combinedFile = new File(v3Dir,
+        column + VectorIndexUtils.getIndexFileExtension(backendType, /* 
combined */ true));
+    File legacyFile = new File(v3Dir,
+        column + VectorIndexUtils.getIndexFileExtension(backendType, /* 
combined */ false));
+    File picked = combinedFile.exists() ? combinedFile : legacyFile;
+    if (!picked.exists()) {
+      LOGGER.warn("Expected vector index file {} not found during vector 
consolidation; skipping", picked);
+      return;
+    }
+    // Crash-recovery: if a prior absorb committed bytes to columns.psf but 
crashed before
+    // deleting the sidecar, the typed entry is already present. Detect that 
directly via
+    // hasIndexFor + size check, rather than catching the duplicate-key 
exception by message.
+    if (segmentWriter.hasIndexFor(column, StandardIndexes.vector())) {
+      long existingSize = segmentWriter.getIndexFor(column, 
StandardIndexes.vector()).size();
+      if (existingSize == picked.length()) {
+        LOGGER.warn("Vector index already present in columns.psf for segment: 
{}, column: {}; "
+            + "deleting orphan sidecar file from a previously-crashed absorb 
run", segmentName, column);
+        FileUtils.deleteQuietly(picked);
+        return;
+      }
+      // Size mismatch — the typed entry is from a different build than the 
sidecar. Refuse to
+      // proceed; an operator must reconcile manually rather than have us 
guess.
+      throw new IOException("Vector index already present in columns.psf for 
column: " + column
+          + " (size=" + existingSize + ") but sidecar file " + picked.getName()
+          + " has different size " + picked.length() + ". Refusing to 
overwrite — please remove "
+          + "the conflicting sidecar or rebuild the segment.");
+    }
+    LOGGER.info("Absorbing vector sidecar file into columns.psf for segment: 
{}, column: {} (backend={})",
+        segmentName, column, backendType);
+    LoaderUtils.writeIndexToV3Format(segmentWriter, column, picked, 
StandardIndexes.vector());
+  }
+
+  /**
+   * HNSW-specific absorb: packs the Lucene directory (or uses an existing 
combined-form file) into
+   * a single combined file and absorbs it into {@code columns.psf}.
+   *
+   * <p>If the combined-form file already exists (creator ran with the flag on 
and produced it),
+   * it is absorbed directly. If only the Lucene directory exists (operator 
just flipped the flag
+   * on an existing segment), the directory is first packed into a transient 
combined file, which is
+   * then absorbed and cleaned up.</p>
+   */
+  private void absorbHnswIntoColumnsPsf(SegmentDirectory.Writer segmentWriter, 
String column, File v3Dir,
+      String segmentName)
+      throws Exception {
+    File combinedFile = new File(v3Dir,
+        column + 
VectorIndexUtils.getIndexFileExtension(VectorBackendType.HNSW, /* combined */ 
true));
+    File hnswDir = new File(v3Dir,
+        column + 
VectorIndexUtils.getIndexFileExtension(VectorBackendType.HNSW, /* combined */ 
false));
+    if (!combinedFile.exists()) {
+      if (!hnswDir.exists() || !hnswDir.isDirectory()) {
+        LOGGER.warn("Expected HNSW directory or combined file for column {} 
not found; skipping absorb", column);
+        return;
+      }
+      // Pack the Lucene directory into a transient combined file for 
absorption. The directory
+      // and the combined file are both cleaned up below after a successful 
absorb (or by the
+      // crash-recovery branch above on a subsequent retry).
+      HnswVectorIndexCombined.combineHnswIndexFiles(hnswDir, 
combinedFile.getAbsolutePath(), v3Dir.getParentFile(),
+          column);
+    }
+
+    // Crash-recovery: check whether the typed entry already exists.
+    if (segmentWriter.hasIndexFor(column, StandardIndexes.vector())) {
+      long existingSize = segmentWriter.getIndexFor(column, 
StandardIndexes.vector()).size();
+      if (existingSize == combinedFile.length()) {
+        LOGGER.warn("HNSW vector index already present in columns.psf for 
segment: {}, column: {}; "
+            + "deleting orphan files from a previously-crashed absorb run", 
segmentName, column);
+        FileUtils.deleteQuietly(combinedFile);
+        // Clean up the Lucene directory regardless of createdTransient: if we 
packed the directory
+        // into a transient combined file in this very run, the directory 
still exists and must be
+        // removed to prevent hasCombinedFile from re-triggering on the next 
segment load.
+        if (hnswDir.exists()) {
+          FileUtils.deleteDirectory(hnswDir);
+        }
+        return;
+      }
+      throw new IOException("HNSW vector index already present in columns.psf 
for column: " + column
+          + " (size=" + existingSize + ") but combined file " + 
combinedFile.getName()
+          + " has different size " + combinedFile.length() + ". Refusing to 
overwrite — please remove "
+          + "the conflicting sidecar or rebuild the segment.");
+    }

Review Comment:
   Same fix as the IVF path, applied to `absorbHnswIntoColumnsPsf` in c04730d — 
it probes `getConsolidatedVectorEntry()` instead of `hasIndexFor` (which would 
be true here just because the still-present legacy Lucene directory is on 
disk), so a normal first absorb is no longer mis-read as already-absorbed.
   



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverter.java:
##########
@@ -147,16 +148,31 @@ private void copyIndexData(File v2Directory, SegmentMetadataImpl v2Metadata, Fil
           SegmentDirectory.Writer v3DataWriter = v3Segment.createWriter()) {
         for (String column : v2Metadata.getAllColumns()) {
           for (IndexType<?, ?, ?> indexType : IndexService.getInstance().getAllIndexes()) {
-            //If Text index files are combined merge into columns.psf else no-op
+            // Text index: skip the standard copy when a legacy Lucene directory is present
+            // ({@code copyLuceneTextIndexIfExists} below handles that as a sibling copy).
+            // Combined .text.index files fall through to the standard path and get packed.
             if (indexType == StandardIndexes.text()) {
               if (!TextIndexUtils.hasTextIndex(v2Directory, column)) {
                 copyIndexIfExists(v2DataReader, v3DataWriter, column, indexType);
               }
-            } else {
-              if (indexType != StandardIndexes.vector()) {
+              continue;
+            }
+            // Vector index mirrors text: skip the standard copy when a legacy IVF sidecar (or
+            // HNSW directory) is present ({@code copyVectorIndexIfExists} handles those as
+            // sibling copies). Combined .vector.ivfflat.combined.index / .ivfpq.combined.index
+            // files fall through to the standard path and get packed into columns.psf.
+            // Mixed state: if both legacy and combined exist (e.g. operator 
toggled the flag
+            // and rebuilt without cleaning the old file), combined wins — 
pack it and let
+            // copyVectorIndexIfExists drop the legacy sibling so the operator 
gets exactly one
+            // copy of the bytes in the V3 segment.
+            if (indexType == StandardIndexes.vector()) {
+              boolean hasCombined = VectorIndexUtils.hasCombinedFormVectorIndex(v2Directory, column);
+              if (hasCombined || !VectorIndexUtils.hasVectorIndex(v2Directory, column)) {
                 copyIndexIfExists(v2DataReader, v3DataWriter, column, indexType);
               }
+              continue;
             }

Review Comment:
   Fixed in c04730d, with one adjustment: switching purely to 
`!hasVectorIndex(...)` would silently lose the index in a mixed **IVF** state — 
`copyVectorIndexIfExists` skips a legacy IVF sidecar when a combined sibling 
exists, and the combined form would no longer be packed either, leaving the 
column with no vector index in V3.
   
   So the gate now packs the combined form into columns.psf only when the 
resolved on-disk artifact is a regular file (`!resolved.isDirectory()`). A 
legacy HNSW Lucene **directory** (which `findHnswVectorIndexFile` resolves 
before the combined file) is deferred to the sibling copy — avoiding the 
directory-as-buffer mapping you flagged — while a legacy IVF **file** still 
packs, so the index is never lost. `VectorIndexHandler` re-absorbs the deferred 
HNSW directory later if `storeInSegmentFile` stays on.
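
The file-versus-directory gate can be illustrated roughly as below. `PackGateSketch` and `shouldPack` are hypothetical names, and treating an unresolved (`null`) artifact as "do not pack" is an assumption of this sketch, not something the converter is confirmed to do.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Hypothetical gate; only the isDirectory() test comes from the comment above.
final class PackGateSketch {
  // True when the resolved artifact is a regular file that can be mapped as a buffer.
  // Assumption: a null artifact (nothing resolved) is not packed by this helper.
  static boolean shouldPack(File resolved) {
    return resolved != null && !resolved.isDirectory();
  }

  public static void main(String[] args) throws IOException {
    File luceneDir = Files.createTempDirectory("hnsw-dir").toFile();
    File ivfFile = Files.createTempFile("ivf", ".index").toFile();
    System.out.println(shouldPack(luceneDir)); // false: deferred to the sibling copy
    System.out.println(shouldPack(ivfFile));   // true: packed into the segment file
    ivfFile.delete();
    luceneDir.delete();
  }
}
```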
   



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/vector/lucene99/HnswVectorIndexCombined.java:
##########
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.vector.lucene99;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import 
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneCombinedTextIndexConstants;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class to pack a Lucene HNSW index directory (and its optional docId 
mapping file) into
+ * a single combined file using the {@link 
LuceneCombinedTextIndexConstants#MAGIC_NUMBER LUCENE_V2}
+ * layout. Mirrors {@code LuceneTextIndexCombined} — reuses the same format 
constants to keep a
+ * single on-disk format shared across text and HNSW vector indexes.
+ *
+ * <p>Layout (identical to the text-index LUCENE_V2 format):</p>
+ * <pre>
+ * [Header]
+ *   Magic "LUCENE_V2"   9 bytes
+ *   Version             4 bytes (little-endian int)
+ *   Total buffer size   8 bytes (little-endian long)
+ *   File count          4 bytes (little-endian int)
+ *   Reserved            4 bytes
+ *
+ * [File metadata, one entry per file]
+ *   Name length         2 bytes (little-endian short)
+ *   Name                variable
+ *   File offset         8 bytes (little-endian long)
+ *   File size           8 bytes (little-endian long)
+ *
+ * [File data]
+ *   Raw bytes of each file concatenated in metadata order
+ * </pre>
+ */
+public final class HnswVectorIndexCombined {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HnswVectorIndexCombined.class);
+
+  private HnswVectorIndexCombined() {
+  }
+
+  /**
+   * Packs all files in {@code hnswIndexDir} (plus the optional docId mapping 
file) into a single
+   * combined file at {@code outputFilePath}.
+   *
+   * @param hnswIndexDir     the Lucene HNSW index directory to pack
+   * @param outputFilePath   destination path for the combined file
+   * @param segmentIndexDir  when non-null, the segment's top-level index 
directory; used to
+   *                         locate the docId mapping file for inclusion in 
the packed output
+   * @param column           column name; used to locate the docId mapping 
file when
+   *                         {@code segmentIndexDir} is provided
+   * @throws IOException if any file operations fail
+   */
+  public static void combineHnswIndexFiles(File hnswIndexDir, String 
outputFilePath,
+      @Nullable File segmentIndexDir, @Nullable String column)
+      throws IOException {
+    if (!hnswIndexDir.exists() || !hnswIndexDir.isDirectory()) {
+      throw new IllegalArgumentException(
+          "HNSW index directory does not exist or is not a directory: " + 
hnswIndexDir);
+    }
+
+    LOGGER.info("Combining HNSW index files from directory: {}", 
hnswIndexDir.getAbsolutePath());
+
+    Map<String, FileInfo> fileInfoMap = collectFiles(hnswIndexDir, 
segmentIndexDir, column);
+    int fileCount = fileInfoMap.size();
+
+    if (fileCount == 0) {
+      throw new IOException("No files found in HNSW index directory: " + 
hnswIndexDir);
+    }
+
+    long totalSize = calculateTotalBufferSize(fileInfoMap);
+    if (totalSize > Integer.MAX_VALUE) {
+      throw new IOException("Combined HNSW index size too large: " + totalSize 
+ " bytes");
+    }
+
+    File outputFile = new File(outputFilePath);
+    try (FileChannel outputChannel = FileChannel.open(outputFile.toPath(), 
StandardOpenOption.CREATE,
+        StandardOpenOption.WRITE)) {
+      writeHeader(outputChannel, fileCount, (int) totalSize);

Review Comment:
   Addressed — `combineHnswIndexFiles` opens the output channel with 
`StandardOpenOption.TRUNCATE_EXISTING` so a leftover larger file from a 
previously-crashed pack cannot leave stale trailing bytes that would inflate 
the length and break the size-based crash-recovery check.
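
The effect of the extra open option can be reproduced with plain `java.nio`. This is a standalone sketch: `TruncateOnRewriteSketch` and `write` are hypothetical names, and only the `StandardOpenOption` behavior reflects the change described here.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper showing why a rewrite without truncation keeps the old length.
final class TruncateOnRewriteSketch {
  // Writes payload from offset 0 and returns the resulting file size.
  static long write(Path path, byte[] payload, boolean truncate) throws IOException {
    StandardOpenOption[] options = truncate
        ? new StandardOpenOption[] {
            StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING}
        : new StandardOpenOption[] {StandardOpenOption.CREATE, StandardOpenOption.WRITE};
    try (FileChannel channel = FileChannel.open(path, options)) {
      ByteBuffer buffer = ByteBuffer.wrap(payload);
      while (buffer.hasRemaining()) {
        channel.write(buffer);
      }
    }
    return Files.size(path);
  }

  public static void main(String[] args) throws IOException {
    Path path = Files.createTempFile("combined", ".index");
    write(path, new byte[100], true);                      // leftover from a crashed pack
    System.out.println(write(path, new byte[10], false));  // 100: stale trailing bytes remain
    System.out.println(write(path, new byte[10], true));   // 10: length matches the payload
    Files.delete(path);
  }
}
```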
   



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/VectorIndexConfig.java:
##########
@@ -139,6 +149,31 @@ public VectorIndexConfig setProperties(Map<String, String> properties) {
     return this;
   }
 
+  /**
+   * Whether the IVF vector index payload is consolidated into the segment's combined index file
+   * ({@code columns.psf}) rather than written as a combined file. Default {@code false}. Only
+   * applies to V3 segments and IVF backends; HNSW is unaffected.
+   *

Review Comment:
   Updated — the `isStoreInSegmentFile()` Javadoc now states the flag applies 
to V3 segments for both IVF and HNSW backends, with the backend-specific 
build/read details in the following paragraphs.
   



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/HnswVectorIndexBufferReader.java:
##########
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.vector;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.lucene.store.Directory;
+import 
org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexBufferReader;
+import 
org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexHeader;
+import 
org.apache.pinot.segment.local.segment.index.readers.text.PinotBufferLuceneDirectory;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility for reading an HNSW vector index from a combined buffer ({@link 
PinotDataBuffer}).
+ *
+ * <p>The combined buffer uses the same {@code LUCENE_V2} layout as the text 
index (written by
+ * {@code HnswVectorIndexCombined}). This class delegates header and metadata 
parsing to
+ * {@link LuceneTextIndexBufferReader}, then constructs a {@link 
PinotBufferLuceneDirectory} that
+ * Lucene's {@code DirectoryReader} can open without any filesystem access.</p>
+ *
+ * <p>Two entry-points are provided:</p>
+ * <ul>
+ *   <li>{@link #createLuceneDirectory} — a read-only {@link Directory} over 
the Lucene index
+ *       files embedded in the buffer.</li>
+ *   <li>{@link #extractDocIdMappingBuffer} — a sub-buffer view covering just 
the docId mapping
+ *       bytes, or {@code null} if the mapping was not packed.</li>
+ * </ul>
+ */
+public final class HnswVectorIndexBufferReader {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HnswVectorIndexBufferReader.class);
+
+  private HnswVectorIndexBufferReader() {
+  }
+
+  /**
+   * Creates a Lucene {@link Directory} that reads all Lucene index files from 
a combined HNSW
+   * buffer. The docId mapping file (a Pinot-only artifact with the
+   * {@link 
V1Constants.Indexes#VECTOR_HNSW_INDEX_DOCID_MAPPING_FILE_EXTENSION} suffix) is
+   * excluded from the directory view so that {@code DirectoryReader} does not 
see an unknown file.
+   *
+   * <p>The directory does <em>not</em> own the buffer — closing it is a 
no-op. The caller must
+   * keep the buffer alive for as long as the directory (and any reader opened 
on it) is in use.</p>
+   *
+   * @param indexBuffer combined buffer in LUCENE_V2 format
+   * @param column      column name; used to identify and exclude the mapping 
file
+   * @return a read-only {@link Directory} backed by the buffer
+   * @throws IOException if the buffer is malformed or the magic/version check 
fails
+   */
+  public static Directory createLuceneDirectory(PinotDataBuffer indexBuffer, 
String column)
+      throws IOException {
+    List<String> fileNames = 
LuceneTextIndexBufferReader.listFiles(indexBuffer);
+    Map<String, LuceneTextIndexHeader.FileInfo> fileMap = new 
HashMap<>(fileNames.size());
+
+    String mappingFileName = column + 
V1Constants.Indexes.VECTOR_HNSW_INDEX_DOCID_MAPPING_FILE_EXTENSION;
+    for (String name : fileNames) {
+      if (name.equals(mappingFileName)) {
+        // Skip: not a Lucene index file; presenting it to DirectoryReader 
triggers an error.
+        continue;
+      }
+      LuceneTextIndexHeader.FileInfo info = 
LuceneTextIndexBufferReader.getFileInfo(indexBuffer, name);
+      if (info != null) {
+        fileMap.put(name, info);
+      }
+    }
+
+    LOGGER.debug("Creating buffer-backed Lucene directory for HNSW column '{}' 
with {} file(s)", column,
+        fileMap.size());
+    return new PinotBufferLuceneDirectory(indexBuffer, fileMap, column);
+  }
+
+  /**
+   * Extracts a sub-buffer covering the docId mapping bytes packed inside the 
combined buffer.
+   *
+   * <p>Returns {@code null} when the mapping was not included (e.g. the index 
was built from an
+   * in-memory segment that had not yet materialised a mapping file). The 
caller must build the
+   * mapping from the Lucene index in that case.</p>
+   *
+   * <p>The returned sub-buffer is a {@link PinotDataBuffer#view view} of the 
original buffer and
+   * shares its lifetime. The caller must not close it independently.</p>
+   *
+   * @param indexBuffer combined buffer in LUCENE_V2 format
+   * @param column      column name; used to identify the mapping file entry
+   * @return sub-buffer for the mapping bytes, or {@code null} if not present
+   * @throws IOException if header parsing fails
+   */
+  @Nullable
+  public static PinotDataBuffer extractDocIdMappingBuffer(PinotDataBuffer 
indexBuffer, String column)
+      throws IOException {
+    String mappingFileName = column + 
V1Constants.Indexes.VECTOR_HNSW_INDEX_DOCID_MAPPING_FILE_EXTENSION;
+    LuceneTextIndexHeader.FileInfo fileInfo = 
LuceneTextIndexBufferReader.getFileInfo(indexBuffer, mappingFileName);
+    if (fileInfo == null) {
+      return null;
+    }
+    return indexBuffer.view(fileInfo.getOffset(), fileInfo.getOffset() + 
fileInfo.getSize());

Review Comment:
   Good catch — fixed in df94a35. `extractDocIdMappingBuffer` now returns a 
`LITTLE_ENDIAN` view, so `DocIdTranslator.getInt()` reads the Lucene→Pinot doc 
ids unswapped regardless of the big-endian `columns.psf` order the view would 
otherwise inherit. Added a regression test 
(`testExtractDocIdMappingBufferIsLittleEndian`) that packs a known mapping and 
reads it back through a big-endian columns.psf-style buffer, which fails 
without the little-endian view.
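
The byte-order mismatch is easy to reproduce with `java.nio.ByteBuffer`, used here as an analogy for `PinotDataBuffer` views. `MappingByteOrderSketch` and its methods are hypothetical, and the sketch assumes the mapping holds 4-byte little-endian doc ids, as the fix implies.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical illustration with java.nio buffers; not the PinotDataBuffer API.
final class MappingByteOrderSketch {
  // Encodes doc ids as little-endian ints, the order the mapping is assumed to use.
  static byte[] writeLittleEndian(int... docIds) {
    ByteBuffer out = ByteBuffer.allocate(docIds.length * Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN);
    for (int docId : docIds) {
      out.putInt(docId);
    }
    return out.array();
  }

  // Reads the int at the given slot using the supplied byte order.
  static int readAt(byte[] bytes, int index, ByteOrder order) {
    return ByteBuffer.wrap(bytes).order(order).getInt(index * Integer.BYTES);
  }

  public static void main(String[] args) {
    byte[] mapping = writeLittleEndian(1, 2, 3);
    System.out.println(readAt(mapping, 1, ByteOrder.LITTLE_ENDIAN)); // 2
    System.out.println(readAt(mapping, 1, ByteOrder.BIG_ENDIAN));    // 33554432 (bytes swapped)
  }
}
```

Reading the same bytes through a big-endian view yields `Integer.reverseBytes` of each doc id, which is why the view's order has to be set explicitly instead of inherited.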
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

