This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 50a07cb8af [INLONG-11952][SDK] Transform handles field-level
exceptions by nullifying only the affected field while preserving the entire
record (#11953)
50a07cb8af is described below
commit 50a07cb8afc931dba23fef3ed3bc08023720a314
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Jul 28 17:04:18 2025 +0800
[INLONG-11952][SDK] Transform handles field-level exceptions by nullifying
only the affected field while preserving the entire record (#11953)
---
.../sdk/transform/utils/FieldToRowDataUtils.java | 304 ++++++++++++++++++---
.../temporal/TestFromUnixTimeFunction.java | 20 ++
2 files changed, 285 insertions(+), 39 deletions(-)
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
index e27c0da21f..ff61d912b3 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sdk.transform.utils;
+import org.apache.inlong.sdk.transform.decode.TransformException;
+
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
@@ -25,6 +27,7 @@ import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import java.io.Serializable;
@@ -32,13 +35,19 @@ import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
public class FieldToRowDataUtils {
- private static final long serialVersionUID = 1L;
+ public static final String DAY_FORMAT = "yyyy-MM-dd";
+ public static final String SECOND_FORMAT = "yyyy-MM-dd HH:mm:ss";
+ public static final String MILLI_FORMAT = "yyyy-MM-dd HH:mm:ss.S";
/**
* Base class of Field To RowData Converters.
@@ -48,6 +57,47 @@ public class FieldToRowDataUtils {
Object convert(Object obj);
}
+ private static final AtomicBoolean isIgnoreError = new AtomicBoolean(true);
+
+ private static final Map<LogicalTypeRoot, FieldToRowDataConverter>
converterMap = new ConcurrentHashMap<>();
+
+ static {
+ converterMap.put(LogicalTypeRoot.NULL, (obj) -> null);
+ converterMap.put(LogicalTypeRoot.BOOLEAN, (obj) -> parseBoolean(obj));
+ converterMap.put(LogicalTypeRoot.TINYINT, (obj) -> parseTinyint(obj));
+ converterMap.put(LogicalTypeRoot.SMALLINT, (obj) ->
parseSmallint(obj));
+ converterMap.put(LogicalTypeRoot.INTERVAL_YEAR_MONTH, (obj) ->
parseInteger(obj));
+ converterMap.put(LogicalTypeRoot.INTEGER, (obj) -> parseInteger(obj));
+ converterMap.put(LogicalTypeRoot.INTERVAL_DAY_TIME, (obj) ->
parseLong(obj));
+ converterMap.put(LogicalTypeRoot.BIGINT, (obj) -> parseLong(obj));
+ converterMap.put(LogicalTypeRoot.FLOAT, (obj) -> parseFloat(obj));
+ converterMap.put(LogicalTypeRoot.DOUBLE, (obj) -> parseDouble(obj));
+ converterMap.put(LogicalTypeRoot.BINARY, (obj) -> parseBinary(obj));
+ converterMap.put(LogicalTypeRoot.VARBINARY, (obj) -> parseBinary(obj));
+ converterMap.put(LogicalTypeRoot.CHAR, (obj) -> parseVarchar(obj));
+ converterMap.put(LogicalTypeRoot.VARCHAR, (obj) -> parseVarchar(obj));
+ converterMap.put(LogicalTypeRoot.DATE, (obj) -> parseDate(obj));
+ converterMap.put(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, (obj) ->
parseTimeWithoutTimeZone(obj));
+ converterMap.put(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, (obj)
-> parseTimestampWithLocalTimeZone(obj));
+ converterMap.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, (obj) ->
parseTimestampWithLocalTimeZone(obj));
+ converterMap.put(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, (obj) ->
parseTimestampWithLocalTimeZone(obj));
+ converterMap.put(LogicalTypeRoot.DECIMAL, (obj) -> parseDecimal(obj));
+ }
+
+ private static final ThreadLocal<Map<String, SimpleDateFormat>>
formatLocal = new ThreadLocal<>();
+
+ public static void setIgnoreError(boolean isIgnoreError) {
+ FieldToRowDataUtils.isIgnoreError.set(isIgnoreError);
+ }
+
+ public static boolean isIgnoreError() {
+ return isIgnoreError.get();
+ }
+
+ public static void replaceConverter(LogicalTypeRoot type,
FieldToRowDataConverter converter) {
+ converterMap.put(type, converter);
+ }
+
public static FieldToRowDataConverter createConverter(LogicalType
logicalType) {
return wrapIntoNullableConverter(createFieldRowConverter(logicalType));
}
@@ -63,44 +113,12 @@ public class FieldToRowDataUtils {
}
private static FieldToRowDataConverter createFieldRowConverter(LogicalType
fieldType) {
- switch (fieldType.getTypeRoot()) {
- case NULL:
- return (obj) -> null;
- case BOOLEAN:
- return obj -> Boolean.parseBoolean(obj.toString());
- case TINYINT:
- return obj -> Byte.parseByte(obj.toString());
- case SMALLINT:
- return obj -> Short.parseShort(obj.toString());
- case INTERVAL_YEAR_MONTH:
- case INTEGER:
- return obj -> Integer.parseInt(obj.toString());
- case INTERVAL_DAY_TIME:
- case BIGINT:
- return obj -> Long.parseLong(obj.toString());
- case FLOAT:
- return obj -> Float.parseFloat(obj.toString());
- case DOUBLE:
- return obj -> Double.parseDouble(obj.toString());
- case BINARY:
- case VARBINARY:
- return obj -> obj.toString().getBytes();
- case CHAR:
- case VARCHAR:
- return (obj -> StringData.fromString((String) obj));
- case DATE:
- return (obj -> ((Date) obj).toLocalDate().toEpochDay());
- case TIME_WITHOUT_TIME_ZONE:
- return (obj -> ((Time) obj).toLocalTime().toSecondOfDay() *
1000);
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- case TIMESTAMP_WITH_TIME_ZONE:
- return obj -> TimestampData.fromTimestamp((Timestamp) obj);
- case DECIMAL:
- return obj -> DecimalData.fromBigDecimal(
- (BigDecimal) obj,
- DecimalType.DEFAULT_PRECISION,
- DecimalType.DEFAULT_SCALE);
+ LogicalTypeRoot type = fieldType.getTypeRoot();
+ FieldToRowDataConverter converter = converterMap.get(type);
+ if (converter != null) {
+ return converter;
+ }
+ switch (type) {
case ARRAY:
return obj -> {
final Object[] array = (Object[]) obj;
@@ -132,4 +150,212 @@ public class FieldToRowDataUtils {
throw new UnsupportedOperationException("Unsupported type:" +
fieldType);
}
}
+
+ private static Object parseBoolean(Object obj) {
+ try {
+ return Boolean.parseBoolean(obj.toString());
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ private static Object parseTinyint(Object obj) {
+ try {
+ return Byte.parseByte(obj.toString());
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ private static Object parseSmallint(Object obj) {
+ try {
+ return Short.parseShort(obj.toString());
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ private static Object parseInteger(Object obj) {
+ try {
+ return Integer.parseInt(obj.toString());
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ private static Object parseLong(Object obj) {
+ try {
+ return Long.parseLong(obj.toString());
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ private static Object parseFloat(Object obj) {
+ try {
+ return Float.parseFloat(obj.toString());
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ private static Object parseDouble(Object obj) {
+ try {
+ return Double.parseDouble(obj.toString());
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ private static Object parseBinary(Object obj) {
+ try {
+ return obj.toString().getBytes();
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ private static Object parseVarchar(Object obj) {
+ try {
+ return StringData.fromString((String) obj);
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ private static Object parseDate(Object obj) {
+ try {
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof Date) {
+ return ((Date) obj).toLocalDate().toEpochDay();
+ }
+ String strObj = obj.toString();
+ Date date = parseDateTime(strObj);
+ return date.toLocalDate().toEpochDay();
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ private static Date parseDateTime(String str) {
+ try {
+ Map<String, SimpleDateFormat> formatMap = formatLocal.get();
+ if (formatMap == null) {
+ formatLocal.set(new ConcurrentHashMap<>());
+ formatMap = formatLocal.get();
+ }
+ int length = str.length();
+ if (length == DAY_FORMAT.length()) {
+ SimpleDateFormat format =
formatMap.computeIfAbsent(DAY_FORMAT, k -> new SimpleDateFormat(DAY_FORMAT));
+ java.util.Date dateTime = format.parse(str);
+ return new Date(dateTime.getTime());
+ } else if (length == SECOND_FORMAT.length()) {
+ SimpleDateFormat format =
formatMap.computeIfAbsent(SECOND_FORMAT,
+ k -> new SimpleDateFormat(SECOND_FORMAT));
+ java.util.Date dateTime = format.parse(str);
+ return new Date(dateTime.getTime());
+ } else {
+ SimpleDateFormat format =
formatMap.computeIfAbsent(MILLI_FORMAT,
+ k -> new SimpleDateFormat(MILLI_FORMAT));
+ java.util.Date dateTime = format.parse(str);
+ return new Date(dateTime.getTime());
+ }
+ } catch (ParseException e) {
+ throw new TransformException(e.getMessage(), e);
+ }
+ }
+
+ private static Object parseTimeWithoutTimeZone(Object obj) {
+ try {
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof Time) {
+ return ((Time) obj).toLocalTime().toSecondOfDay() * 1000;
+ }
+ String strObj = obj.toString();
+ Date date = parseDateTime(strObj);
+ return new Time(date.getTime()).toLocalTime().toSecondOfDay() *
1000;
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ private static Object parseTimestampWithLocalTimeZone(Object obj) {
+ try {
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof Timestamp) {
+ return TimestampData.fromTimestamp((Timestamp) obj);
+ }
+ String strObj = obj.toString();
+ Date date = parseDateTime(strObj);
+ return TimestampData.fromTimestamp(new Timestamp(date.getTime()));
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ private static Object parseDecimal(Object obj) {
+ try {
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof BigDecimal) {
+ return DecimalData.fromBigDecimal(
+ (BigDecimal) obj,
+ DecimalType.DEFAULT_PRECISION,
+ DecimalType.DEFAULT_SCALE);
+ }
+ String strObj = obj.toString();
+ return DecimalData.fromBigDecimal(
+ new BigDecimal(strObj),
+ DecimalType.DEFAULT_PRECISION,
+ DecimalType.DEFAULT_SCALE);
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
}
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestFromUnixTimeFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestFromUnixTimeFunction.java
index e879424764..9d8c24d7b4 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestFromUnixTimeFunction.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestFromUnixTimeFunction.java
@@ -71,5 +71,25 @@ public class TestFromUnixTimeFunction extends AbstractFunctionTemporalTestBase {
List<String> output4 = processor4.transform("can|apple|cloud|44|1|3",
new HashMap<>());
Assert.assertEquals(1, output4.size());
Assert.assertEquals(output4.get(0), "result=197001010844");
+
+ String transformSql5 = "select
concat(substr(from_unix_time(numeric1/1000),1,10),' 00:00:00') from source";
+ TransformConfig config5 = new TransformConfig(transformSql5);
+ TransformProcessor<String, String> processor5 = TransformProcessor
+ .create(config5,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ // case5: from_unix_time(floor(1753353737123/86400000)*86400)
+ List<String> output5 =
processor5.transform("can|apple|cloud|1753353737123|1|3", new HashMap<>());
+ Assert.assertEquals(1, output5.size());
+ Assert.assertEquals(output5.get(0), "result=2025-07-24 00:00:00");
+
+ String transformSql6 = "select
concat(substr(from_unix_time(numeric1/1000),1,13),':00:00') from source";
+ TransformConfig config6 = new TransformConfig(transformSql6);
+ TransformProcessor<String, String> processor6 = TransformProcessor
+ .create(config6,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ // case6: from_unix_time(floor(1753353737123/3600000)*3600)
+ List<String> output6 =
processor6.transform("can|apple|cloud|1753353737123|1|3", new HashMap<>());
+ Assert.assertEquals(1, output6.size());
+ Assert.assertEquals(output6.get(0), "result=2025-07-24 18:00:00");
}
}