This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 271ef7b7dfa [FLINK-29677][table] Prevent dropping the current catalog
271ef7b7dfa is described below

commit 271ef7b7dfac195b43abced8b81c38998855dabf
Author: Jane Chan <qingyue....@gmail.com>
AuthorDate: Tue Oct 18 18:14:34 2022 +0800

    [FLINK-29677][table] Prevent dropping the current catalog
    
    This closes #21097
---
 .../src/test/resources/sql/catalog_database.q      |  14 +++
 .../src/test/resources/sql/catalog_database.q      | 107 +++++++++++++++++----
 .../apache/flink/table/catalog/CatalogManager.java |   3 +
 .../flink/table/planner/catalog/CatalogITCase.java |  19 +++-
 .../flink/table/api/TableEnvironmentTest.scala     |   6 ++
 5 files changed, 125 insertions(+), 24 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
index a2fbae0d300..3704ce2f52c 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
@@ -88,6 +88,11 @@ drop catalog default_catalog;
 [INFO] Execute statement succeed.
 !info
 
+drop catalog c1;
+[ERROR] Could not execute SQL statement. Reason:
+org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a catalog which is currently in use.
+!error
+
 # ==========================================================================
 # test database
 # ==========================================================================
@@ -183,6 +188,15 @@ drop database `default`;
 [INFO] Execute statement succeed.
 !info
 
+drop catalog `mod`;
+[ERROR] Could not execute SQL statement. Reason:
+org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a catalog which is currently in use.
+!error
+
+use catalog `c1`;
+[INFO] Execute statement succeed.
+!info
+
 drop catalog `mod`;
 [INFO] Execute statement succeed.
 !info
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
index fe96c4e1918..d79da350830 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
@@ -262,6 +262,21 @@ drop database `default`;
 1 row in set
 !ok
 
+drop catalog `mod`;
+!output
+org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a catalog which is currently in use.
+!error
+
+use catalog `c1`;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
 drop catalog `mod`;
 !output
 +--------+
@@ -283,39 +298,71 @@ org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name
 
 create table MyTable1 (a int, b string) with ('connector' = 'values');
 !output
-org.apache.flink.table.api.ValidationException: Catalog mod does not exist.
-!error
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
 
 create table MyTable2 (a int, b string) with ('connector' = 'values');
 !output
-org.apache.flink.table.api.ValidationException: Catalog mod does not exist.
-!error
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
 
 # hive catalog is case-insensitive
 show tables;
 !output
-org.apache.flink.table.api.ValidationException: Catalog mod does not exist
-!error
++------------+
+| table name |
++------------+
+|   MyTable1 |
+|   MyTable2 |
++------------+
+2 rows in set
+!ok
 
 show views;
 !output
-org.apache.flink.table.api.ValidationException: Catalog mod does not exist
-!error
+Empty set
+!ok
 
 create view MyView1 as select 1 + 1;
 !output
-org.apache.flink.table.api.ValidationException: Catalog mod does not exist.
-!error
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
 
 create view MyView2 as select 1 + 1;
 !output
-org.apache.flink.table.api.ValidationException: Catalog mod does not exist.
-!error
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
 
 show views;
 !output
-org.apache.flink.table.api.ValidationException: Catalog mod does not exist
-!error
++-----------+
+| view name |
++-----------+
+|   MyView1 |
+|   MyView2 |
++-----------+
+2 rows in set
+!ok
 
 # test create with full qualified name
 create table c1.db1.MyTable3 (a int, b string) with ('connector' = 'values');
@@ -458,12 +505,16 @@ show tables;
 +------------+
 | table name |
 +------------+
+|   MyTable1 |
+|   MyTable2 |
 |   MyTable5 |
 |   MyTable6 |
+|    MyView1 |
+|    MyView2 |
 |    MyView5 |
 |    MyView6 |
 +------------+
-4 rows in set
+8 rows in set
 !ok
 
 show views;
@@ -471,10 +522,12 @@ show views;
 +-----------+
 | view name |
 +-----------+
+|   MyView1 |
+|   MyView2 |
 |   MyView5 |
 |   MyView6 |
 +-----------+
-2 rows in set
+4 rows in set
 !ok
 
 drop table db1.MyTable3;
@@ -563,10 +616,14 @@ show tables;
 +------------+
 | table name |
 +------------+
+|   MyTable1 |
+|   MyTable2 |
 |   MyTable5 |
+|    MyView1 |
+|    MyView2 |
 |    MyView5 |
 +------------+
-2 rows in set
+6 rows in set
 !ok
 
 show views;
@@ -574,9 +631,11 @@ show views;
 +-----------+
 | view name |
 +-----------+
+|   MyView1 |
+|   MyView2 |
 |   MyView5 |
 +-----------+
-1 row in set
+3 rows in set
 !ok
 
 # ==========================================================================
@@ -598,11 +657,15 @@ show tables;
 +------------+
 | table name |
 +------------+
+|   MyTable1 |
+|   MyTable2 |
 |   MyTable5 |
 |   MyTable7 |
+|    MyView1 |
+|    MyView2 |
 |    MyView5 |
 +------------+
-3 rows in set
+7 rows in set
 !ok
 
 reset;
@@ -630,10 +693,14 @@ show tables;
 +------------+
 | table name |
 +------------+
+|   MyTable1 |
+|   MyTable2 |
 |   MyTable7 |
+|    MyView1 |
+|    MyView2 |
 |    MyView5 |
 +------------+
-2 rows in set
+6 rows in set
 !ok
 
 # ==========================================================================
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index ade4c3b155e..66b6dc2b4d8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -226,6 +226,9 @@ public final class CatalogManager {
                 "Catalog name cannot be null or empty.");
 
         if (catalogs.containsKey(catalogName)) {
+            if (currentCatalogName.equals(catalogName)) {
+                throw new CatalogException("Cannot drop a catalog which is currently in use.");
+            }
             Catalog catalog = catalogs.remove(catalogName);
             catalog.close();
         } else if (!ignoreIfNotExists) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
index 119aa4b51fc..5e95788fc95 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
@@ -30,6 +31,7 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.testutils.ClassLoaderUtils;
 import org.apache.flink.util.TemporaryClassLoaderContext;
@@ -43,6 +45,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT Case for catalog ddl. */
 public class CatalogITCase {
@@ -69,15 +72,23 @@ public class CatalogITCase {
         String name = "c1";
         TableEnvironment tableEnv = getTableEnvironment();
 
-        String ddl =
+        String createDdl =
                 String.format(
                         "create catalog %s with('type'='%s')",
                         name, GenericInMemoryCatalogFactoryOptions.IDENTIFIER);
-        tableEnv.executeSql(ddl);
+        tableEnv.executeSql(createDdl);
         assertThat(tableEnv.getCatalog(name)).isPresent();
 
-        ddl = String.format("drop catalog %s", name);
-        tableEnv.executeSql(ddl);
+        String dropDdl = String.format("drop catalog %s", name);
+        tableEnv.executeSql(String.format("use catalog %s", name));
+        assertThatThrownBy(() -> tableEnv.executeSql(dropDdl))
+                .isInstanceOf(ValidationException.class)
+                .hasRootCauseExactlyInstanceOf(CatalogException.class)
+                .hasRootCauseMessage("Cannot drop a catalog which is currently in use.");
+        assertThat(tableEnv.getCatalog(name)).isPresent();
+
+        tableEnv.executeSql("use catalog default_catalog");
+        tableEnv.executeSql(dropDdl);
         assertThat(tableEnv.getCatalog(name)).isNotPresent();
     }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 791a365831d..e10ad0c93f1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -770,6 +770,12 @@ class TableEnvironmentTest {
     assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
     assertEquals("my_catalog", tableEnv.getCurrentCatalog)
 
+    assertThatThrownBy(() => tableEnv.executeSql("DROP CATALOG my_catalog"))
+      .isInstanceOf(classOf[ValidationException])
+      .hasRootCauseMessage("Cannot drop a catalog which is currently in use.")
+
+    tableEnv.executeSql("USE CATALOG default_catalog")
+
     val tableResult3 = tableEnv.executeSql("DROP CATALOG my_catalog")
     assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind)
     assertFalse(tableEnv.getCatalog("my_catalog").isPresent)

Reply via email to