This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2a9876d7ebc75b9e650653b996c02af3d217b41a Author: fengli <ldliu...@163.com> AuthorDate: Tue Apr 23 10:01:31 2024 +0800 [FLINK-35188][table-api] Introduce CatalogMaterializedTable interface to support materialized table --- .../hive/util/OperationExecutorFactory.java | 12 +- .../apache/flink/table/catalog/CatalogManager.java | 41 ++- .../catalog/CatalogBaseTableResolutionTest.java | 73 +++++- .../org/apache/flink/table/catalog/Catalog.java | 9 + .../flink/table/catalog/CatalogBaseTable.java | 1 + .../table/catalog/CatalogMaterializedTable.java | 280 ++++++++++++++++++++ .../catalog/DefaultCatalogMaterializedTable.java | 292 +++++++++++++++++++++ .../catalog/ResolvedCatalogMaterializedTable.java | 185 +++++++++++++ 8 files changed, 885 insertions(+), 8 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java index d3d31f39230..ecaba8a5eda 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java @@ -101,6 +101,10 @@ import static org.apache.hadoop.hive.serde2.thrift.Type.VARCHAR_TYPE; /** Factory to create the operation executor. */ public class OperationExecutorFactory { + // Hive dialect doesn't support materialized table currently. 
+ private static final Set<TableKind> TABLE_KINDS = + new HashSet<>(Arrays.asList(TableKind.TABLE, TableKind.VIEW)); + public static Callable<ResultSet> createGetCatalogsExecutor( SqlGatewayService service, SessionHandle sessionHandle) { return () -> executeGetCatalogs(service, sessionHandle); @@ -291,14 +295,13 @@ public class OperationExecutorFactory { Set<String> schemaNames = filterAndSort( service.listDatabases(sessionHandle, specifiedCatalogName), schemaName); - Set<TableKind> tableKinds = new HashSet<>(Arrays.asList(TableKind.values())); List<RowData> results = new ArrayList<>(); for (String schema : schemaNames) { Set<TableInfo> tableInfos = filterAndSort( service.listTables( - sessionHandle, specifiedCatalogName, schema, tableKinds), + sessionHandle, specifiedCatalogName, schema, TABLE_KINDS), candidates -> candidates.getIdentifier().getObjectName(), tableName); @@ -369,10 +372,7 @@ public class OperationExecutorFactory { Set<TableInfo> tableInfos = filterAndSort( service.listTables( - sessionHandle, - specifiedCatalogName, - schema, - new HashSet<>(Arrays.asList(TableKind.values()))), + sessionHandle, specifiedCatalogName, schema, TABLE_KINDS), candidate -> candidate.getIdentifier().getObjectName(), tableName); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 543105d027a..9e7bf5ec007 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -963,7 +963,8 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { ignoreIfExists); catalog.createTable(path, resolvedListenedTable, ignoreIfExists); - if (resolvedListenedTable instanceof CatalogTable) { + if (resolvedListenedTable instanceof CatalogTable + || 
resolvedListenedTable instanceof CatalogMaterializedTable) { catalogModificationListeners.forEach( listener -> listener.onEvent( @@ -1318,6 +1319,8 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { Preconditions.checkNotNull(schemaResolver, "Schema resolver is not initialized."); if (baseTable instanceof CatalogTable) { return resolveCatalogTable((CatalogTable) baseTable); + } else if (baseTable instanceof CatalogMaterializedTable) { + return resolveCatalogMaterializedTable((CatalogMaterializedTable) baseTable); } else if (baseTable instanceof CatalogView) { return resolveCatalogView((CatalogView) baseTable); } @@ -1389,6 +1392,42 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { return new ResolvedCatalogTable(table, resolvedSchema); } + /** + * Resolves a {@link CatalogMaterializedTable} to a validated {@link + * ResolvedCatalogMaterializedTable}. + */ + public ResolvedCatalogMaterializedTable resolveCatalogMaterializedTable( + CatalogMaterializedTable table) { + Preconditions.checkNotNull(schemaResolver, "Schema resolver is not initialized."); + + if (table instanceof ResolvedCatalogMaterializedTable) { + return (ResolvedCatalogMaterializedTable) table; + } + + final ResolvedSchema resolvedSchema = table.getUnresolvedSchema().resolve(schemaResolver); + + // Validate partition keys are included in physical columns + final List<String> physicalColumns = + resolvedSchema.getColumns().stream() + .filter(Column::isPhysical) + .map(Column::getName) + .collect(Collectors.toList()); + table.getPartitionKeys() + .forEach( + partitionKey -> { + if (!physicalColumns.contains(partitionKey)) { + throw new ValidationException( + String.format( + "Invalid partition key '%s'. A partition key must " + + "reference a physical column in the schema. 
" + + "Available columns are: %s", + partitionKey, physicalColumns)); + } + }); + + return new ResolvedCatalogMaterializedTable(table, resolvedSchema); + } + /** Resolves a {@link CatalogView} to a validated {@link ResolvedCatalogView}. */ public ResolvedCatalogView resolveCatalogView(CatalogView view) { Preconditions.checkNotNull(schemaResolver, "Schema resolver is not initialized."); diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java index fe0d84bd493..c52c8bd1bfd 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java @@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test; import javax.annotation.Nullable; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -49,7 +50,8 @@ import static org.assertj.core.api.Assertions.fail; import static org.assertj.core.api.HamcrestCondition.matching; /** - * Tests for {@link CatalogTable} to {@link ResolvedCatalogTable} and {@link CatalogView} to {@link + * Tests for {@link CatalogTable} to {@link ResolvedCatalogTable}, {@link CatalogMaterializedTable} + * to {@link ResolvedCatalogMaterializedTable} and {@link CatalogView} to {@link * ResolvedCatalogView} including {@link CatalogPropertiesUtil}. 
*/ class CatalogBaseTableResolutionTest { @@ -82,6 +84,17 @@ class CatalogBaseTableResolutionTest { .primaryKeyNamed("primary_constraint", "id") .build(); + private static final Schema MATERIALIZED_TABLE_SCHEMA = + Schema.newBuilder() + .column("id", DataTypes.INT().notNull()) + .column("region", DataTypes.VARCHAR(200)) + .withComment("This is a region column.") + .column("county", DataTypes.VARCHAR(200)) + .column("topic", DataTypes.VARCHAR(200)) + .withComment("") // empty column comment + .primaryKeyNamed("primary_constraint", "id") + .build(); + private static final TableSchema LEGACY_TABLE_SCHEMA = TableSchema.builder() .add(TableColumn.physical("id", DataTypes.INT().notNull())) @@ -117,6 +130,18 @@ class CatalogBaseTableResolutionTest { UniqueConstraint.primaryKey( "primary_constraint", Collections.singletonList("id"))); + private static final ResolvedSchema RESOLVED_MATERIALIZED_TABLE_SCHEMA = + new ResolvedSchema( + Arrays.asList( + Column.physical("id", DataTypes.INT().notNull()), + Column.physical("region", DataTypes.VARCHAR(200)) + .withComment("This is a region column."), + Column.physical("county", DataTypes.VARCHAR(200)), + Column.physical("topic", DataTypes.VARCHAR(200)).withComment("")), + Collections.emptyList(), + UniqueConstraint.primaryKey( + "primary_constraint", Collections.singletonList("id"))); + private static final ResolvedSchema RESOLVED_VIEW_SCHEMA = new ResolvedSchema( Arrays.asList( @@ -140,6 +165,30 @@ class CatalogBaseTableResolutionTest { assertThat(resolvedTable.getSchema()).isEqualTo(LEGACY_TABLE_SCHEMA); } + @Test + void testCatalogMaterializedTableResolution() { + final CatalogMaterializedTable materializedTable = catalogMaterializedTable(); + + assertThat(materializedTable.getUnresolvedSchema()).isNotNull(); + + final ResolvedCatalogMaterializedTable resolvedMaterializedTable = + resolveCatalogBaseTable(ResolvedCatalogMaterializedTable.class, materializedTable); + + assertThat(resolvedMaterializedTable.getResolvedSchema()) + 
.isEqualTo(RESOLVED_MATERIALIZED_TABLE_SCHEMA); + + assertThat(resolvedMaterializedTable.getDefinitionQuery()) + .isEqualTo(materializedTable.getDefinitionQuery()); + assertThat(resolvedMaterializedTable.getFreshness()) + .isEqualTo(materializedTable.getFreshness()); + assertThat(resolvedMaterializedTable.getLogicalRefreshMode()) + .isEqualTo(materializedTable.getLogicalRefreshMode()); + assertThat(resolvedMaterializedTable.getRefreshMode()) + .isEqualTo(materializedTable.getRefreshMode()); + assertThat(resolvedMaterializedTable.getRefreshStatus()) + .isEqualTo(materializedTable.getRefreshStatus()); + } + @Test void testCatalogViewResolution() { final CatalogView view = catalogView(); @@ -319,6 +368,28 @@ class CatalogBaseTableResolutionTest { return properties; } + private static CatalogMaterializedTable catalogMaterializedTable() { + final String comment = "This is an example materialized table."; + final List<String> partitionKeys = Arrays.asList("region", "county"); + + final String definitionQuery = + String.format( + "SELECT id, region, county FROM %s.%s.T", + DEFAULT_CATALOG, DEFAULT_DATABASE); + + CatalogMaterializedTable.Builder builder = CatalogMaterializedTable.newBuilder(); + return builder.schema(MATERIALIZED_TABLE_SCHEMA) + .comment(comment) + .partitionKeys(partitionKeys) + .options(Collections.emptyMap()) + .definitionQuery(definitionQuery) + .freshness(Duration.ofSeconds(30)) + .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC) + .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS) + .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING) + .build(); + } + private static CatalogView catalogView() { final String comment = "This is an example table."; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java index ccacb948b3b..8cd45111102 100644 --- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java @@ -35,6 +35,8 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.flink.table.catalog.exceptions.TablePartitionedException; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.factories.Factory; @@ -66,6 +68,13 @@ public interface Catalog { * internal catalog-specific objects to their own factory. For example, a custom {@link * CatalogTable} can be processed by a custom {@link DynamicTableFactory}. * + * <p>If this catalog supports creating materialized tables, you should also override this method + * to provide a {@link DynamicTableFactory} which helps the planner find the {@link DynamicTableSource} + * and {@link DynamicTableSink} correctly during the compile optimization phase. If you don't + * override this method, you must specify the physical connector identifier of the storage that + * this catalog represents when creating a materialized table. Otherwise, the planner can't find the + * {@link DynamicTableFactory}. + * * <p>Because all factories are interfaces, the returned {@link Factory} instance can implement * multiple supported extension points. 
An {@code instanceof} check is performed by the caller * that checks whether a required factory is implemented; otherwise the discovery process is diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java index 4e72fc238b7..b956b67840d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java @@ -39,6 +39,7 @@ public interface CatalogBaseTable { @PublicEvolving enum TableKind { TABLE, + MATERIALIZED_TABLE, VIEW } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java new file mode 100644 index 00000000000..51856cc859e --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.Schema; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Represents the unresolved metadata of a materialized table in a {@link Catalog}. + * + * <p>Materialized Table definition: In the context of streaming-batch unified storage, it provides + * full history data and incremental changelog. By defining the data's production business logic and + * freshness, data refresh is achieved through continuous or full refresh pipeline, while also + * possessing the capability for both batch and incremental consumption. + * + * <p>The metadata for {@link CatalogMaterializedTable} includes the following four main parts: + * + * <ul> + * <li>Schema, comments, options and partition keys. + * <li>Data freshness, which determines when the data is generated and becomes visible for user. + * <li>Data production business logic, also known as the definition query. + * <li>Background refresh pipeline, either through a flink streaming or periodic flink batch job, + * it is initialized after materialized table is created. + * </ul> + */ +@PublicEvolving +public interface CatalogMaterializedTable extends CatalogBaseTable { + + /** Builder for configuring and creating instances of {@link CatalogMaterializedTable}. */ + @PublicEvolving + static CatalogMaterializedTable.Builder newBuilder() { + return new CatalogMaterializedTable.Builder(); + } + + @Override + default TableKind getTableKind() { + return TableKind.MATERIALIZED_TABLE; + } + + /** + * Check if the table is partitioned or not. + * + * @return true if the table is partitioned; otherwise, false + */ + boolean isPartitioned(); + + /** + * Get the partition keys of the table. 
This will be an empty list if the table is not + * partitioned. + * + * @return partition keys of the table + */ + List<String> getPartitionKeys(); + + /** + * Returns a copy of this {@code CatalogMaterializedTable} with given table options {@code + * options}. + * + * @return a new copy of this table with replaced table options + */ + CatalogMaterializedTable copy(Map<String, String> options); + + /** + * Returns a copy of this {@code CatalogMaterializedTable} with given refresh info. + * + * @return a new copy of this table with replaced refresh info + */ + CatalogMaterializedTable copy( + RefreshStatus refreshStatus, + String refreshHandlerDescription, + byte[] serializedRefreshHandler); + + /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */ + Optional<Long> getSnapshot(); + + /** + * The definition query text of materialized table, text is expanded in contrast to the original + * SQL. This is needed because the context such as current DB is lost after the session, in + * which the materialized table is defined, is gone. Expanded query text takes care of this, as an example. + * + * <p>For example, for a materialized table that is defined in the context of "default" database + * with a query {@code select * from test1}, the expanded query text might become {@code select + * `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1 resides in + * database "default" and has two columns ("name" and "value"). + * + * @return the materialized table definition in expanded text. + */ + String getDefinitionQuery(); + + /** + * Get the freshness of materialized table which is used to determine the physical refresh mode. + */ + Duration getFreshness(); + + /** Get the logical refresh mode of materialized table. */ + LogicalRefreshMode getLogicalRefreshMode(); + + /** Get the physical refresh mode of materialized table. */ + RefreshMode getRefreshMode(); + + /** Get the refresh status of materialized table. 
*/ + RefreshStatus getRefreshStatus(); + + /** Return summary description of refresh handler. */ + Optional<String> getRefreshHandlerDescription(); + + /** + * Return the serialized refresh handler of materialized table. This will not be used for + * describe table. + */ + @Nullable + byte[] getSerializedRefreshHandler(); + + /** The logical refresh mode of materialized table. */ + @PublicEvolving + enum LogicalRefreshMode { + /** + * The refresh pipeline will be executed in continuous mode, corresponding to {@link + * RefreshMode#CONTINUOUS}. + */ + CONTINUOUS, + + /** + * The refresh pipeline will be executed in full mode, corresponding to {@link + * RefreshMode#FULL}. + */ + FULL, + + /** + * The refresh pipeline mode is determined by freshness of materialized table, either {@link + * RefreshMode#FULL} or {@link RefreshMode#CONTINUOUS}. + */ + AUTOMATIC + } + + /** The physical refresh mode of materialized table. */ + @PublicEvolving + enum RefreshMode { + CONTINUOUS, + FULL + } + + /** Background refresh pipeline status of materialized table. */ + @PublicEvolving + enum RefreshStatus { + INITIALIZING, + ACTIVATED, + SUSPENDED + } + + // -------------------------------------------------------------------------------------------- + + /** Builder for configuring and creating instances of {@link CatalogMaterializedTable}. 
*/ + @PublicEvolving + class Builder { + + private Schema schema; + private String comment; + private List<String> partitionKeys = Collections.emptyList(); + private Map<String, String> options = Collections.emptyMap(); + private @Nullable Long snapshot; + private String definitionQuery; + private Duration freshness; + private LogicalRefreshMode logicalRefreshMode; + private RefreshMode refreshMode; + private RefreshStatus refreshStatus; + private @Nullable String refreshHandlerDescription; + private @Nullable byte[] serializedRefreshHandler; + + private Builder() {} + + public Builder schema(Schema schema) { + this.schema = Preconditions.checkNotNull(schema, "Schema must not be null."); + return this; + } + + public Builder comment(@Nullable String comment) { + this.comment = comment; + return this; + } + + public Builder partitionKeys(List<String> partitionKeys) { + this.partitionKeys = + Preconditions.checkNotNull(partitionKeys, "Partition keys must not be null."); + return this; + } + + public Builder options(Map<String, String> options) { + this.options = Preconditions.checkNotNull(options, "Options must not be null."); + return this; + } + + public Builder snapshot(@Nullable Long snapshot) { + this.snapshot = snapshot; + return this; + } + + public Builder definitionQuery(String definitionQuery) { + this.definitionQuery = + Preconditions.checkNotNull( + definitionQuery, "Definition query must not be null."); + return this; + } + + public Builder freshness(Duration freshness) { + this.freshness = Preconditions.checkNotNull(freshness, "Freshness must not be null."); + return this; + } + + public Builder logicalRefreshMode(LogicalRefreshMode logicalRefreshMode) { + this.logicalRefreshMode = + Preconditions.checkNotNull( + logicalRefreshMode, "Logical refresh mode must not be null."); + return this; + } + + public Builder refreshMode(RefreshMode refreshMode) { + this.refreshMode = + Preconditions.checkNotNull(refreshMode, "Refresh mode must not be null."); + 
return this; + } + + public Builder refreshStatus(RefreshStatus refreshStatus) { + this.refreshStatus = + Preconditions.checkNotNull(refreshStatus, "Refresh status must not be null."); + return this; + } + + public Builder refreshHandlerDescription(@Nullable String refreshHandlerDescription) { + this.refreshHandlerDescription = refreshHandlerDescription; + return this; + } + + public Builder serializedRefreshHandler(@Nullable byte[] serializedRefreshHandler) { + this.serializedRefreshHandler = serializedRefreshHandler; + return this; + } + + public CatalogMaterializedTable build() { + return new DefaultCatalogMaterializedTable( + schema, + comment, + partitionKeys, + options, + snapshot, + definitionQuery, + freshness, + logicalRefreshMode, + refreshMode, + refreshStatus, + refreshHandlerDescription, + serializedRefreshHandler); + } + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java new file mode 100644 index 00000000000..26166e3e8a2 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Default implementation of a {@link CatalogMaterializedTable}. */ +@Internal +public class DefaultCatalogMaterializedTable implements CatalogMaterializedTable { + + private final Schema schema; + private final @Nullable String comment; + private final List<String> partitionKeys; + private final Map<String, String> options; + + private final @Nullable Long snapshot; + + private final String definitionQuery; + private final Duration freshness; + private final LogicalRefreshMode logicalRefreshMode; + private final RefreshMode refreshMode; + private final RefreshStatus refreshStatus; + private final @Nullable String refreshHandlerDescription; + private final @Nullable byte[] serializedRefreshHandler; + + protected DefaultCatalogMaterializedTable( + Schema schema, + @Nullable String comment, + List<String> partitionKeys, + Map<String, String> options, + @Nullable Long snapshot, + String definitionQuery, + Duration freshness, + LogicalRefreshMode logicalRefreshMode, + RefreshMode refreshMode, + RefreshStatus refreshStatus, + @Nullable String refreshHandlerDescription, + @Nullable byte[] serializedRefreshHandler) { + this.schema = checkNotNull(schema, "Schema must not be null."); + this.comment = comment; + this.partitionKeys = checkNotNull(partitionKeys, "Partition keys must not be null."); + this.options = checkNotNull(options, "Options must not be null."); + this.snapshot = snapshot; + 
this.definitionQuery = checkNotNull(definitionQuery, "Definition query must not be null."); + this.freshness = checkNotNull(freshness, "Freshness must not be null."); + this.logicalRefreshMode = + checkNotNull(logicalRefreshMode, "Logical refresh mode must not be null."); + this.refreshMode = checkNotNull(refreshMode, "Refresh mode must not be null."); + this.refreshStatus = checkNotNull(refreshStatus, "Refresh status must not be null."); + this.refreshHandlerDescription = refreshHandlerDescription; + this.serializedRefreshHandler = serializedRefreshHandler; + + checkArgument( + options.entrySet().stream() + .allMatch(e -> e.getKey() != null && e.getValue() != null), + "Options cannot have null keys or values."); + } + + @Override + public Schema getUnresolvedSchema() { + return schema; + } + + @Override + public String getComment() { + return comment != null ? comment : ""; + } + + @Override + public boolean isPartitioned() { + return !partitionKeys.isEmpty(); + } + + @Override + public List<String> getPartitionKeys() { + return partitionKeys; + } + + @Override + public Map<String, String> getOptions() { + return options; + } + + @Override + public CatalogBaseTable copy() { + return new DefaultCatalogMaterializedTable( + schema, + comment, + partitionKeys, + options, + snapshot, + definitionQuery, + freshness, + logicalRefreshMode, + refreshMode, + refreshStatus, + refreshHandlerDescription, + serializedRefreshHandler); + } + + @Override + public CatalogMaterializedTable copy(Map<String, String> options) { + return new DefaultCatalogMaterializedTable( + schema, + comment, + partitionKeys, + options, + snapshot, + definitionQuery, + freshness, + logicalRefreshMode, + refreshMode, + refreshStatus, + refreshHandlerDescription, + serializedRefreshHandler); + } + + @Override + public CatalogMaterializedTable copy( + RefreshStatus refreshStatus, + String refreshHandlerDescription, + byte[] serializedRefreshHandler) { + return new DefaultCatalogMaterializedTable( + 
schema, + comment, + partitionKeys, + options, + snapshot, + definitionQuery, + freshness, + logicalRefreshMode, + refreshMode, + refreshStatus, + refreshHandlerDescription, + serializedRefreshHandler); + } + + @Override + public Optional<String> getDescription() { + return Optional.of(getComment()); + } + + @Override + public Optional<String> getDetailedDescription() { + return Optional.empty(); + } + + @Override + public Optional<Long> getSnapshot() { + return Optional.ofNullable(snapshot); + } + + @Override + public String getDefinitionQuery() { + return definitionQuery; + } + + @Override + public Duration getFreshness() { + return freshness; + } + + @Override + public LogicalRefreshMode getLogicalRefreshMode() { + return logicalRefreshMode; + } + + @Override + public RefreshMode getRefreshMode() { + return refreshMode; + } + + @Override + public RefreshStatus getRefreshStatus() { + return refreshStatus; + } + + @Override + public Optional<String> getRefreshHandlerDescription() { + return Optional.ofNullable(refreshHandlerDescription); + } + + @Nullable + @Override + public byte[] getSerializedRefreshHandler() { + return serializedRefreshHandler; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultCatalogMaterializedTable that = (DefaultCatalogMaterializedTable) o; + return Objects.equals(schema, that.schema) + && Objects.equals(comment, that.comment) + && Objects.equals(partitionKeys, that.partitionKeys) + && Objects.equals(options, that.options) + && Objects.equals(snapshot, that.snapshot) + && Objects.equals(definitionQuery, that.definitionQuery) + && Objects.equals(freshness, that.freshness) + && logicalRefreshMode == that.logicalRefreshMode + && refreshMode == that.refreshMode + && refreshStatus == that.refreshStatus + && Objects.equals(refreshHandlerDescription, that.refreshHandlerDescription) + && Arrays.equals(serializedRefreshHandler, 
that.serializedRefreshHandler); + } + + @Override + public int hashCode() { + int result = + Objects.hash( + schema, + comment, + partitionKeys, + options, + snapshot, + definitionQuery, + freshness, + logicalRefreshMode, + refreshMode, + refreshStatus, + refreshHandlerDescription); + result = 31 * result + Arrays.hashCode(serializedRefreshHandler); + return result; + } + + @Override + public String toString() { + return "DefaultCatalogMaterializedTable{" + + "schema=" + + schema + + ", comment='" + + comment + + '\'' + + ", partitionKeys=" + + partitionKeys + + ", options=" + + options + + ", snapshot=" + + snapshot + + ", definitionQuery='" + + definitionQuery + + '\'' + + ", freshness=" + + freshness + + ", logicalRefreshMode=" + + logicalRefreshMode + + ", refreshMode=" + + refreshMode + + ", refreshStatus=" + + refreshStatus + + ", refreshHandlerDescription='" + + refreshHandlerDescription + + '\'' + + ", serializedRefreshHandler=" + + Arrays.toString(serializedRefreshHandler) + + '}'; + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java new file mode 100644 index 00000000000..a0206af3111 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** + * A validated {@link CatalogMaterializedTable} that is backed by the original metadata coming from + * the {@link Catalog} but resolved by the framework. + * + * <p>Note: This will be converted to {@link ResolvedCatalogTable} by framework during planner + * optimize query phase. 
+ */ +@PublicEvolving +public class ResolvedCatalogMaterializedTable + implements ResolvedCatalogBaseTable<CatalogMaterializedTable>, CatalogMaterializedTable { + + private final CatalogMaterializedTable origin; + + private final ResolvedSchema resolvedSchema; + + public ResolvedCatalogMaterializedTable( + CatalogMaterializedTable origin, ResolvedSchema resolvedSchema) { + this.origin = + Preconditions.checkNotNull( + origin, "Original catalog materialized table must not be null."); + this.resolvedSchema = + Preconditions.checkNotNull(resolvedSchema, "Resolved schema must not be null."); + } + + @Override + public Map<String, String> getOptions() { + return origin.getOptions(); + } + + @Override + public String getComment() { + return origin.getComment(); + } + + @Override + public CatalogBaseTable copy() { + return new ResolvedCatalogMaterializedTable( + (CatalogMaterializedTable) origin.copy(), resolvedSchema); + } + + @Override + public ResolvedCatalogMaterializedTable copy(Map<String, String> options) { + return new ResolvedCatalogMaterializedTable(origin.copy(options), resolvedSchema); + } + + @Override + public ResolvedCatalogMaterializedTable copy( + RefreshStatus refreshStatus, + String refreshHandlerDescription, + byte[] serializedRefreshHandler) { + return new ResolvedCatalogMaterializedTable( + origin.copy(refreshStatus, refreshHandlerDescription, serializedRefreshHandler), + resolvedSchema); + } + + @Override + public Optional<String> getDescription() { + return origin.getDescription(); + } + + @Override + public Optional<String> getDetailedDescription() { + return origin.getDetailedDescription(); + } + + @Override + public boolean isPartitioned() { + return origin.isPartitioned(); + } + + @Override + public List<String> getPartitionKeys() { + return origin.getPartitionKeys(); + } + + @Override + public Optional<Long> getSnapshot() { + return origin.getSnapshot(); + } + + @Override + public CatalogMaterializedTable getOrigin() { + return origin; + } + + 
@Override + public ResolvedSchema getResolvedSchema() { + return resolvedSchema; + } + + @Override + public String getDefinitionQuery() { + return origin.getDefinitionQuery(); + } + + @Override + public Duration getFreshness() { + return origin.getFreshness(); + } + + @Override + public LogicalRefreshMode getLogicalRefreshMode() { + return origin.getLogicalRefreshMode(); + } + + @Override + public RefreshMode getRefreshMode() { + return origin.getRefreshMode(); + } + + @Override + public RefreshStatus getRefreshStatus() { + return origin.getRefreshStatus(); + } + + @Override + public Optional<String> getRefreshHandlerDescription() { + return origin.getRefreshHandlerDescription(); + } + + @Nullable + @Override + public byte[] getSerializedRefreshHandler() { + return origin.getSerializedRefreshHandler(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ResolvedCatalogMaterializedTable that = (ResolvedCatalogMaterializedTable) o; + return Objects.equals(origin, that.origin) + && Objects.equals(resolvedSchema, that.resolvedSchema); + } + + @Override + public int hashCode() { + return Objects.hash(origin, resolvedSchema); + } + + @Override + public String toString() { + return "ResolvedCatalogMaterializedTable{" + + "origin=" + + origin + + ", resolvedSchema=" + + resolvedSchema + + '}'; + } +}