This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d91e5db76c Add SchemaConformingTransformer to transform records with 
varying keys to fit a table's schema without dropping fields. (#11210)
d91e5db76c is described below

commit d91e5db76c53af9774db1c57c71520f4c196c059
Author: kirkrodrigues <2454684+kirkrodrig...@users.noreply.github.com>
AuthorDate: Thu Aug 24 17:08:55 2023 -0400

    Add SchemaConformingTransformer to transform records with varying keys to 
fit a table's schema without dropping fields. (#11210)
    
    * Add JsonLogTransformer to transform semi-structured log events to fit a 
table's schema without dropping fields.
    
    * Minor fix
    
    * Add unit tests for JsonLogTransformer.
    
    * Minor fixes.
    
    * Refactor docstrings.
    
    * Rename JsonLogTransformer to SchemaConformingTransformer
    
    * JsonLogTransformer: Pass through GenericRow's special keys
---
 .../apache/pinot/queries/TransformQueriesTest.java |   2 +-
 .../recordtransformer/CompositeTransformer.java    |  13 +-
 .../SchemaConformingTransformer.java               | 545 +++++++++++++++
 .../pinot/segment/local/utils/IngestionUtils.java  |   7 +
 .../segment/local/utils/TableConfigUtils.java      |   8 +
 .../SchemaConformingTransformerTest.java           | 731 +++++++++++++++++++++
 .../config/table/ingestion/IngestionConfig.java    |  15 +
 .../SchemaConformingTransformerConfig.java         |  78 +++
 .../apache/pinot/spi/data/readers/GenericRow.java  |   7 +
 .../pinot/spi/stream/StreamDataDecoderImpl.java    |   7 +
 10 files changed, 1408 insertions(+), 5 deletions(-)

diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
index f3abbbc534..572080e44e 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
@@ -135,7 +135,7 @@ public class TransformQueriesTest extends BaseQueriesTest {
         .setIngestionConfig(new IngestionConfig(null, null, null,
             Arrays.asList(new TransformConfig(M1_V2, "Groovy({INT_COL1_V3  == 
null || "
                 + "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3 
}, INT_COL1, INT_COL1_V3)")),
-            null, null))
+            null, null, null))
         .build();
     Schema schema =
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, 
FieldSpec.DataType.STRING)
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
index e584944bda..dbc6cdbe3c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
@@ -48,7 +48,12 @@ public class CompositeTransformer implements 
RecordTransformer {
    *     destination columns
    *   </li>
    *   <li>
-   *     {@link DataTypeTransformer} after {@link FilterTransformer} to 
convert values to comply with the schema
+   *     Optional {@link SchemaConformingTransformer} after {@link 
FilterTransformer}, so that we can transform input
+   *     records that have varying fields to a fixed schema without dropping 
any fields
+   *   </li>
+   *   <li>
+   *     {@link DataTypeTransformer} after {@link SchemaConformingTransformer} 
to convert values to comply with the
+   *     schema
    *   </li>
    *   <li>
    *     Optional {@link TimeValidationTransformer} after {@link 
DataTypeTransformer} so that time value is converted to
@@ -67,9 +72,9 @@ public class CompositeTransformer implements 
RecordTransformer {
   public static CompositeTransformer getDefaultTransformer(TableConfig 
tableConfig, Schema schema) {
     return new CompositeTransformer(
         Stream.of(new ExpressionTransformer(tableConfig, schema), new 
FilterTransformer(tableConfig),
-                new DataTypeTransformer(tableConfig, schema), new 
TimeValidationTransformer(tableConfig, schema),
-                new NullValueTransformer(tableConfig, schema), new 
SanitizationTransformer(schema))
-            .filter(t -> !t.isNoOp()).collect(Collectors.toList()));
+                new SchemaConformingTransformer(tableConfig, schema), new 
DataTypeTransformer(tableConfig, schema),
+                new TimeValidationTransformer(tableConfig, schema), new 
NullValueTransformer(tableConfig, schema),
+                new SanitizationTransformer(schema)).filter(t -> 
!t.isNoOp()).collect(Collectors.toList()));
   }
 
   /**
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
new file mode 100644
index 0000000000..d8c7da6297
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
@@ -0,0 +1,545 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordtransformer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.table.TableConfig;
+import 
org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This transformer transforms records with varying keys such that they can be 
stored in a table with a fixed schema.
+ * Since these records have varying keys, it is impractical to store each 
field in its own table column. At the same
+ * time, most (if not all) fields may be important to the user, so we should 
not drop any field unnecessarily. So this
+ * transformer primarily takes record-fields that don't exist in the schema 
and stores them in a type of catchall field.
+ * <p>
+ * For example, consider this record:
+ * <pre>
+ * {
+ *   "timestamp": 1687786535928,
+ *   "hostname": "host1",
+ *   "HOSTNAME": "host1",
+ *   "level": "INFO",
+ *   "message": "Started processing job1",
+ *   "tags": {
+ *     "platform": "data",
+ *     "service": "serializer",
+ *     "params": {
+ *       "queueLength": 5,
+ *       "timeout": 299,
+ *       "userData_noIndex": {
+ *         "nth": 99
+ *       }
+ *     }
+ *   }
+ * }
+ * </pre>
+ * And let's say the table's schema contains these fields:
+ * <ul>
+ *   <li>timestamp</li>
+ *   <li>hostname</li>
+ *   <li>level</li>
+ *   <li>message</li>
+ *   <li>tags.platform</li>
+ *   <li>tags.service</li>
+ *   <li>indexableExtras</li>
+ *   <li>unindexableExtras</li>
+ * </ul>
+ * <p>
+ * Without this transformer, the entire "tags" field would be dropped when 
storing the record in the table. However,
+ * with this transformer, the record would be transformed into the following:
+ * <pre>
+ * {
+ *   "timestamp": 1687786535928,
+ *   "hostname": "host1",
+ *   "level": "INFO",
+ *   "message": "Started processing job1",
+ *   "tags.platform": "data",
+ *   "tags.service": "serializer",
+ *   "indexableExtras": {
+ *     "tags": {
+ *       "params": {
+ *         "queueLength": 5,
+ *         "timeout": 299
+ *       }
+ *     }
+ *   },
+ *   "unindexableExtras": {
+ *     "tags": {
+ *       "userData_noIndex": {
+ *         "nth": 99
+ *       }
+ *     }
+ *   }
+ * }
+ * </pre>
+ * Notice that the transformer:
+ * <ul>
+ *   <li>Flattens nested fields which exist in the schema, like 
"tags.platform"</li>
+ *   <li>Drops some fields like "HOSTNAME", where "HOSTNAME" must be listed as 
a field in the config option
+ *   "fieldPathsToDrop".</li>
+ *   <li>Moves fields which don't exist in the schema and have the suffix 
"_noIndex" into the "unindexableExtras" field
+ *   (the field name is configurable)</li>
+ *   <li>Moves any remaining fields which don't exist in the schema into the 
"indexableExtras" field (the field name is
+ *   configurable)</li>
+ * </ul>
+ * <p>
+ * The "unindexableExtras" field allows the transformer to separate fields 
which don't need indexing (because they are
+ * only retrieved, not searched) from those that do. The transformer also has 
other configuration options specified in
+ * {@link SchemaConformingTransformerConfig}.
+ */
+public class SchemaConformingTransformer implements RecordTransformer {
+  private static final Logger _logger = 
LoggerFactory.getLogger(SchemaConformingTransformer.class);
+
+  private final boolean _continueOnError;
+  private final SchemaConformingTransformerConfig _transformerConfig;
+  private final DataType _indexableExtrasFieldType;
+  private final DataType _unindexableExtrasFieldType;
+
+  private Map<String, Object> _schemaTree;
+
+  /**
+   * Validates the schema against the given transformer's configuration.
+   * <p>
+   * The checks are: no schema field name conflicts with the configuration, the indexable extras field (and the
+   * unindexable extras field, if one is configured) exists in the schema with a supported type, and the schema's
+   * field names can be turned into a schema tree.
+   * @param schema The schema to validate
+   * @param transformerConfig The transformer's configuration
+   * @throws IllegalStateException if a field name conflicts with the configuration or an extras field is invalid
+   * @throws IllegalArgumentException if the schema's field names can't be turned into a schema tree
+   */
+  public static void validateSchema(@Nonnull Schema schema,
+      @Nonnull SchemaConformingTransformerConfig transformerConfig) {
+    validateSchemaFieldNames(schema.getPhysicalColumnNames(), transformerConfig);
+
+    String indexableExtrasFieldName = transformerConfig.getIndexableExtrasField();
+    getAndValidateExtrasFieldType(schema, indexableExtrasFieldName);
+    String unindexableExtrasFieldName = transformerConfig.getUnindexableExtrasField();
+    if (null != unindexableExtrasFieldName) {
+      // Validate the unindexable extras field itself (not the indexable one a second time)
+      getAndValidateExtrasFieldType(schema, unindexableExtrasFieldName);
+    }
+
+    // Only the validation side of this call matters here; the returned tree is discarded
+    validateSchemaAndCreateTree(schema);
+  }
+
+  /**
+   * Validates that none of the schema fields have names that conflict with the transformer's configuration.
+   * @param schemaFields The names of the columns in the schema
+   * @param transformerConfig The transformer's configuration
+   * @throws IllegalStateException if a column name ends with the configured unindexable-field suffix, or if a
+   * column name is also listed in fieldPathsToDrop
+   */
+  private static void validateSchemaFieldNames(Set<String> schemaFields,
+      SchemaConformingTransformerConfig transformerConfig) {
+    // Validate that none of the columns in the schema end with unindexableFieldSuffix
+    String unindexableFieldSuffix = 
transformerConfig.getUnindexableFieldSuffix();
+    if (null != unindexableFieldSuffix) {
+      for (String field : schemaFields) {
+        Preconditions.checkState(!field.endsWith(unindexableFieldSuffix), 
"Field '%s' has no-index suffix '%s'", field,
+            unindexableFieldSuffix);
+      }
+    }
+
+    // Validate that none of the columns in the schema overlap with the fields in fieldPathsToDrop
+    Set<String> fieldPathsToDrop = transformerConfig.getFieldPathsToDrop();
+    if (null != fieldPathsToDrop) {
+      // Intersect on a copy so that retainAll doesn't modify the caller's set
+      Set<String> fieldIntersection = new HashSet<>(schemaFields);
+      fieldIntersection.retainAll(fieldPathsToDrop);
+      Preconditions.checkState(fieldIntersection.isEmpty(), "Fields in schema 
overlap with fieldPathsToDrop");
+    }
+  }
+
+  /**
+   * Looks up the given extras field in the schema and validates that it can hold the extras.
+   * @param schema The schema to look the field up in
+   * @param extrasFieldName The name of the extras field
+   * @return The field type for the given extras field
+   * @throws IllegalStateException if the field doesn't exist in the schema, or its type is neither JSON nor STRING
+   */
+  private static DataType getAndValidateExtrasFieldType(Schema schema, @Nonnull String extrasFieldName) {
+    FieldSpec fieldSpec = schema.getFieldSpecFor(extrasFieldName);
+    Preconditions.checkState(null != fieldSpec, "Field '%s' doesn't exist in schema", extrasFieldName);
+    DataType fieldDataType = fieldSpec.getDataType();
+    // Pass both the field name and the type so that each placeholder in the message is filled in
+    Preconditions.checkState(DataType.JSON == fieldDataType || DataType.STRING == fieldDataType,
+        "Field '%s' has unsupported type %s", extrasFieldName, fieldDataType);
+    return fieldDataType;
+  }
+
+  /**
+   * Validates the schema's field names and creates a tree representing the fields in the schema to be used when
+   * transforming input records. For instance, the field "a.b" in the schema would be un-flattened into
+   * "{a: {b: null}}" in the tree, allowing us to more easily process records containing the latter. Leaves of the
+   * tree (fields which exist in the schema) map to null; inner nodes map to a nested map.
+   * @param schema The schema whose physical column names are used to build the tree
+   * @return The schema tree
+   * @throws IllegalArgumentException if schema validation fails in one of two ways:
+   * <ul>
+   *   <li>One of the fields in the schema has a name which when interpreted as a JSON path, corresponds to an object
+   *   with an empty sub-key. E.g., the field name "a..b" corresponds to the JSON {"a": {"": {"b": ...}}}</li>
+   *   <li>Two fields in the schema have names which correspond to JSON paths where one is a child of the other. E.g.,
+   *   the field names "a.b" and "a.b.c" are considered invalid since "a.b.c" is a child of "a.b".</li>
+   * </ul>
+   */
+  private static Map<String, Object> validateSchemaAndCreateTree(@Nonnull Schema schema)
+      throws IllegalArgumentException {
+    Set<String> schemaFields = schema.getPhysicalColumnNames();
+
+    Map<String, Object> schemaTree = new HashMap<>();
+    List<String> subKeys = new ArrayList<>();
+    for (String field : schemaFields) {
+      int keySeparatorIdx = field.indexOf(JsonUtils.KEY_SEPARATOR);
+      if (-1 == keySeparatorIdx) {
+        // Not a flattened key. Field names come from a set, so a node can only already exist for this name if a
+        // flattened field (e.g. "a.b") created it as a parent. Reject that overlap here; otherwise the put below
+        // would silently replace the parent's subtree and the outcome would depend on iteration order.
+        if (schemaTree.containsKey(field)) {
+          throw new IllegalArgumentException(
+              "Cannot handle field '" + field + "' which overlaps with another field in the schema.");
+        }
+        schemaTree.put(field, null);
+        continue;
+      }
+
+      subKeys.clear();
+      getAndValidateSubKeys(field, keySeparatorIdx, subKeys);
+
+      // Add all sub-keys except the leaf to the tree
+      Map<String, Object> currentNode = schemaTree;
+      for (int i = 0; i < subKeys.size() - 1; i++) {
+        String subKey = subKeys.get(i);
+
+        Map<String, Object> childNode;
+        if (currentNode.containsKey(subKey)) {
+          childNode = (Map<String, Object>) currentNode.get(subKey);
+          if (null == childNode) {
+            // The existing node is a leaf, i.e., a prefix of this field is itself a field in the schema
+            throw new IllegalArgumentException(
+                "Cannot handle field '" + String.join(JsonUtils.KEY_SEPARATOR, subKeys.subList(0, i + 1))
+                    + "' which overlaps with another field in the schema.");
+          }
+        } else {
+          childNode = new HashMap<>();
+          currentNode.put(subKey, childNode);
+        }
+        currentNode = childNode;
+      }
+      // Add the leaf pointing at null. If a node already exists for it, this field is a prefix of another field.
+      String subKey = subKeys.get(subKeys.size() - 1);
+      if (currentNode.containsKey(subKey)) {
+        throw new IllegalArgumentException(
+            "Cannot handle field '" + field + "' which overlaps with another field in the schema.");
+      }
+      currentNode.put(subKey, null);
+    }
+
+    return schemaTree;
+  }
+
+  /**
+   * Given a JSON path (e.g. "k1.k2.k3"), returns all the sub-keys (e.g. ["k1", "k2", "k3"])
+   * <p>
+   * Note that the index arithmetic below steps over the separator by one character.
+   * @param key The complete key
+   * @param firstKeySeparatorIdx The index of the first key separator in {@code key}
+   * @param subKeys Returns the sub-keys (they are appended to this list)
+   * @throws IllegalArgumentException if any sub-key is empty, including a leading one (".a"), one in the middle
+   * ("a..b") or a trailing one ("a.")
+   */
+  private static void getAndValidateSubKeys(String key, int firstKeySeparatorIdx, List<String> subKeys)
+      throws IllegalArgumentException {
+    int keyLength = key.length();
+    int subKeyBeginIdx = 0;
+    int subKeyEndIdx = firstKeySeparatorIdx;
+    while (true) {
+      // Validate and add the sub-key
+      String subKey = key.substring(subKeyBeginIdx, subKeyEndIdx);
+      if (subKey.isEmpty()) {
+        throw new IllegalArgumentException("Unsupported empty sub-key in '" + key + "'.");
+      }
+      subKeys.add(subKey);
+
+      if (subKeyEndIdx >= keyLength) {
+        // The sub-key we just added ran to the end of the key, so it was the last one
+        break;
+      }
+
+      // The sub-key ended at a separator, so another sub-key must follow. Advance past the separator and find where
+      // the next sub-key ends. If the separator was the key's last character, the next sub-key is empty and gets
+      // rejected at the top of the loop instead of being silently ignored.
+      subKeyBeginIdx = subKeyEndIdx + 1;
+      int keySeparatorIdx = key.indexOf(JsonUtils.KEY_SEPARATOR, subKeyBeginIdx);
+      subKeyEndIdx = -1 != keySeparatorIdx ? keySeparatorIdx : keyLength;
+    }
+  }
+
+  /**
+   * Creates the transformer from the table's configuration.
+   * <p>
+   * If the table has no ingestion config, or the ingestion config has no SchemaConformingTransformerConfig, the
+   * transformer is left unconfigured (no schema tree is built) and {@code isNoOp()} returns true.
+   * @param tableConfig The table config which holds the ingestion config
+   * @param schema The table's schema, used to validate the extras fields and to build the schema tree
+   */
+  public SchemaConformingTransformer(TableConfig tableConfig, Schema schema) {
+    if (null == tableConfig.getIngestionConfig() || null == 
tableConfig.getIngestionConfig()
+        .getSchemaConformingTransformerConfig()) {
+      // The transformer isn't configured for this table
+      _continueOnError = false;
+      _transformerConfig = null;
+      _indexableExtrasFieldType = null;
+      _unindexableExtrasFieldType = null;
+      return;
+    }
+
+    _continueOnError = tableConfig.getIngestionConfig().isContinueOnError();
+    _transformerConfig = 
tableConfig.getIngestionConfig().getSchemaConformingTransformerConfig();
+    String indexableExtrasFieldName = 
_transformerConfig.getIndexableExtrasField();
+    _indexableExtrasFieldType = getAndValidateExtrasFieldType(schema, 
indexableExtrasFieldName);
+    // The unindexable extras field is optional, so only validate it when it's configured
+    String unindexableExtrasFieldName = 
_transformerConfig.getUnindexableExtrasField();
+    _unindexableExtrasFieldType =
+        null == unindexableExtrasFieldName ? null : 
getAndValidateExtrasFieldType(schema, unindexableExtrasFieldName);
+
+    _schemaTree = validateSchemaAndCreateTree(schema);
+  }
+
+  /**
+   * @return true if no transformer config was found when this transformer was constructed
+   */
+  @Override
+  public boolean isNoOp() {
+    return null == _transformerConfig;
+  }
+
+  /**
+   * Transforms the given record into a new record. Every top-level field of the input is passed to
+   * {@code processField}, and the collected indexable and unindexable extras are then added as their own fields.
+   * <p>
+   * This method dereferences the transformer config, so it must not be called when {@code isNoOp()} is true.
+   * @param record The record to transform
+   * @return A new record holding the transformed fields. If the transformation fails and continueOnError is set,
+   * the returned record is marked with {@code GenericRow.INCOMPLETE_RECORD_KEY}.
+   */
+  @Nullable
+  @Override
+  public GenericRow transform(GenericRow record) {
+    GenericRow outputRecord = new GenericRow();
+
+    try {
+      // Unindexable extras are only collected when a field is configured to store them
+      ExtraFieldsContainer extraFieldsContainer =
+          new ExtraFieldsContainer(null != 
_transformerConfig.getUnindexableExtrasField());
+      for (Map.Entry<String, Object> recordEntry : 
record.getFieldToValueMap().entrySet()) {
+        String recordKey = recordEntry.getKey();
+        Object recordValue = recordEntry.getValue();
+        // For a top-level field, the JSON path and the key are the same
+        processField(_schemaTree, recordKey, recordKey, recordValue, 
extraFieldsContainer, outputRecord);
+      }
+      putExtrasField(_transformerConfig.getIndexableExtrasField(), 
_indexableExtrasFieldType,
+          extraFieldsContainer.getIndexableExtras(), outputRecord);
+      putExtrasField(_transformerConfig.getUnindexableExtrasField(), 
_unindexableExtrasFieldType,
+          extraFieldsContainer.getUnindexableExtras(), outputRecord);
+    } catch (Exception e) {
+      if (!_continueOnError) {
+        throw e;
+      }
+      // Keep whatever was added to the output record before the failure, but flag the record as incomplete
+      _logger.debug("Couldn't transform record: {}", record.toString(), e);
+      outputRecord.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
+    }
+
+    return outputRecord;
+  }
+
+  /**
+   * Processes a field from the record and either:
+   * <ul>
+   *   <li>Drops it if it's in fieldPathsToDrop</li>
+   *   <li>Adds it to the output record if it's special or exists in the schema</li>
+   *   <li>Adds it to one of the extras fields</li>
+   * </ul>
+   * <p>
+   * This method works recursively to build the output record. It is similar to {@code addIndexableField} except it
+   * handles fields which exist in the schema.
+   * <p>
+   * One notable complication that this method (and {@code addIndexableField}) handles is adding nested fields (even
+   * ones more than two levels deep) to the "extras" fields. E.g., consider this record:
+   * <pre>
+   * {
+   *   a: {
+   *     b: {
+   *       c: 0,
+   *       d: 1
+   *     }
+   *   }
+   * }
+   * </pre>
+   * Assume "a.b.c" exists in the schema but "a.b.d" doesn't. This class processes the record recursively from the root
+   * node to the children, so it would only know that "a.b.d" doesn't exist when it gets to "d". At this point we need
+   * to add "d" and all of its parents to the indexableExtrasField. To do so efficiently, the class builds this branch
+   * starting from the leaf and attaches it to parent nodes as we return from each recursive call.
+   * @param schemaNode The current node in the schema tree
+   * @param keyJsonPath The JSON path (without the "$." prefix) of the current field
+   * @param key The current field's key, i.e., the last sub-key of {@code keyJsonPath}
+   * @param value The current field's value (may be null)
+   * @param extraFieldsContainer A container for the "extras" fields corresponding to this node.
+   * @param outputRecord Returns the record after transformation
+   */
+  private void processField(Map<String, Object> schemaNode, String keyJsonPath, String key, Object value,
+      ExtraFieldsContainer extraFieldsContainer, GenericRow outputRecord) {
+
+    // Special keys are passed through untouched
+    if (StreamDataDecoderImpl.isSpecialKeyType(key) || GenericRow.isSpecialKeyType(key)) {
+      outputRecord.putValue(key, value);
+      return;
+    }
+
+    Set<String> fieldPathsToDrop = _transformerConfig.getFieldPathsToDrop();
+    if (null != fieldPathsToDrop && fieldPathsToDrop.contains(keyJsonPath)) {
+      return;
+    }
+
+    String unindexableFieldSuffix = _transformerConfig.getUnindexableFieldSuffix();
+    if (null != unindexableFieldSuffix && key.endsWith(unindexableFieldSuffix)) {
+      extraFieldsContainer.addUnindexableEntry(key, value);
+      return;
+    }
+
+    if (!schemaNode.containsKey(key)) {
+      addIndexableField(keyJsonPath, key, value, extraFieldsContainer);
+      return;
+    }
+
+    Map<String, Object> childSchemaNode = (Map<String, Object>) schemaNode.get(key);
+    boolean storeUnindexableExtras = _transformerConfig.getUnindexableExtrasField() != null;
+    if (null == childSchemaNode) {
+      // The schema node is a leaf, i.e., the field exists in the schema
+      if (!(value instanceof Map) || null == unindexableFieldSuffix) {
+        outputRecord.putValue(keyJsonPath, value);
+      } else {
+        // The field's value is a map which could contain a no-index field, so we need to keep traversing the map
+        ExtraFieldsContainer container = new ExtraFieldsContainer(storeUnindexableExtras);
+        addIndexableField(keyJsonPath, key, value, container);
+        // The container has no indexable extras if the map is empty or all of its entries were unindexable or
+        // dropped. In that case there's nothing to store for this field.
+        Map<String, Object> indexableFields = container.getIndexableExtras();
+        if (null != indexableFields) {
+          outputRecord.putValue(keyJsonPath, indexableFields.get(key));
+        }
+        Map<String, Object> unindexableFields = container.getUnindexableExtras();
+        if (null != unindexableFields) {
+          extraFieldsContainer.addUnindexableEntry(key, unindexableFields.get(key));
+        }
+      }
+    } else {
+      // The schema node is an inner node, i.e., only children of this field exist in the schema
+      if (!(value instanceof Map)) {
+        // A null value also isn't a map, so guard the class lookup used for logging
+        _logger.debug("Record doesn't match schema: Schema node '{}' is a map but record value is a {}", keyJsonPath,
+            null == value ? "null" : value.getClass().getName());
+        extraFieldsContainer.addIndexableEntry(key, value);
+      } else {
+        ExtraFieldsContainer childExtraFieldsContainer = new ExtraFieldsContainer(storeUnindexableExtras);
+        Map<String, Object> valueAsMap = (Map<String, Object>) value;
+        for (Map.Entry<String, Object> entry : valueAsMap.entrySet()) {
+          String childKey = entry.getKey();
+          processField(childSchemaNode, keyJsonPath + JsonUtils.KEY_SEPARATOR + childKey, childKey, entry.getValue(),
+              childExtraFieldsContainer, outputRecord);
+        }
+        // Attach the extras collected from the children under this field's key
+        extraFieldsContainer.addChild(key, childExtraFieldsContainer);
+      }
+    }
+  }
+
+  /**
+   * Adds an indexable field to the given {@code ExtraFieldsContainer}.
+   * <p>
+   * This method is similar to {@code processField} except it doesn't handle fields which exist in the schema. The
+   * field is skipped if its path is in fieldPathsToDrop, and it's added as an unindexable entry instead if its key
+   * ends with the configured unindexable-field suffix. Map values are traversed recursively.
+   * @param recordJsonPath The JSON path of the current field
+   * @param key The current field's key
+   * @param value The current field's value
+   * @param extraFieldsContainer The container which the field (or its children) is added to
+   */
+  void addIndexableField(String recordJsonPath, String key, Object value, 
ExtraFieldsContainer extraFieldsContainer) {
+    Set<String> fieldPathsToDrop = _transformerConfig.getFieldPathsToDrop();
+    if (null != fieldPathsToDrop && fieldPathsToDrop.contains(recordJsonPath)) 
{
+      return;
+    }
+
+    String unindexableFieldSuffix = 
_transformerConfig.getUnindexableFieldSuffix();
+    if (null != unindexableFieldSuffix && 
key.endsWith(unindexableFieldSuffix)) {
+      extraFieldsContainer.addUnindexableEntry(key, value);
+      return;
+    }
+
+    boolean storeUnindexableExtras = 
_transformerConfig.getUnindexableExtrasField() != null;
+    if (!(value instanceof Map)) {
+      extraFieldsContainer.addIndexableEntry(key, value);
+    } else {
+      // Collect the children's extras in their own container, then attach them under this field's key
+      ExtraFieldsContainer childExtraFieldsContainer = new 
ExtraFieldsContainer(storeUnindexableExtras);
+      Map<String, Object> valueAsMap = (Map<String, Object>) value;
+      for (Map.Entry<String, Object> entry : valueAsMap.entrySet()) {
+        String childKey = entry.getKey();
+        addIndexableField(recordJsonPath + JsonUtils.KEY_SEPARATOR + childKey, 
childKey, entry.getValue(),
+            childExtraFieldsContainer);
+      }
+      extraFieldsContainer.addChild(key, childExtraFieldsContainer);
+    }
+  }
+
+  /**
+   * Converts (if necessary) and adds the given extras field to the output record
+   * @param fieldName The name of the extras field in the output record
+   * @param fieldType The extras field's type: a JSON field stores the map as is, whereas a STRING field stores the
+   * map serialized with {@code JsonUtils.objectToString}
+   * @param field The extras to store; nothing is added to the output record if this is null
+   * @param outputRecord The record to add the extras field to
+   * @throws RuntimeException if the extras can't be serialized to a string
+   * @throws UnsupportedOperationException if the field type is neither JSON nor STRING
+   */
+  private void putExtrasField(String fieldName, DataType fieldType, 
Map<String, Object> field,
+      GenericRow outputRecord) {
+    if (null == field) {
+      return;
+    }
+
+    switch (fieldType) {
+      case JSON:
+        outputRecord.putValue(fieldName, field);
+        break;
+      case STRING:
+        try {
+          outputRecord.putValue(fieldName, JsonUtils.objectToString(field));
+        } catch (JsonProcessingException e) {
+          throw new RuntimeException("Failed to convert '" + fieldName + "' to 
string", e);
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("Cannot convert '" + fieldName 
+ "' to " + fieldType.name());
+    }
+  }
+}
+
+/**
+ * A class to encapsulate the "extras" fields (indexableExtras and unindexableExtras) at a node in the record (when
+ * viewed as a tree). Both maps are created lazily, so each getter returns null until an entry has been added.
+ */
+class ExtraFieldsContainer {
+  // Created on the first call to addIndexableEntry
+  private Map<String, Object> _indexableExtras = null;
+  // Created on the first call to addUnindexableEntry that isn't ignored
+  private Map<String, Object> _unindexableExtras = null;
+  // When false, unindexable entries are discarded rather than stored
+  private final boolean _storeUnindexableExtras;
+
+  /**
+   * @param storeUnindexableExtras Whether unindexable entries should be stored; if false, they're ignored
+   */
+  ExtraFieldsContainer(boolean storeUnindexableExtras) {
+    _storeUnindexableExtras = storeUnindexableExtras;
+  }
+
+  /**
+   * @return The indexable extras, or null if none have been added
+   */
+  public Map<String, Object> getIndexableExtras() {
+    return _indexableExtras;
+  }
+
+  /**
+   * @return The unindexable extras, or null if none have been added (or stored)
+   */
+  public Map<String, Object> getUnindexableExtras() {
+    return _unindexableExtras;
+  }
+
+  /**
+   * Adds the given kv-pair to the indexable extras field
+   */
+  public void addIndexableEntry(String key, Object value) {
+    if (null == _indexableExtras) {
+      _indexableExtras = new HashMap<>();
+    }
+    _indexableExtras.put(key, value);
+  }
+
+  /**
+   * Adds the given kv-pair to the unindexable extras field (if any)
+   */
+  public void addUnindexableEntry(String key, Object value) {
+    if (!_storeUnindexableExtras) {
+      return;
+    }
+    if (null == _unindexableExtras) {
+      _unindexableExtras = new HashMap<>();
+    }
+    _unindexableExtras.put(key, value);
+  }
+
+  /**
+   * Given a container corresponding to a child node, attach the extras from the child node to the extras in this node
+   * at the given key. A child without any extras of a given kind adds nothing for that kind.
+   */
+  public void addChild(String key, ExtraFieldsContainer child) {
+    Map<String, Object> childIndexableFields = child.getIndexableExtras();
+    if (null != childIndexableFields) {
+      addIndexableEntry(key, childIndexableFields);
+    }
+
+    Map<String, Object> childUnindexableFields = child.getUnindexableExtras();
+    if (null != childUnindexableFields) {
+      addUnindexableEntry(key, childUnindexableFields);
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
index 1d73c4cfd5..f6943e5468 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
@@ -308,6 +308,13 @@ public final class IngestionUtils {
    */
   public static Set<String> getFieldsForRecordExtractor(@Nullable 
IngestionConfig ingestionConfig, Schema schema) {
     Set<String> fieldsForRecordExtractor = new HashSet<>();
+
+    if (null != ingestionConfig && null != 
ingestionConfig.getSchemaConformingTransformerConfig()) {
+      // The SchemaConformingTransformer requires that all fields are 
extracted, indicated by returning an empty set
+      // here. Compared to extracting the fields specified below, extracting 
all fields should be a superset.
+      return fieldsForRecordExtractor;
+    }
+
     extractFieldsFromIngestionConfig(ingestionConfig, 
fieldsForRecordExtractor);
     extractFieldsFromSchema(schema, fieldsForRecordExtractor);
     fieldsForRecordExtractor = 
getFieldsToReadWithComplexType(fieldsForRecordExtractor, ingestionConfig);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 6e82dea79e..a7d65d6343 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -44,6 +44,7 @@ import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.segment.local.function.FunctionEvaluator;
 import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import 
org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer;
 import 
org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.IndexService;
@@ -68,6 +69,7 @@ import 
org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import 
org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig;
 import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -498,6 +500,12 @@ public final class TableConfigUtils {
           }
         }
       }
+
+      SchemaConformingTransformerConfig schemaConformingTransformerConfig =
+          ingestionConfig.getSchemaConformingTransformerConfig();
+      if (null != schemaConformingTransformerConfig && null != schema) {
+        SchemaConformingTransformer.validateSchema(schema, 
schemaConformingTransformerConfig);
+      }
     }
   }
 
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerTest.java
new file mode 100644
index 0000000000..e9b3ec3d6d
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerTest.java
@@ -0,0 +1,731 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordtransformer;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import 
org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.testng.AssertJUnit.fail;
+
+
+public class SchemaConformingTransformerTest {
+  static final private String INDEXABLE_EXTRAS_FIELD_NAME = "indexableExtras";
+  static final private String UNINDEXABLE_EXTRAS_FIELD_NAME = 
"unindexableExtras";
+  static final private String UNINDEXABLE_FIELD_SUFFIX = "_noIndex";
+
+  static final private ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private TableConfig createDefaultTableConfig(String indexableExtrasField, 
String unindexableExtrasField,
+      String unindexableFieldSuffix, Set<String> fieldPathsToDrop) {
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    SchemaConformingTransformerConfig schemaConformingTransformerConfig =
+        new SchemaConformingTransformerConfig(indexableExtrasField, 
unindexableExtrasField, unindexableFieldSuffix,
+            fieldPathsToDrop);
+    
ingestionConfig.setSchemaConformingTransformerConfig(schemaConformingTransformerConfig);
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig)
+        .build();
+  }
+
+  private Schema.SchemaBuilder createDefaultSchemaBuilder() {
+    return new 
Schema.SchemaBuilder().addSingleValueDimension(INDEXABLE_EXTRAS_FIELD_NAME, 
DataType.JSON)
+        .addSingleValueDimension(UNINDEXABLE_EXTRAS_FIELD_NAME, DataType.JSON);
+  }
+
+  @Test
+  public void testWithNoUnindexableFields() {
+    /*
+    {
+      "arrayField":[0, 1, 2, 3],
+      "nullField":null,
+      "stringField":"a",
+      "mapField":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a"
+      },
+      "nestedFields":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a",
+        "mapField":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a"
+        }
+      }
+    }
+    */
+    final String inputRecordJSONString =
+        
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"mapField\":{\"arrayField\":[0,1,2,3],"
+            + 
"\"nullField\":null,\"stringField\":\"a\"},\"nestedFields\":{\"arrayField\":[0,1,2,3],"
+            + 
"\"nullField\":null,\"stringField\":\"a\",\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,"
+            + "\"stringField\":\"a\"}}}";
+    String expectedOutputRecordJSONString;
+    Schema schema;
+
+    schema = createDefaultSchemaBuilder().build();
+    /*
+    {
+      "indexableExtras":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a",
+        "mapField":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a"
+        },
+        "nestedFields":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a",
+          "mapField":{
+            "arrayField":[0, 1, 2, 3],
+            "nullField":null,
+            "stringField":"a"
+          }
+        }
+      }
+    }
+    */
+    expectedOutputRecordJSONString =
+        
"{\"indexableExtras\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+            + 
"\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"},"
+            + 
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+            + 
"\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"}}}}";
+    testTransformWithNoUnindexableFields(schema, inputRecordJSONString, 
expectedOutputRecordJSONString);
+
+    schema = createDefaultSchemaBuilder().addMultiValueDimension("arrayField", 
DataType.INT)
+        .addSingleValueDimension("mapField", DataType.JSON)
+        .addSingleValueDimension("nestedFields.stringField", 
DataType.STRING).build();
+    /*
+    {
+      "arrayField":[0, 1, 2, 3],
+      "mapField":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a"
+      },
+      "nestedFields.stringField":"a",
+      "indexableExtras":{
+        "nullField":null,
+        "stringField":"a",
+        "nestedFields":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "mapField":{
+            "arrayField":[0, 1, 2, 3],
+            "nullField":null,
+            "stringField":"a"
+          }
+        }
+      }
+    }
+    */
+    expectedOutputRecordJSONString =
+        
"{\"arrayField\":[0,1,2,3],\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"},"
+            + 
"\"nestedFields.stringField\":\"a\",\"indexableExtras\":{\"nullField\":null,\"stringField\":\"a\","
+            + 
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"mapField\":{\"arrayField\":[0,1,2,3],"
+            + "\"nullField\":null,\"stringField\":\"a\"}}}}";
+    testTransformWithNoUnindexableFields(schema, inputRecordJSONString, 
expectedOutputRecordJSONString);
+
+    schema = createDefaultSchemaBuilder().addMultiValueDimension("arrayField", 
DataType.INT)
+        .addSingleValueDimension("nullField", 
DataType.STRING).addSingleValueDimension("stringField", DataType.STRING)
+        .addSingleValueDimension("mapField", DataType.JSON)
+        .addMultiValueDimension("nestedFields.arrayField", DataType.INT)
+        .addSingleValueDimension("nestedFields.nullField", DataType.STRING)
+        .addSingleValueDimension("nestedFields.stringField", DataType.STRING)
+        .addSingleValueDimension("nestedFields.mapField", 
DataType.JSON).build();
+    /*
+    {
+      "arrayField":[0, 1, 2, 3],
+      "nullField":null,
+      "stringField":"a",
+      "mapField":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a"
+      },
+      "nestedFields.arrayField":[0, 1, 2, 3],
+      "nestedFields.nullField":null,
+      "nestedFields.stringField":"a",
+      "nestedFields.mapField":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a"
+      }
+    }
+    */
+    expectedOutputRecordJSONString =
+        
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"mapField\":{\"arrayField\":[0,1,2,3],"
+            + 
"\"nullField\":null,\"stringField\":\"a\"},\"nestedFields.arrayField\":[0,1,2,3],\"nestedFields"
+            + 
".nullField\":null,\"nestedFields.stringField\":\"a\",\"nestedFields.mapField\":{\"arrayField\":[0,1,2,"
+            + "3],\"nullField\":null,\"stringField\":\"a\"}}";
+    testTransformWithNoUnindexableFields(schema, inputRecordJSONString, 
expectedOutputRecordJSONString);
+  }
+
+  private void testTransformWithNoUnindexableFields(Schema schema, String 
inputRecordJSONString,
+      String expectedOutputRecordJSONString) {
+    testTransform(null, null, schema, null, inputRecordJSONString, 
expectedOutputRecordJSONString);
+    testTransform(null, UNINDEXABLE_FIELD_SUFFIX, schema, null, 
inputRecordJSONString, expectedOutputRecordJSONString);
+    testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX, 
schema, null, inputRecordJSONString,
+        expectedOutputRecordJSONString);
+  }
+
+  @Test
+  public void testWithUnindexableFields() {
+    /*
+    {
+      "arrayField":[0, 1, 2, 3],
+      "nullField":null,
+      "stringField":"a",
+      "intField_noIndex":9,
+      "string_noIndex":"z",
+      "mapField":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a",
+        "intField_noIndex":9,
+        "string_noIndex":"z"
+      },
+      "nestedFields":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a",
+        "intField_noIndex":9,
+        "string_noIndex":"z",
+        "mapField":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a",
+          "intField_noIndex":9,
+          "string_noIndex":"z"
+        }
+      }
+    }
+    */
+    final String inputRecordJSONString =
+        
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"intField_noIndex\":9,"
+            + 
"\"string_noIndex\":\"z\",\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,"
+            + 
"\"stringField\":\"a\",\"intField_noIndex\":9,\"string_noIndex\":\"z\"},"
+            + 
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+            + 
"\"intField_noIndex\":9,\"string_noIndex\":\"z\",\"mapField\":{\"arrayField\":[0,1,2,3],"
+            + 
"\"nullField\":null,\"stringField\":\"a\",\"intField_noIndex\":9,\"string_noIndex\":\"z\"}}}";
+    String expectedOutputRecordJSONString;
+    Schema schema;
+
+    schema = createDefaultSchemaBuilder().build();
+    /*
+    {
+      "indexableExtras":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a",
+        "mapField":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a"
+        },
+        "nestedFields":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a",
+          "mapField":{
+            "arrayField":[0, 1, 2, 3],
+            "nullField":null,
+            "stringField":"a"
+          }
+        }
+      }
+    }
+    */
+    expectedOutputRecordJSONString =
+        
"{\"indexableExtras\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+            + 
"\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"},"
+            + 
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+            + 
"\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"}}}}";
+    testTransform(null, UNINDEXABLE_FIELD_SUFFIX, schema, null, 
inputRecordJSONString, expectedOutputRecordJSONString);
+    /*
+    {
+      "indexableExtras":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a",
+        "mapField":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a"
+        },
+        "nestedFields":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a",
+          "mapField":{
+            "arrayField":[0, 1, 2, 3],
+            "nullField":null,
+            "stringField":"a"
+          }
+        }
+      },
+      "unindexableExtras":{
+        "intField_noIndex":9,
+        "string_noIndex":"z",
+        "mapField":{
+          "intField_noIndex":9,
+          "string_noIndex":"z"
+        },
+        "nestedFields":{
+          "intField_noIndex":9,
+          "string_noIndex":"z",
+          "mapField":{
+            "intField_noIndex":9,
+            "string_noIndex":"z"
+          }
+        }
+      }
+    }
+    */
+    expectedOutputRecordJSONString =
+        
"{\"indexableExtras\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+            + 
"\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"},"
+            + 
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+            + 
"\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"}}},"
+            + 
"\"unindexableExtras\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\","
+            + "\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"},"
+            + 
"\"nestedFields\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\","
+            + 
"\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"}}}}";
+    testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX, 
schema, null, inputRecordJSONString,
+        expectedOutputRecordJSONString);
+
+    schema = createDefaultSchemaBuilder().addMultiValueDimension("arrayField", 
DataType.INT)
+        .addSingleValueDimension("mapField", DataType.JSON)
+        .addSingleValueDimension("nestedFields.stringField", 
DataType.STRING).build();
+    /*
+    {
+      "arrayField":[0, 1, 2, 3],
+      "mapField":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a"
+      },
+      "nestedFields.stringField":"a",
+      "indexableExtras":{
+        "nullField":null,
+        "stringField":"a",
+        "nestedFields":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "mapField":{
+            "arrayField":[0, 1, 2, 3],
+            "nullField":null,
+            "stringField":"a"
+          }
+        }
+      }
+    }
+    */
+    expectedOutputRecordJSONString =
+        
"{\"arrayField\":[0,1,2,3],\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"},"
+            + 
"\"nestedFields.stringField\":\"a\",\"indexableExtras\":{\"nullField\":null,\"stringField\":\"a\","
+            + 
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"mapField\":{\"arrayField\":[0,1,2,3],"
+            + "\"nullField\":null,\"stringField\":\"a\"}}}}";
+    testTransform(null, UNINDEXABLE_FIELD_SUFFIX, schema, null, 
inputRecordJSONString, expectedOutputRecordJSONString);
+    /*
+    {
+      "arrayField":[0, 1, 2, 3],
+      "mapField":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a"
+      },
+      "nestedFields.stringField":"a",
+      "indexableExtras":{
+        "nullField":null,
+        "stringField":"a",
+        "nestedFields":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "mapField":{
+            "arrayField":[0, 1, 2, 3],
+            "nullField":null,
+            "stringField":"a"
+          }
+        }
+      },
+      "unindexableExtras":{
+        "intField_noIndex":9,
+        "string_noIndex":"z",
+        "mapField":{
+          "intField_noIndex":9,
+          "string_noIndex":"z"
+        },
+        "nestedFields":{
+          "intField_noIndex":9,
+          "string_noIndex":"z",
+          "mapField":{
+            "intField_noIndex":9,
+            "string_noIndex":"z"
+          }
+        }
+      }
+    }
+    */
+    expectedOutputRecordJSONString =
+        
"{\"arrayField\":[0,1,2,3],\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"},"
+            + 
"\"nestedFields.stringField\":\"a\",\"indexableExtras\":{\"nullField\":null,\"stringField\":\"a\","
+            + 
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"mapField\":{\"arrayField\":[0,1,2,3],"
+            + 
"\"nullField\":null,\"stringField\":\"a\"}}},\"unindexableExtras\":{\"intField_noIndex\":9,"
+            + 
"\"string_noIndex\":\"z\",\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"},"
+            + 
"\"nestedFields\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\","
+            + 
"\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"}}}}";
+    testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX, 
schema, null, inputRecordJSONString,
+        expectedOutputRecordJSONString);
+
+    schema = createDefaultSchemaBuilder().addMultiValueDimension("arrayField", 
DataType.INT)
+        .addSingleValueDimension("nullField", 
DataType.STRING).addSingleValueDimension("stringField", DataType.STRING)
+        .addSingleValueDimension("mapField", DataType.JSON)
+        .addMultiValueDimension("nestedFields.arrayField", DataType.INT)
+        .addSingleValueDimension("nestedFields.nullField", DataType.STRING)
+        .addSingleValueDimension("nestedFields.stringField", DataType.STRING)
+        .addSingleValueDimension("nestedFields.mapField", 
DataType.JSON).build();
+    /*
+    {
+      "arrayField":[0, 1, 2, 3],
+      "nullField":null,
+      "stringField":"a",
+      "mapField":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a"
+      },
+      "nestedFields.arrayField":[0, 1, 2, 3],
+      "nestedFields.nullField":null,
+      "nestedFields.stringField":"a",
+      "nestedFields.mapField":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a"
+      }
+    }
+    */
+    expectedOutputRecordJSONString =
+        
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"mapField\":{\"arrayField\":[0,1,2,3],"
+            + 
"\"nullField\":null,\"stringField\":\"a\"},\"nestedFields.arrayField\":[0,1,2,3],\"nestedFields"
+            + 
".nullField\":null,\"nestedFields.stringField\":\"a\",\"nestedFields.mapField\":{\"arrayField\":[0,1,2,"
+            + "3],\"nullField\":null,\"stringField\":\"a\"} }";
+    testTransform(null, UNINDEXABLE_FIELD_SUFFIX, schema, null, 
inputRecordJSONString, expectedOutputRecordJSONString);
+    /*
+    {
+      "arrayField":[0, 1, 2, 3],
+      "nullField":null,
+      "stringField":"a",
+      "mapField":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a"
+      },
+      "nestedFields.arrayField":[0, 1, 2, 3],
+      "nestedFields.nullField":null,
+      "nestedFields.stringField":"a",
+      "nestedFields.mapField":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a"
+      },
+      "unindexableExtras":{
+        "intField_noIndex":9,
+        "string_noIndex":"z",
+        "mapField":{
+          "intField_noIndex":9,
+          "string_noIndex":"z"
+        },
+        "nestedFields":{
+          "intField_noIndex":9,
+          "string_noIndex":"z",
+          "mapField":{
+            "intField_noIndex":9,
+            "string_noIndex":"z"
+          }
+        }
+      }
+    }
+    */
+    expectedOutputRecordJSONString =
+        
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"mapField\":{\"arrayField\":[0,1,2,3],"
+            + 
"\"nullField\":null,\"stringField\":\"a\"},\"nestedFields.arrayField\":[0,1,2,3],\"nestedFields"
+            + 
".nullField\":null,\"nestedFields.stringField\":\"a\",\"nestedFields.mapField\":{\"arrayField\":[0,1,2,"
+            + 
"3],\"nullField\":null,\"stringField\":\"a\"},\"unindexableExtras\":{\"intField_noIndex\":9,"
+            + 
"\"string_noIndex\":\"z\",\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"},"
+            + 
"\"nestedFields\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\","
+            + 
"\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"}}}}";
+    testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX, 
schema, null, inputRecordJSONString,
+        expectedOutputRecordJSONString);
+  }
+
+  @Test
+  public void testFieldPathsToDrop() {
+    /*
+    {
+      "arrayField":[0, 1, 2, 3],
+      "nullField":null,
+      "stringField":"a",
+      "boolField":false,
+      "nestedFields":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a",
+        "boolField":false
+      }
+    }
+    */
+    final String inputRecordJSONString =
+        
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"boolField\":false,"
+            + 
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+            + "\"boolField\":false}}";
+    String expectedOutputRecordJSONString;
+    Schema schema;
+
+    schema = createDefaultSchemaBuilder().addMultiValueDimension("arrayField", 
DataType.INT)
+        .addSingleValueDimension("nullField", DataType.STRING)
+        .addSingleValueDimension("nestedFields.stringField", DataType.STRING)
+        .addSingleValueDimension("nestedFields.boolField", 
DataType.BOOLEAN).build();
+    Set<String> fieldPathsToDrop = new HashSet<>(Arrays.asList("stringField", 
"nestedFields.arrayField"));
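+    /*
+    "stringField" and "nestedFields.arrayField" are dropped. Fields in the
+    schema keep their flattened names; the rest go to indexableExtras.
+    {
+      "arrayField":[0, 1, 2, 3],
+      "nullField":null,
+      "nestedFields.stringField":"a",
+      "nestedFields.boolField":false,
+      "indexableExtras":{
+        "boolField":false,
+        "nestedFields":{
+          "nullField":null
+        }
+      }
+    }
+    */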
+    expectedOutputRecordJSONString =
+        
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"nestedFields.stringField\":\"a\",\"nestedFields"
+            + 
".boolField\":false,\"indexableExtras\":{\"boolField\":false,\"nestedFields\":{\"nullField\":null}}}";
+    testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX, 
schema, fieldPathsToDrop,
+        inputRecordJSONString, expectedOutputRecordJSONString);
+  }
+
+  @Test
+  public void testIgnoringSpecialRowKeys() {
+    // Configure a FilterTransformer and a SchemaConformingTransformer such 
that the filter will introduce a special
+    // key $SKIP_RECORD_KEY$ that the SchemaConformingTransformer should 
ignore
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setFilterConfig(new FilterConfig("intField = 1"));
+    SchemaConformingTransformerConfig schemaConformingTransformerConfig =
+        new SchemaConformingTransformerConfig(INDEXABLE_EXTRAS_FIELD_NAME, 
UNINDEXABLE_EXTRAS_FIELD_NAME,
+            UNINDEXABLE_FIELD_SUFFIX, null);
+    
ingestionConfig.setSchemaConformingTransformerConfig(schemaConformingTransformerConfig);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build();
+
+    // Create a series of transformers: FilterTransformer -> 
SchemaConformingTransformer
+    List<RecordTransformer> transformers = new LinkedList<>();
+    transformers.add(new FilterTransformer(tableConfig));
+    Schema schema = 
createDefaultSchemaBuilder().addSingleValueDimension("intField", 
DataType.INT).build();
+    transformers.add(new SchemaConformingTransformer(tableConfig, schema));
+    CompositeTransformer compositeTransformer = new 
CompositeTransformer(transformers);
+
+    Map<String, Object> inputRecordMap = jsonStringToMap("{\"intField\":1}");
+    GenericRow inputRecord = createRowFromMap(inputRecordMap);
+    GenericRow outputRecord = compositeTransformer.transform(inputRecord);
+    Assert.assertNotNull(outputRecord);
+    // Check that the transformed record has $SKIP_RECORD_KEY$
+    Assert.assertFalse(IngestionUtils.shouldIngestRow(outputRecord));
+  }
+
+  @Test
+  public void testOverlappingSchemaFields() {
+    Assert.assertThrows(IllegalArgumentException.class, () -> {
+      Schema schema = 
createDefaultSchemaBuilder().addSingleValueDimension("a.b", DataType.STRING)
+          .addSingleValueDimension("a.b.c", DataType.INT).build();
+      SchemaConformingTransformer.validateSchema(schema,
+          new SchemaConformingTransformerConfig(INDEXABLE_EXTRAS_FIELD_NAME, 
null, null, null));
+    });
+
+    // This is a repeat of the previous test but with fields reversed just in 
case they are processed in order
+    Assert.assertThrows(IllegalArgumentException.class, () -> {
+      Schema schema = 
createDefaultSchemaBuilder().addSingleValueDimension("a.b.c", DataType.INT)
+          .addSingleValueDimension("a.b", DataType.STRING).build();
+      SchemaConformingTransformer.validateSchema(schema,
+          new SchemaConformingTransformerConfig(INDEXABLE_EXTRAS_FIELD_NAME, 
null, null, null));
+    });
+  }
+
+  @Test
+  public void testInvalidFieldNamesInSchema() {
+    // Ensure schema fields which end with unindexableFieldSuffix are caught 
as invalid
+    Assert.assertThrows(() -> {
+      Schema schema =
+          createDefaultSchemaBuilder().addSingleValueDimension("a" + 
UNINDEXABLE_FIELD_SUFFIX, DataType.STRING)
+              .addSingleValueDimension("a.b" + UNINDEXABLE_FIELD_SUFFIX, 
DataType.INT).build();
+      SchemaConformingTransformer.validateSchema(schema,
+          new SchemaConformingTransformerConfig(INDEXABLE_EXTRAS_FIELD_NAME, 
null, UNINDEXABLE_FIELD_SUFFIX, null));
+    });
+
+    // Ensure schema fields which are in fieldPathsToDrop are caught as invalid
+    Assert.assertThrows(() -> {
+      Schema schema = 
createDefaultSchemaBuilder().addSingleValueDimension("a", DataType.STRING)
+          .addSingleValueDimension("b.c", DataType.INT).build();
+      Set<String> fieldPathsToDrop = new HashSet<>(Arrays.asList("a", "b.c"));
+      SchemaConformingTransformer.validateSchema(schema,
+          new SchemaConformingTransformerConfig(INDEXABLE_EXTRAS_FIELD_NAME, 
null, null, fieldPathsToDrop));
+    });
+  }
+
+  @Test
+  public void testSchemaRecordMismatch() {
+    Schema schema =
+        
createDefaultSchemaBuilder().addSingleValueDimension("nestedFields.mapField", 
DataType.JSON).build();
+    /*
+    {
+      "indexableExtras":{
+        "nestedFields":0
+      }
+    }
+    */
+    // Schema field "nestedFields.mapField" is a Map but the record field is an 
int, so it should be stored in
+    // indexableExtras
+    testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX, 
schema, null, "{\"nestedFields\":0}",
+        "{\"indexableExtras\":{\"nestedFields\":0}}");
+  }
+
+  @Test
+  public void testFieldTypesForExtras() {
+    final String inputRecordJSONString = "{\"arrayField\":[0,1,2,3]}";
+
+    TableConfig tableConfig =
+        createDefaultTableConfig(INDEXABLE_EXTRAS_FIELD_NAME, 
UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX,
+            null);
+    Schema validSchema =
+        new 
Schema.SchemaBuilder().addSingleValueDimension(INDEXABLE_EXTRAS_FIELD_NAME, 
DataType.STRING)
+            .addSingleValueDimension(UNINDEXABLE_EXTRAS_FIELD_NAME, 
DataType.STRING).build();
+    GenericRow outputRecord = transformRow(tableConfig, validSchema, 
inputRecordJSONString);
+
+    Assert.assertNotNull(outputRecord);
+    // Validate that the indexable extras field contains the input record as a 
string
+    Assert.assertEquals(outputRecord.getValue(INDEXABLE_EXTRAS_FIELD_NAME), 
inputRecordJSONString);
+
+    // Validate that invalid field types are caught
+    Schema invalidSchema = new 
Schema.SchemaBuilder().addSingleValueDimension(INDEXABLE_EXTRAS_FIELD_NAME, 
DataType.INT)
+        .addSingleValueDimension(UNINDEXABLE_EXTRAS_FIELD_NAME, 
DataType.BOOLEAN).build();
+    Assert.assertThrows(() -> {
+      transformRow(tableConfig, invalidSchema, inputRecordJSONString);
+    });
+  }
+
+  @Test
+  public void testInvalidTransformerConfig() {
+    Assert.assertThrows(() -> {
+      createDefaultTableConfig(null, null, null, null);
+    });
+    Assert.assertThrows(() -> {
+      createDefaultTableConfig(null, UNINDEXABLE_EXTRAS_FIELD_NAME, null, 
null);
+    });
+    Assert.assertThrows(() -> {
+      createDefaultTableConfig(null, null, UNINDEXABLE_FIELD_SUFFIX, null);
+    });
+    Assert.assertThrows(() -> {
+      createDefaultTableConfig(null, UNINDEXABLE_EXTRAS_FIELD_NAME, 
UNINDEXABLE_FIELD_SUFFIX, null);
+    });
+    Assert.assertThrows(() -> {
+      createDefaultTableConfig(INDEXABLE_EXTRAS_FIELD_NAME, 
UNINDEXABLE_EXTRAS_FIELD_NAME, null, null);
+    });
+  }
+
+  /**
+   * Validates that transforming the given row results in the expected row, where 
both rows are given as JSON strings
+   */
+  private void testTransform(String unindexableExtrasField, String 
unindexableFieldSuffix, Schema schema,
+      Set<String> fieldPathsToDrop, String inputRecordJSONString, String 
expectedOutputRecordJSONString) {
+    TableConfig tableConfig =
+        createDefaultTableConfig(INDEXABLE_EXTRAS_FIELD_NAME, 
unindexableExtrasField, unindexableFieldSuffix,
+            fieldPathsToDrop);
+    GenericRow outputRecord = transformRow(tableConfig, schema, 
inputRecordJSONString);
+
+    Assert.assertNotNull(outputRecord);
+    Map<String, Object> expectedOutputRecordMap = 
jsonStringToMap(expectedOutputRecordJSONString);
+    Assert.assertEquals(outputRecord.getFieldToValueMap(), 
expectedOutputRecordMap);
+  }
+
+  /**
+   * Transforms the given row (given as a JSON string) using the transformer
+   * @return The transformed row
+   */
+  private GenericRow transformRow(TableConfig tableConfig, Schema schema, 
String inputRecordJSONString) {
+    Map<String, Object> inputRecordMap = 
jsonStringToMap(inputRecordJSONString);
+    GenericRow inputRecord = createRowFromMap(inputRecordMap);
+    SchemaConformingTransformer schemaConformingTransformer = new 
SchemaConformingTransformer(tableConfig, schema);
+    return schemaConformingTransformer.transform(inputRecord);
+  }
+
+  /**
+   * @return A map representing the given JSON string
+   */
+  @Nonnull
+  private Map<String, Object> jsonStringToMap(String jsonString) {
+    try {
+      TypeReference<Map<String, Object>> typeRef = new TypeReference<>() {
+      };
+      return OBJECT_MAPPER.readValue(jsonString, typeRef);
+    } catch (IOException e) {
+      fail(e.getMessage());
+    }
+    // Should never reach here
+    return null;
+  }
+
+  /**
+   * @return A new generic row with all the kv-pairs from the given map
+   */
+  private GenericRow createRowFromMap(Map<String, Object> map) {
+    GenericRow record = new GenericRow();
+    for (Map.Entry<String, Object> entry : map.entrySet()) {
+      record.putValue(entry.getKey(), entry.getValue());
+    }
+    return record;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
index 4b26f33cd7..86ff9cec6f 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
@@ -45,6 +45,9 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Config related to handling complex type")
   private ComplexTypeConfig _complexTypeConfig;
 
+  @JsonPropertyDescription("Config related to the SchemaConformingTransformer")
+  private SchemaConformingTransformerConfig _schemaConformingTransformerConfig;
+
   @JsonPropertyDescription("Configs related to record aggregation function 
applied during ingestion")
   private List<AggregationConfig> _aggregationConfigs;
 
@@ -61,12 +64,14 @@ public class IngestionConfig extends BaseJsonConfig {
   public IngestionConfig(@Nullable BatchIngestionConfig batchIngestionConfig,
       @Nullable StreamIngestionConfig streamIngestionConfig, @Nullable 
FilterConfig filterConfig,
       @Nullable List<TransformConfig> transformConfigs, @Nullable 
ComplexTypeConfig complexTypeConfig,
+      @Nullable SchemaConformingTransformerConfig 
schemaConformingTransformerConfig,
       @Nullable List<AggregationConfig> aggregationConfigs) {
     _batchIngestionConfig = batchIngestionConfig;
     _streamIngestionConfig = streamIngestionConfig;
     _filterConfig = filterConfig;
     _transformConfigs = transformConfigs;
     _complexTypeConfig = complexTypeConfig;
+    _schemaConformingTransformerConfig = schemaConformingTransformerConfig;
     _aggregationConfigs = aggregationConfigs;
   }
 
@@ -98,6 +103,11 @@ public class IngestionConfig extends BaseJsonConfig {
     return _complexTypeConfig;
   }
 
+  /**
+   * @return The config related to the SchemaConformingTransformer, or null if none has been set
+   */
+  @Nullable
+  public SchemaConformingTransformerConfig getSchemaConformingTransformerConfig() {
+    return _schemaConformingTransformerConfig;
+  }
+
   @Nullable
   public List<AggregationConfig> getAggregationConfigs() {
     return _aggregationConfigs;
@@ -135,6 +145,11 @@ public class IngestionConfig extends BaseJsonConfig {
     _complexTypeConfig = complexTypeConfig;
   }
 
+  /**
+   * Sets the config related to the SchemaConformingTransformer.
+   *
+   * @param schemaConformingTransformerConfig The config to store, replacing any previously stored value
+   */
+  public void setSchemaConformingTransformerConfig(
+      SchemaConformingTransformerConfig schemaConformingTransformerConfig) {
+    _schemaConformingTransformerConfig = schemaConformingTransformerConfig;
+  }
+
   public void setAggregationConfigs(List<AggregationConfig> 
aggregationConfigs) {
     _aggregationConfigs = aggregationConfigs;
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java
new file mode 100644
index 0000000000..96231f39d9
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table.ingestion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import com.google.common.base.Preconditions;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/**
+ * Config for the SchemaConformingTransformer.
+ * <p>
+ * {@code indexableExtrasField} is mandatory. {@code unindexableExtrasField} is optional, but when it is set,
+ * {@code unindexableFieldSuffix} must be set as well.
+ */
+public class SchemaConformingTransformerConfig extends BaseJsonConfig {
+  @JsonPropertyDescription("Name of the field that should contain extra fields that are not part of the schema.")
+  private final String _indexableExtrasField;
+
+  @JsonPropertyDescription(
+      "Like indexableExtrasField except it only contains fields with the suffix in unindexableFieldSuffix.")
+  private final String _unindexableExtrasField;
+
+  @JsonPropertyDescription("The suffix of fields that must be stored in unindexableExtrasField")
+  private final String _unindexableFieldSuffix;
+
+  @JsonPropertyDescription("Array of field paths to drop")
+  private final Set<String> _fieldPathsToDrop;
+
+  /**
+   * Creates the config, validating the combination of arguments.
+   *
+   * @param indexableExtrasField Name of the field that should contain extra fields that are not part of the schema
+   * @param unindexableExtrasField Like {@code indexableExtrasField}, but only for fields with the suffix given in
+   *                               {@code unindexableFieldSuffix}; may be null
+   * @param unindexableFieldSuffix Suffix of fields that must be stored in {@code unindexableExtrasField}; may be
+   *                               null only if {@code unindexableExtrasField} is null
+   * @param fieldPathsToDrop Field paths to drop; may be null
+   * @throws IllegalArgumentException if {@code indexableExtrasField} is null, or if
+   *                                  {@code unindexableExtrasField} is set without {@code unindexableFieldSuffix}
+   */
+  @JsonCreator
+  public SchemaConformingTransformerConfig(@JsonProperty("indexableExtrasField") String indexableExtrasField,
+      @JsonProperty("unindexableExtrasField") @Nullable String unindexableExtrasField,
+      @JsonProperty("unindexableFieldSuffix") @Nullable String unindexableFieldSuffix,
+      @JsonProperty("fieldPathsToDrop") @Nullable Set<String> fieldPathsToDrop) {
+    Preconditions.checkArgument(indexableExtrasField != null, "indexableExtrasField must be set");
+    if (null != unindexableExtrasField) {
+      // The message names the actual JSON property (unindexableFieldSuffix) so users know which key to add
+      Preconditions.checkArgument(null != unindexableFieldSuffix,
+          "unindexableFieldSuffix must be set if unindexableExtrasField is set");
+    }
+    _indexableExtrasField = indexableExtrasField;
+    _unindexableExtrasField = unindexableExtrasField;
+    _unindexableFieldSuffix = unindexableFieldSuffix;
+    _fieldPathsToDrop = fieldPathsToDrop;
+  }
+
+  /**
+   * @return Name of the field that should contain extra fields that are not part of the schema
+   */
+  public String getIndexableExtrasField() {
+    return _indexableExtrasField;
+  }
+
+  /**
+   * @return Name of the field for extras whose names end with the unindexable suffix, or null if not configured
+   */
+  @Nullable
+  public String getUnindexableExtrasField() {
+    return _unindexableExtrasField;
+  }
+
+  /**
+   * @return The suffix of fields that must be stored in the unindexable extras field, or null if not configured
+   */
+  @Nullable
+  public String getUnindexableFieldSuffix() {
+    return _unindexableFieldSuffix;
+  }
+
+  /**
+   * @return The field paths to drop, or null if not configured
+   */
+  @Nullable
+  public Set<String> getFieldPathsToDrop() {
+    return _fieldPathsToDrop;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index 8ef3838189..2bc15a0800 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -78,6 +78,13 @@ public class GenericRow implements Serializable {
   private final Map<String, Object> _fieldToValueMap = new HashMap<>();
   private final Set<String> _nullValueFields = new HashSet<>();
 
+  /**
+   * Checks whether the given key is one of the special types of keys ($SKIP_RECORD_KEY$, etc.), i.e. whether it
+   * equals {@code SKIP_RECORD_KEY}, {@code INCOMPLETE_RECORD_KEY}, or {@code MULTIPLE_RECORDS_KEY}.
+   *
+   * @param key The key to check; must not be null
+   * @return Whether the given key is one of the special types of keys
+   */
+  public static boolean isSpecialKeyType(String key) {
+    return key.equals(SKIP_RECORD_KEY) || key.equals(INCOMPLETE_RECORD_KEY) || key.equals(MULTIPLE_RECORDS_KEY);
+  }
+
   /**
    * Initializes the generic row from the given generic row (shallow copy). 
The row should be new created or cleared
    * before calling this method.
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
index 5093c2c8b9..89c70acaf8 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
@@ -34,6 +34,13 @@ public class StreamDataDecoderImpl implements 
StreamDataDecoder {
   private final StreamMessageDecoder _valueDecoder;
   private final GenericRow _reuse = new GenericRow();
 
+  /**
+   * Checks whether the given key is one of the special types of keys (__key, __header$, etc.), i.e. whether it
+   * equals {@code KEY} or starts with {@code HEADER_KEY_PREFIX} or {@code METADATA_KEY_PREFIX}.
+   *
+   * @param key The key to check; must not be null
+   * @return Whether the given key is one of the special types of keys
+   */
+  public static boolean isSpecialKeyType(String key) {
+    return key.equals(KEY) || key.startsWith(HEADER_KEY_PREFIX) || key.startsWith(METADATA_KEY_PREFIX);
+  }
+
   public StreamDataDecoderImpl(StreamMessageDecoder valueDecoder) {
     _valueDecoder = valueDecoder;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to