Jackie-Jiang commented on code in PR #13103:
URL: https://github.com/apache/pinot/pull/13103#discussion_r1614040826


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java:
##########
@@ -36,47 +37,108 @@
  * </ul>
  * <p>NOTE: should put this after the {@link DataTypeTransformer} so that all 
values follow the data types in
  * {@link FieldSpec}.
+ * This uses the MaxLengthExceedStrategy in the {@link FieldSpec} to decide 
what to do when the value exceeds the max.
+ * For TRIM_LENGTH, the value is trimmed to the max length.
+ * For SUBSTITUTE_DEFAULT_VALUE, the value is replaced with the default null 
value string.
+ * For FAIL_INGESTION, an exception is thrown and the record is skipped.
+ * For NO_ACTION, the value is kept as is if no NULL_CHARACTER present else 
trimmed till NULL.
+ * In the first 2 scenarios, this metric INCOMPLETE_REALTIME_ROWS_CONSUMED can 
be tracked to know if a trimmed /
+ * default record was persisted.
+ * In the third scenario, this metric ROWS_WITH_ERRORS can be tracked  to know 
if a record was skipped.
+ * In the last scenario, this metric INCOMPLETE_REALTIME_ROWS_CONSUMED can be 
tracked to know if a record was trimmed
+ * due to having a null character.
  */
 public class SanitizationTransformer implements RecordTransformer {
-  private final Map<String, Integer> _stringColumnMaxLengthMap = new 
HashMap<>();
+  private static final String NULL_CHARACTER = "\0";
+  private final Map<String, FieldSpec> _stringColumnToFieldSpecMap = new 
HashMap<>();
 
   public SanitizationTransformer(Schema schema) {
     for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
       if (!fieldSpec.isVirtualColumn() && fieldSpec.getDataType() == 
DataType.STRING) {
-        _stringColumnMaxLengthMap.put(fieldSpec.getName(), 
fieldSpec.getMaxLength());
+        _stringColumnToFieldSpecMap.put(fieldSpec.getName(), fieldSpec);
       }
     }
   }
 
   @Override
   public boolean isNoOp() {
-    return _stringColumnMaxLengthMap.isEmpty();
+    return _stringColumnToFieldSpecMap.isEmpty();
   }
 
   @Override
   public GenericRow transform(GenericRow record) {
-    for (Map.Entry<String, Integer> entry : 
_stringColumnMaxLengthMap.entrySet()) {
+    for (Map.Entry<String, FieldSpec> entry : 
_stringColumnToFieldSpecMap.entrySet()) {
       String stringColumn = entry.getKey();
-      int maxLength = entry.getValue();
       Object value = record.getValue(stringColumn);
       if (value instanceof String) {
         // Single-valued column
-        String stringValue = (String) value;
-        String sanitizedValue = StringUtil.sanitizeStringValue(stringValue, 
maxLength);
-        // NOTE: reference comparison
-        //noinspection StringEquality
-        if (sanitizedValue != stringValue) {
-          record.putValue(stringColumn, sanitizedValue);
+        Pair<String, Boolean> result = sanitizeValue(stringColumn, (String) 
value, entry.getValue());
+        record.putValue(stringColumn, result.getLeft());
+        if (result.getRight()) {
+          record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
         }
       } else {
         // Multi-valued column
         Object[] values = (Object[]) value;
-        int numValues = values.length;
-        for (int i = 0; i < numValues; i++) {
-          values[i] = StringUtil.sanitizeStringValue(values[i].toString(), 
maxLength);
+        for (int i = 0; i < values.length; i++) {
+          Pair<String, Boolean> result = sanitizeValue(stringColumn, 
values[i].toString(), entry.getValue());
+          values[i] = result.getLeft();
+          if (result.getRight()) {
+            record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
+          }
         }
       }
     }
     return record;
   }
+
+  /**
+   * Sanitize the value for the given column. Usually a STRING column can be 
extended for JSON / BYTES in future.
+   * @param stringColumn column name
+   * @param value value of the column
+   * @param columnFieldSpec field spec of the column defined in schema
+   * @return the sanitized value and a boolean indicating if the value was 
sanitized
+   */
+  private Pair<String, Boolean> sanitizeValue(String stringColumn, String 
value, FieldSpec columnFieldSpec) {
+    String sanitizedValue = StringUtil.sanitizeStringValue(value, 
columnFieldSpec.getMaxLength());

Review Comment:
   We can pass `MaxLengthExceedStrategy` into this method and let the method 
handle it
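
A rough sketch of what a strategy-aware helper could look like, not the PR's actual code. `SanitizeSketch`, `Strategy`, `Result`, and `sanitize` are hypothetical stand-ins for `StringUtil`, `FieldSpec.MaxLengthExceedStrategy`, and the `Pair<String, Boolean>` in the diff. It assumes values are always cut at the first null character, and that the strategy only applies when the remaining value is still longer than the max length.

```java
final class SanitizeSketch {
  // Hypothetical stand-in for FieldSpec.MaxLengthExceedStrategy from the diff.
  enum Strategy { TRIM_LENGTH, FAIL_INGESTION, SUBSTITUTE_DEFAULT_VALUE, NO_ACTION }

  // Hypothetical result holder: the sanitized value plus a "was modified" flag.
  static final class Result {
    final String value;
    final boolean modified;

    Result(String value, boolean modified) {
      this.value = value;
      this.modified = modified;
    }
  }

  static Result sanitize(String value, int maxLength, Strategy strategy, String defaultValue) {
    // Assumption: always cut the value at the first null character, if any.
    int nullIndex = value.indexOf('\0');
    boolean hadNull = nullIndex >= 0;
    String noNull = hadNull ? value.substring(0, nullIndex) : value;
    if (noNull.length() <= maxLength) {
      return new Result(noNull, hadNull);
    }
    // The value exceeds the max length: let the strategy decide.
    switch (strategy) {
      case TRIM_LENGTH:
        return new Result(noNull.substring(0, maxLength), true);
      case SUBSTITUTE_DEFAULT_VALUE:
        return new Result(defaultValue, true);
      case FAIL_INGESTION:
        throw new IllegalStateException(
            "Value length " + noNull.length() + " exceeds max length " + maxLength);
      case NO_ACTION:
      default:
        // Keep the over-long value; only a removed null character counts as a change.
        return new Result(noNull, hadNull);
    }
  }
}
```

With something like this, `transform()` only needs to call the helper and set `INCOMPLETE_RECORD_KEY` when the returned flag is true.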



##########
pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java:
##########
@@ -98,6 +100,10 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
     }
   }
 
+  public enum MaxLengthExceedStrategy {
+    TRIM_LENGTH, FAIL_INGESTION, SUBSTITUTE_DEFAULT_VALUE, NO_ACTION

Review Comment:
   Suggest replacing `FAIL_INGESTION` with `THROW_EXCEPTION`. The real-time 
ingestion flow also uses this transformer. When it throws an exception, IIRC we 
skip that row instead of failing the ingestion. IMO `THROW_EXCEPTION` 
describes the behavior more accurately.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java:
##########
@@ -36,47 +37,108 @@
  * </ul>
  * <p>NOTE: should put this after the {@link DataTypeTransformer} so that all 
values follow the data types in
  * {@link FieldSpec}.
+ * This uses the MaxLengthExceedStrategy in the {@link FieldSpec} to decide 
what to do when the value exceeds the max.
+ * For TRIM_LENGTH, the value is trimmed to the max length.
+ * For SUBSTITUTE_DEFAULT_VALUE, the value is replaced with the default null 
value string.
+ * For FAIL_INGESTION, an exception is thrown and the record is skipped.
+ * For NO_ACTION, the value is kept as is if no NULL_CHARACTER present else 
trimmed till NULL.
+ * In the first 2 scenarios, this metric INCOMPLETE_REALTIME_ROWS_CONSUMED can 
be tracked to know if a trimmed /
+ * default record was persisted.
+ * In the third scenario, this metric ROWS_WITH_ERRORS can be tracked  to know 
if a record was skipped.
+ * In the last scenario, this metric INCOMPLETE_REALTIME_ROWS_CONSUMED can be 
tracked to know if a record was trimmed
+ * due to having a null character.
  */
 public class SanitizationTransformer implements RecordTransformer {
-  private final Map<String, Integer> _stringColumnMaxLengthMap = new 
HashMap<>();
+  private static final String NULL_CHARACTER = "\0";
+  private final Map<String, FieldSpec> _stringColumnToFieldSpecMap = new 
HashMap<>();
 
   public SanitizationTransformer(Schema schema) {
     for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
       if (!fieldSpec.isVirtualColumn() && fieldSpec.getDataType() == 
DataType.STRING) {

Review Comment:
   We are currently still handling only STRING columns. Let's also handle BYTES 
and JSON columns in this transformer.
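
One possible shape for the column filter, as a sketch only. `SanitizableTypesSketch`, its `DataType` enum, `needsSanitization`, and `trimBytes` are hypothetical names for illustration and are not Pinot's API. It assumes BYTES values arrive as `byte[]` and that max length for them is counted in bytes.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;

final class SanitizableTypesSketch {
  // Hypothetical, reduced stand-in for Pinot's DataType enum.
  enum DataType { INT, LONG, STRING, JSON, BYTES }

  // Types the transformer would register in its constructor.
  static final Set<DataType> SANITIZED_TYPES =
      EnumSet.of(DataType.STRING, DataType.JSON, DataType.BYTES);

  // Mirrors the constructor check in the diff, extended beyond STRING.
  static boolean needsSanitization(DataType dataType, boolean isVirtualColumn) {
    return !isVirtualColumn && SANITIZED_TYPES.contains(dataType);
  }

  // Assumption: BYTES values are trimmed by byte count; returns the same array if it fits.
  static byte[] trimBytes(byte[] value, int maxLength) {
    return value.length > maxLength ? Arrays.copyOf(value, maxLength) : value;
  }
}
```

Note that trimming a JSON value by length can leave invalid JSON, so for JSON columns a strategy other than trimming may be the safer choice.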



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java:
##########
@@ -36,47 +37,108 @@
  * </ul>
  * <p>NOTE: should put this after the {@link DataTypeTransformer} so that all 
values follow the data types in
  * {@link FieldSpec}.
+ * This uses the MaxLengthExceedStrategy in the {@link FieldSpec} to decide 
what to do when the value exceeds the max.
+ * For TRIM_LENGTH, the value is trimmed to the max length.
+ * For SUBSTITUTE_DEFAULT_VALUE, the value is replaced with the default null 
value string.
+ * For FAIL_INGESTION, an exception is thrown and the record is skipped.
+ * For NO_ACTION, the value is kept as is if no NULL_CHARACTER present else 
trimmed till NULL.
+ * In the first 2 scenarios, this metric INCOMPLETE_REALTIME_ROWS_CONSUMED can 
be tracked to know if a trimmed /
+ * default record was persisted.
+ * In the third scenario, this metric ROWS_WITH_ERRORS can be tracked  to know 
if a record was skipped.
+ * In the last scenario, this metric INCOMPLETE_REALTIME_ROWS_CONSUMED can be 
tracked to know if a record was trimmed
+ * due to having a null character.
  */
 public class SanitizationTransformer implements RecordTransformer {
-  private final Map<String, Integer> _stringColumnMaxLengthMap = new 
HashMap<>();
+  private static final String NULL_CHARACTER = "\0";
+  private final Map<String, FieldSpec> _stringColumnToFieldSpecMap = new 
HashMap<>();
 
   public SanitizationTransformer(Schema schema) {
     for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
       if (!fieldSpec.isVirtualColumn() && fieldSpec.getDataType() == 
DataType.STRING) {
-        _stringColumnMaxLengthMap.put(fieldSpec.getName(), 
fieldSpec.getMaxLength());
+        _stringColumnToFieldSpecMap.put(fieldSpec.getName(), fieldSpec);
       }
     }
   }
 
   @Override
   public boolean isNoOp() {
-    return _stringColumnMaxLengthMap.isEmpty();
+    return _stringColumnToFieldSpecMap.isEmpty();
   }
 
   @Override
   public GenericRow transform(GenericRow record) {
-    for (Map.Entry<String, Integer> entry : 
_stringColumnMaxLengthMap.entrySet()) {
+    for (Map.Entry<String, FieldSpec> entry : 
_stringColumnToFieldSpecMap.entrySet()) {
       String stringColumn = entry.getKey();
-      int maxLength = entry.getValue();
       Object value = record.getValue(stringColumn);
       if (value instanceof String) {
         // Single-valued column
-        String stringValue = (String) value;
-        String sanitizedValue = StringUtil.sanitizeStringValue(stringValue, 
maxLength);
-        // NOTE: reference comparison
-        //noinspection StringEquality
-        if (sanitizedValue != stringValue) {
-          record.putValue(stringColumn, sanitizedValue);
+        Pair<String, Boolean> result = sanitizeValue(stringColumn, (String) 
value, entry.getValue());
+        record.putValue(stringColumn, result.getLeft());
+        if (result.getRight()) {
+          record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
         }
       } else {
         // Multi-valued column
         Object[] values = (Object[]) value;
-        int numValues = values.length;
-        for (int i = 0; i < numValues; i++) {
-          values[i] = StringUtil.sanitizeStringValue(values[i].toString(), 
maxLength);
+        for (int i = 0; i < values.length; i++) {
+          Pair<String, Boolean> result = sanitizeValue(stringColumn, 
values[i].toString(), entry.getValue());
+          values[i] = result.getLeft();
+          if (result.getRight()) {
+            record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
+          }
         }
       }
     }
     return record;
   }
+
+  /**
+   * Sanitize the value for the given column. Usually a STRING column can be 
extended for JSON / BYTES in future.
+   * @param stringColumn column name
+   * @param value value of the column
+   * @param columnFieldSpec field spec of the column defined in schema
+   * @return the sanitized value and a boolean indicating if the value was 
sanitized
+   */
+  private Pair<String, Boolean> sanitizeValue(String stringColumn, String 
value, FieldSpec columnFieldSpec) {
+    String sanitizedValue = StringUtil.sanitizeStringValue(value, 
columnFieldSpec.getMaxLength());
+    FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy;
+    if (columnFieldSpec.getMaxLengthExceedStrategy() == null) {

Review Comment:
   Suggest doing this check once in the constructor instead of on each row.
   We'll need `maxLength`, `maxLengthExceedStrategy`, and `defaultValue` for 
each column, so we can add a private class to hold them.
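
Roughly what this could look like, as a sketch rather than a definitive implementation. `SanitizedColumnsSketch`, `ColumnSpec`, `SanitizedColumnInfo`, and `infoFor` are hypothetical names; `ColumnSpec` stands in for the parts of `FieldSpec` used here. The diff is cut off before showing what happens when the strategy is null, so the fallback is taken as a constructor parameter instead of being guessed.

```java
import java.util.HashMap;
import java.util.Map;

final class SanitizedColumnsSketch {
  // Hypothetical stand-in for FieldSpec.MaxLengthExceedStrategy from the diff.
  enum Strategy { TRIM_LENGTH, FAIL_INGESTION, SUBSTITUTE_DEFAULT_VALUE, NO_ACTION }

  // Hypothetical stand-in for the FieldSpec fields the transformer reads.
  static final class ColumnSpec {
    final String name;
    final int maxLength;
    final Strategy strategy; // may be null when not configured
    final String defaultNullValue;

    ColumnSpec(String name, int maxLength, Strategy strategy, String defaultNullValue) {
      this.name = name;
      this.maxLength = maxLength;
      this.strategy = strategy;
      this.defaultNullValue = defaultNullValue;
    }
  }

  // Per-column holder; it would be private in the real transformer.
  static final class SanitizedColumnInfo {
    final int maxLength;
    final Strategy strategy; // never null after construction
    final String defaultValue;

    SanitizedColumnInfo(int maxLength, Strategy strategy, String defaultValue) {
      this.maxLength = maxLength;
      this.strategy = strategy;
      this.defaultValue = defaultValue;
    }
  }

  private final Map<String, SanitizedColumnInfo> _columnInfoMap = new HashMap<>();

  SanitizedColumnsSketch(Iterable<ColumnSpec> specs, Strategy fallbackStrategy) {
    for (ColumnSpec spec : specs) {
      // The null check runs once per column here instead of once per row.
      Strategy strategy = spec.strategy != null ? spec.strategy : fallbackStrategy;
      _columnInfoMap.put(spec.name,
          new SanitizedColumnInfo(spec.maxLength, strategy, spec.defaultNullValue));
    }
  }

  SanitizedColumnInfo infoFor(String column) {
    return _columnInfoMap.get(column);
  }
}
```

`transform()` can then iterate over the map entries and read the resolved values directly, without touching `FieldSpec` on the per-row path.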



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

