xiangfu0 commented on code in PR #18229:
URL: https://github.com/apache/pinot/pull/18229#discussion_r3153297374
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -374,28 +377,88 @@ private boolean shouldDisableDictionary(String column,
ColumnMetadata existingCo
return true;
}
- private boolean shouldChangeRawCompressionType(String column,
SegmentDirectory.Reader segmentReader)
+ private boolean shouldRewriteRawForwardIndex(String column,
SegmentDirectory.Reader segmentReader)
throws Exception {
// The compression type for an existing segment can only be determined by
reading the forward index header.
ColumnMetadata existingColMetadata =
_segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
ChunkCompressionType existingCompressionType;
+ String existingCodecSpec;
// Get the forward index reader factory and create a reader
IndexReaderFactory<ForwardIndexReader> readerFactory =
StandardIndexes.forward().getReaderFactory();
try (ForwardIndexReader<?> fwdIndexReader =
readerFactory.createIndexReader(segmentReader,
_fieldIndexConfigs.get(column), existingColMetadata)) {
existingCompressionType = fwdIndexReader.getCompressionType();
- Preconditions.checkState(existingCompressionType != null,
- "Existing compressionType cannot be null for raw forward index
column=" + column);
+ existingCodecSpec = fwdIndexReader.getCodecSpec();
}
- // Get the new compression type.
- ChunkCompressionType newCompressionType =
-
_fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()).getChunkCompressionType();
+ ForwardIndexConfig fwdConfig =
_fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
+
+ if (existingCompressionType == null) {
+ // V7 codec-pipeline only supports INT and LONG; guard against future
type expansion reaching this path.
+ DataType existingStoredType =
existingColMetadata.getDataType().getStoredType();
+ Preconditions.checkState(existingStoredType == DataType.INT ||
existingStoredType == DataType.LONG,
+ "V7 codec-pipeline segment for column=%s has unexpected stored type
%s; expected INT or LONG",
+ column, existingStoredType);
+ // Codec-pipeline segment (version 7): compare the stored canonical spec
against the configured codecSpec.
+ // If no codecSpec is configured, a legacy compressionCodec or default
will apply to new segments only;
+ // existing V7 segments are left as-is (no rewrite needed).
+ String newCodecSpec = fwdConfig.getCodecSpec();
+ if (newCodecSpec == null) {
+ // Config may have reverted from codecSpec back to a legacy
compressionCodec — schedule a
+ // rewrite back to the legacy format so the segment can be read by
older servers.
+ // PASS_THROUGH (internally aliased as NONE) is excluded:
PASS_THROUGH, CLP, CLPV2, and
+ // related codecs all resolve to ChunkCompressionType.PASS_THROUGH and
do not represent a
+ // genuine rollback to a real compression codec.
+ ChunkCompressionType newCompressionType =
fwdConfig.getChunkCompressionType();
+ if (newCompressionType != null && newCompressionType !=
ChunkCompressionType.PASS_THROUGH) {
Review Comment:
Fixed: introduced an
`isLegacyRevertTargetForFixedByteSv(FieldConfig.CompressionCodec)` helper
(`ForwardIndexHandler.java:479-497`). It returns `true` for `PASS_THROUGH`,
`SNAPPY`, `ZSTANDARD`, `LZ4`, `GZIP`, `DELTA`, and `DELTADELTA`, and `false` for
the CLP family and `MV_ENTRY_DICT` (which never apply to fixed-byte SV columns
anyway). The handler now checks `fwdConfig.getCompressionCodec()` (the typed
enum) instead of `getChunkCompressionType()`. As a result, an explicit
`compressionCodec=PASS_THROUGH` also triggers a rewrite to the legacy format,
enabling rollback to pre-V7 servers. The new
`testComputeOperationChangeCompressionForV7CodecPipelineRollbackToLegacyCompressionCodec`
test locks this behavior in.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkForwardIndexWriterV7.java:
##########
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.writer.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pinot.segment.local.io.codec.CodecPipelineExecutor;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Chunk-based raw (non-dictionary-encoded) forward index writer for
single-value fixed-width
+ * columns (INT, LONG) that uses a {@link CodecPipelineExecutor} for encoding.
+ *
+ * <p>This writer introduces <strong>version 7</strong> of the fixed-byte
chunk raw forward index
+ * format. The on-disk layout is:
+ *
+ * <pre>
+ * File header:
+ * version (int, value = 7)
+ * numChunks (int)
+ * numDocsPerChunk (int, normalised to power-of-2)
+ * sizeOfEntry (int, bytes per logical value, e.g. 4 for INT)
+ * totalDocs (int)
+ * codecSpecLength (int, byte length of the UTF-8 encoded canonical
codec spec)
+ * dataHeaderStart (int, byte offset from file start where chunk-offset
table begins)
+ * codecSpec (byte[], UTF-8 encoded canonical spec, length =
codecSpecLength)
+ * chunkOffsets (long[numChunks], absolute byte offset of each
chunk's per-chunk header)
+ * Data (per chunk):
+ * compressedSize (int, byte length of the encoded payload that
follows)
+ * uncompressedSize (int, byte length of the original decoded chunk data)
+ * payload (byte[], encoded chunk data, length = compressedSize)
+ * </pre>
+ *
+ * <p>Each chunk contains {@code numDocsPerChunk} values encoded by the
pipeline. Chunk offsets
+ * are 8-byte longs to support files larger than 2 GB. The per-chunk size
header allows readers
+ * to verify decompression output and to skip/read chunks without scanning
adjacent offsets.
+ *
+ * <p>This class is <em>not</em> thread-safe.
+ */
+@NotThreadSafe
+public class FixedByteChunkForwardIndexWriterV7 implements Closeable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FixedByteChunkForwardIndexWriterV7.class);
+
+ public static final int VERSION =
ForwardIndexConfig.CODEC_PIPELINE_WRITER_VERSION;
+
+ /** Bytes written before each chunk payload: compressedSize (int) +
uncompressedSize (int). */
+ public static final int CHUNK_HEADER_BYTES = 2 * Integer.BYTES;
+
+ // Number of fixed int fields before the codec spec: version, numChunks,
numDocsPerChunk,
+ // sizeOfEntry, totalDocs, codecSpecLength, dataHeaderStart
+ private static final int FIXED_HEADER_INT_COUNT = 7;
+
+ private final FileChannel _dataFile;
+ private final CodecPipelineExecutor _executor;
+ private final int _numDocsPerChunk;
+ private final int _sizeOfEntry;
+ private final ByteBuffer _header;
+ private final ByteBuffer _chunkBuffer;
+ private final int _numChunks;
+ private final int _totalDocs;
+
+ private long _dataOffset;
+ private int _docsWritten;
+ private int _chunksWritten;
+
+ /**
+ * Creates a new writer.
+ *
+ * @param file output file
+ * @param executor pre-validated pipeline executor
+ * @param totalDocs total number of documents to write
+ * @param numDocsPerChunk target documents per chunk (will be rounded up to
power-of-2)
+ * @param sizeOfEntry bytes per value (e.g. 4 for INT, 8 for LONG)
+ */
+ public FixedByteChunkForwardIndexWriterV7(File file, CodecPipelineExecutor
executor, int totalDocs,
+ int numDocsPerChunk, int sizeOfEntry)
+ throws IOException {
+ _executor = executor;
+ _sizeOfEntry = sizeOfEntry;
+ _totalDocs = totalDocs;
+ _numDocsPerChunk = normalizePower2(numDocsPerChunk);
+ _numChunks = (totalDocs + _numDocsPerChunk - 1) / _numDocsPerChunk;
+ _docsWritten = 0;
+ _chunksWritten = 0;
+
+ byte[] specBytes =
executor.getCanonicalSpec().getBytes(StandardCharsets.UTF_8);
+
+ // Header layout:
+ // 7 ints of fixed fields
+ // specBytes.length bytes of codec spec
+ // numChunks longs of chunk offsets
+ long fixedHeaderBytesLong = (long) FIXED_HEADER_INT_COUNT * Integer.BYTES;
+ long dataHeaderStartLong = fixedHeaderBytesLong + specBytes.length;
+ long chunkOffsetTableBytesLong = (long) _numChunks * Long.BYTES;
+ long totalHeaderBytesLong = dataHeaderStartLong +
chunkOffsetTableBytesLong;
+ if (totalHeaderBytesLong > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException(
+ "Header size " + totalHeaderBytesLong + " bytes exceeds
Integer.MAX_VALUE. Reduce totalDocs or"
+ + " increase numDocsPerChunk.");
+ }
+ int dataHeaderStart = (int) dataHeaderStartLong;
+ int totalHeaderBytes = (int) totalHeaderBytesLong;
+
+ _header = ByteBuffer.allocateDirect(totalHeaderBytes);
+ _header.putInt(VERSION);
+ _header.putInt(_numChunks);
+ _header.putInt(_numDocsPerChunk);
+ _header.putInt(sizeOfEntry);
+ _header.putInt(totalDocs);
+ _header.putInt(specBytes.length);
+ _header.putInt(dataHeaderStart);
+ _header.put(specBytes);
+ // chunk offsets will be filled in during writeChunk() calls
+
+ _dataOffset = totalHeaderBytes;
+
+ long chunkSizeLong = (long) sizeOfEntry * _numDocsPerChunk;
+ if (chunkSizeLong > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException(
+ "Chunk size " + chunkSizeLong + " bytes overflows int. Reduce
numDocsPerChunk or sizeOfEntry.");
+ }
+ _chunkBuffer = ByteBuffer.allocateDirect((int) chunkSizeLong);
+
+ _dataFile = new RandomAccessFile(file, "rw").getChannel();
+ }
+
+ /** Writes a 4-byte integer value. */
+ public void putInt(int value) {
+ _chunkBuffer.putInt(value);
+ _docsWritten++;
+ flushIfNeeded();
+ }
+
+ /** Writes an 8-byte long value. */
+ public void putLong(long value) {
+ _chunkBuffer.putLong(value);
+ _docsWritten++;
+ flushIfNeeded();
+ }
+
+ private void flushIfNeeded() {
+ if (_chunkBuffer.position() == _numDocsPerChunk * _sizeOfEntry) {
+ writeChunk();
+ }
+ }
+
+ private void writeChunk() {
+ _chunkBuffer.flip();
+ int uncompressedSize = _chunkBuffer.remaining();
+ try {
+ ByteBuffer encoded = _executor.compress(_chunkBuffer);
+ int compressedSize = encoded.remaining();
+
+ // Per-chunk header: compressedSize (int) + uncompressedSize (int)
+ ByteBuffer chunkHeader = ByteBuffer.allocateDirect(CHUNK_HEADER_BYTES);
Review Comment:
Fixed: `FixedByteChunkForwardIndexWriterV7` now reuses a single
`_chunkHeaderBuffer` field, allocated once as
`ByteBuffer.allocateDirect(CHUNK_HEADER_BYTES)` (line 87). For each chunk,
`writeChunk()` calls `_chunkHeaderBuffer.clear()`, then
`putInt(compressedSize)`, `putInt(uncompressedSize)`, and `flip()`
(lines 202-205). The chunk header no longer triggers a per-chunk
`allocateDirect`.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/CodecPipelineExecutor.java:
##########
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.codec;
+
+import com.github.luben.zstd.Zstd;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import net.jpountz.lz4.LZ4CompressorWithLength;
+import net.jpountz.lz4.LZ4DecompressorWithLength;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.codec.CodecContext;
+import org.apache.pinot.segment.spi.codec.CodecDefinition;
+import org.apache.pinot.segment.spi.codec.CodecInvocation;
+import org.apache.pinot.segment.spi.codec.CodecOptions;
+import org.apache.pinot.segment.spi.codec.CodecPipeline;
+import org.apache.pinot.segment.spi.codec.CodecSpecParser;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Executes a parsed and validated {@link CodecPipeline} for a single
forward-index chunk.
+ *
+ * <p>Write path: values → transforms (in order) → compression → bytes stored
on disk.
+ * Read path: bytes from disk → decompress → reverse transforms (in reverse
order) → values.
+ *
+ * <p>The executor is constructed once per column from the canonical {@code
codecSpec} string
+ * stored in the file header and is thread-safe for concurrent read calls (it
holds no mutable
+ * per-call state).
+ *
+ * <h3>Buffer contract</h3>
+ * <ul>
+ * <li>{@link #compress}: {@code src} is ready for read (position=0);
returns a new
+ * {@link ByteBuffer} ready for read containing the encoded bytes.</li>
+ * <li>{@link #decompress}: {@code src} is ready for read; returns a new
+ * {@link ByteBuffer} ready for read containing the decoded bytes.</li>
+ * <li>{@link #maxCompressedSize}: returns an upper bound on encoded
size.</li>
+ * </ul>
+ */
+public final class CodecPipelineExecutor {
Review Comment:
Fixed: `CodecPipelineExecutor` is fully codec-agnostic. The class Javadoc
(lines 45-46) states this explicitly: `"The executor is codec-agnostic: it holds
an ordered list of BoundStage objects"`. Each stage is a `BoundStage<O extends
CodecOptions>` (line 61) that wraps a generic `ChunkCodecHandler<O>` together
with its parsed `CodecOptions` and `CodecContext`. The constructor (line 125)
iterates over `pipeline.stages()` and resolves each invocation through
`registry.getOrThrow(inv.name())`. It then stores the resulting `BoundStage`s.
The executor contains no codec-specific code: `compress`/`decompress` simply
iterate over `_stages` and dispatch through the handler interface.
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java:
##########
@@ -2354,6 +2358,191 @@ private void validateMetadataProperties(String column,
boolean hasDictionary, in
assertEquals(columnMetadata.getMaxValue(), maxValue);
}
+ /**
+ * Rollback test: when a V7 codec-pipeline forward index exists on disk but
the table config
+ * is reverted to a legacy {@code compressionCodec} (no {@code codecSpec}),
+ * {@code shouldChangeRawCompressionType()} must schedule
+ * {@link ForwardIndexHandler.Operation#CHANGE_INDEX_COMPRESSION_TYPE} so
the segment is
+ * rewritten back to the legacy format — enabling rollback to pre-V7 servers.
+ */
+ @Test
+ public void testComputeOperationNoChangeCompressionForV7CodecPipelineColumn()
Review Comment:
Fixed: renamed to
`testComputeOperationChangeCompressionForV7CodecPipelineRollbackToLegacyCompressionCodec`
to reflect that this test verifies a CHANGE_INDEX_COMPRESSION_TYPE operation
is scheduled (the rewrite *does* happen) when reverting from V7 codecSpec to a
legacy compressionCodec.
##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/codec/CodecSpecParser.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.codec;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Phase-1 (structural) parser for the Codec DSL.
+ *
+ * <p>Accepts the following grammar:
+ * <pre>
+ * spec ::= pipeline | invocation
+ * pipeline ::= "CODEC" "(" invocation ("," invocation)* ")"
+ * invocation ::= NAME
+ * | NAME "(" args ")"
+ * args ::= ε | arg ("," arg)*
+ * arg ::= [+-]? [0-9]+ // numeric literals; leading sign
accepted but not used in v1
+ * NAME ::= [A-Za-z_][A-Za-z0-9_]*
+ * </pre>
+ *
+ * <p>Examples:
+ * <ul>
+ * <li>{@code DELTA} → single-stage pipeline with no args</li>
+ * <li>{@code ZSTD(3)} → single-stage pipeline, arg "3"</li>
+ * <li>{@code CODEC(DELTA,ZSTD(8))} → two-stage pipeline</li>
+ * </ul>
+ *
+ * <p>This class performs only structural parsing; semantic validation (valid
codec names,
+ * argument ranges, type compatibility) is done in a second phase by the
registry and
+ * pipeline validator.
+ *
+ * <p>This class is stateless and thread-safe.
+ */
+public final class CodecSpecParser {
+
+ /**
+ * The keyword used for multi-stage pipeline notation (e.g. {@code
CODEC(DELTA,ZSTD(3))}).
+ *
+ * <p>This name is permanently reserved — it cannot be registered as a codec
name.
+ * See {@code CodecRegistry.register}.
+ */
+ public static final String PIPELINE_KEYWORD = "CODEC";
+
+ private CodecSpecParser() {
+ }
+
+ /**
+ * Parses {@code spec} into a {@link CodecPipeline} AST.
+ *
+ * @param spec the codec DSL string; must not be {@code null} or blank
+ * @return the parsed pipeline
+ * @throws IllegalArgumentException if the spec is syntactically invalid
+ */
+ public static CodecPipeline parse(String spec) {
+ if (spec == null || spec.isBlank()) {
+ throw new IllegalArgumentException("codecSpec must not be null or
blank");
+ }
+ String trimmed = spec.trim();
+ Parser p = new Parser(trimmed);
+ CodecPipeline pipeline = p.parsePipelineOrInvocation();
+ p.expectEof();
+ return pipeline;
+ }
+
+ // -------------------------------------------------------------------------
+ // Internal recursive-descent parser
+ // -------------------------------------------------------------------------
+
+ private static final class Parser {
+ private final String _input;
+ private int _pos;
+
+ Parser(String input) {
+ _input = input;
+ _pos = 0;
+ }
+
+ CodecPipeline parsePipelineOrInvocation() {
+ String name = parseName();
+ if (PIPELINE_KEYWORD.equalsIgnoreCase(name) && peek() == '(') {
+ // Multi-stage pipeline: CODEC(stage,stage,...)
+ consume('(');
+ if (peek() == ')') {
+ throw new IllegalArgumentException("CODEC pipeline must have at
least one stage in: " + _input);
+ }
+ List<CodecInvocation> stages = new ArrayList<>();
+ stages.add(parseInvocation());
+ while (peek() == ',') {
+ consume(',');
+ stages.add(parseInvocation());
+ }
+ consume(')');
+ return new CodecPipeline(stages);
+ }
+ // Single invocation; name already consumed, check for optional args
+ List<String> args = Collections.emptyList();
+ if (peek() == '(') {
+ consume('(');
+ args = parseArgs();
+ consume(')');
+ }
+ return new CodecPipeline(Collections.singletonList(new
CodecInvocation(name.toUpperCase(), args)));
+ }
+
+ CodecInvocation parseInvocation() {
+ String name = parseName().toUpperCase();
+ List<String> args = Collections.emptyList();
+ if (peek() == '(') {
+ consume('(');
+ args = parseArgs();
+ consume(')');
+ }
+ return new CodecInvocation(name, args);
+ }
+
+ /** Parse NAME = [A-Za-z_][A-Za-z0-9_]* */
+ String parseName() {
+ skipWhitespace();
+ if (_pos >= _input.length()) {
+ throw new IllegalArgumentException("Expected codec name but reached
end of input: " + _input);
+ }
+ char first = _input.charAt(_pos);
+ if (!Character.isLetter(first) && first != '_') {
+ throw new IllegalArgumentException(
+ "Expected codec name starting with letter at position " + _pos + "
in: " + _input);
+ }
Review Comment:
Fixed: `CodecSpecParser.parseName()` (lines 145-148) now reports `"Expected
codec name starting with [A-Za-z_] at position N in: ..."` — the message
exactly matches the accepted ASCII identifier grammar. The implementation also
tightened to ASCII-only (`isAsciiIdentifierStart` / `isAsciiIdentifierPart`
helpers, lines 156-162) so the grammar is locale-stable.
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CodecPipelineIntegrationTest.java:
##########
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test for the codec pipeline forward index (version 7).
+ * Writes an offline table with INT and LONG raw columns encoded via the
CODEC(DELTA,ZSTD(3)) pipeline,
Review Comment:
Fixed: added `STR_RAW_COL` (`strRawZstd`) to `CodecPipelineIntegrationTest`.
It is a STRING column with `EncodingType.RAW` and the legacy
`compressionCodec=ZSTANDARD`. The new test
`testStringColumnWithRawLegacyCompression` queries this column alongside the
codec-pipeline INT/LONG columns. This verifies that the legacy raw-string path
coexists with V7 codec-pipeline columns in the same segment. The test
spot-checks individual values (`SELECT strRawZstd ... WHERE ts=N`). It also
asserts that the column's `COUNT(DISTINCT ...)` equals `NUM_DOCS`.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/CodecRegistry.java:
##########
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.codec;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.codec.ChunkCodecHandler;
+import org.apache.pinot.segment.spi.codec.CodecSpecParser;
+
+
+/**
+ * Registry of known {@link ChunkCodecHandler} instances, looked up by
+ * {@link ChunkCodecHandler#name()}.
+ *
+ * <p>The singleton {@link #DEFAULT} instance is pre-populated with all
built-in codecs and is
+ * immutable after class-loading; calling {@link #register} on it will throw.
+ *
+ * <p>Custom registries (e.g. in tests) should be created with {@link
#CodecRegistry()} and
+ * populated via {@link #register} before use. The custom registry instance
itself is not
+ * thread-safe for concurrent writes; callers must complete all registrations
before sharing
+ * the registry across threads.
+ *
+ * <p>Lookup is case-insensitive.
+ */
+public final class CodecRegistry {
+
+ /**
+ * Default, immutable registry containing all built-in codecs.
+ * Safe for concurrent reads; does not allow {@link #register}.
+ */
+ public static final CodecRegistry DEFAULT;
+
+ static {
+ Map<String, ChunkCodecHandler<?>> m = new LinkedHashMap<>();
+ m.put(DeltaCodecDefinition.INSTANCE.name().toUpperCase(),
DeltaCodecDefinition.INSTANCE);
+ m.put(DeltaDeltaCodecDefinition.INSTANCE.name().toUpperCase(),
DeltaDeltaCodecDefinition.INSTANCE);
+ m.put(ZstdCodecDefinition.INSTANCE.name().toUpperCase(),
ZstdCodecDefinition.INSTANCE);
+ m.put(Lz4CodecDefinition.INSTANCE.name().toUpperCase(),
Lz4CodecDefinition.INSTANCE);
+ m.put(SnappyCodecDefinition.INSTANCE.name().toUpperCase(),
SnappyCodecDefinition.INSTANCE);
+ m.put(GzipCodecDefinition.INSTANCE.name().toUpperCase(),
GzipCodecDefinition.INSTANCE);
+ DEFAULT = new CodecRegistry(Collections.unmodifiableMap(m));
+ }
Review Comment:
Fixed: PR description has been updated to reflect all 6 built-in codecs
registered in `CodecRegistry.DEFAULT`: `DELTA`, `DELTADELTA`, `LZ4`, `ZSTD`,
`SNAPPY`, `GZIP`. The new "Built-in codecs" table in the PR description and the
codec catalog in `docs/design/codec-pipeline-v7.md` both list all six.
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/CodecPipelineForwardIndexTest.java:
##########
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.forward;
+
+import java.io.File;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.PinotBuffersAfterMethodCheckRule;
+import org.apache.pinot.segment.local.io.codec.CodecPipelineExecutor;
+import org.apache.pinot.segment.local.io.codec.CodecRegistry;
+import
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriterV7;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReaderV7;
+import org.apache.pinot.segment.spi.codec.CodecContext;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Roundtrip encode/decode tests for the codec-pipeline forward index (version
7).
+ *
+ * <p>Covers:
+ * <ul>
+ * <li>INT and LONG single-value columns</li>
+ * <li>DELTA only, ZSTD only, CODEC(DELTA,ZSTD) pipelines</li>
+ * <li>Version check on the written file</li>
+ * <li>Canonical spec round-trips correctly through the header</li>
+ * </ul>
+ */
+public class CodecPipelineForwardIndexTest implements
PinotBuffersAfterMethodCheckRule {
+
+ private static final int NUM_VALUES = 10_007;
+ private static final int DOCS_PER_CHUNK = 1024;
+ private static final String TEST_FILE_PREFIX =
+ System.getProperty("java.io.tmpdir") + File.separator +
"CodecPipelineFwdTest";
+ // Fixed seed for deterministic test data — this test verifies correctness,
not random coverage
+ private static final Random RANDOM = new Random(42L);
+
Review Comment:
Fixed: `CodecPipelineForwardIndexTest` no longer uses a shared static
`Random`. There is now `private static final long RANDOM_SEED = 42L` (line 61)
and each test method constructs `Random random = new Random(RANDOM_SEED)`
locally (e.g. lines 105 and 157). Each test invocation is self-contained and
deterministic regardless of TestNG execution order or parallelism.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReaderV7.java:
##########
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.segment.local.io.codec.CodecPipelineExecutor;
+import org.apache.pinot.segment.local.io.codec.CodecRegistry;
+import
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriterV7;
+import org.apache.pinot.segment.spi.codec.CodecContext;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Chunk-based single-value raw forward index reader for version-7 files
written by
+ * {@link FixedByteChunkForwardIndexWriterV7}.
+ *
+ * <p>Reads the canonical codec spec from the file header, instantiates a
+ * {@link CodecPipelineExecutor}, and uses it to decode each chunk on demand.
+ *
+ * <p>Supported data types: INT, LONG.
+ */
+public final class FixedByteChunkSVForwardIndexReaderV7 implements
ForwardIndexReader<ChunkReaderContext> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FixedByteChunkSVForwardIndexReaderV7.class);
+
+ public static final int VERSION =
ForwardIndexConfig.CODEC_PIPELINE_WRITER_VERSION;
+
+ private final PinotDataBuffer _dataBuffer;
+ private final DataType _storedType;
+ private final int _numChunks;
+ private final int _numDocsPerChunk;
+ private final int _shift; // log2(numDocsPerChunk) for fast chunk id calc
+ private final int _totalDocs;
+ private final int _dataHeaderStart;
+ private final CodecPipelineExecutor _executor;
+ private final String _canonicalSpec;
+
+ public FixedByteChunkSVForwardIndexReaderV7(PinotDataBuffer dataBuffer,
DataType storedType) {
+ _dataBuffer = dataBuffer;
+ _storedType = storedType;
+
+ if (storedType != DataType.INT && storedType != DataType.LONG) {
+ throw new IllegalArgumentException(
+ "FixedByteChunkSVForwardIndexReaderV7 only supports INT and LONG,
got: " + storedType);
+ }
+
+ int offset = 0;
+ int version = dataBuffer.getInt(offset);
+ if (version != VERSION) {
+ throw new IllegalArgumentException("Expected version " + VERSION + " but
got " + version);
+ }
+ offset += Integer.BYTES;
+
+ _numChunks = dataBuffer.getInt(offset);
+ offset += Integer.BYTES;
+
+ _numDocsPerChunk = dataBuffer.getInt(offset);
+ offset += Integer.BYTES;
+ if (_numDocsPerChunk <= 0 || (_numDocsPerChunk & (_numDocsPerChunk - 1))
!= 0) {
+ throw new IllegalArgumentException(
+ "Invalid numDocsPerChunk in forward index header: " +
_numDocsPerChunk
+ + ". Expected a positive power of two.");
+ }
+ _shift = Integer.numberOfTrailingZeros(_numDocsPerChunk);
+
Review Comment:
Fixed: the `FixedByteChunkSVForwardIndexReaderV7` constructor now validates that
`numChunks == ceil(totalDocs / numDocsPerChunk)` (lines 109-115). A mismatched
header throws `IllegalArgumentException("Inconsistent header: numChunks=N but
totalDocs=T / numDocsPerChunk=D => expected E. Segment may be corrupt.")`.
The constructor also keeps its existing chunk-offset table bounds check and
per-chunk-offset monotonicity validation (lines 130-145). Together, these
checks ensure that `getChunkOffset()` cannot read past the chunk-offset table.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]