lnbest0707-uber commented on code in PR #12788:
URL: https://github.com/apache/pinot/pull/12788#discussion_r1552476272


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java:
##########
@@ -0,0 +1,715 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordtransformer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.utils.Base64Utils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import 
org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerV2Config;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This transformer evolves from {@link SchemaConformingTransformer} and is 
designed to support extra cases for
+ * better text searching:
+ *   - Support over-lapping schema fields, in which case it could support 
schema column "a" and "a.b" at the same time.
+ *     And it only allows primitive type fields to be the value.
+ *   - Extract flattened key-value pairs as mergedTextIndex for better text 
searching.
+ *   - Add shingle index tokenization functionality for extremely large text 
fields.
+ * <p>
+ * For example, consider this record:
+ * <pre>
+ * {
+ *   "a": 1,
+ *   "b": "2",
+ *   "c": {
+ *     "d": 3,
+ *     "e_noindex": 4,
+ *     "f_noindex": {
+ *       "g": 5
+ *      },
+ *     "x": {
+ *       "y": 9,
+ *       "z_noindex": 10
+ *     }
+ *   },
+ *   "h_noindex": "6",
+ *   "i_noindex": {
+ *     "j": 7,
+ *     "k": 8
+ *   }
+ * }
+ * </pre>
+ * And let's say the table's schema contains these fields:
+ * <ul>
+ *   <li>a</li>
+ *   <li>c</li>
+ *   <li>c.d</li>
+ * </ul>
+ * <p>
+ * The record would be transformed into the following (refer to {@link SchemaConformingTransformerV2Config} for
+ * default constant values):
+ * <pre>
+ * {
+ *   "a": 1,
+ *   "c": null,
+ *   "c.d": 3,
+ *   "json_data": {
+ *     "b": "2",
+ *     "c": {
+ *       "x": {
+ *         "y": 9
+ *       }
+ *     }
+ *   },
+ *   "json_data_no_idx": {
+ *     "c": {
+ *       "e_noindex": 4,
+ *       "f_noindex": {
+ *         "g": 5
+ *       },
+ *       "x": {
+ *         "z_noindex": 10
+ *       }
+ *     },
+ *     "h_noindex": "6",
+ *     "i_noindex": {
+ *       "j": 7,
+ *       "k": 8
+ *     }
+ *   },
+ *   "__mergedTextIndex": [
+ *     "1:a", "2:b", "3:c.d", "9:c.x.y"
+ *   ]
+ * }
+ * </pre>
+ * <p>
+ * The "__mergedTextIndex" could filter and manipulate the data based on the 
configuration in
+ * {@link SchemaConformingTransformerV2Config}.
+ */
+public class SchemaConformingTransformerV2 implements RecordTransformer {
+  private static final Logger _logger = LoggerFactory.getLogger(SchemaConformingTransformerV2.class);
+  // NOTE(review): presumably Lucene's maximum indexable term length in bytes -- confirm against the Lucene version
+  // in use. The code that consumes this constant is outside this view.
+  private static final int MAXIMUM_LUCENE_TOKEN_SIZE = 32766;
+  // Human-readable description of what makes up the minimum token length.
+  private static final String MIN_TOKEN_LENGTH_DESCRIPTION =
+      "key length + `:` + shingle index overlap length + one non-overlap char";
+
+  // Copied from the ingestion config's isContinueOnError(); false when no transformer config is present.
+  private final boolean _continueOnError;
+  // Null when the table config carries no SchemaConformingTransformerV2Config.
+  private final SchemaConformingTransformerV2Config _transformerConfig;
+  // Data type of the configured indexable extras field; null when that field is not configured.
+  private final DataType _indexableExtrasFieldType;
+  // Data type of the configured unindexable extras field; null when that field is not configured.
+  private final DataType _unindexableExtrasFieldType;
+  // Dimension spec looked up in the schema by the configured merged text index field name.
+  private final DimensionFieldSpec _mergedTextIndexFieldSpec;
+  // Assigned from ServerMetrics.get() by the constructor; stays null when no transformer config is present.
+  @Nullable
+  ServerMetrics _serverMetrics = null;
+  // Built by validateSchemaAndCreateTree(); not assigned when no transformer config is present.
+  private SchemaTreeNode _schemaTree;
+  @Nullable
+  private PinotMeter _realtimeMergedTextIndexTruncatedTokenSizeMeter = null;
+  // Table name taken from the table config; not assigned when no transformer config is present.
+  private String _tableName;
+  // Running totals; both start at zero.
+  private long _mergedTextIndexTokenBytesCount = 0L;
+  private long _mergedTextIndexTokenCount = 0L;
+
+  /**
+   * Builds the transformer from the table's ingestion config and schema.
+   * <p>
+   * When the table config has no ingestion config, or the ingestion config has no
+   * {@link SchemaConformingTransformerV2Config}, every config-derived final field is left {@code null}
+   * (continue-on-error is {@code false}) and no schema tree is built.
+   *
+   * @param tableConfig table config whose ingestion config may carry the transformer config
+   * @param schema table schema used to resolve the extras field types, the merged text index field spec and the
+   *               schema tree
+   */
+  public SchemaConformingTransformerV2(TableConfig tableConfig, Schema schema) {
+    boolean hasTransformerConfig = tableConfig.getIngestionConfig() != null
+        && tableConfig.getIngestionConfig().getSchemaConformingTransformerV2Config() != null;
+    if (!hasTransformerConfig) {
+      // Nothing to configure: the final fields still have to be assigned before returning.
+      _continueOnError = false;
+      _transformerConfig = null;
+      _indexableExtrasFieldType = null;
+      _unindexableExtrasFieldType = null;
+      _mergedTextIndexFieldSpec = null;
+      return;
+    }
+
+    _continueOnError = tableConfig.getIngestionConfig().isContinueOnError();
+    _transformerConfig = tableConfig.getIngestionConfig().getSchemaConformingTransformerV2Config();
+    _tableName = tableConfig.getTableName();
+
+    // Resolve (and validate) the type of each extras field, but only for the fields that are configured.
+    String indexableExtrasField = _transformerConfig.getIndexableExtrasField();
+    if (indexableExtrasField != null) {
+      _indexableExtrasFieldType =
+          SchemaConformingTransformer.getAndValidateExtrasFieldType(schema, indexableExtrasField);
+    } else {
+      _indexableExtrasFieldType = null;
+    }
+    String unindexableExtrasField = _transformerConfig.getUnindexableExtrasField();
+    if (unindexableExtrasField != null) {
+      _unindexableExtrasFieldType =
+          SchemaConformingTransformer.getAndValidateExtrasFieldType(schema, unindexableExtrasField);
+    } else {
+      _unindexableExtrasFieldType = null;
+    }
+
+    _mergedTextIndexFieldSpec = schema.getDimensionSpec(_transformerConfig.getMergedTextIndexField());
+    _schemaTree = validateSchemaAndCreateTree(schema, _transformerConfig);
+    _serverMetrics = ServerMetrics.get();
+  }
+
+  /**
+   * Validates the schema against the given transformer's configuration.
+   * <p>
+   * Checks, in order: the schema's physical column names against the config's no-index suffix and
+   * fieldPathsToDrop, the indexable extras field (if configured), the unindexable extras field (if configured),
+   * and finally that a schema tree can be built from the schema.
+   *
+   * @param schema schema to validate
+   * @param transformerConfig transformer configuration to validate the schema against
+   * @throws IllegalStateException if a schema column name conflicts with the configuration
+   */
+  public static void validateSchema(@Nonnull Schema schema,
+      @Nonnull SchemaConformingTransformerV2Config transformerConfig) {
+    validateSchemaFieldNames(schema.getPhysicalColumnNames(), transformerConfig);
+
+    String indexableExtrasFieldName = transformerConfig.getIndexableExtrasField();
+    if (null != indexableExtrasFieldName) {
+      SchemaConformingTransformer.getAndValidateExtrasFieldType(schema, indexableExtrasFieldName);
+    }
+    String unindexableExtrasFieldName = transformerConfig.getUnindexableExtrasField();
+    if (null != unindexableExtrasFieldName) {
+      // Validate the unindexable field itself. This branch previously re-validated the indexable field name,
+      // so the unindexable field was never checked and a null indexable name could be passed through.
+      SchemaConformingTransformer.getAndValidateExtrasFieldType(schema, unindexableExtrasFieldName);
+    }
+
+    validateSchemaAndCreateTree(schema, transformerConfig);
+  }
+
+  /**
+   * Heuristic check for encoded binary data: a byte array counts as base64-encoded binary only when it is at
+   * least {@code minLength} bytes long and {@link Base64Utils#isBase64IgnoreTrailingPeriods(byte[])} accepts it.
+   *
+   * @param bytes array to check
+   * @param minLength byte arrays shorter than this length are never treated as encoded binary data
+   * @return true if the input passes both checks above, false otherwise
+   */
+  public static boolean base64ValueFilter(final byte[] bytes, int minLength) {
+    // Length check first, so short arrays never reach the base64 scan.
+    if (bytes.length < minLength) {
+      return false;
+    }
+    return Base64Utils.isBase64IgnoreTrailingPeriods(bytes);
+  }
+
+  /**
+   * Checks that no schema field name conflicts with the transformer's configuration.
+   *
+   * @param schemaFields column names of the schema
+   * @param transformerConfig configuration providing the no-index suffix and the field paths to drop
+   * @throws IllegalStateException if a column ends with the no-index suffix or is listed in fieldPathsToDrop
+   */
+  private static void validateSchemaFieldNames(Set<String> schemaFields,
+      SchemaConformingTransformerV2Config transformerConfig) {
+    // No schema column may end with the unindexable-field suffix.
+    String noIndexSuffix = transformerConfig.getUnindexableFieldSuffix();
+    if (noIndexSuffix != null) {
+      schemaFields.forEach(
+          column -> Preconditions.checkState(!column.endsWith(noIndexSuffix), "Field '%s' has no-index suffix '%s'",
+              column, noIndexSuffix));
+    }
+
+    // No schema column may overlap with the fields in fieldPathsToDrop.
+    Set<String> pathsToDrop = transformerConfig.getFieldPathsToDrop();
+    if (pathsToDrop != null) {
+      boolean noOverlap = schemaFields.stream().noneMatch(pathsToDrop::contains);
+      Preconditions.checkState(noOverlap, "Fields in schema overlap with fieldPathsToDrop");
+    }
+  }
+
+  /**
+   * Validates the schema with a SchemaConformingTransformerConfig instance 
and creates a tree representing the fields

Review Comment:
   Thanks for reminding.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to