This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d3c3909 [FLINK-17449][sql-parser-hive] Implement ADD/DROP partitions d3c3909 is described below commit d3c39090a73aeb928c78ecd63e1fd6190c0df1f1 Author: Rui Li <li...@apache.org> AuthorDate: Sun May 17 21:07:07 2020 +0800 [FLINK-17449][sql-parser-hive] Implement ADD/DROP partitions This closes #12195 --- .../flink/connectors/hive/HiveDialectTest.java | 35 ++++++-- .../src/main/codegen/data/Parser.tdd | 2 + .../src/main/codegen/includes/parserImpls.ftl | 79 +++++++++++++++- .../sql/parser/hive/ddl/SqlAddHivePartitions.java | 85 ++++++++++++++++++ .../parser/hive/FlinkHiveSqlParserImplTest.java | 27 ++++++ .../flink/sql/parser/ddl/SqlAddPartitions.java | 100 +++++++++++++++++++++ .../flink/sql/parser/ddl/SqlDropPartitions.java | 87 ++++++++++++++++++ .../table/api/internal/TableEnvironmentImpl.java | 21 +++++ .../operations/ddl/AddPartitionsOperation.java | 74 +++++++++++++++ .../operations/ddl/DropPartitionsOperation.java | 60 +++++++++++++ .../operations/SqlToOperationConverter.java | 22 +++++ 11 files changed, 582 insertions(+), 10 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java index b85309b..9b6df99 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java @@ -24,7 +24,6 @@ import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.catalog.CatalogPartitionImpl; import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; @@ -60,7 +59,6 @@ import org.junit.Test; import java.io.File; import java.net.URI; import java.net.URISyntaxException; -import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -302,8 +300,7 @@ public class HiveDialectTest { // add/replace columns cascade tableEnv.executeSql("create table tbl2 (x int) partitioned by (dt date,id bigint)"); - ObjectPath tablePath2 = new ObjectPath("default", "tbl2"); - // TODO: use DDL to add partitions once we support it + tableEnv.executeSql("alter table tbl2 add partition (dt='2020-01-23',id=1) partition (dt='2020-04-24',id=2)"); CatalogPartitionSpec partitionSpec1 = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{ put("dt", "2020-01-23"); put("id", "1"); @@ -312,9 +309,8 @@ public class HiveDialectTest { put("dt", "2020-04-24"); put("id", "2"); }}); - hiveCatalog.createPartition(tablePath2, partitionSpec1, new CatalogPartitionImpl(Collections.emptyMap(), null), false); - hiveCatalog.createPartition(tablePath2, partitionSpec2, new CatalogPartitionImpl(Collections.emptyMap(), null), false); tableEnv.executeSql("alter table tbl2 replace columns (ti tinyint,d decimal) cascade"); + ObjectPath tablePath2 = new ObjectPath("default", "tbl2"); hiveTable = hiveCatalog.getHiveTable(tablePath2); Partition hivePartition = hiveCatalog.getHivePartition(hiveTable, partitionSpec1); assertEquals(2, hivePartition.getSd().getColsSize()); @@ -344,7 +340,7 @@ public class HiveDialectTest { @Test public void testAlterPartition() throws Exception { tableEnv.executeSql("create table tbl (x tinyint,y string) partitioned by (p1 bigint,p2 date)"); - // TODO: use DDL to add partitions once we support it + tableEnv.executeSql("alter table tbl add partition (p1=1000,p2='2020-05-01') partition (p1=2000,p2='2020-01-01')"); CatalogPartitionSpec spec1 = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{ put("p1", "1000"); put("p2", "2020-05-01"); @@ -354,8 +350,6 @@ public class HiveDialectTest { put("p2", "2020-01-01"); }}); ObjectPath tablePath = new ObjectPath("default", "tbl"); - hiveCatalog.createPartition(tablePath, spec1, new CatalogPartitionImpl(Collections.emptyMap(), null), false); - hiveCatalog.createPartition(tablePath, spec2, new CatalogPartitionImpl(Collections.emptyMap(), null), false); Table hiveTable = hiveCatalog.getHiveTable(tablePath); @@ -438,6 +432,29 @@ public class HiveDialectTest { assertEquals(DEFAULT_BUILTIN_DATABASE, databases.get(0).toString()); } + @Test + public void testAddDropPartitions() throws Exception { + tableEnv.executeSql("create table tbl (x int,y binary) partitioned by (dt date,country string)"); + tableEnv.executeSql("alter table tbl add partition (dt='2020-04-30',country='china') partition (dt='2020-04-30',country='us')"); + + ObjectPath tablePath = new ObjectPath("default", "tbl"); + assertEquals(2, hiveCatalog.listPartitions(tablePath).size()); + + String partLocation = warehouse + "/part3_location"; + tableEnv.executeSql(String.format( + "alter table tbl add partition (dt='2020-05-01',country='belgium') location '%s'", partLocation)); + Table hiveTable = hiveCatalog.getHiveTable(tablePath); + CatalogPartitionSpec spec = new CatalogPartitionSpec(new LinkedHashMap<String, String>() {{ + put("dt", "2020-05-01"); + put("country", "belgium"); + }}); + Partition hivePartition = hiveCatalog.getHivePartition(hiveTable, spec); + assertEquals(partLocation, locationPath(hivePartition.getSd().getLocation())); + + tableEnv.executeSql("alter table tbl drop partition (dt='2020-04-30',country='china'),partition (dt='2020-05-01',country='belgium')"); + assertEquals(1, hiveCatalog.listPartitions(tablePath).size()); + } + private static String locationPath(String locationURI) throws URISyntaxException { return new URI(locationURI).getPath(); } diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd index 0657f64..e000d34 100644 --- a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd @@ -24,6 +24,7 @@ # Please keep the import classes in alphabetical order if new class is added. imports: [ "org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils" + "org.apache.flink.sql.parser.hive.ddl.SqlAddHivePartitions" "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseLocation" "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner" "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseProps" @@ -65,6 +66,7 @@ "org.apache.flink.sql.parser.ddl.SqlCreateView" "org.apache.flink.sql.parser.ddl.SqlDropDatabase" "org.apache.flink.sql.parser.ddl.SqlDropFunction" + "org.apache.flink.sql.parser.ddl.SqlDropPartitions" "org.apache.flink.sql.parser.ddl.SqlDropTable" "org.apache.flink.sql.parser.ddl.SqlDropView" "org.apache.flink.sql.parser.ddl.SqlTableColumn" diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl index 4aba880..2ffc233 100644 --- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl @@ -1136,12 +1136,32 @@ SqlAlterTable SqlAlterTable() : } ) | - <ADD> <COLUMNS> + <ADD> + ( + <COLUMNS> + { + EnsureAlterTableOnly(partitionSpec, "Add columns"); + return SqlAlterHiveTableAddReplaceColumn(startPos, tableIdentifier, false); + } + | + [ <IF> <NOT> <EXISTS> { ifNotExists = true; } ] + { + EnsureAlterTableOnly(partitionSpec, "Add partitions"); + return SqlAddHivePartitions(startPos, tableIdentifier, ifNotExists); + } + ) { EnsureAlterTableOnly(partitionSpec, "Add columns"); return SqlAlterHiveTableAddReplaceColumn(startPos, tableIdentifier, false); } | + <DROP> + [ <IF> <EXISTS> { ifExists = true; } ] + { + EnsureAlterTableOnly(partitionSpec, "Drop partitions"); + return SqlDropPartitions(startPos, tableIdentifier, ifExists); + } + | <REPLACE> <COLUMNS> { EnsureAlterTableOnly(partitionSpec, "Replace columns"); @@ -1368,3 +1388,60 @@ SqlAlterView SqlAlterView() : } ) } + +/** + * Hive syntax: + * + * ALTER TABLE table_name ADD [IF NOT EXISTS] + * PARTITION partition_spec [LOCATION 'location'][PARTITION partition_spec [LOCATION 'location']][...]; + */ +SqlAlterTable SqlAddHivePartitions(SqlParserPos startPos, SqlIdentifier tableIdentifier, boolean ifNotExists) : +{ + List<SqlNodeList> partSpecs = new ArrayList(); + List<SqlCharStringLiteral> partLocations = new ArrayList(); + SqlNodeList partSpec; + SqlCharStringLiteral partLocation; +} +{ + ( + <PARTITION> + { + partSpec = new SqlNodeList(getPos()); + partLocation = null; + PartitionSpecCommaList(new SqlNodeList(getPos()), partSpec); + } + [ <LOCATION> <QUOTED_STRING> { partLocation = createStringLiteral(token.image, getPos()); } ] + { partSpecs.add(partSpec); partLocations.add(partLocation); } + )+ + { return new SqlAddHivePartitions(startPos.plus(getPos()), tableIdentifier, ifNotExists, partSpecs, partLocations); } +} + +/** + * Hive syntax: + * + * ALTER TABLE table_name DROP [IF EXISTS] PARTITION partition_spec[, PARTITION partition_spec, ...] + * [IGNORE PROTECTION] [PURGE]; + */ +SqlAlterTable SqlDropPartitions(SqlParserPos startPos, SqlIdentifier tableIdentifier, boolean ifExists) : +{ + List<SqlNodeList> partSpecs = new ArrayList(); + SqlNodeList partSpec; +} +{ + <PARTITION> + { + partSpec = new SqlNodeList(getPos()); + PartitionSpecCommaList(new SqlNodeList(getPos()), partSpec); + partSpecs.add(partSpec); + } + ( + <COMMA> + <PARTITION> + { + partSpec = new SqlNodeList(getPos()); + PartitionSpecCommaList(new SqlNodeList(getPos()), partSpec); + partSpecs.add(partSpec); + } + )* + { return new SqlDropPartitions(startPos.plus(getPos()), tableIdentifier, ifExists, partSpecs); } +} diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java new file mode 100644 index 0000000..d003f6e --- /dev/null +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.hive.ddl; + +import org.apache.flink.sql.parser.ddl.SqlAddPartitions; + +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.ArrayList; +import java.util.List; + +/** + * Add partitions to a Hive table. + * + * <p>Hive syntax: + * ALTER TABLE table_name ADD [IF NOT EXISTS] + * PARTITION partition_spec [LOCATION 'location'][PARTITION partition_spec [LOCATION 'location']][...]; + */ +public class SqlAddHivePartitions extends SqlAddPartitions { + + private final List<SqlCharStringLiteral> partLocations; + + public SqlAddHivePartitions(SqlParserPos pos, SqlIdentifier tableName, boolean ifNotExists, + List<SqlNodeList> partSpecs, List<SqlCharStringLiteral> partLocations) { + super(pos, tableName, ifNotExists, partSpecs, toProps(partLocations)); + this.partLocations = partLocations; + } + + private static List<SqlNodeList> toProps(List<SqlCharStringLiteral> partLocations) { + List<SqlNodeList> res = new ArrayList<>(partLocations.size()); + for (SqlCharStringLiteral partLocation : partLocations) { + SqlNodeList prop = null; + if (partLocation != null) { + prop = new SqlNodeList(partLocation.getParserPosition()); + prop.add(HiveDDLUtils.toTableOption(SqlCreateHiveTable.TABLE_LOCATION_URI, partLocation, partLocation.getParserPosition())); + } + res.add(prop); + } + return res; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("ALTER TABLE"); + tableIdentifier.unparse(writer, leftPrec, rightPrec); + writer.newlineAndIndent(); + writer.keyword("ADD"); + if (ifNotExists()) { + writer.keyword("IF NOT EXISTS"); + } + int opLeftPrec = getOperator().getLeftPrec(); + int opRightPrec = getOperator().getRightPrec(); + for (int i = 0; i < getPartSpecs().size(); i++) { + writer.newlineAndIndent(); + SqlNodeList partSpec = getPartSpecs().get(i); + writer.keyword("PARTITION"); + partSpec.unparse(writer, opLeftPrec, opRightPrec); + SqlCharStringLiteral location = partLocations.get(i); + if (location != null) { + writer.keyword("LOCATION"); + location.unparse(writer, opLeftPrec, opRightPrec); + } + } + } +} diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java index 5e5c443..7130713 100644 --- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java +++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java @@ -383,4 +383,31 @@ public class FlinkHiveSqlParserImplTest extends SqlParserTest { "SELECT `C1`, `C2`\n" + "FROM `TBL`"); } + + @Test + public void testAddPartition() { + sql("alter table tbl add partition (p1=1,p2='a') location '/part1/location'") + .ok("ALTER TABLE `TBL`\n" + + "ADD\n" + + "PARTITION (`P1` = 1, `P2` = 'a') LOCATION '/part1/location'"); + sql("alter table tbl add if not exists partition (p=1) partition (p=2) location '/part2/location'") + .ok("ALTER TABLE `TBL`\n" + + "ADD IF NOT EXISTS\n" + + "PARTITION (`P` = 1)\n" + + "PARTITION (`P` = 2) LOCATION '/part2/location'"); + } + + @Test + public void testDropPartition() { + sql("alter table tbl drop if exists partition (p=1)") + .ok("ALTER TABLE `TBL`\n" + + "DROP IF EXISTS\n" + + "PARTITION (`P` = 1)"); + sql("alter table tbl drop partition (p1='a',p2=1), partition(p1='b',p2=2)") + .ok("ALTER TABLE `TBL`\n" + + "DROP\n" + + "PARTITION (`P1` = 'a', `P2` = 1)\n" + + "PARTITION (`P1` = 'b', `P2` = 2)"); + // TODO: support IGNORE PROTECTION, PURGE + } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java new file mode 100644 index 0000000..7aa71ab --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlPartitionUtils; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; + +/** + * ALTER TABLE DDL to add partitions to a table. + */ +public class SqlAddPartitions extends SqlAlterTable { + + private final boolean ifNotExists; + private final List<SqlNodeList> partSpecs; + private final List<SqlNodeList> partProps; + + public SqlAddPartitions(SqlParserPos pos, SqlIdentifier tableName, boolean ifNotExists, + List<SqlNodeList> partSpecs, List<SqlNodeList> partProps) { + super(pos, tableName); + this.ifNotExists = ifNotExists; + this.partSpecs = partSpecs; + this.partProps = partProps; + } + + public boolean ifNotExists() { + return ifNotExists; + } + + public List<SqlNodeList> getPartSpecs() { + return partSpecs; + } + + public LinkedHashMap<String, String> getPartitionKVs(int i) { + return SqlPartitionUtils.getPartitionKVs(getPartSpecs().get(i)); + } + + public List<SqlNodeList> getPartProps() { + return partProps; + } + + @Nonnull + @Override + public List<SqlNode> getOperandList() { + List<SqlNode> operands = new ArrayList<>(); + operands.add(tableIdentifier); + operands.addAll(partSpecs); + operands.addAll(partProps); + return operands; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.newlineAndIndent(); + writer.keyword("ADD"); + if (ifNotExists) { + writer.keyword("IF NOT EXISTS"); + } + int opLeftPrec = getOperator().getLeftPrec(); + int opRightPrec = getOperator().getRightPrec(); + for (int i = 0; i < partSpecs.size(); i++) { + writer.newlineAndIndent(); + SqlNodeList partSpec = partSpecs.get(i); + SqlNodeList partProp = partProps.get(i); + writer.keyword("PARTITION"); + partSpec.unparse(writer, opLeftPrec, opRightPrec); + if (partProp != null) { + writer.keyword("WITH"); + partProp.unparse(writer, opLeftPrec, opRightPrec); + } + } + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java new file mode 100644 index 0000000..16ed67c --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlPartitionUtils; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; + +/** + * ALTER TABLE DDL to drop partitions of a table. + */ +public class SqlDropPartitions extends SqlAlterTable { + + private final boolean ifExists; + private final List<SqlNodeList> partSpecs; + + public SqlDropPartitions(SqlParserPos pos, SqlIdentifier tableName, boolean ifExists, + List<SqlNodeList> partSpecs) { + super(pos, tableName); + this.ifExists = ifExists; + this.partSpecs = partSpecs; + } + + public boolean ifExists() { + return ifExists; + } + + public List<SqlNodeList> getPartSpecs() { + return partSpecs; + } + + public LinkedHashMap<String, String> getPartitionKVs(int i) { + return SqlPartitionUtils.getPartitionKVs(getPartSpecs().get(i)); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.newlineAndIndent(); + writer.keyword("DROP"); + if (ifExists) { + writer.keyword("IF EXISTS"); + } + int opLeftPrec = getOperator().getLeftPrec(); + int opRightPrec = getOperator().getRightPrec(); + for (SqlNodeList partSpec : partSpecs) { + writer.newlineAndIndent(); + writer.keyword("PARTITION"); + partSpec.unparse(writer, opLeftPrec, opRightPrec); + } + } + + @Nonnull + @Override + public List<SqlNode> getOperandList() { + List<SqlNode> operands = new ArrayList<>(); + operands.add(tableIdentifier); + operands.addAll(partSpecs); + return operands; + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 6a481928..3b4db5a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -42,12 +42,15 @@ import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogFunction; import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ConnectorCatalogTable; import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.QueryOperationCatalogView; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.catalog.exceptions.CatalogException; @@ -90,6 +93,7 @@ import org.apache.flink.table.operations.TableSourceQueryOperation; import org.apache.flink.table.operations.UnregisteredSinkModifyOperation; import org.apache.flink.table.operations.UseCatalogOperation; import org.apache.flink.table.operations.UseDatabaseOperation; +import org.apache.flink.table.operations.ddl.AddPartitionsOperation; import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation; import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation; @@ -111,6 +115,7 @@ import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.CreateViewOperation; import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation; import org.apache.flink.table.operations.ddl.DropDatabaseOperation; +import org.apache.flink.table.operations.ddl.DropPartitionsOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropViewOperation; @@ -838,6 +843,22 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { catalog.alterTable(alterTableSchemaOperation.getTableIdentifier().toObjectPath(), alterTableSchemaOperation.getCatalogTable(), false); + } else if (alterTableOperation instanceof AddPartitionsOperation) { + AddPartitionsOperation addPartitionsOperation = (AddPartitionsOperation) alterTableOperation; + List<CatalogPartitionSpec> specs = addPartitionsOperation.getPartitionSpecs(); + List<CatalogPartition> partitions = addPartitionsOperation.getCatalogPartitions(); + boolean ifNotExists = addPartitionsOperation.ifNotExists(); + ObjectPath tablePath = addPartitionsOperation.getTableIdentifier().toObjectPath(); + for (int i = 0; i < specs.size(); i++) { + catalog.createPartition(tablePath, specs.get(i), partitions.get(i), ifNotExists); + } + } else if (alterTableOperation instanceof DropPartitionsOperation) { + DropPartitionsOperation dropPartitionsOperation = (DropPartitionsOperation) alterTableOperation; + ObjectPath tablePath = dropPartitionsOperation.getTableIdentifier().toObjectPath(); + boolean ifExists = dropPartitionsOperation.ifExists(); + for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) { + catalog.dropPartition(tablePath, spec, ifExists); + } } return TableResultImpl.TABLE_RESULT_OK; } catch (TableAlreadyExistException | TableNotExistException e) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java new file mode 100644 index 0000000..1802375 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.List; +import java.util.Map; + +/** + * Operation to describe ALTER TABLE ADD PARTITION statement. + */ +public class AddPartitionsOperation extends AlterTableOperation { + + private final boolean ifNotExists; + private final List<CatalogPartitionSpec> partitionSpecs; + private final List<CatalogPartition> catalogPartitions; + + public AddPartitionsOperation(ObjectIdentifier tableIdentifier, boolean ifNotExists, + List<CatalogPartitionSpec> partitionSpecs, List<CatalogPartition> catalogPartitions) { + super(tableIdentifier); + this.ifNotExists = ifNotExists; + this.partitionSpecs = partitionSpecs; + this.catalogPartitions = catalogPartitions; + } + + public List<CatalogPartitionSpec> getPartitionSpecs() { + return partitionSpecs; + } + + public List<CatalogPartition> getCatalogPartitions() { + return catalogPartitions; + } + + public boolean ifNotExists() { + return ifNotExists; + } + + @Override + public String asSummaryString() { + StringBuilder builder = new StringBuilder(String.format("ALTER TABLE %s ADD", tableIdentifier.asSummaryString())); + if (ifNotExists) { + builder.append(" IF NOT EXISTS"); + } + for (int i = 0; i < partitionSpecs.size(); i++) { + String spec = OperationUtils.formatPartitionSpec(partitionSpecs.get(i)); + builder.append(String.format(" PARTITION (%s)", spec)); + Map<String, String> properties = catalogPartitions.get(i).getProperties(); + if (!properties.isEmpty()) { + builder.append(String.format(" WITH (%s)", OperationUtils.formatProperties(properties))); + } + } + return builder.toString(); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java new file mode 100644 index 0000000..f3fc4f3 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.List; + +/** + * Operation to describe ALTER TABLE DROP PARTITION statement. + */ +public class DropPartitionsOperation extends AlterTableOperation { + + private final boolean ifExists; + private final List<CatalogPartitionSpec> partitionSpecs; + + public DropPartitionsOperation(ObjectIdentifier tableIdentifier, boolean ifExists, List<CatalogPartitionSpec> partitionSpecs) { + super(tableIdentifier); + this.ifExists = ifExists; + this.partitionSpecs = partitionSpecs; + } + + public boolean ifExists() { + return ifExists; + } + + public List<CatalogPartitionSpec> getPartitionSpecs() { + return partitionSpecs; + } + + @Override + public String asSummaryString() { + StringBuilder builder = new StringBuilder(String.format("ALTER TABLE %s DROP", tableIdentifier.asSummaryString())); + if (ifExists) { + builder.append(" IF EXISTS"); + } + for (CatalogPartitionSpec spec : partitionSpecs) { + builder.append(String.format(" PARTITION (%s)", OperationUtils.formatPartitionSpec(spec))); + } + return builder.toString(); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index b496ddf..615ad60 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.operations; +import org.apache.flink.sql.parser.ddl.SqlAddPartitions; import org.apache.flink.sql.parser.ddl.SqlAddReplaceColumns; import org.apache.flink.sql.parser.ddl.SqlAlterDatabase; import org.apache.flink.sql.parser.ddl.SqlAlterFunction; @@ -38,6 +39,7 @@ import org.apache.flink.sql.parser.ddl.SqlCreateTable; import org.apache.flink.sql.parser.ddl.SqlCreateView; import org.apache.flink.sql.parser.ddl.SqlDropDatabase; import org.apache.flink.sql.parser.ddl.SqlDropFunction; +import org.apache.flink.sql.parser.ddl.SqlDropPartitions; import org.apache.flink.sql.parser.ddl.SqlDropTable; import org.apache.flink.sql.parser.ddl.SqlDropView; import org.apache.flink.sql.parser.ddl.SqlTableOption; @@ -84,6 +86,7 @@ import org.apache.flink.table.operations.ShowTablesOperation; import org.apache.flink.table.operations.ShowViewsOperation; import org.apache.flink.table.operations.UseCatalogOperation; import org.apache.flink.table.operations.UseDatabaseOperation; +import org.apache.flink.table.operations.ddl.AddPartitionsOperation; import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation; import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation; @@ -101,6 +104,7 @@ import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.CreateViewOperation; import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation; import org.apache.flink.table.operations.ddl.DropDatabaseOperation; +import org.apache.flink.table.operations.ddl.DropPartitionsOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropViewOperation; @@ -126,6 +130,7 @@ import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.dialect.CalciteSqlDialect; import org.apache.calcite.sql.parser.SqlParser; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; @@ -363,6 +368,23 @@ public class SqlToOperationConverter { (SqlChangeColumn) sqlAlterTable, (CatalogTable) baseTable, flinkPlanner.getOrCreateSqlValidator()); + } else if (sqlAlterTable instanceof SqlAddPartitions) { + List<CatalogPartitionSpec> specs = new ArrayList<>(); + List<CatalogPartition> partitions = new ArrayList<>(); + SqlAddPartitions addPartitions = (SqlAddPartitions) sqlAlterTable; + for (int i = 0; i < addPartitions.getPartSpecs().size(); i++) { + specs.add(new CatalogPartitionSpec(addPartitions.getPartitionKVs(i))); + Map<String, String> props = OperationConverterUtils.extractProperties(addPartitions.getPartProps().get(i)); + partitions.add(new CatalogPartitionImpl(props, null)); + } + return new AddPartitionsOperation(tableIdentifier, addPartitions.ifNotExists(), specs, partitions); + } else if (sqlAlterTable instanceof SqlDropPartitions) { + SqlDropPartitions dropPartitions = (SqlDropPartitions) sqlAlterTable; + List<CatalogPartitionSpec> specs = new ArrayList<>(); + for (int i = 0; i < dropPartitions.getPartSpecs().size(); i++) { + specs.add(new CatalogPartitionSpec(dropPartitions.getPartitionKVs(i))); + } + return new DropPartitionsOperation(tableIdentifier, dropPartitions.ifExists(), specs); } else { throw new ValidationException( String.format("[%s] needs to implement",