Copilot commented on code in PR #18229:
URL: https://github.com/apache/pinot/pull/18229#discussion_r3217641119


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -2209,4 +2218,52 @@ private static void validateGorillaCompressionCodecIfPresent(FieldConfig fieldCo
         // no-op for other codecs
     }
   }
+
+  // Validates FieldConfig.codecSpec when present:
+  // - Syntax must be parseable by the Codec DSL parser.
+  // - Encoding type must be RAW (codec pipeline does not support dictionary encoding).
+  // - All codec stages must be registered and structurally valid (semantic check).
+  // - Transform stages are restricted to single-value INT or LONG columns.
+  private static void validateCodecSpecIfPresent(FieldConfig fieldConfig, FieldSpec fieldSpec) {
+    String codecSpec = fieldConfig.getCodecSpec();
+    if (codecSpec == null) {
+      return;
+    }
+    // Parse to catch syntax errors early at table-creation time
+    CodecPipeline pipeline;
+    try {
+      pipeline = CodecSpecParser.parse(codecSpec);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalStateException(
+          "Invalid codecSpec for column '" + fieldConfig.getName() + "': " + e.getMessage(), e);
+    }
+    DataType storedType = fieldSpec.getDataType().getStoredType();
+    // Must be RAW-encoded
+    Preconditions.checkState(fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW,
+        "codecSpec requires RAW encoding type for column: %s", fieldConfig.getName());

Review Comment:
   This error message does not match the assertion in `TableConfigUtilsTest` 
(which expects `codecSpec requires RAW forward-index encoding for column: 
intCol`) and also diverges from the message used in 
`ForwardIndexType.validateCodecSpec(...)`. As written, the test in this PR is 
likely to fail. Align the message string across validators (and update tests to 
match) to avoid brittle mismatches and inconsistent user-facing errors.
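
   One way to keep the validators and the test from drifting apart is to define
   the message template once and reference it from every call site. The sketch
   below is illustrative only: `CodecSpecMessages` and its members are
   hypothetical names, not existing Pinot classes, and the wording is the one
   quoted above from the test assertion. It uses plain Java instead of Guava's
   `Preconditions` so that it compiles on its own.

```java
/** Hypothetical holder for the shared message template; not an existing Pinot class. */
final class CodecSpecMessages {
  // Assumed wording, copied from the assertion quoted in the review comment.
  static final String RAW_ENCODING_REQUIRED =
      "codecSpec requires RAW forward-index encoding for column: %s";

  private CodecSpecMessages() {
  }

  /** Formats the shared template for one column. */
  static String rawEncodingRequired(String column) {
    return String.format(RAW_ENCODING_REQUIRED, column);
  }

  /** Stand-in for the validator check; throws IllegalStateException when the column is not RAW. */
  static void checkRawEncoding(boolean isRaw, String column) {
    if (!isRaw) {
      throw new IllegalStateException(rawEncodingRequired(column));
    }
  }
}
```

   Both validators and the test could then reference the same constant, so a
   wording change would only have to be made in one place.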
   



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -483,31 +488,155 @@ private boolean shouldDisableDictionary(String column, ColumnMetadata existingCo
     return true;
   }
 
-  private boolean shouldChangeRawCompressionType(String column, SegmentDirectory.Reader segmentReader)
+  private boolean shouldRewriteRawForwardIndex(String column, SegmentDirectory.Reader segmentReader)
       throws Exception {
     // The compression type for an existing segment can only be determined by reading the forward index header.
     ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
     ChunkCompressionType existingCompressionType;
+    String existingCodecSpec;
 
     // Get the forward index reader factory and create a reader
     IndexReaderFactory<ForwardIndexReader> readerFactory = StandardIndexes.forward().getReaderFactory();
     try (ForwardIndexReader<?> fwdIndexReader = readerFactory.createIndexReader(segmentReader,
         _fieldIndexConfigs.get(column), existingColMetadata)) {
       existingCompressionType = fwdIndexReader.getCompressionType();
-      Preconditions.checkState(existingCompressionType != null,
-          "Existing compressionType cannot be null for raw forward index column=" + column);
+      existingCodecSpec = fwdIndexReader.getCodecSpec();

Review Comment:
   The previous implementation enforced `existingCompressionType != null` for 
legacy (pre-V7) raw forward indexes. With the new V7 path, it's correct for 
`getCompressionType()` to be null when `existingCodecSpec != null`, but for 
legacy segments (where `existingCodecSpec == null`) a null compression type 
likely indicates a reader regression (e.g., a reader not overriding the default 
`ForwardIndexReader#getCompressionType()` implementation). Consider reinstating 
a `Preconditions.checkState(existingCompressionType != null, ...)` specifically 
in the legacy branch (after `existingCodecSpec == null` is established) so 
reload decisions don't silently degrade.
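
   A minimal sketch of the branch-specific check, under stated assumptions:
   `LegacyCompressionTypeCheck` is a hypothetical helper (the real check would
   presumably sit inline in `ForwardIndexHandler`), and the compression type is
   typed as `Object` only to keep the example free of Pinot dependencies. The
   message text is the one from the removed `Preconditions.checkState` call.

```java
/** Hypothetical helper illustrating the legacy-only null check; not part of the PR. */
final class LegacyCompressionTypeCheck {
  private LegacyCompressionTypeCheck() {
  }

  /**
   * Throws if a legacy segment (no codec spec in the header) also reports no compression type.
   * A segment that carries a codec spec is allowed to have a null compression type.
   */
  static void check(String column, String existingCodecSpec, Object existingCompressionType) {
    if (existingCodecSpec != null) {
      // Codec-pipeline (V7) index: a null compression type is expected here.
      return;
    }
    if (existingCompressionType == null) {
      throw new IllegalStateException(
          "Existing compressionType cannot be null for raw forward index column=" + column);
    }
  }
}
```

   With this shape the V7 path stays permissive, while a legacy reader that
   returns null fails loudly instead of silently changing the reload decision.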



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/CodecPipelineForwardIndexTest.java:
##########
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.forward;
+
+import java.io.File;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.PinotBuffersAfterMethodCheckRule;
+import org.apache.pinot.segment.local.io.codec.CodecPipelineExecutor;
+import org.apache.pinot.segment.local.io.codec.CodecRegistry;
+import org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriterV7;
+import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
+import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReaderV7;
+import org.apache.pinot.segment.spi.codec.CodecContext;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/// Roundtrip encode/decode tests for the codec-pipeline forward index (version 7).
+///
+/// Covers:
+///
+/// - INT and LONG single-value columns
+/// - DELTA only, ZSTD only, CODEC(DELTA,ZSTD) pipelines
+/// - Version check on the written file
+/// - Canonical spec round-trips correctly through the header
+public class CodecPipelineForwardIndexTest implements PinotBuffersAfterMethodCheckRule {
+
+  private static final int NUM_VALUES = 10_007;
+  private static final int DOCS_PER_CHUNK = 1024;
+  private static final String TEST_FILE_PREFIX =
+      System.getProperty("java.io.tmpdir") + File.separator + "CodecPipelineFwdTest";
+  // Fixed seed for deterministic test data — this test verifies correctness, 
not random coverage
+  // Construct a fresh Random per test method to avoid sharing mutable state across data-provider
+  // rows when TestNG runs them in parallel.
+  private static final long RANDOM_SEED = 42L;
+
+  @DataProvider(name = "specs")
+  public static Object[][] specs() {
+    return new Object[][]{
+        {"DELTA", DataType.INT},
+        {"DELTA", DataType.LONG},
+        {"ZSTD(3)", DataType.INT},
+        {"ZSTD(3)", DataType.LONG},
+        {"ZSTD(8)", DataType.INT},

Review Comment:
   `ZSTD(8)` as a compression-only `codecSpec` is treated as valid by this V7 
round-trip test, but the PR description and the table-config/creator validation 
logic explicitly reject compression-only specs that cannot be represented by 
existing raw writers (which includes non-default ZSTD levels). Either (a) 
remove/adjust these test cases to reflect the public config contract, or (b) 
update the contract/validation to allow compression-only V7 pipelines for SV 
INT/LONG (and ensure creator/validator paths can actually produce such 
segments).



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkForwardIndexWriterV7.java:
##########
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.writer.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pinot.segment.local.io.codec.CodecPipelineExecutor;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+
+
+/// Chunk-based raw (non-dictionary-encoded) forward index writer for single-value fixed-width
+/// columns (INT, LONG) that uses a [CodecPipelineExecutor] for encoding.
+///
+/// This writer introduces **version 7** of the fixed-byte chunk raw forward index
+/// format.  The on-disk layout is:
+///
+/// ```
+/// File header:
+///   version              (int, value = 7)
+///   numChunks            (int)
+///   numDocsPerChunk      (int, normalised to power-of-2)
+///   sizeOfEntry          (int, bytes per logical value, e.g. 4 for INT)
+///   totalDocs            (int)
+///   codecSpecLength      (int, byte length of the UTF-8 encoded canonical codec spec)
+///   dataHeaderStart      (int, byte offset from file start where chunk-offset table begins)
+///   codecSpec            (byte[], UTF-8 encoded canonical spec, length = codecSpecLength)
+///   chunkOffsets         (long[numChunks], absolute byte offset of each chunk's per-chunk header)
+/// Data (per chunk):
+///   compressedSize       (int, byte length of the encoded payload that follows)
+///   uncompressedSize     (int, byte length of the original decoded chunk data)
+///   payload              (byte[], encoded chunk data, length = compressedSize)
+/// ```
+///
+/// Each chunk contains `numDocsPerChunk` values encoded by the pipeline.  Chunk offsets
+/// are 8-byte longs to support files larger than 2 GB.  The per-chunk size header allows readers
+/// to verify decompression output and to skip/read chunks without scanning adjacent offsets.
+///
+/// This class is *not* thread-safe.
+@NotThreadSafe
+public class FixedByteChunkForwardIndexWriterV7 implements Closeable {
+
+  public static final int VERSION = ForwardIndexConfig.CODEC_PIPELINE_WRITER_VERSION;
+
+  /// Bytes written before each chunk payload: compressedSize (int) + uncompressedSize (int).
+  public static final int CHUNK_HEADER_BYTES = 2 * Integer.BYTES;
+
+  // Number of fixed int fields before the codec spec: version, numChunks, numDocsPerChunk,
+  // sizeOfEntry, totalDocs, codecSpecLength, dataHeaderStart
+  private static final int FIXED_HEADER_INT_COUNT = 7;
+
+  // Hold both the RAF and its FileChannel: closing the channel closes the underlying FD, but
+  // some JVM finalizers close the FD when the RAF becomes unreachable. Holding the RAF as a
+  // field anchors it to the writer's lifetime and removes any reliance on finalizer ordering.
+  private final RandomAccessFile _raf;
+  private final FileChannel _dataFile;
+  private final CodecPipelineExecutor _executor;
+  private final int _numDocsPerChunk;
+  private final int _sizeOfEntry;
+  private final int _chunkFullBytes;
+  private final ByteBuffer _header;
+  private final ByteBuffer _chunkBuffer;
+  private final ByteBuffer _chunkHeaderBuffer = ByteBuffer.allocateDirect(CHUNK_HEADER_BYTES);
+  private final int _numChunks;
+  private final int _totalDocs;
+
+  private long _dataOffset;
+  private int _docsWritten;
+  private int _chunksWritten;
+
+  /// Creates a new writer.
+  ///
+  /// @param file            output file
+  /// @param executor        pre-validated pipeline executor
+  /// @param totalDocs       total number of documents to write
+  /// @param numDocsPerChunk target documents per chunk (will be rounded up to power-of-2)
+  /// @param sizeOfEntry     bytes per value (e.g. 4 for INT, 8 for LONG)
+  public FixedByteChunkForwardIndexWriterV7(File file, CodecPipelineExecutor executor, int totalDocs,
+      int numDocsPerChunk, int sizeOfEntry)
+      throws IOException {
+    if (totalDocs < 0) {
+      throw new IllegalArgumentException("totalDocs must be non-negative, got: " + totalDocs);
+    }
+    _executor = executor;
+    _sizeOfEntry = sizeOfEntry;
+    _totalDocs = totalDocs;
+    _numDocsPerChunk = normalizePower2(numDocsPerChunk);
+    _chunkFullBytes = _numDocsPerChunk * sizeOfEntry;
+    _numChunks = (totalDocs + _numDocsPerChunk - 1) / _numDocsPerChunk;
+    _docsWritten = 0;
+    _chunksWritten = 0;
+
+    byte[] specBytes = executor.getCanonicalSpec().getBytes(StandardCharsets.UTF_8);
+
+    // Header layout:
+    //   7 ints of fixed fields
+    //   specBytes.length bytes of codec spec
+    //   numChunks longs of chunk offsets
+    long fixedHeaderBytesLong = (long) FIXED_HEADER_INT_COUNT * Integer.BYTES;
+    long dataHeaderStartLong = fixedHeaderBytesLong + specBytes.length;
+    long chunkOffsetTableBytesLong = (long) _numChunks * Long.BYTES;
+    long totalHeaderBytesLong = dataHeaderStartLong + chunkOffsetTableBytesLong;
+    if (totalHeaderBytesLong > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException(
+          "Header size " + totalHeaderBytesLong + " bytes exceeds Integer.MAX_VALUE. Reduce totalDocs or"
+              + " increase numDocsPerChunk.");
+    }
+    int dataHeaderStart = (int) dataHeaderStartLong;
+    int totalHeaderBytes = (int) totalHeaderBytesLong;
+
+    _header = ByteBuffer.allocateDirect(totalHeaderBytes);
+    _header.putInt(VERSION);
+    _header.putInt(_numChunks);
+    _header.putInt(_numDocsPerChunk);
+    _header.putInt(sizeOfEntry);
+    _header.putInt(totalDocs);
+    _header.putInt(specBytes.length);
+    _header.putInt(dataHeaderStart);
+    _header.put(specBytes);
+    // chunk offsets will be filled in during writeChunk() calls
+
+    _dataOffset = totalHeaderBytes;
+
+    long chunkSizeLong = (long) sizeOfEntry * _numDocsPerChunk;
+    if (chunkSizeLong > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException(
+          "Chunk size " + chunkSizeLong + " bytes overflows int. Reduce numDocsPerChunk or sizeOfEntry.");
+    }
+    // Open file first, then allocate the direct buffer under a try/catch so that an OOM during
+    // allocation closes the already-open file descriptor (the caller has no reference to a
+    // partially-constructed object and cannot invoke close() itself).
+    RandomAccessFile raf = new RandomAccessFile(file, "rw");
+    FileChannel channel = raf.getChannel();
+    try {
+      _chunkBuffer = ByteBuffer.allocateDirect((int) chunkSizeLong);
+    } catch (Throwable t) {
+      try {
+        raf.close();
+      } catch (IOException closeEx) {
+        t.addSuppressed(closeEx);
+      }
+      throw t;
+    }
+    _raf = raf;
+    _dataFile = channel;
+  }
+
+  /// Writes a 4-byte integer value.
+  public void putInt(int value) {
+    _chunkBuffer.putInt(value);
+    _docsWritten++;
+    flushIfNeeded();
+  }

Review Comment:
   `_docsWritten` is only validated against `_totalDocs` in `close()`. If 
callers accidentally write more than `totalDocs`, the writer will still emit 
bytes/chunks and only fail at close, potentially leaving a partially-written 
(and semantically invalid) file on disk. Consider adding an early guard in 
`putInt`/`putLong` (e.g., throw once `_docsWritten` would exceed `_totalDocs`) 
to fail fast and avoid producing extra output.
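
   The early guard could look roughly like the sketch below. It is a
   standalone illustration, not the writer itself: `BoundedDocCounter` and
   `beforeWrite()` are hypothetical names, and the real change would
   presumably be a few lines at the top of `putInt`/`putLong`, before the
   value is placed in the chunk buffer.

```java
/** Hypothetical stand-in for the writer's document accounting; not part of the PR. */
final class BoundedDocCounter {
  private final int _totalDocs;
  private int _docsWritten;

  BoundedDocCounter(int totalDocs) {
    if (totalDocs < 0) {
      throw new IllegalArgumentException("totalDocs must be non-negative, got: " + totalDocs);
    }
    _totalDocs = totalDocs;
  }

  /**
   * Call before buffering a value. Rejects the write without changing any state
   * once the declared document count has been reached.
   */
  void beforeWrite() {
    if (_docsWritten >= _totalDocs) {
      throw new IllegalStateException(
          "Cannot write more than totalDocs=" + _totalDocs + " values");
    }
    _docsWritten++;
  }

  int getDocsWritten() {
    return _docsWritten;
  }
}
```

   Because the check runs before the counter is incremented and before any
   bytes are buffered, an over-write is rejected without emitting an extra
   chunk.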



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

