anuragrai16 commented on code in PR #18643:
URL: https://github.com/apache/pinot/pull/18643#discussion_r3496578535
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java:
##########
@@ -98,13 +104,44 @@ public ImmutableSegmentImpl(
_dataSources =
new
Object2ObjectOpenHashMap<>(segmentMetadata.getColumnMetadataMap().size());
+ Map<String, Map<String, DataSource>> openStructDenseChildren = new
HashMap<>();
+ Map<String, DataSource> openStructSparseChildren = new HashMap<>();
+ Set<String> openStructParents = new HashSet<>();
+
for (Map.Entry<String, ColumnMetadata> entry :
segmentMetadata.getColumnMetadataMap().entrySet()) {
String colName = entry.getKey();
ColumnMetadata columnMetadata = entry.getValue();
+
+ if (columnMetadata instanceof ColumnMetadataImpl &&
((ColumnMetadataImpl) columnMetadata).isMaterializedChild()) {
+ String parent = ((ColumnMetadataImpl)
columnMetadata).getParentColumn();
+ openStructParents.add(parent);
+ DataSource childDs = new ImmutableDataSource(columnMetadata,
_indexContainerMap.get(colName));
+ if (OpenStructNaming.isSparseColumn(colName)) {
+ openStructSparseChildren.put(parent, childDs);
+ } else {
+ openStructDenseChildren.computeIfAbsent(parent, k -> new HashMap<>())
+ .put(OpenStructNaming.parseKey(colName), childDs);
+ }
+ continue;
+ }
+
if (columnMetadata.getFieldSpec().getDataType() ==
FieldSpec.DataType.MAP) {
- _dataSources.put(colName, new ImmutableMapDataSource(entry.getValue(),
_indexContainerMap.get(colName)));
+ _dataSources.put(colName, new ImmutableMapDataSource(columnMetadata,
_indexContainerMap.get(colName)));
} else {
- _dataSources.put(colName, new ImmutableDataSource(entry.getValue(),
_indexContainerMap.get(colName)));
+ _dataSources.put(colName, new ImmutableDataSource(columnMetadata,
_indexContainerMap.get(colName)));
+ }
+ }
+
+ if (!openStructParents.isEmpty()) {
+ Schema schema = segmentMetadata.getSchema();
+ for (String parent : openStructParents) {
+ FieldSpec fieldSpec = schema != null ? schema.getFieldSpecFor(parent)
: null;
+ if (!(fieldSpec instanceof ComplexFieldSpec)) {
Review Comment:
Can you explain this in a code comment? If the schema is null or the
fieldSpec is not a `ComplexFieldSpec`, it seems the parent column is silently
skipped: no DataSource is created and no error is logged. The child columns
were already skipped earlier (via `continue`), so the OPEN_STRUCT data would
exist on disk but be completely invisible to queries?
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructIndex.java:
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.openstruct;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.segment.index.map.SimpleColumnMetadata;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.mutable.MutableIndex;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.index.reader.OpenStructIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.config.table.OpenStructIndexConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.OpenStructTypeInference;
+import org.apache.pinot.spi.utils.PinotDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Manages per-key mutable columns for an OPEN_STRUCT column during real-time
consumption.
+/// Each discovered key gets its own {@link MutableKeyColumn}
(dictionary-encoded forward index +
+/// presence bitmap). Dense/sparse classification is deferred to seal time.
+///
+/// Single-writer for [#index]: the consuming thread calls this method.
Readers may
+/// concurrently read [#getKeys()] and [#getKeyColumns()] via the volatile map
swap.
+@SuppressWarnings("rawtypes")
+public class MutableOpenStructIndex implements
OpenStructIndexReader<ForwardIndexReaderContext>, MutableIndex {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MutableOpenStructIndex.class);
+
+ private final String _openStructColumn;
+ private final OpenStructIndexConfig _config;
+ private final Map<String, FieldSpec> _childFieldSpecs;
+ private final PinotDataBufferMemoryManager _memoryManager;
+ private final int _capacity;
+
+ // Volatile for lock-free reader access; writer always holds the
consuming-thread lock.
+ private volatile Map<String, MutableKeyColumn> _keyColumns = new HashMap<>();
Review Comment:
Should this be a `ConcurrentHashMap` instead? How do we enforce the
single-writer guarantee? I don't see any locks on this path — it's documented,
but not enforced.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java:
##########
@@ -415,6 +421,43 @@ private <K, V> Map<K, V> unmodifiable(Map<K, V> map) {
return map == null ? null : Collections.unmodifiableMap(map);
}
+ public void addOpenStructChildConfigs(SegmentMetadataImpl segmentMetadata) {
+ if (_indexConfigsByColName == null || _dirty) {
+ refreshIndexConfigs();
+ }
+ for (Map.Entry<String, ColumnMetadata> entry :
segmentMetadata.getColumnMetadataMap().entrySet()) {
+ String childColumn = entry.getKey();
+ if (!childColumn.contains(OpenStructNaming.SEPARATOR) ||
_indexConfigsByColName.containsKey(childColumn)) {
+ continue;
+ }
+ if (OpenStructNaming.isSparseColumn(childColumn)) {
+ continue;
+ }
+ String parentColumn = OpenStructNaming.parseParentColumn(childColumn);
+ FieldIndexConfigs parentConfigs =
_indexConfigsByColName.get(parentColumn);
+ if (parentConfigs == null) {
+ continue;
+ }
+ IndexConfig osConfig =
parentConfigs.getConfig(StandardIndexes.openStruct());
+ if (!(osConfig instanceof OpenStructIndexConfig)) {
+ continue;
+ }
+ OpenStructIndexConfig openStructConfig = (OpenStructIndexConfig)
osConfig;
+ String key = OpenStructNaming.parseKey(childColumn);
+ FieldConfig keyFieldConfig = openStructConfig.getValueFieldConfig(key);
+ if (keyFieldConfig == null) {
+ keyFieldConfig = openStructConfig.getDefaultValueFieldConfig();
+ }
+ FieldSpec childFieldSpec = entry.getValue().getFieldSpec();
+ boolean enableInverted =
openStructConfig.shouldEnableInvertedIndexForKey(key);
+ FieldIndexConfigs childConfigs = new FieldIndexConfigs.Builder(
+ FieldIndexConfigsUtil.fromFieldConfig(keyFieldConfig,
childFieldSpec))
+ .add(StandardIndexes.inverted(), enableInverted ?
IndexConfig.ENABLED : IndexConfig.DISABLED)
+ .build();
+ _indexConfigsByColName.put(childColumn, childConfigs);
Review Comment:
Should we be mutating `_indexConfigsByColName` directly after
`refreshIndexConfigs()`? A subsequent `refreshIndexConfigs()` call (e.g. once
`_dirty` is set) would rebuild the map and wipe these entries. Should we store
these overrides in a separate overlay map instead?
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/ImmutableOpenStructDataSource.java:
##########
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.openstruct;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.segment.index.datasource.BaseDataSource;
+import
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
+import org.apache.pinot.segment.spi.Constants;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.datasource.OpenStructDataSource;
+import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+
+/// Per-key {@link DataSource} accessor for sealed (immutable) segments with
an OPEN_STRUCT column.
+///
+/// Always columnar — there is no blob branch. Every key that was dense enough
during segment
+/// creation gets its own materialized {@link DataSource} (forward index +
optional inverted index /
+/// dictionary). Keys that did not meet the density threshold are stored in an
optional sparse
+/// column; the sparse {@link DataSource} is returned for any unmaterialized
key lookup.
+///
+/// Use [#isMaterialized(String)] and [#isFullyMaterialized()] together to
choose
+/// the query execution path:
+/// - Materialized key → fast path via per-key DataSource (inverted/dictionary
index available).
+/// - Not materialized + not fully materialized → fall back to the sparse
DataSource.
+/// - Not materialized + fully materialized → key is definitively absent;
short-circuit.
+///
+/// Thread-safety: immutable after construction; safe for concurrent reads.
+public class ImmutableOpenStructDataSource extends BaseDataSource implements
OpenStructDataSource {
+ private final ComplexFieldSpec _fieldSpec;
+ private final Map<String, DataSource> _perKeyDataSources;
+ @Nullable
+ private final DataSource _sparseDataSource;
+
+ public ImmutableOpenStructDataSource(ComplexFieldSpec fieldSpec, Map<String,
DataSource> perKeyDataSources,
+ @Nullable DataSource sparseDataSource, DataSourceMetadata
dataSourceMetadata,
+ ColumnIndexContainer indexContainer) {
+ super(dataSourceMetadata, indexContainer);
+ _fieldSpec = fieldSpec;
+ _perKeyDataSources = perKeyDataSources;
+ _sparseDataSource = sparseDataSource;
+ }
+
+ /// Convenience constructor for segment-load time. Synthesizes a minimal
{@link DataSourceMetadata}
+ /// for the parent OPEN_STRUCT column (which has no on-disk presence of its
own) and uses an empty
+ /// {@link ColumnIndexContainer} — all real readers live on the per-key data
sources.
+ public ImmutableOpenStructDataSource(ComplexFieldSpec fieldSpec, Map<String,
DataSource> perKeyDataSources,
+ @Nullable DataSource sparseDataSource, int numDocs) {
+ this(fieldSpec, perKeyDataSources, sparseDataSource,
+ new ImmutableOpenStructDataSourceMetadata(fieldSpec, numDocs),
+ new ColumnIndexContainer.FromMap.Builder().build());
+ }
+
+ @Override
+ public ComplexFieldSpec getFieldSpec() {
+ return _fieldSpec;
+ }
+
+ @Override
+ @Nullable
+ public DataSource getDataSource(String key) {
+ DataSource ds = _perKeyDataSources.get(key);
+ return ds != null ? ds : _sparseDataSource;
+ }
+
+ @Override
+ public boolean isMaterialized(String key) {
+ return _perKeyDataSources.containsKey(key);
+ }
+
+ @Override
+ public boolean isFullyMaterialized() {
+ return _sparseDataSource == null;
+ }
+
+ @Override
+ public Map<String, DataSource> getDataSources() {
+ return _perKeyDataSources;
+ }
+
+ @Override
+ @Nullable
+ public DataSourceMetadata getDataSourceMetadata(String key) {
+ DataSource ds = _perKeyDataSources.get(key);
+ return ds != null ? ds.getDataSourceMetadata() : null;
+ }
+
Review Comment:
I don't see a `getMapValue(int docId)` implementation here, but
`MutableOpenStructDataSource` has one. What's the implication for sealed segments?
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructDataSource.java:
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.openstruct;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.segment.index.datasource.BaseDataSource;
+import
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.Constants;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.datasource.OpenStructDataSource;
+import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/// Per-key {@link DataSource} accessor for mutable (consuming) segments with
an OPEN_STRUCT column.
+///
+/// Always columnar — no blob branch. Per-key DataSources are synthesized on
demand from the
+/// underlying {@link MutableOpenStructIndex}; mutable mode always holds every
observed key, so
+/// {@link #isFullyMaterialized()} is unconditionally {@code true}.
+public class MutableOpenStructDataSource extends BaseDataSource implements
OpenStructDataSource {
+ private final ComplexFieldSpec _fieldSpec;
+ private final MutableOpenStructIndex _index;
+ private final int _numDocs;
+
+ public MutableOpenStructDataSource(ComplexFieldSpec fieldSpec,
MutableOpenStructIndex index, int numDocs) {
+ super(new MutableOpenStructDataSourceMetadata(fieldSpec, numDocs),
+ new ColumnIndexContainer.FromMap.Builder().build());
+ _fieldSpec = fieldSpec;
+ _index = index;
+ _numDocs = numDocs;
+ }
+
+ @Override
+ public ComplexFieldSpec getFieldSpec() {
+ return _fieldSpec;
+ }
+
+ @Override
+ @Nullable
+ public DataSource getDataSource(String key) {
+ Map<IndexType, IndexReader> indexes = _index.getIndexes(key);
+ if (indexes == null || indexes.isEmpty()) {
+ return null;
+ }
+ ColumnMetadata metadata = _index.getColumnMetadata(key);
+ return new ImmutableDataSource(metadata,
+ new ColumnIndexContainer.FromMap.Builder().withAll(indexes).build());
+ }
+
+ @Override
+ public boolean isMaterialized(String key) {
+ return _index.getKeyColumn(key) != null;
+ }
+
+ /// Mutable mode always holds every observed key in-memory; the sparse tier
exists only after seal.
+ @Override
+ public boolean isFullyMaterialized() {
+ return true;
+ }
+
+ @Override
+ public Map<String, DataSource> getDataSources() {
+ Map<String, DataSource> result = new HashMap<>();
+ for (String key : _index.getKeys()) {
Review Comment:
This iterates over `MutableOpenStructIndex.getKeys()` while the underlying
`_keyColumns` map is a volatile field that the writer can swap concurrently.
Let's take a snapshot of the map once and then run the loop over it:
```
Map<String, MutableKeyColumn> snapshot = _keyColumns;
for (String key : snapshot.keySet()) { ... }
```
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructIndex.java:
##########
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.openstruct;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.segment.index.map.SimpleColumnMetadata;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.mutable.MutableIndex;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.index.reader.OpenStructIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.config.table.OpenStructIndexConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.OpenStructTypeInference;
+import org.apache.pinot.spi.utils.PinotDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Manages per-key mutable columns for an OPEN_STRUCT column during real-time
consumption.
+/// Each discovered key gets its own {@link MutableKeyColumn}
(dictionary-encoded forward index +
+/// presence bitmap). Dense/sparse classification is deferred to seal time.
+///
+/// Single-writer for [#index]: the consuming thread calls this method.
Readers may
+/// concurrently read [#getKeys()] and [#getKeyColumns()] via the volatile map
swap.
+@SuppressWarnings("rawtypes")
+public class MutableOpenStructIndex implements
OpenStructIndexReader<ForwardIndexReaderContext>, MutableIndex {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MutableOpenStructIndex.class);
+
+ private final String _openStructColumn;
+ private final OpenStructIndexConfig _config;
+ private final Map<String, FieldSpec> _childFieldSpecs;
+ private final PinotDataBufferMemoryManager _memoryManager;
+ private final int _capacity;
+
+ // Volatile for lock-free reader access; writer always holds the
consuming-thread lock.
+ private volatile Map<String, MutableKeyColumn> _keyColumns = new HashMap<>();
+
+ public MutableOpenStructIndex(String openStructColumn, ComplexFieldSpec
fieldSpec,
+ OpenStructIndexConfig config, PinotDataBufferMemoryManager
memoryManager, int capacity) {
+ _openStructColumn = openStructColumn;
+ _config = config;
+ _memoryManager = memoryManager;
+ _capacity = capacity;
+
+ Map<String, FieldSpec> childFieldSpecs = fieldSpec.getChildFieldSpecs();
+ _childFieldSpecs = childFieldSpecs != null ? new
HashMap<>(childFieldSpecs) : new HashMap<>();
+ }
+
+ @Override
+ public void add(Object value, int dictId, int docId) {
+ index(docId, value);
+ }
+
+ @Override
+ public void add(Object[] values, @Nullable int[] dictIds, int docId) {
+ throw new UnsupportedOperationException("OPEN_STRUCT does not support
multi-value indexing");
+ }
+
+ /// Indexes the OPEN_STRUCT value for the given document. `value` must be a
+ /// `Map<String, Object>` or `null`; null and non-Map values are silently
skipped.
+ @SuppressWarnings("unchecked")
+ public void index(int docId, @Nullable Object value) {
+ if (!(value instanceof Map)) {
+ return;
+ }
+ Map<String, Object> map = (Map<String, Object>) value;
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ String key = entry.getKey();
+ Object rawValue = entry.getValue();
+ if (rawValue == null) {
+ continue;
+ }
+
+ MutableKeyColumn keyCol = _keyColumns.get(key);
+ if (keyCol == null) {
+ // Mutable mode holds every observed key (see
MutableOpenStructDataSource#isFullyMaterialized);
+ // dense/sparse classification (maxDenseKeys / denseKeys) is applied
at seal time by the segment
+ // build, so no key is dropped during consumption.
+ // Resolve stored type and coerce BEFORE allocating a column so a
first-row coercion failure
+ // does not allocate a column that was never usable.
+ DataType resolvedType = resolveStoredType(key, rawValue);
+ if (resolvedType == null) {
+ continue;
+ }
+ Object coerced = tryCoerce(key, rawValue, resolvedType);
+ if (coerced == null) {
+ continue;
+ }
+ keyCol = allocateKeyColumn(key, resolvedType);
+ keyCol.setValue(docId, coerced);
+ continue;
+ }
+
+ DataType storedType = keyCol.getStoredType();
+ Object coerced = tryCoerce(key, rawValue, storedType);
+ if (coerced == null) {
+ continue;
+ }
+ keyCol.setValue(docId, coerced);
+ }
+ }
+
+ /// Resolves the stored type for a key without allocating any state. Returns
null when the type
+ /// cannot be inferred (caller should skip the entry).
+ @Nullable
+ private DataType resolveStoredType(String key, Object rawValue) {
+ FieldSpec spec = _childFieldSpecs.get(key);
+ DataType valueType;
+ if (spec != null) {
+ valueType = spec.getDataType();
+ } else {
+ valueType = OpenStructTypeInference.inferDataType(rawValue);
+ if (valueType == null) {
+ LOGGER.warn("OPEN_STRUCT '{}': could not infer DataType for key '{}'
from value of class '{}'."
+ + " Dropping the entry.",
+ _openStructColumn, key, rawValue.getClass().getName());
+ return null;
+ }
+ }
+ return valueType.getStoredType();
+ }
+
+ /// Coerces rawValue to storedType. Returns null on failure (logged at
WARN); the caller drops
+ /// the entry. Note: a successful coerce of a "null"-shaped raw value would
also return null —
+ /// but callers gate on rawValue != null before reaching here.
+ @Nullable
+ private Object tryCoerce(String key, Object rawValue, DataType storedType) {
+ try {
+ PinotDataType sourceType = PinotDataType.getSingleValueType(rawValue);
+ PinotDataType destType =
ColumnDataType.fromDataTypeSV(storedType).toPinotDataType();
Review Comment:
`ColumnDataType.fromDataTypeSV(storedType).toPinotDataType()` — this
re-resolves the destination type for every key of every document. Since the
stored type is fixed per key, can we cache the destination `PinotDataType` when
the key is first encountered?
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/openstruct/OpenStructColumnSplitter.java:
##########
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.openstruct;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.segment.creator.impl.BaseSegmentCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.stats.AbstractColumnStatisticsCollector;
+import
org.apache.pinot.segment.local.segment.creator.impl.stats.StatsCollectorUtil;
+import
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
+import
org.apache.pinot.segment.local.segment.index.openstruct.OpenStructSupportedIndexes;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.IndexService;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import
org.apache.pinot.segment.spi.index.creator.ColumnarOpenStructIndexCreator;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.OpenStructIndexConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.OpenStructNaming;
+import org.apache.pinot.spi.data.OpenStructTypeInference;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.PinotDataType;
+import org.roaringbitmap.RoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Splits an OPEN_STRUCT column into per-key materialized columns using
standard Pinot index
+/// creators. Dense keys become independent virtual columns; remaining keys go
into a single
+/// synthetic JSON column for sparse storage.
+///
+/// Lifecycle: instantiated by `BaseSegmentCreator` for OPEN_STRUCT columns.
Receives
+/// per-doc `Map<String, Object>` values via [#add(Map, int)], accumulates in
memory,
+/// then on [#seal()] writes per-key column files using standard creators.
+public class OpenStructColumnSplitter implements
ColumnarOpenStructIndexCreator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(OpenStructColumnSplitter.class);
+
+ private final File _indexDir;
+ private final String _columnName;
+ private final Map<String, FieldSpec> _childFieldSpecs;
+ private final OpenStructIndexConfig _config;
+ private final int _maxDenseKeys;
+
+ // Per-key accumulation
+ private final Map<String, RoaringBitmap> _presenceBitmaps = new HashMap<>();
+ private final Map<String, List<Object>> _values = new HashMap<>();
+ private final Map<String, DataType> _inferredTypes = new HashMap<>();
+ private int _numDocs;
+
+ // Resolved at seal time
+ @Nullable
+ private Set<String> _resolvedDenseKeys;
+ private final Map<String, PropertiesConfiguration>
_materializedColumnMetadata = new LinkedHashMap<>();
+
+ public OpenStructColumnSplitter(File indexDir, String columnName, FieldSpec
fieldSpec,
+ OpenStructIndexConfig config) {
+ _indexDir = indexDir;
+ _columnName = columnName;
+ _config = config;
+ _maxDenseKeys = config.getMaxDenseKeys();
+
+ Map<String, FieldSpec> childFieldSpecs = null;
+ if (fieldSpec instanceof ComplexFieldSpec) {
+ ComplexFieldSpec complexSpec = (ComplexFieldSpec) fieldSpec;
+ childFieldSpecs = complexSpec.getChildFieldSpecs();
+ }
+ _childFieldSpecs = childFieldSpecs != null ? new
HashMap<>(childFieldSpecs) : new HashMap<>();
+ }
+
+ @Override
+ public void add(Object value, int docId)
+ throws IOException {
+ if (value instanceof Map) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> map = (Map<String, Object>) value;
+ addMap(map);
+ } else {
+ addMap(null);
+ }
+ }
+
+ @Override
+ public void add(Map<String, Object> openStructValue, int docId)
+ throws IOException {
+ addMap(openStructValue);
+ }
+
+ @Override
+ public void add(Object[] values, @Nullable int[] dictIds)
+ throws IOException {
+ throw new UnsupportedOperationException("OPEN_STRUCT index is single-value
only");
+ }
+
+ /// Returns the resolved dense-key set after [#seal()] or [#classify()].
+ /// Returns an empty set before resolution.
+ public Set<String> getResolvedDenseKeys() {
+ return _resolvedDenseKeys != null ?
Collections.unmodifiableSet(_resolvedDenseKeys) : Set.of();
+ }
+
+ /// Resolves dense vs sparse keys without writing any files. Exposed for
testing and for callers
+ /// that need the classification independent of file output. [#seal()] calls
this internally.
+ public Set<String> classify() {
+ if (_resolvedDenseKeys != null) {
+ return _resolvedDenseKeys;
+ }
+ if (_numDocs == 0 || _presenceBitmaps.isEmpty()) {
+ _resolvedDenseKeys = new LinkedHashSet<>();
+ return _resolvedDenseKeys;
+ }
+ List<String> allKeys = new ArrayList<>(_presenceBitmaps.keySet());
+ allKeys.sort((a, b) -> {
+ double fillA = (double) _presenceBitmaps.get(a).getCardinality() /
_numDocs;
+ double fillB = (double) _presenceBitmaps.get(b).getCardinality() /
_numDocs;
+ int cmp = Double.compare(fillB, fillA);
+ return cmp != 0 ? cmp : a.compareTo(b);
+ });
+
+ double minFillRate = _config.getDenseKeyMinFillRate();
+ _resolvedDenseKeys = new LinkedHashSet<>();
+
+ Set<String> configuredDenseKeys = _config.getDenseKeys();
+ for (String key : configuredDenseKeys) {
+ if (_presenceBitmaps.containsKey(key) && (_maxDenseKeys < 0 ||
_resolvedDenseKeys.size() < _maxDenseKeys)) {
+ _resolvedDenseKeys.add(key);
+ }
+ }
+
+ for (String key : allKeys) {
+ if (_resolvedDenseKeys.contains(key)) {
+ continue;
+ }
+ double fillRate = (double) _presenceBitmaps.get(key).getCardinality() /
_numDocs;
+ if ((_maxDenseKeys < 0 || _resolvedDenseKeys.size() < _maxDenseKeys) &&
fillRate >= minFillRate) {
+ _resolvedDenseKeys.add(key);
+ }
+ }
+ return _resolvedDenseKeys;
+ }
+
+ private void addMap(@Nullable Map<String, Object> map) {
+ if (map != null && !map.isEmpty()) {
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ String key = entry.getKey();
+ Object rawValue = entry.getValue();
+ if (rawValue == null) {
+ continue;
+ }
+ FieldSpec keySpec = _childFieldSpecs.get(key);
+ DataType valueType = keySpec != null
+ ? keySpec.getDataType()
+ : _inferredTypes.computeIfAbsent(key, k -> {
+ DataType inferred =
OpenStructTypeInference.inferDataType(rawValue);
+ return inferred != null ? inferred : DataType.STRING;
+ });
+ if (!_presenceBitmaps.containsKey(key)) {
+ _presenceBitmaps.put(key, new RoaringBitmap());
+ _values.put(key, new ArrayList<>());
+ }
+ _presenceBitmaps.get(key).add(_numDocs);
+ Object coerced;
+ try {
+ PinotDataType sourceType =
PinotDataType.getSingleValueType(rawValue);
+ PinotDataType destType =
ColumnDataType.fromDataTypeSV(valueType.getStoredType()).toPinotDataType();
+ coerced = destType.convert(rawValue, sourceType);
+ } catch (Exception e) {
+ LOGGER.warn("OPEN_STRUCT '{}': coercion failed for key '{}' value
'{}' to {}. Skipping.",
+ _columnName, key, rawValue, valueType, e);
+ _presenceBitmaps.get(key).remove(_numDocs);
+ continue;
+ }
+ _values.get(key).add(coerced);
+ }
+ }
+ _numDocs++;
+ }
+
+ @Override
+ public void seal()
+ throws IOException {
+ classify();
+ if (_resolvedDenseKeys == null || (_numDocs == 0 &&
_presenceBitmaps.isEmpty())) {
+ return;
+ }
+
+ for (String key : _resolvedDenseKeys) {
+ writeDenseKeyColumn(key);
+ }
+
+ List<String> sparseKeys = new ArrayList<>();
+ for (String key : _presenceBitmaps.keySet()) {
+ if (!_resolvedDenseKeys.contains(key)) {
+ sparseKeys.add(key);
+ }
+ }
+ if (!sparseKeys.isEmpty()) {
+ writeSparseJsonColumn(sparseKeys);
+ }
+
+ emitParentColumnMetadata(!sparseKeys.isEmpty());
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ // Nothing to close — sub-creators are created and closed within seal()
+ }
+
+ @Override
+ public Map<String, PropertiesConfiguration> getMaterializedColumnMetadata() {
+ return _materializedColumnMetadata;
+ }
+
+ private void writeDenseKeyColumn(String key)
+ throws IOException {
+ String materializedCol =
OpenStructNaming.materializedColumnName(_columnName, key);
+ FieldSpec keySpec = _childFieldSpecs.get(key);
+ DataType valueType = keySpec != null
+ ? keySpec.getDataType()
+ : _inferredTypes.getOrDefault(key, DataType.STRING);
+ DataType storedType = valueType.getStoredType();
+ RoaringBitmap presence = _presenceBitmaps.get(key);
+ List<Object> values = _values.get(key);
+
+ // Synthetic field spec for the materialized child. Its natural Pinot
dimension null value is the value
+ // stored for absent docs, so column metadata stays consistent with
on-disk content.
+ DimensionFieldSpec childFieldSpec = new
DimensionFieldSpec(materializedCol, storedType, true);
+ Object defaultValue = childFieldSpec.getDefaultNullValue();
+
+ // Collect statistics the standard way: present docs contribute their
value, absent docs the default
+ // (absent docs are also marked in the null vector below).
+ AbstractColumnStatisticsCollector statsCollector =
+ StatsCollectorUtil.createStatsCollector(childFieldSpec, null);
+ int statsOrdinal = 0;
+ for (int docId = 0; docId < _numDocs; docId++) {
+ statsCollector.collect(presence.contains(docId) ?
values.get(statsOrdinal++) : defaultValue);
+ }
+ statsCollector.seal();
+
+ // Build per-key index configuration from the key's FieldConfig (falling
back to the default), then apply
+ // the OPEN_STRUCT inverted-on default. No TableConfig/Schema required.
+ FieldConfig keyFieldConfig = _config.getValueFieldConfig(key);
+ if (keyFieldConfig == null) {
+ keyFieldConfig = _config.getDefaultValueFieldConfig();
+ }
+ boolean enableInverted = _config.shouldEnableInvertedIndexForKey(key);
+ FieldIndexConfigs configsForDecision = new FieldIndexConfigs.Builder(
+ FieldIndexConfigsUtil.fromFieldConfig(keyFieldConfig, childFieldSpec))
+ .add(StandardIndexes.inverted(), enableInverted ? IndexConfig.ENABLED
: IndexConfig.DISABLED)
+ .build();
+
+ boolean useDictionary = resolveUseDictionary(childFieldSpec,
configsForDecision, statsCollector);
+
+ // Reconcile dictionary + forward encoding with the final decision
(mirrors BaseSegmentCreator.adaptConfig);
+ // ForwardIndexCreatorFactory selects dict-vs-raw from the forward
config's EncodingType. A compression codec
+ // applies only to the raw forward format (LZ4 preserves the dense child's
current on-disk layout); attaching
+ // one to a dictionary-encoded forward is rejected by
ForwardIndexType.validate.
+ ForwardIndexConfig.Builder forwardBuilder = new ForwardIndexConfig.Builder(
+ useDictionary ? FieldConfig.EncodingType.DICTIONARY :
FieldConfig.EncodingType.RAW);
+ if (!useDictionary) {
+ forwardBuilder.withCompressionCodec(FieldConfig.CompressionCodec.LZ4);
+ }
+ FieldIndexConfigs fieldIndexConfigs = new
FieldIndexConfigs.Builder(configsForDecision)
+ .add(StandardIndexes.dictionary(),
+ useDictionary ? DictionaryIndexConfig.DEFAULT :
DictionaryIndexConfig.DISABLED)
+ .add(StandardIndexes.forward(), forwardBuilder.build())
+ .build();
+
+ int dictElementSize = writeColumnIndexes(materializedCol, storedType,
presence, values,
+ defaultValue, statsCollector, useDictionary, fieldIndexConfigs,
childFieldSpec);
+
+ NullValueVectorCreator nullCreator = new NullValueVectorCreator(_indexDir,
materializedCol);
+ try {
+ for (int docId = 0; docId < _numDocs; docId++) {
+ if (!presence.contains(docId)) {
+ nullCreator.setNull(docId);
+ }
+ }
+ nullCreator.seal();
+ } finally {
+ nullCreator.close();
+ }
+
+ PropertiesConfiguration props = new PropertiesConfiguration();
+ FieldConfig.EncodingType encoding =
+ useDictionary ? FieldConfig.EncodingType.DICTIONARY :
FieldConfig.EncodingType.RAW;
+ BaseSegmentCreator.addColumnMetadataInfo(props, materializedCol,
statsCollector, _numDocs, childFieldSpec,
+ useDictionary, dictElementSize, encoding, false);
+ // OPEN_STRUCT-specific keys not written by addColumnMetadataInfo.
+ props.setProperty(
+ V1Constants.MetadataKeys.Column.getKeyFor(materializedCol,
V1Constants.MetadataKeys.Column.PARENT_COLUMN),
+ _columnName);
+
props.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(materializedCol,
"hasNullValue"), true);
+ if (enableInverted && useDictionary) {
+
props.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(materializedCol,
"hasInvertedIndex"), true);
+ }
+ _materializedColumnMetadata.put(materializedCol, props);
+ }
+
+ /// Decides dictionary vs raw encoding for a materialized child column,
mirroring the three steps of
+ /// `BaseSegmentCreator.createDictionaryForColumn` with standard default
flags (optimizeDictionary
+ /// off => dictionary unless explicitly disabled and not required by an
enabled index).
+ private boolean resolveUseDictionary(FieldSpec childFieldSpec,
FieldIndexConfigs fieldIndexConfigs,
+ AbstractColumnStatisticsCollector statsCollector) {
+ if (DictionaryIndexConfig.requiresDictionary(childFieldSpec,
fieldIndexConfigs)) {
+ return true;
+ }
+ if
(fieldIndexConfigs.getConfig(StandardIndexes.dictionary()).isDisabled()) {
+ return false;
+ }
+ return DictionaryIndexType.ignoreDictionaryOverride(false, false,
+ IndexingConfig.DEFAULT_NO_DICTIONARY_SIZE_RATIO_THRESHOLD, null,
childFieldSpec, fieldIndexConfigs,
+ statsCollector.getCardinality(),
statsCollector.getTotalNumberOfEntries());
+ }
+
+ /// Writes the dictionary (when used) plus all vetted, enabled indexes for a
materialized child column through
+ /// the standard index-creator family, driven from a single per-doc loop.
Returns the dictionary element size in
+ /// bytes (0 when raw-encoded), for column metadata. The dictionary is built
separately because its build
+ /// lifecycle is CUSTOM and it supplies the dictIds the per-row creators
consume.
+ private int writeColumnIndexes(String materializedCol, DataType storedType,
RoaringBitmap presence,
+ List<Object> values, Object defaultValue,
AbstractColumnStatisticsCollector statsCollector,
+ boolean useDictionary, FieldIndexConfigs fieldIndexConfigs, FieldSpec
childFieldSpec)
+ throws IOException {
+ int dictElementSize = 0;
+ SegmentDictionaryCreator dictCreator = null;
+ try {
+ if (useDictionary) {
+ dictCreator = new SegmentDictionaryCreator(materializedCol, storedType,
+ new File(_indexDir, materializedCol +
V1Constants.Dict.FILE_EXTENSION), true);
+ dictCreator.build(statsCollector.getUniqueValuesSet());
+ }
+
+ // Index-creation context built from the sealed collector (a
ColumnShape) — no TableConfig required.
+ IndexCreationContext context =
+ new IndexCreationContext.Builder(_indexDir, null, statsCollector,
useDictionary, false)
+ .withOnHeap(false).build();
+
+ List<IndexCreator> creators = new ArrayList<>();
+ try {
+ for (IndexType<?, ?, ?> indexType :
IndexService.getInstance().getAllIndexes()) {
+ if (indexType.getIndexBuildLifecycle() !=
IndexType.BuildLifecycle.DURING_SEGMENT_CREATION) {
+ continue; // excludes dictionary (lifecycle CUSTOM), built
separately above
+ }
+ if
(!OpenStructSupportedIndexes.ALLOWED_PRETTY_NAMES.contains(indexType.getPrettyName()))
{
+ continue; // non-vetted indexes already rejected at table-config
validation; defensive backstop
+ }
+ IndexCreator creator = createColumnIndexCreator(indexType, context,
fieldIndexConfigs, materializedCol,
+ childFieldSpec);
+ if (creator != null) {
+ creators.add(creator);
+ }
+ }
+
+ int ordinal = 0;
+ for (int docId = 0; docId < _numDocs; docId++) {
+ Object value = presence.contains(docId) ? values.get(ordinal++) :
defaultValue;
+ int dictId = useDictionary ? dictCreator.indexOfSV(value) : -1;
+ for (IndexCreator creator : creators) {
+ creator.add(value, dictId);
+ }
+ }
+ for (IndexCreator creator : creators) {
+ creator.seal();
+ }
+ } finally {
+ for (IndexCreator creator : creators) {
+ creator.close();
+ }
+ if (dictCreator != null) {
+ dictElementSize = dictCreator.getNumBytesPerEntry();
+ dictCreator.seal();
+ }
+ }
+ } finally {
+ if (dictCreator != null) {
+ dictCreator.close();
+ }
+ }
+ return dictElementSize;
+ }
+
+ @Nullable
+ private static <C extends IndexConfig> IndexCreator
createColumnIndexCreator(IndexType<C, ?, ?> indexType,
+ IndexCreationContext context, FieldIndexConfigs fieldIndexConfigs,
String materializedCol,
+ FieldSpec childFieldSpec)
+ throws IOException {
+ // Materialized child columns exist in no schema/TableConfig, so the
standard table-config-time validation
+ // never sees them. Run the index type's own guards here against the
resolved child FieldSpec (e.g. range
+ // rejects a non-numeric column without a dictionary) so misconfigurations
fail with the canonical message
+ // instead of crashing opaquely inside the creator. validate() internally
no-ops when the index is disabled.
+ indexType.validate(fieldIndexConfigs, childFieldSpec, null);
+ C config = fieldIndexConfigs.getConfig(indexType);
+ if (!config.isEnabled() || !indexType.shouldCreateIndex(context, config)) {
+ return null;
+ }
+ try {
+ return indexType.createIndexCreator(context, config);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException("Failed to create " + indexType.getPrettyName() +
" creator for: " + materializedCol, e);
+ }
+ }
+
+ private void writeSparseJsonColumn(List<String> sparseKeys)
+ throws IOException {
+ String sparseCol = OpenStructNaming.sparseColumnName(_columnName);
+ int maxLen = 1;
+ String[] jsonPerDoc = new String[_numDocs];
+ int nonNullCount = 0;
+ for (int docId = 0; docId < _numDocs; docId++) {
+ Map<String, Object> sparseEntries = new LinkedHashMap<>();
+ for (String key : sparseKeys) {
+ RoaringBitmap presence = _presenceBitmaps.get(key);
+ if (presence != null && presence.contains(docId)) {
+ int ordinal = presence.rank(docId) - 1;
+ sparseEntries.put(key, _values.get(key).get(ordinal));
+ }
+ }
+ if (!sparseEntries.isEmpty()) {
+ try {
+ String json = JsonUtils.objectToString(sparseEntries);
+ jsonPerDoc[docId] = json;
+ maxLen = Math.max(maxLen,
json.getBytes(StandardCharsets.UTF_8).length);
Review Comment:
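Is this needed per doc? `json.getBytes(UTF_8).length` allocates a fresh
byte[] solely to measure the UTF-8 byte length and then discards it, wasting
memory that the GC must later reclaim. Consider computing the encoded length
without allocating, or reusing the encoded bytes when writing.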
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/openstruct/OpenStructColumnSplitter.java:
##########
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.openstruct;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.segment.creator.impl.BaseSegmentCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.stats.AbstractColumnStatisticsCollector;
+import
org.apache.pinot.segment.local.segment.creator.impl.stats.StatsCollectorUtil;
+import
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
+import
org.apache.pinot.segment.local.segment.index.openstruct.OpenStructSupportedIndexes;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.IndexService;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import
org.apache.pinot.segment.spi.index.creator.ColumnarOpenStructIndexCreator;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.OpenStructIndexConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.OpenStructNaming;
+import org.apache.pinot.spi.data.OpenStructTypeInference;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.PinotDataType;
+import org.roaringbitmap.RoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Splits an OPEN_STRUCT column into per-key materialized columns using
standard Pinot index
+/// creators. Dense keys become independent virtual columns; remaining keys go
into a single
+/// synthetic JSON column for sparse storage.
+///
+/// Lifecycle: instantiated by `BaseSegmentCreator` for OPEN_STRUCT columns.
Receives
+/// per-doc `Map<String, Object>` values via [#add(Map, int)], accumulates in
memory,
+/// then on [#seal()] writes per-key column files using standard creators.
+public class OpenStructColumnSplitter implements
ColumnarOpenStructIndexCreator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(OpenStructColumnSplitter.class);
+
+ private final File _indexDir;
+ private final String _columnName;
+ private final Map<String, FieldSpec> _childFieldSpecs;
+ private final OpenStructIndexConfig _config;
+ private final int _maxDenseKeys;
+
+ // Per-key accumulation
+ private final Map<String, RoaringBitmap> _presenceBitmaps = new HashMap<>();
+ private final Map<String, List<Object>> _values = new HashMap<>();
+ private final Map<String, DataType> _inferredTypes = new HashMap<>();
+ private int _numDocs;
+
+ // Resolved at seal time
+ @Nullable
+ private Set<String> _resolvedDenseKeys;
+ private final Map<String, PropertiesConfiguration>
_materializedColumnMetadata = new LinkedHashMap<>();
+
+ public OpenStructColumnSplitter(File indexDir, String columnName, FieldSpec
fieldSpec,
+ OpenStructIndexConfig config) {
+ _indexDir = indexDir;
+ _columnName = columnName;
+ _config = config;
+ _maxDenseKeys = config.getMaxDenseKeys();
+
+ Map<String, FieldSpec> childFieldSpecs = null;
+ if (fieldSpec instanceof ComplexFieldSpec) {
+ ComplexFieldSpec complexSpec = (ComplexFieldSpec) fieldSpec;
+ childFieldSpecs = complexSpec.getChildFieldSpecs();
+ }
+ _childFieldSpecs = childFieldSpecs != null ? new
HashMap<>(childFieldSpecs) : new HashMap<>();
+ }
+
+ @Override
+ public void add(Object value, int docId)
+ throws IOException {
+ if (value instanceof Map) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> map = (Map<String, Object>) value;
+ addMap(map);
+ } else {
+ addMap(null);
+ }
+ }
+
+ @Override
+ public void add(Map<String, Object> openStructValue, int docId)
+ throws IOException {
+ addMap(openStructValue);
+ }
+
+ @Override
+ public void add(Object[] values, @Nullable int[] dictIds)
+ throws IOException {
+ throw new UnsupportedOperationException("OPEN_STRUCT index is single-value
only");
+ }
+
+ /// Returns the resolved dense-key set after [#seal()] or [#classify()].
+ /// Returns an empty set before resolution.
+ public Set<String> getResolvedDenseKeys() {
+ return _resolvedDenseKeys != null ?
Collections.unmodifiableSet(_resolvedDenseKeys) : Set.of();
+ }
+
+ /// Resolves dense vs sparse keys without writing any files. Exposed for
testing and for callers
+ /// that need the classification independent of file output. [#seal()] calls
this internally.
+ public Set<String> classify() {
+ if (_resolvedDenseKeys != null) {
+ return _resolvedDenseKeys;
+ }
+ if (_numDocs == 0 || _presenceBitmaps.isEmpty()) {
+ _resolvedDenseKeys = new LinkedHashSet<>();
+ return _resolvedDenseKeys;
+ }
+ List<String> allKeys = new ArrayList<>(_presenceBitmaps.keySet());
+ allKeys.sort((a, b) -> {
+ double fillA = (double) _presenceBitmaps.get(a).getCardinality() /
_numDocs;
+ double fillB = (double) _presenceBitmaps.get(b).getCardinality() /
_numDocs;
+ int cmp = Double.compare(fillB, fillA);
+ return cmp != 0 ? cmp : a.compareTo(b);
+ });
+
+ double minFillRate = _config.getDenseKeyMinFillRate();
+ _resolvedDenseKeys = new LinkedHashSet<>();
+
+ Set<String> configuredDenseKeys = _config.getDenseKeys();
+ for (String key : configuredDenseKeys) {
+ if (_presenceBitmaps.containsKey(key) && (_maxDenseKeys < 0 ||
_resolvedDenseKeys.size() < _maxDenseKeys)) {
+ _resolvedDenseKeys.add(key);
+ }
+ }
+
+ for (String key : allKeys) {
+ if (_resolvedDenseKeys.contains(key)) {
+ continue;
+ }
+ double fillRate = (double) _presenceBitmaps.get(key).getCardinality() /
_numDocs;
+ if ((_maxDenseKeys < 0 || _resolvedDenseKeys.size() < _maxDenseKeys) &&
fillRate >= minFillRate) {
+ _resolvedDenseKeys.add(key);
+ }
+ }
+ return _resolvedDenseKeys;
+ }
+
+ private void addMap(@Nullable Map<String, Object> map) {
+ if (map != null && !map.isEmpty()) {
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ String key = entry.getKey();
+ Object rawValue = entry.getValue();
+ if (rawValue == null) {
+ continue;
+ }
+ FieldSpec keySpec = _childFieldSpecs.get(key);
+ DataType valueType = keySpec != null
+ ? keySpec.getDataType()
+ : _inferredTypes.computeIfAbsent(key, k -> {
+ DataType inferred =
OpenStructTypeInference.inferDataType(rawValue);
+ return inferred != null ? inferred : DataType.STRING;
+ });
+ if (!_presenceBitmaps.containsKey(key)) {
+ _presenceBitmaps.put(key, new RoaringBitmap());
+ _values.put(key, new ArrayList<>());
+ }
+ _presenceBitmaps.get(key).add(_numDocs);
+ Object coerced;
+ try {
+ PinotDataType sourceType =
PinotDataType.getSingleValueType(rawValue);
+ PinotDataType destType =
ColumnDataType.fromDataTypeSV(valueType.getStoredType()).toPinotDataType();
+ coerced = destType.convert(rawValue, sourceType);
+ } catch (Exception e) {
+ LOGGER.warn("OPEN_STRUCT '{}': coercion failed for key '{}' value
'{}' to {}. Skipping.",
+ _columnName, key, rawValue, valueType, e);
+ _presenceBitmaps.get(key).remove(_numDocs);
+ continue;
+ }
+ _values.get(key).add(coerced);
+ }
+ }
+ _numDocs++;
+ }
+
+ @Override
+ public void seal()
+ throws IOException {
+ classify();
+ if (_resolvedDenseKeys == null || (_numDocs == 0 &&
_presenceBitmaps.isEmpty())) {
+ return;
+ }
+
+ for (String key : _resolvedDenseKeys) {
+ writeDenseKeyColumn(key);
+ }
+
+ List<String> sparseKeys = new ArrayList<>();
+ for (String key : _presenceBitmaps.keySet()) {
+ if (!_resolvedDenseKeys.contains(key)) {
+ sparseKeys.add(key);
+ }
+ }
+ if (!sparseKeys.isEmpty()) {
+ writeSparseJsonColumn(sparseKeys);
+ }
+
+ emitParentColumnMetadata(!sparseKeys.isEmpty());
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ // Nothing to close — sub-creators are created and closed within seal()
+ }
+
+ @Override
+ public Map<String, PropertiesConfiguration> getMaterializedColumnMetadata() {
+ return _materializedColumnMetadata;
+ }
+
+ private void writeDenseKeyColumn(String key)
+ throws IOException {
+ String materializedCol =
OpenStructNaming.materializedColumnName(_columnName, key);
+ FieldSpec keySpec = _childFieldSpecs.get(key);
+ DataType valueType = keySpec != null
+ ? keySpec.getDataType()
+ : _inferredTypes.getOrDefault(key, DataType.STRING);
+ DataType storedType = valueType.getStoredType();
+ RoaringBitmap presence = _presenceBitmaps.get(key);
+ List<Object> values = _values.get(key);
+
+ // Synthetic field spec for the materialized child. Its natural Pinot
dimension null value is the value
+ // stored for absent docs, so column metadata stays consistent with
on-disk content.
+ DimensionFieldSpec childFieldSpec = new
DimensionFieldSpec(materializedCol, storedType, true);
+ Object defaultValue = childFieldSpec.getDefaultNullValue();
+
+ // Collect statistics the standard way: present docs contribute their
value, absent docs the default
+ // (absent docs are also marked in the null vector below).
+ AbstractColumnStatisticsCollector statsCollector =
+ StatsCollectorUtil.createStatsCollector(childFieldSpec, null);
+ int statsOrdinal = 0;
+ for (int docId = 0; docId < _numDocs; docId++) {
+ statsCollector.collect(presence.contains(docId) ?
values.get(statsOrdinal++) : defaultValue);
+ }
+ statsCollector.seal();
+
+ // Build per-key index configuration from the key's FieldConfig (falling
back to the default), then apply
+ // the OPEN_STRUCT inverted-on default. No TableConfig/Schema required.
+ FieldConfig keyFieldConfig = _config.getValueFieldConfig(key);
+ if (keyFieldConfig == null) {
+ keyFieldConfig = _config.getDefaultValueFieldConfig();
+ }
+ boolean enableInverted = _config.shouldEnableInvertedIndexForKey(key);
+ FieldIndexConfigs configsForDecision = new FieldIndexConfigs.Builder(
+ FieldIndexConfigsUtil.fromFieldConfig(keyFieldConfig, childFieldSpec))
+ .add(StandardIndexes.inverted(), enableInverted ? IndexConfig.ENABLED
: IndexConfig.DISABLED)
+ .build();
+
+ boolean useDictionary = resolveUseDictionary(childFieldSpec,
configsForDecision, statsCollector);
+
+ // Reconcile dictionary + forward encoding with the final decision
(mirrors BaseSegmentCreator.adaptConfig);
+ // ForwardIndexCreatorFactory selects dict-vs-raw from the forward
config's EncodingType. A compression codec
+ // applies only to the raw forward format (LZ4 preserves the dense child's
current on-disk layout); attaching
+ // one to a dictionary-encoded forward is rejected by
ForwardIndexType.validate.
+ ForwardIndexConfig.Builder forwardBuilder = new ForwardIndexConfig.Builder(
+ useDictionary ? FieldConfig.EncodingType.DICTIONARY :
FieldConfig.EncodingType.RAW);
+ if (!useDictionary) {
+ forwardBuilder.withCompressionCodec(FieldConfig.CompressionCodec.LZ4);
+ }
+ FieldIndexConfigs fieldIndexConfigs = new
FieldIndexConfigs.Builder(configsForDecision)
+ .add(StandardIndexes.dictionary(),
+ useDictionary ? DictionaryIndexConfig.DEFAULT :
DictionaryIndexConfig.DISABLED)
+ .add(StandardIndexes.forward(), forwardBuilder.build())
+ .build();
+
+ int dictElementSize = writeColumnIndexes(materializedCol, storedType,
presence, values,
+ defaultValue, statsCollector, useDictionary, fieldIndexConfigs,
childFieldSpec);
+
+ NullValueVectorCreator nullCreator = new NullValueVectorCreator(_indexDir,
materializedCol);
+ try {
+ for (int docId = 0; docId < _numDocs; docId++) {
+ if (!presence.contains(docId)) {
+ nullCreator.setNull(docId);
+ }
+ }
+ nullCreator.seal();
+ } finally {
+ nullCreator.close();
+ }
+
+ PropertiesConfiguration props = new PropertiesConfiguration();
+ FieldConfig.EncodingType encoding =
+ useDictionary ? FieldConfig.EncodingType.DICTIONARY :
FieldConfig.EncodingType.RAW;
+ BaseSegmentCreator.addColumnMetadataInfo(props, materializedCol,
statsCollector, _numDocs, childFieldSpec,
+ useDictionary, dictElementSize, encoding, false);
+ // OPEN_STRUCT-specific keys not written by addColumnMetadataInfo.
+ props.setProperty(
+ V1Constants.MetadataKeys.Column.getKeyFor(materializedCol,
V1Constants.MetadataKeys.Column.PARENT_COLUMN),
+ _columnName);
+
props.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(materializedCol,
"hasNullValue"), true);
+ if (enableInverted && useDictionary) {
+
props.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(materializedCol,
"hasInvertedIndex"), true);
+ }
+ _materializedColumnMetadata.put(materializedCol, props);
+ }
+
+ /// Decides dictionary vs raw encoding for a materialized child column,
mirroring the three steps of
+ /// `BaseSegmentCreator.createDictionaryForColumn` with standard default
flags (optimizeDictionary
+ /// off => dictionary unless explicitly disabled and not required by an
enabled index).
+ private boolean resolveUseDictionary(FieldSpec childFieldSpec,
FieldIndexConfigs fieldIndexConfigs,
+ AbstractColumnStatisticsCollector statsCollector) {
+ if (DictionaryIndexConfig.requiresDictionary(childFieldSpec,
fieldIndexConfigs)) {
+ return true;
+ }
+ if
(fieldIndexConfigs.getConfig(StandardIndexes.dictionary()).isDisabled()) {
+ return false;
+ }
+ return DictionaryIndexType.ignoreDictionaryOverride(false, false,
+ IndexingConfig.DEFAULT_NO_DICTIONARY_SIZE_RATIO_THRESHOLD, null,
childFieldSpec, fieldIndexConfigs,
+ statsCollector.getCardinality(),
statsCollector.getTotalNumberOfEntries());
+ }
+
+ /// Writes the dictionary (when used) plus all vetted, enabled indexes for a
materialized child column through
+ /// the standard index-creator family, driven from a single per-doc loop.
Returns the dictionary element size in
+ /// bytes (0 when raw-encoded), for column metadata. The dictionary is built
separately because its build
+ /// lifecycle is CUSTOM and it supplies the dictIds the per-row creators
consume.
+ private int writeColumnIndexes(String materializedCol, DataType storedType,
RoaringBitmap presence,
+ List<Object> values, Object defaultValue,
AbstractColumnStatisticsCollector statsCollector,
+ boolean useDictionary, FieldIndexConfigs fieldIndexConfigs, FieldSpec
childFieldSpec)
+ throws IOException {
+ int dictElementSize = 0;
+ SegmentDictionaryCreator dictCreator = null;
+ try {
+ if (useDictionary) {
+ dictCreator = new SegmentDictionaryCreator(materializedCol, storedType,
+ new File(_indexDir, materializedCol +
V1Constants.Dict.FILE_EXTENSION), true);
+ dictCreator.build(statsCollector.getUniqueValuesSet());
+ }
+
+ // Index-creation context built from the sealed collector (a
ColumnShape) — no TableConfig required.
+ IndexCreationContext context =
+ new IndexCreationContext.Builder(_indexDir, null, statsCollector,
useDictionary, false)
+ .withOnHeap(false).build();
+
+ List<IndexCreator> creators = new ArrayList<>();
+ try {
+ for (IndexType<?, ?, ?> indexType :
IndexService.getInstance().getAllIndexes()) {
+ if (indexType.getIndexBuildLifecycle() !=
IndexType.BuildLifecycle.DURING_SEGMENT_CREATION) {
+ continue; // excludes dictionary (lifecycle CUSTOM), built
separately above
+ }
+ if
(!OpenStructSupportedIndexes.ALLOWED_PRETTY_NAMES.contains(indexType.getPrettyName()))
{
+ continue; // non-vetted indexes already rejected at table-config
validation; defensive backstop
+ }
+ IndexCreator creator = createColumnIndexCreator(indexType, context,
fieldIndexConfigs, materializedCol,
+ childFieldSpec);
+ if (creator != null) {
+ creators.add(creator);
+ }
+ }
+
+ int ordinal = 0;
+ for (int docId = 0; docId < _numDocs; docId++) {
+ Object value = presence.contains(docId) ? values.get(ordinal++) :
defaultValue;
+ int dictId = useDictionary ? dictCreator.indexOfSV(value) : -1;
+ for (IndexCreator creator : creators) {
+ creator.add(value, dictId);
+ }
+ }
+ for (IndexCreator creator : creators) {
+ creator.seal();
+ }
+ } finally {
+ for (IndexCreator creator : creators) {
+ creator.close();
+ }
+ if (dictCreator != null) {
+ dictElementSize = dictCreator.getNumBytesPerEntry();
+ dictCreator.seal();
+ }
+ }
+ } finally {
+ if (dictCreator != null) {
+ dictCreator.close();
+ }
+ }
+ return dictElementSize;
+ }
+
+ @Nullable
+ private static <C extends IndexConfig> IndexCreator
createColumnIndexCreator(IndexType<C, ?, ?> indexType,
+ IndexCreationContext context, FieldIndexConfigs fieldIndexConfigs,
String materializedCol,
+ FieldSpec childFieldSpec)
+ throws IOException {
+ // Materialized child columns exist in no schema/TableConfig, so the
standard table-config-time validation
+ // never sees them. Run the index type's own guards here against the
resolved child FieldSpec (e.g. range
+ // rejects a non-numeric column without a dictionary) so misconfigurations
fail with the canonical message
+ // instead of crashing opaquely inside the creator. validate() internally
no-ops when the index is disabled.
+ indexType.validate(fieldIndexConfigs, childFieldSpec, null);
+ C config = fieldIndexConfigs.getConfig(indexType);
+ if (!config.isEnabled() || !indexType.shouldCreateIndex(context, config)) {
+ return null;
+ }
+ try {
+ return indexType.createIndexCreator(context, config);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException("Failed to create " + indexType.getPrettyName() +
" creator for: " + materializedCol, e);
+ }
+ }
+
+ private void writeSparseJsonColumn(List<String> sparseKeys)
+ throws IOException {
+ String sparseCol = OpenStructNaming.sparseColumnName(_columnName);
+ int maxLen = 1;
+ String[] jsonPerDoc = new String[_numDocs];
+ int nonNullCount = 0;
+ for (int docId = 0; docId < _numDocs; docId++) {
+ Map<String, Object> sparseEntries = new LinkedHashMap<>();
+ for (String key : sparseKeys) {
+ RoaringBitmap presence = _presenceBitmaps.get(key);
+ if (presence != null && presence.contains(docId)) {
+ int ordinal = presence.rank(docId) - 1;
+ sparseEntries.put(key, _values.get(key).get(ordinal));
+ }
+ }
+ if (!sparseEntries.isEmpty()) {
+ try {
+ String json = JsonUtils.objectToString(sparseEntries);
+ jsonPerDoc[docId] = json;
+ maxLen = Math.max(maxLen,
json.getBytes(StandardCharsets.UTF_8).length);
+ nonNullCount++;
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to serialize sparse entries for
docId " + docId, e);
+ }
+ }
+ }
+
+ SingleValueVarByteRawIndexCreator fwdCreator = new
SingleValueVarByteRawIndexCreator(
+ _indexDir, ChunkCompressionType.LZ4, sparseCol, _numDocs,
DataType.STRING, maxLen);
+ NullValueVectorCreator nullCreator = new NullValueVectorCreator(_indexDir,
sparseCol);
+ try {
+ for (int docId = 0; docId < _numDocs; docId++) {
Review Comment:
Can we combine this loop with the one above? The loop above seems to exist
only to fill a `String[_numDocs]` buffer, so there is no reason for two passes.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/openstruct/OpenStructColumnSplitter.java:
##########
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.openstruct;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.segment.creator.impl.BaseSegmentCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.stats.AbstractColumnStatisticsCollector;
+import
org.apache.pinot.segment.local.segment.creator.impl.stats.StatsCollectorUtil;
+import
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
+import
org.apache.pinot.segment.local.segment.index.openstruct.OpenStructSupportedIndexes;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.IndexService;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import
org.apache.pinot.segment.spi.index.creator.ColumnarOpenStructIndexCreator;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.OpenStructIndexConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.OpenStructNaming;
+import org.apache.pinot.spi.data.OpenStructTypeInference;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.PinotDataType;
+import org.roaringbitmap.RoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Splits an OPEN_STRUCT column into per-key materialized columns using
standard Pinot index
+/// creators. Dense keys become independent virtual columns; remaining keys go
into a single
+/// synthetic JSON column for sparse storage.
+///
+/// Lifecycle: instantiated by `BaseSegmentCreator` for OPEN_STRUCT columns.
Receives
+/// per-doc `Map<String, Object>` values via [#add(Map, int)], accumulates in
memory,
+/// then on [#seal()] writes per-key column files using standard creators.
+public class OpenStructColumnSplitter implements
ColumnarOpenStructIndexCreator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(OpenStructColumnSplitter.class);
+
+ private final File _indexDir;
+ private final String _columnName;
+ private final Map<String, FieldSpec> _childFieldSpecs;
+ private final OpenStructIndexConfig _config;
+ private final int _maxDenseKeys;
+
+ // Per-key accumulation
+ private final Map<String, RoaringBitmap> _presenceBitmaps = new HashMap<>();
+ private final Map<String, List<Object>> _values = new HashMap<>();
+ private final Map<String, DataType> _inferredTypes = new HashMap<>();
+ private int _numDocs;
+
+ // Resolved at seal time
+ @Nullable
+ private Set<String> _resolvedDenseKeys;
+ private final Map<String, PropertiesConfiguration>
_materializedColumnMetadata = new LinkedHashMap<>();
+
+ public OpenStructColumnSplitter(File indexDir, String columnName, FieldSpec
fieldSpec,
+ OpenStructIndexConfig config) {
+ _indexDir = indexDir;
+ _columnName = columnName;
+ _config = config;
+ _maxDenseKeys = config.getMaxDenseKeys();
+
+ Map<String, FieldSpec> childFieldSpecs = null;
+ if (fieldSpec instanceof ComplexFieldSpec) {
+ ComplexFieldSpec complexSpec = (ComplexFieldSpec) fieldSpec;
+ childFieldSpecs = complexSpec.getChildFieldSpecs();
+ }
+ _childFieldSpecs = childFieldSpecs != null ? new
HashMap<>(childFieldSpecs) : new HashMap<>();
+ }
+
+ @Override
+ public void add(Object value, int docId)
+ throws IOException {
+ if (value instanceof Map) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> map = (Map<String, Object>) value;
+ addMap(map);
+ } else {
+ addMap(null);
+ }
+ }
+
+ @Override
+ public void add(Map<String, Object> openStructValue, int docId)
+ throws IOException {
+ addMap(openStructValue);
+ }
+
+ @Override
+ public void add(Object[] values, @Nullable int[] dictIds)
+ throws IOException {
+ throw new UnsupportedOperationException("OPEN_STRUCT index is single-value
only");
+ }
+
+ /// Returns the resolved dense-key set after [#seal()] or [#classify()].
+ /// Returns an empty set before resolution.
+ public Set<String> getResolvedDenseKeys() {
+ return _resolvedDenseKeys != null ?
Collections.unmodifiableSet(_resolvedDenseKeys) : Set.of();
+ }
+
+ /// Resolves dense vs sparse keys without writing any files. Exposed for
testing and for callers
+ /// that need the classification independent of file output. [#seal()] calls
this internally.
+ public Set<String> classify() {
+ if (_resolvedDenseKeys != null) {
+ return _resolvedDenseKeys;
+ }
+ if (_numDocs == 0 || _presenceBitmaps.isEmpty()) {
+ _resolvedDenseKeys = new LinkedHashSet<>();
+ return _resolvedDenseKeys;
+ }
+ List<String> allKeys = new ArrayList<>(_presenceBitmaps.keySet());
+ allKeys.sort((a, b) -> {
+ double fillA = (double) _presenceBitmaps.get(a).getCardinality() /
_numDocs;
+ double fillB = (double) _presenceBitmaps.get(b).getCardinality() /
_numDocs;
+ int cmp = Double.compare(fillB, fillA);
+ return cmp != 0 ? cmp : a.compareTo(b);
+ });
+
+ double minFillRate = _config.getDenseKeyMinFillRate();
+ _resolvedDenseKeys = new LinkedHashSet<>();
+
+ Set<String> configuredDenseKeys = _config.getDenseKeys();
+ for (String key : configuredDenseKeys) {
+ if (_presenceBitmaps.containsKey(key) && (_maxDenseKeys < 0 ||
_resolvedDenseKeys.size() < _maxDenseKeys)) {
+ _resolvedDenseKeys.add(key);
+ }
+ }
+
+ for (String key : allKeys) {
+ if (_resolvedDenseKeys.contains(key)) {
+ continue;
+ }
+ double fillRate = (double) _presenceBitmaps.get(key).getCardinality() /
_numDocs;
Review Comment:
Optimization suggestion: for each key in each doc, this code performs 3-4
HashMap lookups on the same key (containsKey, get for the bitmap, get for the
values). Can we use computeIfAbsent and cache the references throughout the code?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]