Jackie-Jiang commented on code in PR #18816:
URL: https://github.com/apache/pinot/pull/18816#discussion_r3445028512


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SourceFieldConfig.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table.ingestion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.utils.PinotDataType;
+
+
+/// Configures a data type fix for a single source (input) field during ingestion, applied before other transformers
+/// consume the field. Useful when a source field arrives with a type that a downstream enricher or transform
+/// expression does not expect (e.g. an epoch timestamp arriving as a `String`).
+///
+/// Each config maps a source field ([#getName()]) to the target [PinotDataType] ([#getDataType()]) it should be
+/// converted to. The [#isPreComplexTypeTransform()] flag selects when the fix runs:
+/// - `true`: before the complex type transformation (and the pre-complex-type enrichers), so the corrected value can
+///   feed complex type flattening and pre-complex-type enrichment.
+/// - `false` (default): after the complex type transformation, so flattened/unnested fields can be fixed before the
+///   post-complex-type enrichers and the expression transformer run.
+///
+/// A source field may be configured at most once per phase.
+public class SourceFieldConfig extends BaseJsonConfig {
+  @JsonPropertyDescription("Name of the source field to fix the data type for")
+  private final String _name;
+
+  @JsonPropertyDescription("Target data type (PinotDataType name, e.g. INT, LONG, STRING, LONG_ARRAY) to convert "
+      + "the source field to")
+  private final PinotDataType _dataType;
+
+  @JsonPropertyDescription("Whether the data type conversion is applied before the complex type transformation")
+  private final boolean _preComplexTypeTransform;
+
+  @JsonCreator
+  public SourceFieldConfig(@JsonProperty(value = "name", required = true) String name,
+      @JsonProperty(value = "dataType", required = true) PinotDataType dataType,
+      @JsonProperty("preComplexTypeTransform") boolean preComplexTypeTransform) {
+    Preconditions.checkArgument(StringUtils.isNotEmpty(name), "'name' must be set in SourceFieldConfig");
+    Preconditions.checkArgument(dataType != null, "'dataType' must be set in SourceFieldConfig for source field: %s",

Review Comment:
   Done — switched to `StringUtils.isNotBlank(name)` and added a 
whitespace-only case to `SourceFieldConfigTest`.
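For reference, the two validation rules touched on here (blank names rejected; a source field configured at most once per phase) can be sketched in plain Java. This is a hypothetical stand-in: `SourceFieldSpec` and `SourceFieldValidation.validateOncePerPhase` are illustrative names, not Pinot's API, and the sketch uses the JDK's `String.isBlank()` (Java 11+) in place of commons-lang3's `StringUtils.isNotBlank`. Both treat a whitespace-only name as missing, whereas an `isNotEmpty` check would accept `"   "`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for SourceFieldConfig; not the Pinot class.
final class SourceFieldSpec {
  final String name;
  final String dataType; // e.g. "LONG"; the real config uses the PinotDataType enum
  final boolean preComplexTypeTransform;

  SourceFieldSpec(String name, String dataType, boolean preComplexTypeTransform) {
    // isBlank() rejects "", "   " and "\t"; isEmpty() would only reject "".
    if (name == null || name.isBlank()) {
      throw new IllegalArgumentException("'name' must be set in SourceFieldConfig");
    }
    if (dataType == null) {
      throw new IllegalArgumentException("'dataType' must be set in SourceFieldConfig for source field: " + name);
    }
    this.name = name;
    this.dataType = dataType;
    this.preComplexTypeTransform = preComplexTypeTransform;
  }
}

final class SourceFieldValidation {
  // Rejects a source field configured twice in the same phase. The same name may
  // still appear once with preComplexTypeTransform=true and once with false.
  static void validateOncePerPhase(List<SourceFieldSpec> specs) {
    Set<String> pre = new HashSet<>();
    Set<String> post = new HashSet<>();
    for (SourceFieldSpec spec : specs) {
      Set<String> seen = spec.preComplexTypeTransform ? pre : post;
      if (!seen.add(spec.name)) {
        throw new IllegalArgumentException(
            "Source field configured more than once in the same phase: " + spec.name);
      }
    }
  }
}
```

Where the per-phase check actually lives in the PR (config constructor, `IngestionConfig`, or table config validation) is not shown in these hunks; the split above is only for illustration.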



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java:
##########
@@ -39,23 +39,42 @@
  * <p>NOTE: should put this after all the values has been generated by other transformers (such as
  * {@link ExpressionTransformer}). After this, all values should be of the desired data types.
  */

Review Comment:
   Done — updated the class Javadoc to document both usages: schema columns 
(run after other transformers such as `ExpressionTransformer`) and source 
fields (run before them to fix input types).
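Why the two usages sit at opposite ends of the pipeline can be shown with a heavily simplified, hypothetical sketch: rows are `Map<String, Object>`, `fixToLong` stands in for a `DataTypeTransformer` built from source field configs, and `deriveSeconds` stands in for an expression such as `tsMillis / 1000` that needs a numeric input. None of these names are Pinot's, and the schema-level conversion is only marked by a comment.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

final class TransformOrderDemo {
  // Hypothetical source-field fix: converts the listed String fields to Long.
  static UnaryOperator<Map<String, Object>> fixToLong(List<String> fields) {
    return row -> {
      for (String field : fields) {
        Object value = row.get(field);
        if (value instanceof String) {
          row.put(field, Long.parseLong((String) value));
        }
      }
      return row;
    };
  }

  // Hypothetical expression transform; fails if its input was not fixed first.
  static Map<String, Object> deriveSeconds(Map<String, Object> row) {
    Object millis = row.get("tsMillis");
    if (!(millis instanceof Long)) {
      throw new IllegalStateException("tsMillis is not a Long: " + millis);
    }
    row.put("tsSeconds", (Long) millis / 1000);
    return row;
  }

  static Map<String, Object> run(Map<String, Object> input, boolean fixSourceFieldsFirst) {
    Map<String, Object> row = new HashMap<>(input);
    if (fixSourceFieldsFirst) {
      row = fixToLong(List.of("tsMillis")).apply(row); // source-field usage: before consumers
    }
    row = deriveSeconds(row);
    // A schema-level DataTypeTransformer would run here, after all values exist.
    return row;
  }
}
```

With `fixSourceFieldsFirst = true`, an input of `"1700000000123"` yields `tsSeconds = 1700000000L`; without the fix, the expression step throws because it sees a `String`.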



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java:
##########
@@ -38,6 +37,9 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Config related to the stream data sources")
   private StreamIngestionConfig _streamIngestionConfig;
 
+  @JsonPropertyDescription("Configs to fix the data types of the source fields before applying other transforms")
+  private List<SourceFieldConfig> _sourceFieldConfigs;
+

Review Comment:
   That constructor was already `@Deprecated` and has no in-tree callers 
(`IngestionConfig` is built via setters everywhere). Removing it is an 
intentional cleanup; downstream code on the deprecated constructor can migrate 
to the no-arg constructor + setters.
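The migration suggested above follows the usual no-arg-plus-setters pattern. The class below is a hypothetical, trimmed-down stand-in (`MiniIngestionConfig` and its accessors are illustrative, not Pinot's `IngestionConfig` API); check the real class for its actual setter names, including whatever accessor the new `_sourceFieldConfigs` field gets.

```java
import java.util.List;

// Hypothetical stand-in for a setter-based config class; not Pinot's IngestionConfig.
final class MiniIngestionConfig {
  private List<String> _sourceFieldNames;
  private boolean _continueOnError;

  // No-arg constructor: adding a field later does not change this signature.
  MiniIngestionConfig() {
  }

  List<String> getSourceFieldNames() {
    return _sourceFieldNames;
  }

  void setSourceFieldNames(List<String> sourceFieldNames) {
    _sourceFieldNames = sourceFieldNames;
  }

  boolean isContinueOnError() {
    return _continueOnError;
  }

  void setContinueOnError(boolean continueOnError) {
    _continueOnError = continueOnError;
  }

  // What a former positional call such as `new Config(names, continueOnError)` becomes.
  static MiniIngestionConfig migrated(List<String> sourceFieldNames, boolean continueOnError) {
    MiniIngestionConfig config = new MiniIngestionConfig();
    config.setSourceFieldNames(sourceFieldNames);
    config.setContinueOnError(continueOnError);
    return config;
  }
}
```

The design point is that a positional constructor must be changed or overloaded every time a field is added, while setter-based callers only set what they need and are unaffected by new fields.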



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java:
##########
@@ -39,23 +39,42 @@
  * <p>NOTE: should put this after all the values has been generated by other transformers (such as
  * {@link ExpressionTransformer}). After this, all values should be of the desired data types.
  */
-@SuppressWarnings("rawtypes")
 public class DataTypeTransformer implements RecordTransformer {
   private static final Logger LOGGER = LoggerFactory.getLogger(DataTypeTransformer.class);
 
-  private final Map<String, PinotDataType> _dataTypes = new HashMap<>();
+  private final Map<String, PinotDataType> _dataTypes;
   private final boolean _continueOnError;
   private final ThrottledLogger _throttledLogger;
 
+  /// Creates a [DataTypeTransformer] that converts the (non-virtual) schema columns to the data types defined in the
+  /// [Schema].
   public DataTypeTransformer(TableConfig tableConfig, Schema schema) {
+    this(tableConfig, extractSchemaDataTypes(schema));
+  }
+
+  /// Creates a [DataTypeTransformer] that converts the given columns to the provided [PinotDataType]s. This is useful
+  /// for fixing the data types of source fields before other transformers (such as [ExpressionTransformer]) consume
+  /// them.
+  public DataTypeTransformer(TableConfig tableConfig, Map<String, PinotDataType> dataTypes) {
+    _dataTypes = dataTypes;
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    _continueOnError = ingestionConfig != null && ingestionConfig.isContinueOnError();
+    _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
+  }

Review Comment:
   Both call sites build a fresh map that they neither retain nor mutate after 
construction, so direct assignment is safe and avoids an extra per-segment 
copy. `getInputColumns()` is consumed read-only by the transform pipeline.
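The trade-off behind direct assignment can be illustrated with plain JDK collections. `ColumnTypes` is a hypothetical holder (not `DataTypeTransformer`, and data types are plain `String`s instead of `PinotDataType`), and the unmodifiable-view variant is an alternative for comparison, not what the PR does. Direct assignment aliases the caller's map; a defensive copy isolates it at the cost of an allocation; `Collections.unmodifiableMap` blocks writes through the held reference without copying, though the caller can still mutate the backing map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder comparing three ways to keep a constructor argument.
final class ColumnTypes {
  final Map<String, String> shared;       // direct assignment (as in the PR): no copy, aliases the caller's map
  final Map<String, String> copied;       // defensive copy: isolated from the caller, one extra allocation
  final Map<String, String> readOnlyView; // unmodifiable view: no copy, rejects put/remove through this reference

  ColumnTypes(Map<String, String> dataTypes) {
    shared = dataTypes;
    copied = new HashMap<>(dataTypes);
    readOnlyView = Collections.unmodifiableMap(dataTypes);
  }
}
```

If the caller adds an entry after construction, `shared` and `readOnlyView` both see it while `copied` does not. When, as the comment states, every call site builds a fresh map and never touches it again, the three variants behave the same.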



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
