This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push: new de8ee32 [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase de8ee32 is described below commit de8ee32d51a5a2b4ef9cfc814be37217aaad6d7b Author: xuyang <xyzhong...@163.com> AuthorDate: Fri Nov 12 14:54:57 2021 +0800 [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase This closes #18049 --- .../planner/codegen/calls/ScalarOperatorGens.scala | 4 +- .../ImplicitConversionEqualsFunctionITCase.java | 239 +++++++++ .../stream/sql/ImplicitTypeConversionITCase.java | 552 --------------------- .../planner/expressions/SqlExpressionTest.scala | 158 ------ 4 files changed, 241 insertions(+), 712 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala index a45843d..c0fe098 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala @@ -22,7 +22,7 @@ import org.apache.flink.table.api.ValidationException import org.apache.flink.table.data.binary.BinaryArrayData import org.apache.flink.table.data.util.{DataFormatConverters, MapDataUtil} import org.apache.flink.table.data.writer.{BinaryArrayWriter, BinaryRowWriter} -import org.apache.flink.table.planner.codegen.CodeGenUtils.{binaryRowFieldSetAccess, binaryRowSetNull, binaryWriterWriteField, binaryWriterWriteNull, _} +import org.apache.flink.table.planner.codegen.CodeGenUtils._ import org.apache.flink.table.planner.codegen.GenerateUtils._ import org.apache.flink.table.planner.codegen.GeneratedExpression.{ALWAYS_NULL, NEVER_NULL, NO_CODE} import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGenUtils, CodeGeneratorContext, GeneratedExpression} @@ -426,7 +426,7 @@ object ScalarOperatorGens { // but flink has not yet supported now if ((isNumeric(left.resultType) && isCharacterString(right.resultType)) || (isNumeric(right.resultType) && isCharacterString(left.resultType))) { - throw new CodeGenException( + throw new ValidationException( "implicit type conversion between " + s"${left.resultType.getTypeRoot}" + s" and " + diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/ImplicitConversionEqualsFunctionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/ImplicitConversionEqualsFunctionITCase.java new file mode 100644 index 0000000..bbde80d --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/ImplicitConversionEqualsFunctionITCase.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions; + +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.types.AbstractDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import org.junit.runners.Parameterized; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.table.api.DataTypes.BIGINT; +import static org.apache.flink.table.api.DataTypes.BOOLEAN; +import static org.apache.flink.table.api.DataTypes.DATE; +import static org.apache.flink.table.api.DataTypes.DECIMAL; +import static org.apache.flink.table.api.DataTypes.DOUBLE; +import static org.apache.flink.table.api.DataTypes.FLOAT; +import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.SMALLINT; +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.apache.flink.table.api.DataTypes.TIME; +import static org.apache.flink.table.api.DataTypes.TIMESTAMP; +import static org.apache.flink.table.api.DataTypes.TINYINT; + +/** + * Tests for type conversions in '='. These tests are only for SQL API. We temporarily forbid "=" + * between numeric and (var)char fields and throw an exception because it will produce wrong result. + * SEE [FLINK-24914]. + */ +public class ImplicitConversionEqualsFunctionITCase extends BuiltInFunctionTestBase { + + // numeric data + private static final byte TINY_INT_DATA = (byte) 1; + private static final short SMALL_INT_DATA = (short) 1; + private static final int INT_DATA = 1; + private static final long BIG_INT_DATA = 1L; + private static final float FLOAT_DATA = 1.0f; + private static final double DOUBLE_DATA = 1.0d; + private static final BigDecimal DECIMAL_DATA = new BigDecimal(1); + + // time data + private static final String DATE_DATA = "2001-01-01"; + private static final String TIME_DATA = "00:00:00"; + private static final String TIMESTAMP_DATA = ("2001-01-01 00:00:00"); + + // string data + private static final String STRING_DATA_EQUALS_NUMERIC = "1"; + private static final String STRING_DATA_EQUALS_DATE = "2001-01-01"; + private static final String STRING_DATA_EQUALS_TIME = "00:00:00"; + private static final String STRING_DATA_EQUALS_TIMESTAMP = "2001-01-01 00:00:00"; + + @Parameterized.Parameters(name = "{index}: {0}") + public static List<TestSpec> testData() { + final List<TestSpec> specs = new ArrayList<>(); + specs.addAll(implicitConversionBetweenNumeric()); + specs.addAll(implicitConversionBetweenTimeAndString()); + specs.addAll(unsupportedImplicitConversionBetweenNumericAndString()); + return specs; + } + + private static List<TestSpec> implicitConversionBetweenNumeric() { + return Arrays.asList( + TypeConversionTestBuilder.left(TINYINT(), TINY_INT_DATA) + .right(TINYINT(), TINY_INT_DATA) + .right(SMALLINT(), SMALL_INT_DATA) + .right(INT(), INT_DATA) + .right(BIGINT(), BIG_INT_DATA) + .right(FLOAT(), FLOAT_DATA) + .right(DOUBLE(), DOUBLE_DATA) + .right(DECIMAL(1, 0), DECIMAL_DATA) + .build(), + TypeConversionTestBuilder.left(SMALLINT(), SMALL_INT_DATA) + .right(SMALLINT(), SMALL_INT_DATA) + .right(INT(), INT_DATA) + .right(BIGINT(), BIG_INT_DATA) + .right(FLOAT(), FLOAT_DATA) + .right(DOUBLE(), DOUBLE_DATA) + .right(DECIMAL(1, 0), DECIMAL_DATA) + .build(), + TypeConversionTestBuilder.left(INT(), INT_DATA) + .right(INT(), INT_DATA) + .right(BIGINT(), BIG_INT_DATA) + .right(FLOAT(), FLOAT_DATA) + .right(DOUBLE(), DOUBLE_DATA) + .right(DECIMAL(1, 0), DECIMAL_DATA) + .build(), + TypeConversionTestBuilder.left(BIGINT(), BIG_INT_DATA) + .right(BIGINT(), BIG_INT_DATA) + .right(FLOAT(), FLOAT_DATA) + .right(DOUBLE(), DOUBLE_DATA) + .right(DECIMAL(1, 0), DECIMAL_DATA) + .build(), + TypeConversionTestBuilder.left(FLOAT(), FLOAT_DATA) + .right(FLOAT(), FLOAT_DATA) + .right(DOUBLE(), DOUBLE_DATA) + .right(DECIMAL(1, 0), DECIMAL_DATA) + .build(), + TypeConversionTestBuilder.left(DOUBLE(), DOUBLE_DATA) + .right(DOUBLE(), DOUBLE_DATA) + .right(DECIMAL(1, 0), DECIMAL_DATA) + .build()); + } + + private static List<TestSpec> implicitConversionBetweenTimeAndString() { + return Arrays.asList( + TypeConversionTestBuilder.left(DATE(), DATE_DATA) + .right(DATE(), DATE_DATA) + .right(STRING(), STRING_DATA_EQUALS_DATE) + .build(), + TypeConversionTestBuilder.left(TIME(), TIME_DATA) + .right(STRING(), STRING_DATA_EQUALS_TIME) + .build(), + TypeConversionTestBuilder.left(TIMESTAMP(), TIMESTAMP_DATA) + .right(STRING(), STRING_DATA_EQUALS_TIMESTAMP) + .build()); + } + + // unsupported temporarily + private static List<TestSpec> unsupportedImplicitConversionBetweenNumericAndString() { + return Collections.singletonList( + TypeConversionTestBuilder.left(STRING(), STRING_DATA_EQUALS_NUMERIC) + .right(STRING(), STRING_DATA_EQUALS_NUMERIC) + .fail(TINYINT(), TINY_INT_DATA) + .fail(SMALLINT(), SMALL_INT_DATA) + .fail(INT(), INT_DATA) + .fail(BIGINT(), BIG_INT_DATA) + .fail(FLOAT(), FLOAT_DATA) + .fail(DOUBLE(), DOUBLE_DATA) + .fail(DECIMAL(1, 0), DECIMAL_DATA) + .build()); + } + + static class TypeConversionTestBuilder { + private DataType leftType; + private Object leftValue; + private final List<Object> rightDataOnSuccess = new ArrayList<>(); + private final List<DataType> rightTypesOnSuccess = new ArrayList<>(); + private final List<Object> rightDataOnFailure = new ArrayList<>(); + private final List<DataType> rightTypesOnFailure = new ArrayList<>(); + + private static TypeConversionTestBuilder left(DataType leftType, Object leftValue) { + TypeConversionTestBuilder builder = new TypeConversionTestBuilder(); + builder.leftType = leftType; + builder.leftValue = leftValue; + return builder; + } + + private TypeConversionTestBuilder right(DataType rightType, Object rightValue) { + this.rightTypesOnSuccess.add(rightType); + this.rightDataOnSuccess.add(rightValue); + return this; + } + + private TypeConversionTestBuilder fail(DataType rightType, Object rightValue) { + this.rightTypesOnFailure.add(rightType); + this.rightDataOnFailure.add(rightValue); + return this; + } + + private TestSpec build() { + int columnBaseIdx = 0; + String leftColumnName = "f" + columnBaseIdx; + columnBaseIdx++; + + TestSpec testSpec = + TestSpec.forFunction( + BuiltInFunctionDefinitions.EQUALS, "left: " + leftType.toString()); + + final List<Object> allData = new ArrayList<>(); + allData.add(leftValue); + allData.addAll(rightDataOnSuccess); + allData.addAll(rightDataOnFailure); + + final List<Object> allTypes = new ArrayList<>(); + allTypes.add(leftType); + allTypes.addAll(rightTypesOnSuccess); + allTypes.addAll(rightTypesOnFailure); + + testSpec.onFieldsWithData(allData.toArray()) + .andDataTypes(allTypes.toArray(new AbstractDataType<?>[] {})); + + // test successful cases + for (int i = 0; i < rightTypesOnSuccess.size(); i++) { + String rightColumnName = "f" + (i + columnBaseIdx); + DataType rightType = rightTypesOnSuccess.get(i); + testSpec.testSqlResult( + String.format( + "CAST(%s AS %s) = CAST(%s AS %s)", + leftColumnName, leftType.toString(), rightColumnName, rightType), + true, + BOOLEAN()); + } + + columnBaseIdx = columnBaseIdx + rightTypesOnSuccess.size(); + // test failed cases + for (int i = 0; i < rightTypesOnFailure.size(); i++) { + String rightColumnName = "f" + (i + columnBaseIdx); + DataType rightType = rightTypesOnFailure.get(i); + String exceptionMsg = + getImplicitConversionFromStringExceptionMsg( + rightType.getLogicalType().getTypeRoot()); + testSpec.testSqlError( + String.format( + "CAST(%s AS %s) = CAST(%s AS %s)", + leftColumnName, leftType.toString(), rightColumnName, rightType), + exceptionMsg); + } + return testSpec; + } + + private String getImplicitConversionFromStringExceptionMsg(LogicalTypeRoot rightType) { + return String.format( + "implicit type conversion between VARCHAR and %s is not supported now", + rightType.toString()); + } + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ImplicitTypeConversionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ImplicitTypeConversionITCase.java deleted file mode 100644 index 1ce73da..0000000 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ImplicitTypeConversionITCase.java +++ /dev/null @@ -1,552 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.runtime.stream.sql; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.api.scala.DataStream; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.bridge.scala.DataStreamConversions; -import org.apache.flink.table.data.DecimalDataUtils; -import org.apache.flink.table.data.GenericRowData; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.StringData; -import org.apache.flink.table.data.TimestampData; -import org.apache.flink.table.expressions.UnresolvedReferenceExpression; -import org.apache.flink.table.planner.codegen.CodeGenException; -import org.apache.flink.table.planner.runtime.utils.StreamingTestBase; -import org.apache.flink.table.planner.runtime.utils.TestingAppendRowDataSink; -import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.BooleanType; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.DoubleType; -import org.apache.flink.table.types.logical.FloatType; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.SmallIntType; -import org.apache.flink.table.types.logical.TimeType; -import org.apache.flink.table.types.logical.TimestampType; -import org.apache.flink.table.types.logical.TinyIntType; -import org.apache.flink.table.types.logical.VarCharType; -import org.apache.flink.table.utils.LegacyRowResource; - -import org.junit.Rule; -import org.junit.Test; - -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef; -import static org.junit.Assert.assertEquals; - -/** test implicit type conversion between different types. */ -public class ImplicitTypeConversionITCase extends StreamingTestBase { - - @Rule public LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE; - - private List<String> testSingleTableSqlQueryWithOutputType( - String sqlQuery, InternalTypeInfo<RowData> outputType) { - GenericRowData rowData = new GenericRowData(14); - rowData.setField(0, (byte) 1); - rowData.setField(1, (short) 1); - rowData.setField(2, 1); - rowData.setField(3, (long) 1); - rowData.setField(4, DecimalDataUtils.castFrom(1, 1, 0)); - rowData.setField(5, (float) 1); - rowData.setField(6, (double) 1); - int date = (int) LocalDate.parse("2001-01-01").toEpochDay(); - rowData.setField(7, date); - int time = (int) (LocalTime.parse("00:00:00").toNanoOfDay() / 1000000L); - rowData.setField(8, time); - TimestampData timestamp = - TimestampData.fromLocalDateTime(LocalDateTime.parse("2001-01-01T00:00:00")); - rowData.setField(9, timestamp); - rowData.setField(10, StringData.fromString("1")); - rowData.setField(11, StringData.fromString("2001-01-01")); - rowData.setField(12, StringData.fromString("00:00:00")); - rowData.setField(13, StringData.fromString("2001-01-01 00:00:00")); - List data = Arrays.asList(rowData); - - TypeInformation<RowData> tpe = - InternalTypeInfo.ofFields( - new TinyIntType(), - new SmallIntType(), - new IntType(), - new BigIntType(), - new DecimalType(1, 0), - new FloatType(), - new DoubleType(), - new DateType(), - new TimeType(), - new TimestampType(), - new VarCharType(), - new VarCharType(), - new VarCharType(), - new VarCharType()); - - DataStream ds = env().fromCollection(JavaScalaConversionUtil.toScala(data), tpe); - DataStreamConversions conversions = new DataStreamConversions(ds, tpe); - - List<UnresolvedReferenceExpression> fields = - Arrays.asList( - unresolvedRef("field_tinyint"), - unresolvedRef("field_smallint"), - unresolvedRef("field_int"), - unresolvedRef("field_bigint"), - unresolvedRef("field_decimal"), - unresolvedRef("field_float"), - unresolvedRef("field_double"), - unresolvedRef("field_date"), - unresolvedRef("field_time"), - unresolvedRef("field_timestamp"), - unresolvedRef("field_varchar_equals_numeric"), - unresolvedRef("field_varchar_equals_date"), - unresolvedRef("field_varchar_equals_time"), - unresolvedRef("field_varchar_equals_timestamp")); - - Table table = conversions.toTable(tEnv(), JavaScalaConversionUtil.toScala(fields)); - tEnv().registerTable("TestTable", table); - - Table resultTable = tEnv().sqlQuery(sqlQuery); - DataStream result = tEnv().toAppendStream(resultTable, outputType); - TestingAppendRowDataSink sink = new TestingAppendRowDataSink(outputType); - result.addSink(sink); - env().execute(); - - return new ArrayList<>(JavaScalaConversionUtil.toJava(sink.getAppendResults())); - } - - @Test - public void testNumericConversionInFilter() { - String sqlQuery = - "SELECT field_tinyint, field_smallint, field_int, field_bigint, " - + "field_decimal, field_float, field_double " - + "FROM TestTable WHERE " - + "field_tinyint = field_smallint AND " - + "field_tinyint = field_int AND " - + "field_tinyint = field_bigint AND " - + "field_tinyint = field_decimal AND " - + "field_tinyint = field_float AND " - + "field_tinyint = field_double AND " - + "field_smallint = field_int AND " - + "field_smallint = field_bigint AND " - + "field_smallint = field_decimal AND " - + "field_smallint = field_float AND " - + "field_smallint = field_double AND " - + "field_int = field_bigint AND " - + "field_int = field_decimal AND " - + "field_int = field_float AND " - + "field_int = field_double AND " - + "field_bigint = field_decimal AND " - + "field_bigint = field_float AND " - + "field_bigint = field_double AND " - + "field_decimal = field_float AND " - + "field_decimal = field_double AND " - + "field_float = field_double"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields( - new TinyIntType(), - new SmallIntType(), - new IntType(), - new BigIntType(), - new DecimalType(), - new FloatType(), - new DoubleType()); - - List<String> expected = Arrays.asList("+I(1,1,1,1,1,1.0,1.0)"); - - List<String> actualResult = testSingleTableSqlQueryWithOutputType(sqlQuery, outputType); - Collections.sort(expected); - Collections.sort(actualResult); - assertEquals(expected, actualResult); - } - - @Test - public void testDateAndVarCharConversionInFilter() { - String sqlQuery = - "SELECT field_date, field_varchar_equals_date FROM TestTable " - + "WHERE field_date = field_varchar_equals_date"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new DateType(), new VarCharType()); - - List<String> expected = Arrays.asList("+I(11323,2001-01-01)"); - - List<String> actualResult = testSingleTableSqlQueryWithOutputType(sqlQuery, outputType); - Collections.sort(expected); - Collections.sort(actualResult); - assertEquals(expected, actualResult); - } - - @Test - public void testTimeAndVarCharConversionInFilter() { - String sqlQuery = - "SELECT field_time, field_varchar_equals_time FROM TestTable " - + "WHERE field_time = field_varchar_equals_time"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new TimeType(), new VarCharType()); - - List<String> expected = Arrays.asList("+I(0,00:00:00)"); - - List<String> actualResult = testSingleTableSqlQueryWithOutputType(sqlQuery, outputType); - Collections.sort(expected); - Collections.sort(actualResult); - assertEquals(expected, actualResult); - } - - @Test - public void testTimestampAndVarCharConversionInFilter() { - String sqlQuery = - "SELECT field_timestamp, field_varchar_equals_timestamp FROM TestTable " - + "WHERE field_timestamp = field_varchar_equals_timestamp"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new TimestampType(), new VarCharType()); - - List<String> expected = Arrays.asList("+I(2001-01-01T00:00,2001-01-01 00:00:00)"); - - List<String> actualResult = testSingleTableSqlQueryWithOutputType(sqlQuery, outputType); - Collections.sort(expected); - Collections.sort(actualResult); - assertEquals(expected, actualResult); - } - - private String getFilterAndProjectionExceptionMessage(List<String> types) { - return String.format( - "implicit type conversion between " + "%s and %s" + " is not supported now", - types.get(0), types.get(1)); - } - - private void testSingleTableInvalidImplicitConversionTypes( - String sqlQuery, InternalTypeInfo<RowData> outputType, List<String> types) { - expectedException().expect(CodeGenException.class); - expectedException().expectMessage(getFilterAndProjectionExceptionMessage(types)); - testSingleTableSqlQueryWithOutputType(sqlQuery, outputType); - } - - @Test - public void testInvalidTinyIntAndVarCharConversionInFilter() { - String sqlQuery = - "SELECT field_tinyint, field_varchar_equals_numeric FROM TestTable " - + "WHERE field_tinyint = field_varchar_equals_numeric"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new TinyIntType(), new VarCharType()); - - testSingleTableInvalidImplicitConversionTypes( - sqlQuery, outputType, Arrays.asList("TINYINT", "VARCHAR")); - } - - @Test - public void testInvalidSmallIntAndVarCharConversionInFilter() { - String sqlQuery = - "SELECT field_smallint, field_varchar_equals_numeric FROM TestTable " - + "WHERE field_smallint = field_varchar_equals_numeric"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new SmallIntType(), new VarCharType()); - - testSingleTableInvalidImplicitConversionTypes( - sqlQuery, outputType, Arrays.asList("SMALLINT", "VARCHAR")); - } - - @Test - public void testInvalidIntAndVarCharConversionInFilter() { - String sqlQuery = - "SELECT field_int, field_varchar_equals_numeric FROM TestTable " - + "WHERE field_int = field_varchar_equals_numeric"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new IntType(), new VarCharType()); - - testSingleTableInvalidImplicitConversionTypes( - sqlQuery, outputType, Arrays.asList("INTEGER", "VARCHAR")); - } - - @Test - public void testInvalidBigIntAndVarCharConversionInFilter() { - String sqlQuery = - "SELECT field_bigint, field_varchar_equals_numeric FROM TestTable " - + "WHERE field_bigint = field_varchar_equals_numeric"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new BigIntType(), new VarCharType()); - - testSingleTableInvalidImplicitConversionTypes( - sqlQuery, outputType, Arrays.asList("BIGINT", "VARCHAR")); - } - - @Test - public void testInvalidDecimalAndVarCharConversionInFilter() { - String sqlQuery = - "SELECT field_decimal, field_varchar_equals_numeric FROM TestTable " - + "WHERE field_decimal = field_varchar_equals_numeric"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new DecimalType(), new VarCharType()); - - testSingleTableInvalidImplicitConversionTypes( - sqlQuery, outputType, Arrays.asList("DECIMAL", "VARCHAR")); - } - - @Test - public void testInvalidFloatAndVarCharConversionInFilter() { - String sqlQuery = - "SELECT field_float, field_varchar_equals_numeric FROM TestTable " - + "WHERE field_float = field_varchar_equals_numeric"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new FloatType(), new VarCharType()); - - testSingleTableInvalidImplicitConversionTypes( - sqlQuery, outputType, Arrays.asList("FLOAT", "VARCHAR")); - } - - @Test - public void testInvalidDoubleAndVarCharConversionInFilter() { - String sqlQuery = - "SELECT field_double, field_varchar_equals_numeric FROM TestTable " - + "WHERE field_double = field_varchar_equals_numeric"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new DoubleType(), new VarCharType()); - - testSingleTableInvalidImplicitConversionTypes( - sqlQuery, outputType, Arrays.asList("DOUBLE", "VARCHAR")); - } - - @Test - public void testFloatAndDoubleConversionInProjection() { - String sqlQuery = - "SELECT field_float, field_double, field_float = field_double FROM TestTable"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new FloatType(), new DoubleType(), new BooleanType()); - - List<String> expected = Arrays.asList("+I(1.0,1.0,true)"); - - List<String> actualResult = testSingleTableSqlQueryWithOutputType(sqlQuery, outputType); - Collections.sort(expected); - Collections.sort(actualResult); - assertEquals(expected, actualResult); - } - - @Test - public void testDateAndVarCharConversionInProjection() { - String sqlQuery = - "SELECT field_date, field_varchar_equals_date, " - + "field_date = field_varchar_equals_date FROM TestTable"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new DateType(), new VarCharType(), new BooleanType()); - - List<String> expected = Arrays.asList("+I(11323,2001-01-01,true)"); - - List<String> actualResult = testSingleTableSqlQueryWithOutputType(sqlQuery, outputType); - Collections.sort(expected); - Collections.sort(actualResult); - assertEquals(expected, actualResult); - } - - @Test - public void testInvalidDecimalAndVarCharConversionInProjection() { - String sqlQuery = - "SELECT field_decimal, field_varchar_equals_numeric, " - + "field_decimal = field_varchar_equals_numeric FROM TestTable"; - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new DecimalType(), new VarCharType(), new BooleanType()); - - testSingleTableInvalidImplicitConversionTypes( - sqlQuery, outputType, Arrays.asList("DECIMAL", "VARCHAR")); - } - - private void registerTableA() { - GenericRowData rowDataA = new GenericRowData(6); - rowDataA.setField(0, 1); - rowDataA.setField(1, 1); - rowDataA.setField(2, 1); - int date = (int) LocalDate.parse("2001-01-01").toEpochDay(); - rowDataA.setField(3, date); - int time = (int) (LocalTime.parse("00:00:00").toNanoOfDay() / 1000000L); - rowDataA.setField(4, time); - TimestampData timestamp = - TimestampData.fromLocalDateTime(LocalDateTime.parse("2001-01-01T00:00:00")); - rowDataA.setField(5, timestamp); - List<RowData> dataA = Arrays.asList(rowDataA); - - TypeInformation<RowData> tpeA = - InternalTypeInfo.ofFields( - new IntType(), - new IntType(), - new IntType(), - new DateType(), - new TimeType(), - new TimestampType()); - - DataStream dsA = env().fromCollection(JavaScalaConversionUtil.toScala(dataA), tpeA); - DataStreamConversions conversions = new DataStreamConversions(dsA, tpeA); - - List<UnresolvedReferenceExpression> fields = - Arrays.asList( - unresolvedRef("a1"), - unresolvedRef("a2"), - unresolvedRef("a3"), - unresolvedRef("a4"), - unresolvedRef("a5"), - unresolvedRef("a6")); - - Table tableA = conversions.toTable(tEnv(), JavaScalaConversionUtil.toScala(fields)); - - tEnv().registerTable("A", tableA); - } - - private void registerTableB() { - GenericRowData rowDataB = new GenericRowData(6); - rowDataB.setField(0, 1); - rowDataB.setField(1, (long) 1); - rowDataB.setField(2, StringData.fromString("1")); - rowDataB.setField(3, StringData.fromString("2001-01-01")); - rowDataB.setField(4, StringData.fromString("00:00:00")); - rowDataB.setField(5, StringData.fromString("2001-01-01 00:00:00")); - List<RowData> dataB = Arrays.asList(rowDataB); - - TypeInformation<RowData> tpeB = - InternalTypeInfo.ofFields( - new IntType(), - new BigIntType(), - new VarCharType(), - new VarCharType(), - new VarCharType(), - new VarCharType()); - - DataStream dsB = env().fromCollection(JavaScalaConversionUtil.toScala(dataB), tpeB); - DataStreamConversions conversions = new DataStreamConversions(dsB, tpeB); - - List<UnresolvedReferenceExpression> fields = - Arrays.asList( - unresolvedRef("b1"), - unresolvedRef("b2"), - unresolvedRef("b3"), - unresolvedRef("b4"), - unresolvedRef("b5"), - unresolvedRef("b6")); - - Table tableB = conversions.toTable(tEnv(), JavaScalaConversionUtil.toScala(fields)); - - tEnv().registerTable("B", tableB); - } - - private void testTwoTableJoinSqlQuery(String sqlQuery, InternalTypeInfo<RowData> outputType) { - registerTableA(); - registerTableB(); - - Table resultTable = tEnv().sqlQuery(sqlQuery); - DataStream result = tEnv().toAppendStream(resultTable, outputType); - TestingAppendRowDataSink sink = new TestingAppendRowDataSink(outputType); - result.addSink(sink); - env().execute(); - - List<String> expected = Arrays.asList("+I(1,1)"); - - List<String> actualResult = - new ArrayList<>(JavaScalaConversionUtil.toJava(sink.getAppendResults())); - - Collections.sort(expected); - Collections.sort(actualResult); - assertEquals(expected, actualResult); - } - - private void testTwoTableInvalidImplicitConversionTypes( - String sqlQuery, InternalTypeInfo<RowData> outputType, List<String> types) { - expectedException().expect(TableException.class); - expectedException().expectMessage(getJoinOnExceptionMessage(types)); - testTwoTableJoinSqlQuery(sqlQuery, outputType); - } - - private String getJoinOnExceptionMessage(List<String> types) { - return String.format( - "implicit type conversion between " - + "%s and %s" - + " is not supported on join's condition now", - types.get(0), types.get(1)); - } - - @Test - public void testIntAndBigIntConversionInJoinOn() { - String sqlQuery = "SELECT a1, b1 from A join B on a2 = b2"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new IntType(), new IntType()); - - testTwoTableJoinSqlQuery(sqlQuery, outputType); - } - - @Test - public void testInvalidIntAndVarCharConversionInJoinOn() { - String sqlQuery = "SELECT a1, b1 from A join B on a3 = b3"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new IntType(), new IntType()); - - testTwoTableInvalidImplicitConversionTypes( - sqlQuery, outputType, Arrays.asList("INTEGER", "VARCHAR(1)")); - } - - @Test - public void testInvalidDateAndVarCharConversionInJoinOn() { - String sqlQuery = "SELECT a1, b1 from A join B on a4 = b4"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new IntType(), new IntType()); - - testTwoTableInvalidImplicitConversionTypes( - sqlQuery, outputType, Arrays.asList("DATE", "VARCHAR(1)")); - } - - @Test - public void testInvalidTimeAndVarCharConversionInJoinOn() { - String sqlQuery = "SELECT a1, b1 from A join B on a5 = b5"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new IntType(), new IntType()); - - testTwoTableInvalidImplicitConversionTypes( - sqlQuery, outputType, Arrays.asList("TIME(0)", "VARCHAR(1)")); - } - - @Test - public void testInvalidTimestampAndVarCharConversionInJoinOn() { - String sqlQuery = "SELECT a1, b1 from A join B on a6 = b6"; - - InternalTypeInfo<RowData> outputType = - InternalTypeInfo.ofFields(new IntType(), new IntType()); - - testTwoTableInvalidImplicitConversionTypes( - sqlQuery, outputType, Arrays.asList("TIMESTAMP(6)", "VARCHAR(1)")); - } -} diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala index 2e6eb67..259cfcd 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.expressions import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.table.planner.codegen.CodeGenException import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase import org.apache.flink.types.Row @@ -77,163 +76,6 @@ class SqlExpressionTest extends ExpressionTestBase { } @Test - def testValidImplicitTypeConversion: Unit = { - // implicit type conversion between tinyint and others - testSqlApi("cast(1 as tinyint) = cast(1 as smallint)","true") - testSqlApi("cast(1 as tinyint) = cast(1 as int)","true") - testSqlApi("cast(1 as tinyint) = cast(1 as bigint)","true") - testSqlApi("cast(1 as tinyint) = cast(1 as decimal)","true") - testSqlApi("cast(1 as tinyint) = cast(1 as float)","true") - testSqlApi("cast(1 as tinyint) = cast(1 as double)","true") - testSqlApi("cast(1 as tinyint) = cast(2 as int)","false") - testSqlApi("cast(1 as tinyint) = cast(2 as bigint)","false") - testSqlApi("cast(1 as tinyint) = cast(2 as decimal)","false") - testSqlApi("cast(1 as tinyint) = cast(2.1 as float)","false") - testSqlApi("cast(1 as tinyint) = cast(2.1 as double)","false") - - // implicit type conversion between smallint and others - testSqlApi("cast(1 as smallint) = cast(1 as int)","true") - testSqlApi("cast(1 as smallint) = cast(1 as bigint)","true") - testSqlApi("cast(1 as smallint) = cast(1 as decimal)","true") - testSqlApi("cast(1 as smallint) = cast(1 as float)","true") - testSqlApi("cast(1 as smallint) = cast(1 as double)","true") - testSqlApi("cast(1 as smallint) = cast(2 as int)","false") - testSqlApi("cast(1 as smallint) = cast(2 as bigint)","false") - testSqlApi("cast(1 as smallint) = cast(2 as decimal)","false") - testSqlApi("cast(1 as smallint) = cast(2.1 as float)","false") - testSqlApi("cast(1 as smallint) = cast(2.1 as double)","false") - - // implicit type conversion between int and others - testSqlApi("cast(1 as int) = cast(1 as bigint)","true") - testSqlApi("cast(1 as int) = cast(1 as decimal)","true") - testSqlApi("cast(1 as int) = cast(1 as float)","true") - testSqlApi("cast(1 as int) = cast(1 as double)","true") - testSqlApi("cast(1 as int) = cast(2 as bigint)","false") - testSqlApi("cast(1 as int) = cast(2 as decimal)","false") - testSqlApi("cast(1 as int) = cast(2.1 as float)","false") - testSqlApi("cast(1 as int) = cast(2.1 as double)","false") - - // implicit type conversion between bigint and others - testSqlApi("cast(1 as bigint) = cast(1 as decimal)","true") - testSqlApi("cast(1 as bigint) = cast(1 as float)","true") - testSqlApi("cast(1 as bigint) = cast(1 as double)","true") - testSqlApi("cast(1 as bigint) = cast(2 as decimal)","false") - testSqlApi("cast(1 as bigint) = cast(2.1 as float)","false") - testSqlApi("cast(1 as bigint) = cast(2.1 as double)","false") - - // implicit type conversion between decimal and others - testSqlApi("cast(1 as decimal) = cast(1 as float)","true") - testSqlApi("cast(1 as decimal) = cast(1 as double)","true") - testSqlApi("cast(1 as decimal) = cast(2.1 as float)","false") - testSqlApi("cast(1 as decimal) = cast(2.1 as double)","false") - - // implicit type conversion between float and others - testSqlApi("cast(1 as float) = cast(1 as double)","true") - testSqlApi("cast(1 as float) = cast(2.1 as double)","false") - - // implicit type conversion between date and varchar - testSqlApi("cast('2001-01-01' as date) = cast('2001-01-01' as varchar)","true") - testSqlApi("cast('2001-01-01' as date) = cast('2001-01-02' as varchar)","false") - - // implicit type conversion between date and char - testSqlApi("cast('2001-01-01' as date) = cast('2001-01-01' as char)","true") - testSqlApi("cast('2001-01-01' as date) = cast('2001-01-02' as char)","false") - - // implicit type conversion between time and char - testSqlApi("cast('00:00:00.000000000' as time) = cast('00:00:00.000000000' as varchar)","true") - testSqlApi("cast('00:00:00.000000000' as time) = cast('00:00:01.000000000' as varchar)","false") - - // implicit type conversion between time and char - testSqlApi("cast('2001-01-01 12:12:12.123' as timestamp)" + - " = cast('2001-01-01 12:12:12.1230' as varchar)","true") - testSqlApi("cast('1990-10-14 12:12:12.123' as timestamp)" + - " = cast('1990-10-15 12:12:12.123' as varchar)","false") - } - - @Test - def testInValidImplicitTypeConversion: Unit = { - val typeChar = "CHAR" - val typeVarChar = "VARCHAR" - - val typeTinyInt = "TINYINT" - val typeSmallInt = "SMALLINT" - val typeInt = "INTEGER" - val typeBigInt = "BIGINT" - val typeDecimal = "DECIMAL" - val typeFloat = "FLOAT" - val typeDouble = "DOUBLE" - - // implicit type conversion between char and others - testInvalidImplicitConversionTypes( - "'1' = cast(1 as tinyint)", - List(typeChar,typeTinyInt)) - - testInvalidImplicitConversionTypes( - "'1' = cast(1 as smallint)", - List(typeChar,typeSmallInt)) - - testInvalidImplicitConversionTypes( - "'1' = cast(1 as int)", - List(typeChar,typeInt)) - - testInvalidImplicitConversionTypes( - "'1' = cast(1 as bigint)", - List(typeChar,typeBigInt)) - - testInvalidImplicitConversionTypes( - "'1.1' = cast(1.1 as decimal(2, 1))", - List(typeChar,typeDecimal)) - - testInvalidImplicitConversionTypes( - "'1.1' = cast(1.1 as float)", - List(typeChar,typeFloat)) - - testInvalidImplicitConversionTypes( - "'1.1' = cast(1.1 as double)", - List(typeChar,typeDouble)) - - // implicit type conversion between varchar and others - testInvalidImplicitConversionTypes( - "cast('1' as varchar) = cast(1 as tinyint)", - List(typeVarChar,typeTinyInt)) - - testInvalidImplicitConversionTypes( - "cast('1' as varchar) = cast(1 as smallint)", - List(typeVarChar,typeSmallInt)) - - testInvalidImplicitConversionTypes( - - "cast('1' as varchar) = cast(1 as int)", - List(typeVarChar,typeInt)) - - testInvalidImplicitConversionTypes( - "cast('1' as varchar) = cast(1 as bigint)", - List(typeVarChar,typeBigInt)) - - testInvalidImplicitConversionTypes( - "cast('1.1' as varchar) = cast(1.1 as decimal(2, 1))", - List(typeVarChar,typeDecimal)) - - testInvalidImplicitConversionTypes( - "cast('1.1' as varchar) = cast(1.1 as float)", - List(typeVarChar,typeFloat)) - - testInvalidImplicitConversionTypes( - "cast('1.1' as varchar) = cast(1.1 as double)", - List(typeVarChar,typeDouble)) - } - - private def testInvalidImplicitConversionTypes(sql: String, types: List[String]): Unit ={ - val expectedExceptionMessage = - "implicit type conversion between " + - s"${types(0)}" + - s" and " + - s"${types(1)}" + - s" is not supported now" - testExpectedSqlException(sql, expectedExceptionMessage, classOf[CodeGenException]) - } - - @Test def testLogicalFunctions(): Unit = { testSqlApi("TRUE OR FALSE", "true") testSqlApi("TRUE AND FALSE", "false")