This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d8491c0f9c07f0d3d5e1428cad54902acb6ae0d0 Author: fengli <ldliu...@163.com> AuthorDate: Mon May 6 20:12:54 2024 +0800 [FLINK-35195][table] Convert CatalogMaterializedTable to CatalogTable to generate execution plan for planner --- .../flink/table/catalog/ContextResolvedTable.java | 26 ++++++++++++++++++++++ .../catalog/ResolvedCatalogMaterializedTable.java | 13 +++++++++++ .../planner/catalog/DatabaseCalciteSchema.java | 3 ++- .../operations/SqlNodeToOperationConversion.java | 4 +++- 4 files changed, 44 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java index e7b9e5f0835..70a0b5c16d0 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java @@ -142,6 +142,20 @@ public final class ContextResolvedTable { return (T) resolvedTable.getOrigin(); } + /** + * Convert the {@link ResolvedCatalogMaterializedTable} in {@link ContextResolvedTable} to + * {@link ResolvedCatalogTable }. + */ + public ContextResolvedTable toCatalogTable() { + if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.MATERIALIZED_TABLE) { + return ContextResolvedTable.permanent( + objectIdentifier, + catalog, + ((ResolvedCatalogMaterializedTable) resolvedTable).toResolvedCatalogTable()); + } + return this; + } + /** * Copy the {@link ContextResolvedTable}, replacing the underlying {@link CatalogTable} options. */ @@ -150,6 +164,12 @@ public final class ContextResolvedTable { throw new ValidationException( String.format("View '%s' cannot be enriched with new options.", this)); } + if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.MATERIALIZED_TABLE) { + return ContextResolvedTable.permanent( + objectIdentifier, + catalog, + ((ResolvedCatalogMaterializedTable) resolvedTable).copy(newOptions)); + } return new ContextResolvedTable( objectIdentifier, catalog, @@ -159,6 +179,12 @@ public final class ContextResolvedTable { /** Copy the {@link ContextResolvedTable}, replacing the underlying {@link ResolvedSchema}. */ public ContextResolvedTable copy(ResolvedSchema newSchema) { + if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.MATERIALIZED_TABLE) { + throw new ValidationException( + String.format( + "Materialized table '%s' cannot be copied with new schema %s.", + this, newSchema)); + } return new ContextResolvedTable( objectIdentifier, catalog, diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java index a0206af3111..f876cd74c4d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java @@ -182,4 +182,17 @@ public class ResolvedCatalogMaterializedTable + resolvedSchema + '}'; } + + /** Convert this object to a {@link ResolvedCatalogTable} object for planner optimize query. */ + public ResolvedCatalogTable toResolvedCatalogTable() { + return new ResolvedCatalogTable( + CatalogTable.newBuilder() + .schema(getUnresolvedSchema()) + .comment(getComment()) + .partitionKeys(getPartitionKeys()) + .options(getOptions()) + .snapshot(getSnapshot().orElse(null)) + .build(), + getResolvedSchema()); + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java index 7ba1e04d83e..d3e738ae5ff 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java @@ -90,7 +90,7 @@ class DatabaseCalciteSchema extends FlinkSchema { return table.map( lookupResult -> new CatalogSchemaTable( - lookupResult, + lookupResult.toCatalogTable(), getStatistic(lookupResult, identifier), isStreamingMode)) .orElse(null); @@ -102,6 +102,7 @@ class DatabaseCalciteSchema extends FlinkSchema { contextResolvedTable.getResolvedTable(); switch (resolvedBaseTable.getTableKind()) { case TABLE: + case MATERIALIZED_TABLE: return FlinkStatistic.unknown(resolvedBaseTable.getResolvedSchema()) .tableStats(extractTableStats(contextResolvedTable, identifier)) .build(); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java index 63bae2ecd44..49e3a4abcb9 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java @@ -734,7 +734,9 @@ public class SqlNodeToOperationConversion { UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(targetTablePath); ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); - ContextResolvedTable contextResolvedTable = catalogManager.getTableOrError(identifier); + // If it is materialized table, convert it to catalog table for query optimize + ContextResolvedTable contextResolvedTable = + catalogManager.getTableOrError(identifier).toCatalogTable(); PlannerQueryOperation query = (PlannerQueryOperation)