matriv commented on a change in pull request #18611:
URL: https://github.com/apache/flink/pull/18611#discussion_r798523671



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
##########
@@ -476,6 +477,15 @@ public OutType cast(DataType toType) {
         return toApiSpecificExpression(unresolvedCall(CAST, toExpr(), typeLiteral(toType)));
     }
 
+    /**
+     * Converts a value to a given data type.
+     *

Review comment:
       Please also document the non-failing behaviour (returning `null`) vs the normal `cast`.
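       For illustration, a minimal sketch of the difference worth spelling out (tryCast() is the API added in this PR; the values and setup are made up):

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

class CastVsTryCastSketch {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // cast(): an invalid input raises an error at runtime and fails the job.
        env.fromValues("not a number").select($("f0").cast(DataTypes.INT()));

        // tryCast(): the same invalid input evaluates to NULL instead of failing.
        env.fromValues("not a number").select($("f0").tryCast(DataTypes.INT()));
    }
}
```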

##########
File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkAssertions.java
##########
@@ -117,7 +117,7 @@ private FlinkAssertions() {}
      *                  .hasMessageContaining(containsMessage));
      * }</pre>
      */
-    public static ThrowingConsumer<? extends Throwable> anyCauseMatches(String containsMessage) {
+    public static ThrowingConsumer<? super Throwable> anyCauseMatches(String containsMessage) {

Review comment:
       why `super`?
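       For context, a quick sketch of the variance question, using plain `java.util.function.Consumer` as a stand-in for `ThrowingConsumer` (PECS: a pure consumer wants `super`):

```java
import java.util.function.Consumer;

class PecsSketch {
    public static void main(String[] args) {
        Consumer<Throwable> logAny = t -> System.err.println(t.getMessage());

        // `? super Throwable` still accepts every Throwable at the call site:
        Consumer<? super Throwable> works = logAny;
        works.accept(new RuntimeException("boom")); // fine

        // `? extends Throwable` could be a Consumer<IllegalStateException>,
        // so the compiler rejects passing any Throwable to it:
        Consumer<? extends Throwable> broken = logAny;
        // broken.accept(new RuntimeException()); // does not compile
    }
}
```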

##########
File path: docs/data/sql_functions.yml
##########
@@ -561,7 +561,10 @@ conditional:
 conversion:
   - sql: CAST(value AS type)
     table: ANY.cast(TYPE)
-    description: Returns a new value being cast to type type. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS VARCHAR) returns NULL of type VARCHAR.
+    description: Returns a new value being cast to type type. A CAST error throws an exception and fails the job. If you're performing a cast operation that may fail, like STRING to INT, you should rather use TRY_CAST in order to handle errors. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS VARCHAR) returns NULL of type VARCHAR; CAST('non-number' AS INT) throws an exception and fails the job.

Review comment:
       I think we need to refer to the LEGACY BEHAVIOUR config param in `execution_config_configuration.html` to be 100% clear to the user.
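       E.g. something along these lines; the option key is my assumption based on `ExecutionConfigOptions` and should be double-checked:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

class LegacyCastBehaviourSketch {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // With legacy behaviour enabled, CAST keeps the old semantics and
        // returns NULL on failure instead of raising an error.
        env.getConfig()
                .getConfiguration()
                .setString("table.exec.legacy-cast-behaviour", "ENABLED");
    }
}
```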

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java
##########
@@ -144,58 +143,69 @@ private static void testResult(
         final TableResult result = resultTable.execute();
         final Iterator<Row> iterator = result.collect();
 
-        assertTrue(iterator.hasNext());
+        assertThat(iterator).hasNext();
 
         final Row row = iterator.next();
 
-        assertFalse("No more rows expected.", iterator.hasNext());
+        assertThat(iterator).as("No more rows expected.").isExhausted();
 
         for (int i = 0; i < row.getArity(); i++) {
-            assertEquals(
-                    "Logical type for spec [" + i + "] of test [" + testItem + "] doesn't match.",
-                    expectedDataTypes.get(i).getLogicalType(),
-                    result.getResolvedSchema().getColumnDataTypes().get(i).getLogicalType());
-
-            assertEquals(
-                    "Result for spec [" + i + "] of test [" + testItem + "] doesn't match.",
-                    // Use Row.equals() to enable equality for complex structure, i.e. byte[]
-                    Row.of(testItem.results.get(i)),
-                    Row.of(row.getField(i)));
+            assertThat(result.getResolvedSchema().getColumnDataTypes().get(i).getLogicalType())
+                    .as(
+                            "Logical type for spec ["

Review comment:
       nit: I would try to compact this to fewer line breaks.
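       E.g. AssertJ's format-args overload of `as()` keeps the same assertion on three lines (sketch):

```java
assertThat(result.getResolvedSchema().getColumnDataTypes().get(i).getLogicalType())
        .as("Logical type for spec [%d] of test [%s] doesn't match.", i, testItem)
        .isEqualTo(expectedDataTypes.get(i).getLogicalType());
```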

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java
##########
@@ -144,58 +143,69 @@ private static void testResult(
         final TableResult result = resultTable.execute();
         final Iterator<Row> iterator = result.collect();
 
-        assertTrue(iterator.hasNext());
+        assertThat(iterator).hasNext();
 
         final Row row = iterator.next();
 
-        assertFalse("No more rows expected.", iterator.hasNext());
+        assertThat(iterator).as("No more rows expected.").isExhausted();
 
         for (int i = 0; i < row.getArity(); i++) {
-            assertEquals(
-                    "Logical type for spec [" + i + "] of test [" + testItem + "] doesn't match.",
-                    expectedDataTypes.get(i).getLogicalType(),
-                    result.getResolvedSchema().getColumnDataTypes().get(i).getLogicalType());
-
-            assertEquals(
-                    "Result for spec [" + i + "] of test [" + testItem + "] doesn't match.",
-                    // Use Row.equals() to enable equality for complex structure, i.e. byte[]
-                    Row.of(testItem.results.get(i)),
-                    Row.of(row.getField(i)));
+            assertThat(result.getResolvedSchema().getColumnDataTypes().get(i).getLogicalType())
+                    .as(
+                            "Logical type for spec ["
+                                    + i
+                                    + "] of test ["
+                                    + testItem
+                                    + "] doesn't match.")
+                    .isEqualTo(expectedDataTypes.get(i).getLogicalType());
+
+            assertThat(Row.of(row.getField(i)))
+                    .as("Result for spec [" + i + "] of test [" + testItem + "] doesn't match.")
+                    .isEqualTo(
+                            // Use Row.equals() to enable equality for complex structure, i.e.
+                            // byte[]
+                            Row.of(testItem.results.get(i)));
         }
     }
 
     private static void testError(
             TableEnvironment env, Table inputTable, ErrorTestItem<?> testItem) {
-        try {
-            final TableResult tableResult;
-            if (testItem instanceof TableApiErrorTestItem) {
-                tableResult =
-                        inputTable
-                                .select(((TableApiErrorTestItem) testItem).expression())
-                                .execute();
-            } else {
-                tableResult =
-                        env.sqlQuery("SELECT " + testItem.expression() + " FROM " + inputTable)
-                                .execute();
-            }
-            if (testItem.expectedDuringValidation) {
-                fail("Error expected: " + testItem.errorMessage);
-            }
-
-            try {
-                tableResult.await();
-                fail("Error expected: " + testItem.errorMessage);
-            } catch (AssertionError e) {
-                throw e;
-            } catch (Throwable t) {
-                assertThat("Wrong error message", t, containsMessage(testItem.errorMessage));
-            }
-        } catch (AssertionError e) {
-            throw e;
-        } catch (Throwable t) {
-            assertTrue(t instanceof ValidationException);
-            assertThat(t.getMessage(), containsString(testItem.errorMessage));
+        AtomicReference<TableResult> tableResult = new AtomicReference<>();
+
+        Throwable t =
+                catchThrowable(
+                        () -> {
+                            if (testItem instanceof TableApiErrorTestItem) {
+                                tableResult.set(

Review comment:
       Maybe extract this and the else block to methods to improve readability.
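       One possible shape for the extraction, reusing the calls already in this method (names are just illustrative):

```java
private static TableResult executeTableApi(Table inputTable, TableApiErrorTestItem testItem) {
    return inputTable.select(testItem.expression()).execute();
}

private static TableResult executeSql(TableEnvironment env, Table inputTable, ErrorTestItem<?> testItem) {
    return env.sqlQuery("SELECT " + testItem.expression() + " FROM " + inputTable).execute();
}
```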

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRuleProviderTest.java
##########
@@ -77,4 +82,18 @@ void testResolveConstructedToString() {
         assertThat(CastRuleProvider.resolve(new ArrayType(INT), new VarCharType(10)))
                 .isSameAs(ArrayToStringCastRule.INSTANCE);
     }
+
+    @Test
+    void testCanFail() {
+        LogicalType inputType = ROW(INT(), STRING()).getLogicalType();
+        LogicalType fallibleTargetType = ROW(TINYINT(), TIME()).getLogicalType();
+        LogicalType infallibleTargetType = ROW(TINYINT(), STRING()).getLogicalType();
+
+        assertThat(CastRuleProvider.canFail(INT, TINYINT)).isFalse();

Review comment:
       I would invert it, since soon this can fail (overflow).
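       E.g. asserting on a direction that can never overflow keeps the test future-proof (sketch):

```java
assertThat(CastRuleProvider.canFail(TINYINT, INT)).isFalse();
```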

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -1245,13 +1253,23 @@ private CastTestSpecBuilder fail(DataType dataType, Object src) {
             return fail(TestType.ERROR_SQL, dataType, src);
         }
 
+        private CastTestSpecBuilder failRuntime(
+                DataType dataType, Object src, Class<? extends Throwable> failureClass) {
+            this.testTypes.add(TestType.ERROR_RUNTIME);
+            this.columnTypes.add(dataType);
+            this.columnData.add(src);
+            this.expectedValues.add(failureClass);
+            return this;
+        }
+
         private CastTestSpecBuilder fail(TestType type, DataType dataType, Object src) {

Review comment:
       Maybe rename this to `failValidation`?

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRuleProviderTest.java
##########
@@ -77,4 +82,18 @@ void testResolveConstructedToString() {
         assertThat(CastRuleProvider.resolve(new ArrayType(INT), new VarCharType(10)))
                 .isSameAs(ArrayToStringCastRule.INSTANCE);
     }
+
+    @Test
+    void testCanFail() {
+        LogicalType inputType = ROW(INT(), STRING()).getLogicalType();

Review comment:
       here and below: `INT()` and `TINYINT()` are defined as `INT` and `TINYINT`.

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
##########
@@ -206,7 +209,48 @@ protected Configuration configuration() {
                         .testSqlResult(
                                 "CAST(CAST(x'68656C6C6F2063617374' AS 
BINARY(10)) AS VARCHAR)",
                                 "68656c6c6f2063617374",
-                                STRING().notNull()));
+                                STRING().notNull()),
+                TestSpec.forFunction(
+                                BuiltInFunctionDefinitions.TRY_CAST, "try cast from STRING to TIME")
+                        .onFieldsWithData("Flink", "12:34:56")
+                        .andDataTypes(STRING(), STRING())
+                        .testResult(
+                                $("f0").tryCast(TIME()),
+                                "TRY_CAST(f0 AS TIME)",
+                                null,
+                                TIME().nullable())
+                        .testResult(
+                                $("f1").tryCast(TIME()),
+                                "TRY_CAST(f1 AS TIME)",
+                                LocalTime.of(12, 34, 56, 0),
+                                TIME().nullable()),
+                TestSpec.forFunction(
+                                BuiltInFunctionDefinitions.TRY_CAST,
+                                "try cast from TIME NOT NULL to STRING NOT NULL")
+                        .onFieldsWithData(LocalTime.parse("12:34:56"))
+                        .andDataTypes(TIME().notNull())
+                        .testResult(
+                                $("f0").tryCast(STRING()),
+                                "TRY_CAST(f0 AS STRING)",
+                                "12:34:56",
+                                // Because TIME to STRING cannot fail, the type inference should

Review comment:
       ```suggestion
                                // Because TIME to STRING cannot fail, the type inference should be
       ```

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql2rel.SqlRexContext;
+import org.apache.calcite.sql2rel.SqlRexConvertlet;
+import org.apache.calcite.sql2rel.SqlRexConvertletTable;
+import org.apache.calcite.sql2rel.StandardConvertletTable;
+
+import java.util.Collections;
+
+/**
+ * Custom Flink {@link SqlRexConvertletTable} to add custom {@link SqlNode} to {@link RexNode}
+ * conversions.
+ */
+@Internal
+public class FlinkConvertletTable implements SqlRexConvertletTable {
+
+    public static final FlinkConvertletTable INSTANCE = new FlinkConvertletTable();
+
+    private FlinkConvertletTable() {}
+
+    @Override
+    public SqlRexConvertlet get(SqlCall call) {
+        if (call.getOperator().isName("TRY_CAST", false)) {
+            return this::convertTryCast;
+        }
+        return StandardConvertletTable.INSTANCE.get(call);
+    }
+
+    // Slightly modified version of StandardConvertletTable::convertCast
+    private RexNode convertTryCast(SqlRexContext cx, final SqlCall call) {
+        RelDataTypeFactory typeFactory = cx.getTypeFactory();
+        final SqlNode leftNode = call.operand(0);
+        final SqlNode rightNode = call.operand(1);
+
+        final RexNode valueRex = cx.convertExpression(leftNode);
+
+        RelDataType type;
+        if (rightNode instanceof SqlIntervalQualifier) {
+            type = typeFactory.createSqlIntervalType((SqlIntervalQualifier) rightNode);
+        } else if (rightNode instanceof SqlDataTypeSpec) {
+            SqlDataTypeSpec dataType = ((SqlDataTypeSpec) rightNode);
+            type = dataType.deriveType(cx.getValidator());
+            if (type == null) {
+                type = cx.getValidator().getValidatedNodeType(dataType.getTypeName());
+            }
+        } else {
+            throw new IllegalStateException(
+                    "Invalid right argument type for TRY_CAST: " + rightNode);
+        }
+
+        final LogicalType fromLogicalType = FlinkTypeFactory.toLogicalType(valueRex.getType());
+        final LogicalType toLogicalType = FlinkTypeFactory.toLogicalType(type);
+
+        // This is nullable only and only if the cast rule can fail

Review comment:
       just `only` would be enough I think.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java
##########
@@ -33,7 +39,11 @@ public CastRulePredicate getPredicateDefinition() {
     }
 
     @Override
-    public boolean canFail() {
-        return false;
+    public boolean canFail(LogicalType inputLogicalType, LogicalType targetLogicalType) {
+        final List<LogicalType> inputFields = LogicalTypeChecks.getFieldTypes(inputLogicalType);
+        final List<LogicalType> targetFields = LogicalTypeChecks.getFieldTypes(targetLogicalType);
+
+        return IntStream.range(0, Math.min(inputFields.size(), targetFields.size()))

Review comment:
       To my understanding this is not correct: don't we need to check all the combinations of each of the inputFields with all the targetFields (nested loop)?
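       For reference, a loop form of what the positional pairing above does (the lambda body is cut off in this hunk, so the per-pair predicate is my assumption); a nested loop would instead test every input/target field combination:

```java
for (int i = 0; i < Math.min(inputFields.size(), targetFields.size()); i++) {
    // Fields of a row cast are matched by position, so each input field is
    // only checked against the target field at the same index.
    if (CastRuleProvider.canFail(inputFields.get(i), targetFields.get(i))) {
        return true;
    }
}
return false;
```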

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRuleProviderTest.java
##########
@@ -77,4 +82,18 @@ void testResolveConstructedToString() {
         assertThat(CastRuleProvider.resolve(new ArrayType(INT), new VarCharType(10)))
                 .isSameAs(ArrayToStringCastRule.INSTANCE);
     }
+
+    @Test
+    void testCanFail() {
+        LogicalType inputType = ROW(INT(), STRING()).getLogicalType();
+        LogicalType fallibleTargetType = ROW(TINYINT(), TIME()).getLogicalType();

Review comment:
       nit and personal preference: since they are used only once, maybe don't assign them to variables, and maybe just assign the `inputType` on top of the relevant assertions.
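       I.e. something like this (the expected booleans are my reading of the truncated assertions):

```java
LogicalType inputType = ROW(INT(), STRING()).getLogicalType();

assertThat(CastRuleProvider.canFail(inputType, ROW(TINYINT(), TIME()).getLogicalType()))
        .isTrue();
assertThat(CastRuleProvider.canFail(inputType, ROW(TINYINT(), STRING()).getLogicalType()))
        .isFalse();
```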

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -1245,13 +1253,23 @@ private CastTestSpecBuilder fail(DataType dataType, Object src) {
             return fail(TestType.ERROR_SQL, dataType, src);
         }
 
+        private CastTestSpecBuilder failRuntime(
+                DataType dataType, Object src, Class<? extends Throwable> failureClass) {
+            this.testTypes.add(TestType.ERROR_RUNTIME);
+            this.columnTypes.add(dataType);
+            this.columnData.add(src);
+            this.expectedValues.add(failureClass);
+            return this;
+        }
+
         private CastTestSpecBuilder fail(TestType type, DataType dataType, Object src) {
             this.testTypes.add(type);
             this.columnTypes.add(dataType);
             this.columnData.add(src);
             return this;
         }
 
+        @SuppressWarnings("unchecked")

Review comment:
       nit: maybe move it on top of the statements with the cast to `Class<? extends Throwable>`



