This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd333941553c68c36e1460102ab023f80a5b1362
Author: Feng Jin <jinfeng1...@gmail.com>
AuthorDate: Mon May 13 20:07:39 2024 +0800

    [FLINK-35193][table] Support convert drop materialized table node to 
operation
---
 .../DropMaterializedTableOperation.java            |  6 ++--
 .../SqlDropMaterializedTableConverter.java         | 41 ++++++++++++++++++++++
 .../operations/converters/SqlNodeConverters.java   |  1 +
 ...erializedTableNodeToOperationConverterTest.java | 21 +++++++++++
 4 files changed, 65 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java
index e5eee557bfc..46dd86ad96b 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/DropMaterializedTableOperation.java
@@ -33,9 +33,8 @@ import java.util.Map;
 public class DropMaterializedTableOperation extends DropTableOperation
         implements MaterializedTableOperation {
 
-    public DropMaterializedTableOperation(
-            ObjectIdentifier tableIdentifier, boolean ifExists, boolean 
isTemporary) {
-        super(tableIdentifier, ifExists, isTemporary);
+    public DropMaterializedTableOperation(ObjectIdentifier tableIdentifier, 
boolean ifExists) {
+        super(tableIdentifier, ifExists, false);
     }
 
     @Override
@@ -43,7 +42,6 @@ public class DropMaterializedTableOperation extends 
DropTableOperation
         Map<String, Object> params = new LinkedHashMap<>();
         params.put("identifier", getTableIdentifier());
         params.put("IfExists", isIfExists());
-        params.put("isTemporary", isTemporary());
 
         return OperationUtils.formatWithChildren(
                 "DROP MATERIALIZED TABLE",
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropMaterializedTableConverter.java
new file mode 100644
index 00000000000..6501dc0c453
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropMaterializedTableConverter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import 
org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
+
+/** A converter for {@link SqlDropMaterializedTable}. */
+public class SqlDropMaterializedTableConverter
+        implements SqlNodeConverter<SqlDropMaterializedTable> {
+    @Override
+    public Operation convertSqlNode(
+            SqlDropMaterializedTable sqlDropMaterializedTable, ConvertContext 
context) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                
UnresolvedIdentifier.of(sqlDropMaterializedTable.fullTableName());
+        ObjectIdentifier identifier =
+                
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+        // Currently we don't support temporary materialized table, so 
isTemporary is always false
+        return new DropMaterializedTableOperation(
+                identifier, sqlDropMaterializedTable.getIfExists());
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index 948790dfbe6..b3dca807899 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -61,6 +61,7 @@ public class SqlNodeConverters {
         register(new SqlAlterMaterializedTableRefreshConverter());
         register(new SqlAlterMaterializedTableSuspendConverter());
         register(new SqlAlterMaterializedTableResumeConverter());
+        register(new SqlDropMaterializedTableConverter());
     }
 
     /**
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 5200cc3e151..bbf9082bb14 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTabl
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation;
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;
 import 
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import 
org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
 
@@ -312,4 +313,24 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
         assertThat(operation2.asSummaryString())
                 .isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 
RESUME WITH (k1: [v1])");
     }
+
+    @Test
+    void testDropMaterializedTable() {
+        final String sql = "DROP MATERIALIZED TABLE mtbl1";
+        Operation operation = parse(sql);
+        
assertThat(operation).isInstanceOf(DropMaterializedTableOperation.class);
+        assertThat(((DropMaterializedTableOperation) 
operation).isIfExists()).isFalse();
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "DROP MATERIALIZED TABLE: (identifier: 
[`builtin`.`default`.`mtbl1`], IfExists: [false])");
+
+        final String sql2 = "DROP MATERIALIZED TABLE IF EXISTS mtbl1";
+        Operation operation2 = parse(sql2);
+        
assertThat(operation2).isInstanceOf(DropMaterializedTableOperation.class);
+        assertThat(((DropMaterializedTableOperation) 
operation2).isIfExists()).isTrue();
+
+        assertThat(operation2.asSummaryString())
+                .isEqualTo(
+                        "DROP MATERIALIZED TABLE: (identifier: 
[`builtin`.`default`.`mtbl1`], IfExists: [true])");
+    }
 }

Reply via email to