This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 50a07cb8af [INLONG-11952][SDK] Transform handles field-level 
exceptions by nullifying only the affected field while preserving the entire 
record (#11953)
50a07cb8af is described below

commit 50a07cb8afc931dba23fef3ed3bc08023720a314
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Jul 28 17:04:18 2025 +0800

    [INLONG-11952][SDK] Transform handles field-level exceptions by nullifying 
only the affected field while preserving the entire record (#11953)
---
 .../sdk/transform/utils/FieldToRowDataUtils.java   | 304 ++++++++++++++++++---
 .../temporal/TestFromUnixTimeFunction.java         |  20 ++
 2 files changed, 285 insertions(+), 39 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
index e27c0da21f..ff61d912b3 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sdk.transform.utils;
 
+import org.apache.inlong.sdk.transform.decode.TransformException;
+
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
@@ -25,6 +27,7 @@ import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
 
 import java.io.Serializable;
@@ -32,13 +35,19 @@ import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class FieldToRowDataUtils {
 
-    private static final long serialVersionUID = 1L;
+    public static final String DAY_FORMAT = "yyyy-MM-dd";
+    public static final String SECOND_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    public static final String MILLI_FORMAT = "yyyy-MM-dd HH:mm:ss.S";
 
     /**
      * Base class of Field To RowData Converters.
@@ -48,6 +57,47 @@ public class FieldToRowDataUtils {
         Object convert(Object obj);
     }
 
+    private static final AtomicBoolean isIgnoreError = new AtomicBoolean(true);
+
+    private static final Map<LogicalTypeRoot, FieldToRowDataConverter> 
converterMap = new ConcurrentHashMap<>();
+
+    static {
+        converterMap.put(LogicalTypeRoot.NULL, (obj) -> null);
+        converterMap.put(LogicalTypeRoot.BOOLEAN, (obj) -> parseBoolean(obj));
+        converterMap.put(LogicalTypeRoot.TINYINT, (obj) -> parseTinyint(obj));
+        converterMap.put(LogicalTypeRoot.SMALLINT, (obj) -> 
parseSmallint(obj));
+        converterMap.put(LogicalTypeRoot.INTERVAL_YEAR_MONTH, (obj) -> 
parseInteger(obj));
+        converterMap.put(LogicalTypeRoot.INTEGER, (obj) -> parseInteger(obj));
+        converterMap.put(LogicalTypeRoot.INTERVAL_DAY_TIME, (obj) -> 
parseLong(obj));
+        converterMap.put(LogicalTypeRoot.BIGINT, (obj) -> parseLong(obj));
+        converterMap.put(LogicalTypeRoot.FLOAT, (obj) -> parseFloat(obj));
+        converterMap.put(LogicalTypeRoot.DOUBLE, (obj) -> parseDouble(obj));
+        converterMap.put(LogicalTypeRoot.BINARY, (obj) -> parseBinary(obj));
+        converterMap.put(LogicalTypeRoot.VARBINARY, (obj) -> parseBinary(obj));
+        converterMap.put(LogicalTypeRoot.CHAR, (obj) -> parseVarchar(obj));
+        converterMap.put(LogicalTypeRoot.VARCHAR, (obj) -> parseVarchar(obj));
+        converterMap.put(LogicalTypeRoot.DATE, (obj) -> parseDate(obj));
+        converterMap.put(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, (obj) -> 
parseTimeWithoutTimeZone(obj));
+        converterMap.put(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, (obj) 
-> parseTimestampWithLocalTimeZone(obj));
+        converterMap.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, (obj) -> 
parseTimestampWithLocalTimeZone(obj));
+        converterMap.put(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, (obj) -> 
parseTimestampWithLocalTimeZone(obj));
+        converterMap.put(LogicalTypeRoot.DECIMAL, (obj) -> parseDecimal(obj));
+    }
+
+    private static final ThreadLocal<Map<String, SimpleDateFormat>> 
formatLocal = new ThreadLocal<>();
+
+    public static void setIgnoreError(boolean isIgnoreError) {
+        FieldToRowDataUtils.isIgnoreError.set(isIgnoreError);
+    }
+
+    public static boolean isIgnoreError() {
+        return isIgnoreError.get();
+    }
+
+    public static void replaceConverter(LogicalTypeRoot type, 
FieldToRowDataConverter converter) {
+        converterMap.put(type, converter);
+    }
+
     public static FieldToRowDataConverter createConverter(LogicalType 
logicalType) {
         return wrapIntoNullableConverter(createFieldRowConverter(logicalType));
     }
@@ -63,44 +113,12 @@ public class FieldToRowDataUtils {
     }
 
     private static FieldToRowDataConverter createFieldRowConverter(LogicalType 
fieldType) {
-        switch (fieldType.getTypeRoot()) {
-            case NULL:
-                return (obj) -> null;
-            case BOOLEAN:
-                return obj -> Boolean.parseBoolean(obj.toString());
-            case TINYINT:
-                return obj -> Byte.parseByte(obj.toString());
-            case SMALLINT:
-                return obj -> Short.parseShort(obj.toString());
-            case INTERVAL_YEAR_MONTH:
-            case INTEGER:
-                return obj -> Integer.parseInt(obj.toString());
-            case INTERVAL_DAY_TIME:
-            case BIGINT:
-                return obj -> Long.parseLong(obj.toString());
-            case FLOAT:
-                return obj -> Float.parseFloat(obj.toString());
-            case DOUBLE:
-                return obj -> Double.parseDouble(obj.toString());
-            case BINARY:
-            case VARBINARY:
-                return obj -> obj.toString().getBytes();
-            case CHAR:
-            case VARCHAR:
-                return (obj -> StringData.fromString((String) obj));
-            case DATE:
-                return (obj -> ((Date) obj).toLocalDate().toEpochDay());
-            case TIME_WITHOUT_TIME_ZONE:
-                return (obj -> ((Time) obj).toLocalTime().toSecondOfDay() * 
1000);
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-            case TIMESTAMP_WITH_TIME_ZONE:
-                return obj -> TimestampData.fromTimestamp((Timestamp) obj);
-            case DECIMAL:
-                return obj -> DecimalData.fromBigDecimal(
-                        (BigDecimal) obj,
-                        DecimalType.DEFAULT_PRECISION,
-                        DecimalType.DEFAULT_SCALE);
+        LogicalTypeRoot type = fieldType.getTypeRoot();
+        FieldToRowDataConverter converter = converterMap.get(type);
+        if (converter != null) {
+            return converter;
+        }
+        switch (type) {
             case ARRAY:
                 return obj -> {
                     final Object[] array = (Object[]) obj;
@@ -132,4 +150,212 @@ public class FieldToRowDataUtils {
                 throw new UnsupportedOperationException("Unsupported type:" + 
fieldType);
         }
     }
+
+    private static Object parseBoolean(Object obj) {
+        try {
+            return Boolean.parseBoolean(obj.toString());
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    private static Object parseTinyint(Object obj) {
+        try {
+            return Byte.parseByte(obj.toString());
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    private static Object parseSmallint(Object obj) {
+        try {
+            return Short.parseShort(obj.toString());
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    private static Object parseInteger(Object obj) {
+        try {
+            return Integer.parseInt(obj.toString());
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    private static Object parseLong(Object obj) {
+        try {
+            return Long.parseLong(obj.toString());
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    private static Object parseFloat(Object obj) {
+        try {
+            return Float.parseFloat(obj.toString());
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    private static Object parseDouble(Object obj) {
+        try {
+            return Double.parseDouble(obj.toString());
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    private static Object parseBinary(Object obj) {
+        try {
+            return obj.toString().getBytes();
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    private static Object parseVarchar(Object obj) {
+        try {
+            return StringData.fromString((String) obj);
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    private static Object parseDate(Object obj) {
+        try {
+            if (obj == null) {
+                return null;
+            }
+            if (obj instanceof Date) {
+                return ((Date) obj).toLocalDate().toEpochDay();
+            }
+            String strObj = obj.toString();
+            Date date = parseDateTime(strObj);
+            return date.toLocalDate().toEpochDay();
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    private static Date parseDateTime(String str) {
+        try {
+            Map<String, SimpleDateFormat> formatMap = formatLocal.get();
+            if (formatMap == null) {
+                formatLocal.set(new ConcurrentHashMap<>());
+                formatMap = formatLocal.get();
+            }
+            int length = str.length();
+            if (length == DAY_FORMAT.length()) {
+                SimpleDateFormat format = 
formatMap.computeIfAbsent(DAY_FORMAT, k -> new SimpleDateFormat(DAY_FORMAT));
+                java.util.Date dateTime = format.parse(str);
+                return new Date(dateTime.getTime());
+            } else if (length == SECOND_FORMAT.length()) {
+                SimpleDateFormat format = 
formatMap.computeIfAbsent(SECOND_FORMAT,
+                        k -> new SimpleDateFormat(SECOND_FORMAT));
+                java.util.Date dateTime = format.parse(str);
+                return new Date(dateTime.getTime());
+            } else {
+                SimpleDateFormat format = 
formatMap.computeIfAbsent(MILLI_FORMAT,
+                        k -> new SimpleDateFormat(MILLI_FORMAT));
+                java.util.Date dateTime = format.parse(str);
+                return new Date(dateTime.getTime());
+            }
+        } catch (ParseException e) {
+            throw new TransformException(e.getMessage(), e);
+        }
+    }
+
+    private static Object parseTimeWithoutTimeZone(Object obj) {
+        try {
+            if (obj == null) {
+                return null;
+            }
+            if (obj instanceof Time) {
+                return ((Time) obj).toLocalTime().toSecondOfDay() * 1000;
+            }
+            String strObj = obj.toString();
+            Date date = parseDateTime(strObj);
+            return new Time(date.getTime()).toLocalTime().toSecondOfDay() * 
1000;
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    private static Object parseTimestampWithLocalTimeZone(Object obj) {
+        try {
+            if (obj == null) {
+                return null;
+            }
+            if (obj instanceof Timestamp) {
+                return TimestampData.fromTimestamp((Timestamp) obj);
+            }
+            String strObj = obj.toString();
+            Date date = parseDateTime(strObj);
+            return TimestampData.fromTimestamp(new Timestamp(date.getTime()));
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    private static Object parseDecimal(Object obj) {
+        try {
+            if (obj == null) {
+                return null;
+            }
+            if (obj instanceof BigDecimal) {
+                return DecimalData.fromBigDecimal(
+                        (BigDecimal) obj,
+                        DecimalType.DEFAULT_PRECISION,
+                        DecimalType.DEFAULT_SCALE);
+            }
+            String strObj = obj.toString();
+            return DecimalData.fromBigDecimal(
+                    new BigDecimal(strObj),
+                    DecimalType.DEFAULT_PRECISION,
+                    DecimalType.DEFAULT_SCALE);
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestFromUnixTimeFunction.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestFromUnixTimeFunction.java
index e879424764..9d8c24d7b4 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestFromUnixTimeFunction.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestFromUnixTimeFunction.java
@@ -71,5 +71,25 @@ public class TestFromUnixTimeFunction extends 
AbstractFunctionTemporalTestBase {
         List<String> output4 = processor4.transform("can|apple|cloud|44|1|3", 
new HashMap<>());
         Assert.assertEquals(1, output4.size());
         Assert.assertEquals(output4.get(0), "result=197001010844");
+
+        String transformSql5 = "select 
concat(substr(from_unix_time(numeric1/1000),1,10),' 00:00:00') from source";
+        TransformConfig config5 = new TransformConfig(transformSql5);
+        TransformProcessor<String, String> processor5 = TransformProcessor
+                .create(config5, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case5: from_unix_time(floor(1753353737123/86400000)*86400)
+        List<String> output5 = 
processor5.transform("can|apple|cloud|1753353737123|1|3", new HashMap<>());
+        Assert.assertEquals(1, output5.size());
+        Assert.assertEquals(output5.get(0), "result=2025-07-24 00:00:00");
+
+        String transformSql6 = "select 
concat(substr(from_unix_time(numeric1/1000),1,13),':00:00') from source";
+        TransformConfig config6 = new TransformConfig(transformSql6);
+        TransformProcessor<String, String> processor6 = TransformProcessor
+                .create(config6, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case6: from_unix_time(floor(1753353737123/3600000)*3600)
+        List<String> output6 = 
processor6.transform("can|apple|cloud|1753353737123|1|3", new HashMap<>());
+        Assert.assertEquals(1, output6.size());
+        Assert.assertEquals(output6.get(0), "result=2025-07-24 18:00:00");
     }
 }

Reply via email to