This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d8491c0f9c07f0d3d5e1428cad54902acb6ae0d0
Author: fengli <ldliu...@163.com>
AuthorDate: Mon May 6 20:12:54 2024 +0800

    [FLINK-35195][table] Convert CatalogMaterializedTable to CatalogTable to 
generate execution plan for planner
---
 .../flink/table/catalog/ContextResolvedTable.java  | 26 ++++++++++++++++++++++
 .../catalog/ResolvedCatalogMaterializedTable.java  | 13 +++++++++++
 .../planner/catalog/DatabaseCalciteSchema.java     |  3 ++-
 .../operations/SqlNodeToOperationConversion.java   |  4 +++-
 4 files changed, 44 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
index e7b9e5f0835..70a0b5c16d0 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
@@ -142,6 +142,20 @@ public final class ContextResolvedTable {
         return (T) resolvedTable.getOrigin();
     }
 
+    /**
+     * Convert the {@link ResolvedCatalogMaterializedTable} in this {@link ContextResolvedTable}
+     * to a {@link ResolvedCatalogTable}.
+     *
+     * <p>When the wrapped table's kind is {@code MATERIALIZED_TABLE}, a new permanent {@link
+     * ContextResolvedTable} is created with the same identifier and catalog, wrapping the result
+     * of {@link ResolvedCatalogMaterializedTable#toResolvedCatalogTable()}. For every other table
+     * kind this instance is returned unchanged.
+     *
+     * @return a context-resolved table backed by a {@link ResolvedCatalogTable} when this table
+     *     is a materialized table, otherwise {@code this}
+     */
+    public ContextResolvedTable toCatalogTable() {
+        if (resolvedTable.getTableKind() == 
CatalogBaseTable.TableKind.MATERIALIZED_TABLE) {
+            return ContextResolvedTable.permanent(
+                    objectIdentifier,
+                    catalog,
+                    ((ResolvedCatalogMaterializedTable) 
resolvedTable).toResolvedCatalogTable());
+        }
+        return this;
+    }
+
     /**
      * Copy the {@link ContextResolvedTable}, replacing the underlying {@link 
CatalogTable} options.
      */
@@ -150,6 +164,12 @@ public final class ContextResolvedTable {
             throw new ValidationException(
                     String.format("View '%s' cannot be enriched with new 
options.", this));
         }
+        if (resolvedTable.getTableKind() == 
CatalogBaseTable.TableKind.MATERIALIZED_TABLE) {
+            return ContextResolvedTable.permanent(
+                    objectIdentifier,
+                    catalog,
+                    ((ResolvedCatalogMaterializedTable) 
resolvedTable).copy(newOptions));
+        }
         return new ContextResolvedTable(
                 objectIdentifier,
                 catalog,
@@ -159,6 +179,12 @@ public final class ContextResolvedTable {
 
     /** Copy the {@link ContextResolvedTable}, replacing the underlying {@link 
ResolvedSchema}. */
     public ContextResolvedTable copy(ResolvedSchema newSchema) {
+        if (resolvedTable.getTableKind() == 
CatalogBaseTable.TableKind.MATERIALIZED_TABLE) {
+            throw new ValidationException(
+                    String.format(
+                            "Materialized table '%s' cannot be copied with new 
schema %s.",
+                            this, newSchema));
+        }
         return new ContextResolvedTable(
                 objectIdentifier,
                 catalog,
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
index a0206af3111..f876cd74c4d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
@@ -182,4 +182,17 @@ public class ResolvedCatalogMaterializedTable
                 + resolvedSchema
                 + '}';
     }
+
+    /**
+     * Convert this object to a {@link ResolvedCatalogTable} so that the planner can optimize
+     * queries against it.
+     *
+     * <p>The returned table is built from this table's unresolved schema, comment, partition
+     * keys, options and snapshot (left as {@code null} when no snapshot is present), and is
+     * paired with this table's resolved schema. No other attribute of this object is copied.
+     *
+     * @return a new {@link ResolvedCatalogTable} built from this materialized table
+     */
+    public ResolvedCatalogTable toResolvedCatalogTable() {
+        return new ResolvedCatalogTable(
+                CatalogTable.newBuilder()
+                        .schema(getUnresolvedSchema())
+                        .comment(getComment())
+                        .partitionKeys(getPartitionKeys())
+                        .options(getOptions())
+                        .snapshot(getSnapshot().orElse(null))
+                        .build(),
+                getResolvedSchema());
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
index 7ba1e04d83e..d3e738ae5ff 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
@@ -90,7 +90,7 @@ class DatabaseCalciteSchema extends FlinkSchema {
         return table.map(
                         lookupResult ->
                                 new CatalogSchemaTable(
-                                        lookupResult,
+                                        lookupResult.toCatalogTable(),
                                         getStatistic(lookupResult, identifier),
                                         isStreamingMode))
                 .orElse(null);
@@ -102,6 +102,7 @@ class DatabaseCalciteSchema extends FlinkSchema {
                 contextResolvedTable.getResolvedTable();
         switch (resolvedBaseTable.getTableKind()) {
             case TABLE:
+            case MATERIALIZED_TABLE:
                 return 
FlinkStatistic.unknown(resolvedBaseTable.getResolvedSchema())
                         .tableStats(extractTableStats(contextResolvedTable, 
identifier))
                         .build();
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index 63bae2ecd44..49e3a4abcb9 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -734,7 +734,9 @@ public class SqlNodeToOperationConversion {
 
         UnresolvedIdentifier unresolvedIdentifier = 
UnresolvedIdentifier.of(targetTablePath);
         ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
-        ContextResolvedTable contextResolvedTable = 
catalogManager.getTableOrError(identifier);
+        // If it is a materialized table, convert it to a catalog table for query 
optimization
+        ContextResolvedTable contextResolvedTable =
+                catalogManager.getTableOrError(identifier).toCatalogTable();
 
         PlannerQueryOperation query =
                 (PlannerQueryOperation)

Reply via email to