This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4e66857849d1ac793ad77d211d602295d08f827f
Author: XuQianJin-Stars <forwar...@apache.com>
AuthorDate: Mon Aug 15 15:06:03 2022 +0800

    fix RowDataProjection with project and projectAsValues's NPE
---
 .../org/apache/hudi/util/RowDataProjection.java     | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
index 8076d982b9..51df29faae 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.Serializable;
 import java.util.Arrays;
@@ -33,14 +35,19 @@ import java.util.List;
  * Utilities to project the row data with given positions.
  */
 public class RowDataProjection implements Serializable {
+  private static final Logger LOG = LogManager.getLogger(RowDataProjection.class);
+
   private static final long serialVersionUID = 1L;
 
   private final RowData.FieldGetter[] fieldGetters;
 
+  private final LogicalType[] types;
+
   private RowDataProjection(LogicalType[] types, int[] positions) {
     ValidationUtils.checkArgument(types.length == positions.length,
         "types and positions should have the equal number");
     this.fieldGetters = new RowData.FieldGetter[types.length];
+    this.types = types;
     for (int i = 0; i < types.length; i++) {
       final LogicalType type = types[i];
       final int pos = positions[i];
@@ -69,7 +76,12 @@ public class RowDataProjection implements Serializable {
   public RowData project(RowData rowData) {
     GenericRowData genericRowData = new GenericRowData(this.fieldGetters.length);
     for (int i = 0; i < this.fieldGetters.length; i++) {
-      final Object val = this.fieldGetters[i].getFieldOrNull(rowData);
+      Object val = null;
+      try {
+        val = rowData.isNullAt(i) ? null : this.fieldGetters[i].getFieldOrNull(rowData);
+      } catch (Throwable e) {
+        LOG.error(String.format("position=%s, fieldType=%s,\n data=%s", i, types[i].toString(), rowData.toString()));
+      }
       genericRowData.setField(i, val);
     }
     return genericRowData;
@@ -81,7 +93,12 @@ public class RowDataProjection implements Serializable {
   public Object[] projectAsValues(RowData rowData) {
     Object[] values = new Object[this.fieldGetters.length];
     for (int i = 0; i < this.fieldGetters.length; i++) {
-      final Object val = this.fieldGetters[i].getFieldOrNull(rowData);
+      Object val = null;
+      try {
+        val = rowData.isNullAt(i) ? null : this.fieldGetters[i].getFieldOrNull(rowData);
+      } catch (Throwable e) {
+        LOG.error(String.format("position=%s, fieldType=%s,\n data=%s", i, types[i].toString(), rowData.toString()));
+      }
       values[i] = val;
     }
     return values;

Reply via email to