This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d3c3909  [FLINK-17449][sql-parser-hive] Implement ADD/DROP partitions
d3c3909 is described below

commit d3c39090a73aeb928c78ecd63e1fd6190c0df1f1
Author: Rui Li <li...@apache.org>
AuthorDate: Sun May 17 21:07:07 2020 +0800

    [FLINK-17449][sql-parser-hive] Implement ADD/DROP partitions
    
    
    This closes #12195
---
 .../flink/connectors/hive/HiveDialectTest.java     |  35 ++++++--
 .../src/main/codegen/data/Parser.tdd               |   2 +
 .../src/main/codegen/includes/parserImpls.ftl      |  79 +++++++++++++++-
 .../sql/parser/hive/ddl/SqlAddHivePartitions.java  |  85 ++++++++++++++++++
 .../parser/hive/FlinkHiveSqlParserImplTest.java    |  27 ++++++
 .../flink/sql/parser/ddl/SqlAddPartitions.java     | 100 +++++++++++++++++++++
 .../flink/sql/parser/ddl/SqlDropPartitions.java    |  87 ++++++++++++++++++
 .../table/api/internal/TableEnvironmentImpl.java   |  21 +++++
 .../operations/ddl/AddPartitionsOperation.java     |  74 +++++++++++++++
 .../operations/ddl/DropPartitionsOperation.java    |  60 +++++++++++++
 .../operations/SqlToOperationConverter.java        |  22 +++++
 11 files changed, 582 insertions(+), 10 deletions(-)
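
In short: with the Hive dialect, ALTER TABLE can now add and drop partitions through DDL instead of direct catalog calls. A minimal sketch of the statements this patch enables, assuming a TableEnvironment with the Hive dialect and a HiveCatalog registered as the current catalog (table and partition names are illustrative, taken from the tests below):

    // Sketch only: assumes Hive dialect + HiveCatalog are already configured.
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    tableEnv.executeSql("create table tbl (x int) partitioned by (dt date)");
    // Add one or more partitions in a single statement, each with an optional location.
    tableEnv.executeSql("alter table tbl add if not exists partition (dt='2020-05-01') location '/tmp/part1'");
    // Drop partitions; multiple specs are comma-separated.
    tableEnv.executeSql("alter table tbl drop if exists partition (dt='2020-05-01')");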

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
index b85309b..9b6df99 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogPartitionImpl;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -60,7 +59,6 @@ import org.junit.Test;
 import java.io.File;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 
@@ -302,8 +300,7 @@ public class HiveDialectTest {
 
                // add/replace columns cascade
                tableEnv.executeSql("create table tbl2 (x int) partitioned by 
(dt date,id bigint)");
-               ObjectPath tablePath2 = new ObjectPath("default", "tbl2");
-               // TODO: use DDL to add partitions once we support it
+               tableEnv.executeSql("alter table tbl2 add partition 
(dt='2020-01-23',id=1) partition (dt='2020-04-24',id=2)");
                CatalogPartitionSpec partitionSpec1 = new 
CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
                        put("dt", "2020-01-23");
                        put("id", "1");
@@ -312,9 +309,8 @@ public class HiveDialectTest {
                        put("dt", "2020-04-24");
                        put("id", "2");
                }});
-               hiveCatalog.createPartition(tablePath2, partitionSpec1, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
-               hiveCatalog.createPartition(tablePath2, partitionSpec2, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
                tableEnv.executeSql("alter table tbl2 replace columns (ti tinyint,d decimal) cascade");
+               ObjectPath tablePath2 = new ObjectPath("default", "tbl2");
                hiveTable = hiveCatalog.getHiveTable(tablePath2);
                Partition hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec1);
                assertEquals(2, hivePartition.getSd().getColsSize());
@@ -344,7 +340,7 @@ public class HiveDialectTest {
        @Test
        public void testAlterPartition() throws Exception {
                tableEnv.executeSql("create table tbl (x tinyint,y string) 
partitioned by (p1 bigint,p2 date)");
-               // TODO: use DDL to add partitions once we support it
+               tableEnv.executeSql("alter table tbl add partition 
(p1=1000,p2='2020-05-01') partition (p1=2000,p2='2020-01-01')");
                CatalogPartitionSpec spec1 = new CatalogPartitionSpec(new 
LinkedHashMap<String, String>() {{
                        put("p1", "1000");
                        put("p2", "2020-05-01");
@@ -354,8 +350,6 @@ public class HiveDialectTest {
                        put("p2", "2020-01-01");
                }});
                ObjectPath tablePath = new ObjectPath("default", "tbl");
-               hiveCatalog.createPartition(tablePath, spec1, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
-               hiveCatalog.createPartition(tablePath, spec2, new CatalogPartitionImpl(Collections.emptyMap(), null), false);
 
                Table hiveTable = hiveCatalog.getHiveTable(tablePath);
 
@@ -438,6 +432,29 @@ public class HiveDialectTest {
                assertEquals(DEFAULT_BUILTIN_DATABASE, databases.get(0).toString());
        }
 
+       @Test
+       public void testAddDropPartitions() throws Exception {
+               tableEnv.executeSql("create table tbl (x int,y binary) partitioned by (dt date,country string)");
+               tableEnv.executeSql("alter table tbl add partition (dt='2020-04-30',country='china') partition (dt='2020-04-30',country='us')");
+
+               ObjectPath tablePath = new ObjectPath("default", "tbl");
+               assertEquals(2, hiveCatalog.listPartitions(tablePath).size());
+
+               String partLocation = warehouse + "/part3_location";
+               tableEnv.executeSql(String.format(
+                               "alter table tbl add partition (dt='2020-05-01',country='belgium') location '%s'", partLocation));
+               Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+               CatalogPartitionSpec spec = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{
+                       put("dt", "2020-05-01");
+                       put("country", "belgium");
+               }});
+               Partition hivePartition = hiveCatalog.getHivePartition(hiveTable, spec);
+               assertEquals(partLocation, locationPath(hivePartition.getSd().getLocation()));
+
+               tableEnv.executeSql("alter table tbl drop partition (dt='2020-04-30',country='china'),partition (dt='2020-05-01',country='belgium')");
+               assertEquals(1, hiveCatalog.listPartitions(tablePath).size());
+       }
+
        private static String locationPath(String locationURI) throws URISyntaxException {
                return new URI(locationURI).getPath();
        }
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
index 0657f64..e000d34 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
@@ -24,6 +24,7 @@
   # Please keep the import classes in alphabetical order if new class is added.
   imports: [
     "org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils"
+    "org.apache.flink.sql.parser.hive.ddl.SqlAddHivePartitions"
     "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseLocation"
     "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner"
     "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseProps"
@@ -65,6 +66,7 @@
     "org.apache.flink.sql.parser.ddl.SqlCreateView"
     "org.apache.flink.sql.parser.ddl.SqlDropDatabase"
     "org.apache.flink.sql.parser.ddl.SqlDropFunction"
+    "org.apache.flink.sql.parser.ddl.SqlDropPartitions"
     "org.apache.flink.sql.parser.ddl.SqlDropTable"
     "org.apache.flink.sql.parser.ddl.SqlDropView"
     "org.apache.flink.sql.parser.ddl.SqlTableColumn"
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
index 4aba880..2ffc233 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
@@ -1136,12 +1136,32 @@ SqlAlterTable SqlAlterTable() :
             }
         )
     |
-        <ADD> <COLUMNS>
+        <ADD>
+        (
+            <COLUMNS>
+            {
+                EnsureAlterTableOnly(partitionSpec, "Add columns");
+                return SqlAlterHiveTableAddReplaceColumn(startPos, tableIdentifier, false);
+            }
+        |
+            [ <IF> <NOT> <EXISTS> { ifNotExists = true; } ]
+            {
+                EnsureAlterTableOnly(partitionSpec, "Add partitions");
+                return SqlAddHivePartitions(startPos, tableIdentifier, ifNotExists);
+            }
+        )
         {
             EnsureAlterTableOnly(partitionSpec, "Add columns");
             return SqlAlterHiveTableAddReplaceColumn(startPos, tableIdentifier, false);
         }
     |
+        <DROP>
+        [ <IF> <EXISTS> { ifExists = true; } ]
+        {
+            EnsureAlterTableOnly(partitionSpec, "Drop partitions");
+            return SqlDropPartitions(startPos, tableIdentifier, ifExists);
+        }
+    |
         <REPLACE> <COLUMNS>
         {
             EnsureAlterTableOnly(partitionSpec, "Replace columns");
@@ -1368,3 +1388,60 @@ SqlAlterView SqlAlterView() :
       }
   )
 }
+
+/**
+ * Hive syntax:
+ *
+ * ALTER TABLE table_name ADD [IF NOT EXISTS]
+ *     PARTITION partition_spec [LOCATION 'location'][PARTITION partition_spec [LOCATION 'location']][...];
+ */
+SqlAlterTable SqlAddHivePartitions(SqlParserPos startPos, SqlIdentifier tableIdentifier, boolean ifNotExists) :
+{
+  List<SqlNodeList> partSpecs = new ArrayList();
+  List<SqlCharStringLiteral> partLocations = new ArrayList();
+  SqlNodeList partSpec;
+  SqlCharStringLiteral partLocation;
+}
+{
+  (
+    <PARTITION>
+    {
+      partSpec = new SqlNodeList(getPos());
+      partLocation = null;
+      PartitionSpecCommaList(new SqlNodeList(getPos()), partSpec);
+    }
+    [ <LOCATION> <QUOTED_STRING> { partLocation = createStringLiteral(token.image, getPos()); } ]
+    { partSpecs.add(partSpec); partLocations.add(partLocation); }
+  )+
+  { return new SqlAddHivePartitions(startPos.plus(getPos()), tableIdentifier, ifNotExists, partSpecs, partLocations); }
+}
+
+/**
+ * Hive syntax:
+ *
+ * ALTER TABLE table_name DROP [IF EXISTS] PARTITION partition_spec[, PARTITION partition_spec, ...]
+ *    [IGNORE PROTECTION] [PURGE];
+ */
+SqlAlterTable SqlDropPartitions(SqlParserPos startPos, SqlIdentifier tableIdentifier, boolean ifExists) :
+{
+  List<SqlNodeList> partSpecs = new ArrayList();
+  SqlNodeList partSpec;
+}
+{
+  <PARTITION>
+  {
+    partSpec = new SqlNodeList(getPos());
+    PartitionSpecCommaList(new SqlNodeList(getPos()), partSpec);
+    partSpecs.add(partSpec);
+  }
+  (
+    <COMMA>
+    <PARTITION>
+    {
+      partSpec = new SqlNodeList(getPos());
+      PartitionSpecCommaList(new SqlNodeList(getPos()), partSpec);
+      partSpecs.add(partSpec);
+    }
+  )*
+  { return new SqlDropPartitions(startPos.plus(getPos()), tableIdentifier, ifExists, partSpecs); }
+}
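
Per the Hive syntax comments above (and the parser tests later in this patch), the two productions differ in how multiple specs are written: ADD takes PARTITION clauses back to back with no comma, each with an optional LOCATION, while DROP takes a comma-separated list. A hedged sketch, reusing the illustrative tableEnv from earlier:

    // ADD: no comma between PARTITION specs; LOCATION binds to the preceding spec.
    tableEnv.executeSql("alter table tbl add if not exists"
            + " partition (p=1) location '/part1/location'"
            + " partition (p=2)");
    // DROP: comma-separated PARTITION specs.
    tableEnv.executeSql("alter table tbl drop if exists partition (p=1), partition (p=2)");
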
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java
new file mode 100644
index 0000000..d003f6e
--- /dev/null
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.ddl.SqlAddPartitions;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Add partitions to a Hive table.
+ *
+ * <p>Hive syntax:
+ * ALTER TABLE table_name ADD [IF NOT EXISTS]
+ *     PARTITION partition_spec [LOCATION 'location'][PARTITION partition_spec [LOCATION 'location']][...];
+ */
+public class SqlAddHivePartitions extends SqlAddPartitions {
+
+       private final List<SqlCharStringLiteral> partLocations;
+
+       public SqlAddHivePartitions(SqlParserPos pos, SqlIdentifier tableName, boolean ifNotExists,
+                       List<SqlNodeList> partSpecs, List<SqlCharStringLiteral> partLocations) {
+               super(pos, tableName, ifNotExists, partSpecs, toProps(partLocations));
+               this.partLocations = partLocations;
+       }
+
+       private static List<SqlNodeList> toProps(List<SqlCharStringLiteral> partLocations) {
+               List<SqlNodeList> res = new ArrayList<>(partLocations.size());
+               for (SqlCharStringLiteral partLocation : partLocations) {
+                       SqlNodeList prop = null;
+                       if (partLocation != null) {
+                               prop = new SqlNodeList(partLocation.getParserPosition());
+                               prop.add(HiveDDLUtils.toTableOption(SqlCreateHiveTable.TABLE_LOCATION_URI, partLocation, partLocation.getParserPosition()));
+                       }
+                       res.add(prop);
+               }
+               return res;
+       }
+
+       @Override
+       public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+               writer.keyword("ALTER TABLE");
+               tableIdentifier.unparse(writer, leftPrec, rightPrec);
+               writer.newlineAndIndent();
+               writer.keyword("ADD");
+               if (ifNotExists()) {
+                       writer.keyword("IF NOT EXISTS");
+               }
+               int opLeftPrec = getOperator().getLeftPrec();
+               int opRightPrec = getOperator().getRightPrec();
+               for (int i = 0; i < getPartSpecs().size(); i++) {
+                       writer.newlineAndIndent();
+                       SqlNodeList partSpec = getPartSpecs().get(i);
+                       writer.keyword("PARTITION");
+                       partSpec.unparse(writer, opLeftPrec, opRightPrec);
+                       SqlCharStringLiteral location = partLocations.get(i);
+                       if (location != null) {
+                               writer.keyword("LOCATION");
+                               location.unparse(writer, opLeftPrec, opRightPrec);
+                       }
+               }
+       }
+}
diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
index 5e5c443..7130713 100644
--- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
@@ -383,4 +383,31 @@ public class FlinkHiveSqlParserImplTest extends SqlParserTest {
                                                "SELECT `C1`, `C2`\n" +
                                                "FROM `TBL`");
        }
+
+       @Test
+       public void testAddPartition() {
+               sql("alter table tbl add partition (p1=1,p2='a') location '/part1/location'")
+                               .ok("ALTER TABLE `TBL`\n" +
+                                               "ADD\n" +
+                                               "PARTITION (`P1` = 1, `P2` = 'a') LOCATION '/part1/location'");
+               sql("alter table tbl add if not exists partition (p=1) partition (p=2) location '/part2/location'")
+                               .ok("ALTER TABLE `TBL`\n" +
+                                               "ADD IF NOT EXISTS\n" +
+                                               "PARTITION (`P` = 1)\n" +
+                                               "PARTITION (`P` = 2) LOCATION '/part2/location'");
+       }
+
+       @Test
+       public void testDropPartition() {
+               sql("alter table tbl drop if exists partition (p=1)")
+                               .ok("ALTER TABLE `TBL`\n" +
+                                               "DROP IF EXISTS\n" +
+                                               "PARTITION (`P` = 1)");
+               sql("alter table tbl drop partition (p1='a',p2=1), partition(p1='b',p2=2)")
+                               .ok("ALTER TABLE `TBL`\n" +
+                                               "DROP\n" +
+                                               "PARTITION (`P1` = 'a', `P2` = 1)\n" +
+                                               "PARTITION (`P1` = 'b', `P2` = 2)");
+               // TODO: support IGNORE PROTECTION, PURGE
+       }
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java
new file mode 100644
index 0000000..7aa71ab
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.SqlPartitionUtils;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/**
+ * ALTER TABLE DDL to add partitions to a table.
+ */
+public class SqlAddPartitions extends SqlAlterTable {
+
+       private final boolean ifNotExists;
+       private final List<SqlNodeList> partSpecs;
+       private final List<SqlNodeList> partProps;
+
+       public SqlAddPartitions(SqlParserPos pos, SqlIdentifier tableName, boolean ifNotExists,
+                       List<SqlNodeList> partSpecs, List<SqlNodeList> partProps) {
+               super(pos, tableName);
+               this.ifNotExists = ifNotExists;
+               this.partSpecs = partSpecs;
+               this.partProps = partProps;
+       }
+
+       public boolean ifNotExists() {
+               return ifNotExists;
+       }
+
+       public List<SqlNodeList> getPartSpecs() {
+               return partSpecs;
+       }
+
+       public LinkedHashMap<String, String> getPartitionKVs(int i) {
+               return SqlPartitionUtils.getPartitionKVs(getPartSpecs().get(i));
+       }
+
+       public List<SqlNodeList> getPartProps() {
+               return partProps;
+       }
+
+       @Nonnull
+       @Override
+       public List<SqlNode> getOperandList() {
+               List<SqlNode> operands = new ArrayList<>();
+               operands.add(tableIdentifier);
+               operands.addAll(partSpecs);
+               operands.addAll(partProps);
+               return operands;
+       }
+
+       @Override
+       public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+               super.unparse(writer, leftPrec, rightPrec);
+               writer.newlineAndIndent();
+               writer.keyword("ADD");
+               if (ifNotExists) {
+                       writer.keyword("IF NOT EXISTS");
+               }
+               int opLeftPrec = getOperator().getLeftPrec();
+               int opRightPrec = getOperator().getRightPrec();
+               for (int i = 0; i < partSpecs.size(); i++) {
+                       writer.newlineAndIndent();
+                       SqlNodeList partSpec = partSpecs.get(i);
+                       SqlNodeList partProp = partProps.get(i);
+                       writer.keyword("PARTITION");
+                       partSpec.unparse(writer, opLeftPrec, opRightPrec);
+                       if (partProp != null) {
+                               writer.keyword("WITH");
+                               partProp.unparse(writer, opLeftPrec, opRightPrec);
+                       }
+               }
+       }
+}
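
A consumer of this node typically converts each parsed spec into the catalog-level representation; that is essentially what the planner change at the end of this patch does. A minimal sketch (illustrative helper, error handling omitted):

    // Turn each parsed PARTITION spec into a CatalogPartitionSpec.
    // getPartitionKVs(i) preserves key order because it returns a LinkedHashMap.
    static List<CatalogPartitionSpec> toSpecs(SqlAddPartitions node) {
        List<CatalogPartitionSpec> specs = new ArrayList<>();
        for (int i = 0; i < node.getPartSpecs().size(); i++) {
            specs.add(new CatalogPartitionSpec(node.getPartitionKVs(i)));
        }
        return specs;
    }
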
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java
new file mode 100644
index 0000000..16ed67c
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.SqlPartitionUtils;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/**
+ * ALTER TABLE DDL to drop partitions of a table.
+ */
+public class SqlDropPartitions extends SqlAlterTable {
+
+       private final boolean ifExists;
+       private final List<SqlNodeList> partSpecs;
+
+       public SqlDropPartitions(SqlParserPos pos, SqlIdentifier tableName, boolean ifExists,
+                       List<SqlNodeList> partSpecs) {
+               super(pos, tableName);
+               this.ifExists = ifExists;
+               this.partSpecs = partSpecs;
+       }
+
+       public boolean ifExists() {
+               return ifExists;
+       }
+
+       public List<SqlNodeList> getPartSpecs() {
+               return partSpecs;
+       }
+
+       public LinkedHashMap<String, String> getPartitionKVs(int i) {
+               return SqlPartitionUtils.getPartitionKVs(getPartSpecs().get(i));
+       }
+
+       @Override
+       public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+               super.unparse(writer, leftPrec, rightPrec);
+               writer.newlineAndIndent();
+               writer.keyword("DROP");
+               if (ifExists) {
+                       writer.keyword("IF EXISTS");
+               }
+               int opLeftPrec = getOperator().getLeftPrec();
+               int opRightPrec = getOperator().getRightPrec();
+               for (SqlNodeList partSpec : partSpecs) {
+                       writer.newlineAndIndent();
+                       writer.keyword("PARTITION");
+                       partSpec.unparse(writer, opLeftPrec, opRightPrec);
+               }
+       }
+
+       @Nonnull
+       @Override
+       public List<SqlNode> getOperandList() {
+               List<SqlNode> operands = new ArrayList<>();
+               operands.add(tableIdentifier);
+               operands.addAll(partSpecs);
+               return operands;
+       }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 6a481928..3b4db5a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -42,12 +42,15 @@ import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.QueryOperationCatalogView;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -90,6 +93,7 @@ import org.apache.flink.table.operations.TableSourceQueryOperation;
 import org.apache.flink.table.operations.UnregisteredSinkModifyOperation;
 import org.apache.flink.table.operations.UseCatalogOperation;
 import org.apache.flink.table.operations.UseDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
 import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
@@ -111,6 +115,7 @@ import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateViewOperation;
 import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
+import org.apache.flink.table.operations.ddl.DropPartitionsOperation;
 import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
@@ -838,6 +843,22 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                                        catalog.alterTable(alterTableSchemaOperation.getTableIdentifier().toObjectPath(),
                                                        alterTableSchemaOperation.getCatalogTable(),
                                                        false);
+                               } else if (alterTableOperation instanceof AddPartitionsOperation) {
+                                       AddPartitionsOperation addPartitionsOperation = (AddPartitionsOperation) alterTableOperation;
+                                       List<CatalogPartitionSpec> specs = addPartitionsOperation.getPartitionSpecs();
+                                       List<CatalogPartition> partitions = addPartitionsOperation.getCatalogPartitions();
+                                       boolean ifNotExists = addPartitionsOperation.ifNotExists();
+                                       ObjectPath tablePath = addPartitionsOperation.getTableIdentifier().toObjectPath();
+                                       for (int i = 0; i < specs.size(); i++) {
+                                               catalog.createPartition(tablePath, specs.get(i), partitions.get(i), ifNotExists);
+                                       }
+                               } else if (alterTableOperation instanceof DropPartitionsOperation) {
+                                       DropPartitionsOperation dropPartitionsOperation = (DropPartitionsOperation) alterTableOperation;
+                                       ObjectPath tablePath = dropPartitionsOperation.getTableIdentifier().toObjectPath();
+                                       boolean ifExists = dropPartitionsOperation.ifExists();
+                                       for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) {
+                                               catalog.dropPartition(tablePath, spec, ifExists);
+                                       }
                                }
                                return TableResultImpl.TABLE_RESULT_OK;
                        } catch (TableAlreadyExistException | TableNotExistException e) {
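
One thing to note about the block above: partitions are created and dropped one at a time in a loop, so a multi-partition ALTER TABLE statement is not atomic — if the i-th createPartition call fails (for example, the partition already exists and IF NOT EXISTS was not specified), the partitions created before it remain in the catalog.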
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java
new file mode 100644
index 0000000..1802375
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Operation to describe ALTER TABLE ADD PARTITION statement.
+ */
+public class AddPartitionsOperation extends AlterTableOperation {
+
+       private final boolean ifNotExists;
+       private final List<CatalogPartitionSpec> partitionSpecs;
+       private final List<CatalogPartition> catalogPartitions;
+
+       public AddPartitionsOperation(ObjectIdentifier tableIdentifier, boolean ifNotExists,
+                       List<CatalogPartitionSpec> partitionSpecs, List<CatalogPartition> catalogPartitions) {
+               super(tableIdentifier);
+               this.ifNotExists = ifNotExists;
+               this.partitionSpecs = partitionSpecs;
+               this.catalogPartitions = catalogPartitions;
+       }
+
+       public List<CatalogPartitionSpec> getPartitionSpecs() {
+               return partitionSpecs;
+       }
+
+       public List<CatalogPartition> getCatalogPartitions() {
+               return catalogPartitions;
+       }
+
+       public boolean ifNotExists() {
+               return ifNotExists;
+       }
+
+       @Override
+       public String asSummaryString() {
+               StringBuilder builder = new StringBuilder(String.format("ALTER TABLE %s ADD", tableIdentifier.asSummaryString()));
+               if (ifNotExists) {
+                       builder.append(" IF NOT EXISTS");
+               }
+               for (int i = 0; i < partitionSpecs.size(); i++) {
+                       String spec = OperationUtils.formatPartitionSpec(partitionSpecs.get(i));
+                       builder.append(String.format(" PARTITION (%s)", spec));
+                       Map<String, String> properties = catalogPartitions.get(i).getProperties();
+                       if (!properties.isEmpty()) {
+                               builder.append(String.format(" WITH (%s)", OperationUtils.formatProperties(properties)));
+                       }
+               }
+               return builder.toString();
+       }
+}
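
For a rough sense of the output, a two-partition ADD with no partition properties would summarize along these lines — the exact key=value rendering comes from OperationUtils.formatPartitionSpec, so treat this as an assumption rather than captured output:

    // Hypothetical: op.asSummaryString() for two specs with ifNotExists = true
    // "ALTER TABLE hive.default.tbl ADD IF NOT EXISTS PARTITION (dt=2020-04-30, country=china) PARTITION (dt=2020-04-30, country=us)"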
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java
new file mode 100644
index 0000000..f3fc4f3
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.List;
+
+/**
+ * Operation to describe ALTER TABLE DROP PARTITION statement.
+ */
+public class DropPartitionsOperation extends AlterTableOperation {
+
+       private final boolean ifExists;
+       private final List<CatalogPartitionSpec> partitionSpecs;
+
+       public DropPartitionsOperation(ObjectIdentifier tableIdentifier, boolean ifExists, List<CatalogPartitionSpec> partitionSpecs) {
+               super(tableIdentifier);
+               this.ifExists = ifExists;
+               this.partitionSpecs = partitionSpecs;
+       }
+
+       public boolean ifExists() {
+               return ifExists;
+       }
+
+       public List<CatalogPartitionSpec> getPartitionSpecs() {
+               return partitionSpecs;
+       }
+
+       @Override
+       public String asSummaryString() {
+               StringBuilder builder = new StringBuilder(String.format("ALTER TABLE %s DROP", tableIdentifier.asSummaryString()));
+               if (ifExists) {
+                       builder.append(" IF EXISTS");
+               }
+               for (CatalogPartitionSpec spec : partitionSpecs) {
+                       builder.append(String.format(" PARTITION (%s)", OperationUtils.formatPartitionSpec(spec)));
+               }
+               return builder.toString();
+       }
+}
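
Executing one of these operations by hand reduces to the same catalog calls TableEnvironmentImpl makes above; a minimal sketch, assuming a Catalog instance `catalog` and a DropPartitionsOperation `op` are in scope:

    // Drop each partition spec, honoring IF EXISTS per spec.
    ObjectPath tablePath = op.getTableIdentifier().toObjectPath();
    for (CatalogPartitionSpec spec : op.getPartitionSpecs()) {
        catalog.dropPartition(tablePath, spec, op.ifExists());
    }
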
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index b496ddf..615ad60 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.operations;
 
+import org.apache.flink.sql.parser.ddl.SqlAddPartitions;
 import org.apache.flink.sql.parser.ddl.SqlAddReplaceColumns;
 import org.apache.flink.sql.parser.ddl.SqlAlterDatabase;
 import org.apache.flink.sql.parser.ddl.SqlAlterFunction;
@@ -38,6 +39,7 @@ import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.sql.parser.ddl.SqlCreateView;
 import org.apache.flink.sql.parser.ddl.SqlDropDatabase;
 import org.apache.flink.sql.parser.ddl.SqlDropFunction;
+import org.apache.flink.sql.parser.ddl.SqlDropPartitions;
 import org.apache.flink.sql.parser.ddl.SqlDropTable;
 import org.apache.flink.sql.parser.ddl.SqlDropView;
 import org.apache.flink.sql.parser.ddl.SqlTableOption;
@@ -84,6 +86,7 @@ import org.apache.flink.table.operations.ShowTablesOperation;
 import org.apache.flink.table.operations.ShowViewsOperation;
 import org.apache.flink.table.operations.UseCatalogOperation;
 import org.apache.flink.table.operations.UseDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
 import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
@@ -101,6 +104,7 @@ import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateViewOperation;
 import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
+import org.apache.flink.table.operations.ddl.DropPartitionsOperation;
 import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
@@ -126,6 +130,7 @@ import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.dialect.CalciteSqlDialect;
 import org.apache.calcite.sql.parser.SqlParser;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -363,6 +368,23 @@ public class SqlToOperationConverter {
                                        (SqlChangeColumn) sqlAlterTable,
                                        (CatalogTable) baseTable,
                                        flinkPlanner.getOrCreateSqlValidator());
+               } else if (sqlAlterTable instanceof SqlAddPartitions) {
+                       List<CatalogPartitionSpec> specs = new ArrayList<>();
+                       List<CatalogPartition> partitions = new ArrayList<>();
+                       SqlAddPartitions addPartitions = (SqlAddPartitions) sqlAlterTable;
+                       for (int i = 0; i < addPartitions.getPartSpecs().size(); i++) {
+                               specs.add(new CatalogPartitionSpec(addPartitions.getPartitionKVs(i)));
+                               Map<String, String> props = OperationConverterUtils.extractProperties(addPartitions.getPartProps().get(i));
+                               partitions.add(new CatalogPartitionImpl(props, null));
+                       }
+                       return new AddPartitionsOperation(tableIdentifier, addPartitions.ifNotExists(), specs, partitions);
+               } else if (sqlAlterTable instanceof SqlDropPartitions) {
+                       SqlDropPartitions dropPartitions = (SqlDropPartitions) sqlAlterTable;
+                       List<CatalogPartitionSpec> specs = new ArrayList<>();
+                       for (int i = 0; i < dropPartitions.getPartSpecs().size(); i++) {
+                               specs.add(new CatalogPartitionSpec(dropPartitions.getPartitionKVs(i)));
+                       }
+                       return new DropPartitionsOperation(tableIdentifier, dropPartitions.ifExists(), specs);
                } else {
                        throw new ValidationException(
                                        String.format("[%s] needs to implement",
