This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 896406d226e Add $partitionId as a new virtual column (#16721)
896406d226e is described below
commit 896406d226eaa636c525023d5567ccc7f379af7a
Author: tarun11Mavani <[email protected]>
AuthorDate: Sat Sep 6 19:37:46 2025 +0530
Add $partitionId as a new virtual column (#16721)
* Add $partitionId as a new virtual column
* Use partition info from segment metadata
* Add ITs
* fix tests
* fix tests
* fix test
* address review comments
---
.../pinot/common/config/provider/TableCache.java | 3 +
.../PinotQueryResourceStaticValidationTest.java | 2 +-
.../pinot/controller/helix/TableCacheTest.java | 4 +-
.../tests/BaseClusterIntegrationTestSet.java | 12 +-
.../tests/NullHandlingIntegrationTest.java | 6 +-
.../tests/OfflineClusterIntegrationTest.java | 12 +-
...PartitionLLCRealtimeClusterIntegrationTest.java | 37 ++++
.../immutable/ImmutableSegmentImpl.java | 2 +-
.../immutable/ImmutableSegmentLoader.java | 3 +-
.../indexsegment/mutable/MutableSegmentImpl.java | 6 +-
.../PartitionIdVirtualColumnProvider.java | 237 +++++++++++++++++++++
.../virtualcolumn/VirtualColumnContext.java | 14 ++
.../VirtualColumnProviderFactory.java | 5 +
.../mutable/MutableSegmentImplRawMVTest.java | 8 +
.../apache/pinot/spi/utils/CommonConstants.java | 3 +-
15 files changed, 333 insertions(+), 21 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index 57fa9502e5a..840d398cf2d 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -155,6 +155,9 @@ public interface TableCache extends PinotConfigProvider {
if (!schema.hasColumn(BuiltInVirtualColumn.SEGMENTNAME)) {
schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.SEGMENTNAME,
FieldSpec.DataType.STRING, true));
}
+ if (!schema.hasColumn(BuiltInVirtualColumn.PARTITIONID)) {
+ schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.PARTITIONID,
FieldSpec.DataType.STRING, false));
+ }
}
static Map<Expression, Expression> createExpressionOverrideMap(String
physicalOrLogicalTableName,
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotQueryResourceStaticValidationTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotQueryResourceStaticValidationTest.java
index ab715485d54..6ba23171769 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotQueryResourceStaticValidationTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotQueryResourceStaticValidationTest.java
@@ -64,7 +64,7 @@ public class PinotQueryResourceStaticValidationTest {
Assert.assertNotNull(provider.getTableConfig("testTable_OFFLINE"));
Assert.assertNotNull(provider.getSchema("testTable"));
Assert.assertNotNull(provider.getColumnNameMap("testTable"));
- Assert.assertEquals(provider.getColumnNameMap("testTable").size(), 5); //
2 columns + 3 built-in virtual columns
+ Assert.assertEquals(provider.getColumnNameMap("testTable").size(), 6); //
2 columns + 4 built-in virtual columns
Assert.assertTrue(provider.getTableNameMap().containsKey("testTable_OFFLINE"));
Assert.assertTrue(provider.getTableNameMap().containsKey("testTable"));
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index f35d97ccbec..6fcf472a69d 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -303,6 +303,7 @@ public class TableCacheTest {
expectedColumnMap.put(isCaseInsensitive ? "$docid" : "$docId", "$docId");
expectedColumnMap.put(isCaseInsensitive ? "$hostname" : "$hostName",
"$hostName");
expectedColumnMap.put(isCaseInsensitive ? "$segmentname" : "$segmentName",
"$segmentName");
+ expectedColumnMap.put(isCaseInsensitive ? "$partitionid" : "$partitionId",
"$partitionId");
return expectedColumnMap;
}
@@ -310,7 +311,8 @@ public class TableCacheTest {
return new
Schema.SchemaBuilder().setSchemaName(tableName).addSingleValueDimension("testColumn",
DataType.INT)
.addSingleValueDimension(BuiltInVirtualColumn.DOCID, DataType.INT)
.addSingleValueDimension(BuiltInVirtualColumn.HOSTNAME,
DataType.STRING)
- .addSingleValueDimension(BuiltInVirtualColumn.SEGMENTNAME,
DataType.STRING).build();
+ .addSingleValueDimension(BuiltInVirtualColumn.SEGMENTNAME,
DataType.STRING)
+ .addMultiValueDimension(BuiltInVirtualColumn.PARTITIONID,
DataType.STRING).build();
}
@DataProvider(name = "testTableCacheDataProvider")
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index c6b4a40720c..b9c9072b821 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -445,11 +445,13 @@ public abstract class BaseClusterIntegrationTestSet
extends BaseClusterIntegrati
}
// Check that the virtual columns work as expected (throws no exceptions)
- getPinotConnection().execute("select $docId, $segmentName, $hostName from
mytable");
- getPinotConnection().execute("select $docId, $segmentName, $hostName from
mytable where $docId < 5 limit 50");
- getPinotConnection().execute("select $docId, $segmentName, $hostName from
mytable where $docId = 5 limit 50");
- getPinotConnection().execute("select $docId, $segmentName, $hostName from
mytable where $docId > 19998 limit 50");
- getPinotConnection().execute("select max($docId) from mytable group by
$segmentName");
+ getPinotConnection().execute("select $docId, $segmentName, $hostName,
$partitionId from mytable");
+ getPinotConnection().execute(
+ "select $docId, $segmentName, $hostName, $partitionId from mytable
where $docId < 5 limit 50");
+ getPinotConnection().execute(
+ "select $docId, $segmentName, $hostName, $partitionId from mytable
where $docId = 5 limit 50");
+ getPinotConnection().execute(
+ "select $docId, $segmentName, $hostName, $partitionId from mytable
where $docId > 19998 limit 50");
}
/**
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
index ae8506d0cdc..9e31c21292b 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
@@ -372,7 +372,7 @@ public class NullHandlingIntegrationTest extends
BaseClusterIntegrationTestSet
explainLogical(query,
"Execution Plan\n"
+ "LogicalProject(EXPR$0=[1])\n"
- + " LogicalFilter(condition=[AND(IS NOT NULL($7), <>($7, 0))])\n"
+ + " LogicalFilter(condition=[AND(IS NOT NULL($8), <>($8, 0))])\n"
+ " PinotLogicalTableScan(table=[[default, mytable]])\n",
Map.of(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
"false"));
}
@@ -390,7 +390,7 @@ public class NullHandlingIntegrationTest extends
BaseClusterIntegrationTestSet
explainLogical(query,
"Execution Plan\n"
+ "LogicalProject(EXPR$0=[1])\n"
- + " LogicalFilter(condition=[<>($7, 0)])\n"
+ + " LogicalFilter(condition=[<>($8, 0)])\n"
+ " PinotLogicalTableScan(table=[[default, mytable]])\n",
Map.of(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
"true"));
}
@@ -408,7 +408,7 @@ public class NullHandlingIntegrationTest extends
BaseClusterIntegrationTestSet
explainLogical(query,
"Execution Plan\n"
+ "LogicalProject(EXPR$0=[1])\n"
- + " LogicalFilter(condition=[AND(IS NULL($7), <>($7, 0))])\n"
+ + " LogicalFilter(condition=[AND(IS NULL($8), <>($8, 0))])\n"
+ " PinotLogicalTableScan(table=[[default, mytable]])\n",
Map.of(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
"false"));
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 83ea2146436..65e22627761 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -3520,7 +3520,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
+ " LogicalProject(count=[$1], name=[$0])\n"
+ " PinotLogicalAggregate(group=[{0}], agg#0=[COUNT($1)],
aggType=[FINAL])\n"
+ " PinotLogicalExchange(distribution=[hash[0]])\n"
- + " PinotLogicalAggregate(group=[{17}], agg#0=[COUNT()],
aggType=[LEAF])\n"
+ + " PinotLogicalAggregate(group=[{18}], agg#0=[COUNT()],
aggType=[LEAF])\n"
+ " PinotLogicalTableScan(table=[[default, mytable]])\n");
assertEquals(response1Json.get("rows").get(0).get(2).asText(), "Rule
Execution Times\n"
+ "Rule: SortRemove -> Time:*\n"
@@ -3798,26 +3798,26 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
JsonNode starColumnResponse =
JsonUtils.stringToJsonNode(sendGetRequest(getControllerBaseApiUrl() +
"/tables/mytable/metadata?columns=*"));
- validateMetadataResponse(starColumnResponse, 82, 9);
+ validateMetadataResponse(starColumnResponse, 83, 10);
JsonNode starEncodedColumnResponse =
JsonUtils.stringToJsonNode(sendGetRequest(getControllerBaseApiUrl() +
"/tables/mytable/metadata?columns=%2A"));
- validateMetadataResponse(starEncodedColumnResponse, 82, 9);
+ validateMetadataResponse(starEncodedColumnResponse, 83, 10);
JsonNode starWithExtraColumnResponse =
JsonUtils.stringToJsonNode(sendGetRequest(
getControllerBaseApiUrl() + "/tables/mytable/metadata?columns="
+ "CRSElapsedTime&columns=*&columns=OriginStateName"));
- validateMetadataResponse(starWithExtraColumnResponse, 82, 9);
+ validateMetadataResponse(starWithExtraColumnResponse, 83, 10);
JsonNode starWithExtraEncodedColumnResponse =
JsonUtils.stringToJsonNode(sendGetRequest(
getControllerBaseApiUrl() + "/tables/mytable/metadata?columns="
+ "CRSElapsedTime&columns=%2A&columns=OriginStateName"));
- validateMetadataResponse(starWithExtraEncodedColumnResponse, 82, 9);
+ validateMetadataResponse(starWithExtraEncodedColumnResponse, 83, 10);
JsonNode starWithExtraColumnWholeEncodedResponse =
JsonUtils.stringToJsonNode(sendGetRequest(
getControllerBaseApiUrl() + "/tables/mytable/metadata?columns="
+ "CRSElapsedTime%26columns%3D%2A%26columns%3DOriginStateName"));
- validateMetadataResponse(starWithExtraColumnWholeEncodedResponse, 82, 9);
+ validateMetadataResponse(starWithExtraColumnWholeEncodedResponse, 83, 10);
}
private void validateMetadataResponse(JsonNode response, int numTotalColumn,
int numMVColumn) {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
index 4b0c4e6c14a..d2ed66a7796 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
@@ -208,6 +209,42 @@ public class
SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
}
@Test(dependsOnMethods = "testPartitionRouting")
+ public void testPartitionIdVirtualColumn()
+ throws Exception {
+ // Query to get partition ID information for all segments
+ String query = "SELECT $partitionId, $segmentName FROM mytable GROUP BY
$segmentName, $partitionId";
+ JsonNode response = postQuery(query);
+
+ JsonNode resultTable = response.get("resultTable");
+ JsonNode rows = resultTable.get("rows");
+
+ assertTrue(rows.size() > 0, "Should have at least one segment result");
+
+ // Define expected partition ID values
+ Set<String> expectedPartitions = new HashSet<>();
+ expectedPartitions.add("DestState_0");
+ expectedPartitions.add("DestState_1");
+
+ // Validate that $partitionId virtual column returns expected results
+ for (int i = 0; i < rows.size(); i++) {
+ JsonNode row = rows.get(i);
+ String partitionIdResult = row.get(0).asText();
+ String segmentName = row.get(1).asText();
+
+ assertNotNull(partitionIdResult);
+ assertNotNull(segmentName);
+ assertTrue(LLCSegmentName.isLLCSegment(segmentName));
+
+ if (partitionIdResult.isBlank()) {
+ continue;
+ }
+ // Validate that partitionIdResult is one of the expected partition IDs
+ assertTrue(expectedPartitions.contains(partitionIdResult),
+ "Expected one of " + expectedPartitions + " but found: " +
partitionIdResult);
+ }
+ }
+
+ @Test(dependsOnMethods = "testPartitionIdVirtualColumn")
public void testNonPartitionedStream()
throws Exception {
// Push the second Avro file into Kafka without partitioning
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 6732d99ced2..1e39eee3fd6 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -259,7 +259,7 @@ public class ImmutableSegmentImpl implements
ImmutableSegment {
Preconditions.checkState(fieldSpec != null, "Failed to find column: %s in
schema: %s", column,
schema.getSchemaName());
return IndexSegmentUtils.createVirtualDataSource(
- new VirtualColumnContext(fieldSpec, _segmentMetadata.getTotalDocs()));
+ new VirtualColumnContext(fieldSpec, _segmentMetadata.getTotalDocs(),
_segmentMetadata));
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index 2172e5f014e..e32610ba61e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -215,7 +215,8 @@ public class ImmutableSegmentLoader {
for (FieldSpec fieldSpec : segmentSchema.getAllFieldSpecs()) {
if (fieldSpec.isVirtualColumn()) {
String columnName = fieldSpec.getName();
- VirtualColumnContext context = new VirtualColumnContext(fieldSpec,
segmentMetadata.getTotalDocs());
+ VirtualColumnContext context =
+ new VirtualColumnContext(fieldSpec,
segmentMetadata.getTotalDocs(), segmentMetadata);
VirtualColumnProvider provider =
VirtualColumnProviderFactory.buildProvider(context);
indexContainerMap.put(columnName,
provider.buildColumnIndexContainer(context));
columnMetadataMap.put(columnName, provider.buildMetadata(context));
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 2991efb8d1d..c13a17ec430 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -1159,7 +1159,8 @@ public class MutableSegmentImpl implements MutableSegment
{
FieldSpec fieldSpec = _schema.getFieldSpecFor(column);
if (fieldSpec != null && fieldSpec.isVirtualColumn()) {
// Virtual column
- VirtualColumnContext virtualColumnContext = new
VirtualColumnContext(fieldSpec, _numDocsIndexed);
+ VirtualColumnContext virtualColumnContext =
+ new VirtualColumnContext(fieldSpec, _numDocsIndexed,
_segmentMetadata);
return
VirtualColumnProviderFactory.buildProvider(virtualColumnContext).buildDataSource(virtualColumnContext);
}
return null;
@@ -1174,7 +1175,8 @@ public class MutableSegmentImpl implements MutableSegment
{
FieldSpec fieldSpec = schema.getFieldSpecFor(column);
Preconditions.checkState(fieldSpec != null, "Failed to find column: %s in
schema: %s", column,
schema.getSchemaName());
- return IndexSegmentUtils.createVirtualDataSource(new
VirtualColumnContext(fieldSpec, _numDocsIndexed));
+ return IndexSegmentUtils.createVirtualDataSource(
+ new VirtualColumnContext(fieldSpec, _numDocsIndexed,
_segmentMetadata));
}
@Nullable
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/PartitionIdVirtualColumnProvider.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/PartitionIdVirtualColumnProvider.java
new file mode 100644
index 00000000000..f44b91a2704
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/PartitionIdVirtualColumnProvider.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.virtualcolumn;
+
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import
org.apache.pinot.segment.local.segment.index.readers.BaseImmutableDictionary;
+import
org.apache.pinot.segment.local.segment.index.readers.constant.ConstantMVInvertedIndexReader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Virtual column provider that returns the partition information of the
segment.
+ * Returns a multi-value string column with entries in the format
"columnName_partitionId"
+ * for all partitioned columns in the segment. Returns a single empty string for
segments without partition information.
+ */
+public class PartitionIdVirtualColumnProvider implements VirtualColumnProvider
{
+
+ @Override
+ public ForwardIndexReader<?> buildForwardIndex(VirtualColumnContext context)
{
+ List<String> partitionInfo = getPartitionInfo(context);
+ return new MultiValueConstantForwardIndexReader(partitionInfo.size());
+ }
+
+ @Override
+ public Dictionary buildDictionary(VirtualColumnContext context) {
+ List<String> partitionInfo = getPartitionInfo(context);
+ return new MultiValueConstantStringDictionary(partitionInfo);
+ }
+
+ @Override
+ public InvertedIndexReader<?> buildInvertedIndex(VirtualColumnContext
context) {
+ return new ConstantMVInvertedIndexReader(context.getTotalDocCount());
+ }
+
+ @Override
+ public ColumnMetadataImpl buildMetadata(VirtualColumnContext context) {
+ FieldSpec fieldSpec = context.getFieldSpec();
+ List<String> partitionInfo = getPartitionInfo(context);
+ int cardinality = partitionInfo.size();
+
+ ColumnMetadataImpl.Builder builder = new ColumnMetadataImpl.Builder()
+ .setFieldSpec(fieldSpec)
+ .setTotalDocs(context.getTotalDocCount())
+ .setCardinality(cardinality)
+ .setHasDictionary(true)
+ .setMaxNumberOfMultiValues(cardinality);
+
+ if (cardinality > 0) {
+ builder.setMinValue(partitionInfo.get(0))
+ .setMaxValue(partitionInfo.get(cardinality - 1));
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Extract partition information from segment metadata.
+ * Returns a list of strings in format "columnName_partitionId" for all
partitioned columns.
+ */
+ private List<String> getPartitionInfo(VirtualColumnContext context) {
+ List<String> partitionInfo = new ArrayList<>();
+ SegmentMetadata segmentMetadata = context.getSegmentMetadata();
+
+ if (segmentMetadata != null && segmentMetadata.getColumnMetadataMap() !=
null) {
+ // Get partition info from all partitioned columns in the segment
metadata
+ Map<String, ColumnMetadata> columnMetadataMap =
segmentMetadata.getColumnMetadataMap();
+ for (Map.Entry<String, ColumnMetadata> entry :
columnMetadataMap.entrySet()) {
+ String columnName = entry.getKey();
+ ColumnMetadata columnMetadata = entry.getValue();
+ Set<Integer> partitions = columnMetadata.getPartitions();
+ if (partitions != null && !partitions.isEmpty()) {
+ // Add all partition IDs for this column
+ for (Integer partitionId : partitions) {
+ partitionInfo.add(columnName + "_" + partitionId);
+ }
+ }
+ }
+ }
+
+ // Ensure we always have at least one entry for multi-value columns
+ if (partitionInfo.isEmpty()) {
+ partitionInfo.add(""); // Empty string indicates no partition information
+ }
+
+ return partitionInfo;
+ }
+
+ /**
+ * Forward index reader for multi-value partition column.
+ * Returns all dictionary IDs (0, 1, 2, ..., n-1) for each document.
+ */
+ private static class MultiValueConstantForwardIndexReader implements
ForwardIndexReader<ForwardIndexReaderContext> {
+ private final int _numValues;
+ private final int[] _dictIds;
+
+ public MultiValueConstantForwardIndexReader(int numValues) {
+ _numValues = Math.max(1, numValues);
+ _dictIds = new int[_numValues];
+ for (int i = 0; i < _numValues; i++) {
+ _dictIds[i] = i;
+ }
+ }
+
+ @Override
+ public boolean isDictionaryEncoded() {
+ return true;
+ }
+
+ @Override
+ public boolean isSingleValue() {
+ return false;
+ }
+
+ @Override
+ public DataType getStoredType() {
+ return DataType.INT;
+ }
+
+ @Override
+ public int getDictIdMV(int docId, int[] dictIdBuffer,
ForwardIndexReaderContext context) {
+ for (int i = 0; i < _numValues; i++) {
+ dictIdBuffer[i] = i;
+ }
+ return _numValues;
+ }
+
+ @Override
+ public int[] getDictIdMV(int docId, ForwardIndexReaderContext context) {
+ return _dictIds;
+ }
+
+ @Override
+ public int getNumValuesMV(int docId, ForwardIndexReaderContext context) {
+ return _numValues;
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+
+ /**
+ * Minimal dictionary that extends BaseImmutableDictionary for virtual
columns.
+ * Follows the same pattern as StringDictionary but for in-memory virtual
column values.
+ */
+ private static class MultiValueConstantStringDictionary extends
BaseImmutableDictionary {
+ private final List<String> _values;
+ private final Object2IntOpenHashMap<String> _valueToIndexMap;
+
+ public MultiValueConstantStringDictionary(List<String> values) {
+ super(Math.max(1, values.size())); // Use virtual dictionary constructor
+ _values = new ArrayList<>(values);
+ if (_values.isEmpty()) {
+ _values.add(""); // Ensure at least one value
+ }
+
+ _valueToIndexMap = new Object2IntOpenHashMap<>(_values.size());
+ _valueToIndexMap.defaultReturnValue(-1);
+ for (int i = 0; i < _values.size(); i++) {
+ _valueToIndexMap.put(_values.get(i), i);
+ }
+ }
+
+ @Override
+ public DataType getValueType() {
+ return DataType.STRING;
+ }
+
+ @Override
+ public int insertionIndexOf(String stringValue) {
+ return _valueToIndexMap.getInt(stringValue);
+ }
+
+ @Override
+ public String get(int dictId) {
+ return _values.get(dictId);
+ }
+
+ @Override
+ public String getStringValue(int dictId) {
+ return _values.get(dictId);
+ }
+
+ @Override
+ public int getIntValue(int dictId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getLongValue(int dictId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public float getFloatValue(int dictId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public double getDoubleValue(int dictId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public java.math.BigDecimal getBigDecimalValue(int dictId) {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnContext.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnContext.java
index c2616d27966..c26a40e7085 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnContext.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnContext.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.segment.local.segment.virtualcolumn;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.data.FieldSpec;
@@ -28,10 +30,17 @@ import org.apache.pinot.spi.data.FieldSpec;
public class VirtualColumnContext {
private final FieldSpec _fieldSpec;
private final int _totalDocCount;
+ @Nullable
+ private final SegmentMetadata _segmentMetadata;
public VirtualColumnContext(FieldSpec fieldSpec, int totalDocCount) {
+ this(fieldSpec, totalDocCount, null);
+ }
+
+ public VirtualColumnContext(FieldSpec fieldSpec, int totalDocCount,
@Nullable SegmentMetadata segmentMetadata) {
_fieldSpec = fieldSpec;
_totalDocCount = totalDocCount;
+ _segmentMetadata = segmentMetadata;
}
public FieldSpec getFieldSpec() {
@@ -41,4 +50,9 @@ public class VirtualColumnContext {
public int getTotalDocCount() {
return _totalDocCount;
}
+
+ @Nullable
+ public SegmentMetadata getSegmentMetadata() {
+ return _segmentMetadata;
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProviderFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProviderFactory.java
index e1605a048c4..2d0344a43d3 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProviderFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProviderFactory.java
@@ -58,5 +58,10 @@ public class VirtualColumnProviderFactory {
schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.SEGMENTNAME,
FieldSpec.DataType.STRING, true,
DefaultNullValueVirtualColumnProvider.class, segmentName));
}
+
+ if (!schema.hasColumn(BuiltInVirtualColumn.PARTITIONID)) {
+ schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.PARTITIONID,
FieldSpec.DataType.STRING, false,
+ PartitionIdVirtualColumnProvider.class));
+ }
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
index cf33bcf092d..448bfd242b9 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
@@ -50,6 +50,7 @@ import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFactory;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
@@ -190,6 +191,13 @@ public class MutableSegmentImplRawMVTest implements
PinotBuffersAfterClassCheckR
for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
if (!fieldSpec.isSingleValueField()) {
String column = fieldSpec.getName();
+ // Skip $partitionId virtual column because this test is specifically
for "raw MV" columns
+ // (MV columns with NO dictionary). $partitionId always has a
dictionary
+ // (MultiValueConstantStringDictionary), so it doesn't fit the "raw
MV" test pattern
+ // where getDictionary() is expected to return null.
+ if (BuiltInVirtualColumn.PARTITIONID.equals(column)) {
+ continue;
+ }
DataSource actualDataSource =
_mutableSegmentImpl.getDataSource(column);
DataSource expectedDataSource =
_immutableSegment.getDataSource(column);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 19fcf02e8c0..9b5302cea6d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1751,7 +1751,8 @@ public class CommonConstants {
public static final String DOCID = "$docId";
public static final String HOSTNAME = "$hostName";
public static final String SEGMENTNAME = "$segmentName";
- public static final Set<String> BUILT_IN_VIRTUAL_COLUMNS = Set.of(DOCID,
HOSTNAME, SEGMENTNAME);
+ public static final String PARTITIONID = "$partitionId";
+ public static final Set<String> BUILT_IN_VIRTUAL_COLUMNS = Set.of(DOCID,
HOSTNAME, SEGMENTNAME, PARTITIONID);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]