This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 92c02fc747f7794f2c20ac161ad5d7b9c0f2c0f8 Author: slinkydeveloper <francescogu...@gmail.com> AuthorDate: Mon Nov 15 13:39:51 2021 +0100 [FLINK-24781][table-planner] Added CastRule#canFail and make sure ScalarOperatorGens wraps the cast invocation in a try-catch Signed-off-by: slinkydeveloper <francescogu...@gmail.com> --- .../functions/casting/AbstractCastRule.java | 5 ++ .../AbstractExpressionCodeGeneratorCastRule.java | 2 + .../table/planner/functions/casting/CastRule.java | 2 + .../CodeGeneratedExpressionCastExecutor.java | 7 ++- .../planner/codegen/calls/ScalarOperatorGens.scala | 60 +++++++++++++++++----- 5 files changed, 63 insertions(+), 13 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java index c193139..840c8df 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java @@ -31,4 +31,9 @@ abstract class AbstractCastRule<IN, OUT> implements CastRule<IN, OUT> { public CastRulePredicate getPredicateDefinition() { return predicate; } + + @Override + public boolean canFail() { + return false; + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java index 0b14ddc..aa0a50b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java @@ -25,7 +25,9 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; import java.util.Collections; +import static org.apache.flink.table.planner.codegen.CodeGenUtils.boxedTypeTermForType; import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.box; +import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.cast; import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.unbox; /** diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java index e93effb..58217e4 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java @@ -45,6 +45,8 @@ public interface CastRule<IN, OUT> { CastExecutor<IN, OUT> create( Context context, LogicalType inputLogicalType, LogicalType targetLogicalType); + boolean canFail(); + /** Casting context. */ interface Context { ZoneId getSessionZoneId(); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java index c94db8d..f39089a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java @@ -53,7 +53,12 @@ class CodeGeneratedExpressionCastExecutor<IN, OUT> implements CastExecutor<IN, O inputArray[0] = value; return (OUT) expressionEvaluator.evaluate(inputArray); } catch (InvocationTargetException e) { - throw new FlinkRuntimeException("Cannot execute the compiled expression", e); + if (e.getCause() instanceof TableException) { + // Expected exception created by the rule, so no need to wrap it + throw (TableException) e.getCause(); + } + throw new TableException( + "Cannot execute the compiled expression for an unknown cause", e); } } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala index 6a7fbb2..0cb0bea 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.planner.codegen.calls -import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.data.binary.BinaryArrayData import org.apache.flink.table.planner.functions.casting.{CastRuleProvider, CodeGeneratorCastRule, ExpressionCodeGeneratorCastRule} import org.apache.flink.table.data.util.MapDataUtil @@ -953,17 +953,53 @@ object ScalarOperatorGens { targetType ) - val castCode = s"\n" + - s"// --- Cast section generated by ${className(codeGeneratorCastRule.getClass)}\n" + - s"${castCodeBlock.getCode}" + - s"// --- End cast section\n" - - return GeneratedExpression( - castCodeBlock.getReturnTerm, - castCodeBlock.getIsNullTerm, - operand.code + castCode, - targetType - ) + if (codeGeneratorCastRule.canFail) { + val resultTerm = ctx.addReusableLocalVariable( + primitiveTypeTermForType(targetType), + "castRuleResult" + ) + val nullTerm = ctx.addReusableLocalVariable( + "boolean", + "castRuleResultIsNull" + ) + + // TODO this code belongs to TRY_CAST, more than to CAST. + // See https://issues.apache.org/jira/browse/FLINK-24385 for more details + val castCode = + s""" + | // --- Cast section generated by ${className(codeGeneratorCastRule.getClass)} + | try { + | ${castCodeBlock.getCode} + | $resultTerm = ${castCodeBlock.getReturnTerm}; + | $nullTerm = ${castCodeBlock.getIsNullTerm}; + | } catch (${className[Throwable]} e) { + | $resultTerm = ${primitiveDefaultValue(targetType)}; + | $nullTerm = true; + | } + | // --- End cast section + """.stripMargin + + return GeneratedExpression( + resultTerm, + nullTerm, + operand.code + "\n" + castCode, + targetType + ) + } else { + val castCode = + s""" + | // --- Cast section generated by ${className(codeGeneratorCastRule.getClass)} + | ${castCodeBlock.getCode} + | // --- End cast section + """.stripMargin + + return GeneratedExpression( + castCodeBlock.getReturnTerm, + castCodeBlock.getIsNullTerm, + operand.code + castCode, + targetType + ) + } case _ => }