xiangfu0 commented on code in PR #18229:
URL: https://github.com/apache/pinot/pull/18229#discussion_r3153297374


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -374,28 +377,88 @@ private boolean shouldDisableDictionary(String column, 
ColumnMetadata existingCo
     return true;
   }
 
-  private boolean shouldChangeRawCompressionType(String column, 
SegmentDirectory.Reader segmentReader)
+  private boolean shouldRewriteRawForwardIndex(String column, 
SegmentDirectory.Reader segmentReader)
       throws Exception {
     // The compression type for an existing segment can only be determined by 
reading the forward index header.
     ColumnMetadata existingColMetadata = 
_segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
     ChunkCompressionType existingCompressionType;
+    String existingCodecSpec;
 
     // Get the forward index reader factory and create a reader
     IndexReaderFactory<ForwardIndexReader> readerFactory = 
StandardIndexes.forward().getReaderFactory();
     try (ForwardIndexReader<?> fwdIndexReader = 
readerFactory.createIndexReader(segmentReader,
         _fieldIndexConfigs.get(column), existingColMetadata)) {
       existingCompressionType = fwdIndexReader.getCompressionType();
-      Preconditions.checkState(existingCompressionType != null,
-          "Existing compressionType cannot be null for raw forward index 
column=" + column);
+      existingCodecSpec = fwdIndexReader.getCodecSpec();
     }
 
-    // Get the new compression type.
-    ChunkCompressionType newCompressionType =
-        
_fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()).getChunkCompressionType();
+    ForwardIndexConfig fwdConfig = 
_fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
+
+    if (existingCompressionType == null) {
+      // V7 codec-pipeline only supports INT and LONG; guard against future 
type expansion reaching this path.
+      DataType existingStoredType = 
existingColMetadata.getDataType().getStoredType();
+      Preconditions.checkState(existingStoredType == DataType.INT || 
existingStoredType == DataType.LONG,
+          "V7 codec-pipeline segment for column=%s has unexpected stored type 
%s; expected INT or LONG",
+          column, existingStoredType);
+      // Codec-pipeline segment (version 7): compare the stored canonical spec 
against the configured codecSpec.
+      // If no codecSpec is configured, a legacy compressionCodec or default 
will apply to new segments only;
+      // existing V7 segments are left as-is (no rewrite needed).
+      String newCodecSpec = fwdConfig.getCodecSpec();
+      if (newCodecSpec == null) {
+        // Config may have reverted from codecSpec back to a legacy 
compressionCodec — schedule a
+        // rewrite back to the legacy format so the segment can be read by 
older servers.
+        // PASS_THROUGH (internally aliased as NONE) is excluded: 
PASS_THROUGH, CLP, CLPV2, and
+        // related codecs all resolve to ChunkCompressionType.PASS_THROUGH and 
do not represent a
+        // genuine rollback to a real compression codec.
+        ChunkCompressionType newCompressionType = 
fwdConfig.getChunkCompressionType();
+        if (newCompressionType != null && newCompressionType != 
ChunkCompressionType.PASS_THROUGH) {

Review Comment:
   Fixed: introduced 
`isLegacyRevertTargetForFixedByteSv(FieldConfig.CompressionCodec)` helper 
(`ForwardIndexHandler.java:479-497`). It returns `true` for `PASS_THROUGH`, 
`SNAPPY`, `ZSTANDARD`, `LZ4`, `GZIP`, `DELTA`, `DELTADELTA` and `false` for the 
CLP family / `MV_ENTRY_DICT` (which never apply to fixed-byte SV anyway). The 
handler now uses `fwdConfig.getCompressionCodec()` (the typed enum) instead of 
`getChunkCompressionType()` so explicit `compressionCodec=PASS_THROUGH` 
triggers a rewrite to the legacy format, enabling rollback to pre-V7 servers. 
The new 
`testComputeOperationChangeCompressionForV7CodecPipelineRollbackToLegacyCompressionCodec`
 test locks this in.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkForwardIndexWriterV7.java:
##########
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.writer.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pinot.segment.local.io.codec.CodecPipelineExecutor;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Chunk-based raw (non-dictionary-encoded) forward index writer for 
single-value fixed-width
+ * columns (INT, LONG) that uses a {@link CodecPipelineExecutor} for encoding.
+ *
+ * <p>This writer introduces <strong>version 7</strong> of the fixed-byte 
chunk raw forward index
+ * format.  The on-disk layout is:
+ *
+ * <pre>
+ * File header:
+ *   version              (int, value = 7)
+ *   numChunks            (int)
+ *   numDocsPerChunk      (int, normalised to power-of-2)
+ *   sizeOfEntry          (int, bytes per logical value, e.g. 4 for INT)
+ *   totalDocs            (int)
+ *   codecSpecLength      (int, byte length of the UTF-8 encoded canonical 
codec spec)
+ *   dataHeaderStart      (int, byte offset from file start where chunk-offset 
table begins)
+ *   codecSpec            (byte[], UTF-8 encoded canonical spec, length = 
codecSpecLength)
+ *   chunkOffsets         (long[numChunks], absolute byte offset of each 
chunk's per-chunk header)
+ * Data (per chunk):
+ *   compressedSize       (int, byte length of the encoded payload that 
follows)
+ *   uncompressedSize     (int, byte length of the original decoded chunk data)
+ *   payload              (byte[], encoded chunk data, length = compressedSize)
+ * </pre>
+ *
+ * <p>Each chunk contains {@code numDocsPerChunk} values encoded by the 
pipeline.  Chunk offsets
+ * are 8-byte longs to support files larger than 2 GB.  The per-chunk size 
header allows readers
+ * to verify decompression output and to skip/read chunks without scanning 
adjacent offsets.
+ *
+ * <p>This class is <em>not</em> thread-safe.
+ */
+@NotThreadSafe
+public class FixedByteChunkForwardIndexWriterV7 implements Closeable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FixedByteChunkForwardIndexWriterV7.class);
+
+  public static final int VERSION = 
ForwardIndexConfig.CODEC_PIPELINE_WRITER_VERSION;
+
+  /** Bytes written before each chunk payload: compressedSize (int) + 
uncompressedSize (int). */
+  public static final int CHUNK_HEADER_BYTES = 2 * Integer.BYTES;
+
+  // Number of fixed int fields before the codec spec: version, numChunks, 
numDocsPerChunk,
+  // sizeOfEntry, totalDocs, codecSpecLength, dataHeaderStart
+  private static final int FIXED_HEADER_INT_COUNT = 7;
+
+  private final FileChannel _dataFile;
+  private final CodecPipelineExecutor _executor;
+  private final int _numDocsPerChunk;
+  private final int _sizeOfEntry;
+  private final ByteBuffer _header;
+  private final ByteBuffer _chunkBuffer;
+  private final int _numChunks;
+  private final int _totalDocs;
+
+  private long _dataOffset;
+  private int _docsWritten;
+  private int _chunksWritten;
+
+  /**
+   * Creates a new writer.
+   *
+   * @param file            output file
+   * @param executor        pre-validated pipeline executor
+   * @param totalDocs       total number of documents to write
+   * @param numDocsPerChunk target documents per chunk (will be rounded up to 
power-of-2)
+   * @param sizeOfEntry     bytes per value (e.g. 4 for INT, 8 for LONG)
+   */
+  public FixedByteChunkForwardIndexWriterV7(File file, CodecPipelineExecutor 
executor, int totalDocs,
+      int numDocsPerChunk, int sizeOfEntry)
+      throws IOException {
+    _executor = executor;
+    _sizeOfEntry = sizeOfEntry;
+    _totalDocs = totalDocs;
+    _numDocsPerChunk = normalizePower2(numDocsPerChunk);
+    _numChunks = (totalDocs + _numDocsPerChunk - 1) / _numDocsPerChunk;
+    _docsWritten = 0;
+    _chunksWritten = 0;
+
+    byte[] specBytes = 
executor.getCanonicalSpec().getBytes(StandardCharsets.UTF_8);
+
+    // Header layout:
+    //   7 ints of fixed fields
+    //   specBytes.length bytes of codec spec
+    //   numChunks longs of chunk offsets
+    long fixedHeaderBytesLong = (long) FIXED_HEADER_INT_COUNT * Integer.BYTES;
+    long dataHeaderStartLong = fixedHeaderBytesLong + specBytes.length;
+    long chunkOffsetTableBytesLong = (long) _numChunks * Long.BYTES;
+    long totalHeaderBytesLong = dataHeaderStartLong + 
chunkOffsetTableBytesLong;
+    if (totalHeaderBytesLong > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException(
+          "Header size " + totalHeaderBytesLong + " bytes exceeds 
Integer.MAX_VALUE. Reduce totalDocs or"
+              + " increase numDocsPerChunk.");
+    }
+    int dataHeaderStart = (int) dataHeaderStartLong;
+    int totalHeaderBytes = (int) totalHeaderBytesLong;
+
+    _header = ByteBuffer.allocateDirect(totalHeaderBytes);
+    _header.putInt(VERSION);
+    _header.putInt(_numChunks);
+    _header.putInt(_numDocsPerChunk);
+    _header.putInt(sizeOfEntry);
+    _header.putInt(totalDocs);
+    _header.putInt(specBytes.length);
+    _header.putInt(dataHeaderStart);
+    _header.put(specBytes);
+    // chunk offsets will be filled in during writeChunk() calls
+
+    _dataOffset = totalHeaderBytes;
+
+    long chunkSizeLong = (long) sizeOfEntry * _numDocsPerChunk;
+    if (chunkSizeLong > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException(
+          "Chunk size " + chunkSizeLong + " bytes overflows int. Reduce 
numDocsPerChunk or sizeOfEntry.");
+    }
+    _chunkBuffer = ByteBuffer.allocateDirect((int) chunkSizeLong);
+
+    _dataFile = new RandomAccessFile(file, "rw").getChannel();
+  }
+
+  /** Writes a 4-byte integer value. */
+  public void putInt(int value) {
+    _chunkBuffer.putInt(value);
+    _docsWritten++;
+    flushIfNeeded();
+  }
+
+  /** Writes an 8-byte long value. */
+  public void putLong(long value) {
+    _chunkBuffer.putLong(value);
+    _docsWritten++;
+    flushIfNeeded();
+  }
+
+  private void flushIfNeeded() {
+    if (_chunkBuffer.position() == _numDocsPerChunk * _sizeOfEntry) {
+      writeChunk();
+    }
+  }
+
+  private void writeChunk() {
+    _chunkBuffer.flip();
+    int uncompressedSize = _chunkBuffer.remaining();
+    try {
+      ByteBuffer encoded = _executor.compress(_chunkBuffer);
+      int compressedSize = encoded.remaining();
+
+      // Per-chunk header: compressedSize (int) + uncompressedSize (int)
+      ByteBuffer chunkHeader = ByteBuffer.allocateDirect(CHUNK_HEADER_BYTES);

Review Comment:
   Fixed: `FixedByteChunkForwardIndexWriterV7` now reuses a single `ByteBuffer 
_chunkHeaderBuffer = ByteBuffer.allocateDirect(CHUNK_HEADER_BYTES)` field (line 
87). `writeChunk()` calls `_chunkHeaderBuffer.clear()` then 
`putInt(compressedSize); putInt(uncompressedSize); flip()` for each chunk 
(lines 202-205). No per-chunk `allocateDirect` for the chunk header.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/CodecPipelineExecutor.java:
##########
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.codec;
+
+import com.github.luben.zstd.Zstd;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import net.jpountz.lz4.LZ4CompressorWithLength;
+import net.jpountz.lz4.LZ4DecompressorWithLength;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.codec.CodecContext;
+import org.apache.pinot.segment.spi.codec.CodecDefinition;
+import org.apache.pinot.segment.spi.codec.CodecInvocation;
+import org.apache.pinot.segment.spi.codec.CodecOptions;
+import org.apache.pinot.segment.spi.codec.CodecPipeline;
+import org.apache.pinot.segment.spi.codec.CodecSpecParser;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Executes a parsed and validated {@link CodecPipeline} for a single 
forward-index chunk.
+ *
+ * <p>Write path: values → transforms (in order) → compression → bytes stored 
on disk.
+ * Read path: bytes from disk → decompress → reverse transforms (in reverse 
order) → values.
+ *
+ * <p>The executor is constructed once per column from the canonical {@code 
codecSpec} string
+ * stored in the file header and is thread-safe for concurrent read calls (it 
holds no mutable
+ * per-call state).
+ *
+ * <h3>Buffer contract</h3>
+ * <ul>
+ *   <li>{@link #compress}: {@code src} is ready for read (position=0); 
returns a new
+ *       {@link ByteBuffer} ready for read containing the encoded bytes.</li>
+ *   <li>{@link #decompress}: {@code src} is ready for read; returns a new
+ *       {@link ByteBuffer} ready for read containing the decoded bytes.</li>
+ *   <li>{@link #maxCompressedSize}: returns an upper bound on encoded 
size.</li>
+ * </ul>
+ */
+public final class CodecPipelineExecutor {

Review Comment:
   Fixed: `CodecPipelineExecutor` is fully codec-agnostic. Lines 45-46 of the 
class Javadoc state explicitly: `"The executor is codec-agnostic: it holds an 
ordered list of BoundStage objects"`. Each stage is a `BoundStage<O extends 
CodecOptions>` (line 61) wrapping a generic `ChunkCodecHandler<O>` plus its 
parsed `CodecOptions` and `CodecContext`. The constructor (line 125) iterates 
over `pipeline.stages()`, resolves each invocation through 
`registry.getOrThrow(inv.name())`, and stores the resulting `BoundStage`s. 
There is no codec-specific code in the executor — `compress`/`decompress` 
simply iterate `_stages` and dispatch through the handler interface.



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java:
##########
@@ -2354,6 +2358,191 @@ private void validateMetadataProperties(String column, 
boolean hasDictionary, in
     assertEquals(columnMetadata.getMaxValue(), maxValue);
   }
 
+  /**
+   * Rollback test: when a V7 codec-pipeline forward index exists on disk but 
the table config
+   * is reverted to a legacy {@code compressionCodec} (no {@code codecSpec}),
+   * {@code shouldChangeRawCompressionType()} must schedule
+   * {@link ForwardIndexHandler.Operation#CHANGE_INDEX_COMPRESSION_TYPE} so 
the segment is
+   * rewritten back to the legacy format — enabling rollback to pre-V7 servers.
+   */
+  @Test
+  public void testComputeOperationNoChangeCompressionForV7CodecPipelineColumn()

Review Comment:
   Fixed: renamed to 
`testComputeOperationChangeCompressionForV7CodecPipelineRollbackToLegacyCompressionCodec`
 to reflect that this test verifies a CHANGE_INDEX_COMPRESSION_TYPE operation 
is scheduled (the rewrite *does* happen) when reverting from V7 codecSpec to a 
legacy compressionCodec.



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/codec/CodecSpecParser.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.codec;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Phase-1 (structural) parser for the Codec DSL.
+ *
+ * <p>Accepts the following grammar:
+ * <pre>
+ *   spec        ::= pipeline | invocation
+ *   pipeline    ::= "CODEC" "(" invocation ("," invocation)* ")"
+ *   invocation  ::= NAME
+ *                 | NAME "(" args ")"
+ *   args        ::= ε | arg ("," arg)*
+ *   arg         ::= [+-]? [0-9]+    // numeric literals; leading sign 
accepted but not used in v1
+ *   NAME        ::= [A-Za-z_][A-Za-z0-9_]*
+ * </pre>
+ *
+ * <p>Examples:
+ * <ul>
+ *   <li>{@code DELTA} → single-stage pipeline with no args</li>
+ *   <li>{@code ZSTD(3)} → single-stage pipeline, arg "3"</li>
+ *   <li>{@code CODEC(DELTA,ZSTD(8))} → two-stage pipeline</li>
+ * </ul>
+ *
+ * <p>This class performs only structural parsing; semantic validation (valid 
codec names,
+ * argument ranges, type compatibility) is done in a second phase by the 
registry and
+ * pipeline validator.
+ *
+ * <p>This class is stateless and thread-safe.
+ */
+public final class CodecSpecParser {
+
+  /**
+   * The keyword used for multi-stage pipeline notation (e.g. {@code 
CODEC(DELTA,ZSTD(3))}).
+   *
+   * <p>This name is permanently reserved — it cannot be registered as a codec 
name.
+   * See {@code CodecRegistry.register}.
+   */
+  public static final String PIPELINE_KEYWORD = "CODEC";
+
+  private CodecSpecParser() {
+  }
+
+  /**
+   * Parses {@code spec} into a {@link CodecPipeline} AST.
+   *
+   * @param spec the codec DSL string; must not be {@code null} or blank
+   * @return the parsed pipeline
+   * @throws IllegalArgumentException if the spec is syntactically invalid
+   */
+  public static CodecPipeline parse(String spec) {
+    if (spec == null || spec.isBlank()) {
+      throw new IllegalArgumentException("codecSpec must not be null or 
blank");
+    }
+    String trimmed = spec.trim();
+    Parser p = new Parser(trimmed);
+    CodecPipeline pipeline = p.parsePipelineOrInvocation();
+    p.expectEof();
+    return pipeline;
+  }
+
+  // -------------------------------------------------------------------------
+  // Internal recursive-descent parser
+  // -------------------------------------------------------------------------
+
+  private static final class Parser {
+    private final String _input;
+    private int _pos;
+
+    Parser(String input) {
+      _input = input;
+      _pos = 0;
+    }
+
+    CodecPipeline parsePipelineOrInvocation() {
+      String name = parseName();
+      if (PIPELINE_KEYWORD.equalsIgnoreCase(name) && peek() == '(') {
+        // Multi-stage pipeline: CODEC(stage,stage,...)
+        consume('(');
+        if (peek() == ')') {
+          throw new IllegalArgumentException("CODEC pipeline must have at 
least one stage in: " + _input);
+        }
+        List<CodecInvocation> stages = new ArrayList<>();
+        stages.add(parseInvocation());
+        while (peek() == ',') {
+          consume(',');
+          stages.add(parseInvocation());
+        }
+        consume(')');
+        return new CodecPipeline(stages);
+      }
+      // Single invocation; name already consumed, check for optional args
+      List<String> args = Collections.emptyList();
+      if (peek() == '(') {
+        consume('(');
+        args = parseArgs();
+        consume(')');
+      }
+      return new CodecPipeline(Collections.singletonList(new 
CodecInvocation(name.toUpperCase(), args)));
+    }
+
+    CodecInvocation parseInvocation() {
+      String name = parseName().toUpperCase();
+      List<String> args = Collections.emptyList();
+      if (peek() == '(') {
+        consume('(');
+        args = parseArgs();
+        consume(')');
+      }
+      return new CodecInvocation(name, args);
+    }
+
+    /** Parse NAME = [A-Za-z_][A-Za-z0-9_]* */
+    String parseName() {
+      skipWhitespace();
+      if (_pos >= _input.length()) {
+        throw new IllegalArgumentException("Expected codec name but reached 
end of input: " + _input);
+      }
+      char first = _input.charAt(_pos);
+      if (!Character.isLetter(first) && first != '_') {
+        throw new IllegalArgumentException(
+            "Expected codec name starting with letter at position " + _pos + " 
in: " + _input);
+      }

Review Comment:
   Fixed: `CodecSpecParser.parseName()` (lines 145-148) now reports `"Expected 
codec name starting with [A-Za-z_] at position N in: ..."` — the message 
exactly matches the accepted ASCII identifier grammar. The implementation was 
also tightened to ASCII-only (`isAsciiIdentifierStart` / `isAsciiIdentifierPart` 
helpers, lines 156-162) so the grammar is locale-stable.



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CodecPipelineIntegrationTest.java:
##########
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test for the codec pipeline forward index (version 7).
+ * Writes an offline table with INT and LONG raw columns encoded via the 
CODEC(DELTA,ZSTD(3)) pipeline,

Review Comment:
   Fixed: added `STR_RAW_COL` (`strRawZstd`) — a STRING column with 
`EncodingType.RAW` + legacy `compressionCodec=ZSTANDARD` — to 
`CodecPipelineIntegrationTest`. New test 
`testStringColumnWithRawLegacyCompression` queries this column alongside the 
codec-pipeline INT/LONG columns, verifying the legacy raw-string path coexists 
with V7 codec-pipeline columns in the same segment. Spot-checks individual 
values (`SELECT strRawZstd WHERE ts=N`) and asserts `COUNT(DISTINCT)` = 
NUM_DOCS.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/CodecRegistry.java:
##########
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.codec;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.codec.ChunkCodecHandler;
+import org.apache.pinot.segment.spi.codec.CodecSpecParser;
+
+
+/**
+ * Registry of known {@link ChunkCodecHandler} instances, looked up by
+ * {@link ChunkCodecHandler#name()}.
+ *
+ * <p>The singleton {@link #DEFAULT} instance is pre-populated with all 
built-in codecs and is
+ * immutable after class-loading; calling {@link #register} on it will throw.
+ *
+ * <p>Custom registries (e.g. in tests) should be created with {@link 
#CodecRegistry()} and
+ * populated via {@link #register} before use.  The custom registry instance 
itself is not
+ * thread-safe for concurrent writes; callers must complete all registrations 
before sharing
+ * the registry across threads.
+ *
+ * <p>Lookup is case-insensitive.
+ */
+public final class CodecRegistry {
+
+  /**
+   * Default, immutable registry containing all built-in codecs.
+   * Safe for concurrent reads; does not allow {@link #register}.
+   */
+  public static final CodecRegistry DEFAULT;
+
+  static {
+    Map<String, ChunkCodecHandler<?>> m = new LinkedHashMap<>();
+    m.put(DeltaCodecDefinition.INSTANCE.name().toUpperCase(), 
DeltaCodecDefinition.INSTANCE);
+    m.put(DeltaDeltaCodecDefinition.INSTANCE.name().toUpperCase(), 
DeltaDeltaCodecDefinition.INSTANCE);
+    m.put(ZstdCodecDefinition.INSTANCE.name().toUpperCase(), 
ZstdCodecDefinition.INSTANCE);
+    m.put(Lz4CodecDefinition.INSTANCE.name().toUpperCase(), 
Lz4CodecDefinition.INSTANCE);
+    m.put(SnappyCodecDefinition.INSTANCE.name().toUpperCase(), 
SnappyCodecDefinition.INSTANCE);
+    m.put(GzipCodecDefinition.INSTANCE.name().toUpperCase(), 
GzipCodecDefinition.INSTANCE);
+    DEFAULT = new CodecRegistry(Collections.unmodifiableMap(m));
+  }

Review Comment:
   Fixed: the PR description has been updated to reflect all six built-in codecs 
registered in `CodecRegistry.DEFAULT`: `DELTA`, `DELTADELTA`, `LZ4`, `ZSTD`, 
`SNAPPY`, `GZIP`. The new "Built-in codecs" table in the PR description and the 
codec catalog in `docs/design/codec-pipeline-v7.md` both list all six.



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/CodecPipelineForwardIndexTest.java:
##########
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.forward;
+
+import java.io.File;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.PinotBuffersAfterMethodCheckRule;
+import org.apache.pinot.segment.local.io.codec.CodecPipelineExecutor;
+import org.apache.pinot.segment.local.io.codec.CodecRegistry;
+import 
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriterV7;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReaderV7;
+import org.apache.pinot.segment.spi.codec.CodecContext;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Roundtrip encode/decode tests for the codec-pipeline forward index (version 
7).
+ *
+ * <p>Covers:
+ * <ul>
+ *   <li>INT and LONG single-value columns</li>
+ *   <li>DELTA only, ZSTD only, CODEC(DELTA,ZSTD) pipelines</li>
+ *   <li>Version check on the written file</li>
+ *   <li>Canonical spec round-trips correctly through the header</li>
+ * </ul>
+ */
+public class CodecPipelineForwardIndexTest implements 
PinotBuffersAfterMethodCheckRule {
+
+  private static final int NUM_VALUES = 10_007;
+  private static final int DOCS_PER_CHUNK = 1024;
+  private static final String TEST_FILE_PREFIX =
+      System.getProperty("java.io.tmpdir") + File.separator + 
"CodecPipelineFwdTest";
+  // Fixed seed for deterministic test data — this test verifies correctness, 
not random coverage
+  private static final Random RANDOM = new Random(42L);
+

Review Comment:
   Fixed: `CodecPipelineForwardIndexTest` no longer uses a shared static 
`Random`. There is now `private static final long RANDOM_SEED = 42L` (line 61) 
and each test method constructs `Random random = new Random(RANDOM_SEED)` 
locally (e.g. lines 105 and 157). Each test invocation is self-contained and 
deterministic regardless of TestNG execution order or parallelism.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReaderV7.java:
##########
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.segment.local.io.codec.CodecPipelineExecutor;
+import org.apache.pinot.segment.local.io.codec.CodecRegistry;
+import 
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriterV7;
+import org.apache.pinot.segment.spi.codec.CodecContext;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Chunk-based single-value raw forward index reader for version-7 files 
written by
+ * {@link FixedByteChunkForwardIndexWriterV7}.
+ *
+ * <p>Reads the canonical codec spec from the file header, instantiates a
+ * {@link CodecPipelineExecutor}, and uses it to decode each chunk on demand.
+ *
+ * <p>Supported data types: INT, LONG.
+ */
+public final class FixedByteChunkSVForwardIndexReaderV7 implements 
ForwardIndexReader<ChunkReaderContext> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FixedByteChunkSVForwardIndexReaderV7.class);
+
+  public static final int VERSION = 
ForwardIndexConfig.CODEC_PIPELINE_WRITER_VERSION;
+
+  private final PinotDataBuffer _dataBuffer;
+  private final DataType _storedType;
+  private final int _numChunks;
+  private final int _numDocsPerChunk;
+  private final int _shift; // log2(numDocsPerChunk) for fast chunk id calc
+  private final int _totalDocs;
+  private final int _dataHeaderStart;
+  private final CodecPipelineExecutor _executor;
+  private final String _canonicalSpec;
+
+  public FixedByteChunkSVForwardIndexReaderV7(PinotDataBuffer dataBuffer, 
DataType storedType) {
+    _dataBuffer = dataBuffer;
+    _storedType = storedType;
+
+    if (storedType != DataType.INT && storedType != DataType.LONG) {
+      throw new IllegalArgumentException(
+          "FixedByteChunkSVForwardIndexReaderV7 only supports INT and LONG, 
got: " + storedType);
+    }
+
+    int offset = 0;
+    int version = dataBuffer.getInt(offset);
+    if (version != VERSION) {
+      throw new IllegalArgumentException("Expected version " + VERSION + " but 
got " + version);
+    }
+    offset += Integer.BYTES;
+
+    _numChunks = dataBuffer.getInt(offset);
+    offset += Integer.BYTES;
+
+    _numDocsPerChunk = dataBuffer.getInt(offset);
+    offset += Integer.BYTES;
+    if (_numDocsPerChunk <= 0 || (_numDocsPerChunk & (_numDocsPerChunk - 1)) 
!= 0) {
+      throw new IllegalArgumentException(
+          "Invalid numDocsPerChunk in forward index header: " + 
_numDocsPerChunk
+              + ". Expected a positive power of two.");
+    }
+    _shift = Integer.numberOfTrailingZeros(_numDocsPerChunk);
+

Review Comment:
   Fixed: the `FixedByteChunkSVForwardIndexReaderV7` constructor now validates 
that `numChunks == ceil(totalDocs / numDocsPerChunk)` (lines 109-115). A 
mismatched header throws 
`IllegalArgumentException("Inconsistent header: numChunks=N but 
totalDocs=T / numDocsPerChunk=D => expected E. Segment may be corrupt.")`. 
Together with the existing chunk-offset table bounds check and the 
per-chunk-offset monotonicity validation (lines 130-145), this ensures that 
`getChunkOffset()` cannot read past the chunk-offset table.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to