This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a0cdd60a5ef717fd249d797b0019fdb436ad7da9
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Thu Sep 9 15:56:47 2021 +0200

    [FLINK-22942] [sql/planner] Disable UPSERT INTO statement, including a test

    Signed-off-by: slinkydeveloper <francescogu...@gmail.com>
---
 .../planner/calcite/FlinkCalciteSqlValidator.java  | 14 +++++++
 .../table/planner/calcite/FlinkPlannerImpl.scala   |  7 +++-
 .../calcite/FlinkCalciteSqlValidatorTest.java      | 44 ++++++++++++++++++++++
 3 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index bd5a4a8..1955c4a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.sql.JoinType;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlJoin;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
@@ -125,4 +126,17 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl {
         // this makes it possible to ignore them in the validator and fall back to regular row types
         // see also SqlFunction#deriveType
     }
+
+    @Override
+    public void validateInsert(SqlInsert insert) {
+        // TODO Seems like this validation breaks the SqlToOperationConverter,
+        // we should investigate what happens inside this method invocation.
+        // super.validateInsert(insert);
+
+        // We don't support UPSERT INTO semantics (see FLINK-24225).
+        if (insert.isUpsert()) {
+            throw new ValidationException(
+                    "UPSERT INTO statement is not supported. Please use INSERT INTO instead.");
+        }
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 692d6a4..eefdb34 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -33,7 +33,7 @@ import org.apache.calcite.rel.{RelFieldCollation, RelRoot}
 import org.apache.calcite.rex.{RexInputRef, RexNode}
 import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
 import org.apache.calcite.sql.validate.SqlValidator
-import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlExplain, SqlInsert, SqlKind, SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
 import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
 import org.apache.flink.sql.parser.ddl.{SqlReset, SqlSet, SqlUseModules}
@@ -119,7 +119,6 @@ class FlinkPlannerImpl(
       }
       // no need to validate row type for DDL and insert nodes.
       if (sqlNode.getKind.belongsTo(SqlKind.DDL)
-        || sqlNode.getKind == SqlKind.INSERT
         || sqlNode.getKind == SqlKind.CREATE_FUNCTION
         || sqlNode.getKind == SqlKind.DROP_FUNCTION
         || sqlNode.getKind == SqlKind.OTHER_DDL
@@ -156,6 +155,10 @@ class FlinkPlannerImpl(
           }
           richExplain.setOperand(0, validatedStatement)
           richExplain
+        // Insert requires validation but no row validation
+        case insert: SqlInsert =>
+          validator.validateInsert(insert)
+          insert
         case _ =>
           validator.validate(sqlNode)
       }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java
new file mode 100644
index 0000000..33afb12
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.utils.PlannerMocks;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test for {@link FlinkCalciteSqlValidator}. */
+public class FlinkCalciteSqlValidatorTest {
+
+    private final PlannerMocks plannerMocks =
+            PlannerMocks.create()
+                    .registerTemporaryTable(
+                            "t1", Schema.newBuilder().column("a", DataTypes.INT()).build());
+
+    @Test
+    public void testUpsertInto() {
+        Assert.assertThrows(
+                "UPSERT INTO statement is not supported. Please use INSERT INTO instead.",
+                ValidationException.class,
+                () -> plannerMocks.getParser().parse("UPSERT INTO t1 VALUES(1)"));
+    }
+}