This is an automated email from the ASF dual-hosted git repository. tingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d91e5db76c Add SchemaConformingTransformer to transform records with varying keys to fit a table's schema without dropping fields. (#11210) d91e5db76c is described below commit d91e5db76c53af9774db1c57c71520f4c196c059 Author: kirkrodrigues <2454684+kirkrodrig...@users.noreply.github.com> AuthorDate: Thu Aug 24 17:08:55 2023 -0400 Add SchemaConformingTransformer to transform records with varying keys to fit a table's schema without dropping fields. (#11210) * Add JsonLogTransformer to transform semi-structured log events to fit a table's schema without dropping fields. * Minor fix * Add unit tests for JsonLogTransformer. * Minor fixes. * Refactor docstrings. * Rename JsonLogTransformer to SchemaConformingTransformer * JsonLogTransformer: Pass through GenericRow's special keys --- .../apache/pinot/queries/TransformQueriesTest.java | 2 +- .../recordtransformer/CompositeTransformer.java | 13 +- .../SchemaConformingTransformer.java | 545 +++++++++++++++ .../pinot/segment/local/utils/IngestionUtils.java | 7 + .../segment/local/utils/TableConfigUtils.java | 8 + .../SchemaConformingTransformerTest.java | 731 +++++++++++++++++++++ .../config/table/ingestion/IngestionConfig.java | 15 + .../SchemaConformingTransformerConfig.java | 78 +++ .../apache/pinot/spi/data/readers/GenericRow.java | 7 + .../pinot/spi/stream/StreamDataDecoderImpl.java | 7 + 10 files changed, 1408 insertions(+), 5 deletions(-) diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java index f3abbbc534..572080e44e 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java @@ -135,7 +135,7 @@ public class TransformQueriesTest extends BaseQueriesTest { .setIngestionConfig(new IngestionConfig(null, null, null, 
Arrays.asList(new TransformConfig(M1_V2, "Groovy({INT_COL1_V3 == null || " + "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3 }, INT_COL1, INT_COL1_V3)")), - null, null)) + null, null, null)) .build(); Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, FieldSpec.DataType.STRING) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java index e584944bda..dbc6cdbe3c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java @@ -48,7 +48,12 @@ public class CompositeTransformer implements RecordTransformer { * destination columns * </li> * <li> - * {@link DataTypeTransformer} after {@link FilterTransformer} to convert values to comply with the schema + * Optional {@link SchemaConformingTransformer} after {@link FilterTransformer}, so that we can transform input + * records that have varying fields to a fixed schema without dropping any fields + * </li> + * <li> + * {@link DataTypeTransformer} after {@link SchemaConformingTransformer} to convert values to comply with the + * schema * </li> * <li> * Optional {@link TimeValidationTransformer} after {@link DataTypeTransformer} so that time value is converted to @@ -67,9 +72,9 @@ public class CompositeTransformer implements RecordTransformer { public static CompositeTransformer getDefaultTransformer(TableConfig tableConfig, Schema schema) { return new CompositeTransformer( Stream.of(new ExpressionTransformer(tableConfig, schema), new FilterTransformer(tableConfig), - new DataTypeTransformer(tableConfig, schema), new TimeValidationTransformer(tableConfig, schema), - new NullValueTransformer(tableConfig, schema), new 
SanitizationTransformer(schema)) - .filter(t -> !t.isNoOp()).collect(Collectors.toList())); + new SchemaConformingTransformer(tableConfig, schema), new DataTypeTransformer(tableConfig, schema), + new TimeValidationTransformer(tableConfig, schema), new NullValueTransformer(tableConfig, schema), + new SanitizationTransformer(schema)).filter(t -> !t.isNoOp()).collect(Collectors.toList())); } /** diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java new file mode 100644 index 0000000000..d8c7da6297 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java @@ -0,0 +1,545 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.recordtransformer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.StreamDataDecoderImpl; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This transformer transforms records with varying keys such that they can be stored in a table with a fixed schema. + * Since these records have varying keys, it is impractical to store each field in its own table column. At the same + * time, most (if not all) fields may be important to the user, so we should not drop any field unnecessarily. So this + * transformer primarily takes record-fields that don't exist in the schema and stores them in a type of catchall field. 
+ * <p> + * For example, consider this record: + * <pre> + * { + * "timestamp": 1687786535928, + * "hostname": "host1", + * "HOSTNAME": "host1", + * "level": "INFO", + * "message": "Started processing job1", + * "tags": { + * "platform": "data", + * "service": "serializer", + * "params": { + * "queueLength": 5, + * "timeout": 299, + * "userData_noIndex": { + * "nth": 99 + * } + * } + * } + * } + * </pre> + * And let's say the table's schema contains these fields: + * <ul> + * <li>timestamp</li> + * <li>hostname</li> + * <li>level</li> + * <li>message</li> + * <li>tags.platform</li> + * <li>tags.service</li> + * <li>indexableExtras</li> + * <li>unindexableExtras</li> + * </ul> + * <p> + * Without this transformer, the entire "tags" field would be dropped when storing the record in the table. However, + * with this transformer, the record would be transformed into the following: + * <pre> + * { + * "timestamp": 1687786535928, + * "hostname": "host1", + * "level": "INFO", + * "message": "Started processing job1", + * "tags.platform": "data", + * "tags.service": "serializer", + * "indexableExtras": { + * "tags": { + * "params": { + * "queueLength": 5, + * "timeout": 299 + * } + * } + * }, + * "unindexableExtras": { + * "tags": { + * "userData_noIndex": { + * "nth": 99 + * } + * } + * } + * } + * </pre> + * Notice that the transformer: + * <ul> + * <li>Flattens nested fields which exist in the schema, like "tags.platform"</li> + * <li>Drops some fields like "HOSTNAME", where "HOSTNAME" must be listed as a field in the config option + * "fieldPathsToDrop".</li> + * <li>Moves fields which don't exist in the schema and have the suffix "_noIndex" into the "unindexableExtras" field + * (the field name is configurable)</li> + * <li>Moves any remaining fields which don't exist in the schema into the "indexableExtras" field (the field name is + * configurable)</li> + * </ul> + * <p> + * The "unindexableExtras" field allows the transformer to separate fields which don't need 
indexing (because they are + * only retrieved, not searched) from those that do. The transformer also has other configuration options specified in + * {@link SchemaConformingTransformerConfig}. + */ +public class SchemaConformingTransformer implements RecordTransformer { + private static final Logger _logger = LoggerFactory.getLogger(SchemaConformingTransformer.class); + + private final boolean _continueOnError; + private final SchemaConformingTransformerConfig _transformerConfig; + private final DataType _indexableExtrasFieldType; + private final DataType _unindexableExtrasFieldType; + + private Map<String, Object> _schemaTree; + + /** + * Validates the schema against the given transformer's configuration. + */ + public static void validateSchema(@Nonnull Schema schema, + @Nonnull SchemaConformingTransformerConfig transformerConfig) { + validateSchemaFieldNames(schema.getPhysicalColumnNames(), transformerConfig); + + String indexableExtrasFieldName = transformerConfig.getIndexableExtrasField(); + getAndValidateExtrasFieldType(schema, indexableExtrasFieldName); + String unindexableExtrasFieldName = transformerConfig.getUnindexableExtrasField(); + if (null != unindexableExtrasFieldName) { + getAndValidateExtrasFieldType(schema, unindexableExtrasFieldName); + } + + validateSchemaAndCreateTree(schema); + } + + /** + * Validates that none of the schema fields have names that conflict with the transformer's configuration. 
+ */ + private static void validateSchemaFieldNames(Set<String> schemaFields, + SchemaConformingTransformerConfig transformerConfig) { + // Validate that none of the columns in the schema end with unindexableFieldSuffix + String unindexableFieldSuffix = transformerConfig.getUnindexableFieldSuffix(); + if (null != unindexableFieldSuffix) { + for (String field : schemaFields) { + Preconditions.checkState(!field.endsWith(unindexableFieldSuffix), "Field '%s' has no-index suffix '%s'", field, + unindexableFieldSuffix); + } + } + + // Validate that none of the columns in the schema overlap with the fields in fieldPathsToDrop + Set<String> fieldPathsToDrop = transformerConfig.getFieldPathsToDrop(); + if (null != fieldPathsToDrop) { + Set<String> fieldIntersection = new HashSet<>(schemaFields); + fieldIntersection.retainAll(fieldPathsToDrop); + Preconditions.checkState(fieldIntersection.isEmpty(), "Fields in schema overlap with fieldPathsToDrop"); + } + } + + /** + * @return The field type for the given extras field + */ + private static DataType getAndValidateExtrasFieldType(Schema schema, @Nonnull String extrasFieldName) { + FieldSpec fieldSpec = schema.getFieldSpecFor(extrasFieldName); + Preconditions.checkState(null != fieldSpec, "Field '%s' doesn't exist in schema", extrasFieldName); + DataType fieldDataType = fieldSpec.getDataType(); + Preconditions.checkState(DataType.JSON == fieldDataType || DataType.STRING == fieldDataType, + "Field '%s' has unsupported type %s", extrasFieldName, fieldDataType.toString()); + return fieldDataType; + } + + /** + * Validates the schema with a SchemaConformingTransformerConfig instance and creates a tree representing the fields + * in the schema to be used when transforming input records. For instance, the field "a.b" in the schema would be + * un-flattened into "{a: {b: null}}" in the tree, allowing us to more easily process records containing the latter. 
+ * @throws IllegalArgumentException if schema validation fails in one of two ways: + * <ul> + * <li>One of the fields in the schema has a name which when interpreted as a JSON path, corresponds to an object + * with an empty sub-key. E.g., the field name "a..b" corresponds to the JSON {"a": {"": {"b": ...}}}</li> + * <li>Two fields in the schema have names which correspond to JSON paths where one is a child of the other. E.g., + * the field names "a.b" and "a.b.c" are considered invalid since "a.b.c" is a child of "a.b".</li> + * </ul> + */ + private static Map<String, Object> validateSchemaAndCreateTree(@Nonnull Schema schema) + throws IllegalArgumentException { + Set<String> schemaFields = schema.getPhysicalColumnNames(); + + Map<String, Object> schemaTree = new HashMap<>(); + List<String> subKeys = new ArrayList<>(); + for (String field : schemaFields) { + int keySeparatorIdx = field.indexOf(JsonUtils.KEY_SEPARATOR); + if (-1 == keySeparatorIdx) { + // Not a flattened key + schemaTree.put(field, null); + continue; + } + + subKeys.clear(); + getAndValidateSubKeys(field, keySeparatorIdx, subKeys); + + // Add all sub-keys except the leaf to the tree + Map<String, Object> currentNode = schemaTree; + for (int i = 0; i < subKeys.size() - 1; i++) { + String subKey = subKeys.get(i); + + Map<String, Object> childNode; + if (currentNode.containsKey(subKey)) { + childNode = (Map<String, Object>) currentNode.get(subKey); + if (null == childNode) { + throw new IllegalArgumentException( + "Cannot handle field '" + String.join(JsonUtils.KEY_SEPARATOR, subKeys.subList(0, i + 1)) + + "' which overlaps with another field in the schema."); + } + } else { + childNode = new HashMap<>(); + currentNode.put(subKey, childNode); + } + currentNode = childNode; + } + // Add the leaf pointing at null + String subKey = subKeys.get(subKeys.size() - 1); + if (currentNode.containsKey(subKey)) { + throw new IllegalArgumentException( + "Cannot handle field '" + field + "' which overlaps with 
another field in the schema."); + } + currentNode.put(subKey, null); + } + + return schemaTree; + } + + /** + * Given a JSON path (e.g. "k1.k2.k3"), returns all the sub-keys (e.g. ["k1", "k2", "k3"]) + * @param key The complete key + * @param firstKeySeparatorIdx The index of the first key separator in {@code key} + * @param subKeys Returns the sub-keys + * @throws IllegalArgumentException if any sub-key is empty + */ + private static void getAndValidateSubKeys(String key, int firstKeySeparatorIdx, List<String> subKeys) + throws IllegalArgumentException { + int subKeyBeginIdx = 0; + int subKeyEndIdx = firstKeySeparatorIdx; + int keyLength = key.length(); + while (true) { + // Validate and add the sub-key + String subKey = key.substring(subKeyBeginIdx, subKeyEndIdx); + if (subKey.isEmpty()) { + throw new IllegalArgumentException("Unsupported empty sub-key in '" + key + "'."); + } + subKeys.add(subKey); + + // Advance to the beginning of the next sub-key + subKeyBeginIdx = subKeyEndIdx + 1; + if (subKeyBeginIdx >= keyLength) { + break; + } + + // Find the end of the next sub-key + int keySeparatorIdx = key.indexOf(JsonUtils.KEY_SEPARATOR, subKeyBeginIdx); + if (-1 != keySeparatorIdx) { + subKeyEndIdx = keySeparatorIdx; + } else { + subKeyEndIdx = key.length(); + } + } + } + + public SchemaConformingTransformer(TableConfig tableConfig, Schema schema) { + if (null == tableConfig.getIngestionConfig() || null == tableConfig.getIngestionConfig() + .getSchemaConformingTransformerConfig()) { + _continueOnError = false; + _transformerConfig = null; + _indexableExtrasFieldType = null; + _unindexableExtrasFieldType = null; + return; + } + + _continueOnError = tableConfig.getIngestionConfig().isContinueOnError(); + _transformerConfig = tableConfig.getIngestionConfig().getSchemaConformingTransformerConfig(); + String indexableExtrasFieldName = _transformerConfig.getIndexableExtrasField(); + _indexableExtrasFieldType = getAndValidateExtrasFieldType(schema, 
indexableExtrasFieldName); + String unindexableExtrasFieldName = _transformerConfig.getUnindexableExtrasField(); + _unindexableExtrasFieldType = + null == unindexableExtrasFieldName ? null : getAndValidateExtrasFieldType(schema, unindexableExtrasFieldName); + + _schemaTree = validateSchemaAndCreateTree(schema); + } + + @Override + public boolean isNoOp() { + return null == _transformerConfig; + } + + @Nullable + @Override + public GenericRow transform(GenericRow record) { + GenericRow outputRecord = new GenericRow(); + + try { + ExtraFieldsContainer extraFieldsContainer = + new ExtraFieldsContainer(null != _transformerConfig.getUnindexableExtrasField()); + for (Map.Entry<String, Object> recordEntry : record.getFieldToValueMap().entrySet()) { + String recordKey = recordEntry.getKey(); + Object recordValue = recordEntry.getValue(); + processField(_schemaTree, recordKey, recordKey, recordValue, extraFieldsContainer, outputRecord); + } + putExtrasField(_transformerConfig.getIndexableExtrasField(), _indexableExtrasFieldType, + extraFieldsContainer.getIndexableExtras(), outputRecord); + putExtrasField(_transformerConfig.getUnindexableExtrasField(), _unindexableExtrasFieldType, + extraFieldsContainer.getUnindexableExtras(), outputRecord); + } catch (Exception e) { + if (!_continueOnError) { + throw e; + } + _logger.debug("Couldn't transform record: {}", record.toString(), e); + outputRecord.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true); + } + + return outputRecord; + } + + /** + * Processes a field from the record and either: + * <ul> + * <li>Drops it if it's in fieldPathsToDrop</li> + * <li>Adds it to the output record if it's special or exists in the schema</li> + * <li>Adds it to one of the extras fields</li> + * </ul> + * <p> + * This method works recursively to build the output record. It is similar to {@code addIndexableField} except it + * handles fields which exist in the schema. 
+ * <p> + * One notable complication that this method (and {@code addIndexableField}) handles is adding nested fields (even + * ones more than two levels deep) to the "extras" fields. E.g., consider this record: + * <pre> + * { + * a: { + * b: { + * c: 0, + * d: 1 + * } + * } + * } + * </pre> + * Assume "a.b.c" exists in the schema but "a.b.d" doesn't. This class processes the record recursively from the root + * node to the children, so it would only know that "a.b.d" doesn't exist when it gets to "d". At this point we need + * to add "d" and all of its parents to the indexableExtrasField. To do so efficiently, the class builds this branch + * starting from the leaf and attaches it to parent nodes as we return from each recursive call. + * @param schemaNode The current node in the schema tree + * @param keyJsonPath The JSON path (without the "$." prefix) of the current field + * @param key + * @param value + * @param extraFieldsContainer A container for the "extras" fields corresponding to this node. 
+ * @param outputRecord Returns the record after transformation + */ + private void processField(Map<String, Object> schemaNode, String keyJsonPath, String key, Object value, + ExtraFieldsContainer extraFieldsContainer, GenericRow outputRecord) { + + if (StreamDataDecoderImpl.isSpecialKeyType(key) || GenericRow.isSpecialKeyType(key)) { + outputRecord.putValue(key, value); + return; + } + + Set<String> fieldPathsToDrop = _transformerConfig.getFieldPathsToDrop(); + if (null != fieldPathsToDrop && fieldPathsToDrop.contains(keyJsonPath)) { + return; + } + + String unindexableFieldSuffix = _transformerConfig.getUnindexableFieldSuffix(); + if (null != unindexableFieldSuffix && key.endsWith(unindexableFieldSuffix)) { + extraFieldsContainer.addUnindexableEntry(key, value); + return; + } + + if (!schemaNode.containsKey(key)) { + addIndexableField(keyJsonPath, key, value, extraFieldsContainer); + return; + } + + Map<String, Object> childSchemaNode = (Map<String, Object>) schemaNode.get(key); + boolean storeUnindexableExtras = _transformerConfig.getUnindexableExtrasField() != null; + if (null == childSchemaNode) { + if (!(value instanceof Map) || null == unindexableFieldSuffix) { + outputRecord.putValue(keyJsonPath, value); + } else { + // The field's value is a map which could contain a no-index field, so we need to keep traversing the map + ExtraFieldsContainer container = new ExtraFieldsContainer(storeUnindexableExtras); + addIndexableField(keyJsonPath, key, value, container); + Map<String, Object> indexableFields = container.getIndexableExtras(); + outputRecord.putValue(keyJsonPath, null == indexableFields ? null : indexableFields.get(key)); + Map<String, Object> unindexableFields = container.getUnindexableExtras(); + if (null != unindexableFields) { + extraFieldsContainer.addUnindexableEntry(key, unindexableFields.get(key)); + } + } + } else { + if (!(value instanceof Map)) { + _logger.debug("Record doesn't match schema: Schema node '{}' is a map but record value is a {}", keyJsonPath, 
+ null == value ? "null" : value.getClass().getName()); + extraFieldsContainer.addIndexableEntry(key, value); + } else { + ExtraFieldsContainer childExtraFieldsContainer = new ExtraFieldsContainer(storeUnindexableExtras); + Map<String, Object> valueAsMap = (Map<String, Object>) value; + for (Map.Entry<String, Object> entry : valueAsMap.entrySet()) { + String childKey = entry.getKey(); + processField(childSchemaNode, keyJsonPath + JsonUtils.KEY_SEPARATOR + childKey, childKey, entry.getValue(), + childExtraFieldsContainer, outputRecord); + } + extraFieldsContainer.addChild(key, childExtraFieldsContainer); + } + } + } + + /** + * Adds an indexable field to the given {@code ExtraFieldsContainer}. + * <p> + * This method is similar to {@code processField} except it doesn't handle fields which exist in the schema. + */ + void addIndexableField(String recordJsonPath, String key, Object value, ExtraFieldsContainer extraFieldsContainer) { + Set<String> fieldPathsToDrop = _transformerConfig.getFieldPathsToDrop(); + if (null != fieldPathsToDrop && fieldPathsToDrop.contains(recordJsonPath)) { + return; + } + + String unindexableFieldSuffix = _transformerConfig.getUnindexableFieldSuffix(); + if (null != unindexableFieldSuffix && key.endsWith(unindexableFieldSuffix)) { + extraFieldsContainer.addUnindexableEntry(key, value); + return; + } + + boolean storeUnindexableExtras = _transformerConfig.getUnindexableExtrasField() != null; + if (!(value instanceof Map)) { + extraFieldsContainer.addIndexableEntry(key, value); + } else { + ExtraFieldsContainer childExtraFieldsContainer = new ExtraFieldsContainer(storeUnindexableExtras); + Map<String, Object> valueAsMap = (Map<String, Object>) value; + for (Map.Entry<String, Object> entry : valueAsMap.entrySet()) { + String childKey = entry.getKey(); + addIndexableField(recordJsonPath + JsonUtils.KEY_SEPARATOR + childKey, childKey, entry.getValue(), + childExtraFieldsContainer); + } + extraFieldsContainer.addChild(key, childExtraFieldsContainer); + } + } + + /** + * 
Converts (if necessary) and adds the given extras field to the output record + */ + private void putExtrasField(String fieldName, DataType fieldType, Map<String, Object> field, + GenericRow outputRecord) { + if (null == field) { + return; + } + + switch (fieldType) { + case JSON: + outputRecord.putValue(fieldName, field); + break; + case STRING: + try { + outputRecord.putValue(fieldName, JsonUtils.objectToString(field)); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to convert '" + fieldName + "' to string", e); + } + break; + default: + throw new UnsupportedOperationException("Cannot convert '" + fieldName + "' to " + fieldType.name()); + } + } +} + +/** + * A class to encapsulate the "extras" fields (indexableExtras and unindexableExtras) at a node in the record (when + * viewed as a tree). + */ +class ExtraFieldsContainer { + private Map<String, Object> _indexableExtras = null; + private Map<String, Object> _unindexableExtras = null; + private final boolean _storeUnindexableExtras; + + ExtraFieldsContainer(boolean storeUnindexableExtras) { + _storeUnindexableExtras = storeUnindexableExtras; + } + + public Map<String, Object> getIndexableExtras() { + return _indexableExtras; + } + + public Map<String, Object> getUnindexableExtras() { + return _unindexableExtras; + } + + /** + * Adds the given kv-pair to the indexable extras field + */ + public void addIndexableEntry(String key, Object value) { + if (null == _indexableExtras) { + _indexableExtras = new HashMap<>(); + } + _indexableExtras.put(key, value); + } + + /** + * Adds the given kv-pair to the unindexable extras field (if any) + */ + public void addUnindexableEntry(String key, Object value) { + if (!_storeUnindexableExtras) { + return; + } + if (null == _unindexableExtras) { + _unindexableExtras = new HashMap<>(); + } + _unindexableExtras.put(key, value); + } + + /** + * Given a container corresponding to a child node, attach the extras from the child node to the extras in this 
node + * at the given key. + */ + public void addChild(String key, ExtraFieldsContainer child) { + Map<String, Object> childIndexableFields = child.getIndexableExtras(); + if (null != childIndexableFields) { + addIndexableEntry(key, childIndexableFields); + } + + Map<String, Object> childUnindexableFields = child.getUnindexableExtras(); + if (null != childUnindexableFields) { + addUnindexableEntry(key, childUnindexableFields); + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java index 1d73c4cfd5..f6943e5468 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java @@ -308,6 +308,13 @@ public final class IngestionUtils { */ public static Set<String> getFieldsForRecordExtractor(@Nullable IngestionConfig ingestionConfig, Schema schema) { Set<String> fieldsForRecordExtractor = new HashSet<>(); + + if (null != ingestionConfig && null != ingestionConfig.getSchemaConformingTransformerConfig()) { + // The SchemaConformingTransformer requires that all fields are extracted, indicated by returning an empty set + // here. Compared to extracting the fields specified below, extracting all fields should be a superset. 
+ return fieldsForRecordExtractor; + } + extractFieldsFromIngestionConfig(ingestionConfig, fieldsForRecordExtractor); extractFieldsFromSchema(schema, fieldsForRecordExtractor); fieldsForRecordExtractor = getFieldsToReadWithComplexType(fieldsForRecordExtractor, ingestionConfig); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 6e82dea79e..a7d65d6343 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -44,6 +44,7 @@ import org.apache.pinot.common.tier.TierFactory; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; +import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer; import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.index.IndexService; @@ -68,6 +69,7 @@ import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig; import org.apache.pinot.spi.config.table.ingestion.FilterConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig; import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.TransformConfig; import org.apache.pinot.spi.data.FieldSpec; @@ -498,6 +500,12 @@ public final class TableConfigUtils { } } } + + SchemaConformingTransformerConfig schemaConformingTransformerConfig = + 
ingestionConfig.getSchemaConformingTransformerConfig(); + if (null != schemaConformingTransformerConfig && null != schema) { + SchemaConformingTransformer.validateSchema(schema, schemaConformingTransformerConfig); + } } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerTest.java new file mode 100644 index 0000000000..e9b3ec3d6d --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerTest.java @@ -0,0 +1,731 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.recordtransformer; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nonnull; +import org.apache.pinot.segment.local.utils.IngestionUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.ingestion.FilterConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.testng.AssertJUnit.fail; + + +public class SchemaConformingTransformerTest { + static final private String INDEXABLE_EXTRAS_FIELD_NAME = "indexableExtras"; + static final private String UNINDEXABLE_EXTRAS_FIELD_NAME = "unindexableExtras"; + static final private String UNINDEXABLE_FIELD_SUFFIX = "_noIndex"; + + static final private ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private TableConfig createDefaultTableConfig(String indexableExtrasField, String unindexableExtrasField, + String unindexableFieldSuffix, Set<String> fieldPathsToDrop) { + IngestionConfig ingestionConfig = new IngestionConfig(); + SchemaConformingTransformerConfig schemaConformingTransformerConfig = + new SchemaConformingTransformerConfig(indexableExtrasField, unindexableExtrasField, unindexableFieldSuffix, + fieldPathsToDrop); + ingestionConfig.setSchemaConformingTransformerConfig(schemaConformingTransformerConfig); + return 
new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig) + .build(); + } + + private Schema.SchemaBuilder createDefaultSchemaBuilder() { + return new Schema.SchemaBuilder().addSingleValueDimension(INDEXABLE_EXTRAS_FIELD_NAME, DataType.JSON) + .addSingleValueDimension(UNINDEXABLE_EXTRAS_FIELD_NAME, DataType.JSON); + } + + @Test + public void testWithNoUnindexableFields() { + /* + { + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + }, + "nestedFields":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + } + } + } + */ + final String inputRecordJSONString = + "{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"mapField\":{\"arrayField\":[0,1,2,3]," + + "\"nullField\":null,\"stringField\":\"a\"},\"nestedFields\":{\"arrayField\":[0,1,2,3]," + + "\"nullField\":null,\"stringField\":\"a\",\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null," + + "\"stringField\":\"a\"}}}"; + String expectedOutputRecordJSONString; + Schema schema; + + schema = createDefaultSchemaBuilder().build(); + /* + { + "indexableExtras":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + }, + "nestedFields":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + } + } + } + } + */ + expectedOutputRecordJSONString = + "{\"indexableExtras\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"," + + "\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"}," + + "\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"," + + 
"\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"}}}}"; + testTransformWithNoUnindexableFields(schema, inputRecordJSONString, expectedOutputRecordJSONString); + + schema = createDefaultSchemaBuilder().addMultiValueDimension("arrayField", DataType.INT) + .addSingleValueDimension("mapField", DataType.JSON) + .addSingleValueDimension("nestedFields.stringField", DataType.STRING).build(); + /* + { + "arrayField":[0, 1, 2, 3], + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + }, + "nestedFields.stringField":"a", + "indexableExtras":{ + "nullField":null, + "stringField":"a", + "nestedFields":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + } + } + } + } + */ + expectedOutputRecordJSONString = + "{\"arrayField\":[0,1,2,3],\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"}," + + "\"nestedFields.stringField\":\"a\",\"indexableExtras\":{\"nullField\":null,\"stringField\":\"a\"," + + "\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"mapField\":{\"arrayField\":[0,1,2,3]," + + "\"nullField\":null,\"stringField\":\"a\"}}}}"; + testTransformWithNoUnindexableFields(schema, inputRecordJSONString, expectedOutputRecordJSONString); + + schema = createDefaultSchemaBuilder().addMultiValueDimension("arrayField", DataType.INT) + .addSingleValueDimension("nullField", DataType.STRING).addSingleValueDimension("stringField", DataType.STRING) + .addSingleValueDimension("mapField", DataType.JSON) + .addMultiValueDimension("nestedFields.arrayField", DataType.INT) + .addSingleValueDimension("nestedFields.nullField", DataType.STRING) + .addSingleValueDimension("nestedFields.stringField", DataType.STRING) + .addSingleValueDimension("nestedFields.mapField", DataType.JSON).build(); + /* + { + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "mapField":{ + "arrayField":[0, 1, 2, 3], + 
"nullField":null, + "stringField":"a" + }, + "nestedFields.arrayField":[0, 1, 2, 3], + "nestedFields.nullField":null, + "nestedFields.stringField":"a", + "nestedFields.mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + } + } + */ + expectedOutputRecordJSONString = + "{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"mapField\":{\"arrayField\":[0,1,2,3]," + + "\"nullField\":null,\"stringField\":\"a\"},\"nestedFields.arrayField\":[0,1,2,3],\"nestedFields" + + ".nullField\":null,\"nestedFields.stringField\":\"a\",\"nestedFields.mapField\":{\"arrayField\":[0,1,2," + + "3],\"nullField\":null,\"stringField\":\"a\"}}"; + testTransformWithNoUnindexableFields(schema, inputRecordJSONString, expectedOutputRecordJSONString); + } + + private void testTransformWithNoUnindexableFields(Schema schema, String inputRecordJSONString, + String expectedOutputRecordJSONString) { + testTransform(null, null, schema, null, inputRecordJSONString, expectedOutputRecordJSONString); + testTransform(null, UNINDEXABLE_FIELD_SUFFIX, schema, null, inputRecordJSONString, expectedOutputRecordJSONString); + testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX, schema, null, inputRecordJSONString, + expectedOutputRecordJSONString); + } + + @Test + public void testWithUnindexableFields() { + /* + { + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "intField_noIndex":9, + "string_noIndex":"z" + }, + "nestedFields":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "intField_noIndex":9, + "string_noIndex":"z" + } + } + } + */ + final String inputRecordJSONString = + 
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"intField_noIndex\":9," + + "\"string_noIndex\":\"z\",\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null," + + "\"stringField\":\"a\",\"intField_noIndex\":9,\"string_noIndex\":\"z\"}," + + "\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"," + + "\"intField_noIndex\":9,\"string_noIndex\":\"z\",\"mapField\":{\"arrayField\":[0,1,2,3]," + + "\"nullField\":null,\"stringField\":\"a\",\"intField_noIndex\":9,\"string_noIndex\":\"z\"}}}"; + String expectedOutputRecordJSONString; + Schema schema; + + schema = createDefaultSchemaBuilder().build(); + /* + { + "indexableExtras":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + }, + "nestedFields":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + } + } + } + } + */ + expectedOutputRecordJSONString = + "{\"indexableExtras\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"," + + "\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"}," + + "\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"," + + "\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"}}}}"; + testTransform(null, UNINDEXABLE_FIELD_SUFFIX, schema, null, inputRecordJSONString, expectedOutputRecordJSONString); + /* + { + "indexableExtras":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + }, + "nestedFields":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + } + } + }, + "unindexableExtras":{ + "intField_noIndex":9, + 
"string_noIndex":"z", + "mapField":{ + "intField_noIndex":9, + "string_noIndex":"z" + }, + "nestedFields":{ + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "intField_noIndex":9, + "string_noIndex":"z" + } + } + } + } + */ + expectedOutputRecordJSONString = + "{\"indexableExtras\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"," + + "\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"}," + + "\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"," + + "\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"}}}," + + "\"unindexableExtras\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"," + + "\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"}," + + "\"nestedFields\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"," + + "\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"}}}}"; + testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX, schema, null, inputRecordJSONString, + expectedOutputRecordJSONString); + + schema = createDefaultSchemaBuilder().addMultiValueDimension("arrayField", DataType.INT) + .addSingleValueDimension("mapField", DataType.JSON) + .addSingleValueDimension("nestedFields.stringField", DataType.STRING).build(); + /* + { + "arrayField":[0, 1, 2, 3], + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + }, + "nestedFields.stringField":"a", + "indexableExtras":{ + "nullField":null, + "stringField":"a", + "nestedFields":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + } + } + } + } + */ + expectedOutputRecordJSONString = + "{\"arrayField\":[0,1,2,3],\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"}," + + "\"nestedFields.stringField\":\"a\",\"indexableExtras\":{\"nullField\":null,\"stringField\":\"a\"," + + 
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"mapField\":{\"arrayField\":[0,1,2,3]," + + "\"nullField\":null,\"stringField\":\"a\"}}}}"; + testTransform(null, UNINDEXABLE_FIELD_SUFFIX, schema, null, inputRecordJSONString, expectedOutputRecordJSONString); + /* + { + "arrayField":[0, 1, 2, 3], + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + }, + "nestedFields.stringField":"a", + "indexableExtras":{ + "nullField":null, + "stringField":"a", + "nestedFields":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + } + } + }, + "unindexableExtras":{ + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "intField_noIndex":9, + "string_noIndex":"z" + }, + "nestedFields":{ + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "intField_noIndex":9, + "string_noIndex":"z" + } + } + } + } + */ + expectedOutputRecordJSONString = + "{\"arrayField\":[0,1,2,3],\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"}," + + "\"nestedFields.stringField\":\"a\",\"indexableExtras\":{\"nullField\":null,\"stringField\":\"a\"," + + "\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"mapField\":{\"arrayField\":[0,1,2,3]," + + "\"nullField\":null,\"stringField\":\"a\"}}},\"unindexableExtras\":{\"intField_noIndex\":9," + + "\"string_noIndex\":\"z\",\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"}," + + "\"nestedFields\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"," + + "\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"}}}}"; + testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX, schema, null, inputRecordJSONString, + expectedOutputRecordJSONString); + + schema = createDefaultSchemaBuilder().addMultiValueDimension("arrayField", DataType.INT) + .addSingleValueDimension("nullField", DataType.STRING).addSingleValueDimension("stringField", DataType.STRING) 
+ .addSingleValueDimension("mapField", DataType.JSON) + .addMultiValueDimension("nestedFields.arrayField", DataType.INT) + .addSingleValueDimension("nestedFields.nullField", DataType.STRING) + .addSingleValueDimension("nestedFields.stringField", DataType.STRING) + .addSingleValueDimension("nestedFields.mapField", DataType.JSON).build(); + /* + { + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + }, + "nestedFields.arrayField":[0, 1, 2, 3], + "nestedFields.nullField":null, + "nestedFields.stringField":"a", + "nestedFields.mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + } + } + */ + expectedOutputRecordJSONString = + "{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"mapField\":{\"arrayField\":[0,1,2,3]," + + "\"nullField\":null,\"stringField\":\"a\"},\"nestedFields.arrayField\":[0,1,2,3],\"nestedFields" + + ".nullField\":null,\"nestedFields.stringField\":\"a\",\"nestedFields.mapField\":{\"arrayField\":[0,1,2," + + "3],\"nullField\":null,\"stringField\":\"a\"} }"; + testTransform(null, UNINDEXABLE_FIELD_SUFFIX, schema, null, inputRecordJSONString, expectedOutputRecordJSONString); + /* + { + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + }, + "nestedFields.arrayField":[0, 1, 2, 3], + "nestedFields.nullField":null, + "nestedFields.stringField":"a", + "nestedFields.mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + }, + "unindexableExtras":{ + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "intField_noIndex":9, + "string_noIndex":"z" + }, + "nestedFields":{ + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "intField_noIndex":9, + "string_noIndex":"z" + } + } + } + } + */ + expectedOutputRecordJSONString = + 
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"mapField\":{\"arrayField\":[0,1,2,3]," + + "\"nullField\":null,\"stringField\":\"a\"},\"nestedFields.arrayField\":[0,1,2,3],\"nestedFields" + + ".nullField\":null,\"nestedFields.stringField\":\"a\",\"nestedFields.mapField\":{\"arrayField\":[0,1,2," + + "3],\"nullField\":null,\"stringField\":\"a\"},\"unindexableExtras\":{\"intField_noIndex\":9," + + "\"string_noIndex\":\"z\",\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"}," + + "\"nestedFields\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"," + + "\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"}}}}"; + testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX, schema, null, inputRecordJSONString, + expectedOutputRecordJSONString); + } + + @Test + public void testFieldPathsToDrop() { + /* + { + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "boolField":false, + "nestedFields":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "boolField":false + } + } + */ + final String inputRecordJSONString = + "{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"boolField\":false," + + "\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"," + + "\"boolField\":false}}"; + String expectedOutputRecordJSONString; + Schema schema; + + schema = createDefaultSchemaBuilder().addMultiValueDimension("arrayField", DataType.INT) + .addSingleValueDimension("nullField", DataType.STRING) + .addSingleValueDimension("nestedFields.stringField", DataType.STRING) + .addSingleValueDimension("nestedFields.boolField", DataType.BOOLEAN).build(); + Set<String> fieldPathsToDrop = new HashSet<>(Arrays.asList("stringField", "nestedFields.arrayField")); + /* + { + "arrayField":[0, 1, 2, 3], + "nullField":null, + "indexableExtras": { + "boolField":false, + "nestedFields": { + nullField":null + } + }, + "nestedFields":{ + "stringField":"a", + 
"boolField":false + } + } + */ + expectedOutputRecordJSONString = + "{\"arrayField\":[0,1,2,3],\"nullField\":null,\"nestedFields.stringField\":\"a\",\"nestedFields" + + ".boolField\":false,\"indexableExtras\":{\"boolField\":false,\"nestedFields\":{\"nullField\":null}}}"; + testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX, schema, fieldPathsToDrop, + inputRecordJSONString, expectedOutputRecordJSONString); + } + + @Test + public void testIgnoringSpecialRowKeys() { + // Configure a FilterTransformer and a SchemaConformingTransformer such that the filter will introduce a special + // key ($SKIP_RECORD_KEY$) that the SchemaConformingTransformer should ignore + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setFilterConfig(new FilterConfig("intField = 1")); + SchemaConformingTransformerConfig schemaConformingTransformerConfig = + new SchemaConformingTransformerConfig(INDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_EXTRAS_FIELD_NAME, + UNINDEXABLE_FIELD_SUFFIX, null); + ingestionConfig.setSchemaConformingTransformerConfig(schemaConformingTransformerConfig); + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build(); + + // Create a series of transformers: FilterTransformer -> SchemaConformingTransformer + List<RecordTransformer> transformers = new LinkedList<>(); + transformers.add(new FilterTransformer(tableConfig)); + Schema schema = createDefaultSchemaBuilder().addSingleValueDimension("intField", DataType.INT).build(); + transformers.add(new SchemaConformingTransformer(tableConfig, schema)); + CompositeTransformer compositeTransformer = new CompositeTransformer(transformers); + + Map<String, Object> inputRecordMap = jsonStringToMap("{\"intField\":1}"); + GenericRow inputRecord = createRowFromMap(inputRecordMap); + GenericRow outputRecord = compositeTransformer.transform(inputRecord); + Assert.assertNotNull(outputRecord); + // Check that the 
transformed record has $SKIP_RECORD_KEY$ + Assert.assertFalse(IngestionUtils.shouldIngestRow(outputRecord)); + } + + @Test + public void testOverlappingSchemaFields() { + Assert.assertThrows(IllegalArgumentException.class, () -> { + Schema schema = createDefaultSchemaBuilder().addSingleValueDimension("a.b", DataType.STRING) + .addSingleValueDimension("a.b.c", DataType.INT).build(); + SchemaConformingTransformer.validateSchema(schema, + new SchemaConformingTransformerConfig(INDEXABLE_EXTRAS_FIELD_NAME, null, null, null)); + }); + + // This is a repeat of the previous test but with fields reversed just in case they are processed in order + Assert.assertThrows(IllegalArgumentException.class, () -> { + Schema schema = createDefaultSchemaBuilder().addSingleValueDimension("a.b.c", DataType.INT) + .addSingleValueDimension("a.b", DataType.STRING).build(); + SchemaConformingTransformer.validateSchema(schema, + new SchemaConformingTransformerConfig(INDEXABLE_EXTRAS_FIELD_NAME, null, null, null)); + }); + } + + @Test + public void testInvalidFieldNamesInSchema() { + // Ensure schema fields which end with unindexableFieldSuffix are caught as invalid + Assert.assertThrows(() -> { + Schema schema = + createDefaultSchemaBuilder().addSingleValueDimension("a" + UNINDEXABLE_FIELD_SUFFIX, DataType.STRING) + .addSingleValueDimension("a.b" + UNINDEXABLE_FIELD_SUFFIX, DataType.INT).build(); + SchemaConformingTransformer.validateSchema(schema, + new SchemaConformingTransformerConfig(INDEXABLE_EXTRAS_FIELD_NAME, null, UNINDEXABLE_FIELD_SUFFIX, null)); + }); + + // Ensure schema fields which are in fieldPathsToDrop are caught as invalid + Assert.assertThrows(() -> { + Schema schema = createDefaultSchemaBuilder().addSingleValueDimension("a", DataType.STRING) + .addSingleValueDimension("b.c", DataType.INT).build(); + Set<String> fieldPathsToDrop = new HashSet<>(Arrays.asList("a", "b.c")); + SchemaConformingTransformer.validateSchema(schema, + new 
SchemaConformingTransformerConfig(INDEXABLE_EXTRAS_FIELD_NAME, null, null, fieldPathsToDrop)); + }); + } + + @Test + public void testSchemaRecordMismatch() { + Schema schema = + createDefaultSchemaBuilder().addSingleValueDimension("nestedFields.mapField", DataType.JSON).build(); + /* + { + "indexableExtras":{ + "nestedFields":0 + } + } + */ + // Schema field "nestedFields.mapField" implies "nestedFields" is a map, but the record field is an int, so it + // should be stored in indexableExtras + testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX, schema, null, "{\"nestedFields\":0}", + "{\"indexableExtras\":{\"nestedFields\":0}}"); + } + + @Test + public void testFieldTypesForExtras() { + final String inputRecordJSONString = "{\"arrayField\":[0,1,2,3]}"; + + TableConfig tableConfig = + createDefaultTableConfig(INDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX, + null); + Schema validSchema = + new Schema.SchemaBuilder().addSingleValueDimension(INDEXABLE_EXTRAS_FIELD_NAME, DataType.STRING) + .addSingleValueDimension(UNINDEXABLE_EXTRAS_FIELD_NAME, DataType.STRING).build(); + GenericRow outputRecord = transformRow(tableConfig, validSchema, inputRecordJSONString); + + Assert.assertNotNull(outputRecord); + // Validate that the indexable extras field contains the input record as a string + Assert.assertEquals(outputRecord.getValue(INDEXABLE_EXTRAS_FIELD_NAME), inputRecordJSONString); + + // Validate that invalid field types are caught + Schema invalidSchema = new Schema.SchemaBuilder().addSingleValueDimension(INDEXABLE_EXTRAS_FIELD_NAME, DataType.INT) + .addSingleValueDimension(UNINDEXABLE_EXTRAS_FIELD_NAME, DataType.BOOLEAN).build(); + Assert.assertThrows(() -> { + transformRow(tableConfig, invalidSchema, inputRecordJSONString); + }); + } + + @Test + public void testInvalidTransformerConfig() { + Assert.assertThrows(() -> { + createDefaultTableConfig(null, null, null, null); + }); + Assert.assertThrows(() -> { + createDefaultTableConfig(null, 
UNINDEXABLE_EXTRAS_FIELD_NAME, null, null); + }); + Assert.assertThrows(() -> { + createDefaultTableConfig(null, null, UNINDEXABLE_FIELD_SUFFIX, null); + }); + Assert.assertThrows(() -> { + createDefaultTableConfig(null, UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX, null); + }); + Assert.assertThrows(() -> { + createDefaultTableConfig(INDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_EXTRAS_FIELD_NAME, null, null); + }); + } + + /** + * Validates transforming the given row results in the expected row, where both rows are given as JSON strings + */ + private void testTransform(String unindexableExtrasField, String unindexableFieldSuffix, Schema schema, + Set<String> fieldPathsToDrop, String inputRecordJSONString, String expectedOutputRecordJSONString) { + TableConfig tableConfig = + createDefaultTableConfig(INDEXABLE_EXTRAS_FIELD_NAME, unindexableExtrasField, unindexableFieldSuffix, + fieldPathsToDrop); + GenericRow outputRecord = transformRow(tableConfig, schema, inputRecordJSONString); + + Assert.assertNotNull(outputRecord); + Map<String, Object> expectedOutputRecordMap = jsonStringToMap(expectedOutputRecordJSONString); + Assert.assertEquals(outputRecord.getFieldToValueMap(), expectedOutputRecordMap); + } + + /** + * Transforms the given row (given as a JSON string) using the transformer + * @return The transformed row + */ + private GenericRow transformRow(TableConfig tableConfig, Schema schema, String inputRecordJSONString) { + Map<String, Object> inputRecordMap = jsonStringToMap(inputRecordJSONString); + GenericRow inputRecord = createRowFromMap(inputRecordMap); + SchemaConformingTransformer schemaConformingTransformer = new SchemaConformingTransformer(tableConfig, schema); + return schemaConformingTransformer.transform(inputRecord); + } + + /** + * @return A map representing the given JSON string + */ + @Nonnull + private Map<String, Object> jsonStringToMap(String jsonString) { + try { + TypeReference<Map<String, Object>> typeRef = new TypeReference<>() { + 
}; + return OBJECT_MAPPER.readValue(jsonString, typeRef); + } catch (IOException e) { + fail(e.getMessage()); + } + // Should never reach here + return null; + } + + /** + * @return A new generic row with all the kv-pairs from the given map + */ + private GenericRow createRowFromMap(Map<String, Object> map) { + GenericRow record = new GenericRow(); + for (Map.Entry<String, Object> entry : map.entrySet()) { + record.putValue(entry.getKey(), entry.getValue()); + } + return record; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java index 4b26f33cd7..86ff9cec6f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java @@ -45,6 +45,9 @@ public class IngestionConfig extends BaseJsonConfig { @JsonPropertyDescription("Config related to handling complex type") private ComplexTypeConfig _complexTypeConfig; + @JsonPropertyDescription("Config related to the SchemaConformingTransformer") + private SchemaConformingTransformerConfig _schemaConformingTransformerConfig; + @JsonPropertyDescription("Configs related to record aggregation function applied during ingestion") private List<AggregationConfig> _aggregationConfigs; @@ -61,12 +64,14 @@ public class IngestionConfig extends BaseJsonConfig { public IngestionConfig(@Nullable BatchIngestionConfig batchIngestionConfig, @Nullable StreamIngestionConfig streamIngestionConfig, @Nullable FilterConfig filterConfig, @Nullable List<TransformConfig> transformConfigs, @Nullable ComplexTypeConfig complexTypeConfig, + @Nullable SchemaConformingTransformerConfig schemaConformingTransformerConfig, @Nullable List<AggregationConfig> aggregationConfigs) { _batchIngestionConfig = batchIngestionConfig; _streamIngestionConfig = streamIngestionConfig; _filterConfig = 
filterConfig; _transformConfigs = transformConfigs; _complexTypeConfig = complexTypeConfig; + _schemaConformingTransformerConfig = schemaConformingTransformerConfig; _aggregationConfigs = aggregationConfigs; } @@ -98,6 +103,11 @@ public class IngestionConfig extends BaseJsonConfig { return _complexTypeConfig; } + @Nullable + public SchemaConformingTransformerConfig getSchemaConformingTransformerConfig() { + return _schemaConformingTransformerConfig; + } + @Nullable public List<AggregationConfig> getAggregationConfigs() { return _aggregationConfigs; @@ -135,6 +145,11 @@ public class IngestionConfig extends BaseJsonConfig { _complexTypeConfig = complexTypeConfig; } + public void setSchemaConformingTransformerConfig( + SchemaConformingTransformerConfig schemaConformingTransformerConfig) { + _schemaConformingTransformerConfig = schemaConformingTransformerConfig; + } + public void setAggregationConfigs(List<AggregationConfig> aggregationConfigs) { _aggregationConfigs = aggregationConfigs; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java new file mode 100644 index 0000000000..96231f39d9 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config.table.ingestion; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.google.common.base.Preconditions; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.spi.config.BaseJsonConfig; + + +public class SchemaConformingTransformerConfig extends BaseJsonConfig { + @JsonPropertyDescription("Name of the field that should contain extra fields that are not part of the schema.") + private final String _indexableExtrasField; + + @JsonPropertyDescription( + "Like indexableExtrasField except it only contains fields with the suffix in unindexableFieldSuffix.") + private final String _unindexableExtrasField; + + @JsonPropertyDescription("The suffix of fields that must be stored in unindexableExtrasField") + private final String _unindexableFieldSuffix; + + @JsonPropertyDescription("Array of field paths to drop") + private final Set<String> _fieldPathsToDrop; + + @JsonCreator + public SchemaConformingTransformerConfig(@JsonProperty("indexableExtrasField") String indexableExtrasField, + @JsonProperty("unindexableExtrasField") @Nullable String unindexableExtrasField, + @JsonProperty("unindexableFieldSuffix") @Nullable String unindexableFieldSuffix, + @JsonProperty("fieldPathsToDrop") @Nullable Set<String> fieldPathsToDrop) { + Preconditions.checkArgument(indexableExtrasField != null, "indexableExtrasField must be set"); + if (null != unindexableExtrasField) { 
+ Preconditions.checkArgument(null != unindexableFieldSuffix, + "unindexableExtrasSuffix must be set if unindexableExtrasField is set"); + } + _indexableExtrasField = indexableExtrasField; + _unindexableExtrasField = unindexableExtrasField; + _unindexableFieldSuffix = unindexableFieldSuffix; + _fieldPathsToDrop = fieldPathsToDrop; + } + + public String getIndexableExtrasField() { + return _indexableExtrasField; + } + + @Nullable + public String getUnindexableExtrasField() { + return _unindexableExtrasField; + } + + @Nullable + public String getUnindexableFieldSuffix() { + return _unindexableFieldSuffix; + } + + @Nullable + public Set<String> getFieldPathsToDrop() { + return _fieldPathsToDrop; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java index 8ef3838189..2bc15a0800 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java @@ -78,6 +78,13 @@ public class GenericRow implements Serializable { private final Map<String, Object> _fieldToValueMap = new HashMap<>(); private final Set<String> _nullValueFields = new HashSet<>(); + /** + * @return Whether the given key is one of the special types of keys ($SKIP_RECORD_KEY$, etc.) + */ + public static boolean isSpecialKeyType(String key) { + return key.equals(SKIP_RECORD_KEY) || key.equals(INCOMPLETE_RECORD_KEY) || key.equals(MULTIPLE_RECORDS_KEY); + } + /** * Initializes the generic row from the given generic row (shallow copy). The row should be new created or cleared * before calling this method. 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java index 5093c2c8b9..89c70acaf8 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java @@ -34,6 +34,13 @@ public class StreamDataDecoderImpl implements StreamDataDecoder { private final StreamMessageDecoder _valueDecoder; private final GenericRow _reuse = new GenericRow(); + /** + * @return Whether the given key is one of the special types of keys (__key, __header$, etc.) + */ + public static boolean isSpecialKeyType(String key) { + return key.equals(KEY) || key.startsWith(HEADER_KEY_PREFIX) || key.startsWith(METADATA_KEY_PREFIX); + } + public StreamDataDecoderImpl(StreamMessageDecoder valueDecoder) { _valueDecoder = valueDecoder; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org