This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new de8ee32  [FLINK-24813][table-planner-blink] Improve 
ImplicitTypeConversionITCase
de8ee32 is described below

commit de8ee32d51a5a2b4ef9cfc814be37217aaad6d7b
Author: xuyang <xyzhong...@163.com>
AuthorDate: Fri Nov 12 14:54:57 2021 +0800

    [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase
    
    This closes #18049
---
 .../planner/codegen/calls/ScalarOperatorGens.scala |   4 +-
 .../ImplicitConversionEqualsFunctionITCase.java    | 239 +++++++++
 .../stream/sql/ImplicitTypeConversionITCase.java   | 552 ---------------------
 .../planner/expressions/SqlExpressionTest.scala    | 158 ------
 4 files changed, 241 insertions(+), 712 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index a45843d..c0fe098 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -22,7 +22,7 @@ import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.data.binary.BinaryArrayData
 import org.apache.flink.table.data.util.{DataFormatConverters, MapDataUtil}
 import org.apache.flink.table.data.writer.{BinaryArrayWriter, BinaryRowWriter}
-import 
org.apache.flink.table.planner.codegen.CodeGenUtils.{binaryRowFieldSetAccess, 
binaryRowSetNull, binaryWriterWriteField, binaryWriterWriteNull, _}
+import org.apache.flink.table.planner.codegen.CodeGenUtils._
 import org.apache.flink.table.planner.codegen.GenerateUtils._
 import 
org.apache.flink.table.planner.codegen.GeneratedExpression.{ALWAYS_NULL, 
NEVER_NULL, NO_CODE}
 import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGenUtils, 
CodeGeneratorContext, GeneratedExpression}
@@ -426,7 +426,7 @@ object ScalarOperatorGens {
     // but flink has not yet supported now
     if ((isNumeric(left.resultType) && isCharacterString(right.resultType))
       || (isNumeric(right.resultType) && isCharacterString(left.resultType))) {
-      throw new CodeGenException(
+      throw new ValidationException(
         "implicit type conversion between " +
           s"${left.resultType.getTypeRoot}" +
           s" and " +
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/ImplicitConversionEqualsFunctionITCase.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/ImplicitConversionEqualsFunctionITCase.java
new file mode 100644
index 0000000..bbde80d
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/ImplicitConversionEqualsFunctionITCase.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions;
+
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import org.junit.runners.Parameterized;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+
+/**
+ * Tests for type conversions in '='. These tests are only for SQL API. We 
temporarily forbid "="
+ * between numeric and (var)char fields and throw an exception because it will 
produce wrong result.
+ * SEE [FLINK-24914].
+ */
+public class ImplicitConversionEqualsFunctionITCase extends 
BuiltInFunctionTestBase {
+
+    // numeric data
+    private static final byte TINY_INT_DATA = (byte) 1;
+    private static final short SMALL_INT_DATA = (short) 1;
+    private static final int INT_DATA = 1;
+    private static final long BIG_INT_DATA = 1L;
+    private static final float FLOAT_DATA = 1.0f;
+    private static final double DOUBLE_DATA = 1.0d;
+    private static final BigDecimal DECIMAL_DATA = new BigDecimal(1);
+
+    // time data
+    private static final String DATE_DATA = "2001-01-01";
+    private static final String TIME_DATA = "00:00:00";
+    private static final String TIMESTAMP_DATA = ("2001-01-01 00:00:00");
+
+    // string data
+    private static final String STRING_DATA_EQUALS_NUMERIC = "1";
+    private static final String STRING_DATA_EQUALS_DATE = "2001-01-01";
+    private static final String STRING_DATA_EQUALS_TIME = "00:00:00";
+    private static final String STRING_DATA_EQUALS_TIMESTAMP = "2001-01-01 
00:00:00";
+
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static List<TestSpec> testData() {
+        final List<TestSpec> specs = new ArrayList<>();
+        specs.addAll(implicitConversionBetweenNumeric());
+        specs.addAll(implicitConversionBetweenTimeAndString());
+        specs.addAll(unsupportedImplicitConversionBetweenNumericAndString());
+        return specs;
+    }
+
+    private static List<TestSpec> implicitConversionBetweenNumeric() {
+        return Arrays.asList(
+                TypeConversionTestBuilder.left(TINYINT(), TINY_INT_DATA)
+                        .right(TINYINT(), TINY_INT_DATA)
+                        .right(SMALLINT(), SMALL_INT_DATA)
+                        .right(INT(), INT_DATA)
+                        .right(BIGINT(), BIG_INT_DATA)
+                        .right(FLOAT(), FLOAT_DATA)
+                        .right(DOUBLE(), DOUBLE_DATA)
+                        .right(DECIMAL(1, 0), DECIMAL_DATA)
+                        .build(),
+                TypeConversionTestBuilder.left(SMALLINT(), SMALL_INT_DATA)
+                        .right(SMALLINT(), SMALL_INT_DATA)
+                        .right(INT(), INT_DATA)
+                        .right(BIGINT(), BIG_INT_DATA)
+                        .right(FLOAT(), FLOAT_DATA)
+                        .right(DOUBLE(), DOUBLE_DATA)
+                        .right(DECIMAL(1, 0), DECIMAL_DATA)
+                        .build(),
+                TypeConversionTestBuilder.left(INT(), INT_DATA)
+                        .right(INT(), INT_DATA)
+                        .right(BIGINT(), BIG_INT_DATA)
+                        .right(FLOAT(), FLOAT_DATA)
+                        .right(DOUBLE(), DOUBLE_DATA)
+                        .right(DECIMAL(1, 0), DECIMAL_DATA)
+                        .build(),
+                TypeConversionTestBuilder.left(BIGINT(), BIG_INT_DATA)
+                        .right(BIGINT(), BIG_INT_DATA)
+                        .right(FLOAT(), FLOAT_DATA)
+                        .right(DOUBLE(), DOUBLE_DATA)
+                        .right(DECIMAL(1, 0), DECIMAL_DATA)
+                        .build(),
+                TypeConversionTestBuilder.left(FLOAT(), FLOAT_DATA)
+                        .right(FLOAT(), FLOAT_DATA)
+                        .right(DOUBLE(), DOUBLE_DATA)
+                        .right(DECIMAL(1, 0), DECIMAL_DATA)
+                        .build(),
+                TypeConversionTestBuilder.left(DOUBLE(), DOUBLE_DATA)
+                        .right(DOUBLE(), DOUBLE_DATA)
+                        .right(DECIMAL(1, 0), DECIMAL_DATA)
+                        .build());
+    }
+
+    private static List<TestSpec> implicitConversionBetweenTimeAndString() {
+        return Arrays.asList(
+                TypeConversionTestBuilder.left(DATE(), DATE_DATA)
+                        .right(DATE(), DATE_DATA)
+                        .right(STRING(), STRING_DATA_EQUALS_DATE)
+                        .build(),
+                TypeConversionTestBuilder.left(TIME(), TIME_DATA)
+                        .right(STRING(), STRING_DATA_EQUALS_TIME)
+                        .build(),
+                TypeConversionTestBuilder.left(TIMESTAMP(), TIMESTAMP_DATA)
+                        .right(STRING(), STRING_DATA_EQUALS_TIMESTAMP)
+                        .build());
+    }
+
+    // unsupported temporarily
+    private static List<TestSpec> 
unsupportedImplicitConversionBetweenNumericAndString() {
+        return Collections.singletonList(
+                TypeConversionTestBuilder.left(STRING(), 
STRING_DATA_EQUALS_NUMERIC)
+                        .right(STRING(), STRING_DATA_EQUALS_NUMERIC)
+                        .fail(TINYINT(), TINY_INT_DATA)
+                        .fail(SMALLINT(), SMALL_INT_DATA)
+                        .fail(INT(), INT_DATA)
+                        .fail(BIGINT(), BIG_INT_DATA)
+                        .fail(FLOAT(), FLOAT_DATA)
+                        .fail(DOUBLE(), DOUBLE_DATA)
+                        .fail(DECIMAL(1, 0), DECIMAL_DATA)
+                        .build());
+    }
+
+    static class TypeConversionTestBuilder {
+        private DataType leftType;
+        private Object leftValue;
+        private final List<Object> rightDataOnSuccess = new ArrayList<>();
+        private final List<DataType> rightTypesOnSuccess = new ArrayList<>();
+        private final List<Object> rightDataOnFailure = new ArrayList<>();
+        private final List<DataType> rightTypesOnFailure = new ArrayList<>();
+
+        private static TypeConversionTestBuilder left(DataType leftType, 
Object leftValue) {
+            TypeConversionTestBuilder builder = new 
TypeConversionTestBuilder();
+            builder.leftType = leftType;
+            builder.leftValue = leftValue;
+            return builder;
+        }
+
+        private TypeConversionTestBuilder right(DataType rightType, Object 
rightValue) {
+            this.rightTypesOnSuccess.add(rightType);
+            this.rightDataOnSuccess.add(rightValue);
+            return this;
+        }
+
+        private TypeConversionTestBuilder fail(DataType rightType, Object 
rightValue) {
+            this.rightTypesOnFailure.add(rightType);
+            this.rightDataOnFailure.add(rightValue);
+            return this;
+        }
+
+        private TestSpec build() {
+            int columnBaseIdx = 0;
+            String leftColumnName = "f" + columnBaseIdx;
+            columnBaseIdx++;
+
+            TestSpec testSpec =
+                    TestSpec.forFunction(
+                            BuiltInFunctionDefinitions.EQUALS, "left: " + 
leftType.toString());
+
+            final List<Object> allData = new ArrayList<>();
+            allData.add(leftValue);
+            allData.addAll(rightDataOnSuccess);
+            allData.addAll(rightDataOnFailure);
+
+            final List<Object> allTypes = new ArrayList<>();
+            allTypes.add(leftType);
+            allTypes.addAll(rightTypesOnSuccess);
+            allTypes.addAll(rightTypesOnFailure);
+
+            testSpec.onFieldsWithData(allData.toArray())
+                    .andDataTypes(allTypes.toArray(new AbstractDataType<?>[] 
{}));
+
+            // test successful cases
+            for (int i = 0; i < rightTypesOnSuccess.size(); i++) {
+                String rightColumnName = "f" + (i + columnBaseIdx);
+                DataType rightType = rightTypesOnSuccess.get(i);
+                testSpec.testSqlResult(
+                        String.format(
+                                "CAST(%s AS %s) = CAST(%s AS %s)",
+                                leftColumnName, leftType.toString(), 
rightColumnName, rightType),
+                        true,
+                        BOOLEAN());
+            }
+
+            columnBaseIdx = columnBaseIdx + rightTypesOnSuccess.size();
+            // test failed cases
+            for (int i = 0; i < rightTypesOnFailure.size(); i++) {
+                String rightColumnName = "f" + (i + columnBaseIdx);
+                DataType rightType = rightTypesOnFailure.get(i);
+                String exceptionMsg =
+                        getImplicitConversionFromStringExceptionMsg(
+                                rightType.getLogicalType().getTypeRoot());
+                testSpec.testSqlError(
+                        String.format(
+                                "CAST(%s AS %s) = CAST(%s AS %s)",
+                                leftColumnName, leftType.toString(), 
rightColumnName, rightType),
+                        exceptionMsg);
+            }
+            return testSpec;
+        }
+
+        private String 
getImplicitConversionFromStringExceptionMsg(LogicalTypeRoot rightType) {
+            return String.format(
+                    "implicit type conversion between VARCHAR and %s is not 
supported now",
+                    rightType.toString());
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ImplicitTypeConversionITCase.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ImplicitTypeConversionITCase.java
deleted file mode 100644
index 1ce73da..0000000
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ImplicitTypeConversionITCase.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.runtime.stream.sql;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.scala.DataStream;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.bridge.scala.DataStreamConversions;
-import org.apache.flink.table.data.DecimalDataUtils;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
-import org.apache.flink.table.planner.codegen.CodeGenException;
-import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
-import org.apache.flink.table.planner.runtime.utils.TestingAppendRowDataSink;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.BooleanType;
-import org.apache.flink.table.types.logical.DateType;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.DoubleType;
-import org.apache.flink.table.types.logical.FloatType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.SmallIntType;
-import org.apache.flink.table.types.logical.TimeType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.TinyIntType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.table.utils.LegacyRowResource;
-
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
-import static org.junit.Assert.assertEquals;
-
-/** test implicit type conversion between different types. */
-public class ImplicitTypeConversionITCase extends StreamingTestBase {
-
-    @Rule public LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
-
-    private List<String> testSingleTableSqlQueryWithOutputType(
-            String sqlQuery, InternalTypeInfo<RowData> outputType) {
-        GenericRowData rowData = new GenericRowData(14);
-        rowData.setField(0, (byte) 1);
-        rowData.setField(1, (short) 1);
-        rowData.setField(2, 1);
-        rowData.setField(3, (long) 1);
-        rowData.setField(4, DecimalDataUtils.castFrom(1, 1, 0));
-        rowData.setField(5, (float) 1);
-        rowData.setField(6, (double) 1);
-        int date = (int) LocalDate.parse("2001-01-01").toEpochDay();
-        rowData.setField(7, date);
-        int time = (int) (LocalTime.parse("00:00:00").toNanoOfDay() / 
1000000L);
-        rowData.setField(8, time);
-        TimestampData timestamp =
-                
TimestampData.fromLocalDateTime(LocalDateTime.parse("2001-01-01T00:00:00"));
-        rowData.setField(9, timestamp);
-        rowData.setField(10, StringData.fromString("1"));
-        rowData.setField(11, StringData.fromString("2001-01-01"));
-        rowData.setField(12, StringData.fromString("00:00:00"));
-        rowData.setField(13, StringData.fromString("2001-01-01 00:00:00"));
-        List data = Arrays.asList(rowData);
-
-        TypeInformation<RowData> tpe =
-                InternalTypeInfo.ofFields(
-                        new TinyIntType(),
-                        new SmallIntType(),
-                        new IntType(),
-                        new BigIntType(),
-                        new DecimalType(1, 0),
-                        new FloatType(),
-                        new DoubleType(),
-                        new DateType(),
-                        new TimeType(),
-                        new TimestampType(),
-                        new VarCharType(),
-                        new VarCharType(),
-                        new VarCharType(),
-                        new VarCharType());
-
-        DataStream ds = 
env().fromCollection(JavaScalaConversionUtil.toScala(data), tpe);
-        DataStreamConversions conversions = new DataStreamConversions(ds, tpe);
-
-        List<UnresolvedReferenceExpression> fields =
-                Arrays.asList(
-                        unresolvedRef("field_tinyint"),
-                        unresolvedRef("field_smallint"),
-                        unresolvedRef("field_int"),
-                        unresolvedRef("field_bigint"),
-                        unresolvedRef("field_decimal"),
-                        unresolvedRef("field_float"),
-                        unresolvedRef("field_double"),
-                        unresolvedRef("field_date"),
-                        unresolvedRef("field_time"),
-                        unresolvedRef("field_timestamp"),
-                        unresolvedRef("field_varchar_equals_numeric"),
-                        unresolvedRef("field_varchar_equals_date"),
-                        unresolvedRef("field_varchar_equals_time"),
-                        unresolvedRef("field_varchar_equals_timestamp"));
-
-        Table table = conversions.toTable(tEnv(), 
JavaScalaConversionUtil.toScala(fields));
-        tEnv().registerTable("TestTable", table);
-
-        Table resultTable = tEnv().sqlQuery(sqlQuery);
-        DataStream result = tEnv().toAppendStream(resultTable, outputType);
-        TestingAppendRowDataSink sink = new 
TestingAppendRowDataSink(outputType);
-        result.addSink(sink);
-        env().execute();
-
-        return new 
ArrayList<>(JavaScalaConversionUtil.toJava(sink.getAppendResults()));
-    }
-
-    @Test
-    public void testNumericConversionInFilter() {
-        String sqlQuery =
-                "SELECT field_tinyint, field_smallint, field_int, 
field_bigint, "
-                        + "field_decimal, field_float, field_double "
-                        + "FROM TestTable WHERE "
-                        + "field_tinyint = field_smallint AND "
-                        + "field_tinyint = field_int AND "
-                        + "field_tinyint = field_bigint AND "
-                        + "field_tinyint = field_decimal AND "
-                        + "field_tinyint = field_float AND "
-                        + "field_tinyint = field_double AND "
-                        + "field_smallint = field_int AND "
-                        + "field_smallint = field_bigint AND "
-                        + "field_smallint = field_decimal AND "
-                        + "field_smallint = field_float AND "
-                        + "field_smallint = field_double AND "
-                        + "field_int = field_bigint AND "
-                        + "field_int = field_decimal AND "
-                        + "field_int = field_float AND "
-                        + "field_int = field_double AND "
-                        + "field_bigint = field_decimal AND "
-                        + "field_bigint = field_float AND "
-                        + "field_bigint = field_double AND "
-                        + "field_decimal = field_float AND "
-                        + "field_decimal = field_double AND "
-                        + "field_float = field_double";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(
-                        new TinyIntType(),
-                        new SmallIntType(),
-                        new IntType(),
-                        new BigIntType(),
-                        new DecimalType(),
-                        new FloatType(),
-                        new DoubleType());
-
-        List<String> expected = Arrays.asList("+I(1,1,1,1,1,1.0,1.0)");
-
-        List<String> actualResult = 
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
-        Collections.sort(expected);
-        Collections.sort(actualResult);
-        assertEquals(expected, actualResult);
-    }
-
-    @Test
-    public void testDateAndVarCharConversionInFilter() {
-        String sqlQuery =
-                "SELECT field_date, field_varchar_equals_date FROM TestTable "
-                        + "WHERE field_date = field_varchar_equals_date";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new DateType(), new VarCharType());
-
-        List<String> expected = Arrays.asList("+I(11323,2001-01-01)");
-
-        List<String> actualResult = 
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
-        Collections.sort(expected);
-        Collections.sort(actualResult);
-        assertEquals(expected, actualResult);
-    }
-
-    @Test
-    public void testTimeAndVarCharConversionInFilter() {
-        String sqlQuery =
-                "SELECT field_time, field_varchar_equals_time FROM TestTable "
-                        + "WHERE field_time = field_varchar_equals_time";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new TimeType(), new VarCharType());
-
-        List<String> expected = Arrays.asList("+I(0,00:00:00)");
-
-        List<String> actualResult = 
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
-        Collections.sort(expected);
-        Collections.sort(actualResult);
-        assertEquals(expected, actualResult);
-    }
-
-    @Test
-    public void testTimestampAndVarCharConversionInFilter() {
-        String sqlQuery =
-                "SELECT field_timestamp, field_varchar_equals_timestamp FROM 
TestTable "
-                        + "WHERE field_timestamp = 
field_varchar_equals_timestamp";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new TimestampType(), new 
VarCharType());
-
-        List<String> expected = Arrays.asList("+I(2001-01-01T00:00,2001-01-01 
00:00:00)");
-
-        List<String> actualResult = 
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
-        Collections.sort(expected);
-        Collections.sort(actualResult);
-        assertEquals(expected, actualResult);
-    }
-
-    private String getFilterAndProjectionExceptionMessage(List<String> types) {
-        return String.format(
-                "implicit type conversion between " + "%s and %s" + " is not 
supported now",
-                types.get(0), types.get(1));
-    }
-
-    private void testSingleTableInvalidImplicitConversionTypes(
-            String sqlQuery, InternalTypeInfo<RowData> outputType, 
List<String> types) {
-        expectedException().expect(CodeGenException.class);
-        
expectedException().expectMessage(getFilterAndProjectionExceptionMessage(types));
-        testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
-    }
-
-    @Test
-    public void testInvalidTinyIntAndVarCharConversionInFilter() {
-        String sqlQuery =
-                "SELECT field_tinyint, field_varchar_equals_numeric FROM 
TestTable "
-                        + "WHERE field_tinyint = field_varchar_equals_numeric";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new TinyIntType(), new 
VarCharType());
-
-        testSingleTableInvalidImplicitConversionTypes(
-                sqlQuery, outputType, Arrays.asList("TINYINT", "VARCHAR"));
-    }
-
-    @Test
-    public void testInvalidSmallIntAndVarCharConversionInFilter() {
-        String sqlQuery =
-                "SELECT field_smallint, field_varchar_equals_numeric FROM 
TestTable "
-                        + "WHERE field_smallint = 
field_varchar_equals_numeric";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new SmallIntType(), new 
VarCharType());
-
-        testSingleTableInvalidImplicitConversionTypes(
-                sqlQuery, outputType, Arrays.asList("SMALLINT", "VARCHAR"));
-    }
-
-    @Test
-    public void testInvalidIntAndVarCharConversionInFilter() {
-        String sqlQuery =
-                "SELECT field_int, field_varchar_equals_numeric FROM TestTable 
"
-                        + "WHERE field_int = field_varchar_equals_numeric";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new IntType(), new VarCharType());
-
-        testSingleTableInvalidImplicitConversionTypes(
-                sqlQuery, outputType, Arrays.asList("INTEGER", "VARCHAR"));
-    }
-
-    @Test
-    public void testInvalidBigIntAndVarCharConversionInFilter() {
-        String sqlQuery =
-                "SELECT field_bigint, field_varchar_equals_numeric FROM 
TestTable "
-                        + "WHERE field_bigint = field_varchar_equals_numeric";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new BigIntType(), new VarCharType());
-
-        testSingleTableInvalidImplicitConversionTypes(
-                sqlQuery, outputType, Arrays.asList("BIGINT", "VARCHAR"));
-    }
-
-    @Test
-    public void testInvalidDecimalAndVarCharConversionInFilter() {
-        String sqlQuery =
-                "SELECT field_decimal, field_varchar_equals_numeric FROM 
TestTable "
-                        + "WHERE field_decimal = field_varchar_equals_numeric";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new DecimalType(), new 
VarCharType());
-
-        testSingleTableInvalidImplicitConversionTypes(
-                sqlQuery, outputType, Arrays.asList("DECIMAL", "VARCHAR"));
-    }
-
-    @Test
-    public void testInvalidFloatAndVarCharConversionInFilter() {
-        String sqlQuery =
-                "SELECT field_float, field_varchar_equals_numeric FROM 
TestTable "
-                        + "WHERE field_float = field_varchar_equals_numeric";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new FloatType(), new VarCharType());
-
-        testSingleTableInvalidImplicitConversionTypes(
-                sqlQuery, outputType, Arrays.asList("FLOAT", "VARCHAR"));
-    }
-
-    @Test
-    public void testInvalidDoubleAndVarCharConversionInFilter() {
-        String sqlQuery =
-                "SELECT field_double, field_varchar_equals_numeric FROM 
TestTable "
-                        + "WHERE field_double = field_varchar_equals_numeric";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new DoubleType(), new VarCharType());
-
-        testSingleTableInvalidImplicitConversionTypes(
-                sqlQuery, outputType, Arrays.asList("DOUBLE", "VARCHAR"));
-    }
-
-    @Test
-    public void testFloatAndDoubleConversionInProjection() {
-        String sqlQuery =
-                "SELECT field_float, field_double, field_float = field_double 
FROM TestTable";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new FloatType(), new DoubleType(), 
new BooleanType());
-
-        List<String> expected = Arrays.asList("+I(1.0,1.0,true)");
-
-        List<String> actualResult = 
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
-        Collections.sort(expected);
-        Collections.sort(actualResult);
-        assertEquals(expected, actualResult);
-    }
-
-    @Test
-    public void testDateAndVarCharConversionInProjection() {
-        String sqlQuery =
-                "SELECT field_date, field_varchar_equals_date, "
-                        + "field_date = field_varchar_equals_date FROM 
TestTable";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new DateType(), new VarCharType(), 
new BooleanType());
-
-        List<String> expected = Arrays.asList("+I(11323,2001-01-01,true)");
-
-        List<String> actualResult = 
testSingleTableSqlQueryWithOutputType(sqlQuery, outputType);
-        Collections.sort(expected);
-        Collections.sort(actualResult);
-        assertEquals(expected, actualResult);
-    }
-
-    @Test
-    public void testInvalidDecimalAndVarCharConversionInProjection() {
-        String sqlQuery =
-                "SELECT field_decimal, field_varchar_equals_numeric, "
-                        + "field_decimal = field_varchar_equals_numeric FROM 
TestTable";
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new DecimalType(), new 
VarCharType(), new BooleanType());
-
-        testSingleTableInvalidImplicitConversionTypes(
-                sqlQuery, outputType, Arrays.asList("DECIMAL", "VARCHAR"));
-    }
-
-    private void registerTableA() {
-        GenericRowData rowDataA = new GenericRowData(6);
-        rowDataA.setField(0, 1);
-        rowDataA.setField(1, 1);
-        rowDataA.setField(2, 1);
-        int date = (int) LocalDate.parse("2001-01-01").toEpochDay();
-        rowDataA.setField(3, date);
-        int time = (int) (LocalTime.parse("00:00:00").toNanoOfDay() / 
1000000L);
-        rowDataA.setField(4, time);
-        TimestampData timestamp =
-                
TimestampData.fromLocalDateTime(LocalDateTime.parse("2001-01-01T00:00:00"));
-        rowDataA.setField(5, timestamp);
-        List<RowData> dataA = Arrays.asList(rowDataA);
-
-        TypeInformation<RowData> tpeA =
-                InternalTypeInfo.ofFields(
-                        new IntType(),
-                        new IntType(),
-                        new IntType(),
-                        new DateType(),
-                        new TimeType(),
-                        new TimestampType());
-
-        DataStream dsA = 
env().fromCollection(JavaScalaConversionUtil.toScala(dataA), tpeA);
-        DataStreamConversions conversions = new DataStreamConversions(dsA, 
tpeA);
-
-        List<UnresolvedReferenceExpression> fields =
-                Arrays.asList(
-                        unresolvedRef("a1"),
-                        unresolvedRef("a2"),
-                        unresolvedRef("a3"),
-                        unresolvedRef("a4"),
-                        unresolvedRef("a5"),
-                        unresolvedRef("a6"));
-
-        Table tableA = conversions.toTable(tEnv(), 
JavaScalaConversionUtil.toScala(fields));
-
-        tEnv().registerTable("A", tableA);
-    }
-
-    private void registerTableB() {
-        GenericRowData rowDataB = new GenericRowData(6);
-        rowDataB.setField(0, 1);
-        rowDataB.setField(1, (long) 1);
-        rowDataB.setField(2, StringData.fromString("1"));
-        rowDataB.setField(3, StringData.fromString("2001-01-01"));
-        rowDataB.setField(4, StringData.fromString("00:00:00"));
-        rowDataB.setField(5, StringData.fromString("2001-01-01 00:00:00"));
-        List<RowData> dataB = Arrays.asList(rowDataB);
-
-        TypeInformation<RowData> tpeB =
-                InternalTypeInfo.ofFields(
-                        new IntType(),
-                        new BigIntType(),
-                        new VarCharType(),
-                        new VarCharType(),
-                        new VarCharType(),
-                        new VarCharType());
-
-        DataStream dsB = 
env().fromCollection(JavaScalaConversionUtil.toScala(dataB), tpeB);
-        DataStreamConversions conversions = new DataStreamConversions(dsB, 
tpeB);
-
-        List<UnresolvedReferenceExpression> fields =
-                Arrays.asList(
-                        unresolvedRef("b1"),
-                        unresolvedRef("b2"),
-                        unresolvedRef("b3"),
-                        unresolvedRef("b4"),
-                        unresolvedRef("b5"),
-                        unresolvedRef("b6"));
-
-        Table tableB = conversions.toTable(tEnv(), 
JavaScalaConversionUtil.toScala(fields));
-
-        tEnv().registerTable("B", tableB);
-    }
-
-    private void testTwoTableJoinSqlQuery(String sqlQuery, 
InternalTypeInfo<RowData> outputType) {
-        registerTableA();
-        registerTableB();
-
-        Table resultTable = tEnv().sqlQuery(sqlQuery);
-        DataStream result = tEnv().toAppendStream(resultTable, outputType);
-        TestingAppendRowDataSink sink = new 
TestingAppendRowDataSink(outputType);
-        result.addSink(sink);
-        env().execute();
-
-        List<String> expected = Arrays.asList("+I(1,1)");
-
-        List<String> actualResult =
-                new 
ArrayList<>(JavaScalaConversionUtil.toJava(sink.getAppendResults()));
-
-        Collections.sort(expected);
-        Collections.sort(actualResult);
-        assertEquals(expected, actualResult);
-    }
-
-    private void testTwoTableInvalidImplicitConversionTypes(
-            String sqlQuery, InternalTypeInfo<RowData> outputType, 
List<String> types) {
-        expectedException().expect(TableException.class);
-        expectedException().expectMessage(getJoinOnExceptionMessage(types));
-        testTwoTableJoinSqlQuery(sqlQuery, outputType);
-    }
-
-    private String getJoinOnExceptionMessage(List<String> types) {
-        return String.format(
-                "implicit type conversion between "
-                        + "%s and %s"
-                        + " is not supported on join's condition now",
-                types.get(0), types.get(1));
-    }
-
-    @Test
-    public void testIntAndBigIntConversionInJoinOn() {
-        String sqlQuery = "SELECT a1, b1 from A join B on a2 = b2";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new IntType(), new IntType());
-
-        testTwoTableJoinSqlQuery(sqlQuery, outputType);
-    }
-
-    @Test
-    public void testInvalidIntAndVarCharConversionInJoinOn() {
-        String sqlQuery = "SELECT a1, b1 from A join B on a3 = b3";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new IntType(), new IntType());
-
-        testTwoTableInvalidImplicitConversionTypes(
-                sqlQuery, outputType, Arrays.asList("INTEGER", "VARCHAR(1)"));
-    }
-
-    @Test
-    public void testInvalidDateAndVarCharConversionInJoinOn() {
-        String sqlQuery = "SELECT a1, b1 from A join B on a4 = b4";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new IntType(), new IntType());
-
-        testTwoTableInvalidImplicitConversionTypes(
-                sqlQuery, outputType, Arrays.asList("DATE", "VARCHAR(1)"));
-    }
-
-    @Test
-    public void testInvalidTimeAndVarCharConversionInJoinOn() {
-        String sqlQuery = "SELECT a1, b1 from A join B on a5 = b5";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new IntType(), new IntType());
-
-        testTwoTableInvalidImplicitConversionTypes(
-                sqlQuery, outputType, Arrays.asList("TIME(0)", "VARCHAR(1)"));
-    }
-
-    @Test
-    public void testInvalidTimestampAndVarCharConversionInJoinOn() {
-        String sqlQuery = "SELECT a1, b1 from A join B on a6 = b6";
-
-        InternalTypeInfo<RowData> outputType =
-                InternalTypeInfo.ofFields(new IntType(), new IntType());
-
-        testTwoTableInvalidImplicitConversionTypes(
-                sqlQuery, outputType, Arrays.asList("TIMESTAMP(6)", 
"VARCHAR(1)"));
-    }
-}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
index 2e6eb67..259cfcd 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner.expressions
 
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.planner.codegen.CodeGenException
 import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
 import org.apache.flink.types.Row
 
@@ -77,163 +76,6 @@ class SqlExpressionTest extends ExpressionTestBase {
   }
 
   @Test
-  def testValidImplicitTypeConversion: Unit = {
-    // implicit type conversion between tinyint and others
-    testSqlApi("cast(1 as tinyint) = cast(1 as smallint)","true")
-    testSqlApi("cast(1 as tinyint) = cast(1 as int)","true")
-    testSqlApi("cast(1 as tinyint) = cast(1 as bigint)","true")
-    testSqlApi("cast(1 as tinyint) = cast(1 as decimal)","true")
-    testSqlApi("cast(1 as tinyint) = cast(1 as float)","true")
-    testSqlApi("cast(1 as tinyint) = cast(1 as double)","true")
-    testSqlApi("cast(1 as tinyint) = cast(2 as int)","false")
-    testSqlApi("cast(1 as tinyint) = cast(2 as bigint)","false")
-    testSqlApi("cast(1 as tinyint) = cast(2 as decimal)","false")
-    testSqlApi("cast(1 as tinyint) = cast(2.1 as float)","false")
-    testSqlApi("cast(1 as tinyint) = cast(2.1 as double)","false")
-
-    // implicit type conversion between smallint and others
-    testSqlApi("cast(1 as smallint) = cast(1 as int)","true")
-    testSqlApi("cast(1 as smallint) = cast(1 as bigint)","true")
-    testSqlApi("cast(1 as smallint) = cast(1 as decimal)","true")
-    testSqlApi("cast(1 as smallint) = cast(1 as float)","true")
-    testSqlApi("cast(1 as smallint) = cast(1 as double)","true")
-    testSqlApi("cast(1 as smallint) = cast(2 as int)","false")
-    testSqlApi("cast(1 as smallint) = cast(2 as bigint)","false")
-    testSqlApi("cast(1 as smallint) = cast(2 as decimal)","false")
-    testSqlApi("cast(1 as smallint) = cast(2.1 as float)","false")
-    testSqlApi("cast(1 as smallint) = cast(2.1 as double)","false")
-
-    // implicit type conversion between int and others
-    testSqlApi("cast(1 as int) = cast(1 as bigint)","true")
-    testSqlApi("cast(1 as int) = cast(1 as decimal)","true")
-    testSqlApi("cast(1 as int) = cast(1 as float)","true")
-    testSqlApi("cast(1 as int) = cast(1 as double)","true")
-    testSqlApi("cast(1 as int) = cast(2 as bigint)","false")
-    testSqlApi("cast(1 as int) = cast(2 as decimal)","false")
-    testSqlApi("cast(1 as int) = cast(2.1 as float)","false")
-    testSqlApi("cast(1 as int) = cast(2.1 as double)","false")
-
-    // implicit type conversion between bigint and others
-    testSqlApi("cast(1 as bigint) = cast(1 as decimal)","true")
-    testSqlApi("cast(1 as bigint) = cast(1 as float)","true")
-    testSqlApi("cast(1 as bigint) = cast(1 as double)","true")
-    testSqlApi("cast(1 as bigint) = cast(2 as decimal)","false")
-    testSqlApi("cast(1 as bigint) = cast(2.1 as float)","false")
-    testSqlApi("cast(1 as bigint) = cast(2.1 as double)","false")
-
-    // implicit type conversion between decimal and others
-    testSqlApi("cast(1 as decimal) = cast(1 as float)","true")
-    testSqlApi("cast(1 as decimal) = cast(1 as double)","true")
-    testSqlApi("cast(1 as decimal) = cast(2.1 as float)","false")
-    testSqlApi("cast(1 as decimal) = cast(2.1 as double)","false")
-
-    // implicit type conversion between float and others
-    testSqlApi("cast(1 as float) = cast(1 as double)","true")
-    testSqlApi("cast(1 as float) = cast(2.1 as double)","false")
-
-    // implicit type conversion between date and varchar
-    testSqlApi("cast('2001-01-01' as date) = cast('2001-01-01' as 
varchar)","true")
-    testSqlApi("cast('2001-01-01' as date) = cast('2001-01-02' as 
varchar)","false")
-
-    // implicit type conversion between date and char
-    testSqlApi("cast('2001-01-01' as date) = cast('2001-01-01' as 
char)","true")
-    testSqlApi("cast('2001-01-01' as date) = cast('2001-01-02' as 
char)","false")
-
-    // implicit type conversion between time and char
-    testSqlApi("cast('00:00:00.000000000' as time) = cast('00:00:00.000000000' 
as varchar)","true")
-    testSqlApi("cast('00:00:00.000000000' as time) = cast('00:00:01.000000000' 
as varchar)","false")
-
-    // implicit type conversion between time and char
-    testSqlApi("cast('2001-01-01 12:12:12.123' as timestamp)" +
-      " = cast('2001-01-01 12:12:12.1230' as varchar)","true")
-    testSqlApi("cast('1990-10-14 12:12:12.123' as timestamp)" +
-      " = cast('1990-10-15 12:12:12.123' as varchar)","false")
-  }
-
-  @Test
-  def testInValidImplicitTypeConversion: Unit = {
-    val typeChar = "CHAR"
-    val typeVarChar = "VARCHAR"
-
-    val typeTinyInt = "TINYINT"
-    val typeSmallInt = "SMALLINT"
-    val typeInt = "INTEGER"
-    val typeBigInt = "BIGINT"
-    val typeDecimal = "DECIMAL"
-    val typeFloat = "FLOAT"
-    val typeDouble = "DOUBLE"
-
-    // implicit type conversion between char and others
-    testInvalidImplicitConversionTypes(
-      "'1' = cast(1 as tinyint)",
-      List(typeChar,typeTinyInt))
-
-    testInvalidImplicitConversionTypes(
-      "'1' = cast(1 as smallint)",
-      List(typeChar,typeSmallInt))
-
-    testInvalidImplicitConversionTypes(
-      "'1' = cast(1 as int)",
-      List(typeChar,typeInt))
-
-    testInvalidImplicitConversionTypes(
-      "'1' = cast(1 as bigint)",
-      List(typeChar,typeBigInt))
-
-    testInvalidImplicitConversionTypes(
-      "'1.1' = cast(1.1 as decimal(2, 1))",
-      List(typeChar,typeDecimal))
-
-    testInvalidImplicitConversionTypes(
-      "'1.1' = cast(1.1 as float)",
-      List(typeChar,typeFloat))
-
-    testInvalidImplicitConversionTypes(
-      "'1.1' = cast(1.1 as double)",
-      List(typeChar,typeDouble))
-
-    // implicit type conversion between varchar and others
-    testInvalidImplicitConversionTypes(
-      "cast('1' as varchar) = cast(1 as tinyint)",
-      List(typeVarChar,typeTinyInt))
-
-    testInvalidImplicitConversionTypes(
-      "cast('1' as varchar) = cast(1 as smallint)",
-      List(typeVarChar,typeSmallInt))
-
-    testInvalidImplicitConversionTypes(
-
-      "cast('1' as varchar) = cast(1 as int)",
-      List(typeVarChar,typeInt))
-
-    testInvalidImplicitConversionTypes(
-      "cast('1' as varchar) = cast(1 as bigint)",
-      List(typeVarChar,typeBigInt))
-
-    testInvalidImplicitConversionTypes(
-      "cast('1.1' as varchar) = cast(1.1 as decimal(2, 1))",
-      List(typeVarChar,typeDecimal))
-
-    testInvalidImplicitConversionTypes(
-      "cast('1.1' as varchar) = cast(1.1 as float)",
-      List(typeVarChar,typeFloat))
-
-    testInvalidImplicitConversionTypes(
-      "cast('1.1' as varchar) = cast(1.1 as double)",
-      List(typeVarChar,typeDouble))
-  }
-
-  private def testInvalidImplicitConversionTypes(sql: String, types: 
List[String]): Unit ={
-    val expectedExceptionMessage =
-      "implicit type conversion between " +
-      s"${types(0)}" +
-      s" and " +
-      s"${types(1)}" +
-      s" is not supported now"
-    testExpectedSqlException(sql, expectedExceptionMessage, 
classOf[CodeGenException])
-  }
-
-  @Test
   def testLogicalFunctions(): Unit = {
     testSqlApi("TRUE OR FALSE", "true")
     testSqlApi("TRUE AND FALSE", "false")

Reply via email to