Copilot commented on code in PR #18229:
URL: https://github.com/apache/pinot/pull/18229#discussion_r3108140393


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/CodecRegistry.java:
##########
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.codec;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.codec.ChunkCodecHandler;
+import org.apache.pinot.segment.spi.codec.CodecSpecParser;
+
+
+/**
+ * Registry of known {@link ChunkCodecHandler} instances, looked up by
+ * {@link ChunkCodecHandler#name()}.
+ *
+ * <p>The singleton {@link #DEFAULT} instance is pre-populated with all 
built-in codecs and is
+ * immutable after class-loading; calling {@link #register} on it will throw.
+ *
+ * <p>Custom registries (e.g. in tests) should be created with {@link 
#CodecRegistry()} and
+ * populated via {@link #register} before use.  The custom registry instance 
itself is not
+ * thread-safe for concurrent writes; callers must complete all registrations 
before sharing
+ * the registry across threads.
+ *
+ * <p>Lookup is case-insensitive.
+ */
+public final class CodecRegistry {
+
+  /**
+   * Default, immutable registry containing all built-in codecs.
+   * Safe for concurrent reads; does not allow {@link #register}.
+   */
+  public static final CodecRegistry DEFAULT;
+
+  static {
+    Map<String, ChunkCodecHandler<?>> m = new LinkedHashMap<>();
+    m.put(DeltaCodecDefinition.INSTANCE.name().toUpperCase(), 
DeltaCodecDefinition.INSTANCE);
+    m.put(DeltaDeltaCodecDefinition.INSTANCE.name().toUpperCase(), 
DeltaDeltaCodecDefinition.INSTANCE);
+    m.put(ZstdCodecDefinition.INSTANCE.name().toUpperCase(), 
ZstdCodecDefinition.INSTANCE);
+    m.put(Lz4CodecDefinition.INSTANCE.name().toUpperCase(), 
Lz4CodecDefinition.INSTANCE);
+    m.put(SnappyCodecDefinition.INSTANCE.name().toUpperCase(), 
SnappyCodecDefinition.INSTANCE);
+    m.put(GzipCodecDefinition.INSTANCE.name().toUpperCase(), 
GzipCodecDefinition.INSTANCE);
+    DEFAULT = new CodecRegistry(Collections.unmodifiableMap(m));
+  }

Review Comment:
   The PR description says the built-in codec registry contains DELTA/ZSTD/LZ4, 
but the default registry here also registers DELTADELTA, SNAPPY, and GZIP. 
Please either update the PR description to match the actual shipped built-ins, 
or drop these codecs from DEFAULT if they are not intended to be part of the 
initial contract.



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/CodecPipelineForwardIndexTest.java:
##########
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.forward;
+
+import java.io.File;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.PinotBuffersAfterMethodCheckRule;
+import org.apache.pinot.segment.local.io.codec.CodecPipelineExecutor;
+import org.apache.pinot.segment.local.io.codec.CodecRegistry;
+import 
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriterV7;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReaderV7;
+import org.apache.pinot.segment.spi.codec.CodecContext;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Roundtrip encode/decode tests for the codec-pipeline forward index (version 
7).
+ *
+ * <p>Covers:
+ * <ul>
+ *   <li>INT and LONG single-value columns</li>
+ *   <li>DELTA only, ZSTD only, CODEC(DELTA,ZSTD) pipelines</li>
+ *   <li>Version check on the written file</li>
+ *   <li>Canonical spec round-trips correctly through the header</li>
+ * </ul>
+ */
+public class CodecPipelineForwardIndexTest implements 
PinotBuffersAfterMethodCheckRule {
+
+  private static final int NUM_VALUES = 10_007;
+  private static final int DOCS_PER_CHUNK = 1024;
+  private static final String TEST_FILE_PREFIX =
+      System.getProperty("java.io.tmpdir") + File.separator + 
"CodecPipelineFwdTest";
+  // Fixed seed for deterministic test data — this test verifies correctness, 
not random coverage
+  private static final Random RANDOM = new Random(42L);
+

Review Comment:
   The test uses a single static shared Random instance. Because it is stateful 
(its sequence advances on every call, even though java.util.Random itself is 
thread-safe), the generated expected values depend on test execution order and 
can become flaky under parallel TestNG execution. Use a 
per-test/per-invocation Random seeded deterministically (e.g., derive a seed 
from spec+type) or precompute deterministic inputs without shared mutable RNG 
state.
   ```suggestion
     // Base seed for deterministic per-invocation test data generation.
     private static final long RANDOM_SEED = 42L;
   
     private static Random createRandom(String spec, DataType dataType) {
       long seed = RANDOM_SEED;
       seed = 31 * seed + spec.hashCode();
       seed = 31 * seed + dataType.name().hashCode();
       return new Random(seed);
     }
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkForwardIndexWriterV7.java:
##########
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.writer.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pinot.segment.local.io.codec.CodecPipelineExecutor;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Chunk-based raw (non-dictionary-encoded) forward index writer for 
single-value fixed-width
+ * columns (INT, LONG) that uses a {@link CodecPipelineExecutor} for encoding.
+ *
+ * <p>This writer introduces <strong>version 7</strong> of the fixed-byte 
chunk raw forward index
+ * format.  The on-disk layout is:
+ *
+ * <pre>
+ * File header:
+ *   version              (int, value = 7)
+ *   numChunks            (int)
+ *   numDocsPerChunk      (int, normalised to power-of-2)
+ *   sizeOfEntry          (int, bytes per logical value, e.g. 4 for INT)
+ *   totalDocs            (int)
+ *   codecSpecLength      (int, byte length of the UTF-8 encoded canonical 
codec spec)
+ *   dataHeaderStart      (int, byte offset from file start where chunk-offset 
table begins)
+ *   codecSpec            (byte[], UTF-8 encoded canonical spec, length = 
codecSpecLength)
+ *   chunkOffsets         (long[numChunks], absolute byte offset of each 
chunk's per-chunk header)
+ * Data (per chunk):
+ *   compressedSize       (int, byte length of the encoded payload that 
follows)
+ *   uncompressedSize     (int, byte length of the original decoded chunk data)
+ *   payload              (byte[], encoded chunk data, length = compressedSize)
+ * </pre>
+ *
+ * <p>Each chunk contains {@code numDocsPerChunk} values encoded by the 
pipeline.  Chunk offsets
+ * are 8-byte longs to support files larger than 2 GB.  The per-chunk size 
header allows readers
+ * to verify decompression output and to skip/read chunks without scanning 
adjacent offsets.
+ *
+ * <p>This class is <em>not</em> thread-safe.
+ */
+@NotThreadSafe
+public class FixedByteChunkForwardIndexWriterV7 implements Closeable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FixedByteChunkForwardIndexWriterV7.class);
+
+  public static final int VERSION = 
ForwardIndexConfig.CODEC_PIPELINE_WRITER_VERSION;
+
+  /** Bytes written before each chunk payload: compressedSize (int) + 
uncompressedSize (int). */
+  public static final int CHUNK_HEADER_BYTES = 2 * Integer.BYTES;
+
+  // Number of fixed int fields before the codec spec: version, numChunks, 
numDocsPerChunk,
+  // sizeOfEntry, totalDocs, codecSpecLength, dataHeaderStart
+  private static final int FIXED_HEADER_INT_COUNT = 7;
+
+  private final FileChannel _dataFile;
+  private final CodecPipelineExecutor _executor;
+  private final int _numDocsPerChunk;
+  private final int _sizeOfEntry;
+  private final ByteBuffer _header;
+  private final ByteBuffer _chunkBuffer;
+  private final int _numChunks;
+  private final int _totalDocs;
+
+  private long _dataOffset;
+  private int _docsWritten;
+  private int _chunksWritten;
+
+  /**
+   * Creates a new writer.
+   *
+   * @param file            output file
+   * @param executor        pre-validated pipeline executor
+   * @param totalDocs       total number of documents to write
+   * @param numDocsPerChunk target documents per chunk (will be rounded up to 
power-of-2)
+   * @param sizeOfEntry     bytes per value (e.g. 4 for INT, 8 for LONG)
+   */
+  public FixedByteChunkForwardIndexWriterV7(File file, CodecPipelineExecutor 
executor, int totalDocs,
+      int numDocsPerChunk, int sizeOfEntry)
+      throws IOException {
+    _executor = executor;
+    _sizeOfEntry = sizeOfEntry;
+    _totalDocs = totalDocs;
+    _numDocsPerChunk = normalizePower2(numDocsPerChunk);
+    _numChunks = (totalDocs + _numDocsPerChunk - 1) / _numDocsPerChunk;
+    _docsWritten = 0;
+    _chunksWritten = 0;
+
+    byte[] specBytes = 
executor.getCanonicalSpec().getBytes(StandardCharsets.UTF_8);
+
+    // Header layout:
+    //   7 ints of fixed fields
+    //   specBytes.length bytes of codec spec
+    //   numChunks longs of chunk offsets
+    long fixedHeaderBytesLong = (long) FIXED_HEADER_INT_COUNT * Integer.BYTES;
+    long dataHeaderStartLong = fixedHeaderBytesLong + specBytes.length;
+    long chunkOffsetTableBytesLong = (long) _numChunks * Long.BYTES;
+    long totalHeaderBytesLong = dataHeaderStartLong + 
chunkOffsetTableBytesLong;
+    if (totalHeaderBytesLong > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException(
+          "Header size " + totalHeaderBytesLong + " bytes exceeds 
Integer.MAX_VALUE. Reduce totalDocs or"
+              + " increase numDocsPerChunk.");
+    }
+    int dataHeaderStart = (int) dataHeaderStartLong;
+    int totalHeaderBytes = (int) totalHeaderBytesLong;
+
+    _header = ByteBuffer.allocateDirect(totalHeaderBytes);
+    _header.putInt(VERSION);
+    _header.putInt(_numChunks);
+    _header.putInt(_numDocsPerChunk);
+    _header.putInt(sizeOfEntry);
+    _header.putInt(totalDocs);
+    _header.putInt(specBytes.length);
+    _header.putInt(dataHeaderStart);
+    _header.put(specBytes);
+    // chunk offsets will be filled in during writeChunk() calls
+
+    _dataOffset = totalHeaderBytes;
+
+    long chunkSizeLong = (long) sizeOfEntry * _numDocsPerChunk;
+    if (chunkSizeLong > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException(
+          "Chunk size " + chunkSizeLong + " bytes overflows int. Reduce 
numDocsPerChunk or sizeOfEntry.");
+    }
+    _chunkBuffer = ByteBuffer.allocateDirect((int) chunkSizeLong);
+
+    _dataFile = new RandomAccessFile(file, "rw").getChannel();
+  }
+
+  /** Writes a 4-byte integer value. */
+  public void putInt(int value) {
+    _chunkBuffer.putInt(value);
+    _docsWritten++;
+    flushIfNeeded();
+  }
+
+  /** Writes an 8-byte long value. */
+  public void putLong(long value) {
+    _chunkBuffer.putLong(value);
+    _docsWritten++;
+    flushIfNeeded();
+  }
+
+  private void flushIfNeeded() {
+    if (_chunkBuffer.position() == _numDocsPerChunk * _sizeOfEntry) {
+      writeChunk();
+    }
+  }
+
+  private void writeChunk() {
+    _chunkBuffer.flip();
+    int uncompressedSize = _chunkBuffer.remaining();
+    try {
+      ByteBuffer encoded = _executor.compress(_chunkBuffer);
+      int compressedSize = encoded.remaining();
+
+      // Per-chunk header: compressedSize (int) + uncompressedSize (int)
+      ByteBuffer chunkHeader = ByteBuffer.allocateDirect(CHUNK_HEADER_BYTES);

Review Comment:
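   writeChunk() allocates a new direct ByteBuffer for the per-chunk header on 
every chunk. For large segments this can create substantial direct-memory 
allocation/cleanup overhead. Consider reusing a single (heap or direct) 8-byte 
buffer for the chunk header, or writing the two ints via a reusable ByteBuffer 
field. The suggestion below is only the minimal change (a heap buffer instead 
of a direct one); it still allocates once per chunk, so hoisting the buffer 
into a field and calling clear() before each use would remove the allocation 
entirely.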
   ```suggestion
         ByteBuffer chunkHeader = ByteBuffer.allocate(CHUNK_HEADER_BYTES);
   ```



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/codec/CodecSpecParser.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.codec;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Phase-1 (structural) parser for the Codec DSL.
+ *
+ * <p>Accepts the following grammar:
+ * <pre>
+ *   spec        ::= pipeline | invocation
+ *   pipeline    ::= "CODEC" "(" invocation ("," invocation)* ")"
+ *   invocation  ::= NAME
+ *                 | NAME "(" args ")"
+ *   args        ::= ε | arg ("," arg)*
+ *   arg         ::= [+-]? [0-9]+    // numeric literals; leading sign 
accepted but not used in v1
+ *   NAME        ::= [A-Za-z_][A-Za-z0-9_]*
+ * </pre>
+ *
+ * <p>Examples:
+ * <ul>
+ *   <li>{@code DELTA} → single-stage pipeline with no args</li>
+ *   <li>{@code ZSTD(3)} → single-stage pipeline, arg "3"</li>
+ *   <li>{@code CODEC(DELTA,ZSTD(8))} → two-stage pipeline</li>
+ * </ul>
+ *
+ * <p>This class performs only structural parsing; semantic validation (valid 
codec names,
+ * argument ranges, type compatibility) is done in a second phase by the 
registry and
+ * pipeline validator.
+ *
+ * <p>This class is stateless and thread-safe.
+ */
+public final class CodecSpecParser {
+
+  /**
+   * The keyword used for multi-stage pipeline notation (e.g. {@code 
CODEC(DELTA,ZSTD(3))}).
+   *
+   * <p>This name is permanently reserved — it cannot be registered as a codec 
name.
+   * See {@code CodecRegistry.register}.
+   */
+  public static final String PIPELINE_KEYWORD = "CODEC";
+
+  private CodecSpecParser() {
+  }
+
+  /**
+   * Parses {@code spec} into a {@link CodecPipeline} AST.
+   *
+   * @param spec the codec DSL string; must not be {@code null} or blank
+   * @return the parsed pipeline
+   * @throws IllegalArgumentException if the spec is syntactically invalid
+   */
+  public static CodecPipeline parse(String spec) {
+    if (spec == null || spec.isBlank()) {
+      throw new IllegalArgumentException("codecSpec must not be null or 
blank");
+    }
+    String trimmed = spec.trim();
+    Parser p = new Parser(trimmed);
+    CodecPipeline pipeline = p.parsePipelineOrInvocation();
+    p.expectEof();
+    return pipeline;
+  }
+
+  // -------------------------------------------------------------------------
+  // Internal recursive-descent parser
+  // -------------------------------------------------------------------------
+
+  private static final class Parser {
+    private final String _input;
+    private int _pos;
+
+    Parser(String input) {
+      _input = input;
+      _pos = 0;
+    }
+
+    CodecPipeline parsePipelineOrInvocation() {
+      String name = parseName();
+      if (PIPELINE_KEYWORD.equalsIgnoreCase(name) && peek() == '(') {
+        // Multi-stage pipeline: CODEC(stage,stage,...)
+        consume('(');
+        if (peek() == ')') {
+          throw new IllegalArgumentException("CODEC pipeline must have at 
least one stage in: " + _input);
+        }
+        List<CodecInvocation> stages = new ArrayList<>();
+        stages.add(parseInvocation());
+        while (peek() == ',') {
+          consume(',');
+          stages.add(parseInvocation());
+        }
+        consume(')');
+        return new CodecPipeline(stages);
+      }
+      // Single invocation; name already consumed, check for optional args
+      List<String> args = Collections.emptyList();
+      if (peek() == '(') {
+        consume('(');
+        args = parseArgs();
+        consume(')');
+      }
+      return new CodecPipeline(Collections.singletonList(new 
CodecInvocation(name.toUpperCase(), args)));
+    }
+
+    CodecInvocation parseInvocation() {
+      String name = parseName().toUpperCase();
+      List<String> args = Collections.emptyList();
+      if (peek() == '(') {
+        consume('(');
+        args = parseArgs();
+        consume(')');
+      }
+      return new CodecInvocation(name, args);
+    }
+
+    /** Parse NAME = [A-Za-z_][A-Za-z0-9_]* */
+    String parseName() {
+      skipWhitespace();
+      if (_pos >= _input.length()) {
+        throw new IllegalArgumentException("Expected codec name but reached 
end of input: " + _input);
+      }
+      char first = _input.charAt(_pos);
+      if (!Character.isLetter(first) && first != '_') {
+        throw new IllegalArgumentException(
+            "Expected codec name starting with letter at position " + _pos + " 
in: " + _input);
+      }

Review Comment:
   In CodecSpecParser.parseName(), the error message says the name must start 
with a letter, but the implementation also allows '_' as the first character. 
This makes diagnostics misleading for specs like "_FOO". Please update the 
exception message to match the accepted grammar (letter or underscore).



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java:
##########
@@ -2354,6 +2358,191 @@ private void validateMetadataProperties(String column, 
boolean hasDictionary, in
     assertEquals(columnMetadata.getMaxValue(), maxValue);
   }
 
+  /**
+   * Rollback test: when a V7 codec-pipeline forward index exists on disk but 
the table config
+   * is reverted to a legacy {@code compressionCodec} (no {@code codecSpec}),
+   * {@code shouldChangeRawCompressionType()} must schedule
+   * {@link ForwardIndexHandler.Operation#CHANGE_INDEX_COMPRESSION_TYPE} so 
the segment is
+   * rewritten back to the legacy format — enabling rollback to pre-V7 servers.
+   */
+  @Test
+  public void testComputeOperationNoChangeCompressionForV7CodecPipelineColumn()

Review Comment:
   This test method name is misleading: the Javadoc and assertions verify that 
a rewrite *is* scheduled for a V7 codec-pipeline segment when the config is 
reverted to legacy compressionCodec, but the name says "NoChangeCompression". 
Please rename the test to reflect the rollback/rewrite behavior being asserted.
   ```suggestion
     public void 
testComputeOperationChangeCompressionForV7CodecPipelineRollbackToLegacyCompressionCodec()
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReaderV7.java:
##########
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.segment.local.io.codec.CodecPipelineExecutor;
+import org.apache.pinot.segment.local.io.codec.CodecRegistry;
+import 
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriterV7;
+import org.apache.pinot.segment.spi.codec.CodecContext;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Chunk-based single-value raw forward index reader for version-7 files 
written by
+ * {@link FixedByteChunkForwardIndexWriterV7}.
+ *
+ * <p>Reads the canonical codec spec from the file header, instantiates a
+ * {@link CodecPipelineExecutor}, and uses it to decode each chunk on demand.
+ *
+ * <p>Supported data types: INT, LONG.
+ */
+public final class FixedByteChunkSVForwardIndexReaderV7 implements 
ForwardIndexReader<ChunkReaderContext> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FixedByteChunkSVForwardIndexReaderV7.class);
+
+  public static final int VERSION = 
ForwardIndexConfig.CODEC_PIPELINE_WRITER_VERSION;
+
+  private final PinotDataBuffer _dataBuffer;
+  private final DataType _storedType;
+  private final int _numChunks;
+  private final int _numDocsPerChunk;
+  private final int _shift; // log2(numDocsPerChunk) for fast chunk id calc
+  private final int _totalDocs;
+  private final int _dataHeaderStart;
+  private final CodecPipelineExecutor _executor;
+  private final String _canonicalSpec;
+
+  public FixedByteChunkSVForwardIndexReaderV7(PinotDataBuffer dataBuffer, 
DataType storedType) {
+    _dataBuffer = dataBuffer;
+    _storedType = storedType;
+
+    if (storedType != DataType.INT && storedType != DataType.LONG) {
+      throw new IllegalArgumentException(
+          "FixedByteChunkSVForwardIndexReaderV7 only supports INT and LONG, 
got: " + storedType);
+    }
+
+    int offset = 0;
+    int version = dataBuffer.getInt(offset);
+    if (version != VERSION) {
+      throw new IllegalArgumentException("Expected version " + VERSION + " but 
got " + version);
+    }
+    offset += Integer.BYTES;
+
+    _numChunks = dataBuffer.getInt(offset);
+    offset += Integer.BYTES;
+
+    _numDocsPerChunk = dataBuffer.getInt(offset);
+    offset += Integer.BYTES;
+    if (_numDocsPerChunk <= 0 || (_numDocsPerChunk & (_numDocsPerChunk - 1)) 
!= 0) {
+      throw new IllegalArgumentException(
+          "Invalid numDocsPerChunk in forward index header: " + 
_numDocsPerChunk
+              + ". Expected a positive power of two.");
+    }
+    _shift = Integer.numberOfTrailingZeros(_numDocsPerChunk);
+

Review Comment:
   FixedByteChunkSVForwardIndexReaderV7 reads `numChunks` from the header but 
never uses it to validate header consistency. If `totalDocs`, 
`numDocsPerChunk`, and `numChunks` disagree (corrupt or mismatched file), 
`getChunkOffset()` can read past the chunk-offset table. Consider validating 
`numChunks == ceil(totalDocs/numDocsPerChunk)` during construction and using 
`numChunks` to bounds-check `chunkId` before reading offsets.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to