anuragrai16 commented on code in PR #18643:
URL: https://github.com/apache/pinot/pull/18643#discussion_r3496578535


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java:
##########
@@ -98,13 +104,44 @@ public ImmutableSegmentImpl(
     _dataSources =
         new 
Object2ObjectOpenHashMap<>(segmentMetadata.getColumnMetadataMap().size());
 
+    Map<String, Map<String, DataSource>> openStructDenseChildren = new 
HashMap<>();
+    Map<String, DataSource> openStructSparseChildren = new HashMap<>();
+    Set<String> openStructParents = new HashSet<>();
+
     for (Map.Entry<String, ColumnMetadata> entry : 
segmentMetadata.getColumnMetadataMap().entrySet()) {
       String colName = entry.getKey();
       ColumnMetadata columnMetadata = entry.getValue();
+
+      if (columnMetadata instanceof ColumnMetadataImpl && 
((ColumnMetadataImpl) columnMetadata).isMaterializedChild()) {
+        String parent = ((ColumnMetadataImpl) 
columnMetadata).getParentColumn();
+        openStructParents.add(parent);
+        DataSource childDs = new ImmutableDataSource(columnMetadata, 
_indexContainerMap.get(colName));
+        if (OpenStructNaming.isSparseColumn(colName)) {
+          openStructSparseChildren.put(parent, childDs);
+        } else {
+          openStructDenseChildren.computeIfAbsent(parent, k -> new HashMap<>())
+              .put(OpenStructNaming.parseKey(colName), childDs);
+        }
+        continue;
+      }
+
       if (columnMetadata.getFieldSpec().getDataType() == 
FieldSpec.DataType.MAP) {
-        _dataSources.put(colName, new ImmutableMapDataSource(entry.getValue(), 
_indexContainerMap.get(colName)));
+        _dataSources.put(colName, new ImmutableMapDataSource(columnMetadata, 
_indexContainerMap.get(colName)));
       } else {
-        _dataSources.put(colName, new ImmutableDataSource(entry.getValue(), 
_indexContainerMap.get(colName)));
+        _dataSources.put(colName, new ImmutableDataSource(columnMetadata, 
_indexContainerMap.get(colName)));
+      }
+    }
+
+    if (!openStructParents.isEmpty()) {
+      Schema schema = segmentMetadata.getSchema();
+      for (String parent : openStructParents) {
+        FieldSpec fieldSpec = schema != null ? schema.getFieldSpecFor(parent) 
: null;
+        if (!(fieldSpec instanceof ComplexFieldSpec)) {

Review Comment:
   Can you explain this in the comments ?  If the
     schema is null or the fieldSpec is the wrong type, seems like the parent 
column is silently skipped and no DataSource is created and no error  logged. 
The child columns were already skipped earlier. This means OPEN_STRUCT data 
exists on disk but is completely invisible to queries ?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructIndex.java:
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.openstruct;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.segment.index.map.SimpleColumnMetadata;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.mutable.MutableIndex;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.index.reader.OpenStructIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.config.table.OpenStructIndexConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.OpenStructTypeInference;
+import org.apache.pinot.spi.utils.PinotDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Manages per-key mutable columns for an OPEN_STRUCT column during real-time 
consumption.
+/// Each discovered key gets its own {@link MutableKeyColumn} 
(dictionary-encoded forward index +
+/// presence bitmap). Dense/sparse classification is deferred to seal time.
+///
+/// Single-writer for [#index]: the consuming thread calls this method. 
Readers may
+/// concurrently read [#getKeys()] and [#getKeyColumns()] via the volatile map 
swap.
+@SuppressWarnings("rawtypes")
+public class MutableOpenStructIndex implements 
OpenStructIndexReader<ForwardIndexReaderContext>, MutableIndex {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MutableOpenStructIndex.class);
+
+  private final String _openStructColumn;
+  private final OpenStructIndexConfig _config;
+  private final Map<String, FieldSpec> _childFieldSpecs;
+  private final PinotDataBufferMemoryManager _memoryManager;
+  private final int _capacity;
+
+  // Volatile for lock-free reader access; writer always holds the 
consuming-thread lock.
+  private volatile Map<String, MutableKeyColumn> _keyColumns = new HashMap<>();

Review Comment:
   Should this be a concurrent hash map instead ? How do we enforce the single 
thread write - dont see any safety locks in this path. Its documented, but not 
enforced. 
   



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java:
##########
@@ -415,6 +421,43 @@ private <K, V> Map<K, V> unmodifiable(Map<K, V> map) {
     return map == null ? null : Collections.unmodifiableMap(map);
   }
 
+  public void addOpenStructChildConfigs(SegmentMetadataImpl segmentMetadata) {
+    if (_indexConfigsByColName == null || _dirty) {
+      refreshIndexConfigs();
+    }
+    for (Map.Entry<String, ColumnMetadata> entry : 
segmentMetadata.getColumnMetadataMap().entrySet()) {
+      String childColumn = entry.getKey();
+      if (!childColumn.contains(OpenStructNaming.SEPARATOR) || 
_indexConfigsByColName.containsKey(childColumn)) {
+        continue;
+      }
+      if (OpenStructNaming.isSparseColumn(childColumn)) {
+        continue;
+      }
+      String parentColumn = OpenStructNaming.parseParentColumn(childColumn);
+      FieldIndexConfigs parentConfigs = 
_indexConfigsByColName.get(parentColumn);
+      if (parentConfigs == null) {
+        continue;
+      }
+      IndexConfig osConfig = 
parentConfigs.getConfig(StandardIndexes.openStruct());
+      if (!(osConfig instanceof OpenStructIndexConfig)) {
+        continue;
+      }
+      OpenStructIndexConfig openStructConfig = (OpenStructIndexConfig) 
osConfig;
+      String key = OpenStructNaming.parseKey(childColumn);
+      FieldConfig keyFieldConfig = openStructConfig.getValueFieldConfig(key);
+      if (keyFieldConfig == null) {
+        keyFieldConfig = openStructConfig.getDefaultValueFieldConfig();
+      }
+      FieldSpec childFieldSpec = entry.getValue().getFieldSpec();
+      boolean enableInverted = 
openStructConfig.shouldEnableInvertedIndexForKey(key);
+      FieldIndexConfigs childConfigs = new FieldIndexConfigs.Builder(
+          FieldIndexConfigsUtil.fromFieldConfig(keyFieldConfig, 
childFieldSpec))
+          .add(StandardIndexes.inverted(), enableInverted ? 
IndexConfig.ENABLED : IndexConfig.DISABLED)
+          .build();
+      _indexConfigsByColName.put(childColumn, childConfigs);

Review Comment:
   Should we directly update _indexConfigsByColName after 
refreshIndexConfigs(). A subsequent refreshIndexConfigs() call would rebuild 
the map and wipe
     these entries ? We should store overrides in a separate overlay map ?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/ImmutableOpenStructDataSource.java:
##########
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.openstruct;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.segment.index.datasource.BaseDataSource;
+import 
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
+import org.apache.pinot.segment.spi.Constants;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.datasource.OpenStructDataSource;
+import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+
+/// Per-key {@link DataSource} accessor for sealed (immutable) segments with 
an OPEN_STRUCT column.
+///
+/// Always columnar — there is no blob branch. Every key that was dense enough 
during segment
+/// creation gets its own materialized {@link DataSource} (forward index + 
optional inverted index /
+/// dictionary). Keys that did not meet the density threshold are stored in an 
optional sparse
+/// column; the sparse {@link DataSource} is returned for any unmaterialized 
key lookup.
+///
+/// Use [#isMaterialized(String)] and [#isFullyMaterialized()] together to 
choose
+/// the query execution path:
+/// - Materialized key → fast path via per-key DataSource (inverted/dictionary 
index available).
+/// - Not materialized + not fully materialized → fall back to the sparse 
DataSource.
+/// - Not materialized + fully materialized → key is definitively absent; 
short-circuit.
+///
+/// Thread-safety: immutable after construction; safe for concurrent reads.
+public class ImmutableOpenStructDataSource extends BaseDataSource implements 
OpenStructDataSource {
+  private final ComplexFieldSpec _fieldSpec;
+  private final Map<String, DataSource> _perKeyDataSources;
+  @Nullable
+  private final DataSource _sparseDataSource;
+
+  public ImmutableOpenStructDataSource(ComplexFieldSpec fieldSpec, Map<String, 
DataSource> perKeyDataSources,
+      @Nullable DataSource sparseDataSource, DataSourceMetadata 
dataSourceMetadata,
+      ColumnIndexContainer indexContainer) {
+    super(dataSourceMetadata, indexContainer);
+    _fieldSpec = fieldSpec;
+    _perKeyDataSources = perKeyDataSources;
+    _sparseDataSource = sparseDataSource;
+  }
+
+  /// Convenience constructor for segment-load time. Synthesizes a minimal 
{@link DataSourceMetadata}
+  /// for the parent OPEN_STRUCT column (which has no on-disk presence of its 
own) and uses an empty
+  /// {@link ColumnIndexContainer} — all real readers live on the per-key data 
sources.
+  public ImmutableOpenStructDataSource(ComplexFieldSpec fieldSpec, Map<String, 
DataSource> perKeyDataSources,
+      @Nullable DataSource sparseDataSource, int numDocs) {
+    this(fieldSpec, perKeyDataSources, sparseDataSource,
+        new ImmutableOpenStructDataSourceMetadata(fieldSpec, numDocs),
+        new ColumnIndexContainer.FromMap.Builder().build());
+  }
+
+  @Override
+  public ComplexFieldSpec getFieldSpec() {
+    return _fieldSpec;
+  }
+
+  @Override
+  @Nullable
+  public DataSource getDataSource(String key) {
+    DataSource ds = _perKeyDataSources.get(key);
+    return ds != null ? ds : _sparseDataSource;
+  }
+
+  @Override
+  public boolean isMaterialized(String key) {
+    return _perKeyDataSources.containsKey(key);
+  }
+
+  @Override
+  public boolean isFullyMaterialized() {
+    return _sparseDataSource == null;
+  }
+
+  @Override
+  public Map<String, DataSource> getDataSources() {
+    return _perKeyDataSources;
+  }
+
+  @Override
+  @Nullable
+  public DataSourceMetadata getDataSourceMetadata(String key) {
+    DataSource ds = _perKeyDataSources.get(key);
+    return ds != null ? ds.getDataSourceMetadata() : null;
+  }
+

Review Comment:
   Dont see `getMapValue(int docId)` implementation here,  but the 
`MutableOpenStructDataSource` has it. What's the implication ?
   



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructDataSource.java:
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.openstruct;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.segment.index.datasource.BaseDataSource;
+import 
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.Constants;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.datasource.OpenStructDataSource;
+import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/// Per-key {@link DataSource} accessor for mutable (consuming) segments with 
an OPEN_STRUCT column.
+///
+/// Always columnar — no blob branch. Per-key DataSources are synthesized on 
demand from the
+/// underlying {@link MutableOpenStructIndex}; mutable mode always holds every 
observed key, so
+/// {@link #isFullyMaterialized()} is unconditionally {@code true}.
+public class MutableOpenStructDataSource extends BaseDataSource implements 
OpenStructDataSource {
+  private final ComplexFieldSpec _fieldSpec;
+  private final MutableOpenStructIndex _index;
+  private final int _numDocs;
+
+  public MutableOpenStructDataSource(ComplexFieldSpec fieldSpec, 
MutableOpenStructIndex index, int numDocs) {
+    super(new MutableOpenStructDataSourceMetadata(fieldSpec, numDocs),
+        new ColumnIndexContainer.FromMap.Builder().build());
+    _fieldSpec = fieldSpec;
+    _index = index;
+    _numDocs = numDocs;
+  }
+
+  @Override
+  public ComplexFieldSpec getFieldSpec() {
+    return _fieldSpec;
+  }
+
+  @Override
+  @Nullable
+  public DataSource getDataSource(String key) {
+    Map<IndexType, IndexReader> indexes = _index.getIndexes(key);
+    if (indexes == null || indexes.isEmpty()) {
+      return null;
+    }
+    ColumnMetadata metadata = _index.getColumnMetadata(key);
+    return new ImmutableDataSource(metadata,
+        new ColumnIndexContainer.FromMap.Builder().withAll(indexes).build());
+  }
+
+  @Override
+  public boolean isMaterialized(String key) {
+    return _index.getKeyColumn(key) != null;
+  }
+
+  /// Mutable mode always holds every observed key in-memory; the sparse tier 
exists only after seal.
+  @Override
+  public boolean isFullyMaterialized() {
+    return true;
+  }
+
+  @Override
+  public Map<String, DataSource> getDataSources() {
+    Map<String, DataSource> result = new HashMap<>();
+    for (String key : _index.getKeys()) {

Review Comment:
   This calls the MutableOpenStructIndex.getKeys() in a loop while the 
underlying is a volatile concurrent map. Lets take a snapshot of the map and 
then run the loop ? 
   
   ```
   Map<String, MutableKeyColumn> snapshot = _keyColumns;
     for (String key : snapshot.keySet()) { ... }
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructIndex.java:
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.openstruct;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.segment.index.map.SimpleColumnMetadata;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.mutable.MutableIndex;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.index.reader.OpenStructIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.config.table.OpenStructIndexConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.OpenStructTypeInference;
+import org.apache.pinot.spi.utils.PinotDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Manages per-key mutable columns for an OPEN_STRUCT column during real-time 
consumption.
+/// Each discovered key gets its own {@link MutableKeyColumn} 
(dictionary-encoded forward index +
+/// presence bitmap). Dense/sparse classification is deferred to seal time.
+///
+/// Single-writer for [#index]: the consuming thread calls this method. 
Readers may
+/// concurrently read [#getKeys()] and [#getKeyColumns()] via the volatile map 
swap.
+@SuppressWarnings("rawtypes")
+public class MutableOpenStructIndex implements 
OpenStructIndexReader<ForwardIndexReaderContext>, MutableIndex {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MutableOpenStructIndex.class);
+
+  private final String _openStructColumn;
+  private final OpenStructIndexConfig _config;
+  private final Map<String, FieldSpec> _childFieldSpecs;
+  private final PinotDataBufferMemoryManager _memoryManager;
+  private final int _capacity;
+
+  // Volatile for lock-free reader access; writer always holds the 
consuming-thread lock.
+  private volatile Map<String, MutableKeyColumn> _keyColumns = new HashMap<>();
+
+  public MutableOpenStructIndex(String openStructColumn, ComplexFieldSpec 
fieldSpec,
+      OpenStructIndexConfig config, PinotDataBufferMemoryManager 
memoryManager, int capacity) {
+    _openStructColumn = openStructColumn;
+    _config = config;
+    _memoryManager = memoryManager;
+    _capacity = capacity;
+
+    Map<String, FieldSpec> childFieldSpecs = fieldSpec.getChildFieldSpecs();
+    _childFieldSpecs = childFieldSpecs != null ? new 
HashMap<>(childFieldSpecs) : new HashMap<>();
+  }
+
+  @Override
+  public void add(Object value, int dictId, int docId) {
+    index(docId, value);
+  }
+
+  @Override
+  public void add(Object[] values, @Nullable int[] dictIds, int docId) {
+    throw new UnsupportedOperationException("OPEN_STRUCT does not support 
multi-value indexing");
+  }
+
+  /// Indexes the OPEN_STRUCT value for the given document. `value` must be a
+  /// `Map<String, Object>` or `null`; null and non-Map values are silently 
skipped.
+  @SuppressWarnings("unchecked")
+  public void index(int docId, @Nullable Object value) {
+    if (!(value instanceof Map)) {
+      return;
+    }
+    Map<String, Object> map = (Map<String, Object>) value;
+    for (Map.Entry<String, Object> entry : map.entrySet()) {
+      String key = entry.getKey();
+      Object rawValue = entry.getValue();
+      if (rawValue == null) {
+        continue;
+      }
+
+      MutableKeyColumn keyCol = _keyColumns.get(key);
+      if (keyCol == null) {
+        // Mutable mode holds every observed key (see 
MutableOpenStructDataSource#isFullyMaterialized);
+        // dense/sparse classification (maxDenseKeys / denseKeys) is applied 
at seal time by the segment
+        // build, so no key is dropped during consumption.
+        // Resolve stored type and coerce BEFORE allocating a column so a 
first-row coercion failure
+        // does not allocate a column that was never usable.
+        DataType resolvedType = resolveStoredType(key, rawValue);
+        if (resolvedType == null) {
+          continue;
+        }
+        Object coerced = tryCoerce(key, rawValue, resolvedType);
+        if (coerced == null) {
+          continue;
+        }
+        keyCol = allocateKeyColumn(key, resolvedType);
+        keyCol.setValue(docId, coerced);
+        continue;
+      }
+
+      DataType storedType = keyCol.getStoredType();
+      Object coerced = tryCoerce(key, rawValue, storedType);
+      if (coerced == null) {
+        continue;
+      }
+      keyCol.setValue(docId, coerced);
+    }
+  }
+
+  /// Resolves the stored type for a key without allocating any state. Returns 
null when the type
+  /// cannot be inferred (caller should skip the entry).
+  @Nullable
+  private DataType resolveStoredType(String key, Object rawValue) {
+    FieldSpec spec = _childFieldSpecs.get(key);
+    DataType valueType;
+    if (spec != null) {
+      valueType = spec.getDataType();
+    } else {
+      valueType = OpenStructTypeInference.inferDataType(rawValue);
+      if (valueType == null) {
+        LOGGER.warn("OPEN_STRUCT '{}': could not infer DataType for key '{}' 
from value of class '{}'."
+                + " Dropping the entry.",
+            _openStructColumn, key, rawValue.getClass().getName());
+        return null;
+      }
+    }
+    return valueType.getStoredType();
+  }
+
+  /// Coerces rawValue to storedType. Returns null on failure (logged at 
WARN); the caller drops
+  /// the entry. Note: a successful coerce of a "null"-shaped raw value would 
also return null —
+  /// but callers gate on rawValue != null before reaching here.
+  @Nullable
+  private Object tryCoerce(String key, Object rawValue, DataType storedType) {
+    try {
+      PinotDataType sourceType = PinotDataType.getSingleValueType(rawValue);
+      PinotDataType destType = 
ColumnDataType.fromDataTypeSV(storedType).toPinotDataType();

Review Comment:
   `ColumnDataType.fromDataTypeSV(storedType).toPinotDataType()` - This must be 
re-resolving per doc per key? The stored type is fixed per key so can we cache 
the destination PinotDataType at first-encounter time ?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/openstruct/OpenStructColumnSplitter.java:
##########
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.openstruct;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.segment.creator.impl.BaseSegmentCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.AbstractColumnStatisticsCollector;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.StatsCollectorUtil;
+import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
+import 
org.apache.pinot.segment.local.segment.index.openstruct.OpenStructSupportedIndexes;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.IndexService;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import 
org.apache.pinot.segment.spi.index.creator.ColumnarOpenStructIndexCreator;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.OpenStructIndexConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.OpenStructNaming;
+import org.apache.pinot.spi.data.OpenStructTypeInference;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.PinotDataType;
+import org.roaringbitmap.RoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Splits an OPEN_STRUCT column into per-key materialized columns using 
standard Pinot index
+/// creators. Dense keys become independent virtual columns; remaining keys go 
into a single
+/// synthetic JSON column for sparse storage.
+///
+/// Lifecycle: instantiated by `BaseSegmentCreator` for OPEN_STRUCT columns. 
Receives
+/// per-doc `Map<String, Object>` values via [#add(Map, int)], accumulates in 
memory,
+/// then on [#seal()] writes per-key column files using standard creators.
+public class OpenStructColumnSplitter implements 
ColumnarOpenStructIndexCreator {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(OpenStructColumnSplitter.class);
+
+  private final File _indexDir;
+  private final String _columnName;
+  private final Map<String, FieldSpec> _childFieldSpecs;
+  private final OpenStructIndexConfig _config;
+  private final int _maxDenseKeys;
+
+  // Per-key accumulation
+  private final Map<String, RoaringBitmap> _presenceBitmaps = new HashMap<>();
+  private final Map<String, List<Object>> _values = new HashMap<>();
+  private final Map<String, DataType> _inferredTypes = new HashMap<>();
+  private int _numDocs;
+
+  // Resolved at seal time
+  @Nullable
+  private Set<String> _resolvedDenseKeys;
+  private final Map<String, PropertiesConfiguration> 
_materializedColumnMetadata = new LinkedHashMap<>();
+
+  public OpenStructColumnSplitter(File indexDir, String columnName, FieldSpec 
fieldSpec,
+      OpenStructIndexConfig config) {
+    _indexDir = indexDir;
+    _columnName = columnName;
+    _config = config;
+    _maxDenseKeys = config.getMaxDenseKeys();
+
+    Map<String, FieldSpec> childFieldSpecs = null;
+    if (fieldSpec instanceof ComplexFieldSpec) {
+      ComplexFieldSpec complexSpec = (ComplexFieldSpec) fieldSpec;
+      childFieldSpecs = complexSpec.getChildFieldSpecs();
+    }
+    _childFieldSpecs = childFieldSpecs != null ? new 
HashMap<>(childFieldSpecs) : new HashMap<>();
+  }
+
+  @Override
+  public void add(Object value, int docId)
+      throws IOException {
+    if (value instanceof Map) {
+      @SuppressWarnings("unchecked")
+      Map<String, Object> map = (Map<String, Object>) value;
+      addMap(map);
+    } else {
+      addMap(null);
+    }
+  }
+
+  @Override
+  public void add(Map<String, Object> openStructValue, int docId)
+      throws IOException {
+    addMap(openStructValue);
+  }
+
+  @Override
+  public void add(Object[] values, @Nullable int[] dictIds)
+      throws IOException {
+    throw new UnsupportedOperationException("OPEN_STRUCT index is single-value 
only");
+  }
+
+  /// Returns the resolved dense-key set after [#seal()] or [#classify()].
+  /// Returns an empty set before resolution.
+  public Set<String> getResolvedDenseKeys() {
+    return _resolvedDenseKeys != null ? 
Collections.unmodifiableSet(_resolvedDenseKeys) : Set.of();
+  }
+
+  /// Resolves dense vs sparse keys without writing any files. Exposed for 
testing and for callers
+  /// that need the classification independent of file output. [#seal()] calls 
this internally.
+  public Set<String> classify() {
+    if (_resolvedDenseKeys != null) {
+      return _resolvedDenseKeys;
+    }
+    if (_numDocs == 0 || _presenceBitmaps.isEmpty()) {
+      _resolvedDenseKeys = new LinkedHashSet<>();
+      return _resolvedDenseKeys;
+    }
+    List<String> allKeys = new ArrayList<>(_presenceBitmaps.keySet());
+    allKeys.sort((a, b) -> {
+      double fillA = (double) _presenceBitmaps.get(a).getCardinality() / 
_numDocs;
+      double fillB = (double) _presenceBitmaps.get(b).getCardinality() / 
_numDocs;
+      int cmp = Double.compare(fillB, fillA);
+      return cmp != 0 ? cmp : a.compareTo(b);
+    });
+
+    double minFillRate = _config.getDenseKeyMinFillRate();
+    _resolvedDenseKeys = new LinkedHashSet<>();
+
+    Set<String> configuredDenseKeys = _config.getDenseKeys();
+    for (String key : configuredDenseKeys) {
+      if (_presenceBitmaps.containsKey(key) && (_maxDenseKeys < 0 || 
_resolvedDenseKeys.size() < _maxDenseKeys)) {
+        _resolvedDenseKeys.add(key);
+      }
+    }
+
+    for (String key : allKeys) {
+      if (_resolvedDenseKeys.contains(key)) {
+        continue;
+      }
+      double fillRate = (double) _presenceBitmaps.get(key).getCardinality() / 
_numDocs;
+      if ((_maxDenseKeys < 0 || _resolvedDenseKeys.size() < _maxDenseKeys) && 
fillRate >= minFillRate) {
+        _resolvedDenseKeys.add(key);
+      }
+    }
+    return _resolvedDenseKeys;
+  }
+
+  private void addMap(@Nullable Map<String, Object> map) {
+    if (map != null && !map.isEmpty()) {
+      for (Map.Entry<String, Object> entry : map.entrySet()) {
+        String key = entry.getKey();
+        Object rawValue = entry.getValue();
+        if (rawValue == null) {
+          continue;
+        }
+        FieldSpec keySpec = _childFieldSpecs.get(key);
+        DataType valueType = keySpec != null
+            ? keySpec.getDataType()
+            : _inferredTypes.computeIfAbsent(key, k -> {
+              DataType inferred = 
OpenStructTypeInference.inferDataType(rawValue);
+              return inferred != null ? inferred : DataType.STRING;
+            });
+        if (!_presenceBitmaps.containsKey(key)) {
+          _presenceBitmaps.put(key, new RoaringBitmap());
+          _values.put(key, new ArrayList<>());
+        }
+        _presenceBitmaps.get(key).add(_numDocs);
+        Object coerced;
+        try {
+          PinotDataType sourceType = 
PinotDataType.getSingleValueType(rawValue);
+          PinotDataType destType = 
ColumnDataType.fromDataTypeSV(valueType.getStoredType()).toPinotDataType();
+          coerced = destType.convert(rawValue, sourceType);
+        } catch (Exception e) {
+          LOGGER.warn("OPEN_STRUCT '{}': coercion failed for key '{}' value 
'{}' to {}. Skipping.",
+              _columnName, key, rawValue, valueType, e);
+          _presenceBitmaps.get(key).remove(_numDocs);
+          continue;
+        }
+        _values.get(key).add(coerced);
+      }
+    }
+    _numDocs++;
+  }
+
+  @Override
+  public void seal()
+      throws IOException {
+    classify();
+    if (_resolvedDenseKeys == null || (_numDocs == 0 && 
_presenceBitmaps.isEmpty())) {
+      return;
+    }
+
+    for (String key : _resolvedDenseKeys) {
+      writeDenseKeyColumn(key);
+    }
+
+    List<String> sparseKeys = new ArrayList<>();
+    for (String key : _presenceBitmaps.keySet()) {
+      if (!_resolvedDenseKeys.contains(key)) {
+        sparseKeys.add(key);
+      }
+    }
+    if (!sparseKeys.isEmpty()) {
+      writeSparseJsonColumn(sparseKeys);
+    }
+
+    emitParentColumnMetadata(!sparseKeys.isEmpty());
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    // Nothing to close — sub-creators are created and closed within seal()
+  }
+
+  @Override
+  public Map<String, PropertiesConfiguration> getMaterializedColumnMetadata() {
+    return _materializedColumnMetadata;
+  }
+
+  private void writeDenseKeyColumn(String key)
+      throws IOException {
+    String materializedCol = 
OpenStructNaming.materializedColumnName(_columnName, key);
+    FieldSpec keySpec = _childFieldSpecs.get(key);
+    DataType valueType = keySpec != null
+        ? keySpec.getDataType()
+        : _inferredTypes.getOrDefault(key, DataType.STRING);
+    DataType storedType = valueType.getStoredType();
+    RoaringBitmap presence = _presenceBitmaps.get(key);
+    List<Object> values = _values.get(key);
+
+    // Synthetic field spec for the materialized child. Its natural Pinot 
dimension null value is the value
+    // stored for absent docs, so column metadata stays consistent with 
on-disk content.
+    DimensionFieldSpec childFieldSpec = new 
DimensionFieldSpec(materializedCol, storedType, true);
+    Object defaultValue = childFieldSpec.getDefaultNullValue();
+
+    // Collect statistics the standard way: present docs contribute their 
value, absent docs the default
+    // (absent docs are also marked in the null vector below).
+    AbstractColumnStatisticsCollector statsCollector =
+        StatsCollectorUtil.createStatsCollector(childFieldSpec, null);
+    int statsOrdinal = 0;
+    for (int docId = 0; docId < _numDocs; docId++) {
+      statsCollector.collect(presence.contains(docId) ? 
values.get(statsOrdinal++) : defaultValue);
+    }
+    statsCollector.seal();
+
+    // Build per-key index configuration from the key's FieldConfig (falling 
back to the default), then apply
+    // the OPEN_STRUCT inverted-on default. No TableConfig/Schema required.
+    FieldConfig keyFieldConfig = _config.getValueFieldConfig(key);
+    if (keyFieldConfig == null) {
+      keyFieldConfig = _config.getDefaultValueFieldConfig();
+    }
+    boolean enableInverted = _config.shouldEnableInvertedIndexForKey(key);
+    FieldIndexConfigs configsForDecision = new FieldIndexConfigs.Builder(
+        FieldIndexConfigsUtil.fromFieldConfig(keyFieldConfig, childFieldSpec))
+        .add(StandardIndexes.inverted(), enableInverted ? IndexConfig.ENABLED 
: IndexConfig.DISABLED)
+        .build();
+
+    boolean useDictionary = resolveUseDictionary(childFieldSpec, 
configsForDecision, statsCollector);
+
+    // Reconcile dictionary + forward encoding with the final decision 
(mirrors BaseSegmentCreator.adaptConfig);
+    // ForwardIndexCreatorFactory selects dict-vs-raw from the forward 
config's EncodingType. A compression codec
+    // applies only to the raw forward format (LZ4 preserves the dense child's 
current on-disk layout); attaching
+    // one to a dictionary-encoded forward is rejected by 
ForwardIndexType.validate.
+    ForwardIndexConfig.Builder forwardBuilder = new ForwardIndexConfig.Builder(
+        useDictionary ? FieldConfig.EncodingType.DICTIONARY : 
FieldConfig.EncodingType.RAW);
+    if (!useDictionary) {
+      forwardBuilder.withCompressionCodec(FieldConfig.CompressionCodec.LZ4);
+    }
+    FieldIndexConfigs fieldIndexConfigs = new 
FieldIndexConfigs.Builder(configsForDecision)
+        .add(StandardIndexes.dictionary(),
+            useDictionary ? DictionaryIndexConfig.DEFAULT : 
DictionaryIndexConfig.DISABLED)
+        .add(StandardIndexes.forward(), forwardBuilder.build())
+        .build();
+
+    int dictElementSize = writeColumnIndexes(materializedCol, storedType, 
presence, values,
+        defaultValue, statsCollector, useDictionary, fieldIndexConfigs, 
childFieldSpec);
+
+    NullValueVectorCreator nullCreator = new NullValueVectorCreator(_indexDir, 
materializedCol);
+    try {
+      for (int docId = 0; docId < _numDocs; docId++) {
+        if (!presence.contains(docId)) {
+          nullCreator.setNull(docId);
+        }
+      }
+      nullCreator.seal();
+    } finally {
+      nullCreator.close();
+    }
+
+    PropertiesConfiguration props = new PropertiesConfiguration();
+    FieldConfig.EncodingType encoding =
+        useDictionary ? FieldConfig.EncodingType.DICTIONARY : 
FieldConfig.EncodingType.RAW;
+    BaseSegmentCreator.addColumnMetadataInfo(props, materializedCol, 
statsCollector, _numDocs, childFieldSpec,
+        useDictionary, dictElementSize, encoding, false);
+    // OPEN_STRUCT-specific keys not written by addColumnMetadataInfo.
+    props.setProperty(
+        V1Constants.MetadataKeys.Column.getKeyFor(materializedCol, 
V1Constants.MetadataKeys.Column.PARENT_COLUMN),
+        _columnName);
+    
props.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(materializedCol, 
"hasNullValue"), true);
+    if (enableInverted && useDictionary) {
+      
props.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(materializedCol, 
"hasInvertedIndex"), true);
+    }
+    _materializedColumnMetadata.put(materializedCol, props);
+  }
+
+  /// Decides dictionary vs raw encoding for a materialized child column, 
mirroring the three steps of
+  /// `BaseSegmentCreator.createDictionaryForColumn` with standard default 
flags (optimizeDictionary
+  /// off => dictionary unless explicitly disabled and not required by an 
enabled index).
+  private boolean resolveUseDictionary(FieldSpec childFieldSpec, 
FieldIndexConfigs fieldIndexConfigs,
+      AbstractColumnStatisticsCollector statsCollector) {
+    if (DictionaryIndexConfig.requiresDictionary(childFieldSpec, 
fieldIndexConfigs)) {
+      return true;
+    }
+    if 
(fieldIndexConfigs.getConfig(StandardIndexes.dictionary()).isDisabled()) {
+      return false;
+    }
+    return DictionaryIndexType.ignoreDictionaryOverride(false, false,
+        IndexingConfig.DEFAULT_NO_DICTIONARY_SIZE_RATIO_THRESHOLD, null, 
childFieldSpec, fieldIndexConfigs,
+        statsCollector.getCardinality(), 
statsCollector.getTotalNumberOfEntries());
+  }
+
+  /// Writes the dictionary (when used) plus all vetted, enabled indexes for a 
materialized child column through
+  /// the standard index-creator family, driven from a single per-doc loop. 
Returns the dictionary element size in
+  /// bytes (0 when raw-encoded), for column metadata. The dictionary is built 
separately because its build
+  /// lifecycle is CUSTOM and it supplies the dictIds the per-row creators 
consume.
+  private int writeColumnIndexes(String materializedCol, DataType storedType, 
RoaringBitmap presence,
+      List<Object> values, Object defaultValue, 
AbstractColumnStatisticsCollector statsCollector,
+      boolean useDictionary, FieldIndexConfigs fieldIndexConfigs, FieldSpec 
childFieldSpec)
+      throws IOException {
+    int dictElementSize = 0;
+    SegmentDictionaryCreator dictCreator = null;
+    try {
+      if (useDictionary) {
+        dictCreator = new SegmentDictionaryCreator(materializedCol, storedType,
+            new File(_indexDir, materializedCol + 
V1Constants.Dict.FILE_EXTENSION), true);
+        dictCreator.build(statsCollector.getUniqueValuesSet());
+      }
+
+      // Index-creation context built from the sealed collector (a 
ColumnShape) — no TableConfig required.
+      IndexCreationContext context =
+          new IndexCreationContext.Builder(_indexDir, null, statsCollector, 
useDictionary, false)
+              .withOnHeap(false).build();
+
+      List<IndexCreator> creators = new ArrayList<>();
+      try {
+        for (IndexType<?, ?, ?> indexType : 
IndexService.getInstance().getAllIndexes()) {
+          if (indexType.getIndexBuildLifecycle() != 
IndexType.BuildLifecycle.DURING_SEGMENT_CREATION) {
+            continue;   // excludes dictionary (lifecycle CUSTOM), built 
separately above
+          }
+          if 
(!OpenStructSupportedIndexes.ALLOWED_PRETTY_NAMES.contains(indexType.getPrettyName()))
 {
+            continue;   // non-vetted indexes already rejected at table-config 
validation; defensive backstop
+          }
+          IndexCreator creator = createColumnIndexCreator(indexType, context, 
fieldIndexConfigs, materializedCol,
+              childFieldSpec);
+          if (creator != null) {
+            creators.add(creator);
+          }
+        }
+
+        int ordinal = 0;
+        for (int docId = 0; docId < _numDocs; docId++) {
+          Object value = presence.contains(docId) ? values.get(ordinal++) : 
defaultValue;
+          int dictId = useDictionary ? dictCreator.indexOfSV(value) : -1;
+          for (IndexCreator creator : creators) {
+            creator.add(value, dictId);
+          }
+        }
+        for (IndexCreator creator : creators) {
+          creator.seal();
+        }
+      } finally {
+        for (IndexCreator creator : creators) {
+          creator.close();
+        }
+        if (dictCreator != null) {
+          dictElementSize = dictCreator.getNumBytesPerEntry();
+          dictCreator.seal();
+        }
+      }
+    } finally {
+      if (dictCreator != null) {
+        dictCreator.close();
+      }
+    }
+    return dictElementSize;
+  }
+
+  @Nullable
+  private static <C extends IndexConfig> IndexCreator 
createColumnIndexCreator(IndexType<C, ?, ?> indexType,
+      IndexCreationContext context, FieldIndexConfigs fieldIndexConfigs, 
String materializedCol,
+      FieldSpec childFieldSpec)
+      throws IOException {
+    // Materialized child columns exist in no schema/TableConfig, so the 
standard table-config-time validation
+    // never sees them. Run the index type's own guards here against the 
resolved child FieldSpec (e.g. range
+    // rejects a non-numeric column without a dictionary) so misconfigurations 
fail with the canonical message
+    // instead of crashing opaquely inside the creator. validate() internally 
no-ops when the index is disabled.
+    indexType.validate(fieldIndexConfigs, childFieldSpec, null);
+    C config = fieldIndexConfigs.getConfig(indexType);
+    if (!config.isEnabled() || !indexType.shouldCreateIndex(context, config)) {
+      return null;
+    }
+    try {
+      return indexType.createIndexCreator(context, config);
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Failed to create " + indexType.getPrettyName() + 
" creator for: " + materializedCol, e);
+    }
+  }
+
+  private void writeSparseJsonColumn(List<String> sparseKeys)
+      throws IOException {
+    String sparseCol = OpenStructNaming.sparseColumnName(_columnName);
+    int maxLen = 1;
+    String[] jsonPerDoc = new String[_numDocs];
+    int nonNullCount = 0;
+    for (int docId = 0; docId < _numDocs; docId++) {
+      Map<String, Object> sparseEntries = new LinkedHashMap<>();
+      for (String key : sparseKeys) {
+        RoaringBitmap presence = _presenceBitmaps.get(key);
+        if (presence != null && presence.contains(docId)) {
+          int ordinal = presence.rank(docId) - 1;
+          sparseEntries.put(key, _values.get(key).get(ordinal));
+        }
+      }
+      if (!sparseEntries.isEmpty()) {
+        try {
+          String json = JsonUtils.objectToString(sparseEntries);
+          jsonPerDoc[docId] = json;
+          maxLen = Math.max(maxLen, 
json.getBytes(StandardCharsets.UTF_8).length);

Review Comment:
   This is not needed per doc ? `json.getBytes(UTF_8).length` allocates a fresh 
byte[] solely to measure UTF-8 byte length, then discards it, waste of memory 
just to be cleaned up by GC ?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/openstruct/OpenStructColumnSplitter.java:
##########
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.openstruct;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.segment.creator.impl.BaseSegmentCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.AbstractColumnStatisticsCollector;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.StatsCollectorUtil;
+import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
+import 
org.apache.pinot.segment.local.segment.index.openstruct.OpenStructSupportedIndexes;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.IndexService;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import 
org.apache.pinot.segment.spi.index.creator.ColumnarOpenStructIndexCreator;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.OpenStructIndexConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.OpenStructNaming;
+import org.apache.pinot.spi.data.OpenStructTypeInference;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.PinotDataType;
+import org.roaringbitmap.RoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Splits an OPEN_STRUCT column into per-key materialized columns using 
standard Pinot index
+/// creators. Dense keys become independent virtual columns; remaining keys go 
into a single
+/// synthetic JSON column for sparse storage.
+///
+/// Lifecycle: instantiated by `BaseSegmentCreator` for OPEN_STRUCT columns. 
Receives
+/// per-doc `Map<String, Object>` values via [#add(Map, int)], accumulates in 
memory,
+/// then on [#seal()] writes per-key column files using standard creators.
+public class OpenStructColumnSplitter implements 
ColumnarOpenStructIndexCreator {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(OpenStructColumnSplitter.class);
+
+  private final File _indexDir;
+  private final String _columnName;
+  private final Map<String, FieldSpec> _childFieldSpecs;
+  private final OpenStructIndexConfig _config;
+  private final int _maxDenseKeys;
+
+  // Per-key accumulation
+  private final Map<String, RoaringBitmap> _presenceBitmaps = new HashMap<>();
+  private final Map<String, List<Object>> _values = new HashMap<>();
+  private final Map<String, DataType> _inferredTypes = new HashMap<>();
+  private int _numDocs;
+
+  // Resolved at seal time
+  @Nullable
+  private Set<String> _resolvedDenseKeys;
+  private final Map<String, PropertiesConfiguration> 
_materializedColumnMetadata = new LinkedHashMap<>();
+
+  public OpenStructColumnSplitter(File indexDir, String columnName, FieldSpec 
fieldSpec,
+      OpenStructIndexConfig config) {
+    _indexDir = indexDir;
+    _columnName = columnName;
+    _config = config;
+    _maxDenseKeys = config.getMaxDenseKeys();
+
+    Map<String, FieldSpec> childFieldSpecs = null;
+    if (fieldSpec instanceof ComplexFieldSpec) {
+      ComplexFieldSpec complexSpec = (ComplexFieldSpec) fieldSpec;
+      childFieldSpecs = complexSpec.getChildFieldSpecs();
+    }
+    _childFieldSpecs = childFieldSpecs != null ? new 
HashMap<>(childFieldSpecs) : new HashMap<>();
+  }
+
+  @Override
+  public void add(Object value, int docId)
+      throws IOException {
+    if (value instanceof Map) {
+      @SuppressWarnings("unchecked")
+      Map<String, Object> map = (Map<String, Object>) value;
+      addMap(map);
+    } else {
+      addMap(null);
+    }
+  }
+
+  @Override
+  public void add(Map<String, Object> openStructValue, int docId)
+      throws IOException {
+    addMap(openStructValue);
+  }
+
+  @Override
+  public void add(Object[] values, @Nullable int[] dictIds)
+      throws IOException {
+    throw new UnsupportedOperationException("OPEN_STRUCT index is single-value 
only");
+  }
+
+  /// Returns the resolved dense-key set after [#seal()] or [#classify()].
+  /// Returns an empty set before resolution.
+  public Set<String> getResolvedDenseKeys() {
+    return _resolvedDenseKeys != null ? 
Collections.unmodifiableSet(_resolvedDenseKeys) : Set.of();
+  }
+
+  /// Resolves dense vs sparse keys without writing any files. Exposed for 
testing and for callers
+  /// that need the classification independent of file output. [#seal()] calls 
this internally.
+  public Set<String> classify() {
+    if (_resolvedDenseKeys != null) {
+      return _resolvedDenseKeys;
+    }
+    if (_numDocs == 0 || _presenceBitmaps.isEmpty()) {
+      _resolvedDenseKeys = new LinkedHashSet<>();
+      return _resolvedDenseKeys;
+    }
+    List<String> allKeys = new ArrayList<>(_presenceBitmaps.keySet());
+    allKeys.sort((a, b) -> {
+      double fillA = (double) _presenceBitmaps.get(a).getCardinality() / 
_numDocs;
+      double fillB = (double) _presenceBitmaps.get(b).getCardinality() / 
_numDocs;
+      int cmp = Double.compare(fillB, fillA);
+      return cmp != 0 ? cmp : a.compareTo(b);
+    });
+
+    double minFillRate = _config.getDenseKeyMinFillRate();
+    _resolvedDenseKeys = new LinkedHashSet<>();
+
+    Set<String> configuredDenseKeys = _config.getDenseKeys();
+    for (String key : configuredDenseKeys) {
+      if (_presenceBitmaps.containsKey(key) && (_maxDenseKeys < 0 || 
_resolvedDenseKeys.size() < _maxDenseKeys)) {
+        _resolvedDenseKeys.add(key);
+      }
+    }
+
+    for (String key : allKeys) {
+      if (_resolvedDenseKeys.contains(key)) {
+        continue;
+      }
+      double fillRate = (double) _presenceBitmaps.get(key).getCardinality() / 
_numDocs;
+      if ((_maxDenseKeys < 0 || _resolvedDenseKeys.size() < _maxDenseKeys) && 
fillRate >= minFillRate) {
+        _resolvedDenseKeys.add(key);
+      }
+    }
+    return _resolvedDenseKeys;
+  }
+
+  private void addMap(@Nullable Map<String, Object> map) {
+    if (map != null && !map.isEmpty()) {
+      for (Map.Entry<String, Object> entry : map.entrySet()) {
+        String key = entry.getKey();
+        Object rawValue = entry.getValue();
+        if (rawValue == null) {
+          continue;
+        }
+        FieldSpec keySpec = _childFieldSpecs.get(key);
+        DataType valueType = keySpec != null
+            ? keySpec.getDataType()
+            : _inferredTypes.computeIfAbsent(key, k -> {
+              DataType inferred = 
OpenStructTypeInference.inferDataType(rawValue);
+              return inferred != null ? inferred : DataType.STRING;
+            });
+        if (!_presenceBitmaps.containsKey(key)) {
+          _presenceBitmaps.put(key, new RoaringBitmap());
+          _values.put(key, new ArrayList<>());
+        }
+        _presenceBitmaps.get(key).add(_numDocs);
+        Object coerced;
+        try {
+          PinotDataType sourceType = 
PinotDataType.getSingleValueType(rawValue);
+          PinotDataType destType = 
ColumnDataType.fromDataTypeSV(valueType.getStoredType()).toPinotDataType();
+          coerced = destType.convert(rawValue, sourceType);
+        } catch (Exception e) {
+          LOGGER.warn("OPEN_STRUCT '{}': coercion failed for key '{}' value 
'{}' to {}. Skipping.",
+              _columnName, key, rawValue, valueType, e);
+          _presenceBitmaps.get(key).remove(_numDocs);
+          continue;
+        }
+        _values.get(key).add(coerced);
+      }
+    }
+    _numDocs++;
+  }
+
+  @Override
+  public void seal()
+      throws IOException {
+    classify();
+    if (_resolvedDenseKeys == null || (_numDocs == 0 && 
_presenceBitmaps.isEmpty())) {
+      return;
+    }
+
+    for (String key : _resolvedDenseKeys) {
+      writeDenseKeyColumn(key);
+    }
+
+    List<String> sparseKeys = new ArrayList<>();
+    for (String key : _presenceBitmaps.keySet()) {
+      if (!_resolvedDenseKeys.contains(key)) {
+        sparseKeys.add(key);
+      }
+    }
+    if (!sparseKeys.isEmpty()) {
+      writeSparseJsonColumn(sparseKeys);
+    }
+
+    emitParentColumnMetadata(!sparseKeys.isEmpty());
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    // Nothing to close — sub-creators are created and closed within seal()
+  }
+
+  @Override
+  public Map<String, PropertiesConfiguration> getMaterializedColumnMetadata() {
+    return _materializedColumnMetadata;
+  }
+
+  private void writeDenseKeyColumn(String key)
+      throws IOException {
+    String materializedCol = 
OpenStructNaming.materializedColumnName(_columnName, key);
+    FieldSpec keySpec = _childFieldSpecs.get(key);
+    DataType valueType = keySpec != null
+        ? keySpec.getDataType()
+        : _inferredTypes.getOrDefault(key, DataType.STRING);
+    DataType storedType = valueType.getStoredType();
+    RoaringBitmap presence = _presenceBitmaps.get(key);
+    List<Object> values = _values.get(key);
+
+    // Synthetic field spec for the materialized child. Its natural Pinot 
dimension null value is the value
+    // stored for absent docs, so column metadata stays consistent with 
on-disk content.
+    DimensionFieldSpec childFieldSpec = new 
DimensionFieldSpec(materializedCol, storedType, true);
+    Object defaultValue = childFieldSpec.getDefaultNullValue();
+
+    // Collect statistics the standard way: present docs contribute their 
value, absent docs the default
+    // (absent docs are also marked in the null vector below).
+    AbstractColumnStatisticsCollector statsCollector =
+        StatsCollectorUtil.createStatsCollector(childFieldSpec, null);
+    int statsOrdinal = 0;
+    for (int docId = 0; docId < _numDocs; docId++) {
+      statsCollector.collect(presence.contains(docId) ? 
values.get(statsOrdinal++) : defaultValue);
+    }
+    statsCollector.seal();
+
+    // Build per-key index configuration from the key's FieldConfig (falling 
back to the default), then apply
+    // the OPEN_STRUCT inverted-on default. No TableConfig/Schema required.
+    FieldConfig keyFieldConfig = _config.getValueFieldConfig(key);
+    if (keyFieldConfig == null) {
+      keyFieldConfig = _config.getDefaultValueFieldConfig();
+    }
+    boolean enableInverted = _config.shouldEnableInvertedIndexForKey(key);
+    FieldIndexConfigs configsForDecision = new FieldIndexConfigs.Builder(
+        FieldIndexConfigsUtil.fromFieldConfig(keyFieldConfig, childFieldSpec))
+        .add(StandardIndexes.inverted(), enableInverted ? IndexConfig.ENABLED 
: IndexConfig.DISABLED)
+        .build();
+
+    boolean useDictionary = resolveUseDictionary(childFieldSpec, 
configsForDecision, statsCollector);
+
+    // Reconcile dictionary + forward encoding with the final decision 
(mirrors BaseSegmentCreator.adaptConfig);
+    // ForwardIndexCreatorFactory selects dict-vs-raw from the forward 
config's EncodingType. A compression codec
+    // applies only to the raw forward format (LZ4 preserves the dense child's 
current on-disk layout); attaching
+    // one to a dictionary-encoded forward is rejected by 
ForwardIndexType.validate.
+    ForwardIndexConfig.Builder forwardBuilder = new ForwardIndexConfig.Builder(
+        useDictionary ? FieldConfig.EncodingType.DICTIONARY : 
FieldConfig.EncodingType.RAW);
+    if (!useDictionary) {
+      forwardBuilder.withCompressionCodec(FieldConfig.CompressionCodec.LZ4);
+    }
+    FieldIndexConfigs fieldIndexConfigs = new 
FieldIndexConfigs.Builder(configsForDecision)
+        .add(StandardIndexes.dictionary(),
+            useDictionary ? DictionaryIndexConfig.DEFAULT : 
DictionaryIndexConfig.DISABLED)
+        .add(StandardIndexes.forward(), forwardBuilder.build())
+        .build();
+
+    int dictElementSize = writeColumnIndexes(materializedCol, storedType, 
presence, values,
+        defaultValue, statsCollector, useDictionary, fieldIndexConfigs, 
childFieldSpec);
+
+    NullValueVectorCreator nullCreator = new NullValueVectorCreator(_indexDir, 
materializedCol);
+    try {
+      for (int docId = 0; docId < _numDocs; docId++) {
+        if (!presence.contains(docId)) {
+          nullCreator.setNull(docId);
+        }
+      }
+      nullCreator.seal();
+    } finally {
+      nullCreator.close();
+    }
+
+    PropertiesConfiguration props = new PropertiesConfiguration();
+    FieldConfig.EncodingType encoding =
+        useDictionary ? FieldConfig.EncodingType.DICTIONARY : 
FieldConfig.EncodingType.RAW;
+    BaseSegmentCreator.addColumnMetadataInfo(props, materializedCol, 
statsCollector, _numDocs, childFieldSpec,
+        useDictionary, dictElementSize, encoding, false);
+    // OPEN_STRUCT-specific keys not written by addColumnMetadataInfo.
+    props.setProperty(
+        V1Constants.MetadataKeys.Column.getKeyFor(materializedCol, 
V1Constants.MetadataKeys.Column.PARENT_COLUMN),
+        _columnName);
+    
props.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(materializedCol, 
"hasNullValue"), true);
+    if (enableInverted && useDictionary) {
+      
props.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(materializedCol, 
"hasInvertedIndex"), true);
+    }
+    _materializedColumnMetadata.put(materializedCol, props);
+  }
+
+  /// Decides dictionary vs raw encoding for a materialized child column, 
mirroring the three steps of
+  /// `BaseSegmentCreator.createDictionaryForColumn` with standard default 
flags (optimizeDictionary
+  /// off => dictionary unless explicitly disabled and not required by an 
enabled index).
+  private boolean resolveUseDictionary(FieldSpec childFieldSpec, 
FieldIndexConfigs fieldIndexConfigs,
+      AbstractColumnStatisticsCollector statsCollector) {
+    if (DictionaryIndexConfig.requiresDictionary(childFieldSpec, 
fieldIndexConfigs)) {
+      return true;
+    }
+    if 
(fieldIndexConfigs.getConfig(StandardIndexes.dictionary()).isDisabled()) {
+      return false;
+    }
+    return DictionaryIndexType.ignoreDictionaryOverride(false, false,
+        IndexingConfig.DEFAULT_NO_DICTIONARY_SIZE_RATIO_THRESHOLD, null, 
childFieldSpec, fieldIndexConfigs,
+        statsCollector.getCardinality(), 
statsCollector.getTotalNumberOfEntries());
+  }
+
+  /// Writes the dictionary (when used) plus all vetted, enabled indexes for a 
materialized child column through
+  /// the standard index-creator family, driven from a single per-doc loop. 
Returns the dictionary element size in
+  /// bytes (0 when raw-encoded), for column metadata. The dictionary is built 
separately because its build
+  /// lifecycle is CUSTOM and it supplies the dictIds the per-row creators 
consume.
+  private int writeColumnIndexes(String materializedCol, DataType storedType, 
RoaringBitmap presence,
+      List<Object> values, Object defaultValue, 
AbstractColumnStatisticsCollector statsCollector,
+      boolean useDictionary, FieldIndexConfigs fieldIndexConfigs, FieldSpec 
childFieldSpec)
+      throws IOException {
+    int dictElementSize = 0;
+    SegmentDictionaryCreator dictCreator = null;
+    try {
+      if (useDictionary) {
+        dictCreator = new SegmentDictionaryCreator(materializedCol, storedType,
+            new File(_indexDir, materializedCol + 
V1Constants.Dict.FILE_EXTENSION), true);
+        dictCreator.build(statsCollector.getUniqueValuesSet());
+      }
+
+      // Index-creation context built from the sealed collector (a 
ColumnShape) — no TableConfig required.
+      IndexCreationContext context =
+          new IndexCreationContext.Builder(_indexDir, null, statsCollector, 
useDictionary, false)
+              .withOnHeap(false).build();
+
+      List<IndexCreator> creators = new ArrayList<>();
+      try {
+        for (IndexType<?, ?, ?> indexType : 
IndexService.getInstance().getAllIndexes()) {
+          if (indexType.getIndexBuildLifecycle() != 
IndexType.BuildLifecycle.DURING_SEGMENT_CREATION) {
+            continue;   // excludes dictionary (lifecycle CUSTOM), built 
separately above
+          }
+          if 
(!OpenStructSupportedIndexes.ALLOWED_PRETTY_NAMES.contains(indexType.getPrettyName()))
 {
+            continue;   // non-vetted indexes already rejected at table-config 
validation; defensive backstop
+          }
+          IndexCreator creator = createColumnIndexCreator(indexType, context, 
fieldIndexConfigs, materializedCol,
+              childFieldSpec);
+          if (creator != null) {
+            creators.add(creator);
+          }
+        }
+
+        int ordinal = 0;
+        for (int docId = 0; docId < _numDocs; docId++) {
+          Object value = presence.contains(docId) ? values.get(ordinal++) : 
defaultValue;
+          int dictId = useDictionary ? dictCreator.indexOfSV(value) : -1;
+          for (IndexCreator creator : creators) {
+            creator.add(value, dictId);
+          }
+        }
+        for (IndexCreator creator : creators) {
+          creator.seal();
+        }
+      } finally {
+        for (IndexCreator creator : creators) {
+          creator.close();
+        }
+        if (dictCreator != null) {
+          dictElementSize = dictCreator.getNumBytesPerEntry();
+          dictCreator.seal();
+        }
+      }
+    } finally {
+      if (dictCreator != null) {
+        dictCreator.close();
+      }
+    }
+    return dictElementSize;
+  }
+
+  @Nullable
+  private static <C extends IndexConfig> IndexCreator 
createColumnIndexCreator(IndexType<C, ?, ?> indexType,
+      IndexCreationContext context, FieldIndexConfigs fieldIndexConfigs, 
String materializedCol,
+      FieldSpec childFieldSpec)
+      throws IOException {
+    // Materialized child columns exist in no schema/TableConfig, so the 
standard table-config-time validation
+    // never sees them. Run the index type's own guards here against the 
resolved child FieldSpec (e.g. range
+    // rejects a non-numeric column without a dictionary) so misconfigurations 
fail with the canonical message
+    // instead of crashing opaquely inside the creator. validate() internally 
no-ops when the index is disabled.
+    indexType.validate(fieldIndexConfigs, childFieldSpec, null);
+    C config = fieldIndexConfigs.getConfig(indexType);
+    if (!config.isEnabled() || !indexType.shouldCreateIndex(context, config)) {
+      return null;
+    }
+    try {
+      return indexType.createIndexCreator(context, config);
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Failed to create " + indexType.getPrettyName() + 
" creator for: " + materializedCol, e);
+    }
+  }
+
+  private void writeSparseJsonColumn(List<String> sparseKeys)
+      throws IOException {
+    String sparseCol = OpenStructNaming.sparseColumnName(_columnName);
+    int maxLen = 1;
+    String[] jsonPerDoc = new String[_numDocs];
+    int nonNullCount = 0;
+    for (int docId = 0; docId < _numDocs; docId++) {
+      Map<String, Object> sparseEntries = new LinkedHashMap<>();
+      for (String key : sparseKeys) {
+        RoaringBitmap presence = _presenceBitmaps.get(key);
+        if (presence != null && presence.contains(docId)) {
+          int ordinal = presence.rank(docId) - 1;
+          sparseEntries.put(key, _values.get(key).get(ordinal));
+        }
+      }
+      if (!sparseEntries.isEmpty()) {
+        try {
+          String json = JsonUtils.objectToString(sparseEntries);
+          jsonPerDoc[docId] = json;
+          maxLen = Math.max(maxLen, 
json.getBytes(StandardCharsets.UTF_8).length);
+          nonNullCount++;
+        } catch (IOException e) {
+          throw new RuntimeException("Failed to serialize sparse entries for 
docId " + docId, e);
+        }
+      }
+    }
+
+    SingleValueVarByteRawIndexCreator fwdCreator = new 
SingleValueVarByteRawIndexCreator(
+        _indexDir, ChunkCompressionType.LZ4, sparseCol, _numDocs, 
DataType.STRING, maxLen);
+    NullValueVectorCreator nullCreator = new NullValueVectorCreator(_indexDir, 
sparseCol);
+    try {
+      for (int docId = 0; docId < _numDocs; docId++) {

Review Comment:
   Can we combine this loop for the above ? Seems like the above loop is just 
allocating String[_numDocs] , no reason for double loop ?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/openstruct/OpenStructColumnSplitter.java:
##########
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.openstruct;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.segment.creator.impl.BaseSegmentCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.AbstractColumnStatisticsCollector;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.StatsCollectorUtil;
+import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
+import 
org.apache.pinot.segment.local.segment.index.openstruct.OpenStructSupportedIndexes;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.IndexService;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import 
org.apache.pinot.segment.spi.index.creator.ColumnarOpenStructIndexCreator;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.OpenStructIndexConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.OpenStructNaming;
+import org.apache.pinot.spi.data.OpenStructTypeInference;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.PinotDataType;
+import org.roaringbitmap.RoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Splits an OPEN_STRUCT column into per-key materialized columns using 
standard Pinot index
+/// creators. Dense keys become independent virtual columns; remaining keys go 
into a single
+/// synthetic JSON column for sparse storage.
+///
+/// Lifecycle: instantiated by `BaseSegmentCreator` for OPEN_STRUCT columns. 
Receives
+/// per-doc `Map<String, Object>` values via [#add(Map, int)], accumulates in 
memory,
+/// then on [#seal()] writes per-key column files using standard creators.
+public class OpenStructColumnSplitter implements 
ColumnarOpenStructIndexCreator {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(OpenStructColumnSplitter.class);
+
+  private final File _indexDir;
+  private final String _columnName;
+  private final Map<String, FieldSpec> _childFieldSpecs;
+  private final OpenStructIndexConfig _config;
+  private final int _maxDenseKeys;
+
+  // Per-key accumulation
+  private final Map<String, RoaringBitmap> _presenceBitmaps = new HashMap<>();
+  private final Map<String, List<Object>> _values = new HashMap<>();
+  private final Map<String, DataType> _inferredTypes = new HashMap<>();
+  private int _numDocs;
+
+  // Resolved at seal time
+  @Nullable
+  private Set<String> _resolvedDenseKeys;
+  private final Map<String, PropertiesConfiguration> 
_materializedColumnMetadata = new LinkedHashMap<>();
+
+  public OpenStructColumnSplitter(File indexDir, String columnName, FieldSpec 
fieldSpec,
+      OpenStructIndexConfig config) {
+    _indexDir = indexDir;
+    _columnName = columnName;
+    _config = config;
+    _maxDenseKeys = config.getMaxDenseKeys();
+
+    Map<String, FieldSpec> childFieldSpecs = null;
+    if (fieldSpec instanceof ComplexFieldSpec) {
+      ComplexFieldSpec complexSpec = (ComplexFieldSpec) fieldSpec;
+      childFieldSpecs = complexSpec.getChildFieldSpecs();
+    }
+    _childFieldSpecs = childFieldSpecs != null ? new 
HashMap<>(childFieldSpecs) : new HashMap<>();
+  }
+
+  @Override
+  public void add(Object value, int docId)
+      throws IOException {
+    if (value instanceof Map) {
+      @SuppressWarnings("unchecked")
+      Map<String, Object> map = (Map<String, Object>) value;
+      addMap(map);
+    } else {
+      addMap(null);
+    }
+  }
+
+  @Override
+  public void add(Map<String, Object> openStructValue, int docId)
+      throws IOException {
+    addMap(openStructValue);
+  }
+
+  @Override
+  public void add(Object[] values, @Nullable int[] dictIds)
+      throws IOException {
+    throw new UnsupportedOperationException("OPEN_STRUCT index is single-value 
only");
+  }
+
+  /// Returns the resolved dense-key set after [#seal()] or [#classify()].
+  /// Returns an empty set before resolution.
+  public Set<String> getResolvedDenseKeys() {
+    return _resolvedDenseKeys != null ? 
Collections.unmodifiableSet(_resolvedDenseKeys) : Set.of();
+  }
+
+  /// Resolves dense vs sparse keys without writing any files. Exposed for 
testing and for callers
+  /// that need the classification independent of file output. [#seal()] calls 
this internally.
+  public Set<String> classify() {
+    if (_resolvedDenseKeys != null) {
+      return _resolvedDenseKeys;
+    }
+    if (_numDocs == 0 || _presenceBitmaps.isEmpty()) {
+      _resolvedDenseKeys = new LinkedHashSet<>();
+      return _resolvedDenseKeys;
+    }
+    List<String> allKeys = new ArrayList<>(_presenceBitmaps.keySet());
+    allKeys.sort((a, b) -> {
+      double fillA = (double) _presenceBitmaps.get(a).getCardinality() / 
_numDocs;
+      double fillB = (double) _presenceBitmaps.get(b).getCardinality() / 
_numDocs;
+      int cmp = Double.compare(fillB, fillA);
+      return cmp != 0 ? cmp : a.compareTo(b);
+    });
+
+    double minFillRate = _config.getDenseKeyMinFillRate();
+    _resolvedDenseKeys = new LinkedHashSet<>();
+
+    Set<String> configuredDenseKeys = _config.getDenseKeys();
+    for (String key : configuredDenseKeys) {
+      if (_presenceBitmaps.containsKey(key) && (_maxDenseKeys < 0 || 
_resolvedDenseKeys.size() < _maxDenseKeys)) {
+        _resolvedDenseKeys.add(key);
+      }
+    }
+
+    for (String key : allKeys) {
+      if (_resolvedDenseKeys.contains(key)) {
+        continue;
+      }
+      double fillRate = (double) _presenceBitmaps.get(key).getCardinality() / 
_numDocs;

Review Comment:
   Recommendation for optimization, For each key in each doc, 3-4 HashMap 
lookups for the same key in this code (containsKey, get for bitmap, get for 
values). Can we use computeIfAbsent and cache references  through the code 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to