matriv commented on a change in pull request #18611: URL: https://github.com/apache/flink/pull/18611#discussion_r798523671
########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java ########## @@ -476,6 +477,15 @@ public OutType cast(DataType toType) { return toApiSpecificExpression(unresolvedCall(CAST, toExpr(), typeLiteral(toType))); } + /** + * Converts a value to a given data type. + * Review comment: Please document how this differs from the normal `cast`: instead of failing, it returns `null`. ########## File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkAssertions.java ########## @@ -117,7 +117,7 @@ private FlinkAssertions() {} * .hasMessageContaining(containsMessage)); * }</pre> */ - public static ThrowingConsumer<? extends Throwable> anyCauseMatches(String containsMessage) { + public static ThrowingConsumer<? super Throwable> anyCauseMatches(String containsMessage) { Review comment: Why `super`? ########## File path: docs/data/sql_functions.yml ########## @@ -561,7 +561,10 @@ conditional: conversion: - sql: CAST(value AS type) table: ANY.cast(TYPE) - description: Returns a new value being cast to type type. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS VARCHAR) returns NULL of type VARCHAR. + description: Returns a new value being cast to type type. A CAST error throws an exception and fails the job. If you're performing a cast operation that may fail, like INT to STRING, you should rather use TRY_CAST, in order to handle errors. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS VARCHAR) returns NULL of type VARCHAR; TRY_CAST('non-number' AS INT) throws an exception and fails the job. Review comment: I think we need to refer to the LEGACY BEHAVIOUR config param in `execution_config_configuration.html` to make this 100% clear to the user.
########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java ########## @@ -144,58 +143,69 @@ private static void testResult( final TableResult result = resultTable.execute(); final Iterator<Row> iterator = result.collect(); - assertTrue(iterator.hasNext()); + assertThat(iterator).hasNext(); final Row row = iterator.next(); - assertFalse("No more rows expected.", iterator.hasNext()); + assertThat(iterator).as("No more rows expected.").isExhausted(); for (int i = 0; i < row.getArity(); i++) { - assertEquals( - "Logical type for spec [" + i + "] of test [" + testItem + "] doesn't match.", - expectedDataTypes.get(i).getLogicalType(), - result.getResolvedSchema().getColumnDataTypes().get(i).getLogicalType()); - - assertEquals( - "Result for spec [" + i + "] of test [" + testItem + "] doesn't match.", - // Use Row.equals() to enable equality for complex structure, i.e. byte[] - Row.of(testItem.results.get(i)), - Row.of(row.getField(i))); + assertThat(result.getResolvedSchema().getColumnDataTypes().get(i).getLogicalType()) + .as( + "Logical type for spec [" Review comment: nit: I would try to compact this to fewer line breaks.
########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java ########## @@ -144,58 +143,69 @@ private static void testResult( final TableResult result = resultTable.execute(); final Iterator<Row> iterator = result.collect(); - assertTrue(iterator.hasNext()); + assertThat(iterator).hasNext(); final Row row = iterator.next(); - assertFalse("No more rows expected.", iterator.hasNext()); + assertThat(iterator).as("No more rows expected.").isExhausted(); for (int i = 0; i < row.getArity(); i++) { - assertEquals( - "Logical type for spec [" + i + "] of test [" + testItem + "] doesn't match.", - expectedDataTypes.get(i).getLogicalType(), - result.getResolvedSchema().getColumnDataTypes().get(i).getLogicalType()); - - assertEquals( - "Result for spec [" + i + "] of test [" + testItem + "] doesn't match.", - // Use Row.equals() to enable equality for complex structure, i.e. byte[] - Row.of(testItem.results.get(i)), - Row.of(row.getField(i))); + assertThat(result.getResolvedSchema().getColumnDataTypes().get(i).getLogicalType()) + .as( + "Logical type for spec [" + + i + + "] of test [" + + testItem + + "] doesn't match.") + .isEqualTo(expectedDataTypes.get(i).getLogicalType()); + + assertThat(Row.of(row.getField(i))) + .as("Result for spec [" + i + "] of test [" + testItem + "] doesn't match.") + .isEqualTo( + // Use Row.equals() to enable equality for complex structure, i.e. 
// byte[] + Row.of(testItem.results.get(i))); } } private static void testError( TableEnvironment env, Table inputTable, ErrorTestItem<?> testItem) { - try { - final TableResult tableResult; - if (testItem instanceof TableApiErrorTestItem) { - tableResult = - inputTable - .select(((TableApiErrorTestItem) testItem).expression()) - .execute(); - } else { - tableResult = - env.sqlQuery("SELECT " + testItem.expression() + " FROM " + inputTable) - .execute(); - } - if (testItem.expectedDuringValidation) { - fail("Error expected: " + testItem.errorMessage); - } - - try { - tableResult.await(); - fail("Error expected: " + testItem.errorMessage); - } catch (AssertionError e) { - throw e; - } catch (Throwable t) { - assertThat("Wrong error message", t, containsMessage(testItem.errorMessage)); - } - } catch (AssertionError e) { - throw e; - } catch (Throwable t) { - assertTrue(t instanceof ValidationException); - assertThat(t.getMessage(), containsString(testItem.errorMessage)); + AtomicReference<TableResult> tableResult = new AtomicReference<>(); + + Throwable t = + catchThrowable( + () -> { + if (testItem instanceof TableApiErrorTestItem) { + tableResult.set( Review comment: Maybe extract this branch and the else block into separate methods to improve readability. ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRuleProviderTest.java ########## @@ -77,4 +82,18 @@ void testResolveConstructedToString() { assertThat(CastRuleProvider.resolve(new ArrayType(INT), new VarCharType(10))) .isSameAs(ArrayToStringCastRule.INSTANCE); } + + @Test + void testCanFail() { + LogicalType inputType = ROW(INT(), STRING()).getLogicalType(); + LogicalType fallibleTargetType = ROW(TINYINT(), TIME()).getLogicalType(); + LogicalType infallibleTargetType = ROW(TINYINT(), STRING()).getLogicalType(); + + assertThat(CastRuleProvider.canFail(INT, TINYINT)).isFalse(); Review comment: I would invert this assertion, since this cast will soon be able to fail (overflow).
########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java ########## @@ -1245,13 +1253,23 @@ private CastTestSpecBuilder fail(DataType dataType, Object src) { return fail(TestType.ERROR_SQL, dataType, src); } + private CastTestSpecBuilder failRuntime( + DataType dataType, Object src, Class<? extends Throwable> failureClass) { + this.testTypes.add(TestType.ERROR_RUNTIME); + this.columnTypes.add(dataType); + this.columnData.add(src); + this.expectedValues.add(failureClass); + return this; + } + private CastTestSpecBuilder fail(TestType type, DataType dataType, Object src) { Review comment: Maybe rename this to `failValidation`? ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRuleProviderTest.java ########## @@ -77,4 +82,18 @@ void testResolveConstructedToString() { assertThat(CastRuleProvider.resolve(new ArrayType(INT), new VarCharType(10))) .isSameAs(ArrayToStringCastRule.INSTANCE); } + + @Test + void testCanFail() { + LogicalType inputType = ROW(INT(), STRING()).getLogicalType(); Review comment: here and below: `INT()` and `TINYINT()` are defined as `INT` and `TINYINT`. 
########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java ########## @@ -206,7 +209,48 @@ protected Configuration configuration() { .testSqlResult( "CAST(CAST(x'68656C6C6F2063617374' AS BINARY(10)) AS VARCHAR)", "68656c6c6f2063617374", - STRING().notNull())); + STRING().notNull()), + TestSpec.forFunction( + BuiltInFunctionDefinitions.TRY_CAST, "try cast from STRING to TIME") + .onFieldsWithData("Flink", "12:34:56") + .andDataTypes(STRING(), STRING()) + .testResult( + $("f0").tryCast(TIME()), + "TRY_CAST(f0 AS TIME)", + null, + TIME().nullable()) + .testResult( + $("f1").tryCast(TIME()), + "TRY_CAST(f1 AS TIME)", + LocalTime.of(12, 34, 56, 0), + TIME().nullable()), + TestSpec.forFunction( + BuiltInFunctionDefinitions.TRY_CAST, + "try cast from TIME NOT NULL to STRING NOT NULL") + .onFieldsWithData(LocalTime.parse("12:34:56")) + .andDataTypes(TIME().notNull()) + .testResult( + $("f0").tryCast(STRING()), + "TRY_CAST(f0 AS STRING)", + "12:34:56", + // Because TIME to STRING cannot fail, the type inference should Review comment: ```suggestion // Because TIME to STRING cannot fail, the type inference should be ``` ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.calcite; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.planner.functions.casting.CastRuleProvider; +import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; +import org.apache.flink.table.types.logical.LogicalType; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlDataTypeSpec; +import org.apache.calcite.sql.SqlIntervalQualifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlUtil; +import org.apache.calcite.sql.validate.SqlValidatorImpl; +import org.apache.calcite.sql2rel.SqlRexContext; +import org.apache.calcite.sql2rel.SqlRexConvertlet; +import org.apache.calcite.sql2rel.SqlRexConvertletTable; +import org.apache.calcite.sql2rel.StandardConvertletTable; + +import java.util.Collections; + +/** + * Custom Flink {@link SqlRexConvertletTable} to add custom {@link SqlNode} to {@link RexNode} + * conversions. 
+ */ +@Internal +public class FlinkConvertletTable implements SqlRexConvertletTable { + + public static final FlinkConvertletTable INSTANCE = new FlinkConvertletTable(); + + private FlinkConvertletTable() {} + + @Override + public SqlRexConvertlet get(SqlCall call) { + if (call.getOperator().isName("TRY_CAST", false)) { + return this::convertTryCast; + } + return StandardConvertletTable.INSTANCE.get(call); + } + + // Slightly modified version of StandardConvertletTable::convertCast + private RexNode convertTryCast(SqlRexContext cx, final SqlCall call) { + RelDataTypeFactory typeFactory = cx.getTypeFactory(); + final SqlNode leftNode = call.operand(0); + final SqlNode rightNode = call.operand(1); + + final RexNode valueRex = cx.convertExpression(leftNode); + + RelDataType type; + if (rightNode instanceof SqlIntervalQualifier) { + type = typeFactory.createSqlIntervalType((SqlIntervalQualifier) rightNode); + } else if (rightNode instanceof SqlDataTypeSpec) { + SqlDataTypeSpec dataType = ((SqlDataTypeSpec) rightNode); + type = dataType.deriveType(cx.getValidator()); + if (type == null) { + type = cx.getValidator().getValidatedNodeType(dataType.getTypeName()); + } + } else { + throw new IllegalStateException( + "Invalid right argument type for TRY_CAST: " + rightNode); + } + + final LogicalType fromLogicalType = FlinkTypeFactory.toLogicalType(valueRex.getType()); + final LogicalType toLogicalType = FlinkTypeFactory.toLogicalType(type); + + // This is nullable only and only if the cast rule can fail Review comment: just `only` would be enough I think. 
########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java ########## @@ -33,7 +39,11 @@ public CastRulePredicate getPredicateDefinition() { } @Override - public boolean canFail() { - return false; + public boolean canFail(LogicalType inputLogicalType, LogicalType targetLogicalType) { + final List<LogicalType> inputFields = LogicalTypeChecks.getFieldTypes(inputLogicalType); + final List<LogicalType> targetFields = LogicalTypeChecks.getFieldTypes(targetLogicalType); + + return IntStream.range(0, Math.min(inputFields.size(), targetFields.size())) Review comment: To my understanding this is not correct: don't we need to check every combination of each of the inputFields with all of the targetFields (nested loop)? ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRuleProviderTest.java ########## @@ -77,4 +82,18 @@ void testResolveConstructedToString() { assertThat(CastRuleProvider.resolve(new ArrayType(INT), new VarCharType(10))) .isSameAs(ArrayToStringCastRule.INSTANCE); } + + @Test + void testCanFail() { + LogicalType inputType = ROW(INT(), STRING()).getLogicalType(); + LogicalType fallibleTargetType = ROW(TINYINT(), TIME()).getLogicalType(); Review comment: nit and personal preference: since they are used only once, maybe don't assign them to variables, and maybe just declare `inputType` right above the relevant assertions. ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java ########## @@ -1245,13 +1253,23 @@ private CastTestSpecBuilder fail(DataType dataType, Object src) { return fail(TestType.ERROR_SQL, dataType, src); } + private CastTestSpecBuilder failRuntime( + DataType dataType, Object src, Class<?
extends Throwable> failureClass) { + this.testTypes.add(TestType.ERROR_RUNTIME); + this.columnTypes.add(dataType); + this.columnData.add(src); + this.expectedValues.add(failureClass); + return this; + } + private CastTestSpecBuilder fail(TestType type, DataType dataType, Object src) { this.testTypes.add(type); this.columnTypes.add(dataType); this.columnData.add(src); return this; } + @SuppressWarnings("unchecked") Review comment: nit: maybe move it on top of the statements with the cast to `Class<? extends Throwable>` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org