This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8c029d0c27d [FLINK-39577][table] Reuse Calcite's `COALESCE` to apply
simplifications from `RexSimplify`
8c029d0c27d is described below
commit 8c029d0c27d51bf257c83068ba46af093fdd3bd6
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue May 26 21:10:17 2026 +0200
[FLINK-39577][table] Reuse Calcite's `COALESCE` to apply simplifications
from `RexSimplify`
---
.../service/MaterializedTableStatementITCase.java | 2 +-
.../functions/BuiltInFunctionDefinitions.java | 3 +-
.../calcite/sql/fun/SqlCoalesceFunction.java | 96 +++++++
.../expressions/converter/DirectConvertRule.java | 3 +
.../functions/sql/FlinkSqlOperatorTable.java | 1 +
.../RemoveUnreachableCoalesceArgumentsRule.java | 212 ---------------
.../table/planner/codegen/ExprCodeGenerator.scala | 6 +
.../planner/codegen/calls/ScalarOperatorGens.scala | 45 ++-
.../planner/plan/rules/FlinkBatchRuleSets.scala | 20 +-
.../planner/plan/rules/FlinkStreamRuleSets.scala | 20 +-
.../planner/functions/CoalesceFunctionITCase.java | 301 ++++++++++++++++++++-
.../plan/nodes/exec/common/CalcTestPrograms.java | 41 +++
.../plan/nodes/exec/stream/CalcRestoreTest.java | 3 +-
...RemoveUnreachableCoalesceArgumentsRuleTest.java | 5 +-
.../plan/batch/sql/agg/GroupingSetsTest.xml | 4 +-
.../RemoveUnreachableCoalesceArgumentsRuleTest.xml | 10 +-
...mplifyCoalesceWithEquiJoinConditionRuleTest.xml | 14 +-
.../table/planner/plan/stream/sql/CalcTest.xml | 12 +-
.../plan/stream/sql/agg/GroupingSetsTest.xml | 4 +-
.../plan/stream/sql/agg/WindowAggregateTest.xml | 4 +-
.../calc-coalesce/plan/calc-coalesce.json | 135 +++++++++
.../calc-coalesce/savepoint/_metadata | Bin 0 -> 8194 bytes
.../plan/calc-current-timestamp.json | 38 ++-
.../calc-current-timestamp/savepoint/_metadata | Bin 5868 -> 5860 bytes
.../runtime/functions/scalar/CoalesceFunction.java | 43 ---
25 files changed, 698 insertions(+), 324 deletions(-)
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index 4e128e2cd8d..c3a64b5c8fe 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -1322,7 +1322,7 @@ class MaterializedTableStatementITCase extends
AbstractMaterializedTableStatemen
assertThat(newTable.getExpandedQuery())
.hasToString(
String.format(
- "SELECT COALESCE(`tmp`.`user_id`, CAST(0 AS
BIGINT)) AS `user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`,
SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
+ "SELECT COALESCE(`tmp`.`user_id`, 0) AS
`user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`,
SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
+ "FROM (SELECT
`datagenSource`.`user_id`, `datagenSource`.`shop_id`,
DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`,
`datagenSource`.`payment_amount_cents`\n"
+ "FROM `%s`.`%s`.`datagenSource` AS
`datagenSource`) AS `tmp`\n"
+ "GROUP BY ROW(`tmp`.`user_id`,
`tmp`.`shop_id`, `tmp`.`ds`)",
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index e3d30a5c8da..4d0c9b28dac 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -241,8 +241,7 @@ public final class BuiltInFunctionDefinitions {
.kind(SCALAR)
.inputTypeStrategy(varyingSequence(COMMON_ARG_NULLABLE,
COMMON_ARG_NULLABLE))
.outputTypeStrategy(nullableIfAllArgs(COMMON))
- .runtimeClass(
-
"org.apache.flink.table.runtime.functions.scalar.CoalesceFunction")
+ .runtimeDeferred()
.build();
public static final BuiltInFunctionDefinition ARRAY_APPEND =
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlCoalesceFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlCoalesceFunction.java
new file mode 100644
index 00000000000..59e99e5cba2
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlCoalesceFunction.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.fun;
+
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.type.SqlTypeTransforms;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/** Copied from Calcite in order to turn off the COALESCE rewrite to
CASE ... WHEN ... */
+public class SqlCoalesceFunction extends SqlFunction {
+ // ~ Constructors
-----------------------------------------------------------
+
+ public SqlCoalesceFunction() {
+ // NOTE jvs 26-July-2006: We fill in the type strategies here,
+ // but normally they are not used because the validator invokes
+ // rewriteCall to convert COALESCE into CASE early. However,
+ // validator rewrite can optionally be disabled, in which case these
+ // strategies are used.
+ super(
+ "COALESCE",
+ SqlKind.COALESCE,
+
ReturnTypes.LEAST_RESTRICTIVE.andThen(SqlTypeTransforms.LEAST_NULLABLE),
+ null,
+ OperandTypes.SAME_VARIADIC,
+ SqlFunctionCategory.SYSTEM);
+ }
+
+ // ~ Methods
----------------------------------------------------------------
+
+ // ----- FLINK MODIFICATION BEGIN -----
+ // override SqlOperator
+ @Override
+ public SqlNode rewriteCall(SqlValidator validator, SqlCall call) {
+ validateQuantifier(validator, call); // check DISTINCT/ALL
+
+ List<SqlNode> operands = call.getOperandList();
+
+ if (operands.size() == 1) {
+ return operands.get(0);
+ }
+
+ SqlParserPos pos = call.getParserPosition();
+ List<SqlNode> nodes = new ArrayList<>();
+ for (SqlNode operand : operands) {
+ if (!SqlUtil.isNullLiteral(operand, false)) {
+ nodes.add(operand);
+ }
+ }
+
+ if (nodes.isEmpty()) {
+ return SqlLiteral.createNull(pos);
+ }
+ if (nodes.size() == 1) {
+ return nodes.get(0);
+ }
+
+ return new SqlBasicCall(this, nodes, pos);
+ }
+
+ // ----- FLINK MODIFICATION END -----
+
+ @Override
+ public SqlReturnTypeInference getReturnTypeInference() {
+ return requireNonNull(super.getReturnTypeInference(),
"returnTypeInference");
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
index a2d4f8b0df0..17dffb8613f 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
@@ -298,6 +298,9 @@ public class DirectConvertRule implements
CallExpressionConvertRule {
definitionSqlOperatorHashMap.put(
BuiltInFunctionDefinitions.ARRAY_ELEMENT,
FlinkSqlOperatorTable.ELEMENT);
+ definitionSqlOperatorHashMap.put(
+ BuiltInFunctionDefinitions.COALESCE,
FlinkSqlOperatorTable.COALESCE);
+
// crypto hash
definitionSqlOperatorHashMap.put(BuiltInFunctionDefinitions.MD5,
FlinkSqlOperatorTable.MD5);
definitionSqlOperatorHashMap.put(
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index af57daf6b26..f44f1806df3 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -1203,6 +1203,7 @@ public class FlinkSqlOperatorTable extends
ReflectiveSqlOperatorTable {
public static final SqlFunction ABS = SqlStdOperatorTable.ABS;
public static final SqlFunction EXP = SqlStdOperatorTable.EXP;
public static final SqlFunction NULLIF = SqlStdOperatorTable.NULLIF;
+ public static final SqlFunction COALESCE = SqlStdOperatorTable.COALESCE;
public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR;
public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL;
public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRule.java
deleted file mode 100644
index 47cde317256..00000000000
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRule.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.rules.logical;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
-import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
-
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelRule;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Calc;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexShuttle;
-import org.apache.calcite.sql.SqlOperator;
-import org.immutables.value.Value;
-
-import java.util.List;
-import java.util.function.Predicate;
-
-/**
- * Removes unreachable {@link BuiltInFunctionDefinitions#COALESCE} arguments.
- *
- * <p>An unreachable COALESCE argument is defined as any argument after the
first argument in the
- * argument list with a non-null type.
- */
-@Internal
[email protected]
-public class RemoveUnreachableCoalesceArgumentsRule
- extends RelRule<RemoveUnreachableCoalesceArgumentsRule.Config> {
-
- public static final RelRule<RemoveUnreachableCoalesceArgumentsRule.Config>
PROJECT_INSTANCE =
- Config.DEFAULT.withProject().toRule();
- public static final RelRule<RemoveUnreachableCoalesceArgumentsRule.Config>
FILTER_INSTANCE =
- Config.DEFAULT.withFilter().toRule();
- public static final RelRule<RemoveUnreachableCoalesceArgumentsRule.Config>
JOIN_INSTANCE =
- Config.DEFAULT.withJoin().toRule();
- public static final RelRule<RemoveUnreachableCoalesceArgumentsRule.Config>
CALC_INSTANCE =
- Config.DEFAULT.withCalc().toRule();
-
- public RemoveUnreachableCoalesceArgumentsRule(Config config) {
- super(config);
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final RelNode relNode = call.rel(0);
- final RexBuilder rexBuilder = relNode.getCluster().getRexBuilder();
- call.transformTo(
- relNode.accept(new
UnreachableCoalesceArgumentsRemoveRexShuttle(rexBuilder)));
- }
-
- private static class UnreachableCoalesceArgumentsRemoveRexShuttle extends
RexShuttle {
- private final RexBuilder rexBuilder;
-
- private UnreachableCoalesceArgumentsRemoveRexShuttle(RexBuilder
rexBuilder) {
- this.rexBuilder = rexBuilder;
- }
-
- @Override
- public RexNode visitCall(RexCall call) {
- call = (RexCall) super.visitCall(call);
-
- // Not a coalesce invocation, skip it
- if (!operatorIsCoalesce(call.getOperator())) {
- return call;
- }
-
- final int firstNonNullableArgIndex =
getFirstNonNullableArgumentIndex(call);
-
- // If it's the first argument, just return the argument without
the coalesce invocation
- if (firstNonNullableArgIndex == 0) {
- RexNode operand = call.operands.get(0);
- if (call.getType().equals(operand.getType())) {
- return operand;
- } else {
- return rexBuilder.makeCast(call.getType(), operand);
- }
- }
-
- // If it's the last argument, or no non-null argument was found,
return the original
- // call
- if (firstNonNullableArgIndex == call.operands.size() - 1
- || firstNonNullableArgIndex == -1) {
- return call;
- }
-
- // Return the coalesce invocation with a trimmed argument list
- final List<RexNode> trimmedOperandsList =
- call.operands.subList(0, firstNonNullableArgIndex + 1);
- return call.clone(call.getType(), trimmedOperandsList);
- }
-
- private int getFirstNonNullableArgumentIndex(RexCall call) {
- for (int argIndex = 0; argIndex < call.operands.size();
argIndex++) {
- if (!call.operands.get(argIndex).getType().isNullable()) {
- return argIndex;
- }
- }
- return -1;
- }
- }
-
- private static boolean hasCoalesceInvocation(RexNode node) {
- return FlinkRexUtil.hasOperatorCallMatching(
- node,
RemoveUnreachableCoalesceArgumentsRule::operatorIsCoalesce);
- }
-
- private static boolean operatorIsCoalesce(SqlOperator op) {
- return op instanceof BridgingSqlFunction
- && ((BridgingSqlFunction) op)
- .getDefinition()
- .equals(BuiltInFunctionDefinitions.COALESCE);
- }
-
- //
---------------------------------------------------------------------------------------------
-
- /** Configuration for {@link RemoveUnreachableCoalesceArgumentsRule}. */
- @Value.Immutable(singleton = false)
- public interface Config extends RelRule.Config {
-
- Config DEFAULT =
-
ImmutableRemoveUnreachableCoalesceArgumentsRule.Config.builder()
- .build()
- .as(Config.class);
-
- @Override
- default RemoveUnreachableCoalesceArgumentsRule toRule() {
- return new RemoveUnreachableCoalesceArgumentsRule(this);
- }
-
- default Config withProject() {
- Predicate<Project> projectPredicate =
- lp ->
- lp.getProjects().stream()
- .anyMatch(
-
RemoveUnreachableCoalesceArgumentsRule
- ::hasCoalesceInvocation);
- final RelRule.OperandTransform projectTransform =
- operandBuilder ->
- operandBuilder
- .operand(Project.class)
- .predicate(projectPredicate)
- .anyInputs();
-
- return withOperandSupplier(projectTransform).as(Config.class);
- }
-
- default Config withFilter() {
- Predicate<Filter> filterPredicate =
- lf ->
-
RemoveUnreachableCoalesceArgumentsRule.hasCoalesceInvocation(
- lf.getCondition());
- final RelRule.OperandTransform filterTransform =
- operandBuilder ->
- operandBuilder
- .operand(Filter.class)
- .predicate(filterPredicate)
- .anyInputs();
-
- return withOperandSupplier(filterTransform).as(Config.class);
- }
-
- default Config withJoin() {
- Predicate<Join> joinPredicate =
- lj ->
-
RemoveUnreachableCoalesceArgumentsRule.hasCoalesceInvocation(
- lj.getCondition());
- final RelRule.OperandTransform joinTransform =
- operandBuilder ->
-
operandBuilder.operand(Join.class).predicate(joinPredicate).anyInputs();
-
- return withOperandSupplier(joinTransform).as(Config.class);
- }
-
- default Config withCalc() {
- Predicate<Calc> calcPredicate =
- lc ->
- lc.getProgram().getExprList().stream()
- .anyMatch(
-
RemoveUnreachableCoalesceArgumentsRule
- ::hasCoalesceInvocation);
- final RelRule.OperandTransform joinTransform =
- operandBuilder ->
-
operandBuilder.operand(Calc.class).predicate(calcPredicate).anyInputs();
-
- return withOperandSupplier(joinTransform).as(Config.class);
- }
- }
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 0fc9db1e1c1..f07bf80bc05 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -772,6 +772,9 @@ class ExprCodeGenerator(
case CASE =>
generateIfElse(ctx, operands, resultType)
+ case COALESCE =>
+ generateCoalesce(ctx, operands, resultType)
+
case IS_TRUE =>
val operand = operands.head
requireBoolean(operand)
@@ -955,6 +958,9 @@ class ExprCodeGenerator(
case BuiltInFunctionDefinitions.REGEXP_REPLACE =>
StringCallGen.generateRegexpReplace(ctx, operands, resultType)
+ case BuiltInFunctionDefinitions.COALESCE =>
+ generateCoalesce(ctx, operands, resultType)
+
case _ =>
new BridgingSqlFunctionCallGen(call, rexProgram).generate(ctx,
operands, resultType)
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 28c0e06ca97..4e3a79cf7a2 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.codegen.calls
import org.apache.flink.table.api.{TableRuntimeException, ValidationException}
import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryStringData}
+import org.apache.flink.table.data.binary.BinaryArrayData
import org.apache.flink.table.data.util.MapDataUtil
import org.apache.flink.table.data.utils.CastExecutor
import org.apache.flink.table.data.writer.{BinaryArrayWriter, BinaryRowWriter}
@@ -40,7 +40,7 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot._
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
import
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldTypes,
getPrecision, getScale}
import
org.apache.flink.table.types.logical.utils.LogicalTypeMerging.findCommonType
-import org.apache.flink.table.utils.DateTimeUtils.{MILLIS_PER_DAY,
MILLIS_PER_SECOND}
+import org.apache.flink.table.utils.DateTimeUtils.MILLIS_PER_DAY
import org.apache.flink.table.utils.EncodingUtils
import org.apache.flink.types.ColumnList
import org.apache.flink.util.Preconditions.checkArgument
@@ -1034,6 +1034,47 @@ object ScalarOperatorGens {
}
}
+ def generateCoalesce(
+ ctx: CodeGeneratorContext,
+ operands: Seq[GeneratedExpression],
+ resultType: LogicalType): GeneratedExpression = {
+ if (operands.size == 1) {
+ generateCast(ctx, operands.head, resultType, nullOnFailure = false)
+ } else {
+ val condition =
+ if (operands.head.resultType.equals(resultType)) {
+ operands.head
+ } else {
+ generateCast(ctx, operands.head, resultType, nullOnFailure = false)
+ }
+ val falseAction = generateCoalesce(ctx, operands.tail, resultType)
+
+ val Seq(resultTerm, nullTerm) = newNames(ctx, "result", "isNull")
+ val resultTypeTerm = primitiveTypeTermForType(resultType)
+ val defaultValue = primitiveDefaultValue(resultType)
+
+ val operatorCode =
+ s"""
+ |$resultTypeTerm $resultTerm = $defaultValue;
+ |// coalesce
+ |${condition.code}
+ |boolean $nullTerm = ${condition.nullTerm};
+ |if (!$nullTerm) {
+ | $resultTerm = ${condition.resultTerm};
+ |} else {
+ | ${falseAction.code}
+ | $nullTerm = ${falseAction.nullTerm};
+ | if (!$nullTerm) {
+ | $resultTerm = ${falseAction.resultTerm};
+ | }
+ |}
+ |// end coalesce
+ |""".stripMargin.trim
+
+ GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
+ }
+ }
+
def generateIfElse(
ctx: CodeGeneratorContext,
operands: Seq[GeneratedExpression],
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 7ff8a16a70e..f102f593ac3 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.rules
import org.apache.flink.table.planner.plan.nodes.logical._
+import
org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets.SIMPLIFY_COALESCE_RULES
import org.apache.flink.table.planner.plan.rules.logical._
import
org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule
import org.apache.flink.table.planner.plan.rules.physical.batch._
@@ -78,10 +79,6 @@ object FlinkBatchRuleSets {
/** RuleSet to simplify coalesce invocations */
private val SIMPLIFY_COALESCE_RULES: RuleSet = RuleSets.ofList(
- RemoveUnreachableCoalesceArgumentsRule.PROJECT_INSTANCE,
- RemoveUnreachableCoalesceArgumentsRule.FILTER_INSTANCE,
- RemoveUnreachableCoalesceArgumentsRule.JOIN_INSTANCE,
- RemoveUnreachableCoalesceArgumentsRule.CALC_INSTANCE,
SimplifyCoalesceWithEquiJoinConditionRule.PROJECT_INSTANCE,
SimplifyCoalesceWithEquiJoinConditionRule.CALC_INSTANCE
)
@@ -117,11 +114,20 @@ object FlinkBatchRuleSets {
// let project transpose window operator.
CoreRules.PROJECT_WINDOW_TRANSPOSE,
// ensure union set operator have the same row type
- new CoerceInputsRule(classOf[LogicalUnion], false),
+ CoerceInputsRule.Config.DEFAULT
+ .withCoerceNames(false)
+ .withConsumerRelClass(classOf[LogicalUnion])
+ .toRule,
// ensure intersect set operator have the same row type
- new CoerceInputsRule(classOf[LogicalIntersect], false),
+ CoerceInputsRule.Config.DEFAULT
+ .withCoerceNames(false)
+ .withConsumerRelClass(classOf[LogicalIntersect])
+ .toRule,
// ensure except set operator have the same row type
- new CoerceInputsRule(classOf[LogicalMinus], false),
+ CoerceInputsRule.Config.DEFAULT
+ .withCoerceNames(false)
+ .withConsumerRelClass(classOf[LogicalMinus])
+ .toRule,
ConvertToNotInOrInRule.INSTANCE,
// optimize limit 0
PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE,
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 05de12a6950..25ad791f53c 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -79,12 +79,7 @@ object FlinkStreamRuleSets {
CoreRules.JOIN_REDUCE_EXPRESSIONS
)
- /** RuleSet to simplify coalesce invocations */
private val SIMPLIFY_COALESCE_RULES: RuleSet = RuleSets.ofList(
- RemoveUnreachableCoalesceArgumentsRule.PROJECT_INSTANCE,
- RemoveUnreachableCoalesceArgumentsRule.FILTER_INSTANCE,
- RemoveUnreachableCoalesceArgumentsRule.JOIN_INSTANCE,
- RemoveUnreachableCoalesceArgumentsRule.CALC_INSTANCE,
SimplifyCoalesceWithEquiJoinConditionRule.PROJECT_INSTANCE,
SimplifyCoalesceWithEquiJoinConditionRule.CALC_INSTANCE
)
@@ -117,11 +112,20 @@ object FlinkStreamRuleSets {
WindowPropertiesRules.WINDOW_PROPERTIES_RULE,
WindowPropertiesRules.WINDOW_PROPERTIES_HAVING_RULE,
// ensure union set operator have the same row type
- new CoerceInputsRule(classOf[LogicalUnion], false),
+ CoerceInputsRule.Config.DEFAULT
+ .withCoerceNames(false)
+ .withConsumerRelClass(classOf[LogicalUnion])
+ .toRule,
// ensure intersect set operator have the same row type
- new CoerceInputsRule(classOf[LogicalIntersect], false),
+ CoerceInputsRule.Config.DEFAULT
+ .withCoerceNames(false)
+ .withConsumerRelClass(classOf[LogicalIntersect])
+ .toRule,
// ensure except set operator have the same row type
- new CoerceInputsRule(classOf[LogicalMinus], false),
+ CoerceInputsRule.Config.DEFAULT
+ .withCoerceNames(false)
+ .withConsumerRelClass(classOf[LogicalMinus])
+ .toRule,
ConvertToNotInOrInRule.INSTANCE,
// optimize limit 0
PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE,
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CoalesceFunctionITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CoalesceFunctionITCase.java
index 1a212141d14..ec64f2983ca 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CoalesceFunctionITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CoalesceFunctionITCase.java
@@ -19,21 +19,311 @@
package org.apache.flink.table.planner.functions;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.stream.Stream;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SECOND;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.coalesce;
+import static org.apache.flink.util.CollectionUtil.entry;
+import static org.apache.flink.util.CollectionUtil.map;
/** Test {@link BuiltInFunctionDefinitions#COALESCE} and its return type. */
class CoalesceFunctionITCase extends BuiltInFunctionTestBase {
@Override
Stream<TestSetSpec> getTestSetSpecs() {
- return Stream.of(
- TestSetSpec.forFunction(BuiltInFunctionDefinitions.COALESCE)
+ final List<TestSetSpec> specs = new ArrayList<>();
+ specs.addAll(allTypesBasic());
+ specs.addAll(typePromotion());
+ specs.addAll(lazyEvaluation());
+ specs.addAll(constants());
+ return specs.stream();
+ }
+
+ private static List<TestSetSpec> allTypesBasic() {
+ return List.of(
+ basicSpec("BOOLEAN", BOOLEAN(), true, false),
+ basicSpec("TINYINT", TINYINT(), (byte) 5, (byte) 10),
+ basicSpec("SMALLINT", SMALLINT(), (short) 100, (short) 200),
+ basicSpec("INT", INT(), 1, 2),
+ basicSpec("BIGINT", BIGINT(), 100L, 200L),
+ basicSpec("FLOAT", FLOAT(), 1.5f, 2.5f),
+ basicSpec("DOUBLE", DOUBLE(), 1.5d, 2.5d),
+ basicSpec(
+ "DECIMAL",
+ DECIMAL(5, 2),
+ new BigDecimal("123.45"),
+ new BigDecimal("234.56")),
+ basicSpec("CHAR", CHAR(5), "hello", "world"),
+ basicSpec("VARCHAR", VARCHAR(10), "hello", "world"),
+ basicSpec("STRING", STRING(), "hello", "world"),
+ basicSpec("BINARY", BINARY(2), new byte[] {0, 1}, new byte[]
{2, 3}),
+ basicSpec("VARBINARY", VARBINARY(5), new byte[] {0, 1, 2}, new
byte[] {3, 4}),
+ basicSpec("BYTES", BYTES(), new byte[] {0, 1, 2}, new byte[]
{3, 4, 5}),
+ basicSpec("DATE", DATE(), LocalDate.of(2026, 1, 1),
LocalDate.of(2026, 12, 31)),
+ basicSpec("TIME", TIME(), LocalTime.of(12, 34, 56),
LocalTime.of(23, 59, 59)),
+ basicSpec(
+ "TIMESTAMP",
+ TIMESTAMP(),
+ LocalDateTime.of(2026, 1, 1, 12, 0, 0),
+ LocalDateTime.of(2026, 12, 31, 23, 59, 59)),
+ basicSpec(
+ "TIMESTAMP_LTZ",
+ TIMESTAMP_LTZ(),
+ Instant.parse("2026-01-01T12:00:00Z"),
+ Instant.parse("2026-12-31T23:59:59Z")),
+ basicSpec(
+ "INTERVAL_MONTH",
+ INTERVAL(MONTH()),
+ Period.ofMonths(18),
+ Period.ofMonths(27)),
+ basicSpec(
+ "INTERVAL_SECOND",
+ INTERVAL(SECOND(3)),
+ Duration.ofMillis(12345),
+ Duration.ofMillis(67890)),
+ basicSpec("ARRAY", ARRAY(INT()), new Integer[] {1, 2, 3}, new
Integer[] {4, 5, 6}),
+ basicSpec(
+ "MAP",
+ MAP(STRING(), INT()),
+ map(entry("a", 1), entry("b", 2)),
+ map(entry("c", 3), entry("d", 4))),
+ rowSpec());
+ }
+
+ private static TestSetSpec basicSpec(String name, DataType type, Object
value1, Object value2) {
+ return TestSetSpec.forFunction(BuiltInFunctionDefinitions.COALESCE,
name)
+ .onFieldsWithData(null, value1, value2)
+ .andDataTypes(type.nullable(), type.notNull(), type.notNull())
+ .testResult(coalesce($("f0"), $("f1")), "COALESCE(f0, f1)",
value1, type.notNull())
+ .testResult(coalesce($("f1"), $("f2")), "COALESCE(f1, f2)",
value1, type.notNull())
+ .testResult(
+ coalesce($("f0"), $("f0"), $("f2")),
+ "COALESCE(f0, f0, f2)",
+ value2,
+ type.notNull());
+ }
+
+ private static TestSetSpec rowSpec() {
+ DataType rowType = ROW(FIELD("a", INT()), FIELD("b", STRING()));
+ return TestSetSpec.forFunction(BuiltInFunctionDefinitions.COALESCE,
"ROW")
+ .onFieldsWithData(null, Row.of(1, "hello"), Row.of(2, "world"))
+ .andDataTypes(rowType.nullable(), rowType.notNull(),
rowType.notNull())
+ .testResult(
+ coalesce($("f0"), $("f1")),
+ "COALESCE(f0, f1)",
+ Row.of(1, "hello"),
+ rowType.notNull())
+ .testResult(
+ coalesce($("f1"), $("f2")),
+ "COALESCE(f1, f2)",
+ Row.of(1, "hello"),
+ rowType.notNull());
+ }
+
+ /**
+ * Verifies the LEAST_RESTRICTIVE return-type inference combined with
LEAST_NULLABLE: mixing
+ * compatible operand types yields the widest type, and nullability is
dropped if any operand is
+ * NOT NULL. Also for reference look at {@link LogicalTypeMerging}.
+ */
+ private static List<TestSetSpec> typePromotion() {
+ return List.of(
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.COALESCE,
"INT and BIGINT")
+ .onFieldsWithData(null, 1, 2L)
+ .andDataTypes(INT().nullable(), INT().nullable(),
BIGINT().notNull())
+ .testResult(
+ coalesce($("f0"), $("f1"), $("f2")),
+ "COALESCE(f0, f1, f2)",
+ 1L,
+ BIGINT().notNull()),
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.COALESCE,
"TINYINT and INT")
+ .onFieldsWithData(null, (byte) 7, 42)
+ .andDataTypes(TINYINT().nullable(),
TINYINT().nullable(), INT().notNull())
+ .testResult(
+ coalesce($("f0"), $("f1"), $("f2")),
+ "COALESCE(f0, f1, f2)",
+ 7,
+ INT().notNull()),
+
+ // TIMESTAMP precision widening: TIMESTAMP(0) < TIMESTAMP(3) →
declared
+ // TIMESTAMP(3). Calcite stores TIMESTAMP as a Long millis
value, so widening
+ // precision does not change the underlying value.
+ TestSetSpec.forFunction(
+ BuiltInFunctionDefinitions.COALESCE,
+ "TIMESTAMP(0) and TIMESTAMP(3)")
+ .onFieldsWithData(
+ null,
+ LocalDateTime.parse("2026-01-01T00:00:00"),
+ LocalDateTime.parse("2026-01-01T00:00:00.123"))
+ .andDataTypes(
+ TIMESTAMP(0).nullable(),
+ TIMESTAMP(0).nullable(),
+ TIMESTAMP(3).notNull())
+ .testResult(
+ coalesce($("f0"), $("f1"), $("f2")),
+ "COALESCE(f0, f1, f2)",
+ LocalDateTime.parse("2026-01-01T00:00:00"),
+ TIMESTAMP(3).notNull()),
+
+ // DECIMAL precision widening, same scale: DECIMAL(5,2) <
DECIMAL(10,2)
+ // → declared DECIMAL(10, 2).
+ // Same scale → underlying BigDecimal representation pre/post
simplify is
+ // identical.
+ TestSetSpec.forFunction(
+ BuiltInFunctionDefinitions.COALESCE,
+ "DECIMAL(5,2) and DECIMAL(10,2) (same scale)")
+ .onFieldsWithData(
+ null, new BigDecimal("1.23"), new
BigDecimal("9876543.21"))
+ .andDataTypes(
+ DECIMAL(5, 2).nullable(),
+ DECIMAL(5, 2).nullable(),
+ DECIMAL(10, 2).notNull())
+ .testResult(
+ coalesce($("f0"), $("f1"), $("f2")),
+ "COALESCE(f0, f1, f2)",
+ new BigDecimal("1.23"),
+ DECIMAL(10, 2).notNull()),
+
+ // Based on LogicalTypeMerging#createCommonExactNumericType
+ // d = max(p1 - s1, p2 - s2)
+ // s <= max(s1, s2)
+ // p = s + d
+ // EXAMPLE: DECIMAL precision and scale widening: DECIMAL(5,2)
< DECIMAL(10,4)
+ // d = max(p1-s1, p2-s2) = max(3, 6) = 6,
+ // scale = max(2, 4) = 4,
+ // p = 10.
+ // Thus DECIMAL(10, 4).
+ TestSetSpec.forFunction(
+ BuiltInFunctionDefinitions.COALESCE,
+ "DECIMAL(5,2) and DECIMAL(10,4) (different
scale)")
+ .onFieldsWithData(null, new BigDecimal("1.23"), new
BigDecimal("4.5678"))
+ .andDataTypes(
+ DECIMAL(5, 2).nullable(),
+ DECIMAL(5, 2).nullable(),
+ DECIMAL(10, 4).notNull())
+ .testResult(
+ coalesce($("f0"), $("f1"), $("f2")),
+ "COALESCE(f0, f1, f2)",
+ new BigDecimal("1.2300"),
+ DECIMAL(10, 4).notNull()),
+
+ // INTERVAL YEAR TO MONTH — same shape on every operand.
Stored as a single int
+ // (months); precision is metadata, the underlying value is
unchanged.
+ TestSetSpec.forFunction(
+ BuiltInFunctionDefinitions.COALESCE, "INTERVAL
MONTH precision")
+ .onFieldsWithData(null, Period.ofMonths(2),
Period.ofMonths(5))
+ .andDataTypes(
+ INTERVAL(MONTH()).nullable(),
+ INTERVAL(MONTH()).nullable(),
+ INTERVAL(MONTH()).notNull())
+ .testResult(
+ coalesce($("f0"), $("f1"), $("f2")),
+ "COALESCE(f0, f1, f2)",
+ Period.ofMonths(2),
+ INTERVAL(MONTH()).notNull()),
+ TestSetSpec.forFunction(
+ BuiltInFunctionDefinitions.COALESCE,
+ "INTERVAL SECOND same precision")
+ .onFieldsWithData(null, Duration.ofSeconds(10),
Duration.ofMillis(15000))
+ .andDataTypes(
+ INTERVAL(SECOND(3)).nullable(),
+ INTERVAL(SECOND(3)).nullable(),
+ INTERVAL(SECOND(3)).notNull())
+ .testResult(
+ coalesce($("f0"), $("f1"), $("f2")),
+ "COALESCE(f0, f1, f2)",
+ Duration.ofSeconds(10),
+ INTERVAL(SECOND(3)).notNull()));
+ }
+
+ /** Lazy evaluation: a non-null operand short-circuits the rest. */
+ private static List<TestSetSpec> lazyEvaluation() {
+ return List.of(
+ // First arg non-null at runtime: subsequent ThrowingFunction
must NOT be called.
+ TestSetSpec.forFunction(
+ BuiltInFunctionDefinitions.COALESCE,
+ "lazy: first operand non-null skips remainder")
+ .onFieldsWithData(1, 100)
+ .andDataTypes(INT().nullable(), INT().notNull())
+ .withFunction(ThrowingFunction.class)
+ .testResult(
+ coalesce($("f0"), call("ThrowingFunction",
$("f1"))),
+ "COALESCE(f0, ThrowingFunction(f1))",
+ 1,
+ INT().notNull()),
+ // Middle arg non-null at runtime: ThrowingFunction in the
third slot must NOT be
+ // called.
+ TestSetSpec.forFunction(
+ BuiltInFunctionDefinitions.COALESCE,
+ "lazy: middle operand non-null skips
remainder")
+ .onFieldsWithData(null, 5, 100)
+ .andDataTypes(INT().nullable(), INT().nullable(),
INT().notNull())
+ .withFunction(ThrowingFunction.class)
+ .testResult(
+ coalesce($("f0"), $("f1"),
call("ThrowingFunction", $("f2"))),
+ "COALESCE(f0, f1, ThrowingFunction(f2))",
+ 5,
+ INT().notNull()),
+ // Negative control: the previous operand IS null at runtime,
so ThrowingFunction
+ // must be reached and must throw.
+ TestSetSpec.forFunction(
+ BuiltInFunctionDefinitions.COALESCE,
+ "negative control: throwing UDF fires when
reached")
+ .onFieldsWithData(null, 100)
+ .andDataTypes(INT().nullable(), INT().notNull())
+ .withFunction(ThrowingFunction.class)
+ .testTableApiRuntimeError(
+ coalesce($("f0"), call("ThrowingFunction",
$("f1"))),
+ "ThrowingFunction was called")
+ .testSqlRuntimeError(
+ "COALESCE(f0, ThrowingFunction(f1))",
+ "ThrowingFunction was called"));
+ }
+
+ private static List<TestSetSpec> constants() {
+ return Collections.singletonList(
+ TestSetSpec.forFunction(
+ BuiltInFunctionDefinitions.COALESCE,
+ "constants and nullability inference")
.onFieldsWithData(null, null, 1)
.andDataTypes(BIGINT().nullable(), INT().nullable(),
INT().notNull())
.testResult(
@@ -56,4 +346,11 @@ class CoalesceFunctionITCase extends
BuiltInFunctionTestBase {
// constant in the function invocation
BIGINT().notNull()));
}
+
+ /** Function that throws on every invocation. */
+ public static class ThrowingFunction extends ScalarFunction {
+ public int eval(int i) {
+ throw new RuntimeException("ThrowingFunction was called");
+ }
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
index 0a92bc0f41a..940c90212e4 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.test.program.SourceTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;
+import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
@@ -300,4 +301,44 @@ public class CalcTestPrograms {
+ "FROM orders a LEFT JOIN
order_details_row b "
+ "ON a.order_id = b.r.order_id")
.build();
+
+ public static final TableTestProgram COALESCE =
+ TableTestProgram.of("calc-coalesce", "validates coalesce node")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "a DECIMAL(2, 1)",
+ "b DECIMAL(4, 2)",
+ "c TIMESTAMP(0)",
+ "d TIMESTAMP(3)")
+ .producedBeforeRestore(
+ Row.of(
+ null,
+ new BigDecimal("11.22"),
+ null,
+ LocalDateTime.of(
+ 1970, 1, 1, 0, 0,
0, 123_000_000)))
+ .producedAfterRestore(
+ Row.of(
+ new BigDecimal("5.3"),
+ null,
+ LocalDateTime.of(2000, 2,
2, 2, 2, 2),
+ null))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("x DECIMAL(4, 2)", "y
TIMESTAMP(3)")
+ .consumedBeforeRestore(
+ Row.of(
+ new BigDecimal("11.22"),
+ LocalDateTime.of(
+ 1970, 1, 1, 0, 0,
0, 123_000_000)))
+ .consumedAfterRestore(
+ Row.of(
+ new BigDecimal("5.30"),
+ LocalDateTime.of(2000, 2,
2, 2, 2, 2)))
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT COALESCE(a, b) AS x,
COALESCE(c, d) AS y FROM t")
+ .build();
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
index 8de2e05e054..e26f2aa1864 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
@@ -42,6 +42,7 @@ public class CalcRestoreTest extends RestoreTestBase {
CalcTestPrograms.CALC_SARG,
CalcTestPrograms.CALC_UDF_SIMPLE,
CalcTestPrograms.CALC_UDF_COMPLEX,
- CalcTestPrograms.CALC_CURRENT_TIMESTAMP);
+ CalcTestPrograms.CALC_CURRENT_TIMESTAMP,
+ CalcTestPrograms.COALESCE);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.java
index 3610b20d6f3..397b117fa16 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.java
@@ -30,7 +30,10 @@ import org.junit.jupiter.api.Test;
import static org.apache.flink.table.api.DataTypes.STRING;
-/** Test rule {@link RemoveUnreachableCoalesceArgumentsRule}. */
+/**
+ * Tests that {@code COALESCE} arguments appearing after the first {@code NOT
NULL} argument are
+ * pruned from the plan, since they can never be reached.
+ */
class RemoveUnreachableCoalesceArgumentsRuleTest extends TableTestBase {
private StreamTableTestUtil util;
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/GroupingSetsTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/GroupingSetsTest.xml
index 4ec32df7293..ae9e1459285 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/GroupingSetsTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/GroupingSetsTest.xml
@@ -749,14 +749,14 @@ GROUP BY GROUPING SETS ((a, b), (a, b, c))
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a=[$0], b=[$1], EXPR$2=[coalesce($2, _UTF-16LE'empty')],
EXPR$3=[$3])
+LogicalProject(a=[$0], b=[$1], EXPR$2=[COALESCE($2, _UTF-16LE'empty')],
EXPR$3=[$3])
+- LogicalAggregate(group=[{0, 1, 2}], groups=[[{0, 1, 2}, {0, 1}]],
EXPR$3=[AVG($3)])
+- LogicalTableScan(table=[[default_catalog, default_database, t1]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, b, coalesce(c, 'empty') AS EXPR$2, EXPR$3])
+Calc(select=[a, b, COALESCE(c, 'empty') AS EXPR$2, EXPR$3])
+- HashAggregate(isMerge=[true], groupBy=[a, b, c, $e], select=[a, b, c, $e,
Final_AVG(sum$0, count$1) AS EXPR$3])
+- Exchange(distribution=[hash[a, b, c, $e]])
+- LocalHashAggregate(groupBy=[a, b, c, $e], select=[a, b, c, $e,
Partial_AVG(d) AS (sum$0, count$1)])
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml
index 38eb87d03bd..1003f2fdbef 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml
@@ -39,7 +39,7 @@ Calc(select=[COALESCE(f0, f2) AS EXPR$0])
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(EXPR$0=[COALESCE($1, _UTF-16LE'-')])
+LogicalProject(EXPR$0=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, T]])
]]>
</Resource>
@@ -56,7 +56,7 @@ Calc(select=[f1 AS EXPR$0])
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(EXPR$0=[COALESCE($0, $1, _UTF-16LE'-')])
+LogicalProject(EXPR$0=[COALESCE($0, $1)])
+- LogicalTableScan(table=[[default_catalog, default_database, T]])
]]>
</Resource>
@@ -94,7 +94,7 @@ Calc(select=[f0, f1, f2], where=[=(COALESCE(f0, f1), 'abc')])
LogicalProject(f0=[$0], f1=[$1], f2=[$2], f00=[$3], f10=[$4], f20=[$5])
+- LogicalProject(f0=[$0], f1=[$1], f2=[$2], f00=[$4], f10=[$5], f20=[$6])
+- LogicalJoin(condition=[=($3, $4)], joinType=[left])
- :- LogicalProject(f0=[$0], f1=[$1], f2=[$2], $f3=[COALESCE($0,
_UTF-16LE'-', $2)])
+ :- LogicalProject(f0=[$0], f1=[$1], f2=[$2], $f3=[COALESCE($0,
_UTF-16LE'-')])
: +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+- LogicalTableScan(table=[[default_catalog, default_database, T]])
]]>
@@ -134,13 +134,13 @@ COALESCE(cast(NULL as double), cast(NULL as double))]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(EXPR$0=[COALESCE(1)], EXPR$1=[COALESCE(1, 2)],
EXPR$2=[COALESCE(null:INTEGER, 2)], EXPR$3=[COALESCE(1, null:INTEGER)],
EXPR$4=[COALESCE(null:INTEGER, null:INTEGER, 3)], EXPR$5=[COALESCE(4,
null:INTEGER, null:INTEGER, null:INTEGER)], EXPR$6=[COALESCE(_UTF-16LE'1')],
EXPR$7=[COALESCE(_UTF-16LE'1', _UTF-16LE'23')],
EXPR$8=[COALESCE(null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE",
_UTF-16LE'2')], EXPR$9=[COALESCE(_UTF-16LE'1', null:VARCHAR(2147483647)
CHARACTER SET "UTF-16LE [...]
+LogicalProject(EXPR$0=[1], EXPR$1=[1], EXPR$2=[2], EXPR$3=[1], EXPR$4=[3],
EXPR$5=[4], EXPR$6=[_UTF-16LE'1'], EXPR$7=[_UTF-16LE'1':VARCHAR(2) CHARACTER
SET "UTF-16LE"], EXPR$8=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"], EXPR$9=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"], EXPR$10=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"], EXPR$11=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"], EXPR$12=[1.0:DECIMAL(2, 1)], EXPR$13=[1.0:DECIMAL [...]
+- LogicalValues(tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[1 AS EXPR$0, 1 AS EXPR$1, 2 AS EXPR$2, 1 AS EXPR$3, 3 AS EXPR$4,
4 AS EXPR$5, '1' AS EXPR$6, '1' AS EXPR$7, '2' AS EXPR$8, '1' AS EXPR$9, '3' AS
EXPR$10, '4' AS EXPR$11, 1.0 AS EXPR$12, 1.0 AS EXPR$13, 2E0 AS EXPR$14, 2E0 AS
EXPR$15, 2.0 AS EXPR$16, null:DOUBLE AS EXPR$17])
+Calc(select=[1 AS EXPR$0, 1 AS EXPR$1, 2 AS EXPR$2, 1 AS EXPR$3, 3 AS EXPR$4,
4 AS EXPR$5, '1' AS EXPR$6, '1' AS EXPR$7, '2' AS EXPR$8, '1' AS EXPR$9, '3' AS
EXPR$10, '4' AS EXPR$11, 1.0 AS EXPR$12, 1.0 AS EXPR$13, 2.0 AS EXPR$14, 2.0 AS
EXPR$15, 2.0 AS EXPR$16, null:DOUBLE AS EXPR$17])
+- Values(tuples=[[{ 0 }]])
]]>
</Resource>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyCoalesceWithEquiJoinConditionRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyCoalesceWithEquiJoinConditionRuleTest.xml
index e2b46196f6c..ea49ada9e0c 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyCoalesceWithEquiJoinConditionRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyCoalesceWithEquiJoinConditionRuleTest.xml
@@ -45,7 +45,7 @@ Calc(select=[COALESCE(order_id0, order_id) AS order_id])
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(order_id=[COALESCE($3, $0)])
+LogicalProject(order_id=[$3])
+- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+- LogicalTableScan(table=[[default_catalog, default_database,
order_details]])
@@ -153,7 +153,7 @@ TableSourceScan(table=[[default_catalog, default_database,
order_details, projec
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(order_id=[COALESCE($0, $3)], amount=[$2])
+LogicalProject(order_id=[$0], amount=[$2])
+- LogicalJoin(condition=[=($0, $3)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+- LogicalTableScan(table=[[default_catalog, default_database,
order_details]])
@@ -171,7 +171,7 @@ TableSourceScan(table=[[default_catalog, default_database,
orders, project=[orde
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(val=[COALESCE($3, $0, $1)])
+LogicalProject(val=[COALESCE($3, $0)])
+- LogicalJoin(condition=[=($0, $3)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+- LogicalTableScan(table=[[default_catalog, default_database,
order_details]])
@@ -189,7 +189,7 @@ TableSourceScan(table=[[default_catalog, default_database,
orders, project=[orde
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(val=[COALESCE($1, $3, $0)])
+LogicalProject(val=[$1])
+- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+- LogicalTableScan(table=[[default_catalog, default_database,
order_details]])
@@ -212,7 +212,7 @@ Calc(select=[user_id AS val])
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(val=[COALESCE($1, $0, $3)])
+LogicalProject(val=[$1])
+- LogicalJoin(condition=[=($0, $3)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+- LogicalTableScan(table=[[default_catalog, default_database,
order_details]])
@@ -230,7 +230,7 @@ TableSourceScan(table=[[default_catalog, default_database,
orders, project=[user
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(val=[COALESCE($3, $1, $0)])
+LogicalProject(val=[COALESCE($3, $1)])
+- LogicalJoin(condition=[=($0, $3)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+- LogicalTableScan(table=[[default_catalog, default_database,
order_details]])
@@ -253,7 +253,7 @@ Calc(select=[COALESCE(order_id0, user_id) AS val])
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(order_id=[COALESCE($3, $0, 0:BIGINT)])
+LogicalProject(order_id=[COALESCE($3, $0)])
+- LogicalJoin(condition=[=($0, $3)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+- LogicalTableScan(table=[[default_catalog, default_database,
order_details]])
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
index 393b486d2f2..11b409c98fe 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
@@ -109,14 +109,14 @@ Calc(select=[a], where=[>(random_udf(b), 10)])
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(field2=[$1], transactionId=[COALESCE($0.data.nested.trId)])
+LogicalProject(field2=[$1], transactionId=[$0.data.nested.trId])
+- LogicalProject(field1=[$0], field2=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database,
testCastOfTestToSameType]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[field1 AS field2, COALESCE(field1.data.nested.trId) AS
transactionId])
+Calc(select=[field1 AS field2, field1.data.nested.trId AS transactionId])
+- TableSourceScan(table=[[default_catalog, default_database,
testCastOfTestToSameType]], fields=[field1])
]]>
</Resource>
@@ -127,14 +127,14 @@ Calc(select=[field1 AS field2,
COALESCE(field1.data.nested.trId) AS transactionI
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(field2=[$1], transactionId=[COALESCE(ITEM($0.data,
0).nested.trId)])
+LogicalProject(field2=[$1], transactionId=[ITEM($0.data, 0).nested.trId])
+- LogicalProject(field1=[$0], field2=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database,
testCastOfTestToSameTypeWithArray]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[field1 AS field2, COALESCE(ITEM(field1.data, 0).nested.trId) AS
transactionId])
+Calc(select=[field1 AS field2, ITEM(field1.data, 0).nested.trId AS
transactionId])
+- TableSourceScan(table=[[default_catalog, default_database,
testCastOfTestToSameTypeWithArray]], fields=[field1])
]]>
</Resource>
@@ -145,14 +145,14 @@ Calc(select=[field1 AS field2, COALESCE(ITEM(field1.data,
0).nested.trId) AS tra
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(field2=[$1], transactionId=[COALESCE($0.data.nested.trId)])
+LogicalProject(field2=[$1], transactionId=[$0.data.nested.trId])
+- LogicalProject(field1=[$0], field2=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database,
testCastOfTestToSameTypeWithNullableNestedType]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[field1 AS field2, COALESCE(field1.data.nested.trId) AS
transactionId])
+Calc(select=[field1 AS field2, field1.data.nested.trId AS transactionId])
+- TableSourceScan(table=[[default_catalog, default_database,
testCastOfTestToSameTypeWithNullableNestedType]], fields=[field1])
]]>
</Resource>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.xml
index 88e69acd291..a5a504566af 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.xml
@@ -717,14 +717,14 @@ GROUP BY GROUPING SETS ((a, b), (a, b, c))
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a=[$0], b=[$1], EXPR$2=[coalesce($2, _UTF-16LE'empty')],
EXPR$3=[$3])
+LogicalProject(a=[$0], b=[$1], EXPR$2=[COALESCE($2, _UTF-16LE'empty')],
EXPR$3=[$3])
+- LogicalAggregate(group=[{0, 1, 2}], groups=[[{0, 1, 2}, {0, 1}]],
EXPR$3=[AVG($3)])
+- LogicalTableScan(table=[[default_catalog, default_database, t1]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, b, coalesce(c, 'empty') AS EXPR$2, EXPR$3])
+Calc(select=[a, b, COALESCE(c, 'empty') AS EXPR$2, EXPR$3])
+- GroupAggregate(groupBy=[a, b, c, $e], select=[a, b, c, $e, AVG(d) AS
EXPR$3])
+- Exchange(distribution=[hash[a, b, c, $e]])
+- Expand(projects=[{a, b, c, d, 0 AS $e}, {a, b, null AS c, d, 1 AS
$e}])
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 71a9bd5df9a..ea2088d064e 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -2102,7 +2102,7 @@ GROUP BY a, ws, we
<![CDATA[
LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
+- LogicalProject(a=[$0], ws=[$2], we=[$3])
- +- LogicalProject(a=[$0], b=[$1], ws=[COALESCE($2, $6)], we=[COALESCE($3,
$7)], new_proctime=[PROCTIME()])
+ +- LogicalProject(a=[$0], b=[$1], ws=[$2], we=[$3],
new_proctime=[PROCTIME()])
+- LogicalJoin(condition=[AND(=($2, $6), =($3, $7))], joinType=[inner])
:- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8])
: +- LogicalTableFunctionScan(invocation=[TUMBLE(TABLE(#0),
DESCRIPTOR(_UTF-16LE'proctime'), 300000:INTERVAL MINUTE)],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3)
d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME*
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3)
*PROCTIME* window_time)])
@@ -3567,7 +3567,7 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
+- LogicalProject(a=[$0], window_start=[$5], window_end=[$6])
+- LogicalTableFunctionScan(invocation=[TUMBLE(TABLE(#0),
DESCRIPTOR(_UTF-16LE'new_proctime'), 300000:INTERVAL MINUTE)],
rowType=[RecordType(INTEGER a, BIGINT b, TIMESTAMP(3) ws, TIMESTAMP(3) we,
TIMESTAMP_LTZ(3) *PROCTIME* new_proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], ws=[$2], we=[$3], new_proctime=[$4])
- +- LogicalProject(a=[$0], b=[$1], ws=[COALESCE($2, $6)],
we=[COALESCE($3, $7)], new_proctime=[PROCTIME()])
+ +- LogicalProject(a=[$0], b=[$1], ws=[$2], we=[$3],
new_proctime=[PROCTIME()])
+- LogicalJoin(condition=[AND(=($2, $6), =($3, $7))],
joinType=[inner])
:- LogicalProject(a=[$0], b=[$1], window_start=[$7],
window_end=[$8])
: +- LogicalTableFunctionScan(invocation=[TUMBLE(TABLE(#0),
DESCRIPTOR(_UTF-16LE'proctime'), 300000:INTERVAL MINUTE)],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3)
d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME*
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3)
*PROCTIME* window_time)])
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-coalesce/plan/calc-coalesce.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-coalesce/plan/calc-coalesce.json
new file mode 100644
index 00000000000..70f6bc7db5f
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-coalesce/plan/calc-coalesce.json
@@ -0,0 +1,135 @@
+{
+ "flinkVersion" : "2.4",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_2",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "DECIMAL(2, 1)"
+ }, {
+ "name" : "b",
+ "dataType" : "DECIMAL(4, 2)"
+ }, {
+ "name" : "c",
+ "dataType" : "TIMESTAMP(0)"
+ }, {
+ "name" : "d",
+ "dataType" : "TIMESTAMP(3)"
+ } ]
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`a` DECIMAL(2, 1), `b` DECIMAL(4, 2), `c`
TIMESTAMP(0), `d` TIMESTAMP(3)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[a, b, c, d])"
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "CALL",
+ "internalName" : "$COALESCE$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "DECIMAL(2, 1)"
+ } ],
+ "type" : "DECIMAL(4, 2)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "DECIMAL(4, 2)"
+ } ],
+ "type" : "DECIMAL(4, 2)"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$COALESCE$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(0)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`x` DECIMAL(4, 2), `y` TIMESTAMP(3)>",
+ "description" : "Calc(select=[COALESCE(CAST(a AS DECIMAL(4, 2)), b) AS x,
COALESCE(CAST(c AS TIMESTAMP(3)), d) AS y])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-sink_2",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "x",
+ "dataType" : "DECIMAL(4, 2)"
+ }, {
+ "name" : "y",
+ "dataType" : "TIMESTAMP(3)"
+ } ]
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "upsertMaterializeStrategy" : "MAP",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`x` DECIMAL(4, 2), `y` TIMESTAMP(3)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[x, y])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-coalesce/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-coalesce/savepoint/_metadata
new file mode 100644
index 00000000000..e5f4d56b179
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-coalesce/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/plan/calc-current-timestamp.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/plan/calc-current-timestamp.json
index 59da2c17b3a..d78082ac8f2 100644
---
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/plan/calc-current-timestamp.json
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/plan/calc-current-timestamp.json
@@ -1,8 +1,8 @@
{
- "flinkVersion" : "2.0",
+ "flinkVersion" : "2.4",
"nodes" : [ {
- "id" : 21,
- "type" : "stream-exec-table-source-scan_1",
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_2",
"scanTableSource" : {
"table" : {
"identifier" : "`default_catalog`.`default_database`.`t`",
@@ -11,18 +11,15 @@
"columns" : [ {
"name" : "a",
"dataType" : "BIGINT"
- } ],
- "watermarkSpecs" : [ ]
- },
- "partitionKeys" : [ ]
+ } ]
+ }
}
}
},
"outputType" : "ROW<`a` BIGINT>",
- "description" : "TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a])",
- "inputProperties" : [ ]
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a])"
}, {
- "id" : 22,
+ "id" : 2,
"type" : "stream-exec-calc_1",
"projection" : [ {
"kind" : "CALL",
@@ -41,8 +38,8 @@
}
}, {
"kind" : "CALL",
+ "syntax" : "FUNCTION_ID",
"internalName" : "$CURRENT_TIMESTAMP$1",
- "operands" : [ ],
"type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL"
} ],
"type" : "BIGINT NOT NULL"
@@ -64,8 +61,8 @@
"outputType" : "ROW<`EXPR$0` BIGINT>",
"description" : "Calc(select=[(EXTRACT(YEAR, CURRENT_TIMESTAMP()) / a) AS EXPR$0])"
}, {
- "id" : 23,
- "type" : "stream-exec-sink_1",
+ "id" : 3,
+ "type" : "stream-exec-sink_2",
"configuration" : {
"table.exec.sink.keyed-shuffle" : "AUTO",
"table.exec.sink.not-null-enforcer" : "ERROR",
@@ -81,14 +78,13 @@
"columns" : [ {
"name" : "a",
"dataType" : "BIGINT"
- } ],
- "watermarkSpecs" : [ ]
- },
- "partitionKeys" : [ ]
+ } ]
+ }
}
}
},
"inputChangelogMode" : [ "INSERT" ],
+ "upsertMaterializeStrategy" : "ADAPTIVE",
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
@@ -100,15 +96,15 @@
"description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[EXPR$0])"
} ],
"edges" : [ {
- "source" : 21,
- "target" : 22,
+ "source" : 1,
+ "target" : 2,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
- "source" : 22,
- "target" : 23,
+ "source" : 2,
+ "target" : 3,
"shuffle" : {
"type" : "FORWARD"
},
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/savepoint/_metadata
index 085b8798cf8..39bef7a5568 100644
Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/savepoint/_metadata and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/savepoint/_metadata differ
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/CoalesceFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/CoalesceFunction.java
deleted file mode 100644
index 4f2253c7810..00000000000
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/CoalesceFunction.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.scalar;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
-
-import javax.annotation.Nullable;
-
-/** Implementation of {@link BuiltInFunctionDefinitions#COALESCE}. */
-@Internal
-public class CoalesceFunction extends BuiltInScalarFunction {
-
- public CoalesceFunction(SpecializedContext context) {
- super(BuiltInFunctionDefinitions.COALESCE, context);
- }
-
- public @Nullable Object eval(Object... args) {
- for (Object arg : args) {
- if (arg != null) {
- return arg;
- }
- }
- return null;
- }
-}