This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5617d62c98f160af1a4a879cc2d40ef6209b2a32 Author: fengli <ldliu...@163.com> AuthorDate: Tue Jun 25 19:24:51 2024 +0800 [FLINK-35691][table] Fix partition.fields.#.date-formatter option verify logic --- .../MaterializedTableManager.java | 8 +-- .../service/MaterializedTableStatementITCase.java | 2 +- .../SqlCreateMaterializedTableConverter.java | 49 +++++++++++++++- ...erializedTableNodeToOperationConverterTest.java | 68 ++++++++++++++++++++++ 4 files changed, 120 insertions(+), 7 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java index 036d610af81..a4cd306db60 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java @@ -581,7 +581,7 @@ public class MaterializedTableManager { try { LOG.info( - "Begin to manually refreshing the materialized table {}, statement: {}", + "Begin to refreshing the materialized table {}, statement: {}", materializedTableIdentifier, insertStatement); ResultFetcher resultFetcher = @@ -610,7 +610,7 @@ public class MaterializedTableManager { } catch (Exception e) { throw new SqlExecutionException( String.format( - "Manually refreshing the materialized table %s occur exception.", + "Refreshing the materialized table %s occur exception.", materializedTableIdentifier), e); } @@ -697,8 +697,8 @@ public class MaterializedTableManager { if (!nonStringPartitionKeys.isEmpty()) { throw new ValidationException( String.format( - "Currently, manually refreshing materialized table only supports specifying char and string type" - + " partition keys. All specific partition keys with unsupported types are:\n\n%s", + "Currently, refreshing materialized table only supports referring to char, varchar and string type" + + " partition keys. All specified partition keys in partition specs with unsupported types are:\n\n%s", String.join("\n", nonStringPartitionKeys))); } } diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index 961a949ec83..f090a6cada7 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -444,7 +444,7 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS .rootCause() .isInstanceOf(ValidationException.class) .hasMessage( - "Currently, manually refreshing materialized table only supports specifying char and string type partition keys. All specific partition keys with unsupported types are:\n" + "Currently, refreshing materialized table only supports referring to char, varchar and string type partition keys. All specified partition keys in partition specs with unsupported types are:\n" + "\n" + "ds2"); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java index b48070d50e7..de14add9692 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java @@ -37,6 +37,8 @@ import org.apache.flink.table.operations.materializedtable.CreateMaterializedTab import org.apache.flink.table.planner.operations.PlannerQueryOperation; import org.apache.flink.table.planner.utils.MaterializedTableUtils; import org.apache.flink.table.planner.utils.OperationConverterUtils; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNode; @@ -46,9 +48,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; +import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.DATE_FORMATTER; import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD; +import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.PARTITION_FIELDS; import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron; import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration; @@ -129,7 +134,12 @@ public class SqlCreateMaterializedTableConverter sqlCreateMaterializedTable.getPartitionKeyList().getList().stream() .map(p -> ((SqlIdentifier) p).getSimple()) .collect(Collectors.toList()); - verifyPartitioningColumnsExist(resolvedSchema, partitionKeys); + verifyPartitioningColumnsExist( + resolvedSchema, + partitionKeys, + options.keySet().stream() + .filter(k -> k.startsWith(PARTITION_FIELDS)) + .collect(Collectors.toSet())); // verify and build primary key sqlCreateMaterializedTable @@ -158,7 +168,10 @@ public class SqlCreateMaterializedTableConverter } private static void verifyPartitioningColumnsExist( - ResolvedSchema resolvedSchema, List<String> partitionKeys) { + ResolvedSchema resolvedSchema, + List<String> partitionKeys, + Set<String> partitionFieldOptions) { + // verify partition key whether exists for (String partitionKey : partitionKeys) { if (!resolvedSchema.getColumn(partitionKey).isPresent()) { throw new ValidationException( @@ -169,6 +182,38 @@ public class SqlCreateMaterializedTableConverter .collect(Collectors.joining("', '", "'", "'")))); } } + + // verify partition key used by materialized table partition option + // partition.fields.#.date-formatter whether exist + for (String partitionOption : partitionFieldOptions) { + String partitionKey = + partitionOption.substring( + PARTITION_FIELDS.length() + 1, + partitionOption.length() - (DATE_FORMATTER.length() + 1)); + // partition key used in option partition.fields.#.date-formatter must be existed + if (!partitionKeys.contains(partitionKey)) { + throw new ValidationException( + String.format( + "Column '%s' referenced by materialized table option '%s' isn't a partition column. Available partition columns: [%s].", + partitionKey, + partitionOption, + partitionKeys.stream() + .collect(Collectors.joining("', '", "'", "'")))); + } + + // partition key used in option partition.fields.#.date-formatter must be string type + LogicalType partitionKeyType = + resolvedSchema.getColumn(partitionKey).get().getDataType().getLogicalType(); + if (!partitionKeyType + .getTypeRoot() + .getFamilies() + .contains(LogicalTypeFamily.CHARACTER_STRING)) { + throw new ValidationException( + String.format( + "Materialized table option '%s' only supports referring to char, varchar and string type partition column. Column %s type is %s.", + partitionOption, partitionKey, partitionKeyType.asSummaryString())); + } + } } private static void verifyAndBuildPrimaryKey( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index 77eb2af1e9d..877276730e5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -22,8 +22,14 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.IntervalFreshness; +import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation; @@ -33,6 +39,7 @@ import org.apache.flink.table.operations.materializedtable.DropMaterializedTable import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -47,6 +54,26 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; public class SqlMaterializedTableNodeToOperationConverterTest extends SqlNodeToOperationConversionTestBase { + @BeforeEach + public void before() throws TableAlreadyExistException, DatabaseNotExistException { + super.before(); + final ObjectPath path3 = new ObjectPath(catalogManager.getCurrentDatabase(), "t3"); + final Schema tableSchema = + Schema.newBuilder() + .fromResolvedSchema( + ResolvedSchema.of( + Column.physical("a", DataTypes.BIGINT().notNull()), + Column.physical("b", DataTypes.VARCHAR(Integer.MAX_VALUE)), + Column.physical("c", DataTypes.INT()), + Column.physical("d", DataTypes.VARCHAR(Integer.MAX_VALUE)))) + .build(); + Map<String, String> options = new HashMap<>(); + options.put("connector", "COLLECTION"); + final CatalogTable catalogTable = + CatalogTable.of(tableSchema, "", Arrays.asList("b", "c"), options); + catalog.createTable(path3, catalogTable, true); + } + @Test void testCreateMaterializedTable() { final String sql = @@ -236,6 +263,47 @@ public class SqlMaterializedTableNodeToOperationConverterTest .isInstanceOf(ValidationException.class) .hasMessageContaining( "Partition column 'e' not defined in the query schema. Available columns: ['a', 'b', 'c', 'd']."); + + final String sql2 = + "CREATE MATERIALIZED TABLE mtbl1\n" + + "PARTITIONED BY (b, c)\n" + + "WITH (\n" + + " 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'\n" + + ")\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t3"; + assertThatThrownBy(() -> parse(sql2)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Column 'ds' referenced by materialized table option 'partition.fields.ds.date-formatter' isn't a partition column. Available partition columns: ['b', 'c']."); + + final String sql3 = + "CREATE MATERIALIZED TABLE mtbl1\n" + + "WITH (\n" + + " 'partition.fields.c.date-formatter' = 'yyyy-MM-dd'\n" + + ")\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t3"; + assertThatThrownBy(() -> parse(sql3)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Column 'c' referenced by materialized table option 'partition.fields.c.date-formatter' isn't a partition column. Available partition columns: ['']."); + + final String sql4 = + "CREATE MATERIALIZED TABLE mtbl1\n" + + "PARTITIONED BY (b, c)\n" + + "WITH (\n" + + " 'partition.fields.c.date-formatter' = 'yyyy-MM-dd'\n" + + ")\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t3"; + assertThatThrownBy(() -> parse(sql4)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Materialized table option 'partition.fields.c.date-formatter' only supports referring to char, varchar and string type partition column. Column c type is INT."); } @Test