This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fd333941553c68c36e1460102ab023f80a5b1362 Author: Feng Jin <jinfeng1...@gmail.com> AuthorDate: Mon May 13 20:07:39 2024 +0800 [FLINK-35193][table] Support convert drop materialized table node to operation --- .../DropMaterializedTableOperation.java | 6 ++-- .../SqlDropMaterializedTableConverter.java | 41 ++++++++++++++++++++++ .../operations/converters/SqlNodeConverters.java | 1 + ...erializedTableNodeToOperationConverterTest.java | 21 +++++++++++ 4 files changed, 65 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java index e5eee557bfc..46dd86ad96b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java @@ -33,9 +33,8 @@ import java.util.Map; public class DropMaterializedTableOperation extends DropTableOperation implements MaterializedTableOperation { - public DropMaterializedTableOperation( - ObjectIdentifier tableIdentifier, boolean ifExists, boolean isTemporary) { - super(tableIdentifier, ifExists, isTemporary); + public DropMaterializedTableOperation(ObjectIdentifier tableIdentifier, boolean ifExists) { + super(tableIdentifier, ifExists, false); } @Override @@ -43,7 +42,6 @@ public class DropMaterializedTableOperation extends DropTableOperation Map<String, Object> params = new LinkedHashMap<>(); params.put("identifier", getTableIdentifier()); params.put("IfExists", isIfExists()); - params.put("isTemporary", isTemporary()); return OperationUtils.formatWithChildren( "DROP MATERIALIZED TABLE", diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropMaterializedTableConverter.java new file mode 100644 index 00000000000..6501dc0c453 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropMaterializedTableConverter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation; + +/** A converter for {@link SqlDropMaterializedTable}. */ +public class SqlDropMaterializedTableConverter + implements SqlNodeConverter<SqlDropMaterializedTable> { + @Override + public Operation convertSqlNode( + SqlDropMaterializedTable sqlDropMaterializedTable, ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlDropMaterializedTable.fullTableName()); + ObjectIdentifier identifier = + context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + // Currently we don't support temporary materialized table, so isTemporary is always false + return new DropMaterializedTableOperation( + identifier, sqlDropMaterializedTable.getIfExists()); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java index 948790dfbe6..b3dca807899 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java @@ -61,6 +61,7 @@ public class SqlNodeConverters { register(new SqlAlterMaterializedTableRefreshConverter()); register(new SqlAlterMaterializedTableSuspendConverter()); register(new SqlAlterMaterializedTableResumeConverter()); + register(new SqlDropMaterializedTableConverter()); } /** diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index 5200cc3e151..bbf9082bb14 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -28,6 +28,7 @@ import org.apache.flink.table.operations.materializedtable.AlterMaterializedTabl import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation; import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; +import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; @@ -312,4 +313,24 @@ public class SqlMaterializedTableNodeToOperationConverterTest assertThat(operation2.asSummaryString()) .isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 RESUME WITH (k1: [v1])"); } + + @Test + void testDropMaterializedTable() { + final String sql = "DROP MATERIALIZED TABLE mtbl1"; + Operation operation = parse(sql); + assertThat(operation).isInstanceOf(DropMaterializedTableOperation.class); + assertThat(((DropMaterializedTableOperation) operation).isIfExists()).isFalse(); + assertThat(operation.asSummaryString()) + .isEqualTo( + "DROP MATERIALIZED TABLE: (identifier: [`builtin`.`default`.`mtbl1`], IfExists: [false])"); + + final String sql2 = "DROP MATERIALIZED TABLE IF EXISTS mtbl1"; + Operation operation2 = parse(sql2); + assertThat(operation2).isInstanceOf(DropMaterializedTableOperation.class); + assertThat(((DropMaterializedTableOperation) operation2).isIfExists()).isTrue(); + + assertThat(operation2.asSummaryString()) + .isEqualTo( + "DROP MATERIALIZED TABLE: (identifier: [`builtin`.`default`.`mtbl1`], IfExists: [true])"); + } }