This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2a9876d7ebc75b9e650653b996c02af3d217b41a
Author: fengli <ldliu...@163.com>
AuthorDate: Tue Apr 23 10:01:31 2024 +0800

    [FLINK-35188][table-api] Introduce CatalogMaterializedTable interface to 
support materialized table
---
 .../hive/util/OperationExecutorFactory.java        |  12 +-
 .../apache/flink/table/catalog/CatalogManager.java |  41 ++-
 .../catalog/CatalogBaseTableResolutionTest.java    |  73 +++++-
 .../org/apache/flink/table/catalog/Catalog.java    |   9 +
 .../flink/table/catalog/CatalogBaseTable.java      |   1 +
 .../table/catalog/CatalogMaterializedTable.java    | 280 ++++++++++++++++++++
 .../catalog/DefaultCatalogMaterializedTable.java   | 292 +++++++++++++++++++++
 .../catalog/ResolvedCatalogMaterializedTable.java  | 185 +++++++++++++
 8 files changed, 885 insertions(+), 8 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
index d3d31f39230..ecaba8a5eda 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
@@ -101,6 +101,10 @@ import static 
org.apache.hadoop.hive.serde2.thrift.Type.VARCHAR_TYPE;
 /** Factory to create the operation executor. */
 public class OperationExecutorFactory {
 
+    // Hive dialect doesn't support materialized table currently.
+    private static final Set<TableKind> TABLE_KINDS =
+            new HashSet<>(Arrays.asList(TableKind.TABLE, TableKind.VIEW));
+
     public static Callable<ResultSet> createGetCatalogsExecutor(
             SqlGatewayService service, SessionHandle sessionHandle) {
         return () -> executeGetCatalogs(service, sessionHandle);
@@ -291,14 +295,13 @@ public class OperationExecutorFactory {
         Set<String> schemaNames =
                 filterAndSort(
                         service.listDatabases(sessionHandle, 
specifiedCatalogName), schemaName);
-        Set<TableKind> tableKinds = new 
HashSet<>(Arrays.asList(TableKind.values()));
 
         List<RowData> results = new ArrayList<>();
         for (String schema : schemaNames) {
             Set<TableInfo> tableInfos =
                     filterAndSort(
                             service.listTables(
-                                    sessionHandle, specifiedCatalogName, 
schema, tableKinds),
+                                    sessionHandle, specifiedCatalogName, 
schema, TABLE_KINDS),
                             candidates -> 
candidates.getIdentifier().getObjectName(),
                             tableName);
 
@@ -369,10 +372,7 @@ public class OperationExecutorFactory {
             Set<TableInfo> tableInfos =
                     filterAndSort(
                             service.listTables(
-                                    sessionHandle,
-                                    specifiedCatalogName,
-                                    schema,
-                                    new 
HashSet<>(Arrays.asList(TableKind.values()))),
+                                    sessionHandle, specifiedCatalogName, 
schema, TABLE_KINDS),
                             candidate -> 
candidate.getIdentifier().getObjectName(),
                             tableName);
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 543105d027a..9e7bf5ec007 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -963,7 +963,8 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
                                     ignoreIfExists);
 
                     catalog.createTable(path, resolvedListenedTable, 
ignoreIfExists);
-                    if (resolvedListenedTable instanceof CatalogTable) {
+                    if (resolvedListenedTable instanceof CatalogTable
+                            || resolvedListenedTable instanceof 
CatalogMaterializedTable) {
                         catalogModificationListeners.forEach(
                                 listener ->
                                         listener.onEvent(
@@ -1318,6 +1319,8 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
         Preconditions.checkNotNull(schemaResolver, "Schema resolver is not 
initialized.");
         if (baseTable instanceof CatalogTable) {
             return resolveCatalogTable((CatalogTable) baseTable);
+        } else if (baseTable instanceof CatalogMaterializedTable) {
+            return resolveCatalogMaterializedTable((CatalogMaterializedTable) 
baseTable);
         } else if (baseTable instanceof CatalogView) {
             return resolveCatalogView((CatalogView) baseTable);
         }
@@ -1389,6 +1392,42 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
         return new ResolvedCatalogTable(table, resolvedSchema);
     }
 
+    /**
+     * Resolves a {@link CatalogMaterializedTable} to a validated {@link
+     * ResolvedCatalogMaterializedTable}.
+     */
+    public ResolvedCatalogMaterializedTable resolveCatalogMaterializedTable(
+            CatalogMaterializedTable table) {
+        Preconditions.checkNotNull(schemaResolver, "Schema resolver is not 
initialized.");
+
+        if (table instanceof ResolvedCatalogMaterializedTable) {
+            return (ResolvedCatalogMaterializedTable) table;
+        }
+
+        final ResolvedSchema resolvedSchema = 
table.getUnresolvedSchema().resolve(schemaResolver);
+
+        // Validate partition keys are included in physical columns
+        final List<String> physicalColumns =
+                resolvedSchema.getColumns().stream()
+                        .filter(Column::isPhysical)
+                        .map(Column::getName)
+                        .collect(Collectors.toList());
+        table.getPartitionKeys()
+                .forEach(
+                        partitionKey -> {
+                            if (!physicalColumns.contains(partitionKey)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Invalid partition key '%s'. A 
partition key must "
+                                                        + "reference a 
physical column in the schema. "
+                                                        + "Available columns 
are: %s",
+                                                partitionKey, 
physicalColumns));
+                            }
+                        });
+
+        return new ResolvedCatalogMaterializedTable(table, resolvedSchema);
+    }
+
     /** Resolves a {@link CatalogView} to a validated {@link 
ResolvedCatalogView}. */
     public ResolvedCatalogView resolveCatalogView(CatalogView view) {
         Preconditions.checkNotNull(schemaResolver, "Schema resolver is not 
initialized.");
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index fe0d84bd493..c52c8bd1bfd 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -49,7 +50,8 @@ import static org.assertj.core.api.Assertions.fail;
 import static org.assertj.core.api.HamcrestCondition.matching;
 
 /**
- * Tests for {@link CatalogTable} to {@link ResolvedCatalogTable} and {@link 
CatalogView} to {@link
+ * Tests for {@link CatalogTable} to {@link ResolvedCatalogTable}, {@link 
CatalogMaterializedTable}
+ * to {@link ResolvedCatalogMaterializedTable} and {@link CatalogView} to 
{@link
  * ResolvedCatalogView} including {@link CatalogPropertiesUtil}.
  */
 class CatalogBaseTableResolutionTest {
@@ -82,6 +84,17 @@ class CatalogBaseTableResolutionTest {
                     .primaryKeyNamed("primary_constraint", "id")
                     .build();
 
+    private static final Schema MATERIALIZED_TABLE_SCHEMA =
+            Schema.newBuilder()
+                    .column("id", DataTypes.INT().notNull())
+                    .column("region", DataTypes.VARCHAR(200))
+                    .withComment("This is a region column.")
+                    .column("county", DataTypes.VARCHAR(200))
+                    .column("topic", DataTypes.VARCHAR(200))
+                    .withComment("") // empty column comment
+                    .primaryKeyNamed("primary_constraint", "id")
+                    .build();
+
     private static final TableSchema LEGACY_TABLE_SCHEMA =
             TableSchema.builder()
                     .add(TableColumn.physical("id", DataTypes.INT().notNull()))
@@ -117,6 +130,18 @@ class CatalogBaseTableResolutionTest {
                     UniqueConstraint.primaryKey(
                             "primary_constraint", 
Collections.singletonList("id")));
 
+    private static final ResolvedSchema RESOLVED_MATERIALIZED_TABLE_SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("id", DataTypes.INT().notNull()),
+                            Column.physical("region", DataTypes.VARCHAR(200))
+                                    .withComment("This is a region column."),
+                            Column.physical("county", DataTypes.VARCHAR(200)),
+                            Column.physical("topic", 
DataTypes.VARCHAR(200)).withComment("")),
+                    Collections.emptyList(),
+                    UniqueConstraint.primaryKey(
+                            "primary_constraint", 
Collections.singletonList("id")));
+
     private static final ResolvedSchema RESOLVED_VIEW_SCHEMA =
             new ResolvedSchema(
                     Arrays.asList(
@@ -140,6 +165,30 @@ class CatalogBaseTableResolutionTest {
         assertThat(resolvedTable.getSchema()).isEqualTo(LEGACY_TABLE_SCHEMA);
     }
 
+    @Test
+    void testCatalogMaterializedTableResolution() {
+        final CatalogMaterializedTable materializedTable = 
catalogMaterializedTable();
+
+        assertThat(materializedTable.getUnresolvedSchema()).isNotNull();
+
+        final ResolvedCatalogMaterializedTable resolvedMaterializedTable =
+                
resolveCatalogBaseTable(ResolvedCatalogMaterializedTable.class, 
materializedTable);
+
+        assertThat(resolvedMaterializedTable.getResolvedSchema())
+                .isEqualTo(RESOLVED_MATERIALIZED_TABLE_SCHEMA);
+
+        assertThat(resolvedMaterializedTable.getDefinitionQuery())
+                .isEqualTo(materializedTable.getDefinitionQuery());
+        assertThat(resolvedMaterializedTable.getFreshness())
+                .isEqualTo(materializedTable.getFreshness());
+        assertThat(resolvedMaterializedTable.getLogicalRefreshMode())
+                .isEqualTo(materializedTable.getLogicalRefreshMode());
+        assertThat(resolvedMaterializedTable.getRefreshMode())
+                .isEqualTo(materializedTable.getRefreshMode());
+        assertThat(resolvedMaterializedTable.getRefreshStatus())
+                .isEqualTo(materializedTable.getRefreshStatus());
+    }
+
     @Test
     void testCatalogViewResolution() {
         final CatalogView view = catalogView();
@@ -319,6 +368,28 @@ class CatalogBaseTableResolutionTest {
         return properties;
     }
 
+    private static CatalogMaterializedTable catalogMaterializedTable() {
+        final String comment = "This is an example materialized table.";
+        final List<String> partitionKeys = Arrays.asList("region", "county");
+
+        final String definitionQuery =
+                String.format(
+                        "SELECT id, region, county FROM %s.%s.T",
+                        DEFAULT_CATALOG, DEFAULT_DATABASE);
+
+        CatalogMaterializedTable.Builder builder = 
CatalogMaterializedTable.newBuilder();
+        return builder.schema(MATERIALIZED_TABLE_SCHEMA)
+                .comment(comment)
+                .partitionKeys(partitionKeys)
+                .options(Collections.emptyMap())
+                .definitionQuery(definitionQuery)
+                .freshness(Duration.ofSeconds(30))
+                
.logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC)
+                .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS)
+                
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+                .build();
+    }
+
     private static CatalogView catalogView() {
         final String comment = "This is an example table.";
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index ccacb948b3b..8cd45111102 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -35,6 +35,8 @@ import 
org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.Factory;
@@ -66,6 +68,13 @@ public interface Catalog {
      * internal catalog-specific objects to their own factory. For example, a 
custom {@link
      * CatalogTable} can be processed by a custom {@link DynamicTableFactory}.
      *
+     * <p>If this catalog supports creating materialized tables, you should 
also override this method
+     * to provide a {@link DynamicTableFactory} which helps the planner find 
{@link DynamicTableSource}
+     * and {@link DynamicTableSink} correctly during the compile optimization 
phase. If you don't
+     * override this method, you must specify the physical connector 
identifier of the storage that
+     * this catalog represents when creating a materialized table. Otherwise, the 
planner can't find the
+     * {@link DynamicTableFactory}.
+     *
      * <p>Because all factories are interfaces, the returned {@link Factory} 
instance can implement
      * multiple supported extension points. An {@code instanceof} check is 
performed by the caller
      * that checks whether a required factory is implemented; otherwise the 
discovery process is
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
index 4e72fc238b7..b956b67840d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
@@ -39,6 +39,7 @@ public interface CatalogBaseTable {
     @PublicEvolving
     enum TableKind {
         TABLE,
+        MATERIALIZED_TABLE,
         VIEW
     }
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
new file mode 100644
index 00000000000..51856cc859e
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Represents the unresolved metadata of a materialized table in a {@link 
Catalog}.
+ *
+ * <p>Materialized Table definition: In the context of streaming-batch unified 
storage, it provides
+ * full history data and incremental changelog. By defining the data's 
production business logic and
+ * freshness, data refresh is achieved through continuous or full refresh 
pipeline, while also
+ * possessing the capability for both batch and incremental consumption.
+ *
+ * <p>The metadata for {@link CatalogMaterializedTable} includes the following 
four main parts:
+ *
+ * <ul>
+ *   <li>Schema, comments, options and partition keys.
+ *   <li>Data freshness, which determines when the data is generated and 
becomes visible to the user.
+ *   <li>Data production business logic, also known as the definition query.
+ *   <li>Background refresh pipeline, run as either a Flink streaming job or a 
periodic Flink batch job;
+ *       it is initialized after the materialized table is created.
+ * </ul>
+ */
+@PublicEvolving
+public interface CatalogMaterializedTable extends CatalogBaseTable {
+
+    /** Builder for configuring and creating instances of {@link 
CatalogMaterializedTable}. */
+    @PublicEvolving
+    static CatalogMaterializedTable.Builder newBuilder() {
+        return new CatalogMaterializedTable.Builder();
+    }
+
+    @Override
+    default TableKind getTableKind() {
+        return TableKind.MATERIALIZED_TABLE;
+    }
+
+    /**
+     * Check if the table is partitioned or not.
+     *
+     * @return true if the table is partitioned; otherwise, false
+     */
+    boolean isPartitioned();
+
+    /**
+     * Get the partition keys of the table. This will be an empty list if the 
table is not
+     * partitioned.
+     *
+     * @return partition keys of the table
+     */
+    List<String> getPartitionKeys();
+
+    /**
+     * Returns a copy of this {@code CatalogMaterializedTable} with given 
table options {@code
+     * options}.
+     *
+     * @return a new copy of this table with replaced table options
+     */
+    CatalogMaterializedTable copy(Map<String, String> options);
+
+    /**
+     * Returns a copy of this {@code CatalogMaterializedTable} with given refresh 
info.
+     *
+     * @return a new copy of this table with replaced refresh info
+     */
+    CatalogMaterializedTable copy(
+            RefreshStatus refreshStatus,
+            String refreshHandlerDescription,
+            byte[] serializedRefreshHandler);
+
+    /** Return the snapshot specified for the table. Return Optional.empty() 
if not specified. */
+    Optional<Long> getSnapshot();
+
+    /**
+     * The definition query text of the materialized table; the text is expanded in 
contrast to the original
+     * SQL. This is needed because context such as the current DB is lost 
once the session in
+     * which the materialized table is defined is gone. The expanded query text 
preserves that context.
+     *
+     * <p>For example, for a materialized table that is defined in the context 
of "default" database
+     * with a query {@code select * from test1}, the expanded query text might 
become {@code select
+     * `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table 
test1 resides in
+     * database "default" and has two columns ("name" and "value").
+     *
+     * @return the materialized table definition in expanded text.
+     */
+    String getDefinitionQuery();
+
+    /**
+     * Get the freshness of materialized table which is used to determine the 
physical refresh mode.
+     */
+    Duration getFreshness();
+
+    /** Get the logical refresh mode of materialized table. */
+    LogicalRefreshMode getLogicalRefreshMode();
+
+    /** Get the physical refresh mode of materialized table. */
+    RefreshMode getRefreshMode();
+
+    /** Get the refresh status of materialized table. */
+    RefreshStatus getRefreshStatus();
+
+    /** Return summary description of refresh handler. */
+    Optional<String> getRefreshHandlerDescription();
+
+    /**
+     * Return the serialized refresh handler of materialized table. This will 
not be used for
+     * describe table.
+     */
+    @Nullable
+    byte[] getSerializedRefreshHandler();
+
+    /** The logical refresh mode of materialized table. */
+    @PublicEvolving
+    enum LogicalRefreshMode {
+        /**
+         * The refresh pipeline will be executed in continuous mode, 
corresponding to {@link
+         * RefreshMode#CONTINUOUS}.
+         */
+        CONTINUOUS,
+
+        /**
+         * The refresh pipeline will be executed in full mode, corresponding 
to {@link
+         * RefreshMode#FULL}.
+         */
+        FULL,
+
+        /**
+         * The refresh pipeline mode is determined by freshness of 
materialized table, either {@link
+         * RefreshMode#FULL} or {@link RefreshMode#CONTINUOUS}.
+         */
+        AUTOMATIC
+    }
+
+    /** The physical refresh mode of materialized table. */
+    @PublicEvolving
+    enum RefreshMode {
+        CONTINUOUS,
+        FULL
+    }
+
+    /** Background refresh pipeline status of materialized table. */
+    @PublicEvolving
+    enum RefreshStatus {
+        INITIALIZING,
+        ACTIVATED,
+        SUSPENDED
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Builder for configuring and creating instances of {@link 
CatalogMaterializedTable}. */
+    @PublicEvolving
+    class Builder {
+
+        private Schema schema;
+        private String comment;
+        private List<String> partitionKeys = Collections.emptyList();
+        private Map<String, String> options = Collections.emptyMap();
+        private @Nullable Long snapshot;
+        private String definitionQuery;
+        private Duration freshness;
+        private LogicalRefreshMode logicalRefreshMode;
+        private RefreshMode refreshMode;
+        private RefreshStatus refreshStatus;
+        private @Nullable String refreshHandlerDescription;
+        private @Nullable byte[] serializedRefreshHandler;
+
+        private Builder() {}
+
+        public Builder schema(Schema schema) {
+            this.schema = Preconditions.checkNotNull(schema, "Schema must not 
be null.");
+            return this;
+        }
+
+        public Builder comment(@Nullable String comment) {
+            this.comment = comment;
+            return this;
+        }
+
+        public Builder partitionKeys(List<String> partitionKeys) {
+            this.partitionKeys =
+                    Preconditions.checkNotNull(partitionKeys, "Partition keys 
must not be null.");
+            return this;
+        }
+
+        public Builder options(Map<String, String> options) {
+            this.options = Preconditions.checkNotNull(options, "Options must 
not be null.");
+            return this;
+        }
+
+        public Builder snapshot(@Nullable Long snapshot) {
+            this.snapshot = snapshot;
+            return this;
+        }
+
+        public Builder definitionQuery(String definitionQuery) {
+            this.definitionQuery =
+                    Preconditions.checkNotNull(
+                            definitionQuery, "Definition query must not be 
null.");
+            return this;
+        }
+
+        public Builder freshness(Duration freshness) {
+            this.freshness = Preconditions.checkNotNull(freshness, "Freshness 
must not be null.");
+            return this;
+        }
+
+        public Builder logicalRefreshMode(LogicalRefreshMode 
logicalRefreshMode) {
+            this.logicalRefreshMode =
+                    Preconditions.checkNotNull(
+                            logicalRefreshMode, "Logical refresh mode must not 
be null.");
+            return this;
+        }
+
+        public Builder refreshMode(RefreshMode refreshMode) {
+            this.refreshMode =
+                    Preconditions.checkNotNull(refreshMode, "Refresh mode must 
not be null.");
+            return this;
+        }
+
+        public Builder refreshStatus(RefreshStatus refreshStatus) {
+            this.refreshStatus =
+                    Preconditions.checkNotNull(refreshStatus, "Refresh status 
must not be null.");
+            return this;
+        }
+
+        public Builder refreshHandlerDescription(@Nullable String 
refreshHandlerDescription) {
+            this.refreshHandlerDescription = refreshHandlerDescription;
+            return this;
+        }
+
+        public Builder serializedRefreshHandler(@Nullable byte[] 
serializedRefreshHandler) {
+            this.serializedRefreshHandler = serializedRefreshHandler;
+            return this;
+        }
+
+        public CatalogMaterializedTable build() {
+            return new DefaultCatalogMaterializedTable(
+                    schema,
+                    comment,
+                    partitionKeys,
+                    options,
+                    snapshot,
+                    definitionQuery,
+                    freshness,
+                    logicalRefreshMode,
+                    refreshMode,
+                    refreshStatus,
+                    refreshHandlerDescription,
+                    serializedRefreshHandler);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
new file mode 100644
index 00000000000..26166e3e8a2
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default implementation of a {@link CatalogMaterializedTable}.
+ *
+ * <p>This class is a plain holder of the table metadata. All fields are {@code final}, but the
+ * partition key list, the options map and the serialized refresh handler array are stored and
+ * returned by reference (no defensive copies), so callers should treat them as read-only.
+ */
+@Internal
+public class DefaultCatalogMaterializedTable implements CatalogMaterializedTable {
+
+    private final Schema schema;
+    private final @Nullable String comment;
+    private final List<String> partitionKeys;
+    private final Map<String, String> options;
+
+    private final @Nullable Long snapshot;
+
+    private final String definitionQuery;
+    private final Duration freshness;
+    private final LogicalRefreshMode logicalRefreshMode;
+    private final RefreshMode refreshMode;
+    private final RefreshStatus refreshStatus;
+    private final @Nullable String refreshHandlerDescription;
+    private final @Nullable byte[] serializedRefreshHandler;
+
+    /**
+     * Creates the table from its individual parts.
+     *
+     * <p>Every argument not marked {@code @Nullable} is validated with {@code checkNotNull}. In
+     * addition, {@code options} must not contain a {@code null} key or value; this is verified
+     * with {@code checkArgument}.
+     *
+     * @param schema unresolved schema of the table
+     * @param comment optional comment; {@link #getComment()} maps {@code null} to an empty string
+     * @param partitionKeys partition keys; an empty list means the table is not partitioned
+     * @param options table options without {@code null} keys or values
+     * @param snapshot optional snapshot value exposed via {@link #getSnapshot()}
+     * @param definitionQuery definition query of the materialized table
+     * @param freshness freshness of the materialized table
+     * @param logicalRefreshMode logical refresh mode
+     * @param refreshMode refresh mode
+     * @param refreshStatus refresh status
+     * @param refreshHandlerDescription optional description of the refresh handler
+     * @param serializedRefreshHandler optional serialized refresh handler bytes
+     */
+    protected DefaultCatalogMaterializedTable(
+            Schema schema,
+            @Nullable String comment,
+            List<String> partitionKeys,
+            Map<String, String> options,
+            @Nullable Long snapshot,
+            String definitionQuery,
+            Duration freshness,
+            LogicalRefreshMode logicalRefreshMode,
+            RefreshMode refreshMode,
+            RefreshStatus refreshStatus,
+            @Nullable String refreshHandlerDescription,
+            @Nullable byte[] serializedRefreshHandler) {
+        this.schema = checkNotNull(schema, "Schema must not be null.");
+        this.comment = comment;
+        this.partitionKeys = checkNotNull(partitionKeys, "Partition keys must not be null.");
+        this.options = checkNotNull(options, "Options must not be null.");
+        this.snapshot = snapshot;
+        this.definitionQuery = checkNotNull(definitionQuery, "Definition query must not be null.");
+        this.freshness = checkNotNull(freshness, "Freshness must not be null.");
+        this.logicalRefreshMode =
+                checkNotNull(logicalRefreshMode, "Logical refresh mode must not be null.");
+        this.refreshMode = checkNotNull(refreshMode, "Refresh mode must not be null.");
+        this.refreshStatus = checkNotNull(refreshStatus, "Refresh status must not be null.");
+        this.refreshHandlerDescription = refreshHandlerDescription;
+        this.serializedRefreshHandler = serializedRefreshHandler;
+
+        // The map itself was null-checked above; here its entries are checked as well.
+        checkArgument(
+                options.entrySet().stream()
+                        .allMatch(e -> e.getKey() != null && e.getValue() != null),
+                "Options cannot have null keys or values.");
+    }
+
+    @Override
+    public Schema getUnresolvedSchema() {
+        return schema;
+    }
+
+    /** Returns the comment, or an empty string if no comment was provided. */
+    @Override
+    public String getComment() {
+        return comment != null ? comment : "";
+    }
+
+    /** Returns {@code true} if at least one partition key is present. */
+    @Override
+    public boolean isPartitioned() {
+        return !partitionKeys.isEmpty();
+    }
+
+    @Override
+    public List<String> getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    @Override
+    public Map<String, String> getOptions() {
+        return options;
+    }
+
+    /**
+     * Returns a new instance constructed from the same field values as this one.
+     *
+     * <p>The copy is shallow: the list, map and byte array references are shared with this
+     * instance.
+     */
+    @Override
+    public CatalogBaseTable copy() {
+        return new DefaultCatalogMaterializedTable(
+                schema,
+                comment,
+                partitionKeys,
+                options,
+                snapshot,
+                definitionQuery,
+                freshness,
+                logicalRefreshMode,
+                refreshMode,
+                refreshStatus,
+                refreshHandlerDescription,
+                serializedRefreshHandler);
+    }
+
+    /**
+     * Returns a new instance that uses the given options and otherwise the field values of this
+     * instance.
+     *
+     * @param options the options of the copy; validated by the constructor
+     */
+    @Override
+    public CatalogMaterializedTable copy(Map<String, String> options) {
+        return new DefaultCatalogMaterializedTable(
+                schema,
+                comment,
+                partitionKeys,
+                options,
+                snapshot,
+                definitionQuery,
+                freshness,
+                logicalRefreshMode,
+                refreshMode,
+                refreshStatus,
+                refreshHandlerDescription,
+                serializedRefreshHandler);
+    }
+
+    /**
+     * Returns a new instance that uses the given refresh status and refresh handler information
+     * and otherwise the field values of this instance.
+     *
+     * @param refreshStatus the refresh status of the copy; validated by the constructor
+     * @param refreshHandlerDescription the refresh handler description of the copy
+     * @param serializedRefreshHandler the serialized refresh handler of the copy
+     */
+    @Override
+    public CatalogMaterializedTable copy(
+            RefreshStatus refreshStatus,
+            String refreshHandlerDescription,
+            byte[] serializedRefreshHandler) {
+        return new DefaultCatalogMaterializedTable(
+                schema,
+                comment,
+                partitionKeys,
+                options,
+                snapshot,
+                definitionQuery,
+                freshness,
+                logicalRefreshMode,
+                refreshMode,
+                refreshStatus,
+                refreshHandlerDescription,
+                serializedRefreshHandler);
+    }
+
+    /** Returns {@link #getComment()} wrapped in an {@link Optional}; the result is never empty. */
+    @Override
+    public Optional<String> getDescription() {
+        return Optional.of(getComment());
+    }
+
+    /** Always returns an empty {@link Optional}; this implementation has no detailed description. */
+    @Override
+    public Optional<String> getDetailedDescription() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<Long> getSnapshot() {
+        return Optional.ofNullable(snapshot);
+    }
+
+    @Override
+    public String getDefinitionQuery() {
+        return definitionQuery;
+    }
+
+    @Override
+    public Duration getFreshness() {
+        return freshness;
+    }
+
+    @Override
+    public LogicalRefreshMode getLogicalRefreshMode() {
+        return logicalRefreshMode;
+    }
+
+    @Override
+    public RefreshMode getRefreshMode() {
+        return refreshMode;
+    }
+
+    @Override
+    public RefreshStatus getRefreshStatus() {
+        return refreshStatus;
+    }
+
+    @Override
+    public Optional<String> getRefreshHandlerDescription() {
+        return Optional.ofNullable(refreshHandlerDescription);
+    }
+
+    /** Returns the internal array reference (not a copy), or {@code null} if none was set. */
+    @Nullable
+    @Override
+    public byte[] getSerializedRefreshHandler() {
+        return serializedRefreshHandler;
+    }
+
+    /**
+     * Compares all fields; only instances of exactly the same class can be equal.
+     *
+     * <p>The serialized refresh handler is compared by content via {@link Arrays#equals(byte[],
+     * byte[])}.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DefaultCatalogMaterializedTable that = (DefaultCatalogMaterializedTable) o;
+        return Objects.equals(schema, that.schema)
+                && Objects.equals(comment, that.comment)
+                && Objects.equals(partitionKeys, that.partitionKeys)
+                && Objects.equals(options, that.options)
+                && Objects.equals(snapshot, that.snapshot)
+                && Objects.equals(definitionQuery, that.definitionQuery)
+                && Objects.equals(freshness, that.freshness)
+                && logicalRefreshMode == that.logicalRefreshMode
+                && refreshMode == that.refreshMode
+                && refreshStatus == that.refreshStatus
+                && Objects.equals(refreshHandlerDescription, that.refreshHandlerDescription)
+                && Arrays.equals(serializedRefreshHandler, that.serializedRefreshHandler);
+    }
+
+    @Override
+    public int hashCode() {
+        int result =
+                Objects.hash(
+                        schema,
+                        comment,
+                        partitionKeys,
+                        options,
+                        snapshot,
+                        definitionQuery,
+                        freshness,
+                        logicalRefreshMode,
+                        refreshMode,
+                        refreshStatus,
+                        refreshHandlerDescription);
+        // The array is hashed by content so that hashCode() agrees with the
+        // Arrays.equals(...) comparison used in equals().
+        result = 31 * result + Arrays.hashCode(serializedRefreshHandler);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "DefaultCatalogMaterializedTable{"
+                + "schema="
+                + schema
+                + ", comment='"
+                + comment
+                + '\''
+                + ", partitionKeys="
+                + partitionKeys
+                + ", options="
+                + options
+                + ", snapshot="
+                + snapshot
+                + ", definitionQuery='"
+                + definitionQuery
+                + '\''
+                + ", freshness="
+                + freshness
+                + ", logicalRefreshMode="
+                + logicalRefreshMode
+                + ", refreshMode="
+                + refreshMode
+                + ", refreshStatus="
+                + refreshStatus
+                + ", refreshHandlerDescription='"
+                + refreshHandlerDescription
+                + '\''
+                + ", serializedRefreshHandler="
+                + Arrays.toString(serializedRefreshHandler)
+                + '}';
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
new file mode 100644
index 00000000000..a0206af3111
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A validated {@link CatalogMaterializedTable} that is backed by the original metadata coming from
+ * the {@link Catalog} but resolved by the framework.
+ *
+ * <p>Except for the resolved schema, every getter delegates to the wrapped original table.
+ *
+ * <p>Note: This will be converted to {@link ResolvedCatalogTable} by the framework during the
+ * planner's query optimization phase.
+ */
+@PublicEvolving
+public class ResolvedCatalogMaterializedTable
+        implements ResolvedCatalogBaseTable<CatalogMaterializedTable>, CatalogMaterializedTable {
+
+    private final CatalogMaterializedTable origin;
+
+    private final ResolvedSchema resolvedSchema;
+
+    /**
+     * Wraps the given original table together with its resolved schema.
+     *
+     * @param origin the original catalog materialized table; validated with {@code checkNotNull}
+     * @param resolvedSchema the resolved schema; validated with {@code checkNotNull}
+     */
+    public ResolvedCatalogMaterializedTable(
+            CatalogMaterializedTable origin, ResolvedSchema resolvedSchema) {
+        this.origin =
+                Preconditions.checkNotNull(
+                        origin, "Original catalog materialized table must not be null.");
+        this.resolvedSchema =
+                Preconditions.checkNotNull(resolvedSchema, "Resolved schema must not be null.");
+    }
+
+    @Override
+    public Map<String, String> getOptions() {
+        return origin.getOptions();
+    }
+
+    @Override
+    public String getComment() {
+        return origin.getComment();
+    }
+
+    /**
+     * Returns a new resolved table wrapping a copy of the origin and the same resolved schema
+     * instance.
+     */
+    @Override
+    public CatalogBaseTable copy() {
+        // origin.copy() is declared to return CatalogBaseTable, hence the cast.
+        return new ResolvedCatalogMaterializedTable(
+                (CatalogMaterializedTable) origin.copy(), resolvedSchema);
+    }
+
+    /**
+     * Returns a new resolved table wrapping {@code origin.copy(options)} and the same resolved
+     * schema instance.
+     *
+     * @param options the options passed on to the copy of the origin
+     */
+    @Override
+    public ResolvedCatalogMaterializedTable copy(Map<String, String> options) {
+        return new ResolvedCatalogMaterializedTable(origin.copy(options), resolvedSchema);
+    }
+
+    /**
+     * Returns a new resolved table wrapping a copy of the origin created with the given refresh
+     * status and refresh handler information, and the same resolved schema instance.
+     *
+     * @param refreshStatus the refresh status passed on to the copy of the origin
+     * @param refreshHandlerDescription the refresh handler description passed on to the copy
+     * @param serializedRefreshHandler the serialized refresh handler passed on to the copy
+     */
+    @Override
+    public ResolvedCatalogMaterializedTable copy(
+            RefreshStatus refreshStatus,
+            String refreshHandlerDescription,
+            byte[] serializedRefreshHandler) {
+        return new ResolvedCatalogMaterializedTable(
+                origin.copy(refreshStatus, refreshHandlerDescription, serializedRefreshHandler),
+                resolvedSchema);
+    }
+
+    @Override
+    public Optional<String> getDescription() {
+        return origin.getDescription();
+    }
+
+    @Override
+    public Optional<String> getDetailedDescription() {
+        return origin.getDetailedDescription();
+    }
+
+    @Override
+    public boolean isPartitioned() {
+        return origin.isPartitioned();
+    }
+
+    @Override
+    public List<String> getPartitionKeys() {
+        return origin.getPartitionKeys();
+    }
+
+    @Override
+    public Optional<Long> getSnapshot() {
+        return origin.getSnapshot();
+    }
+
+    /** Returns the wrapped original table that was passed to the constructor. */
+    @Override
+    public CatalogMaterializedTable getOrigin() {
+        return origin;
+    }
+
+    /** Returns the resolved schema that was passed to the constructor. */
+    @Override
+    public ResolvedSchema getResolvedSchema() {
+        return resolvedSchema;
+    }
+
+    @Override
+    public String getDefinitionQuery() {
+        return origin.getDefinitionQuery();
+    }
+
+    @Override
+    public Duration getFreshness() {
+        return origin.getFreshness();
+    }
+
+    @Override
+    public LogicalRefreshMode getLogicalRefreshMode() {
+        return origin.getLogicalRefreshMode();
+    }
+
+    @Override
+    public RefreshMode getRefreshMode() {
+        return origin.getRefreshMode();
+    }
+
+    @Override
+    public RefreshStatus getRefreshStatus() {
+        return origin.getRefreshStatus();
+    }
+
+    @Override
+    public Optional<String> getRefreshHandlerDescription() {
+        return origin.getRefreshHandlerDescription();
+    }
+
+    @Nullable
+    @Override
+    public byte[] getSerializedRefreshHandler() {
+        return origin.getSerializedRefreshHandler();
+    }
+
+    /**
+     * Two resolved tables are equal if they are of exactly the same class and both their origin
+     * and their resolved schema are equal.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ResolvedCatalogMaterializedTable that = (ResolvedCatalogMaterializedTable) o;
+        return Objects.equals(origin, that.origin)
+                && Objects.equals(resolvedSchema, that.resolvedSchema);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(origin, resolvedSchema);
+    }
+
+    @Override
+    public String toString() {
+        return "ResolvedCatalogMaterializedTable{"
+                + "origin="
+                + origin
+                + ", resolvedSchema="
+                + resolvedSchema
+                + '}';
+    }
+}


Reply via email to