This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 753c1af691c [FLINK-38532][table] Make `FRESHNESS` Optional for
Materialized Tables
753c1af691c is described below
commit 753c1af691c25e375cf80dc277cf026758cfed8d
Author: Ramin Gharib <[email protected]>
AuthorDate: Thu Nov 6 09:17:01 2025 +0100
[FLINK-38532][table] Make `FRESHNESS` Optional for Materialized Tables
---
.../docs/dev/table/materialized-table/overview.md | 5 +-
.../dev/table/materialized-table/statements.md | 91 +++++-----
.../docs/dev/table/materialized-table/overview.md | 4 +-
.../dev/table/materialized-table/statements.md | 55 +++---
.../materialized_table_config_configuration.html | 12 ++
.../MaterializedTableManager.java | 42 ++---
.../service/operation/OperationExecutor.java | 3 +-
.../service/MaterializedTableStatementITCase.java | 189 ++++++++++++++++++++-
.../src/main/codegen/includes/parserImpls.ftl | 18 +-
.../sql/parser/ddl/SqlCreateMaterializedTable.java | 22 ++-
.../MaterializedTableStatementParserTest.java | 51 +++---
.../api/config/MaterializedTableConfigOptions.java | 16 ++
.../apache/flink/table/catalog/CatalogManager.java | 51 +++++-
.../table/api/internal/ShowCreateUtilTest.java | 4 +-
.../catalog/CatalogBaseTableResolutionTest.java | 4 +-
.../table/catalog/CatalogMaterializedTable.java | 26 +--
.../catalog/DefaultCatalogMaterializedTable.java | 8 +-
.../catalog/DefaultMaterializedTableEnricher.java | 115 +++++++++++++
.../flink/table/catalog/IntervalFreshness.java | 183 +++++++++++++++++++-
.../table/catalog/MaterializedTableEnricher.java | 47 +++++
.../catalog/MaterializedTableEnrichmentResult.java | 50 ++++++
.../catalog/ResolvedCatalogMaterializedTable.java | 39 +++--
.../flink/table/utils/IntervalFreshnessUtils.java | 146 ----------------
.../table/catalog/CatalogPropertiesUtilTest.java | 5 +-
.../IntervalFreshnessTest.java} | 54 +++---
.../SqlCreateMaterializedTableConverter.java | 50 +++---
.../planner/utils/MaterializedTableUtils.java | 27 +--
.../operations/SqlDdlToOperationConverterTest.java | 127 +++++++++-----
...erializedTableNodeToOperationConverterTest.java | 136 +++++++++++++--
.../catalog/TestFileSystemCatalogTest.java | 4 +-
30 files changed, 1154 insertions(+), 430 deletions(-)
diff --git a/docs/content.zh/docs/dev/table/materialized-table/overview.md
b/docs/content.zh/docs/dev/table/materialized-table/overview.md
index 9ce5e9ba34f..d0499bcf1c9 100644
--- a/docs/content.zh/docs/dev/table/materialized-table/overview.md
+++ b/docs/content.zh/docs/dev/table/materialized-table/overview.md
@@ -34,7 +34,9 @@ under the License.
## 数据新鲜度
-数据新鲜度定义了物化表数据相对于基础表更新的最大滞后时间。它并非绝对保证,而是 Flink 尝试达到的目标。框架会尽力确保物化表中的数据在指定的新鲜度内刷新。
+数据新鲜度定义了物化表数据相对于基础表更新的最大滞后时间。它并非绝对保证,而是 Flink
尝试达到的目标。框架会尽力确保物化表中的数据在指定的新鲜度目标内刷新。
+
+Data freshness is optional when creating a materialized table. If not
specified, the system uses the default freshness based on the refresh mode:
[materialized-table.default-freshness.continuous]({{< ref
"docs/dev/table/config" >}}#materialized-table-default-freshness-continuous)
(default: 3 minutes) for CONTINUOUS mode, or
[materialized-table.default-freshness.full]({{< ref "docs/dev/table/config"
>}}#materialized-table-default-freshness-full) (default: 1 hour) for FULL mode.
数据新鲜度是物化表的一个关键属性,具有两个主要作用:
- **确定刷新模式**:目前有持续模式和全量模式。关于如何确定刷新模式的详细信息,请参阅
[materialized-table.refresh-mode.freshness-threshold]({{< ref
"docs/dev/table/config"
>}}#materialized-table-refresh-mode-freshness-threshold) 配置项。
@@ -59,4 +61,3 @@ under the License.
## Schema
物化表的 `Schema` 定义与普通表相同,可以声明主键和分区字段。其列名和类型会从相应的查询中推导,用户无法手动指定。
-
diff --git a/docs/content.zh/docs/dev/table/materialized-table/statements.md
b/docs/content.zh/docs/dev/table/materialized-table/statements.md
index 6e0e641c968..a27bce3093f 100644
--- a/docs/content.zh/docs/dev/table/materialized-table/statements.md
+++ b/docs/content.zh/docs/dev/table/materialized-table/statements.md
@@ -35,21 +35,21 @@ Flink SQL 目前支持以下物化表操作:
```
CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
-
+
[ ([ <table_constraint> ]) ]
-
+
[COMMENT table_comment]
-
+
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
-
+
[WITH (key1=val1, key2=val2, ...)]
-
+
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
-
+
[REFRESH_MODE = { CONTINUOUS | FULL }]
-
+
AS <select_statement>
-
+
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
```
@@ -69,7 +69,7 @@ AS <select_statement>
CREATE MATERIALIZED TABLE my_materialized_table
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '1' HOUR
- AS SELECT
+ AS SELECT
ds
FROM
...
@@ -103,9 +103,11 @@ CREATE MATERIALIZED TABLE my_materialized_table
`FRESHNESS` 用于指定物化表的数据新鲜度。
+`FRESHNESS` is optional. When omitted, the system uses the default freshness
based on the refresh mode: `materialized-table.default-freshness.continuous`
(default: 3 minutes) for CONTINUOUS mode, or
`materialized-table.default-freshness.full` (default: 1 hour) for FULL mode.
+
**数据新鲜度与刷新模式关系**
-数据新鲜度定义了物化表内容滞后于基础表更新的最长时间。它有两个作用,首先通过[配置]({{< ref "docs/dev/table/config"
>}}#materialized-table-refresh-mode-freshness-threshold)确定物化表的[刷新模式]({{< ref
"docs/dev/table/materialized-table/overview" >}}#刷新模式),然后确定数据刷新频率以满足实际数据新鲜度要求。
+数据新鲜度定义了物化表内容滞后于基础表更新的最长时间。When not specified, it uses the default value from
configuration based on the refresh mode. 它有两个作用,首先通过[配置]({{< ref
"docs/dev/table/config"
>}}#materialized-table-refresh-mode-freshness-threshold)确定物化表的[刷新模式]({{< ref
"docs/dev/table/materialized-table/overview" >}}#刷新模式),然后确定数据刷新频率以满足实际数据新鲜度要求。
**FRESHNESS 参数详解**
@@ -128,6 +130,22 @@ FRESHNESS = INTERVAL '1' HOUR
FRESHNESS = INTERVAL '1' DAY
```
+**Default FRESHNESS Example:**
+(Assuming `materialized-table.default-freshness.continuous` is 3 minutes,
`materialized-table.default-freshness.full` is 1 hour, and
`materialized-table.refresh-mode.freshness-threshold` is 30 minutes)
+
+```sql
+-- FRESHNESS is omitted, uses the configured default of 3 minutes for
CONTINUOUS mode
+-- The corresponding refresh pipeline is a streaming job with a checkpoint
interval of 3 minutes
+CREATE MATERIALIZED TABLE my_materialized_table
+ AS SELECT * FROM source_table;
+
+-- FRESHNESS is omitted and FULL mode is explicitly specified, uses the
configured default of 1 hour
+-- The corresponding refresh pipeline is a scheduled workflow with a schedule
cycle of 1 hour
+CREATE MATERIALIZED TABLE my_materialized_table_full
+ REFRESH_MODE = FULL
+ AS SELECT * FROM source_table;
+```
+
**不合法的 `FRESHNESS` 示例:**
```sql
@@ -147,6 +165,7 @@ FRESHNESS = INTERVAL '5' HOUR
```
<span class="label label-danger">注意</span>
+- If FRESHNESS is not specified, the table will use the default freshness
interval based on the refresh mode:
`materialized-table.default-freshness.continuous` (default: 3 minutes) for
CONTINUOUS mode, or `materialized-table.default-freshness.full` (default: 1
hour) for FULL mode.
- 尽管物化表数据将尽可能在定义的新鲜度内刷新,但不能保证完全满足新鲜度要求。
- 在持续模式下,数据新鲜度和 `checkpoint` 间隔一致,设置过短的数据新鲜度可能会对作业性能产生影响。此外,为了优化 `checkpoint`
性能,建议[开启 Changelog]({{< ref "docs/ops/state/state_backends" >}}#开启-changelog)。
- 在全量模式下,数据新鲜度会转换为 `cron` 表达式,因此目前仅支持在预定义时间间隔单位内的新鲜度间隔,这种设计确保了与 `cron`
表达式语义的一致性。具体支持以下新鲜度间隔:
@@ -168,14 +187,14 @@ CREATE MATERIALIZED TABLE my_materialized_table
FRESHNESS = INTERVAL '1' HOUR
REFRESH_MODE = CONTINUOUS
AS SELECT
- ...
+ ...
-- 创建的物化表的刷新模式为全量模式,作业的调度周期为 10 分钟。
CREATE MATERIALIZED TABLE my_materialized_table
FRESHNESS = INTERVAL '10' MINUTE
REFRESH_MODE = FULL
AS SELECT
- ...
+ ...
```
## AS <select_statement>
@@ -204,22 +223,22 @@ CREATE MATERIALIZED TABLE my_materialized_table_continuous
'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '10' SECOND
- AS
- SELECT
+ AS
+ SELECT
k.ds,
k.user_id,
COUNT(*) AS event_count,
SUM(k.amount) AS total_amount,
MAX(u.age) AS max_age
- FROM
+ FROM
kafka_catalog.db1.kafka_table k
- JOIN
+ JOIN
user_catalog.db1.user_table u
- ON
+ ON
k.user_id = u.user_id
- WHERE
+ WHERE
k.event_type = 'purchase'
- GROUP BY
+ GROUP BY
k.ds, k.user_id
```
@@ -233,22 +252,22 @@ CREATE MATERIALIZED TABLE my_materialized_table_full
'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' HOUR
- AS
- SELECT
+ AS
+ SELECT
p.ds,
p.product_id,
p.product_name,
AVG(s.sale_price) AS avg_sale_price,
SUM(s.quantity) AS total_quantity
- FROM
+ FROM
paimon_catalog.db1.product_table p
- LEFT JOIN
+ LEFT JOIN
paimon_catalog.db1.sales_table s
- ON
+ ON
p.product_id = s.product_id
- WHERE
+ WHERE
p.category = 'electronics'
- GROUP BY
+ GROUP BY
p.ds, p.product_id, p.product_name
```
@@ -276,7 +295,7 @@ ALTER MATERIALIZED TABLE
[catalog_name.][db_name.]table_name SUSPEND
`SUSPEND` 用于暂停物化表的后台刷新管道。
-**示例:**
+**示例:**
```sql
-- 暂停前指定 SAVEPOINT 路径
@@ -297,7 +316,7 @@ ALTER MATERIALIZED TABLE
[catalog_name.][db_name.]table_name RESUME [WITH (key1=
`RESUME` 用于恢复物化表的刷新管道。在恢复时,可以通过 `WITH`
子句动态指定物化表的参数,该参数仅对当前恢复的刷新管道生效,并不会持久化到物化表中。
-**示例:**
+**示例:**
```sql
-- 恢复指定的物化表
@@ -358,21 +377,21 @@ ALTER MATERIALIZED TABLE
[catalog_name.][db_name.]table_name AS <select_statemen
-- 原始物化表定义
CREATE MATERIALIZED TABLE my_materialized_table
FRESHNESS = INTERVAL '10' SECOND
- AS
- SELECT
+ AS
+ SELECT
user_id,
COUNT(*) AS event_count,
SUM(amount) AS total_amount
- FROM
+ FROM
kafka_catalog.db1.events
- WHERE
+ WHERE
event_type = 'purchase'
- GROUP BY
+ GROUP BY
user_id;
-- 修改现有物化表的查询
ALTER MATERIALIZED TABLE my_materialized_table
-AS SELECT
+AS SELECT
user_id,
COUNT(*) AS event_count,
SUM(amount) AS total_amount,
@@ -403,7 +422,3 @@ DROP MATERIALIZED TABLE [IF EXISTS]
[catalog_name.][database_name.]table_name
-- 删除指定的物化表
DROP MATERIALIZED TABLE IF EXISTS my_materialized_table;
```
-
-
-
-
diff --git a/docs/content/docs/dev/table/materialized-table/overview.md
b/docs/content/docs/dev/table/materialized-table/overview.md
index 7b78da97a9c..d41145e494d 100644
--- a/docs/content/docs/dev/table/materialized-table/overview.md
+++ b/docs/content/docs/dev/table/materialized-table/overview.md
@@ -34,7 +34,9 @@ Materialized Table encompass the following core concepts:
Data Freshness, Refres
## Data Freshness
-Data freshness defines the maximum amount of time that the materialized
table’s content should lag behind updates to the base tables. Freshness is not
a guarantee. Instead, it is a target that Flink attempts to meet. Data in
materialized table is refreshed as closely as possible within the freshness.
+Data freshness defines the maximum amount of time that the materialized
table's content should lag behind updates to the base tables. Freshness is not
a guarantee. Instead, it is a target that Flink attempts to meet. The data in
materialized table is refreshed as closely as possible within the freshness
target.
+
+Data freshness is optional when creating a materialized table. If not
specified, the system uses the default freshness based on the refresh mode:
[materialized-table.default-freshness.continuous]({{< ref
"docs/dev/table/config" >}}#materialized-table-default-freshness-continuous)
(default: 3 minutes) for CONTINUOUS mode, or
[materialized-table.default-freshness.full]({{< ref "docs/dev/table/config"
>}}#materialized-table-default-freshness-full) (default: 1 hour) for FULL mode.
Data freshness is a crucial attribute of a materialized table, serving two
main purposes:
- **Determining the Refresh Mode**. Currently, there are CONTINUOUS and FULL
modes. For details on how to determine the refresh mode, refer to the
[materialized-table.refresh-mode.freshness-threshold]({{< ref
"docs/dev/table/config"
>}}#materialized-table-refresh-mode-freshness-threshold) configuration item.
diff --git a/docs/content/docs/dev/table/materialized-table/statements.md
b/docs/content/docs/dev/table/materialized-table/statements.md
index a5c77453977..07784bb3ede 100644
--- a/docs/content/docs/dev/table/materialized-table/statements.md
+++ b/docs/content/docs/dev/table/materialized-table/statements.md
@@ -44,7 +44,7 @@ CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
[WITH (key1=val1, key2=val2, ...)]
-FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
+[FRESHNESS = INTERVAL '<num>' { SECOND[S] | MINUTE[S] | HOUR[S] | DAY[S] }]
[REFRESH_MODE = { CONTINUOUS | FULL }]
@@ -69,7 +69,7 @@ AS <select_statement>
CREATE MATERIALIZED TABLE my_materialized_table
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '1' HOUR
- AS SELECT
+ AS SELECT
ds
FROM
...
@@ -103,9 +103,11 @@ As shown in the above example, we specified the
date-formatter option for the `d
`FRESHNESS` defines the data freshness of a materialized table.
+`FRESHNESS` is optional. When omitted, the system uses the default freshness
based on the refresh mode: `materialized-table.default-freshness.continuous`
(default: 3 minutes) for CONTINUOUS mode, or
`materialized-table.default-freshness.full` (default: 1 hour) for FULL mode.
+
**FRESHNESS and Refresh Mode Relationship**
-FRESHNESS defines the maximum amount of time that the materialized table’s
content should lag behind updates to the base tables. It does two things,
firstly it determines the [refresh mode]({{< ref
"docs/dev/table/materialized-table/overview" >}}#refresh-mode) of the
materialized table through [configuration]({{< ref "docs/dev/table/config"
>}}#materialized-table-refresh-mode-freshness-threshold), followed by
determines the data refresh frequency to meet the actual data freshness
requirements.
+FRESHNESS defines the maximum amount of time that the materialized table's
content should lag behind updates to the base tables. When not specified, it
uses the default value from configuration based on the refresh mode. It does
two things: firstly it determines the [refresh mode]({{< ref
"docs/dev/table/materialized-table/overview" >}}#refresh-mode) of the
materialized table through [configuration]({{< ref "docs/dev/table/config"
>}}#materialized-table-refresh-mode-freshness-threshold), [...]
**Explanation of FRESHNESS Parameter**
@@ -128,6 +130,22 @@ FRESHNESS = INTERVAL '1' HOUR
FRESHNESS = INTERVAL '1' DAY
```
+**Default FRESHNESS Example:**
+(Assuming `materialized-table.default-freshness.continuous` is 3 minutes,
`materialized-table.default-freshness.full` is 1 hour, and
`materialized-table.refresh-mode.freshness-threshold` is 30 minutes)
+
+```sql
+-- FRESHNESS is omitted, uses the configured default of 3 minutes for
CONTINUOUS mode
+-- The corresponding refresh pipeline is a streaming job with a checkpoint
interval of 3 minutes
+CREATE MATERIALIZED TABLE my_materialized_table
+ AS SELECT * FROM source_table;
+
+-- FRESHNESS is omitted and FULL mode is explicitly specified, uses the
configured default of 1 hour
+-- The corresponding refresh pipeline is a scheduled workflow with a schedule
cycle of 1 hour
+CREATE MATERIALIZED TABLE my_materialized_table_full
+ REFRESH_MODE = FULL
+ AS SELECT * FROM source_table;
+```
+
**Invalid `FRESHNESS` Examples:**
```sql
@@ -147,6 +165,7 @@ FRESHNESS = INTERVAL '5' HOUR
```
<span class="label label-danger">Note</span>
+- If FRESHNESS is not specified, the table will use the default freshness
interval based on the refresh mode:
`materialized-table.default-freshness.continuous` (default: 3 minutes) for
CONTINUOUS mode, or `materialized-table.default-freshness.full` (default: 1
hour) for FULL mode.
- The materialized table data will be refreshed as closely as possible within
the defined freshness but cannot guarantee complete satisfaction.
- In CONTINUOUS mode, setting a data freshness interval that is too short can
impact job performance as it aligns with the checkpoint interval. To optimize
checkpoint performance, consider [enabling-changelog]({{< ref
"docs/ops/state/state_backends" >}}#incremental-checkpoints).
- In FULL mode, data freshness must be translated into a cron expression,
consequently, only freshness intervals within predefined time spans are
presently accommodated, this design ensures alignment with cron's capabilities.
Specifically, support for the following freshness:
@@ -168,14 +187,14 @@ CREATE MATERIALIZED TABLE my_materialized_table
FRESHNESS = INTERVAL '1' HOUR
REFRESH_MODE = CONTINUOUS
AS SELECT
- ...
+ ...
-- The refresh mode of the created materialized table is FULL, and the job's
schedule cycle is 10 minutes.
CREATE MATERIALIZED TABLE my_materialized_table
FRESHNESS = INTERVAL '10' MINUTE
REFRESH_MODE = FULL
AS SELECT
- ...
+ ...
```
## AS <select_statement>
@@ -204,22 +223,21 @@ CREATE MATERIALIZED TABLE my_materialized_table_continuous
'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '10' SECOND
- AS
- SELECT
+ AS SELECT
k.ds,
k.user_id,
COUNT(*) AS event_count,
SUM(k.amount) AS total_amount,
MAX(u.age) AS max_age
- FROM
+ FROM
kafka_catalog.db1.kafka_table k
- JOIN
+ JOIN
user_catalog.db1.user_table u
- ON
+ ON
k.user_id = u.user_id
- WHERE
+ WHERE
k.event_type = 'purchase'
- GROUP BY
+ GROUP BY
k.ds, k.user_id
```
@@ -233,22 +251,21 @@ CREATE MATERIALIZED TABLE my_materialized_table_full
'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' HOUR
- AS
- SELECT
+ AS SELECT
p.ds,
p.product_id,
p.product_name,
AVG(s.sale_price) AS avg_sale_price,
SUM(s.quantity) AS total_quantity
- FROM
+ FROM
paimon_catalog.db1.product_table p
- LEFT JOIN
+ LEFT JOIN
paimon_catalog.db1.sales_table s
- ON
+ ON
p.product_id = s.product_id
- WHERE
+ WHERE
p.category = 'electronics'
- GROUP BY
+ GROUP BY
p.ds, p.product_id, p.product_name
```
diff --git
a/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html
b/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html
index d5829bf3224..f3cb3972a73 100644
---
a/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html
+++
b/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html
@@ -8,6 +8,18 @@
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>materialized-table.default-freshness.continuous</h5><br>
<span class="label label-primary">Batch</span> <span class="label
label-primary">Streaming</span></td>
+ <td style="word-wrap: break-word;">3 min</td>
+ <td>Duration</td>
+ <td>The default freshness interval for continuous refresh mode
when the FRESHNESS clause is omitted in a materialized table definition.</td>
+ </tr>
+ <tr>
+ <td><h5>materialized-table.default-freshness.full</h5><br> <span
class="label label-primary">Batch</span> <span class="label
label-primary">Streaming</span></td>
+ <td style="word-wrap: break-word;">1 h</td>
+ <td>Duration</td>
+ <td>The default freshness interval for full refresh mode when the
FRESHNESS clause is omitted in a materialized table definition.</td>
+ </tr>
<tr>
<td><h5>materialized-table.refresh-mode.freshness-threshold</h5><br> <span
class="label label-primary">Batch</span> <span class="label
label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">30 min</td>
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
index 323e0ca17b7..bafb3650711 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -83,6 +83,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URLClassLoader;
+import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
@@ -108,13 +109,12 @@ import static
org.apache.flink.table.api.config.MaterializedTableConfigOptions.P
import static
org.apache.flink.table.api.config.MaterializedTableConfigOptions.SCHEDULE_TIME_DATE_FORMATTER_DEFAULT;
import static
org.apache.flink.table.api.internal.TableResultInternal.TABLE_RESULT_OK;
import static
org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+import static
org.apache.flink.table.catalog.IntervalFreshness.convertFreshnessToCron;
import static
org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil.WORKFLOW_SCHEDULER_PREFIX;
import static
org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getEndpointConfig;
import static
org.apache.flink.table.gateway.service.utils.Constants.CLUSTER_INFO;
import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
import static
org.apache.flink.table.utils.DateTimeUtils.formatTimestampStringWithOffset;
-import static
org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron;
-import static
org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration;
/** Manager is responsible for execute the {@link MaterializedTableOperation}.
*/
@Internal
@@ -166,8 +166,7 @@ public class MaterializedTableManager {
public ResultFetcher callMaterializedTableOperation(
OperationExecutor operationExecutor,
OperationHandle handle,
- MaterializedTableOperation op,
- String statement) {
+ MaterializedTableOperation op) {
if (op instanceof CreateMaterializedTableOperation) {
return callCreateMaterializedTableOperation(
operationExecutor, handle,
(CreateMaterializedTableOperation) op);
@@ -260,9 +259,8 @@ public class MaterializedTableManager {
ResolvedCatalogMaterializedTable catalogMaterializedTable =
createMaterializedTableOperation.getCatalogMaterializedTable();
- // convert duration to cron expression
- String cronExpression =
-
convertFreshnessToCron(catalogMaterializedTable.getDefinitionFreshness());
+ final IntervalFreshness freshness =
catalogMaterializedTable.getDefinitionFreshness();
+ String cronExpression = convertFreshnessToCron(freshness);
// create full refresh job
CreateRefreshWorkflow createRefreshWorkflow =
new CreatePeriodicRefreshWorkflow(
@@ -306,7 +304,7 @@ public class MaterializedTableManager {
OperationHandle handle,
AlterMaterializedTableSuspendOperation op) {
ObjectIdentifier tableIdentifier = op.getTableIdentifier();
- CatalogMaterializedTable materializedTable =
+ ResolvedCatalogMaterializedTable materializedTable =
getCatalogMaterializedTable(operationExecutor,
tableIdentifier);
// Initialization phase doesn't support resume operation.
@@ -420,7 +418,7 @@ public class MaterializedTableManager {
OperationHandle handle,
AlterMaterializedTableResumeOperation op) {
ObjectIdentifier tableIdentifier = op.getTableIdentifier();
- CatalogMaterializedTable catalogMaterializedTable =
+ ResolvedCatalogMaterializedTable catalogMaterializedTable =
getCatalogMaterializedTable(operationExecutor,
tableIdentifier);
// Initialization phase doesn't support resume operation.
@@ -456,7 +454,7 @@ public class MaterializedTableManager {
OperationExecutor operationExecutor,
OperationHandle handle,
ObjectIdentifier tableIdentifier,
- CatalogMaterializedTable catalogMaterializedTable,
+ ResolvedCatalogMaterializedTable catalogMaterializedTable,
Map<String, String> dynamicOptions) {
ContinuousRefreshHandler refreshHandler =
deserializeContinuousHandler(
@@ -562,9 +560,10 @@ public class MaterializedTableManager {
.getSessionContext()
.getSessionConf()
.contains(CheckpointingOptions.CHECKPOINTING_INTERVAL)) {
- customConfig.set(
- CheckpointingOptions.CHECKPOINTING_INTERVAL,
- catalogMaterializedTable.getFreshness());
+
+ final Duration freshness =
+
validateAndGetIntervalFreshness(catalogMaterializedTable).toDuration();
+ customConfig.set(CheckpointingOptions.CHECKPOINTING_INTERVAL,
freshness);
}
String insertStatement =
@@ -727,7 +726,7 @@ public class MaterializedTableManager {
SCHEDULE_TIME_DATE_FORMATTER_DEFAULT,
partFieldFormatter,
TimeZone.getTimeZone(localZoneId),
- -convertFreshnessToDuration(freshness).toMillis());
+ -freshness.toDuration().toMillis());
if (partFiledValue == null) {
throw new SqlExecutionException(
String.format(
@@ -818,7 +817,7 @@ public class MaterializedTableManager {
OperationHandle handle,
AlterMaterializedTableChangeOperation op) {
ObjectIdentifier tableIdentifier = op.getTableIdentifier();
- CatalogMaterializedTable oldMaterializedTable =
+ ResolvedCatalogMaterializedTable oldMaterializedTable =
getCatalogMaterializedTable(operationExecutor,
tableIdentifier);
if (CatalogMaterializedTable.RefreshMode.FULL ==
oldMaterializedTable.getRefreshMode()) {
@@ -995,12 +994,12 @@ public class MaterializedTableManager {
}
}
- CatalogMaterializedTable materializedTable =
+ ResolvedCatalogMaterializedTable materializedTable =
getCatalogMaterializedTable(operationExecutor,
tableIdentifier);
- CatalogMaterializedTable.RefreshMode refreshMode =
materializedTable.getRefreshMode();
CatalogMaterializedTable.RefreshStatus refreshStatus =
materializedTable.getRefreshStatus();
if (CatalogMaterializedTable.RefreshStatus.ACTIVATED == refreshStatus
|| CatalogMaterializedTable.RefreshStatus.SUSPENDED ==
refreshStatus) {
+ CatalogMaterializedTable.RefreshMode refreshMode =
materializedTable.getRefreshMode();
if (CatalogMaterializedTable.RefreshMode.FULL == refreshMode) {
deleteRefreshWorkflow(tableIdentifier, materializedTable);
} else if (CatalogMaterializedTable.RefreshMode.CONTINUOUS ==
refreshMode
@@ -1008,8 +1007,7 @@ public class MaterializedTableManager {
cancelContinuousRefreshJob(
operationExecutor, handle, tableIdentifier,
materializedTable);
}
- } else if (CatalogMaterializedTable.RefreshStatus.INITIALIZING
- == materializedTable.getRefreshStatus()) {
+ } else if (CatalogMaterializedTable.RefreshStatus.INITIALIZING ==
refreshStatus) {
throw new ValidationException(
String.format(
"Current refresh status of materialized table %s
is initializing, skip the drop operation.",
@@ -1375,4 +1373,10 @@ public class MaterializedTableManager {
return Optional.empty();
}
}
+
+ private static IntervalFreshness validateAndGetIntervalFreshness(
+ final CatalogMaterializedTable catalogMaterializedTable) {
+ return
Optional.ofNullable(catalogMaterializedTable.getDefinitionFreshness())
+ .orElseThrow(() -> new SqlExecutionException("Freshness cannot
be null"));
+ }
}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index acf5ac7d772..5735abd6dee 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -556,8 +556,7 @@ public class OperationExecutor {
return sessionContext
.getSessionState()
.materializedTableManager
- .callMaterializedTableOperation(
- this, handle, (MaterializedTableOperation) op,
statement);
+ .callMaterializedTableOperation(this, handle,
(MaterializedTableOperation) op);
} else {
return callOperation(tableEnv, handle, op);
}
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index 2e2499e75ae..de73723176b 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
@@ -142,7 +143,6 @@ class MaterializedTableStatementITCase extends
AbstractMaterializedTableStatemen
Column.physical("pv",
DataTypes.INT().notNull())));
assertThat(actualMaterializedTable.getResolvedSchema()).isEqualTo(expectedSchema);
-
assertThat(actualMaterializedTable.getFreshness()).isEqualTo(Duration.ofSeconds(30));
assertThat(actualMaterializedTable.getLogicalRefreshMode())
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
assertThat(actualMaterializedTable.getRefreshMode())
@@ -178,6 +178,191 @@ class MaterializedTableStatementITCase extends
AbstractMaterializedTableStatemen
ObjectIdentifier.of(fileSystemCatalogName,
TEST_DEFAULT_DATABASE, "users_shops"));
}
+ @Test
+ void testCreateMaterializedTableInFullModeWithoutFreshness() throws
Exception {
+ String materializedTableDDL =
+ "CREATE MATERIALIZED TABLE users_shops"
+ + " PARTITIONED BY (ds)\n"
+ + " WITH(\n"
+ + " 'format' = 'debezium-json'\n"
+ + " )\n"
+ + " REFRESH_MODE = FULL\n"
+ + " AS SELECT \n"
+ + " user_id,\n"
+ + " shop_id,\n"
+ + " ds,\n"
+ + " SUM (payment_amount_cents) AS
payed_buy_fee_sum,\n"
+ + " SUM (1) AS pv\n"
+ + " FROM (\n"
+ + " SELECT user_id, shop_id,
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM
datagenSource"
+ + " ) AS tmp\n"
+ + " GROUP BY (user_id, shop_id, ds)";
+ OperationHandle materializedTableHandle =
+ service.executeStatement(
+ sessionHandle, materializedTableDDL, -1, new
Configuration());
+ awaitOperationTermination(service, sessionHandle,
materializedTableHandle);
+
+ // validate materialized table: schema, refresh mode, refresh status,
refresh handler,
+ // doesn't check the data because it generates randomly.
+ ResolvedCatalogMaterializedTable actualMaterializedTable =
+ (ResolvedCatalogMaterializedTable)
+ service.getTable(
+ sessionHandle,
+ ObjectIdentifier.of(
+ fileSystemCatalogName,
+ TEST_DEFAULT_DATABASE,
+ "users_shops"));
+
+ // Expected schema
+ ResolvedSchema expectedSchema =
+ ResolvedSchema.of(
+ Arrays.asList(
+ Column.physical("user_id", DataTypes.BIGINT()),
+ Column.physical("shop_id", DataTypes.BIGINT()),
+ Column.physical("ds", DataTypes.STRING()),
+ Column.physical("payed_buy_fee_sum",
DataTypes.BIGINT()),
+ Column.physical("pv",
DataTypes.INT().notNull())));
+
+
assertThat(actualMaterializedTable.getResolvedSchema()).isEqualTo(expectedSchema);
+ assertThat(actualMaterializedTable.getLogicalRefreshMode())
+ .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.FULL);
+
assertThat(actualMaterializedTable.getRefreshMode()).isEqualTo(RefreshMode.FULL);
+ assertThat(actualMaterializedTable.getRefreshStatus())
+ .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
assertThat(actualMaterializedTable.getRefreshHandlerDescription()).isNotEmpty();
+
assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty();
+
+ // verify refresh handler
+ byte[] serializedHandler =
actualMaterializedTable.getSerializedRefreshHandler();
+ EmbeddedRefreshHandler embeddedRefreshHandler =
+ EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(
+ serializedHandler, getClass().getClassLoader());
+ assertThat(embeddedRefreshHandler.getWorkflowName())
+ .isEqualTo(
+ "quartz_job_"
+ + ObjectIdentifier.of(
+ fileSystemCatalogName,
+ TEST_DEFAULT_DATABASE,
+ "users_shops")
+ .asSerializableString());
+
+ EmbeddedQuartzScheduler embeddedWorkflowScheduler =
+ SQL_GATEWAY_REST_ENDPOINT_EXTENSION
+ .getSqlGatewayRestEndpoint()
+ .getQuartzScheduler();
+ JobKey jobKey =
+ new JobKey(
+ embeddedRefreshHandler.getWorkflowName(),
+ embeddedRefreshHandler.getWorkflowGroup());
+
+ // verify the job is created
+
assertThat(embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey)).isTrue();
+
+ // verify initialization conf
+ JobDetail jobDetail =
embeddedWorkflowScheduler.getQuartzScheduler().getJobDetail(jobKey);
+ String workflowJsonStr =
jobDetail.getJobDataMap().getString(WORKFLOW_INFO);
+ WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
+ assertThat(workflowInfo.getInitConfig())
+ .containsEntry("k1", "v1")
+ .containsEntry("k2", "v2")
+ .containsKey("sql-gateway.endpoint.rest.address")
+ .containsKey("sql-gateway.endpoint.rest.port")
+ .containsKey("table.catalog-store.kind")
+ .containsKey("table.catalog-store.file.path")
+ .doesNotContainKey(WORKFLOW_SCHEDULER_TYPE.key())
+ .doesNotContainKey(RESOURCES_DOWNLOAD_DIR.key());
+
+ // drop the materialized table
+ dropMaterializedTable(
+ ObjectIdentifier.of(fileSystemCatalogName,
TEST_DEFAULT_DATABASE, "users_shops"));
+ }
+
+ @Test
+ void
testCreateMaterializedTableInContinuousModeWithoutFreshnessAndRefreshMode()
+ throws Exception {
+ String materializedTableDDL =
+ "CREATE MATERIALIZED TABLE users_shops"
+ + " PARTITIONED BY (ds)\n"
+ + " WITH(\n"
+ + " 'format' = 'debezium-json'\n"
+ + " )\n"
+ + " AS SELECT \n"
+ + " user_id,\n"
+ + " shop_id,\n"
+ + " ds,\n"
+ + " SUM (payment_amount_cents) AS
payed_buy_fee_sum,\n"
+ + " SUM (1) AS pv\n"
+ + " FROM (\n"
+ + " SELECT user_id, shop_id,
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM
datagenSource"
+ + " ) AS tmp\n"
+ + " GROUP BY (user_id, shop_id, ds)";
+ OperationHandle materializedTableHandle =
+ service.executeStatement(
+ sessionHandle, materializedTableDDL, -1, new
Configuration());
+ awaitOperationTermination(service, sessionHandle,
materializedTableHandle);
+
+ // validate materialized table: schema, refresh mode, refresh status,
refresh handler,
+ // doesn't check the data because it generates randomly.
+ ResolvedCatalogMaterializedTable actualMaterializedTable =
+ (ResolvedCatalogMaterializedTable)
+ service.getTable(
+ sessionHandle,
+ ObjectIdentifier.of(
+ fileSystemCatalogName,
+ TEST_DEFAULT_DATABASE,
+ "users_shops"));
+
+ // Expected schema
+ ResolvedSchema expectedSchema =
+ ResolvedSchema.of(
+ Arrays.asList(
+ Column.physical("user_id", DataTypes.BIGINT()),
+ Column.physical("shop_id", DataTypes.BIGINT()),
+ Column.physical("ds", DataTypes.STRING()),
+ Column.physical("payed_buy_fee_sum",
DataTypes.BIGINT()),
+ Column.physical("pv",
DataTypes.INT().notNull())));
+
+
assertThat(actualMaterializedTable.getResolvedSchema()).isEqualTo(expectedSchema);
+ assertThat(actualMaterializedTable.getDefinitionFreshness())
+ .isEqualTo(IntervalFreshness.ofMinute("3"));
+ assertThat(actualMaterializedTable.getLogicalRefreshMode())
+
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
+ assertThat(actualMaterializedTable.getRefreshMode())
+ .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
+ assertThat(actualMaterializedTable.getRefreshStatus())
+ .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
assertThat(actualMaterializedTable.getRefreshHandlerDescription()).isNotEmpty();
+
assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty();
+
+ ContinuousRefreshHandler activeRefreshHandler =
+ ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+ actualMaterializedTable.getSerializedRefreshHandler(),
+ getClass().getClassLoader());
+
+ waitUntilAllTasksAreRunning(
+ restClusterClient,
JobID.fromHexString(activeRefreshHandler.getJobId()));
+
+ // verify the background job is running
+ String describeJobDDL = String.format("DESCRIBE JOB '%s'",
activeRefreshHandler.getJobId());
+ OperationHandle describeJobHandle =
+ service.executeStatement(sessionHandle, describeJobDDL, -1,
new Configuration());
+ awaitOperationTermination(service, sessionHandle, describeJobHandle);
+ List<RowData> jobResults = fetchAllResults(service, sessionHandle,
describeJobHandle);
+
assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING");
+
+ // get checkpoint interval
+ long checkpointInterval =
+ getCheckpointIntervalConfig(restClusterClient,
activeRefreshHandler.getJobId());
+
+ // default checkpoint interval is 3 minutes
+ final int expectedCheckpointInterval = 60 * 3 * 1000;
+ assertThat(checkpointInterval).isEqualTo(expectedCheckpointInterval);
+
+ // drop the materialized table
+ dropMaterializedTable(
+ ObjectIdentifier.of(fileSystemCatalogName,
TEST_DEFAULT_DATABASE, "users_shops"));
+ }
+
@Test
void
testCreateMaterializedTableInContinuousModeWithCustomCheckpointInterval()
throws Exception {
@@ -348,7 +533,7 @@ class MaterializedTableStatementITCase extends
AbstractMaterializedTableStatemen
}
@Test
- void testCreateMaterializedTableFailedInInContinuousMode() throws
Exception {
+ void testCreateMaterializedTableFailedInInContinuousMode() {
// create a materialized table with invalid SQL
String materializedTableDDL =
"CREATE MATERIALIZED TABLE users_shops"
diff --git
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 5717da2e51f..9200c1dc2ac 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1907,16 +1907,18 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean
replace, boolean isTemporar
<WITH>
propertyList = Properties()
]
- <FRESHNESS> <EQ>
- freshness = Expression(ExprContext.ACCEPT_NON_QUERY)
- {
- if (!(freshness instanceof SqlIntervalLiteral))
+ [
+ <FRESHNESS> <EQ>
+ freshness = Expression(ExprContext.ACCEPT_NON_QUERY)
{
- throw SqlUtil.newContextException(
- getPos(),
- ParserResource.RESOURCE.unsupportedFreshnessType());
+ if (!(freshness instanceof SqlIntervalLiteral))
+ {
+ throw SqlUtil.newContextException(
+ getPos(),
+ ParserResource.RESOURCE.unsupportedFreshnessType());
+ }
}
- }
+ ]
[
<REFRESH_MODE> <EQ>
(
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
index 5a9245c5dd0..8c7fe2d5089 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
@@ -61,7 +61,7 @@ public class SqlCreateMaterializedTable extends SqlCreate {
private final SqlNodeList propertyList;
- private final SqlIntervalLiteral freshness;
+ private final @Nullable SqlIntervalLiteral freshness;
private final @Nullable SqlLiteral refreshMode;
@@ -75,7 +75,7 @@ public class SqlCreateMaterializedTable extends SqlCreate {
@Nullable SqlDistribution distribution,
SqlNodeList partitionKeyList,
SqlNodeList propertyList,
- SqlIntervalLiteral freshness,
+ @Nullable SqlIntervalLiteral freshness,
@Nullable SqlLiteral refreshMode,
SqlNode asQuery) {
super(OPERATOR, pos, false, false);
@@ -86,7 +86,7 @@ public class SqlCreateMaterializedTable extends SqlCreate {
this.partitionKeyList =
requireNonNull(partitionKeyList, "partitionKeyList should not
be null");
this.propertyList = requireNonNull(propertyList, "propertyList should
not be null");
- this.freshness = requireNonNull(freshness, "freshness should not be
null");
+ this.freshness = freshness;
this.refreshMode = refreshMode;
this.asQuery = requireNonNull(asQuery, "asQuery should not be null");
}
@@ -136,12 +136,14 @@ public class SqlCreateMaterializedTable extends SqlCreate
{
return propertyList;
}
+ @Nullable
public SqlIntervalLiteral getFreshness() {
return freshness;
}
- public Optional<SqlLiteral> getRefreshMode() {
- return Optional.ofNullable(refreshMode);
+ @Nullable
+ public SqlLiteral getRefreshMode() {
+ return refreshMode;
}
public SqlNode getAsQuery() {
@@ -195,10 +197,12 @@ public class SqlCreateMaterializedTable extends SqlCreate
{
writer.endList(withFrame);
}
- writer.newlineAndIndent();
- writer.keyword("FRESHNESS");
- writer.keyword("=");
- freshness.unparse(writer, leftPrec, rightPrec);
+ if (freshness != null) {
+ writer.newlineAndIndent();
+ writer.keyword("FRESHNESS");
+ writer.keyword("=");
+ freshness.unparse(writer, leftPrec, rightPrec);
+ }
if (refreshMode != null) {
writer.newlineAndIndent();
diff --git
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
index 0e58ffbb546..d089ecc8079 100644
---
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
+++
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
@@ -63,26 +63,6 @@ class MaterializedTableStatementParserTest {
+ "AS SELECT a, b, h, t m FROM source";
sql(sql).fails(
"MATERIALIZED TABLE only supports define interval type
FRESHNESS, please refer to the materialized table document.");
-
- final String sql2 =
- "CREATE MATERIALIZED TABLE tbl1\n"
- + "(\n"
- + " PRIMARY KEY (a, b)\n"
- + ")\n"
- + "COMMENT 'table comment'\n"
- + "PARTITIONED BY (a, h)\n"
- + "WITH (\n"
- + " 'group.id' = 'latest', \n"
- + " 'kafka.topic' = 'log.test'\n"
- + ")\n"
- + "^AS^ SELECT a, b, h, t m FROM source";
-
- sql(sql2)
- .fails(
- "Encountered \"AS\" at line 11, column 1.\n"
- + "Was expecting:\n"
- + " \"FRESHNESS\" ...\n"
- + " ");
}
@Test
@@ -395,7 +375,8 @@ class MaterializedTableStatementParserTest {
return Stream.of(
Arguments.of(fullExample()),
Arguments.of(withoutTableConstraint()),
- Arguments.of(withPrimaryKey()));
+ Arguments.of(withPrimaryKey()),
+ Arguments.of(withoutFreshness()));
}
private static Map.Entry<String, String> fullExample() {
@@ -411,7 +392,7 @@ class MaterializedTableStatementParserTest {
+ " 'group.id' = 'latest', \n"
+ " 'kafka.topic' = 'log.test'\n"
+ ")\n"
- + "FRESHNESS = INTERVAL '3' MINUTE\n"
+ + "FRESHNESS = INTERVAL '3' MINUTES\n"
+ "AS SELECT a, b, h, t m FROM source",
"CREATE MATERIALIZED TABLE `TBL1`\n"
+ "(\n"
@@ -456,7 +437,7 @@ class MaterializedTableStatementParserTest {
return new AbstractMap.SimpleEntry<>(
"CREATE MATERIALIZED TABLE tbl1\n"
+ "COMMENT 'table comment'\n"
- + "FRESHNESS = INTERVAL '3' DAY\n"
+ + "FRESHNESS = INTERVAL '3' DAYS\n"
+ "REFRESH_MODE = FULL\n"
+ "AS SELECT a, b, h, t m FROM source",
"CREATE MATERIALIZED TABLE `TBL1`\n"
@@ -467,4 +448,28 @@ class MaterializedTableStatementParserTest {
+ "SELECT `A`, `B`, `H`, `T` AS `M`\n"
+ "FROM `SOURCE`");
}
+
+ private static Map.Entry<String, String> withoutFreshness() {
+ return new AbstractMap.SimpleEntry<>(
+ "CREATE MATERIALIZED TABLE tbl1\n"
+ + "(\n"
+ + " PRIMARY KEY (a, b)\n"
+ + ")\n"
+ + "WITH (\n"
+ + " 'group.id' = 'latest', \n"
+ + " 'kafka.topic' = 'log.test'\n"
+ + ")\n"
+ + "AS SELECT a, b, h, t m FROM source",
+ "CREATE MATERIALIZED TABLE `TBL1`\n"
+ + "(\n"
+ + " PRIMARY KEY (`A`, `B`)\n"
+ + ")\n"
+ + "WITH (\n"
+ + " 'group.id' = 'latest',\n"
+ + " 'kafka.topic' = 'log.test'\n"
+ + ")\n"
+ + "AS\n"
+ + "SELECT `A`, `B`, `H`, `T` AS `M`\n"
+ + "FROM `SOURCE`");
+ }
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java
index 56cebedd191..e387705e1a3 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java
@@ -58,4 +58,20 @@ public class MaterializedTableConfigOptions {
.withDescription(
"Specifies the time partition formatter for the
partitioned materialized table, where '#' denotes a string-based partition
field name."
+ " This serves as a hint to the framework
regarding which partition to refresh in full refresh mode.");
+
+ @Documentation.TableOption(execMode =
Documentation.ExecMode.BATCH_STREAMING)
+ public static final ConfigOption<Duration>
MATERIALIZED_TABLE_DEFAULT_FRESHNESS_CONTINUOUS =
+ key("materialized-table.default-freshness.continuous")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(3))
+ .withDescription(
+ "The default freshness interval for continuous
refresh mode when the FRESHNESS clause is omitted in a materialized table
definition.");
+
+ @Documentation.TableOption(execMode =
Documentation.ExecMode.BATCH_STREAMING)
+ public static final ConfigOption<Duration>
MATERIALIZED_TABLE_DEFAULT_FRESHNESS_FULL =
+ key("materialized-table.default-freshness.full")
+ .durationType()
+ .defaultValue(Duration.ofHours(1))
+ .withDescription(
+ "The default freshness interval for full refresh
mode when the FRESHNESS clause is omitted in a materialized table definition.");
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 24ebe07e9ea..bc870857807 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -27,8 +27,10 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.MaterializedTableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -65,6 +67,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -83,6 +86,7 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import static java.lang.String.format;
+import static
org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -124,13 +128,16 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
private SqlFactory sqlFactory;
+ private final MaterializedTableEnricher materializedTableEnricher;
+
private CatalogManager(
String defaultCatalogName,
Catalog defaultCatalog,
DataTypeFactory typeFactory,
List<CatalogModificationListener> catalogModificationListeners,
CatalogStoreHolder catalogStoreHolder,
- SqlFactory sqlFactory) {
+ SqlFactory sqlFactory,
+ MaterializedTableEnricher materializedTableEnricher) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
"Default catalog name cannot be null or empty");
@@ -153,6 +160,8 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
this.catalogStoreHolder = catalogStoreHolder;
this.sqlFactory = sqlFactory;
+ this.materializedTableEnricher =
+ checkNotNull(materializedTableEnricher,
"MaterializedTableEnricher cannot be null");
}
@VisibleForTesting
@@ -190,6 +199,8 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
private SqlFactory sqlFactory = DefaultSqlFactory.INSTANCE;
+ private MaterializedTableEnricher materializedTableEnricher;
+
public Builder classLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
return this;
@@ -232,6 +243,12 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
return this;
}
+ public Builder materializedTableEnricher(
+ MaterializedTableEnricher materializedTableEnricher) {
+ this.materializedTableEnricher = materializedTableEnricher;
+ return this;
+ }
+
public CatalogManager build() {
checkNotNull(classLoader, "Class loader cannot be null");
checkNotNull(config, "Config cannot be null");
@@ -249,7 +266,29 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
:
executionConfig.getSerializerConfig()),
catalogModificationListeners,
catalogStoreHolder,
- sqlFactory);
+ sqlFactory,
+ materializedTableEnricher != null
+ ? materializedTableEnricher
+ : createDefaultMaterializedTableEnricher());
+ }
+
+ private MaterializedTableEnricher
createDefaultMaterializedTableEnricher() {
+ final Duration defaultDurationContinuous =
+ catalogStoreHolder
+ .config()
+ .get(
+ MaterializedTableConfigOptions
+
.MATERIALIZED_TABLE_DEFAULT_FRESHNESS_CONTINUOUS);
+ final Duration defaultDurationFull =
+ catalogStoreHolder
+ .config()
+ .get(
+ MaterializedTableConfigOptions
+
.MATERIALIZED_TABLE_DEFAULT_FRESHNESS_FULL);
+ final Duration freshnessThreshold =
+
catalogStoreHolder.config().get(MATERIALIZED_TABLE_FRESHNESS_THRESHOLD);
+ return DefaultMaterializedTableEnricher.create(
+ defaultDurationContinuous, defaultDurationFull,
freshnessThreshold);
}
}
@@ -1869,6 +1908,11 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
final ResolvedSchema resolvedSchema =
table.getUnresolvedSchema().resolve(schemaResolver);
+ final MaterializedTableEnrichmentResult enrichmentResult =
+ this.materializedTableEnricher.enrich(table);
+ IntervalFreshness freshness = enrichmentResult.getFreshness();
+ RefreshMode resolvedRefreshMode = enrichmentResult.getRefreshMode();
+
// Validate partition keys are included in physical columns
final List<String> physicalColumns =
resolvedSchema.getColumns().stream()
@@ -1888,7 +1932,8 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
}
});
- return new ResolvedCatalogMaterializedTable(table, resolvedSchema);
+ return new ResolvedCatalogMaterializedTable(
+ table, resolvedSchema, resolvedRefreshMode, freshness);
}
/** Resolves a {@link CatalogView} to a validated {@link
ResolvedCatalogView}. */
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
index 872dbb4ccd7..f567658c7c1 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
@@ -386,6 +386,8 @@ class ShowCreateUtilTest {
.logicalRefreshMode(LogicalRefreshMode.AUTOMATIC)
.refreshStatus(RefreshStatus.ACTIVATED)
.build(),
- resolvedSchema);
+ resolvedSchema,
+ refreshMode,
+ freshness);
}
}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index 9b8ab94a421..2ef20f7f80c 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -197,8 +197,8 @@ class CatalogBaseTableResolutionTest {
assertThat(resolvedMaterializedTable.getDefinitionQuery())
.isEqualTo(materializedTable.getDefinitionQuery());
- assertThat(resolvedMaterializedTable.getFreshness())
- .isEqualTo(materializedTable.getFreshness());
+ assertThat(resolvedMaterializedTable.getDefinitionFreshness())
+ .isEqualTo(materializedTable.getDefinitionFreshness());
assertThat(resolvedMaterializedTable.getLogicalRefreshMode())
.isEqualTo(materializedTable.getLogicalRefreshMode());
assertThat(resolvedMaterializedTable.getRefreshMode())
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
index a2bbeba1c59..f0691d6252c 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
@@ -30,8 +30,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-import static
org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration;
-
/**
* Represents the unresolved metadata of a materialized table in a {@link
Catalog}.
*
@@ -126,20 +124,27 @@ public interface CatalogMaterializedTable extends
CatalogBaseTable {
* Get the definition freshness of materialized table which is used to
determine the physical
* refresh mode.
*/
+ @Nullable
IntervalFreshness getDefinitionFreshness();
/**
* Get the {@link Duration} value of materialized table definition
freshness, it is converted
* from {@link IntervalFreshness}.
+ *
+ * @deprecated use {@link #getDefinitionFreshness()} together with {@link
+ * IntervalFreshness#toDuration()} instead.
*/
- default Duration getFreshness() {
- return convertFreshnessToDuration(getDefinitionFreshness());
+ @Deprecated
+ default @Nullable Duration getFreshness() {
+ final IntervalFreshness definitionFreshness = getDefinitionFreshness();
+ return definitionFreshness == null ? null :
definitionFreshness.toDuration();
}
/** Get the logical refresh mode of materialized table. */
LogicalRefreshMode getLogicalRefreshMode();
/** Get the physical refresh mode of materialized table. */
+ @Nullable
RefreshMode getRefreshMode();
/** Get the refresh status of materialized table. */
@@ -205,9 +210,9 @@ public interface CatalogMaterializedTable extends
CatalogBaseTable {
private Map<String, String> options = Collections.emptyMap();
private @Nullable Long snapshot;
private String definitionQuery;
- private IntervalFreshness freshness;
+ private @Nullable IntervalFreshness freshness;
private LogicalRefreshMode logicalRefreshMode;
- private RefreshMode refreshMode;
+ private @Nullable RefreshMode refreshMode;
private RefreshStatus refreshStatus;
private @Nullable String refreshHandlerDescription;
private @Nullable byte[] serializedRefreshHandler;
@@ -247,8 +252,8 @@ public interface CatalogMaterializedTable extends
CatalogBaseTable {
return this;
}
- public Builder freshness(IntervalFreshness freshness) {
- this.freshness = Preconditions.checkNotNull(freshness, "Freshness
must not be null.");
+ public Builder freshness(@Nullable IntervalFreshness freshness) {
+ this.freshness = freshness;
return this;
}
@@ -259,9 +264,8 @@ public interface CatalogMaterializedTable extends
CatalogBaseTable {
return this;
}
- public Builder refreshMode(RefreshMode refreshMode) {
- this.refreshMode =
- Preconditions.checkNotNull(refreshMode, "Refresh mode must
not be null.");
+ public Builder refreshMode(@Nullable RefreshMode refreshMode) {
+ this.refreshMode = refreshMode;
return this;
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
index 9378e862ab2..265e16fde93 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
@@ -60,9 +60,9 @@ public class DefaultCatalogMaterializedTable implements
CatalogMaterializedTable
Map<String, String> options,
@Nullable Long snapshot,
String definitionQuery,
- IntervalFreshness freshness,
+ @Nullable IntervalFreshness freshness,
LogicalRefreshMode logicalRefreshMode,
- RefreshMode refreshMode,
+ @Nullable RefreshMode refreshMode,
RefreshStatus refreshStatus,
@Nullable String refreshHandlerDescription,
@Nullable byte[] serializedRefreshHandler) {
@@ -73,10 +73,10 @@ public class DefaultCatalogMaterializedTable implements
CatalogMaterializedTable
this.options = checkNotNull(options, "Options must not be null.");
this.snapshot = snapshot;
this.definitionQuery = checkNotNull(definitionQuery, "Definition query
must not be null.");
- this.freshness = checkNotNull(freshness, "Freshness must not be
null.");
+ this.freshness = freshness;
this.logicalRefreshMode =
checkNotNull(logicalRefreshMode, "Logical refresh mode must
not be null.");
- this.refreshMode = checkNotNull(refreshMode, "Refresh mode must not be
null.");
+ this.refreshMode = refreshMode;
this.refreshStatus = checkNotNull(refreshStatus, "Refresh status must
not be null.");
this.refreshHandlerDescription = refreshHandlerDescription;
this.serializedRefreshHandler = serializedRefreshHandler;
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java
new file mode 100644
index 00000000000..c9858dab7d0
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
+
+import java.time.Duration;
+
+/**
+ * Default implementation of {@link MaterializedTableEnricher}.
+ *
+ * <p>Applies default freshness values based on refresh mode and determines
the physical refresh
+ * mode using freshness threshold comparison.
+ */
+@Internal
+public class DefaultMaterializedTableEnricher implements
MaterializedTableEnricher {
+
+ private final IntervalFreshness defaultContinuousFreshness;
+ private final IntervalFreshness defaultFullFreshness;
+ private final Duration freshnessThreshold;
+
+ public static DefaultMaterializedTableEnricher create(
+ final Duration defaultContinuousFreshness,
+ final Duration defaultFullFreshness,
+ final Duration freshnessThreshold) {
+ final IntervalFreshness continuousFreshness =
+ IntervalFreshness.fromDuration(defaultContinuousFreshness);
+ final IntervalFreshness fullFreshness =
+ IntervalFreshness.fromDuration(defaultFullFreshness);
+ return new DefaultMaterializedTableEnricher(
+ continuousFreshness, fullFreshness, freshnessThreshold);
+ }
+
+ private DefaultMaterializedTableEnricher(
+ final IntervalFreshness defaultContinuousFreshness,
+ final IntervalFreshness defaultFullFreshness,
+ final Duration freshnessThreshold) {
+ this.defaultContinuousFreshness = defaultContinuousFreshness;
+ this.defaultFullFreshness = defaultFullFreshness;
+ this.freshnessThreshold = freshnessThreshold;
+ }
+
+ @Override
+ public MaterializedTableEnrichmentResult enrich(final
CatalogMaterializedTable table) {
+ // Determine the final freshness value
+ final IntervalFreshness finalFreshness = deriveFreshness(table);
+
+ // Derive the final refresh mode using the freshness and threshold
+ final RefreshMode finalRefreshMode =
+ deriveRefreshMode(
+ table.getLogicalRefreshMode(), finalFreshness,
freshnessThreshold);
+
+ return new MaterializedTableEnrichmentResult(finalFreshness,
finalRefreshMode);
+ }
+
+ /**
+ * Returns user-specified freshness or applies mode-specific defaults:
FULL mode uses {@code
+ * defaultFullFreshness}, others use {@code defaultContinuousFreshness}.
+ */
+ private IntervalFreshness deriveFreshness(final CatalogMaterializedTable
table) {
+ final IntervalFreshness finalFreshness;
+ if (table.getDefinitionFreshness() != null) {
+ // User provided an explicit freshness, use it
+ finalFreshness = table.getDefinitionFreshness();
+ } else {
+ // User omitted freshness, choose default based on logical mode
+ if (table.getLogicalRefreshMode() == LogicalRefreshMode.FULL) {
+ finalFreshness = defaultFullFreshness;
+ } else {
+ // For AUTOMATIC or CONTINUOUS modes, use the continuous
default
+ finalFreshness = defaultContinuousFreshness;
+ }
+ }
+ return finalFreshness;
+ }
+
+ /**
+ * Determines physical refresh mode: CONTINUOUS if freshness is below
threshold or explicitly
+ * requested, otherwise FULL (validated for cron conversion).
+ */
+ public RefreshMode deriveRefreshMode(
+ LogicalRefreshMode logicalRefreshMode,
+ IntervalFreshness freshness,
+ Duration threshold) {
+ if (logicalRefreshMode != LogicalRefreshMode.FULL) {
+ final Duration definedFreshness = freshness.toDuration();
+ if (logicalRefreshMode == LogicalRefreshMode.CONTINUOUS
+ || definedFreshness.compareTo(threshold) < 0) {
+ return RefreshMode.CONTINUOUS;
+ }
+ }
+
+ // Validate that freshness can be converted to cron for FULL mode
+ IntervalFreshness.validateFreshnessForCron(freshness);
+ return RefreshMode.FULL;
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
index fdbb8e6139f..6548b149663 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
@@ -19,7 +19,10 @@
package org.apache.flink.table.catalog;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
import java.util.Objects;
/**
@@ -31,35 +34,186 @@ import java.util.Objects;
@PublicEvolving
public class IntervalFreshness {
- private final String interval;
+ private static final String SECOND_CRON_EXPRESSION_TEMPLATE = "0/%s * * *
* ? *";
+ private static final String MINUTE_CRON_EXPRESSION_TEMPLATE = "0 0/%s * *
* ? *";
+ private static final String HOUR_CRON_EXPRESSION_TEMPLATE = "0 0 0/%s * *
? *";
+ private static final String ONE_DAY_CRON_EXPRESSION_TEMPLATE = "0 0 0 * *
? *";
+ private static final long SECOND_CRON_UPPER_BOUND = 60;
+ private static final long MINUTE_CRON_UPPER_BOUND = 60;
+ private static final long HOUR_CRON_UPPER_BOUND = 24;
+
+ private final int interval;
private final TimeUnit timeUnit;
- private IntervalFreshness(String interval, TimeUnit timeUnit) {
+ private IntervalFreshness(int interval, TimeUnit timeUnit) {
this.interval = interval;
this.timeUnit = timeUnit;
}
public static IntervalFreshness of(String interval, TimeUnit timeUnit) {
- return new IntervalFreshness(interval, timeUnit);
+ final int validateIntervalInput = validateIntervalInput(interval);
+ return new IntervalFreshness(validateIntervalInput, timeUnit);
+ }
+
+ private static int validateIntervalInput(final String interval) {
+ final int parsedInt;
+ try {
+ parsedInt = Integer.parseInt(interval);
+ } catch (Exception e) {
+ final String errorMessage =
+ String.format(
+ "The freshness interval currently only supports
positive integer type values. But was: %s",
+ interval);
+ throw new ValidationException(errorMessage, e);
+ }
+
+ if (parsedInt <= 0) {
+ final String errorMessage =
+ String.format(
+ "The freshness interval currently only supports
positive integer type values. But was: %s",
+ interval);
+ throw new ValidationException(errorMessage);
+ }
+ return parsedInt;
}
public static IntervalFreshness ofSecond(String interval) {
- return new IntervalFreshness(interval, TimeUnit.SECOND);
+ return IntervalFreshness.of(interval, TimeUnit.SECOND);
}
public static IntervalFreshness ofMinute(String interval) {
- return new IntervalFreshness(interval, TimeUnit.MINUTE);
+ return IntervalFreshness.of(interval, TimeUnit.MINUTE);
}
public static IntervalFreshness ofHour(String interval) {
- return new IntervalFreshness(interval, TimeUnit.HOUR);
+ return IntervalFreshness.of(interval, TimeUnit.HOUR);
}
public static IntervalFreshness ofDay(String interval) {
- return new IntervalFreshness(interval, TimeUnit.DAY);
+ return IntervalFreshness.of(interval, TimeUnit.DAY);
+ }
+
+ /**
+ * Validates that the given freshness can be converted to a cron
expression in full refresh
+ * mode. Since freshness and cron expression cannot be converted
equivalently, there are
+ * currently only a limited patterns of freshness that are supported.
+ *
+ * @param intervalFreshness the freshness to validate
+ * @throws ValidationException if the freshness cannot be converted to a
valid cron expression
+ */
+ public static void validateFreshnessForCron(IntervalFreshness
intervalFreshness) {
+ switch (intervalFreshness.getTimeUnit()) {
+ case SECOND:
+ validateCronConstraints(intervalFreshness,
SECOND_CRON_UPPER_BOUND);
+ break;
+ case MINUTE:
+ validateCronConstraints(intervalFreshness,
MINUTE_CRON_UPPER_BOUND);
+ break;
+ case HOUR:
+ validateCronConstraints(intervalFreshness,
HOUR_CRON_UPPER_BOUND);
+ break;
+ case DAY:
+ validateDayConstraints(intervalFreshness);
+ break;
+ default:
+ throw new ValidationException(
+ String.format(
+ "Unknown freshness time unit: %s.",
+ intervalFreshness.getTimeUnit()));
+ }
+ }
+
+ /**
+ * Converts the freshness of materialized table to cron expression in full
refresh mode. The
+ * freshness must first pass validation via {@link
#validateFreshnessForCron}.
+ *
+ * @param intervalFreshness the freshness to convert
+ * @return the corresponding cron expression
+ * @throws ValidationException if the freshness cannot be converted to a
valid cron expression
+ */
+ public static String convertFreshnessToCron(IntervalFreshness
intervalFreshness) {
+ // First validate that conversion is possible
+ validateFreshnessForCron(intervalFreshness);
+
+ // Then perform the conversion
+ switch (intervalFreshness.getTimeUnit()) {
+ case SECOND:
+ return String.format(
+ SECOND_CRON_EXPRESSION_TEMPLATE,
intervalFreshness.getIntervalInt());
+ case MINUTE:
+ return String.format(
+ MINUTE_CRON_EXPRESSION_TEMPLATE,
intervalFreshness.getIntervalInt());
+ case HOUR:
+ return String.format(
+ HOUR_CRON_EXPRESSION_TEMPLATE,
intervalFreshness.getIntervalInt());
+ case DAY:
+ return ONE_DAY_CRON_EXPRESSION_TEMPLATE;
+ default:
+ throw new ValidationException(
+ String.format(
+ "Unknown freshness time unit: %s.",
+ intervalFreshness.getTimeUnit()));
+ }
}
+ private static void validateCronConstraints(
+ IntervalFreshness intervalFreshness, long cronUpperBound) {
+ int interval = intervalFreshness.getIntervalInt();
+ TimeUnit timeUnit = intervalFreshness.getTimeUnit();
+ // Freshness must be less than cronUpperBound for corresponding time
unit when convert it
+ // to cron expression
+ if (interval >= cronUpperBound) {
+ throw new ValidationException(
+ String.format(
+ "In full refresh mode, freshness must be less than
%s when the time unit is %s.",
+ cronUpperBound, timeUnit));
+ }
+ // Freshness must be factors of cronUpperBound for corresponding time
unit
+ if (cronUpperBound % interval != 0) {
+ throw new ValidationException(
+ String.format(
+ "In full refresh mode, only freshness that are
factors of %s are currently supported when the time unit is %s.",
+ cronUpperBound, timeUnit));
+ }
+ }
+
+ private static void validateDayConstraints(IntervalFreshness
intervalFreshness) {
+ // Since the number of days in each month is different, only one day
of freshness is
+ // currently supported when the time unit is DAY
+ int interval = intervalFreshness.getIntervalInt();
+ if (interval > 1) {
+ throw new ValidationException(
+ "In full refresh mode, freshness must be 1 when the time
unit is DAY.");
+ }
+ }
+
+ /**
+ * Creates an IntervalFreshness from a Duration, choosing the most
appropriate time unit.
+ * Prefers larger units when possible (e.g., 60 seconds → 1 minute).
+ */
+ public static IntervalFreshness fromDuration(Duration duration) {
+ if (duration.equals(duration.truncatedTo(ChronoUnit.DAYS))) {
+ return new IntervalFreshness((int) duration.toDays(),
TimeUnit.DAY);
+ }
+ if (duration.equals(duration.truncatedTo(ChronoUnit.HOURS))) {
+ return new IntervalFreshness((int) duration.toHours(),
TimeUnit.HOUR);
+ }
+ if (duration.equals(duration.truncatedTo(ChronoUnit.MINUTES))) {
+ return new IntervalFreshness((int) duration.toMinutes(),
TimeUnit.MINUTE);
+ }
+
+ return new IntervalFreshness((int) duration.getSeconds(),
TimeUnit.SECOND);
+ }
+
+ /**
+ * @deprecated Use {@link #getIntervalInt()} instead.
+ */
+ @Deprecated
public String getInterval() {
+ return String.valueOf(interval);
+ }
+
+ public int getIntervalInt() {
return interval;
}
@@ -67,6 +221,21 @@ public class IntervalFreshness {
return timeUnit;
}
+ public Duration toDuration() {
+ switch (timeUnit) {
+ case SECOND:
+ return Duration.ofSeconds(interval);
+ case MINUTE:
+ return Duration.ofMinutes(interval);
+ case HOUR:
+ return Duration.ofHours(interval);
+ case DAY:
+ return Duration.ofDays(interval);
+ default:
+ throw new IllegalStateException("Unexpected value: " +
timeUnit);
+ }
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnricher.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnricher.java
new file mode 100644
index 00000000000..33cb9adf7b6
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnricher.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * Enricher interface for determining materialized table properties during
catalog resolution.
+ *
+ * <p>This enricher resolves:
+ *
+ * <ul>
+ * <li>Freshness intervals when not explicitly specified by the user
+ * <li>Physical refresh modes (CONTINUOUS or FULL) based on logical
preferences and configuration
+ * </ul>
+ *
+ * <p>Implementations can provide custom strategies tailored to different
deployment environments or
+ * operational requirements.
+ */
+@Experimental
+public interface MaterializedTableEnricher {
+
+ /**
+ * Enriches a materialized table by determining its final freshness
interval and refresh mode.
+ *
+ * @param catalogMaterializedTable the materialized table to enrich, which
may have null
+ * freshness
+ * @return the enrichment result with resolved, non-null freshness and
refresh mode
+ */
+ MaterializedTableEnrichmentResult enrich(CatalogMaterializedTable
catalogMaterializedTable);
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java
new file mode 100644
index 00000000000..dc19d82ea27
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
+
+/**
+ * Result of the enrichment process containing the resolved freshness interval
and physical refresh
+ * mode for a {@link CatalogMaterializedTable}.
+ *
+ * <p>This object is returned by {@link MaterializedTableEnricher} after
determining the final,
+ * non-null values for both properties.
+ */
+@Experimental
+public class MaterializedTableEnrichmentResult {
+
+ private final IntervalFreshness freshness;
+ private final RefreshMode refreshMode;
+
+ public MaterializedTableEnrichmentResult(
+ final IntervalFreshness freshness, final RefreshMode refreshMode) {
+ this.freshness = freshness;
+ this.refreshMode = refreshMode;
+ }
+
+ public IntervalFreshness getFreshness() {
+ return freshness;
+ }
+
+ public RefreshMode getRefreshMode() {
+ return refreshMode;
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
index 82ad39d0fc3..6f0bf960b18 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
@@ -19,8 +19,8 @@
package org.apache.flink.table.catalog;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.util.Preconditions;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.List;
@@ -28,6 +28,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A validated {@link CatalogMaterializedTable} that is backed by the original
metadata coming from
* the {@link Catalog} but resolved by the framework.
@@ -43,13 +45,19 @@ public class ResolvedCatalogMaterializedTable
private final ResolvedSchema resolvedSchema;
+ private final RefreshMode refreshMode;
+
+ private final IntervalFreshness freshness;
+
public ResolvedCatalogMaterializedTable(
- CatalogMaterializedTable origin, ResolvedSchema resolvedSchema) {
- this.origin =
- Preconditions.checkNotNull(
- origin, "Original catalog materialized table must not
be null.");
- this.resolvedSchema =
- Preconditions.checkNotNull(resolvedSchema, "Resolved schema
must not be null.");
+ CatalogMaterializedTable origin,
+ ResolvedSchema resolvedSchema,
+ RefreshMode refreshMode,
+ IntervalFreshness freshness) {
+ this.origin = checkNotNull(origin, "Original catalog materialized
table must not be null.");
+ this.resolvedSchema = checkNotNull(resolvedSchema, "Resolved schema
must not be null.");
+ this.refreshMode = checkNotNull(refreshMode, "Refresh mode must not be
null.");
+ this.freshness = checkNotNull(freshness, "Freshness must not be
null.");
}
@Override
@@ -65,12 +73,13 @@ public class ResolvedCatalogMaterializedTable
@Override
public CatalogBaseTable copy() {
return new ResolvedCatalogMaterializedTable(
- (CatalogMaterializedTable) origin.copy(), resolvedSchema);
+ (CatalogMaterializedTable) origin.copy(), resolvedSchema,
refreshMode, freshness);
}
@Override
public ResolvedCatalogMaterializedTable copy(Map<String, String> options) {
- return new ResolvedCatalogMaterializedTable(origin.copy(options),
resolvedSchema);
+ return new ResolvedCatalogMaterializedTable(
+ origin.copy(options), resolvedSchema, refreshMode, freshness);
}
@Override
@@ -80,7 +89,9 @@ public class ResolvedCatalogMaterializedTable
byte[] serializedRefreshHandler) {
return new ResolvedCatalogMaterializedTable(
origin.copy(refreshStatus, refreshHandlerDescription,
serializedRefreshHandler),
- resolvedSchema);
+ resolvedSchema,
+ refreshMode,
+ freshness);
}
@Override
@@ -124,8 +135,8 @@ public class ResolvedCatalogMaterializedTable
}
@Override
- public IntervalFreshness getDefinitionFreshness() {
- return origin.getDefinitionFreshness();
+ public @Nonnull IntervalFreshness getDefinitionFreshness() {
+ return freshness;
}
@Override
@@ -134,8 +145,8 @@ public class ResolvedCatalogMaterializedTable
}
@Override
- public RefreshMode getRefreshMode() {
- return origin.getRefreshMode();
+ public @Nonnull RefreshMode getRefreshMode() {
+ return refreshMode;
}
@Override
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/IntervalFreshnessUtils.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/IntervalFreshnessUtils.java
deleted file mode 100644
index cd58bff4d91..00000000000
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/IntervalFreshnessUtils.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.utils;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.IntervalFreshness;
-
-import org.apache.commons.lang3.math.NumberUtils;
-
-import java.time.Duration;
-
-/** Utilities to {@link IntervalFreshness}. */
-@Internal
-public class IntervalFreshnessUtils {
-
- private static final String SECOND_CRON_EXPRESSION_TEMPLATE = "0/%s * * *
* ? *";
- private static final String MINUTE_CRON_EXPRESSION_TEMPLATE = "0 0/%s * *
* ? *";
- private static final String HOUR_CRON_EXPRESSION_TEMPLATE = "0 0 0/%s * *
? *";
- private static final String ONE_DAY_CRON_EXPRESSION_TEMPLATE = "0 0 0 * *
? *";
-
- private static final long SECOND_CRON_UPPER_BOUND = 60;
- private static final long MINUTE_CRON_UPPER_BOUND = 60;
- private static final long HOUR_CRON_UPPER_BOUND = 24;
-
- private IntervalFreshnessUtils() {}
-
- @VisibleForTesting
- static void validateIntervalFreshness(IntervalFreshness intervalFreshness)
{
- if (!NumberUtils.isParsable(intervalFreshness.getInterval())) {
- throw new ValidationException(
- String.format(
- "The interval freshness value '%s' is an illegal
integer type value.",
- intervalFreshness.getInterval()));
- }
-
- if (!NumberUtils.isDigits(intervalFreshness.getInterval())) {
- throw new ValidationException(
- "The freshness interval currently only supports integer
type values.");
- }
- }
-
- public static Duration convertFreshnessToDuration(IntervalFreshness
intervalFreshness) {
- // validate the freshness value firstly
- validateIntervalFreshness(intervalFreshness);
-
- long interval = Long.parseLong(intervalFreshness.getInterval());
- switch (intervalFreshness.getTimeUnit()) {
- case DAY:
- return Duration.ofDays(interval);
- case HOUR:
- return Duration.ofHours(interval);
- case MINUTE:
- return Duration.ofMinutes(interval);
- case SECOND:
- return Duration.ofSeconds(interval);
- default:
- throw new ValidationException(
- String.format(
- "Unknown freshness time unit: %s.",
- intervalFreshness.getTimeUnit()));
- }
- }
-
- /**
- * This is an util method that is used to convert the freshness of
materialized table to cron
- * expression in full refresh mode. Since freshness and cron expression
cannot be converted
- * equivalently, there are currently only a limited patterns of freshness
that can be converted
- * to cron expression.
- */
- public static String convertFreshnessToCron(IntervalFreshness
intervalFreshness) {
- switch (intervalFreshness.getTimeUnit()) {
- case SECOND:
- return validateAndConvertCron(
- intervalFreshness,
- SECOND_CRON_UPPER_BOUND,
- SECOND_CRON_EXPRESSION_TEMPLATE);
- case MINUTE:
- return validateAndConvertCron(
- intervalFreshness,
- MINUTE_CRON_UPPER_BOUND,
- MINUTE_CRON_EXPRESSION_TEMPLATE);
- case HOUR:
- return validateAndConvertCron(
- intervalFreshness, HOUR_CRON_UPPER_BOUND,
HOUR_CRON_EXPRESSION_TEMPLATE);
- case DAY:
- return validateAndConvertDayCron(intervalFreshness);
- default:
- throw new ValidationException(
- String.format(
- "Unknown freshness time unit: %s.",
- intervalFreshness.getTimeUnit()));
- }
- }
-
- private static String validateAndConvertCron(
- IntervalFreshness intervalFreshness, long cronUpperBound, String
cronTemplate) {
- long interval = Long.parseLong(intervalFreshness.getInterval());
- IntervalFreshness.TimeUnit timeUnit = intervalFreshness.getTimeUnit();
- // Freshness must be less than cronUpperBound for corresponding time
unit when convert it
- // to cron expression
- if (interval >= cronUpperBound) {
- throw new ValidationException(
- String.format(
- "In full refresh mode, freshness must be less than
%s when the time unit is %s.",
- cronUpperBound, timeUnit));
- }
- // Freshness must be factors of cronUpperBound for corresponding time
unit
- if (cronUpperBound % interval != 0) {
- throw new ValidationException(
- String.format(
- "In full refresh mode, only freshness that are
factors of %s are currently supported when the time unit is %s.",
- cronUpperBound, timeUnit));
- }
-
- return String.format(cronTemplate, interval);
- }
-
- private static String validateAndConvertDayCron(IntervalFreshness
intervalFreshness) {
- // Since the number of days in each month is different, only one day
of freshness is
- // currently supported when the time unit is DAY
- long interval = Long.parseLong(intervalFreshness.getInterval());
- if (interval > 1) {
- throw new ValidationException(
- "In full refresh mode, freshness must be 1 when the time
unit is DAY.");
- }
- return ONE_DAY_CRON_EXPRESSION_TEMPLATE;
- }
-}
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
index bbe708e6509..adb8a845421 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
import org.apache.flink.table.expressions.DefaultSqlFactory;
import org.junit.jupiter.api.Test;
@@ -165,6 +166,8 @@ class CatalogPropertiesUtilTest {
.refreshStatus(CatalogMaterializedTable.RefreshStatus.ACTIVATED)
.refreshHandlerDescription("description")
.build(),
- resolvedSchema));
+ resolvedSchema,
+ RefreshMode.CONTINUOUS,
+ IntervalFreshness.ofHour("123")));
}
}
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/IntervalFreshnessUtilsTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java
similarity index 74%
rename from
flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/IntervalFreshnessUtilsTest.java
rename to
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java
index 86e660d50b1..de794684956 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/IntervalFreshnessUtilsTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java
@@ -16,53 +16,69 @@
* limitations under the License.
*/
-package org.apache.flink.table.utils;
+package org.apache.flink.table.catalog;
import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.IntervalFreshness;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
-import static
org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron;
-import static
org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration;
-import static
org.apache.flink.table.utils.IntervalFreshnessUtils.validateIntervalFreshness;
+import static
org.apache.flink.table.catalog.IntervalFreshness.convertFreshnessToCron;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Tests for {@link IntervalFreshnessUtils}. */
-public class IntervalFreshnessUtilsTest {
+/** Tests for {@link IntervalFreshness}. */
+public class IntervalFreshnessTest {
- @Test
- void testIllegalIntervalFreshness() {
- assertThatThrownBy(() ->
validateIntervalFreshness(IntervalFreshness.ofMinute("2efedd")))
+ @ParameterizedTest
+ @ValueSource(strings = {"2efedd", "2.5", "-2", "0",
"12345678901234567890"})
+ void testIllegalIntervalFreshness(String invalidInput) {
+ assertThatThrownBy(() -> IntervalFreshness.ofMinute(invalidInput))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "The interval freshness value '2efedd' is an illegal
integer type value.");
+ String.format(
+ "The freshness interval currently only
supports positive integer type values. But was: %s",
+ invalidInput));
+ }
- assertThatThrownBy(() ->
validateIntervalFreshness(IntervalFreshness.ofMinute("2.5")))
- .isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "The freshness interval currently only supports
integer type values.");
+ @Test
+ void testConvertDurationToFreshnessInterval() {
+ // verify second
+ IntervalFreshness actualSeconds =
IntervalFreshness.fromDuration(Duration.ofSeconds(20));
+ assertThat(actualSeconds).isEqualTo(IntervalFreshness.ofSecond("20"));
+
+ // verify minute
+ IntervalFreshness actualMinutes =
IntervalFreshness.fromDuration(Duration.ofMinutes(3));
+ assertThat(actualMinutes).isEqualTo(IntervalFreshness.ofMinute("3"));
+
+ // verify hour
+ IntervalFreshness actualHour =
IntervalFreshness.fromDuration(Duration.ofHours(1));
+ assertThat(actualHour).isEqualTo(IntervalFreshness.ofHour("1"));
+
+ // verify day
+ IntervalFreshness actualDay =
IntervalFreshness.fromDuration(Duration.ofDays(2));
+ assertThat(actualDay).isEqualTo(IntervalFreshness.ofDay("2"));
}
@Test
void testConvertFreshnessToDuration() {
// verify second
- Duration actualSecond =
convertFreshnessToDuration(IntervalFreshness.ofSecond("20"));
+ Duration actualSecond = IntervalFreshness.ofSecond("20").toDuration();
assertThat(actualSecond).isEqualTo(Duration.ofSeconds(20));
// verify minute
- Duration actualMinute =
convertFreshnessToDuration(IntervalFreshness.ofMinute("3"));
+ Duration actualMinute = IntervalFreshness.ofMinute("3").toDuration();
assertThat(actualMinute).isEqualTo(Duration.ofMinutes(3));
// verify hour
- Duration actualHour =
convertFreshnessToDuration(IntervalFreshness.ofHour("3"));
+ Duration actualHour = IntervalFreshness.ofHour("3").toDuration();
assertThat(actualHour).isEqualTo(Duration.ofHours(3));
// verify day
- Duration actualDay =
convertFreshnessToDuration(IntervalFreshness.ofDay("3"));
+ Duration actualDay = IntervalFreshness.ofDay("3").toDuration();
assertThat(actualDay).isEqualTo(Duration.ofDays(3));
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
index 46c6dd44fa7..2107b209b27 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
@@ -26,6 +26,8 @@ import org.apache.flink.sql.parser.error.SqlValidateException;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -50,10 +52,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import static
org.apache.flink.table.api.config.MaterializedTableConfigOptions.DATE_FORMATTER;
-import static
org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD;
import static
org.apache.flink.table.api.config.MaterializedTableConfigOptions.PARTITION_FIELDS;
-import static
org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron;
-import static
org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration;
+import static
org.apache.flink.table.catalog.IntervalFreshness.validateFreshnessForCron;
/** A converter for {@link SqlCreateMaterializedTable}. */
public class SqlCreateMaterializedTableConverter
@@ -77,33 +77,27 @@ public class SqlCreateMaterializedTableConverter
// get freshness
IntervalFreshness intervalFreshness =
- MaterializedTableUtils.getMaterializedTableFreshness(
- sqlCreateMaterializedTable.getFreshness());
+ Optional.ofNullable(sqlCreateMaterializedTable.getFreshness())
+
.map(MaterializedTableUtils::getMaterializedTableFreshness)
+ .orElse(null);
- // get refresh mode
- SqlRefreshMode sqlRefreshMode = null;
- if (sqlCreateMaterializedTable.getRefreshMode().isPresent()) {
- sqlRefreshMode =
- sqlCreateMaterializedTable
- .getRefreshMode()
- .get()
- .getValueAs(SqlRefreshMode.class);
- }
- CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode =
+ // Get the logical refresh mode from SQL
+ SqlRefreshMode sqlRefreshMode =
+
Optional.ofNullable(sqlCreateMaterializedTable.getRefreshMode())
+ .map(mode -> mode.getValueAs(SqlRefreshMode.class))
+ .orElse(null);
+
+ final LogicalRefreshMode logicalRefreshMode =
MaterializedTableUtils.deriveLogicalRefreshMode(sqlRefreshMode);
- // only MATERIALIZED_TABLE_FRESHNESS_THRESHOLD configured in flink
conf yaml work, so we get
- // it from rootConfiguration instead of table config
- CatalogMaterializedTable.RefreshMode refreshMode =
- MaterializedTableUtils.deriveRefreshMode(
- context.getTableConfig()
- .getRootConfiguration()
- .get(MATERIALIZED_TABLE_FRESHNESS_THRESHOLD),
- convertFreshnessToDuration(intervalFreshness),
- logicalRefreshMode);
- // If the refresh mode is full, validate whether the freshness can
convert to cron
- // expression in advance
- if (CatalogMaterializedTable.RefreshMode.FULL == refreshMode) {
- convertFreshnessToCron(intervalFreshness);
+
+ // get the physical refresh mode from SQL
+ final RefreshMode refreshMode =
+ sqlRefreshMode == null
+ ? null
+ :
MaterializedTableUtils.fromSqltoRefreshMode(sqlRefreshMode);
+
+ if (CatalogMaterializedTable.RefreshMode.FULL == refreshMode &&
intervalFreshness != null) {
+ validateFreshnessForCron(intervalFreshness);
}
// get query schema and definition query
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index ac41bfa58c1..ba765e40a69 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -22,13 +22,12 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.calcite.sql.SqlIntervalLiteral;
import org.apache.calcite.sql.type.SqlTypeFamily;
-import java.time.Duration;
-
/** The utils for materialized table. */
@Internal
public class MaterializedTableUtils {
@@ -79,22 +78,14 @@ public class MaterializedTableUtils {
}
}
- public static CatalogMaterializedTable.RefreshMode deriveRefreshMode(
- Duration threshold,
- Duration definedFreshness,
- CatalogMaterializedTable.LogicalRefreshMode definedRefreshMode) {
- // If the refresh mode is specified manually, use it directly.
- if (definedRefreshMode ==
CatalogMaterializedTable.LogicalRefreshMode.FULL) {
- return CatalogMaterializedTable.RefreshMode.FULL;
- } else if (definedRefreshMode ==
CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS) {
- return CatalogMaterializedTable.RefreshMode.CONTINUOUS;
- }
-
- // derive the actual refresh mode via defined freshness
- if (definedFreshness.compareTo(threshold) < 0) {
- return CatalogMaterializedTable.RefreshMode.CONTINUOUS;
- } else {
- return CatalogMaterializedTable.RefreshMode.FULL;
+ public static RefreshMode fromSqltoRefreshMode(SqlRefreshMode
sqlRefreshMode) {
+ switch (sqlRefreshMode) {
+ case FULL:
+ return RefreshMode.FULL;
+ case CONTINUOUS:
+ return RefreshMode.CONTINUOUS;
+ default:
+ throw new IllegalArgumentException("Unknown refresh mode: " +
sqlRefreshMode);
}
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index cdf6fbe4174..13835f16d61 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -85,6 +85,9 @@ import org.apache.calcite.sql.SqlNode;
import org.assertj.core.api.HamcrestCondition;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import javax.annotation.Nullable;
@@ -96,7 +99,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.planner.utils.OperationMatchers.entry;
@@ -1404,53 +1409,93 @@ class SqlDdlToOperationConverterTest extends
SqlNodeToOperationConversionTestBas
checkAlterNonExistTable("alter table %s nonexistent drop watermark");
}
- @Test
- void createMaterializedTableWithDistribution() throws Exception {
- final String sql =
- "CREATE MATERIALIZED TABLE users_shops ("
- + " PRIMARY KEY (user_id) not enforced)"
- + " DISTRIBUTED BY HASH (user_id) INTO 7 BUCKETS\n"
- + " WITH(\n"
- + " 'format' = 'debezium-json'\n"
- + " )\n"
- + " FRESHNESS = INTERVAL '30' SECOND\n"
- + " AS SELECT 1 as shop_id, 2 as user_id ";
-
- final String expectedSummaryString =
- "CREATE MATERIALIZED TABLE: (materializedTable: "
- +
"[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n"
- + " `shop_id` INT NOT NULL,\n"
- + " `user_id` INT NOT NULL,\n"
- + " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`)
NOT ENFORCED\n"
- + "), comment='null', distribution=DISTRIBUTED BY
HASH(`user_id`) INTO 7 BUCKETS, partitionKeys=[], "
- + "options={format=debezium-json}, snapshot=null,
definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
- + "freshness=INTERVAL '30' SECOND,
logicalRefreshMode=AUTOMATIC, refreshMode=CONTINUOUS, "
- + "refreshStatus=INITIALIZING,
refreshHandlerDescription='null', serializedRefreshHandler=null},
resolvedSchema=(\n"
- + " `shop_id` INT NOT NULL,\n"
- + " `user_id` INT NOT NULL,\n"
- + " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`)
NOT ENFORCED\n"
- + ")}], identifier:
[`builtin`.`default`.`users_shops`])";
+ @ParameterizedTest(name = "[{index}] {0}")
+ @MethodSource("provideCreateMaterializedTableTestCases")
+ void createMaterializedTableWithVariousOptions(
+ String testName,
+ String sql,
+ String expectedSummaryString,
+ Consumer<CreateMaterializedTableOperation> additionalAssertions) {
final Operation operation = parse(sql);
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
- assertThat(
- ((CreateMaterializedTableOperation) operation)
- .getCatalogMaterializedTable()
- .getDistribution()
- .get())
- .isEqualTo(TableDistribution.of(Kind.HASH, 7,
List.of("user_id")));
-
- prepareMaterializedTable("tb2", false, 1, null, "SELECT 1");
- assertThatThrownBy(
- () ->
- parse(
- "alter MATERIALIZED table cat1.db1.tb2
modify distribution into 3 buckets"))
- .isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Materialized table `cat1`.`db1`.`tb2` does not have a
distribution to modify.");
+ final CreateMaterializedTableOperation
createMaterializedTableOperation =
+ (CreateMaterializedTableOperation) operation;
+
+ additionalAssertions.accept(createMaterializedTableOperation);
+ }
+
+ private static Stream<Arguments> provideCreateMaterializedTableTestCases()
{
+ return Stream.of(
+ Arguments.of(
+ "with refresh mode continuous",
+ "CREATE MATERIALIZED TABLE users_shops ("
+ + " PRIMARY KEY (user_id) not enforced)"
+ + " WITH(\n"
+ + " 'format' = 'debezium-json'\n"
+ + " )\n"
+ + " FRESHNESS = INTERVAL '30' SECOND\n"
+ + " REFRESH_MODE = CONTINUOUS\n"
+ + " AS SELECT 1 as shop_id, 2 as user_id ",
+ "CREATE MATERIALIZED TABLE: (materializedTable: "
+ +
"[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n"
+ + " `shop_id` INT NOT NULL,\n"
+ + " `user_id` INT NOT NULL,\n"
+ + " CONSTRAINT `PK_user_id` PRIMARY KEY
(`user_id`) NOT ENFORCED\n"
+ + "), comment='null', distribution=null,
partitionKeys=[], "
+ + "options={format=debezium-json},
snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
+ + "freshness=INTERVAL '30' SECOND,
logicalRefreshMode=CONTINUOUS, refreshMode=CONTINUOUS, "
+ + "refreshStatus=INITIALIZING,
refreshHandlerDescription='null', serializedRefreshHandler=null},
resolvedSchema=(\n"
+ + " `shop_id` INT NOT NULL,\n"
+ + " `user_id` INT NOT NULL,\n"
+ + " CONSTRAINT `PK_user_id` PRIMARY KEY
(`user_id`) NOT ENFORCED\n"
+ + ")}], identifier:
[`builtin`.`default`.`users_shops`])",
+ (Consumer<CreateMaterializedTableOperation>)
+ op -> {
+ assertThat(
+
op.getCatalogMaterializedTable()
+
.getDefinitionFreshness())
+
.isEqualTo(IntervalFreshness.ofSecond("30"));
+
assertThat(op.getCatalogMaterializedTable().getRefreshMode())
+ .isSameAs(RefreshMode.CONTINUOUS);
+ }),
+ Arguments.of(
+ "with distribution",
+ "CREATE MATERIALIZED TABLE users_shops ("
+ + " PRIMARY KEY (user_id) not enforced)"
+ + " DISTRIBUTED BY HASH (user_id) INTO 7
BUCKETS\n"
+ + " WITH(\n"
+ + " 'format' = 'debezium-json'\n"
+ + " )\n"
+ + " FRESHNESS = INTERVAL '30' SECOND\n"
+ + " AS SELECT 1 as shop_id, 2 as user_id ",
+ "CREATE MATERIALIZED TABLE: (materializedTable: "
+ +
"[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n"
+ + " `shop_id` INT NOT NULL,\n"
+ + " `user_id` INT NOT NULL,\n"
+ + " CONSTRAINT `PK_user_id` PRIMARY KEY
(`user_id`) NOT ENFORCED\n"
+ + "), comment='null', distribution=DISTRIBUTED
BY HASH(`user_id`) INTO 7 BUCKETS, partitionKeys=[], "
+ + "options={format=debezium-json},
snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
+ + "freshness=INTERVAL '30' SECOND,
logicalRefreshMode=AUTOMATIC, refreshMode=null, "
+ + "refreshStatus=INITIALIZING,
refreshHandlerDescription='null', serializedRefreshHandler=null},
resolvedSchema=(\n"
+ + " `shop_id` INT NOT NULL,\n"
+ + " `user_id` INT NOT NULL,\n"
+ + " CONSTRAINT `PK_user_id` PRIMARY KEY
(`user_id`) NOT ENFORCED\n"
+ + ")}], identifier:
[`builtin`.`default`.`users_shops`])",
+ (Consumer<CreateMaterializedTableOperation>)
+ op ->
+ assertThat(
+
op.getCatalogMaterializedTable()
+
.getDistribution()
+ .get())
+ .isEqualTo(
+ TableDistribution.of(
+ Kind.HASH,
+ 7,
+
List.of("user_id")))));
}
@Test
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 39275541d8c..67a828d7fe0 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.table.api.FunctionDescriptor;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.IntervalFreshness;
@@ -128,9 +130,7 @@ public class
SqlMaterializedTableNodeToOperationConverterTest
CreateMaterializedTableOperation op =
(CreateMaterializedTableOperation) operation;
ResolvedCatalogMaterializedTable materializedTable =
op.getCatalogMaterializedTable();
- Map<String, String> options = new HashMap<>();
- options.put("connector", "filesystem");
- options.put("format", "json");
+ Map<String, String> options = Map.of("connector", "filesystem",
"format", "json");
CatalogMaterializedTable expected =
CatalogMaterializedTable.newBuilder()
.schema(
@@ -146,13 +146,120 @@ public class
SqlMaterializedTableNodeToOperationConverterTest
.partitionKeys(Arrays.asList("a", "d"))
.freshness(IntervalFreshness.ofSecond("30"))
.logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.FULL)
- .refreshMode(CatalogMaterializedTable.RefreshMode.FULL)
+ .refreshMode(RefreshMode.FULL)
+
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+ .definitionQuery(
+ "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`,
`t1`.`d`\n"
+ + "FROM `builtin`.`default`.`t1` AS
`t1`")
+ .build();
+
+ final IntervalFreshness resolvedFreshness =
materializedTable.getDefinitionFreshness();
+
assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofSecond("30"));
+
+ final RefreshMode resolvedRefreshMode =
materializedTable.getRefreshMode();
+ assertThat(resolvedRefreshMode).isSameAs(RefreshMode.FULL);
+
+ assertThat(materializedTable.getOrigin()).isEqualTo(expected);
+ }
+
+ @Test
+ void testCreateMaterializedTableWithoutFreshness() {
+ final String sql =
+ "CREATE MATERIALIZED TABLE mtbl1 (\n"
+ + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+ + ")\n"
+ + "COMMENT 'materialized table comment'\n"
+ + "PARTITIONED BY (a, d)\n"
+ + "WITH (\n"
+ + " 'connector' = 'filesystem', \n"
+ + " 'format' = 'json'\n"
+ + ")\n"
+ + "REFRESH_MODE = FULL\n"
+ + "AS SELECT * FROM t1";
+ Operation operation = parse(sql);
+
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+ CreateMaterializedTableOperation op =
(CreateMaterializedTableOperation) operation;
+ ResolvedCatalogMaterializedTable materializedTable =
op.getCatalogMaterializedTable();
+
assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+ Map<String, String> options = Map.of("connector", "filesystem",
"format", "json");
+
+ CatalogMaterializedTable expected =
+ CatalogMaterializedTable.newBuilder()
+ .schema(
+ Schema.newBuilder()
+ .column("a",
DataTypes.BIGINT().notNull())
+ .column("b",
DataTypes.VARCHAR(Integer.MAX_VALUE))
+ .column("c", DataTypes.INT())
+ .column("d",
DataTypes.VARCHAR(Integer.MAX_VALUE))
+ .primaryKeyNamed("ct1",
Collections.singletonList("a"))
+ .build())
+ .comment("materialized table comment")
+ .options(options)
+ .partitionKeys(Arrays.asList("a", "d"))
+ .logicalRefreshMode(LogicalRefreshMode.FULL)
+ .refreshMode(RefreshMode.FULL)
+
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+ .definitionQuery(
+ "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`,
`t1`.`d`\n"
+ + "FROM `builtin`.`default`.`t1` AS
`t1`")
+ .build();
+
+ // The resolved freshness should default to 1 minute
+ final IntervalFreshness resolvedFreshness =
materializedTable.getDefinitionFreshness();
+ assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofHour("1"));
+
+ final RefreshMode resolvedRefreshMode =
materializedTable.getRefreshMode();
+ assertThat(resolvedRefreshMode).isSameAs(RefreshMode.FULL);
+
+ assertThat(materializedTable.getOrigin()).isEqualTo(expected);
+ }
+
+ @Test
+ void testCreateMaterializedTableWithoutFreshnessAndRefreshMode() {
+ final String sql =
+ "CREATE MATERIALIZED TABLE mtbl1 (\n"
+ + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+ + ")\n"
+ + "COMMENT 'materialized table comment'\n"
+ + "PARTITIONED BY (a, d)\n"
+ + "WITH (\n"
+ + " 'connector' = 'filesystem', \n"
+ + " 'format' = 'json'\n"
+ + ")\n"
+ + "AS SELECT * FROM t1";
+ Operation operation = parse(sql);
+
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+ CreateMaterializedTableOperation op =
(CreateMaterializedTableOperation) operation;
+ ResolvedCatalogMaterializedTable materializedTable =
op.getCatalogMaterializedTable();
+
assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+ Map<String, String> options = Map.of("connector", "filesystem",
"format", "json");
+
+ CatalogMaterializedTable expected =
+ CatalogMaterializedTable.newBuilder()
+ .schema(
+ Schema.newBuilder()
+ .column("a",
DataTypes.BIGINT().notNull())
+ .column("b",
DataTypes.VARCHAR(Integer.MAX_VALUE))
+ .column("c", DataTypes.INT())
+ .column("d",
DataTypes.VARCHAR(Integer.MAX_VALUE))
+ .primaryKeyNamed("ct1",
Collections.singletonList("a"))
+ .build())
+ .comment("materialized table comment")
+ .options(options)
+ .partitionKeys(Arrays.asList("a", "d"))
+ .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC)
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
.definitionQuery(
"SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`,
`t1`.`d`\n"
+ "FROM `builtin`.`default`.`t1` AS
`t1`")
.build();
+ final IntervalFreshness resolvedFreshness =
materializedTable.getDefinitionFreshness();
+
assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofMinute("3"));
assertThat(materializedTable.getOrigin()).isEqualTo(expected);
}
@@ -206,8 +313,7 @@ public class
SqlMaterializedTableNodeToOperationConverterTest
assertThat(materializedTable.getLogicalRefreshMode())
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
- assertThat(materializedTable.getRefreshMode())
- .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
+
assertThat(materializedTable.getRefreshMode()).isEqualTo(RefreshMode.CONTINUOUS);
// test continuous mode by manual specify
final String sql2 =
@@ -242,8 +348,7 @@ public class
SqlMaterializedTableNodeToOperationConverterTest
assertThat(materializedTable.getLogicalRefreshMode())
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
- assertThat(materializedTable.getRefreshMode())
- .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL);
+
assertThat(materializedTable.getRefreshMode()).isEqualTo(RefreshMode.FULL);
// test full mode by manual specify
final String sql2 =
@@ -259,17 +364,26 @@ public class
SqlMaterializedTableNodeToOperationConverterTest
assertThat(materializedTable2.getLogicalRefreshMode())
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.FULL);
- assertThat(materializedTable2.getRefreshMode())
- .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL);
-
+
assertThat(materializedTable2.getRefreshMode()).isEqualTo(RefreshMode.FULL);
final String sql3 =
"CREATE MATERIALIZED TABLE mtbl1\n"
+ "FRESHNESS = INTERVAL '40' MINUTE\n"
+ + "REFRESH_MODE = FULL\n"
+ "AS SELECT * FROM t1";
assertThatThrownBy(() -> parse(sql3))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"In full refresh mode, only freshness that are factors
of 60 are currently supported when the time unit is MINUTE.");
+
+ final String sql4 =
+ "CREATE MATERIALIZED TABLE mtbl1\n"
+ + "FRESHNESS = INTERVAL '40' MINUTE\n"
+ + "AS SELECT * FROM t1";
+
+ assertThatThrownBy(() -> parse(sql4))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "In full refresh mode, only freshness that are factors
of 60 are currently supported when the time unit is MINUTE.");
}
@Test
diff --git
a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
index 646b2e832c7..5a63cc2a2d8 100644
---
a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
+++
b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
@@ -110,7 +110,9 @@ public class TestFileSystemCatalogTest extends
TestFileSystemCatalogTestBase {
.refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS)
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
.build(),
- CREATE_RESOLVED_SCHEMA);
+ CREATE_RESOLVED_SCHEMA,
+ CatalogMaterializedTable.RefreshMode.CONTINUOUS,
+ FRESHNESS);
private static final TestRefreshHandler REFRESH_HANDLER =
new TestRefreshHandler("jobID: xxx, clusterId: yyy");