This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 896406d226e Add $partitionId as a new virtual column (#16721)
896406d226e is described below

commit 896406d226eaa636c525023d5567ccc7f379af7a
Author: tarun11Mavani <[email protected]>
AuthorDate: Sat Sep 6 19:37:46 2025 +0530

    Add $partitionId as a new virtual column (#16721)
    
    * Add $partitionId as a new virtual column
    
    * Use partition info from segment metadata
    
    * Add ITs
    
    * fix tests
    
    * fix tests
    
    * fix test
    
    * address review comments
---
 .../pinot/common/config/provider/TableCache.java   |   3 +
 .../PinotQueryResourceStaticValidationTest.java    |   2 +-
 .../pinot/controller/helix/TableCacheTest.java     |   4 +-
 .../tests/BaseClusterIntegrationTestSet.java       |  12 +-
 .../tests/NullHandlingIntegrationTest.java         |   6 +-
 .../tests/OfflineClusterIntegrationTest.java       |  12 +-
 ...PartitionLLCRealtimeClusterIntegrationTest.java |  37 ++++
 .../immutable/ImmutableSegmentImpl.java            |   2 +-
 .../immutable/ImmutableSegmentLoader.java          |   3 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   |   6 +-
 .../PartitionIdVirtualColumnProvider.java          | 237 +++++++++++++++++++++
 .../virtualcolumn/VirtualColumnContext.java        |  14 ++
 .../VirtualColumnProviderFactory.java              |   5 +
 .../mutable/MutableSegmentImplRawMVTest.java       |   8 +
 .../apache/pinot/spi/utils/CommonConstants.java    |   3 +-
 15 files changed, 333 insertions(+), 21 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index 57fa9502e5a..840d398cf2d 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -155,6 +155,9 @@ public interface TableCache extends PinotConfigProvider {
     if (!schema.hasColumn(BuiltInVirtualColumn.SEGMENTNAME)) {
       schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.SEGMENTNAME, 
FieldSpec.DataType.STRING, true));
     }
+    if (!schema.hasColumn(BuiltInVirtualColumn.PARTITIONID)) {
+      schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.PARTITIONID, 
FieldSpec.DataType.STRING, false));
+    }
   }
 
   static Map<Expression, Expression> createExpressionOverrideMap(String 
physicalOrLogicalTableName,
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotQueryResourceStaticValidationTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotQueryResourceStaticValidationTest.java
index ab715485d54..6ba23171769 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotQueryResourceStaticValidationTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotQueryResourceStaticValidationTest.java
@@ -64,7 +64,7 @@ public class PinotQueryResourceStaticValidationTest {
     Assert.assertNotNull(provider.getTableConfig("testTable_OFFLINE"));
     Assert.assertNotNull(provider.getSchema("testTable"));
     Assert.assertNotNull(provider.getColumnNameMap("testTable"));
-    Assert.assertEquals(provider.getColumnNameMap("testTable").size(), 5); // 
2 columns + 3 built-in virtual columns
+    Assert.assertEquals(provider.getColumnNameMap("testTable").size(), 6); // 
2 columns + 4 built-in virtual columns
 
     
Assert.assertTrue(provider.getTableNameMap().containsKey("testTable_OFFLINE"));
     Assert.assertTrue(provider.getTableNameMap().containsKey("testTable"));
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index f35d97ccbec..6fcf472a69d 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -303,6 +303,7 @@ public class TableCacheTest {
     expectedColumnMap.put(isCaseInsensitive ? "$docid" : "$docId", "$docId");
     expectedColumnMap.put(isCaseInsensitive ? "$hostname" : "$hostName", 
"$hostName");
     expectedColumnMap.put(isCaseInsensitive ? "$segmentname" : "$segmentName", 
"$segmentName");
+    expectedColumnMap.put(isCaseInsensitive ? "$partitionid" : "$partitionId", 
"$partitionId");
     return expectedColumnMap;
   }
 
@@ -310,7 +311,8 @@ public class TableCacheTest {
     return new 
Schema.SchemaBuilder().setSchemaName(tableName).addSingleValueDimension("testColumn",
 DataType.INT)
         .addSingleValueDimension(BuiltInVirtualColumn.DOCID, DataType.INT)
         .addSingleValueDimension(BuiltInVirtualColumn.HOSTNAME, 
DataType.STRING)
-        .addSingleValueDimension(BuiltInVirtualColumn.SEGMENTNAME, 
DataType.STRING).build();
+        .addSingleValueDimension(BuiltInVirtualColumn.SEGMENTNAME, 
DataType.STRING)
+        .addMultiValueDimension(BuiltInVirtualColumn.PARTITIONID, 
DataType.STRING).build();
   }
 
   @DataProvider(name = "testTableCacheDataProvider")
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index c6b4a40720c..b9c9072b821 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -445,11 +445,13 @@ public abstract class BaseClusterIntegrationTestSet 
extends BaseClusterIntegrati
     }
 
     // Check that the virtual columns work as expected (throws no exceptions)
-    getPinotConnection().execute("select $docId, $segmentName, $hostName from 
mytable");
-    getPinotConnection().execute("select $docId, $segmentName, $hostName from 
mytable where $docId < 5 limit 50");
-    getPinotConnection().execute("select $docId, $segmentName, $hostName from 
mytable where $docId = 5 limit 50");
-    getPinotConnection().execute("select $docId, $segmentName, $hostName from 
mytable where $docId > 19998 limit 50");
-    getPinotConnection().execute("select max($docId) from mytable group by 
$segmentName");
+    getPinotConnection().execute("select $docId, $segmentName, $hostName, 
$partitionId from mytable");
+    getPinotConnection().execute(
+        "select $docId, $segmentName, $hostName, $partitionId from mytable 
where $docId < 5 limit 50");
+    getPinotConnection().execute(
+        "select $docId, $segmentName, $hostName, $partitionId from mytable 
where $docId = 5 limit 50");
+    getPinotConnection().execute(
+        "select $docId, $segmentName, $hostName, $partitionId from mytable 
where $docId > 19998 limit 50");
   }
 
   /**
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
index ae8506d0cdc..9e31c21292b 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
@@ -372,7 +372,7 @@ public class NullHandlingIntegrationTest extends 
BaseClusterIntegrationTestSet
     explainLogical(query,
         "Execution Plan\n"
             + "LogicalProject(EXPR$0=[1])\n"
-            + "  LogicalFilter(condition=[AND(IS NOT NULL($7), <>($7, 0))])\n"
+            + "  LogicalFilter(condition=[AND(IS NOT NULL($8), <>($8, 0))])\n"
             + "    PinotLogicalTableScan(table=[[default, mytable]])\n",
         
Map.of(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, 
"false"));
   }
@@ -390,7 +390,7 @@ public class NullHandlingIntegrationTest extends 
BaseClusterIntegrationTestSet
     explainLogical(query,
         "Execution Plan\n"
             + "LogicalProject(EXPR$0=[1])\n"
-            + "  LogicalFilter(condition=[<>($7, 0)])\n"
+            + "  LogicalFilter(condition=[<>($8, 0)])\n"
             + "    PinotLogicalTableScan(table=[[default, mytable]])\n",
         
Map.of(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, 
"true"));
   }
@@ -408,7 +408,7 @@ public class NullHandlingIntegrationTest extends 
BaseClusterIntegrationTestSet
     explainLogical(query,
         "Execution Plan\n"
             + "LogicalProject(EXPR$0=[1])\n"
-            + "  LogicalFilter(condition=[AND(IS NULL($7), <>($7, 0))])\n"
+            + "  LogicalFilter(condition=[AND(IS NULL($8), <>($8, 0))])\n"
             + "    PinotLogicalTableScan(table=[[default, mytable]])\n",
         
Map.of(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, 
"false"));
   }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 83ea2146436..65e22627761 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -3520,7 +3520,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
         + "    LogicalProject(count=[$1], name=[$0])\n"
         + "      PinotLogicalAggregate(group=[{0}], agg#0=[COUNT($1)], 
aggType=[FINAL])\n"
         + "        PinotLogicalExchange(distribution=[hash[0]])\n"
-        + "          PinotLogicalAggregate(group=[{17}], agg#0=[COUNT()], 
aggType=[LEAF])\n"
+        + "          PinotLogicalAggregate(group=[{18}], agg#0=[COUNT()], 
aggType=[LEAF])\n"
         + "            PinotLogicalTableScan(table=[[default, mytable]])\n");
     assertEquals(response1Json.get("rows").get(0).get(2).asText(), "Rule 
Execution Times\n"
         + "Rule: SortRemove -> Time:*\n"
@@ -3798,26 +3798,26 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
 
     JsonNode starColumnResponse =
         JsonUtils.stringToJsonNode(sendGetRequest(getControllerBaseApiUrl() + 
"/tables/mytable/metadata?columns=*"));
-    validateMetadataResponse(starColumnResponse, 82, 9);
+    validateMetadataResponse(starColumnResponse, 83, 10);
 
     JsonNode starEncodedColumnResponse =
         JsonUtils.stringToJsonNode(sendGetRequest(getControllerBaseApiUrl() + 
"/tables/mytable/metadata?columns=%2A"));
-    validateMetadataResponse(starEncodedColumnResponse, 82, 9);
+    validateMetadataResponse(starEncodedColumnResponse, 83, 10);
 
     JsonNode starWithExtraColumnResponse = 
JsonUtils.stringToJsonNode(sendGetRequest(
         getControllerBaseApiUrl() + "/tables/mytable/metadata?columns="
             + "CRSElapsedTime&columns=*&columns=OriginStateName"));
-    validateMetadataResponse(starWithExtraColumnResponse, 82, 9);
+    validateMetadataResponse(starWithExtraColumnResponse, 83, 10);
 
     JsonNode starWithExtraEncodedColumnResponse = 
JsonUtils.stringToJsonNode(sendGetRequest(
         getControllerBaseApiUrl() + "/tables/mytable/metadata?columns="
             + "CRSElapsedTime&columns=%2A&columns=OriginStateName"));
-    validateMetadataResponse(starWithExtraEncodedColumnResponse, 82, 9);
+    validateMetadataResponse(starWithExtraEncodedColumnResponse, 83, 10);
 
     JsonNode starWithExtraColumnWholeEncodedResponse = 
JsonUtils.stringToJsonNode(sendGetRequest(
         getControllerBaseApiUrl() + "/tables/mytable/metadata?columns="
             + "CRSElapsedTime%26columns%3D%2A%26columns%3DOriginStateName"));
-    validateMetadataResponse(starWithExtraColumnWholeEncodedResponse, 82, 9);
+    validateMetadataResponse(starWithExtraColumnWholeEncodedResponse, 83, 10);
   }
 
   private void validateMetadataResponse(JsonNode response, int numTotalColumn, 
int numMVColumn) {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
index 4b0c4e6c14a..d2ed66a7796 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.datatable.DataTable.MetadataKey;
@@ -208,6 +209,42 @@ public class 
SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
   }
 
   @Test(dependsOnMethods = "testPartitionRouting")
+  public void testPartitionIdVirtualColumn()
+      throws Exception {
+    // Query to get partition ID information for all segments
+    String query = "SELECT $partitionId, $segmentName FROM mytable GROUP BY 
$segmentName, $partitionId";
+    JsonNode response = postQuery(query);
+
+    JsonNode resultTable = response.get("resultTable");
+    JsonNode rows = resultTable.get("rows");
+
+    assertTrue(rows.size() > 0, "Should have at least one segment result");
+
+    // Define expected partition ID values
+    Set<String> expectedPartitions = new HashSet<>();
+    expectedPartitions.add("DestState_0");
+    expectedPartitions.add("DestState_1");
+
+    // Validate that $partitionId virtual column returns expected results
+    for (int i = 0; i < rows.size(); i++) {
+      JsonNode row = rows.get(i);
+      String partitionIdResult = row.get(0).asText();
+      String segmentName = row.get(1).asText();
+
+      assertNotNull(partitionIdResult);
+      assertNotNull(segmentName);
+      assertTrue(LLCSegmentName.isLLCSegment(segmentName));
+
+      if (partitionIdResult.isBlank()) {
+        continue;
+      }
+      // Validate that partitionIdResult is one of the expected partition IDs
+      assertTrue(expectedPartitions.contains(partitionIdResult),
+          "Expected one of " + expectedPartitions + " but found: " + 
partitionIdResult);
+    }
+  }
+
+  @Test(dependsOnMethods = "testPartitionIdVirtualColumn")
   public void testNonPartitionedStream()
       throws Exception {
     // Push the second Avro file into Kafka without partitioning
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 6732d99ced2..1e39eee3fd6 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -259,7 +259,7 @@ public class ImmutableSegmentImpl implements 
ImmutableSegment {
     Preconditions.checkState(fieldSpec != null, "Failed to find column: %s in 
schema: %s", column,
         schema.getSchemaName());
     return IndexSegmentUtils.createVirtualDataSource(
-        new VirtualColumnContext(fieldSpec, _segmentMetadata.getTotalDocs()));
+        new VirtualColumnContext(fieldSpec, _segmentMetadata.getTotalDocs(), 
_segmentMetadata));
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index 2172e5f014e..e32610ba61e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -215,7 +215,8 @@ public class ImmutableSegmentLoader {
     for (FieldSpec fieldSpec : segmentSchema.getAllFieldSpecs()) {
       if (fieldSpec.isVirtualColumn()) {
         String columnName = fieldSpec.getName();
-        VirtualColumnContext context = new VirtualColumnContext(fieldSpec, 
segmentMetadata.getTotalDocs());
+        VirtualColumnContext context =
+            new VirtualColumnContext(fieldSpec, 
segmentMetadata.getTotalDocs(), segmentMetadata);
         VirtualColumnProvider provider = 
VirtualColumnProviderFactory.buildProvider(context);
         indexContainerMap.put(columnName, 
provider.buildColumnIndexContainer(context));
         columnMetadataMap.put(columnName, provider.buildMetadata(context));
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 2991efb8d1d..c13a17ec430 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -1159,7 +1159,8 @@ public class MutableSegmentImpl implements MutableSegment 
{
     FieldSpec fieldSpec = _schema.getFieldSpecFor(column);
     if (fieldSpec != null && fieldSpec.isVirtualColumn()) {
       // Virtual column
-      VirtualColumnContext virtualColumnContext = new 
VirtualColumnContext(fieldSpec, _numDocsIndexed);
+      VirtualColumnContext virtualColumnContext =
+          new VirtualColumnContext(fieldSpec, _numDocsIndexed, 
_segmentMetadata);
       return 
VirtualColumnProviderFactory.buildProvider(virtualColumnContext).buildDataSource(virtualColumnContext);
     }
     return null;
@@ -1174,7 +1175,8 @@ public class MutableSegmentImpl implements MutableSegment 
{
     FieldSpec fieldSpec = schema.getFieldSpecFor(column);
     Preconditions.checkState(fieldSpec != null, "Failed to find column: %s in 
schema: %s", column,
         schema.getSchemaName());
-    return IndexSegmentUtils.createVirtualDataSource(new 
VirtualColumnContext(fieldSpec, _numDocsIndexed));
+    return IndexSegmentUtils.createVirtualDataSource(
+        new VirtualColumnContext(fieldSpec, _numDocsIndexed, 
_segmentMetadata));
   }
 
   @Nullable
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/PartitionIdVirtualColumnProvider.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/PartitionIdVirtualColumnProvider.java
new file mode 100644
index 00000000000..f44b91a2704
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/PartitionIdVirtualColumnProvider.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.virtualcolumn;
+
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import 
org.apache.pinot.segment.local.segment.index.readers.BaseImmutableDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.constant.ConstantMVInvertedIndexReader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Virtual column provider that returns the partition information of the 
segment.
+ * Returns a multi-value string column with entries in the format 
"columnName_partitionId"
+ * for all partitioned columns in the segment. Returns a single empty-string 
value for segments without partition information.
+ */
+public class PartitionIdVirtualColumnProvider implements VirtualColumnProvider 
{
+
+  @Override
+  public ForwardIndexReader<?> buildForwardIndex(VirtualColumnContext context) 
{
+    List<String> partitionInfo = getPartitionInfo(context);
+    return new MultiValueConstantForwardIndexReader(partitionInfo.size());
+  }
+
+  @Override
+  public Dictionary buildDictionary(VirtualColumnContext context) {
+    List<String> partitionInfo = getPartitionInfo(context);
+    return new MultiValueConstantStringDictionary(partitionInfo);
+  }
+
+  @Override
+  public InvertedIndexReader<?> buildInvertedIndex(VirtualColumnContext 
context) {
+    return new ConstantMVInvertedIndexReader(context.getTotalDocCount());
+  }
+
+  @Override
+  public ColumnMetadataImpl buildMetadata(VirtualColumnContext context) {
+    FieldSpec fieldSpec = context.getFieldSpec();
+    List<String> partitionInfo = getPartitionInfo(context);
+    int cardinality = partitionInfo.size();
+
+    ColumnMetadataImpl.Builder builder = new ColumnMetadataImpl.Builder()
+        .setFieldSpec(fieldSpec)
+        .setTotalDocs(context.getTotalDocCount())
+        .setCardinality(cardinality)
+        .setHasDictionary(true)
+        .setMaxNumberOfMultiValues(cardinality);
+
+    if (cardinality > 0) {
+      builder.setMinValue(partitionInfo.get(0))
+          .setMaxValue(partitionInfo.get(cardinality - 1));
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Extract partition information from segment metadata.
+   * Returns a list of strings in format "columnName_partitionId" for all 
partitioned columns.
+   */
+  private List<String> getPartitionInfo(VirtualColumnContext context) {
+    List<String> partitionInfo = new ArrayList<>();
+    SegmentMetadata segmentMetadata = context.getSegmentMetadata();
+
+    if (segmentMetadata != null && segmentMetadata.getColumnMetadataMap() != 
null) {
+      // Get partition info from all partitioned columns in the segment 
metadata
+      Map<String, ColumnMetadata> columnMetadataMap = 
segmentMetadata.getColumnMetadataMap();
+      for (Map.Entry<String, ColumnMetadata> entry : 
columnMetadataMap.entrySet()) {
+        String columnName = entry.getKey();
+        ColumnMetadata columnMetadata = entry.getValue();
+        Set<Integer> partitions = columnMetadata.getPartitions();
+        if (partitions != null && !partitions.isEmpty()) {
+          // Add all partition IDs for this column
+          for (Integer partitionId : partitions) {
+            partitionInfo.add(columnName + "_" + partitionId);
+          }
+        }
+      }
+    }
+
+    // Ensure we always have at least one entry for multi-value columns
+    if (partitionInfo.isEmpty()) {
+      partitionInfo.add(""); // Empty string indicates no partition information
+    }
+
+    return partitionInfo;
+  }
+
+  /**
+   * Forward index reader for multi-value partition column.
+   * Returns all dictionary IDs (0, 1, 2, ..., n-1) for each document.
+   */
+  private static class MultiValueConstantForwardIndexReader implements 
ForwardIndexReader<ForwardIndexReaderContext> {
+    private final int _numValues;
+    private final int[] _dictIds;
+
+    public MultiValueConstantForwardIndexReader(int numValues) {
+      _numValues = Math.max(1, numValues);
+      _dictIds = new int[_numValues];
+      for (int i = 0; i < _numValues; i++) {
+        _dictIds[i] = i;
+      }
+    }
+
+    @Override
+    public boolean isDictionaryEncoded() {
+      return true;
+    }
+
+    @Override
+    public boolean isSingleValue() {
+      return false;
+    }
+
+    @Override
+    public DataType getStoredType() {
+      return DataType.INT;
+    }
+
+    @Override
+    public int getDictIdMV(int docId, int[] dictIdBuffer, 
ForwardIndexReaderContext context) {
+      for (int i = 0; i < _numValues; i++) {
+        dictIdBuffer[i] = i;
+      }
+      return _numValues;
+    }
+
+    @Override
+    public int[] getDictIdMV(int docId, ForwardIndexReaderContext context) {
+      return _dictIds;
+    }
+
+    @Override
+    public int getNumValuesMV(int docId, ForwardIndexReaderContext context) {
+      return _numValues;
+    }
+
+    @Override
+    public void close() {
+    }
+  }
+
+  /**
+   * Minimal dictionary that extends BaseImmutableDictionary for virtual 
columns.
+   * Follows the same pattern as StringDictionary but for in-memory virtual 
column values.
+   */
+  private static class MultiValueConstantStringDictionary extends 
BaseImmutableDictionary {
+    private final List<String> _values;
+    private final Object2IntOpenHashMap<String> _valueToIndexMap;
+
+    public MultiValueConstantStringDictionary(List<String> values) {
+      super(Math.max(1, values.size())); // Use virtual dictionary constructor
+      _values = new ArrayList<>(values);
+      if (_values.isEmpty()) {
+        _values.add(""); // Ensure at least one value
+      }
+
+      _valueToIndexMap = new Object2IntOpenHashMap<>(_values.size());
+      _valueToIndexMap.defaultReturnValue(-1);
+      for (int i = 0; i < _values.size(); i++) {
+        _valueToIndexMap.put(_values.get(i), i);
+      }
+    }
+
+    @Override
+    public DataType getValueType() {
+      return DataType.STRING;
+    }
+
+    @Override
+    public int insertionIndexOf(String stringValue) {
+      return _valueToIndexMap.getInt(stringValue);
+    }
+
+    @Override
+    public String get(int dictId) {
+      return _values.get(dictId);
+    }
+
+    @Override
+    public String getStringValue(int dictId) {
+      return _values.get(dictId);
+    }
+
+    @Override
+    public int getIntValue(int dictId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getLongValue(int dictId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public float getFloatValue(int dictId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public double getDoubleValue(int dictId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public java.math.BigDecimal getBigDecimalValue(int dictId) {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnContext.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnContext.java
index c2616d27966..c26a40e7085 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnContext.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnContext.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.segment.local.segment.virtualcolumn;
 
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.data.FieldSpec;
 
 
@@ -28,10 +30,17 @@ import org.apache.pinot.spi.data.FieldSpec;
 public class VirtualColumnContext {
   private final FieldSpec _fieldSpec;
   private final int _totalDocCount;
+  @Nullable
+  private final SegmentMetadata _segmentMetadata;
 
   public VirtualColumnContext(FieldSpec fieldSpec, int totalDocCount) {
+    this(fieldSpec, totalDocCount, null);
+  }
+
+  public VirtualColumnContext(FieldSpec fieldSpec, int totalDocCount, 
@Nullable SegmentMetadata segmentMetadata) {
     _fieldSpec = fieldSpec;
     _totalDocCount = totalDocCount;
+    _segmentMetadata = segmentMetadata;
   }
 
   public FieldSpec getFieldSpec() {
@@ -41,4 +50,9 @@ public class VirtualColumnContext {
   public int getTotalDocCount() {
     return _totalDocCount;
   }
+
+  @Nullable
+  public SegmentMetadata getSegmentMetadata() {
+    return _segmentMetadata;
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProviderFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProviderFactory.java
index e1605a048c4..2d0344a43d3 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProviderFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProviderFactory.java
@@ -58,5 +58,10 @@ public class VirtualColumnProviderFactory {
       schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.SEGMENTNAME, 
FieldSpec.DataType.STRING, true,
           DefaultNullValueVirtualColumnProvider.class, segmentName));
     }
+
+    if (!schema.hasColumn(BuiltInVirtualColumn.PARTITIONID)) {
+      schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.PARTITIONID, 
FieldSpec.DataType.STRING, false,
+          PartitionIdVirtualColumnProvider.class));
+    }
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
index cf33bcf092d..448bfd242b9 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
@@ -50,6 +50,7 @@ import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFactory;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
@@ -190,6 +191,13 @@ public class MutableSegmentImplRawMVTest implements 
PinotBuffersAfterClassCheckR
     for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
       if (!fieldSpec.isSingleValueField()) {
         String column = fieldSpec.getName();
+        // Skip $partitionId virtual column because this test is specifically 
for "raw MV" columns
+        // (MV columns with NO dictionary). $partitionId always has a 
dictionary
+        // (MultiValueConstantStringDictionary), so it doesn't fit the "raw 
MV" test pattern
+        // where getDictionary() is expected to return null.
+        if (BuiltInVirtualColumn.PARTITIONID.equals(column)) {
+          continue;
+        }
         DataSource actualDataSource = 
_mutableSegmentImpl.getDataSource(column);
         DataSource expectedDataSource = 
_immutableSegment.getDataSource(column);
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 19fcf02e8c0..9b5302cea6d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1751,7 +1751,8 @@ public class CommonConstants {
       public static final String DOCID = "$docId";
       public static final String HOSTNAME = "$hostName";
       public static final String SEGMENTNAME = "$segmentName";
-      public static final Set<String> BUILT_IN_VIRTUAL_COLUMNS = Set.of(DOCID, 
HOSTNAME, SEGMENTNAME);
+      public static final String PARTITIONID = "$partitionId";
+      public static final Set<String> BUILT_IN_VIRTUAL_COLUMNS = Set.of(DOCID, 
HOSTNAME, SEGMENTNAME, PARTITIONID);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to