Jackie-Jiang commented on code in PR #18816:
URL: https://github.com/apache/pinot/pull/18816#discussion_r3445028512
##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SourceFieldConfig.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table.ingestion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.utils.PinotDataType;
+
+/// Configures a data type fix for a single source (input) field during ingestion, applied before other transformers
+/// consume the field. Useful when a source field arrives with a type that a downstream enricher or transform
+/// expression does not expect (e.g. an epoch timestamp arriving as a `String`).
+///
+/// Each config maps a source field ([#getName()]) to the target [PinotDataType] ([#getDataType()]) it should be
+/// converted to. The [#isPreComplexTypeTransform()] flag selects when the fix runs:
+/// - `true`: before the complex type transformation (and the pre-complex-type enrichers), so the corrected value can
+/// feed complex type flattening and pre-complex-type enrichment.
+/// - `false` (default): after the complex type transformation, so flattened/unnested fields can be fixed before the
+/// post-complex-type enrichers and the expression transformer run.
+///
+/// A source field may be configured at most once per phase.
+public class SourceFieldConfig extends BaseJsonConfig {
+  @JsonPropertyDescription("Name of the source field to fix the data type for")
+  private final String _name;
+
+  @JsonPropertyDescription("Target data type (PinotDataType name, e.g. INT, LONG, STRING, LONG_ARRAY) to convert "
+      + "the source field to")
+  private final PinotDataType _dataType;
+
+  @JsonPropertyDescription("Whether the data type conversion is applied before the complex type transformation")
+  private final boolean _preComplexTypeTransform;
+
+  @JsonCreator
+  public SourceFieldConfig(@JsonProperty(value = "name", required = true) String name,
+      @JsonProperty(value = "dataType", required = true) PinotDataType dataType,
+      @JsonProperty("preComplexTypeTransform") boolean preComplexTypeTransform) {
+    Preconditions.checkArgument(StringUtils.isNotEmpty(name), "'name' must be set in SourceFieldConfig");
+    Preconditions.checkArgument(dataType != null, "'dataType' must be set in SourceFieldConfig for source field: %s",

Review Comment:
Done. Switched to `StringUtils.isNotBlank(name)` and added a whitespace-only name case to `SourceFieldConfigTest`.

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java:
##########
@@ -39,23 +39,42 @@
  * <p>NOTE: should put this after all the values has been generated by other transformers (such as
  * {@link ExpressionTransformer}). After this, all values should be of the desired data types.
  */

Review Comment:
Done. Updated the class Javadoc to document both usages: schema columns (run after other transformers such as `ExpressionTransformer`) and source fields (run before them to fix input types).

##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java:
##########
@@ -38,6 +37,9 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Config related to the stream data sources")
   private StreamIngestionConfig _streamIngestionConfig;
 
+  @JsonPropertyDescription("Configs to fix the data types of the source fields before applying other transforms")
+  private List<SourceFieldConfig> _sourceFieldConfigs;
+

Review Comment:
That constructor was already `@Deprecated` and has no in-tree callers (`IngestionConfig` is built via setters everywhere). Removing it is an intentional cleanup; downstream code that still uses the deprecated constructor can migrate to the no-arg constructor plus setters.

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java:
##########
@@ -39,23 +39,42 @@
  * <p>NOTE: should put this after all the values has been generated by other transformers (such as
  * {@link ExpressionTransformer}). After this, all values should be of the desired data types.
  */
-@SuppressWarnings("rawtypes")
 public class DataTypeTransformer implements RecordTransformer {
   private static final Logger LOGGER = LoggerFactory.getLogger(DataTypeTransformer.class);
 
-  private final Map<String, PinotDataType> _dataTypes = new HashMap<>();
+  private final Map<String, PinotDataType> _dataTypes;
   private final boolean _continueOnError;
   private final ThrottledLogger _throttledLogger;
 
+  /// Creates a [DataTypeTransformer] that converts the (non-virtual) schema columns to the data types defined in the
+  /// [Schema].
   public DataTypeTransformer(TableConfig tableConfig, Schema schema) {
+    this(tableConfig, extractSchemaDataTypes(schema));
+  }
+
+  /// Creates a [DataTypeTransformer] that converts the given columns to the provided [PinotDataType]s. This is useful
+  /// for fixing the data types of source fields before other transformers (such as [ExpressionTransformer]) consume
+  /// them.
+  public DataTypeTransformer(TableConfig tableConfig, Map<String, PinotDataType> dataTypes) {
+    _dataTypes = dataTypes;
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    _continueOnError = ingestionConfig != null && ingestionConfig.isContinueOnError();
+    _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
+  }

Review Comment:
Both call sites build a fresh map that they neither retain nor mutate after construction, so direct assignment is safe and avoids an extra per-segment copy. `getInputColumns()` is consumed read-only by the transform pipeline.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
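The `SourceFieldConfig` rules discussed in this thread (a non-blank name, a required target type, and at most one config per source field per phase) and the new `DataTypeTransformer(TableConfig, Map<String, PinotDataType>)` constructor fit together roughly as follows. This is a minimal, dependency-free sketch, not code from the PR: `SourceField` and `SourceFieldPhases.dataTypesForPhase` are hypothetical names, and the target type is a plain `String` standing in for `PinotDataType`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SourceFieldConfig. The target type is a plain String here
// (instead of Pinot's PinotDataType enum) so the sketch compiles without Pinot on the classpath.
record SourceField(String name, String dataType, boolean preComplexTypeTransform) {
  SourceField {
    // Mirrors the isNotBlank check from the review: null, empty and whitespace-only names are rejected.
    if (name == null || name.isBlank()) {
      throw new IllegalArgumentException("'name' must be set in SourceFieldConfig");
    }
    if (dataType == null) {
      throw new IllegalArgumentException("'dataType' must be set in SourceFieldConfig for source field: " + name);
    }
  }
}

// Hypothetical helper that splits the configs into the two phases described in the Javadoc.
final class SourceFieldPhases {
  private SourceFieldPhases() {
  }

  // Returns a newly built map (field name -> target type) for the requested phase and rejects
  // a field that is configured more than once within that phase.
  static Map<String, String> dataTypesForPhase(List<SourceField> configs, boolean preComplexTypeTransform) {
    Map<String, String> dataTypes = new LinkedHashMap<>();
    for (SourceField config : configs) {
      if (config.preComplexTypeTransform() != preComplexTypeTransform) {
        continue;  // Belongs to the other phase.
      }
      if (dataTypes.putIfAbsent(config.name(), config.dataType()) != null) {
        throw new IllegalArgumentException("Source field: " + config.name()
            + " is configured more than once with preComplexTypeTransform=" + preComplexTypeTransform);
      }
    }
    return dataTypes;
  }
}
```

Each call returns a newly built map, which is the situation the last review comment relies on when it assigns `_dataTypes` directly instead of copying it. A field such as `ts` may appear once with `preComplexTypeTransform = true` and once with `false` without being rejected.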

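To make the source-field use of `DataTypeTransformer` concrete (for example, an epoch timestamp arriving as a `String` that must become a `LONG` before an expression reads it), here is a simplified sketch. `SourceFieldTypeFixer` is hypothetical: it models a row as a `Map<String, Object>`, only handles `INT`, `LONG`, `DOUBLE` and `STRING`, and its `continueOnError` handling (replacing an unconvertible value with `null`) is an assumption of the sketch rather than a description of Pinot's implementation.

```java
import java.util.Map;

// Hypothetical, dependency-free sketch of a source-field type fix: converts configured fields of a row
// (modeled as a Map) to target types before other transforms read them. Not Pinot's DataTypeTransformer.
final class SourceFieldTypeFixer {
  // Field name -> target type; this sketch only understands "INT", "LONG", "DOUBLE" and "STRING".
  private final Map<String, String> _dataTypes;
  private final boolean _continueOnError;

  SourceFieldTypeFixer(Map<String, String> dataTypes, boolean continueOnError) {
    // Assigned without a defensive copy, assuming the caller passes a fresh map it does not keep or mutate.
    _dataTypes = dataTypes;
    _continueOnError = continueOnError;
  }

  void transform(Map<String, Object> row) {
    for (Map.Entry<String, String> entry : _dataTypes.entrySet()) {
      String field = entry.getKey();
      Object value = row.get(field);
      if (value == null) {
        continue;  // Missing or null source fields are left as-is.
      }
      try {
        row.put(field, convert(value, entry.getValue()));
      } catch (RuntimeException e) {
        if (!_continueOnError) {
          throw new IllegalStateException("Failed to convert source field: " + field, e);
        }
        // Assumption of this sketch: when continuing on error, the unconvertible value is replaced with null.
        row.put(field, null);
      }
    }
  }

  private static Object convert(Object value, String dataType) {
    switch (dataType) {
      case "INT":
        return value instanceof Number ? ((Number) value).intValue() : Integer.parseInt(value.toString().trim());
      case "LONG":
        return value instanceof Number ? ((Number) value).longValue() : Long.parseLong(value.toString().trim());
      case "DOUBLE":
        return value instanceof Number ? ((Number) value).doubleValue()
            : Double.parseDouble(value.toString().trim());
      case "STRING":
        return value.toString();
      default:
        throw new IllegalArgumentException("Unsupported data type: " + dataType);
    }
  }
}
```

Running one such fixer per phase, each with its own map, would correspond to the two phases described in the `SourceFieldConfig` Javadoc.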