This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a0cdd60a5ef717fd249d797b0019fdb436ad7da9
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Thu Sep 9 15:56:47 2021 +0200

    [FLINK-22942] [sql/planner] Disable UPSERT INTO statement, including a test
    
    Signed-off-by: slinkydeveloper <francescogu...@gmail.com>
---
 .../planner/calcite/FlinkCalciteSqlValidator.java  | 14 +++++++
 .../table/planner/calcite/FlinkPlannerImpl.scala   |  7 +++-
 .../calcite/FlinkCalciteSqlValidatorTest.java      | 44 ++++++++++++++++++++++
 3 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index bd5a4a8..1955c4a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.sql.JoinType;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlJoin;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
@@ -125,4 +126,17 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl {
         // this makes it possible to ignore them in the validator and fall back to regular row types
         // see also SqlFunction#deriveType
     }
+
+    @Override
+    public void validateInsert(SqlInsert insert) {
+        // TODO Seems like this validation breaks the SqlToOperationConverter,
+        //  we should investigate what happens inside this method invocation.
+        // super.validateInsert(insert);
+
+        // We don't support UPSERT INTO semantics (see FLINK-24225).
+        if (insert.isUpsert()) {
+            throw new ValidationException(
+                    "UPSERT INTO statement is not supported. Please use INSERT INTO instead.");
+        }
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 692d6a4..eefdb34 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -33,7 +33,7 @@ import org.apache.calcite.rel.{RelFieldCollation, RelRoot}
 import org.apache.calcite.rex.{RexInputRef, RexNode}
 import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
 import org.apache.calcite.sql.validate.SqlValidator
-import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlExplain, SqlInsert, SqlKind, SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
 import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
 import org.apache.flink.sql.parser.ddl.{SqlReset, SqlSet, SqlUseModules}
@@ -119,7 +119,6 @@ class FlinkPlannerImpl(
       }
       // no need to validate row type for DDL and insert nodes.
       if (sqlNode.getKind.belongsTo(SqlKind.DDL)
-        || sqlNode.getKind == SqlKind.INSERT
         || sqlNode.getKind == SqlKind.CREATE_FUNCTION
         || sqlNode.getKind == SqlKind.DROP_FUNCTION
         || sqlNode.getKind == SqlKind.OTHER_DDL
@@ -156,6 +155,10 @@ class FlinkPlannerImpl(
           }
           richExplain.setOperand(0, validatedStatement)
           richExplain
+        // Insert requires validation but no row validation
+        case insert: SqlInsert =>
+          validator.validateInsert(insert)
+          insert
         case _ =>
           validator.validate(sqlNode)
       }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java
new file mode 100644
index 0000000..33afb12
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.utils.PlannerMocks;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test for {@link FlinkCalciteSqlValidator}. */
+public class FlinkCalciteSqlValidatorTest {
+
+    private final PlannerMocks plannerMocks =
+            PlannerMocks.create()
+                    .registerTemporaryTable(
+                            "t1", Schema.newBuilder().column("a", DataTypes.INT()).build());
+
+    @Test
+    public void testUpsertInto() {
+        Assert.assertThrows(
+                "UPSERT INTO statement is not supported. Please use INSERT INTO instead.",
+                ValidationException.class,
+                () -> plannerMocks.getParser().parse("UPSERT INTO t1 VALUES(1)"));
+    }
+}

Reply via email to