This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5617d62c98f160af1a4a879cc2d40ef6209b2a32
Author: fengli <ldliu...@163.com>
AuthorDate: Tue Jun 25 19:24:51 2024 +0800

    [FLINK-35691][table] Fix partition.fields.#.date-formatter option verify logic
---
 .../MaterializedTableManager.java                  |  8 +--
 .../service/MaterializedTableStatementITCase.java  |  2 +-
 .../SqlCreateMaterializedTableConverter.java       | 49 +++++++++++++++-
 ...erializedTableNodeToOperationConverterTest.java | 68 ++++++++++++++++++++++
 4 files changed, 120 insertions(+), 7 deletions(-)
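
For context, a minimal sketch of a statement that passes the new check, based on the t3 fixture and the SQL used in the converter test below (partition key b is VARCHAR, c is INT); the partition.fields.#.date-formatter option may only reference a char/varchar/string partition column:

    CREATE MATERIALIZED TABLE mtbl1
    PARTITIONED BY (b, c)
    WITH (
      -- references partition column b, a character string type, so validation passes
      'partition.fields.b.date-formatter' = 'yyyy-MM-dd'
    )
    FRESHNESS = INTERVAL '30' SECOND
    REFRESH_MODE = FULL
    AS SELECT * FROM t3;

Referencing a non-partition column (ds) or a non-string partition column (c) instead is rejected, as exercised by sql2-sql4 in the converter test.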

diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
index 036d610af81..a4cd306db60 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -581,7 +581,7 @@ public class MaterializedTableManager {
 
         try {
             LOG.info(
-                    "Begin to manually refreshing the materialized table {}, statement: {}",
+                    "Begin to refreshing the materialized table {}, statement: {}",
                     materializedTableIdentifier,
                     insertStatement);
             ResultFetcher resultFetcher =
@@ -610,7 +610,7 @@ public class MaterializedTableManager {
         } catch (Exception e) {
             throw new SqlExecutionException(
                     String.format(
-                            "Manually refreshing the materialized table %s occur exception.",
+                            "Refreshing the materialized table %s occur exception.",
                             materializedTableIdentifier),
                     e);
         }
@@ -697,8 +697,8 @@ public class MaterializedTableManager {
         if (!nonStringPartitionKeys.isEmpty()) {
             throw new ValidationException(
                     String.format(
-                            "Currently, manually refreshing materialized table only supports specifying char and string type"
-                                    + " partition keys. All specific partition keys with unsupported types are:\n\n%s",
+                            "Currently, refreshing materialized table only supports referring to char, varchar and string type"
+                                    + " partition keys. All specified partition keys in partition specs with unsupported types are:\n\n%s",
                             String.join("\n", nonStringPartitionKeys)));
         }
     }
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index 961a949ec83..f090a6cada7 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -444,7 +444,7 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS
                 .rootCause()
                 .isInstanceOf(ValidationException.class)
                 .hasMessage(
-                        "Currently, manually refreshing materialized table only supports specifying char and string type partition keys. All specific partition keys with unsupported types are:\n"
+                        "Currently, refreshing materialized table only supports referring to char, varchar and string type partition keys. All specified partition keys in partition specs with unsupported types are:\n"
                                 + "\n"
                                 + "ds2");
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
index b48070d50e7..de14add9692 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
@@ -37,6 +37,8 @@ import org.apache.flink.table.operations.materializedtable.CreateMaterializedTab
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
 import org.apache.flink.table.planner.utils.MaterializedTableUtils;
 import org.apache.flink.table.planner.utils.OperationConverterUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
 
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
@@ -46,9 +48,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.DATE_FORMATTER;
 import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD;
+import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.PARTITION_FIELDS;
 import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron;
 import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration;
 
@@ -129,7 +134,12 @@ public class SqlCreateMaterializedTableConverter
                 sqlCreateMaterializedTable.getPartitionKeyList().getList().stream()
                         .map(p -> ((SqlIdentifier) p).getSimple())
                         .collect(Collectors.toList());
-        verifyPartitioningColumnsExist(resolvedSchema, partitionKeys);
+        verifyPartitioningColumnsExist(
+                resolvedSchema,
+                partitionKeys,
+                options.keySet().stream()
+                        .filter(k -> k.startsWith(PARTITION_FIELDS))
+                        .collect(Collectors.toSet()));
 
         // verify and build primary key
         sqlCreateMaterializedTable
@@ -158,7 +168,10 @@ public class SqlCreateMaterializedTableConverter
     }
 
     private static void verifyPartitioningColumnsExist(
-            ResolvedSchema resolvedSchema, List<String> partitionKeys) {
+            ResolvedSchema resolvedSchema,
+            List<String> partitionKeys,
+            Set<String> partitionFieldOptions) {
+        // verify that each partition key exists in the schema
         for (String partitionKey : partitionKeys) {
             if (!resolvedSchema.getColumn(partitionKey).isPresent()) {
                 throw new ValidationException(
@@ -169,6 +182,38 @@ public class SqlCreateMaterializedTableConverter
                                         .collect(Collectors.joining("', '", "'", "'"))));
             }
         }
+
+        // verify that the partition key referenced by the materialized table option
+        // partition.fields.#.date-formatter exists
+        for (String partitionOption : partitionFieldOptions) {
+            String partitionKey =
+                    partitionOption.substring(
+                            PARTITION_FIELDS.length() + 1,
+                            partitionOption.length() - (DATE_FORMATTER.length() + 1));
+            // the partition key referenced by option partition.fields.#.date-formatter must exist
+            if (!partitionKeys.contains(partitionKey)) {
+                throw new ValidationException(
+                        String.format(
+                                "Column '%s' referenced by materialized table option '%s' isn't a partition column. Available partition columns: [%s].",
+                                partitionKey,
+                                partitionOption,
+                                partitionKeys.stream()
+                                        .collect(Collectors.joining("', '", "'", "'"))));
+            }
+
+            // the partition key referenced by option partition.fields.#.date-formatter must be of a character string type (char/varchar/string)
+            LogicalType partitionKeyType =
+                    resolvedSchema.getColumn(partitionKey).get().getDataType().getLogicalType();
+            if (!partitionKeyType
+                    .getTypeRoot()
+                    .getFamilies()
+                    .contains(LogicalTypeFamily.CHARACTER_STRING)) {
+                throw new ValidationException(
+                        String.format(
+                                "Materialized table option '%s' only supports referring to char, varchar and string type partition column. Column %s type is %s.",
+                                partitionOption, partitionKey, partitionKeyType.asSummaryString()));
+            }
+        }
     }
 
     private static void verifyAndBuildPrimaryKey(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 77eb2af1e9d..877276730e5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -22,8 +22,14 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
 import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation;
@@ -33,6 +39,7 @@ import org.apache.flink.table.operations.materializedtable.DropMaterializedTable
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -47,6 +54,26 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 public class SqlMaterializedTableNodeToOperationConverterTest
         extends SqlNodeToOperationConversionTestBase {
 
+    @BeforeEach
+    public void before() throws TableAlreadyExistException, DatabaseNotExistException {
+        super.before();
+        final ObjectPath path3 = new ObjectPath(catalogManager.getCurrentDatabase(), "t3");
+        final Schema tableSchema =
+                Schema.newBuilder()
+                        .fromResolvedSchema(
+                                ResolvedSchema.of(
+                                        Column.physical("a", DataTypes.BIGINT().notNull()),
+                                        Column.physical("b", DataTypes.VARCHAR(Integer.MAX_VALUE)),
+                                        Column.physical("c", DataTypes.INT()),
+                                        Column.physical("d", DataTypes.VARCHAR(Integer.MAX_VALUE))))
+                        .build();
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", "COLLECTION");
+        final CatalogTable catalogTable =
+                CatalogTable.of(tableSchema, "", Arrays.asList("b", "c"), options);
+        catalog.createTable(path3, catalogTable, true);
+    }
+
     @Test
     void testCreateMaterializedTable() {
         final String sql =
@@ -236,6 +263,47 @@ public class SqlMaterializedTableNodeToOperationConverterTest
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
                         "Partition column 'e' not defined in the query schema. 
Available columns: ['a', 'b', 'c', 'd'].");
+
+        final String sql2 =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "PARTITIONED BY (b, c)\n"
+                        + "WITH (\n"
+                        + " 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t3";
+        assertThatThrownBy(() -> parse(sql2))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Column 'ds' referenced by materialized table option 'partition.fields.ds.date-formatter' isn't a partition column. Available partition columns: ['b', 'c'].");
+
+        final String sql3 =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "WITH (\n"
+                        + " 'partition.fields.c.date-formatter' = 'yyyy-MM-dd'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t3";
+        assertThatThrownBy(() -> parse(sql3))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Column 'c' referenced by materialized table option 'partition.fields.c.date-formatter' isn't a partition column. Available partition columns: [''].");
+
+        final String sql4 =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "PARTITIONED BY (b, c)\n"
+                        + "WITH (\n"
+                        + " 'partition.fields.c.date-formatter' = 'yyyy-MM-dd'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t3";
+        assertThatThrownBy(() -> parse(sql4))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Materialized table option 'partition.fields.c.date-formatter' only supports referring to char, varchar and string type partition column. Column c type is INT.");
     }
 
     @Test
