This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fd7e2a16b0e [FLINK-38682][table-planner] Support unknown -> RAW cast during type inference in limited validator scope
fd7e2a16b0e is described below

commit fd7e2a16b0edcc40a7327130de78d45e87a6e669
Author: Ferenc Csaky <[email protected]>
AuthorDate: Mon Nov 17 12:40:52 2025 +0100

    [FLINK-38682][table-planner] Support unknown -> RAW cast during type inference in limited validator scope
---
 .../inference/TypeInferenceOperandChecker.java     | 32 +++++++++--
 .../planner/runtime/batch/sql/FunctionITCase.java  | 64 +++++++++++++++++++++-
 2 files changed, 91 insertions(+), 5 deletions(-)
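
[Editor's note, not part of the commit] Before this change, castTo rendered every target type through SqlTypeUtil.convertTypeToSpec, which has no faithful representation of Flink's RAW type, so coercing an untyped NULL operand to a RAW function argument failed during validation. A minimal sketch of the scenario the commit fixes, mirroring the new tests below; the 'COLLECTION' connector and CustomIntUdf are Flink test fixtures, so this is illustrative rather than standalone:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RawCastRepro {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // 'COLLECTION' resolves only against the test-scoped TestCollectionTableFactory.
            tEnv.executeSql("CREATE TABLE Source(i INT) WITH ('connector' = 'COLLECTION')");
            // CustomIntUdf (defined in the test diff below) takes a RAW-typed argument.
            tEnv.createTemporarySystemFunction("CustomIntUdf", new FunctionITCase.CustomIntUdf());
            // NULL arrives with unknown type, so the validator must inject an
            // unknown -> RAW cast inside the ORDER BY scope.
            tEnv.executeSql("SELECT i FROM Source ORDER BY CustomIntUdf(NULL)").await();
        }
    }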

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
index 467e2178ccc..e20c2c07548 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
@@ -19,10 +19,12 @@
 package org.apache.flink.table.planner.functions.inference;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.sql.parser.type.SqlRawTypeNameSpec;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.RawRelDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.ArgumentCount;
 import org.apache.flink.table.types.inference.CallContext;
@@ -32,16 +34,20 @@ import org.apache.flink.table.types.inference.StaticArgumentTrait;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.table.types.inference.TypeInferenceUtil;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RawType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.StructKind;
 import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperandCountRange;
 import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlTypeNameSpec;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlOperandMetadata;
@@ -240,10 +246,28 @@ public final class TypeInferenceOperandChecker
 
     /** Adopted from {@link org.apache.calcite.sql.validate.implicit.AbstractTypeCoercion}. */
     private SqlNode castTo(SqlNode node, RelDataType type) {
-        return SqlStdOperatorTable.CAST.createCall(
-                SqlParserPos.ZERO,
-                node,
-                SqlTypeUtil.convertTypeToSpec(type).withNullable(type.isNullable()));
+        final SqlDataTypeSpec dataType;
+        if (type instanceof RawRelDataType) {
+            dataType = createRawDataTypeSpec((RawRelDataType) type);
+        } else {
+            dataType = SqlTypeUtil.convertTypeToSpec(type).withNullable(type.isNullable());
+        }
+
+        return SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO, node, dataType);
+    }
+
+    private SqlDataTypeSpec createRawDataTypeSpec(RawRelDataType type) {
+        final RawType<?> rawType = type.getRawType();
+
+        SqlNode className =
+                SqlLiteral.createCharString(
+                        rawType.getOriginatingClass().getName(), SqlParserPos.ZERO);
+        SqlNode serializer =
+                SqlLiteral.createCharString(rawType.getSerializerString(), SqlParserPos.ZERO);
+
+        SqlTypeNameSpec rawSpec = new SqlRawTypeNameSpec(className, serializer, SqlParserPos.ZERO);
+
+        return new SqlDataTypeSpec(rawSpec, null, type.isNullable(), SqlParserPos.ZERO);
     }
 
     /** Adopted from {@link org.apache.calcite.sql.validate.implicit.AbstractTypeCoercion}. */
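
[Editor's note, not part of the commit] SqlRawTypeNameSpec unparses to Flink's RAW type literal, so the injected coercion takes roughly this shape (hypothetical class name, serializer snapshot string elided):

    CAST(NULL AS RAW('com.example.CustomInt', '...'))

SqlTypeUtil.convertTypeToSpec cannot produce this form from a RawRelDataType, which is why castTo gains the dedicated branch that rebuilds the spec from the type's originating class name and serializer string.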
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java
index e07ec1372da..a85cf50c50c 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.planner.runtime.batch.sql;
 
+import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
 import org.apache.flink.types.Row;
@@ -99,6 +101,49 @@ class FunctionITCase extends BatchTestBase {
         testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
     }
 
+    @Test
+    void testOrderByScopeRawTypeCast() throws Exception {
+        final List<Row> sourceData = List.of(Row.of(1), Row.of(2), Row.of(3), Row.of(4), Row.of(5));
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        tEnv().executeSql("CREATE TABLE Source(i INT) WITH ('connector' = 
'COLLECTION')");
+        tEnv().executeSql("CREATE TABLE Sink(i INT) WITH ('connector' = 
'COLLECTION')");
+
+        tEnv().createTemporarySystemFunction("CustomIntUdf", new CustomIntUdf());
+
+        tEnv().executeSql(
+                        "INSERT INTO Sink"
+                                + " SELECT i FROM Source"
+                                + " ORDER BY CustomIntUdf(NULL)")
+                .await();
+
+        assertThat(TestCollectionTableFactory.getResult()).hasSize(5);
+    }
+
+    @Test
+    void testHavingScopeRawTypeCast() throws Exception {
+        final List<Row> sourceData = List.of(Row.of(1), Row.of(2), Row.of(3), Row.of(4), Row.of(5));
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        tEnv().executeSql("CREATE TABLE Source(i INT) WITH ('connector' = 
'COLLECTION')");
+        tEnv().executeSql("CREATE TABLE Sink(i INT) WITH ('connector' = 
'COLLECTION')");
+
+        tEnv().createTemporarySystemFunction("CustomIntUdf", new CustomIntUdf());
+
+        tEnv().executeSql(
+                        "INSERT INTO Sink"
+                                + " SELECT SUM(i) AS s FROM Source"
+                                + " HAVING CustomIntUdf(NULL) = 0")
+                .await();
+
+        assertThat(TestCollectionTableFactory.getResult())
+                .singleElement()
+                .asString()
+                .contains("15");
+    }
+
     private void testUserDefinedFunctionByUsingJar(String createFunctionDDL, String dropFunctionDDL)
             throws Exception {
         List<Row> sourceData =
@@ -123,7 +168,7 @@ class FunctionITCase extends BatchTestBase {
         Table t2 = tEnv().sqlQuery(query);
         t2.executeInsert("t2").await();
 
-        List<Row> result = TestCollectionTableFactory.RESULT();
+        List<Row> result = TestCollectionTableFactory.getResult();
         List<Row> expected =
                 Arrays.asList(
                         Row.of(1, "jark"),
@@ -139,4 +184,21 @@ class FunctionITCase extends BatchTestBase {
         // delete the function
         tEnv().executeSql(dropFunctionDDL);
     }
+
+    // ----- Test types / UDF -----
+
+    @DataTypeHint(value = "RAW", bridgedTo = CustomInt.class)
+    public static class CustomInt {
+        public Integer value;
+
+        public CustomInt(Integer v) {
+            this.value = v;
+        }
+    }
+
+    public static class CustomIntUdf extends ScalarFunction {
+        public Integer eval(CustomInt v) {
+            return 0;
+        }
+    }
 }
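
[Editor's note, not part of the commit] The class-level @DataTypeHint makes type extraction map CustomInt to a RAW type rather than a structured type, so eval(CustomInt) gives CustomIntUdf a RAW argument and CustomIntUdf(NULL) exercises the new unknown -> RAW coercion. A hedged sketch of the equivalent parameter-level hint, assuming Flink's standard annotation placement rules:

    public static class CustomIntUdf extends ScalarFunction {
        // Hinting the parameter directly should yield the same RAW argument
        // type as annotating the CustomInt class itself.
        public Integer eval(@DataTypeHint("RAW") CustomInt v) {
            return 0;
        }
    }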
