This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4e66857849d1ac793ad77d211d602295d08f827f Author: XuQianJin-Stars <forwar...@apache.com> AuthorDate: Mon Aug 15 15:06:03 2022 +0800 Fix NPE in RowDataProjection's project and projectAsValues methods --- .../org/apache/hudi/util/RowDataProjection.java | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java index 8076d982b9..51df29faae 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java @@ -24,6 +24,8 @@ import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.Serializable; import java.util.Arrays; @@ -33,14 +35,19 @@ import java.util.List; * Utilities to project the row data with given positions. 
*/ public class RowDataProjection implements Serializable { + private static final Logger LOG = LogManager.getLogger(RowDataProjection.class); + private static final long serialVersionUID = 1L; private final RowData.FieldGetter[] fieldGetters; + private final LogicalType[] types; + private RowDataProjection(LogicalType[] types, int[] positions) { ValidationUtils.checkArgument(types.length == positions.length, "types and positions should have the equal number"); this.fieldGetters = new RowData.FieldGetter[types.length]; + this.types = types; for (int i = 0; i < types.length; i++) { final LogicalType type = types[i]; final int pos = positions[i]; @@ -69,7 +76,12 @@ public class RowDataProjection implements Serializable { public RowData project(RowData rowData) { GenericRowData genericRowData = new GenericRowData(this.fieldGetters.length); for (int i = 0; i < this.fieldGetters.length; i++) { - final Object val = this.fieldGetters[i].getFieldOrNull(rowData); + Object val = null; + try { + val = rowData.isNullAt(i) ? null : this.fieldGetters[i].getFieldOrNull(rowData); + } catch (Throwable e) { + LOG.error(String.format("position=%s, fieldType=%s,\n data=%s", i, types[i].toString(), rowData.toString())); + } genericRowData.setField(i, val); } return genericRowData; @@ -81,7 +93,12 @@ public class RowDataProjection implements Serializable { public Object[] projectAsValues(RowData rowData) { Object[] values = new Object[this.fieldGetters.length]; for (int i = 0; i < this.fieldGetters.length; i++) { - final Object val = this.fieldGetters[i].getFieldOrNull(rowData); + Object val = null; + try { + val = rowData.isNullAt(i) ? null : this.fieldGetters[i].getFieldOrNull(rowData); + } catch (Throwable e) { + LOG.error(String.format("position=%s, fieldType=%s,\n data=%s", i, types[i].toString(), rowData.toString())); + } values[i] = val; } return values;