This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 65e97d23ef Allow expression transformer cotinue on error (#9376)
65e97d23ef is described below

commit 65e97d23efaa3afc5b6af653575b5fe5bd26756d
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Sep 10 20:58:57 2022 -0700

    Allow expression transformer cotinue on error (#9376)
---
 .../recordtransformer/ExpressionTransformer.java   | 13 ++++-
 .../ExpressionTransformerTest.java                 | 62 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 2 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
index 1c208639b8..b3a29c02b1 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
@@ -46,9 +46,11 @@ public class ExpressionTransformer implements 
RecordTransformer {
 
   @VisibleForTesting
   final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators = new 
LinkedHashMap<>();
+  private final boolean _continueOnError;
 
   public ExpressionTransformer(TableConfig tableConfig, Schema schema) {
     Map<String, FunctionEvaluator> expressionEvaluators = new HashMap<>();
+    _continueOnError = tableConfig.getIngestionConfig() != null && 
tableConfig.getIngestionConfig().isContinueOnError();
     if (tableConfig.getIngestionConfig() != null && 
tableConfig.getIngestionConfig().getTransformConfigs() != null) {
       for (TransformConfig transformConfig : 
tableConfig.getIngestionConfig().getTransformConfigs()) {
         FunctionEvaluator previous = 
expressionEvaluators.put(transformConfig.getColumnName(),
@@ -125,8 +127,15 @@ public class ExpressionTransformer implements 
RecordTransformer {
       // Skip transformation if column value already exist.
       // NOTE: column value might already exist for OFFLINE data
       if (record.getValue(column) == null) {
-        Object result = transformFunctionEvaluator.evaluate(record);
-        record.putValue(column, result);
+        if (_continueOnError) {
+          try {
+            record.putValue(column, 
transformFunctionEvaluator.evaluate(record));
+          } catch (Exception e) {
+            record.putValue(column, null);
+          }
+        } else {
+          record.putValue(column, transformFunctionEvaluator.evaluate(record));
+        }
       }
     }
     return record;
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
index 756ea7ae7b..fe17c20aa4 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
@@ -338,4 +338,66 @@ public class ExpressionTransformerTest {
             .setIngestionConfig(ingestionConfig).build();
     ExpressionTransformer expressionTransformer = new 
ExpressionTransformer(tableConfig, schema);
   }
+
+  @Test
+  public void testTransformFunctionWithWrongInput() {
+    Schema pinotSchema = new Schema();
+    DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("x", 
FieldSpec.DataType.INT, true);
+    pinotSchema.addField(dimensionFieldSpec);
+    List<TransformConfig> transformConfigs = Collections.singletonList(
+        new TransformConfig("y", "plus(x, 10)"));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(transformConfigs);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("testTransformFunctionWithWrongInput")
+            .setIngestionConfig(ingestionConfig)
+            .build();
+    ExpressionTransformer expressionTransformer = new 
ExpressionTransformer(tableConfig, pinotSchema);
+    // Valid case: x is int, y is int
+    GenericRow genericRow = new GenericRow();
+    genericRow.putValue("x", 10);
+    expressionTransformer.transform(genericRow);
+    Assert.assertEquals(genericRow.getValue("y"), 20.0);
+    // Invalid case: x is string, y is int
+    genericRow = new GenericRow();
+    genericRow.putValue("x", "abcd");
+    try {
+      expressionTransformer.transform(genericRow);
+      Assert.fail();
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "Caught exception while executing 
function: plus(x,'10')");
+    }
+  }
+
+  @Test
+  public void testTransformFunctionContinueOnError() {
+    Schema pinotSchema = new Schema();
+    DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("x", 
FieldSpec.DataType.INT, true);
+    pinotSchema.addField(dimensionFieldSpec);
+    List<TransformConfig> transformConfigs = Collections.singletonList(
+        new TransformConfig("y", "plus(x, 10)"));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(transformConfigs);
+    ingestionConfig.setContinueOnError(true);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("testTransformFunctionWithWrongInput")
+            .setIngestionConfig(ingestionConfig)
+            .build();
+    ExpressionTransformer expressionTransformer = new 
ExpressionTransformer(tableConfig, pinotSchema);
+    // Valid case: x is int, y is int
+    GenericRow genericRow = new GenericRow();
+    genericRow.putValue("x", 10);
+    expressionTransformer.transform(genericRow);
+    Assert.assertEquals(genericRow.getValue("y"), 20.0);
+    // Invalid case: x is string, y is int
+    genericRow = new GenericRow();
+    genericRow.putValue("x", "abcd");
+    expressionTransformer.transform(genericRow);
+    Assert.assertEquals(genericRow.getValue("y"), null);
+    // Invalid case: x is null, y is int
+    genericRow = new GenericRow();
+    genericRow.putValue("x", null);
+    expressionTransformer.transform(genericRow);
+    Assert.assertEquals(genericRow.getValue("y"), null);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to