This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e28e495cdd3e0e7cbb58685cb09e1fa08af7223e Author: fengli <ldliu...@163.com> AuthorDate: Mon May 6 20:17:57 2024 +0800 [FLINK-35195][table] Convert SqlCreateMaterializedTable node to CreateMaterializedTableOperation --- .../flink/sql/parser/SqlConstraintValidator.java | 2 +- .../sql/parser/ddl/SqlCreateMaterializedTable.java | 1 - .../CreateMaterializedTableOperation.java | 76 ++++++ .../MaterializedTableOperation.java | 26 +++ .../planner/operations/SqlNodeConvertContext.java | 8 + .../SqlCreateMaterializedTableConverter.java | 210 +++++++++++++++++ .../operations/converters/SqlNodeConverter.java | 5 + .../operations/converters/SqlNodeConverters.java | 1 + .../planner/utils/MaterializedTableUtils.java | 98 ++++++++ ...erializedTableNodeToOperationConverterTest.java | 259 +++++++++++++++++++++ .../SqlNodeToOperationConversionTestBase.java | 2 +- .../SqlRTASNodeToOperationConverterTest.java | 2 +- 12 files changed, 686 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java index 8a9a7727b54..f157e5034a8 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java @@ -89,7 +89,7 @@ public class SqlConstraintValidator { } /** Check table constraint. */ - private static void validate(SqlTableConstraint constraint) throws SqlValidateException { + public static void validate(SqlTableConstraint constraint) throws SqlValidateException { if (constraint.isUnique()) { throw new SqlValidateException( constraint.getParserPosition(), "UNIQUE constraint is not supported yet"); diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java index 1630a0f0117..eae6f1fcba9 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java @@ -132,7 +132,6 @@ public class SqlCreateMaterializedTable extends SqlCreate { return freshness; } - @Nullable public Optional<SqlLiteral> getRefreshMode() { return Optional.ofNullable(refreshMode); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java new file mode 100644 index 00000000000..d4eff00254d --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; +import org.apache.flink.table.operations.ddl.CreateOperation; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +/** Operation to describe a CREATE MATERIALIZED TABLE statement. */ +@Internal +public class CreateMaterializedTableOperation + implements CreateOperation, MaterializedTableOperation { + + private final ObjectIdentifier tableIdentifier; + private final CatalogMaterializedTable materializedTable; + + public CreateMaterializedTableOperation( + ObjectIdentifier tableIdentifier, ResolvedCatalogMaterializedTable materializedTable) { + this.tableIdentifier = tableIdentifier; + this.materializedTable = materializedTable; + } + + @Override + public TableResultInternal execute(Context ctx) { + // create materialized table in catalog + ctx.getCatalogManager().createTable(materializedTable, tableIdentifier, false); + return TableResultImpl.TABLE_RESULT_OK; + } + + public ObjectIdentifier getTableIdentifier() { + return tableIdentifier; + } + + public CatalogMaterializedTable getCatalogMaterializedTable() { + return materializedTable; + } + + @Override + public String asSummaryString() { + Map<String, Object> params = new LinkedHashMap<>(); + params.put("materializedTable", materializedTable); + params.put("identifier", tableIdentifier); + + return OperationUtils.formatWithChildren( + "CREATE MATERIALIZED TABLE", + params, + Collections.emptyList(), + Operation::asSummaryString); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableOperation.java new file mode 100644 index 00000000000..72b83ad1939 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableOperation.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.operations.Operation; + +/** The marker interface for materialized table. */ +@Internal +public interface MaterializedTableOperation extends Operation {} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java index 50d894bd825..2958333caba 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.operations; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.calcite.SqlToRexConverter; @@ -43,6 +44,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig; + /** An implementation of {@link SqlNodeConverter.ConvertContext}. */ public class SqlNodeConvertContext implements SqlNodeConverter.ConvertContext { @@ -54,6 +57,11 @@ public class SqlNodeConvertContext implements SqlNodeConverter.ConvertContext { this.catalogManager = catalogManager; } + @Override + public TableConfig getTableConfig() { + return unwrapTableConfig(flinkPlanner.cluster()); + } + @Override public SqlValidator getSqlValidator() { return flinkPlanner.getOrCreateSqlValidator(); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java new file mode 100644 index 00000000000..03fc0cbcf66 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.SqlConstraintValidator; +import org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable; +import org.apache.flink.sql.parser.ddl.SqlRefreshMode; +import org.apache.flink.sql.parser.ddl.SqlTableOption; +import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; +import org.apache.flink.sql.parser.error.SqlValidateException; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; +import org.apache.flink.table.planner.operations.PlannerQueryOperation; +import org.apache.flink.table.planner.utils.MaterializedTableUtils; +import org.apache.flink.table.planner.utils.OperationConverterUtils; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; + +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD; + +/** A converter for {@link SqlCreateMaterializedTable}. */ +public class SqlCreateMaterializedTableConverter + implements SqlNodeConverter<SqlCreateMaterializedTable> { + + @Override + public Operation convertSqlNode( + SqlCreateMaterializedTable sqlCreateMaterializedTable, ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlCreateMaterializedTable.fullTableName()); + ObjectIdentifier identifier = + context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + + // get comment + String tableComment = + OperationConverterUtils.getTableComment(sqlCreateMaterializedTable.getComment()); + + // get options + Map<String, String> options = new HashMap<>(); + sqlCreateMaterializedTable + .getPropertyList() + .getList() + .forEach( + p -> + options.put( + ((SqlTableOption) p).getKeyString(), + ((SqlTableOption) p).getValueString())); + + // get freshness + Duration freshness = + MaterializedTableUtils.getMaterializedTableFreshness( + sqlCreateMaterializedTable.getFreshness()); + + // get refresh mode + SqlRefreshMode sqlRefreshMode = null; + if (sqlCreateMaterializedTable.getRefreshMode().isPresent()) { + sqlRefreshMode = + sqlCreateMaterializedTable + .getRefreshMode() + .get() + .getValueAs(SqlRefreshMode.class); + } + CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode = + MaterializedTableUtils.deriveLogicalRefreshMode(sqlRefreshMode); + // only MATERIALIZED_TABLE_FRESHNESS_THRESHOLD configured in flink conf yaml work, so we get + // it from rootConfiguration instead of table config + CatalogMaterializedTable.RefreshMode refreshMode = + MaterializedTableUtils.deriveRefreshMode( + context.getTableConfig() + .getRootConfiguration() + .get(MATERIALIZED_TABLE_FRESHNESS_THRESHOLD), + freshness, + logicalRefreshMode); + + // get query schema and definition query + SqlNode validateQuery = + context.getSqlValidator().validate(sqlCreateMaterializedTable.getAsQuery()); + PlannerQueryOperation queryOperation = + new PlannerQueryOperation( + context.toRelRoot(validateQuery).project(), + () -> context.toQuotedSqlString(validateQuery)); + String definitionQuery = + context.expandSqlIdentifiers(queryOperation.asSerializableString()); + + // get schema + ResolvedSchema resolvedSchema = queryOperation.getResolvedSchema(); + Schema.Builder builder = Schema.newBuilder().fromResolvedSchema(resolvedSchema); + + // get and verify partition key + List<String> partitionKeys = + sqlCreateMaterializedTable.getPartitionKeyList().getList().stream() + .map(p -> ((SqlIdentifier) p).getSimple()) + .collect(Collectors.toList()); + verifyPartitioningColumnsExist(resolvedSchema, partitionKeys); + + // verify and build primary key + sqlCreateMaterializedTable + .getTableConstraint() + .ifPresent( + sqlTableConstraint -> + verifyAndBuildPrimaryKey( + builder, resolvedSchema, sqlTableConstraint)); + + CatalogMaterializedTable materializedTable = + CatalogMaterializedTable.newBuilder() + .schema(builder.build()) + .comment(tableComment) + .partitionKeys(partitionKeys) + .options(options) + .definitionQuery(definitionQuery) + .freshness(freshness) + .logicalRefreshMode(logicalRefreshMode) + .refreshMode(refreshMode) + .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING) + .build(); + + return new CreateMaterializedTableOperation( + identifier, + context.getCatalogManager().resolveCatalogMaterializedTable(materializedTable)); + } + + private static void verifyPartitioningColumnsExist( + ResolvedSchema resolvedSchema, List<String> partitionKeys) { + for (String partitionKey : partitionKeys) { + if (!resolvedSchema.getColumn(partitionKey).isPresent()) { + throw new ValidationException( + String.format( + "Partition column '%s' not defined in the query schema. Available columns: [%s].", + partitionKey, + resolvedSchema.getColumnNames().stream() + .collect(Collectors.joining("', '", "'", "'")))); + } + } + } + + private static void verifyAndBuildPrimaryKey( + Schema.Builder schemaBuilder, + ResolvedSchema resolvedSchema, + SqlTableConstraint sqlTableConstraint) { + // check constraint type + try { + SqlConstraintValidator.validate(sqlTableConstraint); + } catch (SqlValidateException e) { + throw new ValidationException( + String.format("Primary key validation failed: %s.", e.getMessage()), e); + } + + List<String> primaryKeyColumns = Arrays.asList(sqlTableConstraint.getColumnNames()); + for (String columnName : primaryKeyColumns) { + Optional<Column> columnOptional = resolvedSchema.getColumn(columnName); + if (!columnOptional.isPresent()) { + throw new ValidationException( + String.format( + "Primary key column '%s' not defined in the query schema. Available columns: [%s].", + columnName, + resolvedSchema.getColumnNames().stream() + .collect(Collectors.joining("', '", "'", "'")))); + } + + if (columnOptional.get().getDataType().getLogicalType().isNullable()) { + throw new ValidationException( + String.format( + "Could not create a PRIMARY KEY with nullable column '%s'.\n" + + "A PRIMARY KEY column must be declared on non-nullable physical columns.", + columnName)); + } + } + + // build primary key + String constraintName = + sqlTableConstraint + .getConstraintName() + .orElseGet( + () -> + primaryKeyColumns.stream() + .collect(Collectors.joining("_", "PK_", ""))); + schemaBuilder.primaryKeyNamed(constraintName, primaryKeyColumns); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java index a2b48359810..cdd9d860097 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java @@ -18,6 +18,8 @@ package org.apache.flink.table.planner.operations.converters; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.planner.utils.Expander; @@ -72,6 +74,9 @@ public interface SqlNodeConverter<S extends SqlNode> { /** Context of {@link SqlNodeConverter}. */ interface ConvertContext { + /** Returns the {@link TableConfig} defined in {@link TableEnvironment}. */ + TableConfig getTableConfig(); + /** Returns the {@link SqlValidator} in the convert context. */ SqlValidator getSqlValidator(); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java index caaafc9a331..fc5e3bd498c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java @@ -56,6 +56,7 @@ public class SqlNodeConverters { register(new SqlShowCreateCatalogConverter()); register(new SqlDescribeCatalogConverter()); register(new SqlDescribeJobConverter()); + register(new SqlCreateMaterializedTableConverter()); } /** diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java new file mode 100644 index 00000000000..33794eac26f --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.sql.parser.ddl.SqlRefreshMode; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; + +import org.apache.calcite.sql.SqlIntervalLiteral; +import org.apache.calcite.sql.type.SqlTypeFamily; + +import java.time.Duration; + +/** The utils for materialized table. */ +@Internal +public class MaterializedTableUtils { + + public static Duration getMaterializedTableFreshness(SqlIntervalLiteral sqlIntervalLiteral) { + if (sqlIntervalLiteral.signum() < 0) { + throw new ValidationException( + "Materialized table freshness doesn't support negative value."); + } + if (sqlIntervalLiteral.getTypeName().getFamily() != SqlTypeFamily.INTERVAL_DAY_TIME) { + throw new ValidationException( + "Materialized table freshness only support SECOND, MINUTE, HOUR, DAY as the time unit."); + } + + SqlIntervalLiteral.IntervalValue intervalValue = + sqlIntervalLiteral.getValueAs(SqlIntervalLiteral.IntervalValue.class); + long interval = Long.parseLong(intervalValue.getIntervalLiteral()); + switch (intervalValue.getIntervalQualifier().typeName()) { + case INTERVAL_DAY: + return Duration.ofDays(interval); + case INTERVAL_HOUR: + return Duration.ofHours(interval); + case INTERVAL_MINUTE: + return Duration.ofMinutes(interval); + case INTERVAL_SECOND: + return Duration.ofSeconds(interval); + default: + throw new ValidationException( + "Materialized table freshness only support SECOND, MINUTE, HOUR, DAY as the time unit."); + } + } + + public static CatalogMaterializedTable.LogicalRefreshMode deriveLogicalRefreshMode( + SqlRefreshMode sqlRefreshMode) { + if (sqlRefreshMode == null) { + return CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC; + } + + switch (sqlRefreshMode) { + case FULL: + return CatalogMaterializedTable.LogicalRefreshMode.FULL; + case CONTINUOUS: + return CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS; + default: + throw new ValidationException( + String.format("Unsupported logical refresh mode: %s.", sqlRefreshMode)); + } + } + + public static CatalogMaterializedTable.RefreshMode deriveRefreshMode( + Duration threshold, + Duration definedFreshness, + CatalogMaterializedTable.LogicalRefreshMode definedRefreshMode) { + // If the refresh mode is specified manually, use it directly. + if (definedRefreshMode == CatalogMaterializedTable.LogicalRefreshMode.FULL) { + return CatalogMaterializedTable.RefreshMode.FULL; + } else if (definedRefreshMode == CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS) { + return CatalogMaterializedTable.RefreshMode.CONTINUOUS; + } + + // derive the actual refresh mode via defined freshness + if (definedFreshness.compareTo(threshold) <= 0) { + return CatalogMaterializedTable.RefreshMode.CONTINUOUS; + } else { + return CatalogMaterializedTable.RefreshMode.FULL; + } + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java new file mode 100644 index 00000000000..9514e12367e --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for the materialized table statements for {@link SqlNodeToOperationConversion}. */ +public class SqlMaterializedTableNodeToOperationConverterTest + extends SqlNodeToOperationConversionTestBase { + + @Test + public void testCreateMaterializedTable() { + final String sql = + "CREATE MATERIALIZED TABLE mtbl1 (\n" + + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "COMMENT 'materialized table comment'\n" + + "PARTITIONED BY (a, d)\n" + + "WITH (\n" + + " 'connector' = 'filesystem', \n" + + " 'format' = 'json'\n" + + ")\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t1"; + Operation operation = parse(sql); + assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class); + + CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation; + CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable(); + assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class); + + Map<String, String> options = new HashMap<>(); + options.put("connector", "filesystem"); + options.put("format", "json"); + CatalogMaterializedTable expected = + CatalogMaterializedTable.newBuilder() + .schema( + Schema.newBuilder() + .column("a", DataTypes.BIGINT().notNull()) + .column("b", DataTypes.VARCHAR(Integer.MAX_VALUE)) + .column("c", DataTypes.INT()) + .column("d", DataTypes.VARCHAR(Integer.MAX_VALUE)) + .primaryKeyNamed("ct1", Collections.singletonList("a")) + .build()) + .comment("materialized table comment") + .options(options) + .partitionKeys(Arrays.asList("a", "d")) + .freshness(Duration.ofSeconds(30)) + .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.FULL) + .refreshMode(CatalogMaterializedTable.RefreshMode.FULL) + .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING) + .definitionQuery( + "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n" + + "FROM `builtin`.`default`.`t1` AS `t1`") + .build(); + + assertThat(((ResolvedCatalogMaterializedTable) materializedTable).getOrigin()) + .isEqualTo(expected); + } + + @Test + public void testContinuousRefreshMode() { + // test continuous mode derived by specify freshness automatically + final String sql = + "CREATE MATERIALIZED TABLE mtbl1\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "AS SELECT * FROM t1"; + Operation operation = parse(sql); + assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class); + + CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation; + CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable(); + assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class); + + assertThat(materializedTable.getLogicalRefreshMode()) + .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC); + assertThat(materializedTable.getRefreshMode()) + .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS); + + // test continuous mode by manual specify + final String sql2 = + "CREATE MATERIALIZED TABLE mtbl1\n" + + "FRESHNESS = INTERVAL '30' DAY\n" + + "REFRESH_MODE = CONTINUOUS\n" + + "AS SELECT * FROM t1"; + Operation operation2 = parse(sql2); + assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class); + + CreateMaterializedTableOperation op2 = (CreateMaterializedTableOperation) operation2; + CatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable(); + assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class); + + assertThat(materializedTable2.getLogicalRefreshMode()) + .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS); + assertThat(materializedTable2.getRefreshMode()) + .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS); + } + + @Test + public void testFullRefreshMode() { + // test full mode derived by specify freshness automatically + final String sql = + "CREATE MATERIALIZED TABLE mtbl1\n" + + "FRESHNESS = INTERVAL '1' DAY\n" + + "AS SELECT * FROM t1"; + Operation operation = parse(sql); + assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class); + + CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation; + CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable(); + assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class); + + assertThat(materializedTable.getLogicalRefreshMode()) + .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC); + assertThat(materializedTable.getRefreshMode()) + .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL); + + // test full mode by manual specify + final String sql2 = + "CREATE MATERIALIZED TABLE mtbl1\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t1"; + Operation operation2 = parse(sql2); + assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class); + + CreateMaterializedTableOperation op2 = (CreateMaterializedTableOperation) operation2; + CatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable(); + assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class); + + assertThat(materializedTable2.getLogicalRefreshMode()) + .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.FULL); + assertThat(materializedTable2.getRefreshMode()) + .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL); + } + + @Test + public void testCreateMaterializedTableWithInvalidPrimaryKey() { + // test unsupported constraint + final String sql = + "CREATE MATERIALIZED TABLE mtbl1 (\n" + + " CONSTRAINT ct1 UNIQUE(a) NOT ENFORCED" + + ")\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "AS SELECT * FROM t1"; + + assertThatThrownBy(() -> parse(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Primary key validation failed: UNIQUE constraint is not supported yet."); + + // test primary key not defined in source table + final String sql2 = + "CREATE MATERIALIZED TABLE mtbl1 (\n" + + " CONSTRAINT ct1 PRIMARY KEY(e) NOT ENFORCED" + + ")\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "AS SELECT * FROM t1"; + + assertThatThrownBy(() -> parse(sql2)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Primary key column 'e' not defined in the query schema. Available columns: ['a', 'b', 'c', 'd']."); + + // test primary key with nullable source column + final String sql3 = + "CREATE MATERIALIZED TABLE mtbl1 (\n" + + " CONSTRAINT ct1 PRIMARY KEY(d) NOT ENFORCED" + + ")\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "AS SELECT * FROM t1"; + + assertThatThrownBy(() -> parse(sql3)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Could not create a PRIMARY KEY with nullable column 'd'."); + } + + @Test + public void testCreateMaterializedTableWithInvalidPartitionKey() { + final String sql = + "CREATE MATERIALIZED TABLE mtbl1\n" + + "PARTITIONED BY (a, e)\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t1"; + assertThatThrownBy(() -> parse(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Partition column 'e' not defined in the query schema. Available columns: ['a', 'b', 'c', 'd']."); + } + + @Test + public void testCreateMaterializedTableWithInvalidFreshnessType() { + // test negative freshness value + final String sql = + "CREATE MATERIALIZED TABLE mtbl1\n" + + "FRESHNESS = INTERVAL -'30' SECOND\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t1"; + assertThatThrownBy(() -> parse(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Materialized table freshness doesn't support negative value."); + + // test unsupported freshness type + final String sql2 = + "CREATE MATERIALIZED TABLE mtbl1\n" + + "FRESHNESS = INTERVAL '30' YEAR\n" + + "AS SELECT * FROM t1"; + assertThatThrownBy(() -> parse(sql2)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Materialized table freshness only support SECOND, MINUTE, HOUR, DAY as the time unit."); + + // test unsupported freshness type + final String sql3 = + "CREATE MATERIALIZED TABLE mtbl1\n" + + "FRESHNESS = INTERVAL '30' DAY TO HOUR\n" + + "AS SELECT * FROM t1"; + assertThatThrownBy(() -> parse(sql3)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Materialized table freshness only support SECOND, MINUTE, HOUR, DAY as the time unit."); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java index 68efa48899e..2b87c89ebf8 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java @@ -107,7 +107,7 @@ public class SqlNodeToOperationConversionTestBase { Schema.newBuilder() .fromResolvedSchema( ResolvedSchema.of( - Column.physical("a", DataTypes.BIGINT()), + Column.physical("a", DataTypes.BIGINT().notNull()), Column.physical("b", DataTypes.VARCHAR(Integer.MAX_VALUE)), Column.physical("c", DataTypes.INT()), Column.physical("d", DataTypes.VARCHAR(Integer.MAX_VALUE)))) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java index 5c31f230cd4..8a6fc806cab 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java @@ -126,7 +126,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve .fromFields( new String[] {"a", "b", "c", "d"}, new AbstractDataType[] { - DataTypes.BIGINT(), + DataTypes.BIGINT().notNull(), DataTypes.STRING(), DataTypes.INT(), DataTypes.STRING()