This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7f756b1887 [Feature][SQL function] Enhance SQL Transform COALESCE 
functions to support type cast (#9299)
7f756b1887 is described below

commit 7f756b18873ba2bc553abc1cd917a681f0e4bd6b
Author: ocean-zhc <[email protected]>
AuthorDate: Wed Aug 20 15:18:43 2025 +0800

    [Feature][SQL function] Enhance SQL Transform COALESCE functions to support 
type cast (#9299)
---
 docs/en/transform-v2/sql-functions.md              |  14 +-
 docs/zh/transform-v2/sql-functions.md              |  14 +-
 .../transform/sql/zeta/ZetaSQLFunction.java        |  14 +-
 .../seatunnel/transform/sql/zeta/ZetaSQLType.java  |  35 +++-
 .../sql/zeta/functions/SystemFunction.java         |  49 +++--
 .../seatunnel/transform/sql/SQLTransformTest.java  | 209 +++++++++++++++++++++
 6 files changed, 308 insertions(+), 27 deletions(-)

diff --git a/docs/en/transform-v2/sql-functions.md 
b/docs/en/transform-v2/sql-functions.md
index 0e659688c8..cde05da61d 100644
--- a/docs/en/transform-v2/sql-functions.md
+++ b/docs/en/transform-v2/sql-functions.md
@@ -1001,17 +1001,25 @@ TRY_CAST(NAME AS INT)
 
 ```COALESCE(aValue, bValue [,...])```
 
-Returns the first value that is not null.
+Returns the first value that is not null. The result type is the type of the first argument that is not a `NULL` literal; if the returned value has a different type, it is automatically converted to that type.
 
 Example:
 
 COALESCE(A, B, C)
 
+Example with type conversion:
+
+```
+-- If A is a string field and B is an integer field
+-- B will be converted to string when A is null
+SELECT COALESCE(A, B) as result FROM my_table
+```
+
 ### IFNULL
 
 ```IFNULL(aValue, bValue)```
 
-Returns the first value that is not null.
+Returns the first value that is not null. The result type is the type of the first argument (or of the second argument when the first one is a `NULL` literal); if the returned value has a different type, it is automatically converted to that type.
 
 Example:
 
@@ -1105,7 +1113,7 @@ select UUID() as seatunnel_uuid
 ### ARRAY
 
 ```ARRAY<T> array(T, ...)```
-Create an array consisting of variadic elements and return it. Here, T can be 
either “column” or “literal”.
+Create an array consisting of variadic elements and return it. Here, T can be 
either "column" or "literal".
 
 Example:
 
diff --git a/docs/zh/transform-v2/sql-functions.md 
b/docs/zh/transform-v2/sql-functions.md
index add6809b5a..6af5761939 100644
--- a/docs/zh/transform-v2/sql-functions.md
+++ b/docs/zh/transform-v2/sql-functions.md
@@ -996,17 +996,25 @@ TRY_CAST(NAME AS INT)
 
 ```COALESCE(aValue, bValue [,...])```
 
-返回第一个非空值。
+返回第一个非空值。结果类型为第一个不是 `NULL` 字面量的参数的类型,如果返回值的类型与之不同,会自动转换为该类型。
 
 示例:
 
 COALESCE(A, B, C)
 
+类型转换示例:
+
+```
+-- 如果A是字符串类型而B是整数类型
+-- 当A为空时,B会被转换为字符串类型
+SELECT COALESCE(A, B) as result FROM my_table
+```
+
 ### IFNULL
 
 ```IFNULL(aValue, bValue)```
 
-返回第一个非空值。
+返回第一个非空值。结果类型为第一个参数的类型(若第一个参数是 `NULL` 字面量,则为第二个参数的类型),如果返回值的类型与之不同,会自动转换为该类型。
 
 示例:
 
@@ -1096,7 +1104,7 @@ select UUID() as seatunnel_uuid
 ### ARRAY
 
 ```ARRAY<T> array(T, ...)```
-创建一个由可变参数元素组成的数组并返回它。这里,T 可以是“列”或“常量”。。
+创建一个由可变参数元素组成的数组并返回它。这里,T 可以是"列"或"常量"。
 
 示例:
 
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index af01127cd7..fdf37b439a 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -235,7 +235,7 @@ public class ZetaSQLFunction {
                     functionArgs.add(((StringValue) 
function.getFromExpression()).getValue());
                 }
             }
-            return executeFunctionExpr(TRIM, functionArgs);
+            return executeFunctionExpr(TRIM, functionArgs, expression);
         }
         if (expression instanceof SignedExpression) {
             SignedExpression signedExpression = (SignedExpression) expression;
@@ -345,7 +345,7 @@ public class ZetaSQLFunction {
                     functionArgs.add(computeForValue(funcArgExpression, 
inputFields));
                 }
             }
-            return executeFunctionExpr(functionName, functionArgs);
+            return executeFunctionExpr(functionName, functionArgs, expression);
         }
         if (expression instanceof TimeKeyExpression) {
             return executeTimeKeyExpr(((TimeKeyExpression) 
expression).getStringValue());
@@ -355,7 +355,7 @@ public class ZetaSQLFunction {
             List<Object> functionArgs = new ArrayList<>();
             functionArgs.add(computeForValue(extract.getExpression(), 
inputFields));
             functionArgs.add(extract.getName());
-            return executeFunctionExpr(ZetaSQLFunction.EXTRACT, functionArgs);
+            return executeFunctionExpr(ZetaSQLFunction.EXTRACT, functionArgs, 
expression);
         }
         if (expression instanceof Parenthesis) {
             Parenthesis parenthesis = (Parenthesis) expression;
@@ -405,7 +405,8 @@ public class ZetaSQLFunction {
         return elseExpression == null ? null : computeForValue(elseExpression, 
inputFields);
     }
 
-    public Object executeFunctionExpr(String functionName, List<Object> args) {
+    public Object executeFunctionExpr(
+            String functionName, List<Object> args, Expression expression) {
         switch (functionName.toUpperCase()) {
             case ASCII:
                 return StringFunction.ascii(args);
@@ -580,9 +581,10 @@ public class ZetaSQLFunction {
             case YEAR:
                 return DateTimeFunction.year(args);
+            // Only COALESCE/IFNULL use the inferred result type, so resolve it on demand here.
             case COALESCE:
-                return SystemFunction.coalesce(args);
+                return SystemFunction.coalesce(args, zetaSQLType.getExpressionType(expression));
             case IFNULL:
-                return SystemFunction.ifnull(args);
+                return SystemFunction.ifnull(args, zetaSQLType.getExpressionType(expression));
             case NULLIF:
                 return SystemFunction.nullif(args);
             case ARRAY:
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
index 62fdd97dc7..a73aa83c55 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
@@ -436,10 +436,20 @@ public class ZetaSQLType {
             case ZetaSQLFunction.TIMESTAMPADD:
             case ZetaSQLFunction.ROUND:
             case ZetaSQLFunction.NULLIF:
-            case ZetaSQLFunction.COALESCE:
-            case ZetaSQLFunction.IFNULL:
-                // Result has the same type as first argument
                 return 
getExpressionType(function.getParameters().getExpressions().get(0));
+            case ZetaSQLFunction.IFNULL:
+            case ZetaSQLFunction.COALESCE:
+                List<Expression> expressions = getExpressions(function);
+
+                for (Expression expr : expressions) {
+                    SeaTunnelDataType<?> exprType = getExpressionType(expr);
+                    if (!(expr instanceof NullValue) && 
!BasicType.VOID_TYPE.equals(exprType)) {
+                        return exprType;
+                    }
+                }
+
+                // If all parameters are null, return the type of the first 
parameter
+                return getExpressionType(expressions.get(0));
             case ZetaSQLFunction.MULTI_IF:
                 ExpressionList multiIfExpressionList = 
function.getParameters();
                 if (multiIfExpressionList == null) {
@@ -478,7 +488,7 @@ public class ZetaSQLType {
                         List<SeaTunnelDataType<?>> argsType = new 
ArrayList<>();
                         ExpressionList expressionList = 
function.getParameters();
                         if (expressionList != null) {
-                            List<Expression> expressions = 
expressionList.getExpressions();
+                            expressions = expressionList.getExpressions();
                             if (expressions != null) {
                                 for (Expression expression : expressions) {
                                     
argsType.add(getExpressionType(expression));
@@ -494,6 +504,23 @@ public class ZetaSQLType {
         }
     }
 
+    private static List<Expression> getExpressions(Function function) {
+        ExpressionList parameters = function.getParameters();
+        if (parameters == null) {
+            throw new TransformException(
+                    CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                    function.getName() + " function requires at least one 
parameter");
+        }
+
+        List<Expression> expressions = parameters.getExpressions();
+        if (expressions == null || expressions.isEmpty()) {
+            throw new TransformException(
+                    CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                    function.getName() + " function requires at least one 
parameter");
+        }
+        return expressions;
+    }
+
     private SeaTunnelDataType<?> getTimeKeyExprType(TimeKeyExpression 
timeKeyExpression) {
         switch (timeKeyExpression.getStringValue().toUpperCase()) {
             case ZetaSQLFunction.CURRENT_DATE:
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
index 5a4b54bc0e..e7ea8eb16e 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
@@ -37,24 +37,35 @@ import java.util.Arrays;
 import java.util.List;
 
 public class SystemFunction {
-    public static Object coalesce(List<Object> args) {
-        Object v = null;
-        for (Object v2 : args) {
-            if (v2 != null) {
-                v = v2;
-                break;
+    /**
+     * Enhanced version of coalesce function that takes a target type 
parameter. This ensures that
+     * the result is always converted to the expected type regardless of which 
argument is non-null.
+     *
+     * @param args Function arguments
+     * @param targetType The target type that the result should be converted to
+     * @return The first non-null value converted to the target type
+     */
+    public static Object coalesce(List<Object> args, SeaTunnelDataType<?> 
targetType) {
+        Object result = coalesce(args);
+        return castAs(result, targetType);
+    }
+
+    private static Object coalesce(List<Object> args) {
+        for (Object arg : args) {
+            if (arg != null) {
+                return arg;
             }
         }
-        return v;
+        return null;
     }
 
-    public static Object ifnull(List<Object> args) {
+    public static Object ifnull(List<Object> args, SeaTunnelDataType<?> 
targetType) {
         if (args.size() != 2) {
             throw new TransformException(
                     CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
                     String.format("Unsupported function IFNULL() arguments: 
%s", args));
         }
-        return coalesce(args);
+        return coalesce(args, targetType);
     }
 
     public static Object nullif(List<Object> args) {
@@ -109,10 +120,26 @@ public class SystemFunction {
                 return Short.parseShort(v1.toString());
             case "INT":
             case "INTEGER":
-                return Integer.parseInt(v1.toString());
+                if (v1 instanceof String) {
+                    return Integer.parseInt(v1.toString());
+                } else if (v1 instanceof Number) {
+                    return Math.toIntExact(((Number) v1).longValue()); // reject int overflow
+                } else {
+                    throw new TransformException(
+                            CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                            String.format("Unsupported CAST %s to INTEGER", 
v1));
+                }
             case "BIGINT":
             case "LONG":
-                return Long.parseLong(v1.toString());
+                if (v1 instanceof String) {
+                    return Long.parseLong(v1.toString());
+                } else if (v1 instanceof Number) {
+                    return ((Number) v1).longValue();
+                } else {
+                    throw new TransformException(
+                            CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                            String.format("Unsupported CAST %s to LONG", v1));
+                }
             case "BYTE":
                 return Byte.parseByte(v1.toString());
             case "BYTES":
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
index d2b214ec79..817f945dbf 100644
--- 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
@@ -500,6 +500,216 @@ public class SQLTransformTest {
     }
 
     @Test
+    public void testCoalesceTypeConversion() {
+        String tableName = "test";
+        String[] fields = new String[] {"id", "stringField", "intField", 
"doubleField"};
+        CatalogTable table =
+                CatalogTableUtil.getCatalogTable(
+                        tableName,
+                        new SeaTunnelRowType(
+                                fields,
+                                new SeaTunnelDataType[] {
+                                    BasicType.INT_TYPE,
+                                    BasicType.STRING_TYPE,
+                                    BasicType.INT_TYPE,
+                                    BasicType.DOUBLE_TYPE
+                                }));
+
+        // The first parameter to test COALESCE is the string type, followed 
by the integer type
+        ReadonlyConfig config =
+                ReadonlyConfig.fromMap(
+                        Collections.singletonMap(
+                                "query",
+                                "select id, COALESCE(stringField, intField) as 
result from dual"));
+        SQLTransform sqlTransform = new SQLTransform(config, table);
+        TableSchema tableSchema = sqlTransform.transformTableSchema();
+
+        // Verify that the field type is STRING
+        Assertions.assertEquals("result", tableSchema.getFieldNames()[1]);
+        Assertions.assertEquals(
+                BasicType.STRING_TYPE, 
tableSchema.getColumns().get(1).getDataType());
+
+        // The first field is not null, and the value of the first field 
should be directly returned
+        List<SeaTunnelRow> result =
+                sqlTransform.transformRow(new SeaTunnelRow(new Object[] {1, 
"test", 123, 123.45}));
+        Assertions.assertEquals("test", result.get(0).getField(1));
+
+        // The first field is null, and the value converted to the string 
should be returned.
+        result = sqlTransform.transformRow(new SeaTunnelRow(new Object[] {1, 
null, 123, 123.45}));
+        Assertions.assertEquals("123", result.get(0).getField(1));
+        // Make sure the return value is a string type rather than an integer 
type
+        Assertions.assertTrue(
+                result.get(0).getField(1) instanceof String,
+                "The result should be a string type, but is actually "
+                        + result.get(0).getField(1).getClass().getName());
+
+        // The first parameter to test COALESCE is the integer type, followed 
by the floating point
+        // type
+        config =
+                ReadonlyConfig.fromMap(
+                        Collections.singletonMap(
+                                "query",
+                                "select id, COALESCE(intField, doubleField) as 
result from dual"));
+        sqlTransform = new SQLTransform(config, table);
+        tableSchema = sqlTransform.transformTableSchema();
+
+        // Verify that the field type is INT
+        Assertions.assertEquals("result", tableSchema.getFieldNames()[1]);
+        Assertions.assertEquals(BasicType.INT_TYPE, 
tableSchema.getColumns().get(1).getDataType());
+
+        // The first field is not null, and the value of the first field 
should be directly
+        // returned
+        result = sqlTransform.transformRow(new SeaTunnelRow(new Object[] {1, 
"test", 123, 123.45}));
+        Assertions.assertEquals(123, result.get(0).getField(1));
+        Assertions.assertTrue(
+                result.get(0).getField(1) instanceof Integer,
+                "The result should be an integer type, but is actually "
+                        + result.get(0).getField(1).getClass().getName());
+
+        // The first field is null, and the value converted to an integer 
should be returned.
+        result =
+                sqlTransform.transformRow(new SeaTunnelRow(new Object[] {1, 
"test", null, 456.78}));
+        Assertions.assertEquals(456, result.get(0).getField(1));
+        Assertions.assertTrue(
+                result.get(0).getField(1) instanceof Integer,
+                "The result should be an integer type, but is actually "
+                        + result.get(0).getField(1).getClass().getName());
+
+        // Test COALESCE with null as first argument
+        config =
+                ReadonlyConfig.fromMap(
+                        Collections.singletonMap(
+                                "query",
+                                "select id, COALESCE(null, stringField, 
intField) as result from dual"));
+        sqlTransform = new SQLTransform(config, table);
+        tableSchema = sqlTransform.transformTableSchema();
+
+        // Verify that the result field type is STRING (since stringField is 
the first non-null
+        // parameter)
+        Assertions.assertEquals("result", tableSchema.getFieldNames()[1]);
+        Assertions.assertEquals(
+                BasicType.STRING_TYPE, 
tableSchema.getColumns().get(1).getDataType());
+
+        // Test with both stringField and intField having values
+        result = sqlTransform.transformRow(new SeaTunnelRow(new Object[] {1, 
"test", 123, 123.45}));
+        Assertions.assertEquals("test", result.get(0).getField(1));
+        Assertions.assertTrue(
+                result.get(0).getField(1) instanceof String,
+                "The result should be a string type, but is actually "
+                        + result.get(0).getField(1).getClass().getName());
+
+        // Test with stringField being null, should return intField as string
+        result = sqlTransform.transformRow(new SeaTunnelRow(new Object[] {1, 
null, 123, 123.45}));
+        Assertions.assertEquals("123", result.get(0).getField(1));
+        Assertions.assertTrue(
+                result.get(0).getField(1) instanceof String,
+                "The result should be a string type, but is actually "
+                        + result.get(0).getField(1).getClass().getName());
+    }
+
+    @Test
+    public void testIfNullTypeConversion() {
+        String tableName = "test";
+        String[] fields = new String[] {"id", "stringField", "intField", 
"doubleField"};
+        CatalogTable table =
+                CatalogTableUtil.getCatalogTable(
+                        tableName,
+                        new SeaTunnelRowType(
+                                fields,
+                                new SeaTunnelDataType[] {
+                                    BasicType.INT_TYPE,
+                                    BasicType.STRING_TYPE,
+                                    BasicType.INT_TYPE,
+                                    BasicType.DOUBLE_TYPE
+                                }));
+
+        // Test IFNULL with string field as first parameter and integer as 
second
+        ReadonlyConfig config =
+                ReadonlyConfig.fromMap(
+                        Collections.singletonMap(
+                                "query",
+                                "select id, IFNULL(stringField, intField) as 
result from dual"));
+        SQLTransform sqlTransform = new SQLTransform(config, table);
+        TableSchema tableSchema = sqlTransform.transformTableSchema();
+
+        // Verify that the field type is STRING
+        Assertions.assertEquals("result", tableSchema.getFieldNames()[1]);
+        Assertions.assertEquals(
+                BasicType.STRING_TYPE, 
tableSchema.getColumns().get(1).getDataType());
+
+        // The first field is not null, and the value of the first field 
should be directly returned
+        List<SeaTunnelRow> result =
+                sqlTransform.transformRow(new SeaTunnelRow(new Object[] {1, 
"test", 123, 123.45}));
+        Assertions.assertEquals("test", result.get(0).getField(1));
+
+        // The first field is null, and the value converted to the string 
should be returned.
+        result = sqlTransform.transformRow(new SeaTunnelRow(new Object[] {1, 
null, 123, 123.45}));
+        Assertions.assertEquals("123", result.get(0).getField(1));
+        // Make sure the return value is a string type rather than an integer 
type
+        Assertions.assertTrue(
+                result.get(0).getField(1) instanceof String,
+                "The result should be a string type, but is actually "
+                        + result.get(0).getField(1).getClass().getName());
+
+        // Test IFNULL with integer field as first parameter and double as 
second
+        config =
+                ReadonlyConfig.fromMap(
+                        Collections.singletonMap(
+                                "query",
+                                "select id, IFNULL(intField, doubleField) as 
result from dual"));
+        sqlTransform = new SQLTransform(config, table);
+        tableSchema = sqlTransform.transformTableSchema();
+
+        // Verify that the field type is INT
+        Assertions.assertEquals("result", tableSchema.getFieldNames()[1]);
+        Assertions.assertEquals(BasicType.INT_TYPE, 
tableSchema.getColumns().get(1).getDataType());
+
+        // The first field is not null, and the value of the first field 
should be directly
+        // returned
+        result = sqlTransform.transformRow(new SeaTunnelRow(new Object[] {1, 
"test", 123, 123.45}));
+        Assertions.assertEquals(123, result.get(0).getField(1));
+        Assertions.assertTrue(
+                result.get(0).getField(1) instanceof Integer,
+                "The result should be an integer type, but is actually "
+                        + result.get(0).getField(1).getClass().getName());
+
+        // The first field is null, and the value converted to an integer 
should be returned.
+        result =
+                sqlTransform.transformRow(new SeaTunnelRow(new Object[] {1, 
"test", null, 456.78}));
+        Assertions.assertEquals(456, result.get(0).getField(1));
+        Assertions.assertTrue(
+                result.get(0).getField(1) instanceof Integer,
+                "The result should be an integer type, but is actually "
+                        + result.get(0).getField(1).getClass().getName());
+
+        // Test IFNULL with null literal as first argument
+        config =
+                ReadonlyConfig.fromMap(
+                        Collections.singletonMap(
+                                "query",
+                                "select id, IFNULL(null, stringField) as 
result from dual"));
+        sqlTransform = new SQLTransform(config, table);
+        tableSchema = sqlTransform.transformTableSchema();
+
+        // Verify that the result field type is STRING
+        Assertions.assertEquals("result", tableSchema.getFieldNames()[1]);
+        Assertions.assertEquals(
+                BasicType.STRING_TYPE, 
tableSchema.getColumns().get(1).getDataType());
+
+        // Test with stringField having a value
+        result = sqlTransform.transformRow(new SeaTunnelRow(new Object[] {1, 
"test", 123, 123.45}));
+        Assertions.assertEquals("test", result.get(0).getField(1));
+        Assertions.assertTrue(
+                result.get(0).getField(1) instanceof String,
+                "The result should be a string type, but is actually "
+                        + result.get(0).getField(1).getClass().getName());
+
+        // Test with stringField being null, should return null
+        result = sqlTransform.transformRow(new SeaTunnelRow(new Object[] {1, 
null, 123, 123.45}));
+        Assertions.assertNull(result.get(0).getField(1));
+    }
+
+    @Test
     public void testCastTimestampValidate() {
         String querySql = "select CAST(`id` AS TIMESTAMP) AS idStr, name AS 
name from dual";
         SQLTransform sqlTransform =

Reply via email to