This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 132d2fe24f18f27d3f752e96ea43ee7c6d196418
Author: fengli <ldliu...@163.com>
AuthorDate: Wed Jun 26 13:31:18 2024 +0800

    [FLINK-35691][table] Fix drop table statement can drop materialized table
---
 .../service/MaterializedTableStatementITCase.java  | 46 ++++++++++++++++++++++
 .../apache/flink/table/catalog/CatalogManager.java | 30 ++++++++++----
 .../DropMaterializedTableOperation.java            | 27 +++++++++++--
 3 files changed, 92 insertions(+), 11 deletions(-)

diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index 98ee00d48ee..6ed91b954af 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -991,6 +991,29 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
         List<RowData> jobResults = fetchAllResults(service, sessionHandle, 
describeJobHandle);
         
assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING");
 
+        // Drop materialized table using drop table statement
+        String dropTableUsingMaterializedTableDDL = "DROP TABLE users_shops";
+        OperationHandle dropTableUsingMaterializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, dropTableUsingMaterializedTableDDL, -1, 
new Configuration());
+
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service,
+                                        sessionHandle,
+                                        dropTableUsingMaterializedTableHandle))
+                .rootCause()
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        String.format(
+                                "Table with identifier '%s' does not exist.",
+                                ObjectIdentifier.of(
+                                                fileSystemCatalogName,
+                                                TEST_DEFAULT_DATABASE,
+                                                "users_shops")
+                                        .asSummaryString()));
+
         // drop materialized table
         String dropMaterializedTableDDL = "DROP MATERIALIZED TABLE IF EXISTS 
users_shops";
         OperationHandle dropMaterializedTableHandle =
@@ -1091,6 +1114,29 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
         // verify refresh workflow is created
         
assertThat(embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey)).isTrue();
 
+        // Drop materialized table using drop table statement
+        String dropTableUsingMaterializedTableDDL = "DROP TABLE users_shops";
+        OperationHandle dropTableUsingMaterializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, dropTableUsingMaterializedTableDDL, -1, 
new Configuration());
+
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service,
+                                        sessionHandle,
+                                        dropTableUsingMaterializedTableHandle))
+                .rootCause()
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        String.format(
+                                "Table with identifier '%s' does not exist.",
+                                ObjectIdentifier.of(
+                                                fileSystemCatalogName,
+                                                TEST_DEFAULT_DATABASE,
+                                                "users_shops")
+                                        .asSummaryString()));
+
         // drop materialized table
         String dropMaterializedTableDDL = "DROP MATERIALIZED TABLE IF EXISTS 
users_shops";
         OperationHandle dropMaterializedTableHandle =
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 5491df3f79e..bba3673c435 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -1257,7 +1257,19 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
      *     exist.
      */
     public void dropTable(ObjectIdentifier objectIdentifier, boolean 
ignoreIfNotExists) {
-        dropTableInternal(objectIdentifier, ignoreIfNotExists, true);
+        dropTableInternal(objectIdentifier, ignoreIfNotExists, true, false);
+    }
+
+    /**
+     * Drops a materialized table in a given fully qualified path.
+     *
+     * @param objectIdentifier The fully qualified path of the materialized 
table to drop.
+     * @param ignoreIfNotExists If false exception will be thrown if the table 
to drop does not
+     *     exist.
+     */
+    public void dropMaterializedTable(
+            ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) {
+        dropTableInternal(objectIdentifier, ignoreIfNotExists, true, true);
     }
 
     /**
@@ -1268,16 +1280,19 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
      *     exist.
      */
     public void dropView(ObjectIdentifier objectIdentifier, boolean 
ignoreIfNotExists) {
-        dropTableInternal(objectIdentifier, ignoreIfNotExists, false);
+        dropTableInternal(objectIdentifier, ignoreIfNotExists, false, false);
     }
 
     private void dropTableInternal(
-            ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists, 
boolean isDropTable) {
+            ObjectIdentifier objectIdentifier,
+            boolean ignoreIfNotExists,
+            boolean isDropTable,
+            boolean isDropMaterializedTable) {
         Predicate<CatalogBaseTable> filter =
                 isDropTable
-                        ? table ->
-                                table instanceof CatalogTable
-                                        || table instanceof 
CatalogMaterializedTable
+                        ? isDropMaterializedTable
+                                ? table -> table instanceof 
CatalogMaterializedTable
+                                : table -> table instanceof CatalogTable
                         : table -> table instanceof CatalogView;
         // Same name temporary table or view exists.
         if (filter.test(temporaryTables.get(objectIdentifier))) {
@@ -1317,7 +1332,8 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
                     ignoreIfNotExists,
                     "DropTable");
         } else if (!ignoreIfNotExists) {
-            String tableOrView = isDropTable ? "Table" : "View";
+            String tableOrView =
+                    isDropTable ? isDropMaterializedTable ? "Materialized 
Table" : "Table" : "View";
             throw new ValidationException(
                     String.format(
                             "%s with identifier '%s' does not exist.",
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java
index 46dd86ad96b..b499a3e0b60 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java
@@ -19,10 +19,12 @@
 package org.apache.flink.table.operations.materializedtable;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.OperationUtils;
-import org.apache.flink.table.operations.ddl.DropTableOperation;
+import org.apache.flink.table.operations.ddl.DropOperation;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -30,11 +32,22 @@ import java.util.Map;
 
 /** Operation to describe a DROP MATERIALIZED TABLE statement. */
 @Internal
-public class DropMaterializedTableOperation extends DropTableOperation
-        implements MaterializedTableOperation {
+public class DropMaterializedTableOperation implements DropOperation, 
MaterializedTableOperation {
+
+    private final ObjectIdentifier tableIdentifier;
+    private final boolean ifExists;
 
     public DropMaterializedTableOperation(ObjectIdentifier tableIdentifier, 
boolean ifExists) {
-        super(tableIdentifier, ifExists, false);
+        this.tableIdentifier = tableIdentifier;
+        this.ifExists = ifExists;
+    }
+
+    public ObjectIdentifier getTableIdentifier() {
+        return tableIdentifier;
+    }
+
+    public boolean isIfExists() {
+        return ifExists;
     }
 
     @Override
@@ -49,4 +62,10 @@ public class DropMaterializedTableOperation extends 
DropTableOperation
                 Collections.emptyList(),
                 Operation::asSummaryString);
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        ctx.getCatalogManager().dropMaterializedTable(getTableIdentifier(), 
isIfExists());
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
 }

Reply via email to