This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 753c1af691c [FLINK-38532][table] Make `FRESHNESS` Optional for Materialized Tables
753c1af691c is described below

commit 753c1af691c25e375cf80dc277cf026758cfed8d
Author: Ramin Gharib <[email protected]>
AuthorDate: Thu Nov 6 09:17:01 2025 +0100

    [FLINK-38532][table] Make `FRESHNESS` Optional for Materialized Tables
---
 .../docs/dev/table/materialized-table/overview.md  |   5 +-
 .../dev/table/materialized-table/statements.md     |  91 +++++-----
 .../docs/dev/table/materialized-table/overview.md  |   4 +-
 .../dev/table/materialized-table/statements.md     |  55 +++---
 .../materialized_table_config_configuration.html   |  12 ++
 .../MaterializedTableManager.java                  |  42 ++---
 .../service/operation/OperationExecutor.java       |   3 +-
 .../service/MaterializedTableStatementITCase.java  | 189 ++++++++++++++++++++-
 .../src/main/codegen/includes/parserImpls.ftl      |  18 +-
 .../sql/parser/ddl/SqlCreateMaterializedTable.java |  22 ++-
 .../MaterializedTableStatementParserTest.java      |  51 +++---
 .../api/config/MaterializedTableConfigOptions.java |  16 ++
 .../apache/flink/table/catalog/CatalogManager.java |  51 +++++-
 .../table/api/internal/ShowCreateUtilTest.java     |   4 +-
 .../catalog/CatalogBaseTableResolutionTest.java    |   4 +-
 .../table/catalog/CatalogMaterializedTable.java    |  26 +--
 .../catalog/DefaultCatalogMaterializedTable.java   |   8 +-
 .../catalog/DefaultMaterializedTableEnricher.java  | 115 +++++++++++++
 .../flink/table/catalog/IntervalFreshness.java     | 183 +++++++++++++++++++-
 .../table/catalog/MaterializedTableEnricher.java   |  47 +++++
 .../catalog/MaterializedTableEnrichmentResult.java |  50 ++++++
 .../catalog/ResolvedCatalogMaterializedTable.java  |  39 +++--
 .../flink/table/utils/IntervalFreshnessUtils.java  | 146 ----------------
 .../table/catalog/CatalogPropertiesUtilTest.java   |   5 +-
 .../IntervalFreshnessTest.java}                    |  54 +++---
 .../SqlCreateMaterializedTableConverter.java       |  50 +++---
 .../planner/utils/MaterializedTableUtils.java      |  27 +--
 .../operations/SqlDdlToOperationConverterTest.java | 127 +++++++++-----
 ...erializedTableNodeToOperationConverterTest.java | 136 +++++++++++++--
 .../catalog/TestFileSystemCatalogTest.java         |   4 +-
 30 files changed, 1154 insertions(+), 430 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/materialized-table/overview.md b/docs/content.zh/docs/dev/table/materialized-table/overview.md
index 9ce5e9ba34f..d0499bcf1c9 100644
--- a/docs/content.zh/docs/dev/table/materialized-table/overview.md
+++ b/docs/content.zh/docs/dev/table/materialized-table/overview.md
@@ -34,7 +34,9 @@ under the License.
 
 ## 数据新鲜度
 
-数据新鲜度定义了物化表数据相对于基础表更新的最大滞后时间。它并非绝对保证,而是 Flink 尝试达到的目标。框架会尽力确保物化表中的数据在指定的新鲜度内刷新。
+数据新鲜度定义了物化表数据相对于基础表更新的最大滞后时间。它并非绝对保证,而是 Flink 
尝试达到的目标。框架会尽力确保物化表中的数据在指定的新鲜度目标内刷新。
+
+Data freshness is optional when creating a materialized table. If not 
specified, the system uses the default freshness based on the refresh mode: 
[materialized-table.default-freshness.continuous]({{< ref 
"docs/dev/table/config" >}}#materialized-table-default-freshness-continuous) 
(default: 3 minutes) for CONTINUOUS mode, or 
[materialized-table.default-freshness.full]({{< ref "docs/dev/table/config" 
>}}#materialized-table-default-freshness-full) (default: 1 hour) for FULL mode.
 
 数据新鲜度是物化表的一个关键属性,具有两个主要作用:
 - **确定刷新模式**:目前有持续模式和全量模式。关于如何确定刷新模式的详细信息,请参阅 
[materialized-table.refresh-mode.freshness-threshold]({{< ref 
"docs/dev/table/config" 
>}}#materialized-table-refresh-mode-freshness-threshold) 配置项。
@@ -59,4 +61,3 @@ under the License.
 ## Schema
 
 物化表的 `Schema` 定义与普通表相同,可以声明主键和分区字段。其列名和类型会从相应的查询中推导,用户无法手动指定。
-
diff --git a/docs/content.zh/docs/dev/table/materialized-table/statements.md b/docs/content.zh/docs/dev/table/materialized-table/statements.md
index 6e0e641c968..a27bce3093f 100644
--- a/docs/content.zh/docs/dev/table/materialized-table/statements.md
+++ b/docs/content.zh/docs/dev/table/materialized-table/statements.md
@@ -35,21 +35,21 @@ Flink SQL 目前支持以下物化表操作:
 
 ```
 CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
- 
+
 [ ([ <table_constraint> ]) ]
- 
+
 [COMMENT table_comment]
- 
+
 [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
- 
+
 [WITH (key1=val1, key2=val2, ...)]
- 
+
 FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
- 
+
 [REFRESH_MODE = { CONTINUOUS | FULL }]
- 
+
 AS <select_statement>
- 
+
 <table_constraint>:
   [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
 ```
@@ -69,7 +69,7 @@ AS <select_statement>
 CREATE MATERIALIZED TABLE my_materialized_table
     PARTITIONED BY (ds)
     FRESHNESS = INTERVAL '1' HOUR
-    AS SELECT 
+    AS SELECT
         ds
     FROM
         ...
@@ -103,9 +103,11 @@ CREATE MATERIALIZED TABLE my_materialized_table
 
 `FRESHNESS` 用于指定物化表的数据新鲜度。
 
+`FRESHNESS` is optional. When omitted, the system uses the default freshness 
based on the refresh mode: `materialized-table.default-freshness.continuous` 
(default: 3 minutes) for CONTINUOUS mode, or 
`materialized-table.default-freshness.full` (default: 1 hour) for FULL mode.
+
 **数据新鲜度与刷新模式关系**
 
-数据新鲜度定义了物化表内容滞后于基础表更新的最长时间。它有两个作用,首先通过[配置]({{< ref "docs/dev/table/config" 
>}}#materialized-table-refresh-mode-freshness-threshold)确定物化表的[刷新模式]({{< ref 
"docs/dev/table/materialized-table/overview" >}}#刷新模式),然后确定数据刷新频率以满足实际数据新鲜度要求。
+数据新鲜度定义了物化表内容滞后于基础表更新的最长时间。When not specified, it uses the default value from 
configuration based on the refresh mode. 它有两个作用,首先通过[配置]({{< ref 
"docs/dev/table/config" 
>}}#materialized-table-refresh-mode-freshness-threshold)确定物化表的[刷新模式]({{< ref 
"docs/dev/table/materialized-table/overview" >}}#刷新模式),然后确定数据刷新频率以满足实际数据新鲜度要求。
 
 **FRESHNESS 参数详解**
 
@@ -128,6 +130,22 @@ FRESHNESS = INTERVAL '1' HOUR
 FRESHNESS = INTERVAL '1' DAY
 ```
 
+**Default FRESHNESS Example:**
+(Assuming `materialized-table.default-freshness.continuous` is 3 minutes, 
`materialized-table.default-freshness.full` is 1 hour, and 
`materialized-table.refresh-mode.freshness-threshold` is 30 minutes)
+
+```sql
+-- FRESHNESS is omitted, uses the configured default of 3 minutes for 
CONTINUOUS mode
+-- The corresponding refresh pipeline is a streaming job with a checkpoint 
interval of 3 minutes
+CREATE MATERIALIZED TABLE my_materialized_table
+    AS SELECT * FROM source_table;
+
+-- FRESHNESS is omitted and FULL mode is explicitly specified, uses the 
configured default of 1 hour
+-- The corresponding refresh pipeline is a scheduled workflow with a schedule 
cycle of 1 hour
+CREATE MATERIALIZED TABLE my_materialized_table_full
+    REFRESH_MODE = FULL
+    AS SELECT * FROM source_table;
+```
+
 **不合法的 `FRESHNESS` 示例:**
 
 ```sql
@@ -147,6 +165,7 @@ FRESHNESS = INTERVAL '5' HOUR
 ```
 
 <span class="label label-danger">注意</span>
+- If FRESHNESS is not specified, the table will use the default freshness 
interval based on the refresh mode: 
`materialized-table.default-freshness.continuous` (default: 3 minutes) for 
CONTINUOUS mode, or `materialized-table.default-freshness.full` (default: 1 
hour) for FULL mode.
 - 尽管物化表数据将尽可能在定义的新鲜度内刷新,但不能保证完全满足新鲜度要求。
 - 在持续模式下,数据新鲜度和 `checkpoint` 间隔一致,设置过短的数据新鲜度可能会对作业性能产生影响。此外,为了优化 `checkpoint` 
性能,建议[开启 Changelog]({{< ref "docs/ops/state/state_backends" >}}#开启-changelog)。
 - 在全量模式下,数据新鲜度会转换为 `cron` 表达式,因此目前仅支持在预定义时间间隔单位内的新鲜度间隔,这种设计确保了与 `cron` 
表达式语义的一致性。具体支持以下新鲜度间隔:
@@ -168,14 +187,14 @@ CREATE MATERIALIZED TABLE my_materialized_table
     FRESHNESS = INTERVAL '1' HOUR
     REFRESH_MODE = CONTINUOUS
     AS SELECT
-       ...    
+       ...
 
 -- 创建的物化表的刷新模式为全量模式,作业的调度周期为 10 分钟。
 CREATE MATERIALIZED TABLE my_materialized_table
     FRESHNESS = INTERVAL '10' MINUTE
     REFRESH_MODE = FULL
     AS SELECT
-       ...    
+       ...
 ```
 
 ## AS <select_statement>
@@ -204,22 +223,22 @@ CREATE MATERIALIZED TABLE my_materialized_table_continuous
         'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
     )
     FRESHNESS = INTERVAL '10' SECOND
-    AS 
-    SELECT 
+    AS
+    SELECT
         k.ds,
         k.user_id,
         COUNT(*) AS event_count,
         SUM(k.amount) AS total_amount,
         MAX(u.age) AS max_age
-    FROM 
+    FROM
         kafka_catalog.db1.kafka_table k
-    JOIN 
+    JOIN
         user_catalog.db1.user_table u
-    ON 
+    ON
         k.user_id = u.user_id
-    WHERE 
+    WHERE
         k.event_type = 'purchase'
-    GROUP BY 
+    GROUP BY
         k.ds, k.user_id
 ```
 
@@ -233,22 +252,22 @@ CREATE MATERIALIZED TABLE my_materialized_table_full
         'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
     )
     FRESHNESS = INTERVAL '1' HOUR
-    AS 
-    SELECT 
+    AS
+    SELECT
         p.ds,
         p.product_id,
         p.product_name,
         AVG(s.sale_price) AS avg_sale_price,
         SUM(s.quantity) AS total_quantity
-    FROM 
+    FROM
         paimon_catalog.db1.product_table p
-    LEFT JOIN 
+    LEFT JOIN
         paimon_catalog.db1.sales_table s
-    ON 
+    ON
         p.product_id = s.product_id
-    WHERE 
+    WHERE
         p.category = 'electronics'
-    GROUP BY 
+    GROUP BY
         p.ds, p.product_id, p.product_name
 ```
 
@@ -276,7 +295,7 @@ ALTER MATERIALIZED TABLE 
[catalog_name.][db_name.]table_name SUSPEND
 
 `SUSPEND` 用于暂停物化表的后台刷新管道。
 
-**示例:** 
+**示例:**
 
 ```sql
 -- 暂停前指定 SAVEPOINT 路径
@@ -297,7 +316,7 @@ ALTER MATERIALIZED TABLE 
[catalog_name.][db_name.]table_name RESUME [WITH (key1=
 
 `RESUME` 用于恢复物化表的刷新管道。在恢复时,可以通过 `WITH` 
子句动态指定物化表的参数,该参数仅对当前恢复的刷新管道生效,并不会持久化到物化表中。
 
-**示例:** 
+**示例:**
 
 ```sql
 -- 恢复指定的物化表
@@ -358,21 +377,21 @@ ALTER MATERIALIZED TABLE 
[catalog_name.][db_name.]table_name AS <select_statemen
 -- 原始物化表定义
 CREATE MATERIALIZED TABLE my_materialized_table
     FRESHNESS = INTERVAL '10' SECOND
-    AS 
-    SELECT 
+    AS
+    SELECT
         user_id,
         COUNT(*) AS event_count,
         SUM(amount) AS total_amount
-    FROM 
+    FROM
         kafka_catalog.db1.events
-    WHERE 
+    WHERE
         event_type = 'purchase'
-    GROUP BY 
+    GROUP BY
         user_id;
 
 -- 修改现有物化表的查询
 ALTER MATERIALIZED TABLE my_materialized_table
-AS SELECT 
+AS SELECT
     user_id,
     COUNT(*) AS event_count,
     SUM(amount) AS total_amount,
@@ -403,7 +422,3 @@ DROP MATERIALIZED TABLE [IF EXISTS] 
[catalog_name.][database_name.]table_name
 -- 删除指定的物化表
 DROP MATERIALIZED TABLE IF EXISTS my_materialized_table;
 ```
-
-
-
-
diff --git a/docs/content/docs/dev/table/materialized-table/overview.md b/docs/content/docs/dev/table/materialized-table/overview.md
index 7b78da97a9c..d41145e494d 100644
--- a/docs/content/docs/dev/table/materialized-table/overview.md
+++ b/docs/content/docs/dev/table/materialized-table/overview.md
@@ -34,7 +34,9 @@ Materialized Table encompass the following core concepts: 
Data Freshness, Refres
 
 ## Data Freshness
 
-Data freshness defines the maximum amount of time that the materialized 
table’s content should lag behind updates to the base tables. Freshness is not 
a guarantee. Instead, it is a target that Flink attempts to meet. Data in 
materialized table is refreshed as closely as possible within the freshness.
+Data freshness defines the maximum amount of time that the materialized 
table's content should lag behind updates to the base tables. Freshness is not 
a guarantee. Instead, it is a target that Flink attempts to meet. The data in 
the materialized table is refreshed as closely as possible within the freshness 
target.
+
+Data freshness is optional when creating a materialized table. If not 
specified, the system uses the default freshness based on the refresh mode: 
[materialized-table.default-freshness.continuous]({{< ref 
"docs/dev/table/config" >}}#materialized-table-default-freshness-continuous) 
(default: 3 minutes) for CONTINUOUS mode, or 
[materialized-table.default-freshness.full]({{< ref "docs/dev/table/config" 
>}}#materialized-table-default-freshness-full) (default: 1 hour) for FULL mode.
 
 Data freshness is a crucial attribute of a materialized table, serving two 
main purposes:
 - **Determining the Refresh Mode**. Currently, there are CONTINUOUS and FULL 
modes. For details on how to determine the refresh mode, refer to the 
[materialized-table.refresh-mode.freshness-threshold]({{< ref 
"docs/dev/table/config" 
>}}#materialized-table-refresh-mode-freshness-threshold) configuration item.
diff --git a/docs/content/docs/dev/table/materialized-table/statements.md b/docs/content/docs/dev/table/materialized-table/statements.md
index a5c77453977..07784bb3ede 100644
--- a/docs/content/docs/dev/table/materialized-table/statements.md
+++ b/docs/content/docs/dev/table/materialized-table/statements.md
@@ -44,7 +44,7 @@ CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
 
 [WITH (key1=val1, key2=val2, ...)]
 
-FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
+[FRESHNESS = INTERVAL '<num>' { SECOND[S] | MINUTE[S] | HOUR[S] | DAY[S] }]
 
 [REFRESH_MODE = { CONTINUOUS | FULL }]
 
@@ -69,7 +69,7 @@ AS <select_statement>
 CREATE MATERIALIZED TABLE my_materialized_table
     PARTITIONED BY (ds)
     FRESHNESS = INTERVAL '1' HOUR
-    AS SELECT 
+    AS SELECT
         ds
     FROM
         ...
@@ -103,9 +103,11 @@ As shown in the above example, we specified the 
date-formatter option for the `d
 
 `FRESHNESS` defines the data freshness of a materialized table.
 
+`FRESHNESS` is optional. When omitted, the system uses the default freshness 
based on the refresh mode: `materialized-table.default-freshness.continuous` 
(default: 3 minutes) for CONTINUOUS mode, or 
`materialized-table.default-freshness.full` (default: 1 hour) for FULL mode.
+
 **FRESHNESS and Refresh Mode Relationship**
 
-FRESHNESS defines the maximum amount of time that the materialized table’s 
content should lag behind updates to the base tables. It does two things, 
firstly it determines the [refresh mode]({{< ref 
"docs/dev/table/materialized-table/overview" >}}#refresh-mode) of the 
materialized table through [configuration]({{< ref "docs/dev/table/config" 
>}}#materialized-table-refresh-mode-freshness-threshold), followed by 
determines the data refresh frequency to meet the actual data freshness 
requirements.
+FRESHNESS defines the maximum amount of time that the materialized table's 
content should lag behind updates to the base tables. When not specified, it 
uses the default value from configuration based on the refresh mode. It does 
two things: firstly it determines the [refresh mode]({{< ref 
"docs/dev/table/materialized-table/overview" >}}#refresh-mode) of the 
materialized table through [configuration]({{< ref "docs/dev/table/config" 
>}}#materialized-table-refresh-mode-freshness-threshold), and then it 
determines the data refresh frequency to meet the actual data freshness 
requirements.
 
 **Explanation of FRESHNESS Parameter**
 
@@ -128,6 +130,22 @@ FRESHNESS = INTERVAL '1' HOUR
 FRESHNESS = INTERVAL '1' DAY
 ```
 
+**Default FRESHNESS Example:**
+(Assuming `materialized-table.default-freshness.continuous` is 3 minutes, 
`materialized-table.default-freshness.full` is 1 hour, and 
`materialized-table.refresh-mode.freshness-threshold` is 30 minutes)
+
+```sql
+-- FRESHNESS is omitted, uses the configured default of 3 minutes for 
CONTINUOUS mode
+-- The corresponding refresh pipeline is a streaming job with a checkpoint 
interval of 3 minutes
+CREATE MATERIALIZED TABLE my_materialized_table
+    AS SELECT * FROM source_table;
+
+-- FRESHNESS is omitted and FULL mode is explicitly specified, uses the 
configured default of 1 hour
+-- The corresponding refresh pipeline is a scheduled workflow with a schedule 
cycle of 1 hour
+CREATE MATERIALIZED TABLE my_materialized_table_full
+    REFRESH_MODE = FULL
+    AS SELECT * FROM source_table;
+```
+
 **Invalid `FRESHNESS` Examples:**
 
 ```sql
@@ -147,6 +165,7 @@ FRESHNESS = INTERVAL '5' HOUR
 ```
 
 <span class="label label-danger">Note</span>
+- If FRESHNESS is not specified, the table will use the default freshness 
interval based on the refresh mode: 
`materialized-table.default-freshness.continuous` (default: 3 minutes) for 
CONTINUOUS mode, or `materialized-table.default-freshness.full` (default: 1 
hour) for FULL mode.
 - The materialized table data will be refreshed as closely as possible within 
the defined freshness but cannot guarantee complete satisfaction.
 - In CONTINUOUS mode, setting a data freshness interval that is too short can 
impact job performance as it aligns with the checkpoint interval. To optimize 
checkpoint performance, consider [enabling-changelog]({{< ref 
"docs/ops/state/state_backends" >}}#incremental-checkpoints).
 - In FULL mode, data freshness must be translated into a cron expression, 
consequently, only freshness intervals within predefined time spans are 
presently accommodated, this design ensures alignment with cron's capabilities. 
Specifically, support for the following freshness:
@@ -168,14 +187,14 @@ CREATE MATERIALIZED TABLE my_materialized_table
     FRESHNESS = INTERVAL '1' HOUR
     REFRESH_MODE = CONTINUOUS
     AS SELECT
-       ...    
+       ...
 
 -- The refresh mode of the created materialized table is FULL, and the job's 
schedule cycle is 10 minutes.
 CREATE MATERIALIZED TABLE my_materialized_table
     FRESHNESS = INTERVAL '10' MINUTE
     REFRESH_MODE = FULL
     AS SELECT
-       ...    
+       ...
 ```
 
 ## AS <select_statement>
@@ -204,22 +223,21 @@ CREATE MATERIALIZED TABLE my_materialized_table_continuous
         'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
     )
     FRESHNESS = INTERVAL '10' SECOND
-    AS 
-    SELECT 
+    AS SELECT
         k.ds,
         k.user_id,
         COUNT(*) AS event_count,
         SUM(k.amount) AS total_amount,
         MAX(u.age) AS max_age
-    FROM 
+    FROM
         kafka_catalog.db1.kafka_table k
-    JOIN 
+    JOIN
         user_catalog.db1.user_table u
-    ON 
+    ON
         k.user_id = u.user_id
-    WHERE 
+    WHERE
         k.event_type = 'purchase'
-    GROUP BY 
+    GROUP BY
         k.ds, k.user_id
 ```
 
@@ -233,22 +251,21 @@ CREATE MATERIALIZED TABLE my_materialized_table_full
         'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
     )
     FRESHNESS = INTERVAL '1' HOUR
-    AS 
-    SELECT 
+    AS SELECT
         p.ds,
         p.product_id,
         p.product_name,
         AVG(s.sale_price) AS avg_sale_price,
         SUM(s.quantity) AS total_quantity
-    FROM 
+    FROM
         paimon_catalog.db1.product_table p
-    LEFT JOIN 
+    LEFT JOIN
         paimon_catalog.db1.sales_table s
-    ON 
+    ON
         p.product_id = s.product_id
-    WHERE 
+    WHERE
         p.category = 'electronics'
-    GROUP BY 
+    GROUP BY
         p.ds, p.product_id, p.product_name
 ```
 
diff --git a/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html b/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html
index d5829bf3224..f3cb3972a73 100644
--- a/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html
@@ -8,6 +8,18 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>materialized-table.default-freshness.continuous</h5><br> 
<span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">3 min</td>
+            <td>Duration</td>
+            <td>The default freshness interval for continuous refresh mode 
when the FRESHNESS clause is omitted in a materialized table definition.</td>
+        </tr>
+        <tr>
+            <td><h5>materialized-table.default-freshness.full</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">1 h</td>
+            <td>Duration</td>
+            <td>The default freshness interval for full refresh mode when the 
FRESHNESS clause is omitted in a materialized table definition.</td>
+        </tr>
         <tr>
             
<td><h5>materialized-table.refresh-mode.freshness-threshold</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">30 min</td>
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
index 323e0ca17b7..bafb3650711 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -83,6 +83,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.URLClassLoader;
+import java.time.Duration;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -108,13 +109,12 @@ import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.P
 import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.SCHEDULE_TIME_DATE_FORMATTER_DEFAULT;
 import static 
org.apache.flink.table.api.internal.TableResultInternal.TABLE_RESULT_OK;
 import static 
org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+import static 
org.apache.flink.table.catalog.IntervalFreshness.convertFreshnessToCron;
 import static 
org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil.WORKFLOW_SCHEDULER_PREFIX;
 import static 
org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getEndpointConfig;
 import static 
org.apache.flink.table.gateway.service.utils.Constants.CLUSTER_INFO;
 import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
 import static 
org.apache.flink.table.utils.DateTimeUtils.formatTimestampStringWithOffset;
-import static 
org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron;
-import static 
org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration;
 
 /** Manager is responsible for execute the {@link MaterializedTableOperation}. 
*/
 @Internal
@@ -166,8 +166,7 @@ public class MaterializedTableManager {
     public ResultFetcher callMaterializedTableOperation(
             OperationExecutor operationExecutor,
             OperationHandle handle,
-            MaterializedTableOperation op,
-            String statement) {
+            MaterializedTableOperation op) {
         if (op instanceof CreateMaterializedTableOperation) {
             return callCreateMaterializedTableOperation(
                     operationExecutor, handle, 
(CreateMaterializedTableOperation) op);
@@ -260,9 +259,8 @@ public class MaterializedTableManager {
         ResolvedCatalogMaterializedTable catalogMaterializedTable =
                 createMaterializedTableOperation.getCatalogMaterializedTable();
 
-        // convert duration to cron expression
-        String cronExpression =
-                
convertFreshnessToCron(catalogMaterializedTable.getDefinitionFreshness());
+        final IntervalFreshness freshness = 
catalogMaterializedTable.getDefinitionFreshness();
+        String cronExpression = convertFreshnessToCron(freshness);
         // create full refresh job
         CreateRefreshWorkflow createRefreshWorkflow =
                 new CreatePeriodicRefreshWorkflow(
@@ -306,7 +304,7 @@ public class MaterializedTableManager {
             OperationHandle handle,
             AlterMaterializedTableSuspendOperation op) {
         ObjectIdentifier tableIdentifier = op.getTableIdentifier();
-        CatalogMaterializedTable materializedTable =
+        ResolvedCatalogMaterializedTable materializedTable =
                 getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
 
         // Initialization phase doesn't support resume operation.
@@ -420,7 +418,7 @@ public class MaterializedTableManager {
             OperationHandle handle,
             AlterMaterializedTableResumeOperation op) {
         ObjectIdentifier tableIdentifier = op.getTableIdentifier();
-        CatalogMaterializedTable catalogMaterializedTable =
+        ResolvedCatalogMaterializedTable catalogMaterializedTable =
                 getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
 
         // Initialization phase doesn't support resume operation.
@@ -456,7 +454,7 @@ public class MaterializedTableManager {
             OperationExecutor operationExecutor,
             OperationHandle handle,
             ObjectIdentifier tableIdentifier,
-            CatalogMaterializedTable catalogMaterializedTable,
+            ResolvedCatalogMaterializedTable catalogMaterializedTable,
             Map<String, String> dynamicOptions) {
         ContinuousRefreshHandler refreshHandler =
                 deserializeContinuousHandler(
@@ -562,9 +560,10 @@ public class MaterializedTableManager {
                 .getSessionContext()
                 .getSessionConf()
                 .contains(CheckpointingOptions.CHECKPOINTING_INTERVAL)) {
-            customConfig.set(
-                    CheckpointingOptions.CHECKPOINTING_INTERVAL,
-                    catalogMaterializedTable.getFreshness());
+
+            final Duration freshness =
+                    
validateAndGetIntervalFreshness(catalogMaterializedTable).toDuration();
+            customConfig.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, 
freshness);
         }
 
         String insertStatement =
@@ -727,7 +726,7 @@ public class MaterializedTableManager {
                             SCHEDULE_TIME_DATE_FORMATTER_DEFAULT,
                             partFieldFormatter,
                             TimeZone.getTimeZone(localZoneId),
-                            -convertFreshnessToDuration(freshness).toMillis());
+                            -freshness.toDuration().toMillis());
             if (partFiledValue == null) {
                 throw new SqlExecutionException(
                         String.format(
@@ -818,7 +817,7 @@ public class MaterializedTableManager {
             OperationHandle handle,
             AlterMaterializedTableChangeOperation op) {
         ObjectIdentifier tableIdentifier = op.getTableIdentifier();
-        CatalogMaterializedTable oldMaterializedTable =
+        ResolvedCatalogMaterializedTable oldMaterializedTable =
                 getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
 
         if (CatalogMaterializedTable.RefreshMode.FULL == 
oldMaterializedTable.getRefreshMode()) {
@@ -995,12 +994,12 @@ public class MaterializedTableManager {
             }
         }
 
-        CatalogMaterializedTable materializedTable =
+        ResolvedCatalogMaterializedTable materializedTable =
                 getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
-        CatalogMaterializedTable.RefreshMode refreshMode = 
materializedTable.getRefreshMode();
         CatalogMaterializedTable.RefreshStatus refreshStatus = 
materializedTable.getRefreshStatus();
         if (CatalogMaterializedTable.RefreshStatus.ACTIVATED == refreshStatus
                 || CatalogMaterializedTable.RefreshStatus.SUSPENDED == 
refreshStatus) {
+            CatalogMaterializedTable.RefreshMode refreshMode = 
materializedTable.getRefreshMode();
             if (CatalogMaterializedTable.RefreshMode.FULL == refreshMode) {
                 deleteRefreshWorkflow(tableIdentifier, materializedTable);
             } else if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == 
refreshMode
@@ -1008,8 +1007,7 @@ public class MaterializedTableManager {
                 cancelContinuousRefreshJob(
                         operationExecutor, handle, tableIdentifier, 
materializedTable);
             }
-        } else if (CatalogMaterializedTable.RefreshStatus.INITIALIZING
-                == materializedTable.getRefreshStatus()) {
+        } else if (CatalogMaterializedTable.RefreshStatus.INITIALIZING == 
refreshStatus) {
             throw new ValidationException(
                     String.format(
                             "Current refresh status of materialized table %s 
is initializing, skip the drop operation.",
@@ -1375,4 +1373,10 @@ public class MaterializedTableManager {
             return Optional.empty();
         }
     }
+
+    private static IntervalFreshness validateAndGetIntervalFreshness(
+            final CatalogMaterializedTable catalogMaterializedTable) {
+        return 
Optional.ofNullable(catalogMaterializedTable.getDefinitionFreshness())
+                .orElseThrow(() -> new SqlExecutionException("Freshness cannot 
be null"));
+    }
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index acf5ac7d772..5735abd6dee 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -556,8 +556,7 @@ public class OperationExecutor {
             return sessionContext
                     .getSessionState()
                     .materializedTableManager
-                    .callMaterializedTableOperation(
-                            this, handle, (MaterializedTableOperation) op, 
statement);
+                    .callMaterializedTableOperation(this, handle, 
(MaterializedTableOperation) op);
         } else {
             return callOperation(tableEnv, handle, op);
         }
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index 2e2499e75ae..de73723176b 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
 import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
@@ -142,7 +143,6 @@ class MaterializedTableStatementITCase extends 
AbstractMaterializedTableStatemen
                                 Column.physical("pv", 
DataTypes.INT().notNull())));
 
         
assertThat(actualMaterializedTable.getResolvedSchema()).isEqualTo(expectedSchema);
-        
assertThat(actualMaterializedTable.getFreshness()).isEqualTo(Duration.ofSeconds(30));
         assertThat(actualMaterializedTable.getLogicalRefreshMode())
                 
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
         assertThat(actualMaterializedTable.getRefreshMode())
@@ -178,6 +178,191 @@ class MaterializedTableStatementITCase extends 
AbstractMaterializedTableStatemen
                 ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
     }
 
+    @Test
+    void testCreateMaterializedTableInFullModeWithoutFreshness() throws 
Exception {
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " REFRESH_MODE = FULL\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+
+        // Validate the materialized table: schema, refresh mode, refresh status and refresh
+        // handler. The data itself is not checked because it is randomly generated.
+        ResolvedCatalogMaterializedTable actualMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // Expected schema
+        ResolvedSchema expectedSchema =
+                ResolvedSchema.of(
+                        Arrays.asList(
+                                Column.physical("user_id", DataTypes.BIGINT()),
+                                Column.physical("shop_id", DataTypes.BIGINT()),
+                                Column.physical("ds", DataTypes.STRING()),
+                                Column.physical("payed_buy_fee_sum", 
DataTypes.BIGINT()),
+                                Column.physical("pv", 
DataTypes.INT().notNull())));
+
+        
assertThat(actualMaterializedTable.getResolvedSchema()).isEqualTo(expectedSchema);
+        assertThat(actualMaterializedTable.getLogicalRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.FULL);
+        
assertThat(actualMaterializedTable.getRefreshMode()).isEqualTo(RefreshMode.FULL);
+        assertThat(actualMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+        
assertThat(actualMaterializedTable.getRefreshHandlerDescription()).isNotEmpty();
+        
assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty();
+
+        // verify refresh handler
+        byte[] serializedHandler = 
actualMaterializedTable.getSerializedRefreshHandler();
+        EmbeddedRefreshHandler embeddedRefreshHandler =
+                EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(
+                        serializedHandler, getClass().getClassLoader());
+        assertThat(embeddedRefreshHandler.getWorkflowName())
+                .isEqualTo(
+                        "quartz_job_"
+                                + ObjectIdentifier.of(
+                                                fileSystemCatalogName,
+                                                TEST_DEFAULT_DATABASE,
+                                                "users_shops")
+                                        .asSerializableString());
+
+        EmbeddedQuartzScheduler embeddedWorkflowScheduler =
+                SQL_GATEWAY_REST_ENDPOINT_EXTENSION
+                        .getSqlGatewayRestEndpoint()
+                        .getQuartzScheduler();
+        JobKey jobKey =
+                new JobKey(
+                        embeddedRefreshHandler.getWorkflowName(),
+                        embeddedRefreshHandler.getWorkflowGroup());
+
+        // verify the job is created
+        
assertThat(embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey)).isTrue();
+
+        // verify initialization conf
+        JobDetail jobDetail = 
embeddedWorkflowScheduler.getQuartzScheduler().getJobDetail(jobKey);
+        String workflowJsonStr = 
jobDetail.getJobDataMap().getString(WORKFLOW_INFO);
+        WorkflowInfo workflowInfo = fromJson(workflowJsonStr, 
WorkflowInfo.class);
+        assertThat(workflowInfo.getInitConfig())
+                .containsEntry("k1", "v1")
+                .containsEntry("k2", "v2")
+                .containsKey("sql-gateway.endpoint.rest.address")
+                .containsKey("sql-gateway.endpoint.rest.port")
+                .containsKey("table.catalog-store.kind")
+                .containsKey("table.catalog-store.file.path")
+                .doesNotContainKey(WORKFLOW_SCHEDULER_TYPE.key())
+                .doesNotContainKey(RESOURCES_DOWNLOAD_DIR.key());
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
+    }
+
+    @Test
+    void 
testCreateMaterializedTableInContinuousModeWithoutFreshnessAndRefreshMode()
+            throws Exception {
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+
+        // Validate the materialized table: schema, default freshness, refresh mode, refresh
+        // status and refresh handler. The data is not checked because it is randomly generated.
+        ResolvedCatalogMaterializedTable actualMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // Expected schema
+        ResolvedSchema expectedSchema =
+                ResolvedSchema.of(
+                        Arrays.asList(
+                                Column.physical("user_id", DataTypes.BIGINT()),
+                                Column.physical("shop_id", DataTypes.BIGINT()),
+                                Column.physical("ds", DataTypes.STRING()),
+                                Column.physical("payed_buy_fee_sum", 
DataTypes.BIGINT()),
+                                Column.physical("pv", 
DataTypes.INT().notNull())));
+
+        
assertThat(actualMaterializedTable.getResolvedSchema()).isEqualTo(expectedSchema);
+        assertThat(actualMaterializedTable.getDefinitionFreshness())
+                .isEqualTo(IntervalFreshness.ofMinute("3"));
+        assertThat(actualMaterializedTable.getLogicalRefreshMode())
+                
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
+        assertThat(actualMaterializedTable.getRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
+        assertThat(actualMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+        
assertThat(actualMaterializedTable.getRefreshHandlerDescription()).isNotEmpty();
+        
assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty();
+
+        ContinuousRefreshHandler activeRefreshHandler =
+                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                        actualMaterializedTable.getSerializedRefreshHandler(),
+                        getClass().getClassLoader());
+
+        waitUntilAllTasksAreRunning(
+                restClusterClient, 
JobID.fromHexString(activeRefreshHandler.getJobId()));
+
+        // verify the background job is running
+        String describeJobDDL = String.format("DESCRIBE JOB '%s'", 
activeRefreshHandler.getJobId());
+        OperationHandle describeJobHandle =
+                service.executeStatement(sessionHandle, describeJobDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, describeJobHandle);
+        List<RowData> jobResults = fetchAllResults(service, sessionHandle, 
describeJobHandle);
+        
assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING");
+
+        // get checkpoint interval
+        long checkpointInterval =
+                getCheckpointIntervalConfig(restClusterClient, 
activeRefreshHandler.getJobId());
+
+        // no FRESHNESS clause: the interval is expected to equal the 3-minute default freshness
+        final int expectedCheckpointInterval = 60 * 3 * 1000;
+        assertThat(checkpointInterval).isEqualTo(expectedCheckpointInterval);
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
+    }
+
     @Test
     void 
testCreateMaterializedTableInContinuousModeWithCustomCheckpointInterval()
             throws Exception {
@@ -348,7 +533,7 @@ class MaterializedTableStatementITCase extends 
AbstractMaterializedTableStatemen
     }
 
     @Test
-    void testCreateMaterializedTableFailedInInContinuousMode() throws 
Exception {
+    void testCreateMaterializedTableFailedInInContinuousMode() {
         // create a materialized table with invalid SQL
         String materializedTableDDL =
                 "CREATE MATERIALIZED TABLE users_shops"
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 5717da2e51f..9200c1dc2ac 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1907,16 +1907,18 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean 
replace, boolean isTemporar
         <WITH>
         propertyList = Properties()
     ]
-    <FRESHNESS> <EQ>
-    freshness = Expression(ExprContext.ACCEPT_NON_QUERY)
-    {
-        if (!(freshness instanceof SqlIntervalLiteral))
+    [
+        <FRESHNESS> <EQ>
+        freshness = Expression(ExprContext.ACCEPT_NON_QUERY)
         {
-            throw SqlUtil.newContextException(
-            getPos(),
-            ParserResource.RESOURCE.unsupportedFreshnessType());
+            if (!(freshness instanceof SqlIntervalLiteral))
+            {
+                throw SqlUtil.newContextException(
+                getPos(),
+                ParserResource.RESOURCE.unsupportedFreshnessType());
+            }
         }
-    }
+    ]
     [
         <REFRESH_MODE> <EQ>
         (
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
index 5a9245c5dd0..8c7fe2d5089 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
@@ -61,7 +61,7 @@ public class SqlCreateMaterializedTable extends SqlCreate {
 
     private final SqlNodeList propertyList;
 
-    private final SqlIntervalLiteral freshness;
+    private final @Nullable SqlIntervalLiteral freshness;
 
     private final @Nullable SqlLiteral refreshMode;
 
@@ -75,7 +75,7 @@ public class SqlCreateMaterializedTable extends SqlCreate {
             @Nullable SqlDistribution distribution,
             SqlNodeList partitionKeyList,
             SqlNodeList propertyList,
-            SqlIntervalLiteral freshness,
+            @Nullable SqlIntervalLiteral freshness,
             @Nullable SqlLiteral refreshMode,
             SqlNode asQuery) {
         super(OPERATOR, pos, false, false);
@@ -86,7 +86,7 @@ public class SqlCreateMaterializedTable extends SqlCreate {
         this.partitionKeyList =
                 requireNonNull(partitionKeyList, "partitionKeyList should not 
be null");
         this.propertyList = requireNonNull(propertyList, "propertyList should 
not be null");
-        this.freshness = requireNonNull(freshness, "freshness should not be 
null");
+        this.freshness = freshness;
         this.refreshMode = refreshMode;
         this.asQuery = requireNonNull(asQuery, "asQuery should not be null");
     }
@@ -136,12 +136,14 @@ public class SqlCreateMaterializedTable extends SqlCreate 
{
         return propertyList;
     }
 
+    @Nullable
     public SqlIntervalLiteral getFreshness() {
         return freshness;
     }
 
-    public Optional<SqlLiteral> getRefreshMode() {
-        return Optional.ofNullable(refreshMode);
+    @Nullable
+    public SqlLiteral getRefreshMode() {
+        return refreshMode;
     }
 
     public SqlNode getAsQuery() {
@@ -195,10 +197,12 @@ public class SqlCreateMaterializedTable extends SqlCreate 
{
             writer.endList(withFrame);
         }
 
-        writer.newlineAndIndent();
-        writer.keyword("FRESHNESS");
-        writer.keyword("=");
-        freshness.unparse(writer, leftPrec, rightPrec);
+        if (freshness != null) {
+            writer.newlineAndIndent();
+            writer.keyword("FRESHNESS");
+            writer.keyword("=");
+            freshness.unparse(writer, leftPrec, rightPrec);
+        }
 
         if (refreshMode != null) {
             writer.newlineAndIndent();
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
index 0e58ffbb546..d089ecc8079 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
@@ -63,26 +63,6 @@ class MaterializedTableStatementParserTest {
                         + "AS SELECT a, b, h, t m FROM source";
         sql(sql).fails(
                         "MATERIALIZED TABLE only supports define interval type 
FRESHNESS, please refer to the materialized table document.");
-
-        final String sql2 =
-                "CREATE MATERIALIZED TABLE tbl1\n"
-                        + "(\n"
-                        + "   PRIMARY KEY (a, b)\n"
-                        + ")\n"
-                        + "COMMENT 'table comment'\n"
-                        + "PARTITIONED BY (a, h)\n"
-                        + "WITH (\n"
-                        + "  'group.id' = 'latest', \n"
-                        + "  'kafka.topic' = 'log.test'\n"
-                        + ")\n"
-                        + "^AS^ SELECT a, b, h, t m FROM source";
-
-        sql(sql2)
-                .fails(
-                        "Encountered \"AS\" at line 11, column 1.\n"
-                                + "Was expecting:\n"
-                                + "    \"FRESHNESS\" ...\n"
-                                + "    ");
     }
 
     @Test
@@ -395,7 +375,8 @@ class MaterializedTableStatementParserTest {
         return Stream.of(
                 Arguments.of(fullExample()),
                 Arguments.of(withoutTableConstraint()),
-                Arguments.of(withPrimaryKey()));
+                Arguments.of(withPrimaryKey()),
+                Arguments.of(withoutFreshness()));
     }
 
     private static Map.Entry<String, String> fullExample() {
@@ -411,7 +392,7 @@ class MaterializedTableStatementParserTest {
                         + "  'group.id' = 'latest', \n"
                         + "  'kafka.topic' = 'log.test'\n"
                         + ")\n"
-                        + "FRESHNESS = INTERVAL '3' MINUTE\n"
+                        + "FRESHNESS = INTERVAL '3' MINUTES\n"
                         + "AS SELECT a, b, h, t m FROM source",
                 "CREATE MATERIALIZED TABLE `TBL1`\n"
                         + "(\n"
@@ -456,7 +437,7 @@ class MaterializedTableStatementParserTest {
         return new AbstractMap.SimpleEntry<>(
                 "CREATE MATERIALIZED TABLE tbl1\n"
                         + "COMMENT 'table comment'\n"
-                        + "FRESHNESS = INTERVAL '3' DAY\n"
+                        + "FRESHNESS = INTERVAL '3' DAYS\n"
                         + "REFRESH_MODE = FULL\n"
                         + "AS SELECT a, b, h, t m FROM source",
                 "CREATE MATERIALIZED TABLE `TBL1`\n"
@@ -467,4 +448,28 @@ class MaterializedTableStatementParserTest {
                         + "SELECT `A`, `B`, `H`, `T` AS `M`\n"
                         + "FROM `SOURCE`");
     }
+
+    private static Map.Entry<String, String> withoutFreshness() {
+        return new AbstractMap.SimpleEntry<>(
+                "CREATE MATERIALIZED TABLE tbl1\n"
+                        + "(\n"
+                        + "   PRIMARY KEY (a, b)\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'latest', \n"
+                        + "  'kafka.topic' = 'log.test'\n"
+                        + ")\n"
+                        + "AS SELECT a, b, h, t m FROM source",
+                "CREATE MATERIALIZED TABLE `TBL1`\n"
+                        + "(\n"
+                        + "  PRIMARY KEY (`A`, `B`)\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'latest',\n"
+                        + "  'kafka.topic' = 'log.test'\n"
+                        + ")\n"
+                        + "AS\n"
+                        + "SELECT `A`, `B`, `H`, `T` AS `M`\n"
+                        + "FROM `SOURCE`");
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java
index 56cebedd191..e387705e1a3 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java
@@ -58,4 +58,20 @@ public class MaterializedTableConfigOptions {
                     .withDescription(
                             "Specifies the time partition formatter for the 
partitioned materialized table, where '#' denotes a string-based partition 
field name."
                                     + " This serves as a hint to the framework 
regarding which partition to refresh in full refresh mode.");
+
+    @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<Duration> 
MATERIALIZED_TABLE_DEFAULT_FRESHNESS_CONTINUOUS =
+            key("materialized-table.default-freshness.continuous")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(3))
+                    .withDescription(
+                            "The default freshness interval for continuous 
refresh mode when the FRESHNESS clause is omitted in a materialized table 
definition.");
+
+    @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<Duration> 
MATERIALIZED_TABLE_DEFAULT_FRESHNESS_FULL =
+            key("materialized-table.default-freshness.full")
+                    .durationType()
+                    .defaultValue(Duration.ofHours(1))
+                    .withDescription(
+                            "The default freshness interval for full refresh 
mode when the FRESHNESS clause is omitted in a materialized table definition.");
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 24ebe07e9ea..bc870857807 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -27,8 +27,10 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.MaterializedTableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -65,6 +67,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -83,6 +86,7 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static java.lang.String.format;
+import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -124,13 +128,16 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
 
     private SqlFactory sqlFactory;
 
+    private final MaterializedTableEnricher materializedTableEnricher;
+
     private CatalogManager(
             String defaultCatalogName,
             Catalog defaultCatalog,
             DataTypeFactory typeFactory,
             List<CatalogModificationListener> catalogModificationListeners,
             CatalogStoreHolder catalogStoreHolder,
-            SqlFactory sqlFactory) {
+            SqlFactory sqlFactory,
+            MaterializedTableEnricher materializedTableEnricher) {
         checkArgument(
                 !StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
                 "Default catalog name cannot be null or empty");
@@ -153,6 +160,8 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
         this.catalogStoreHolder = catalogStoreHolder;
 
         this.sqlFactory = sqlFactory;
+        this.materializedTableEnricher =
+                checkNotNull(materializedTableEnricher, 
"MaterializedTableEnricher cannot be null");
     }
 
     @VisibleForTesting
@@ -190,6 +199,8 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
 
         private SqlFactory sqlFactory = DefaultSqlFactory.INSTANCE;
 
+        private MaterializedTableEnricher materializedTableEnricher;
+
         public Builder classLoader(ClassLoader classLoader) {
             this.classLoader = classLoader;
             return this;
@@ -232,6 +243,12 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
             return this;
         }
 
+        public Builder materializedTableEnricher(
+                MaterializedTableEnricher materializedTableEnricher) {
+            this.materializedTableEnricher = materializedTableEnricher;
+            return this;
+        }
+
         public CatalogManager build() {
             checkNotNull(classLoader, "Class loader cannot be null");
             checkNotNull(config, "Config cannot be null");
@@ -249,7 +266,29 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
                                             : 
executionConfig.getSerializerConfig()),
                     catalogModificationListeners,
                     catalogStoreHolder,
-                    sqlFactory);
+                    sqlFactory,
+                    materializedTableEnricher != null
+                            ? materializedTableEnricher
+                            : createDefaultMaterializedTableEnricher());
+        }
+
+        private MaterializedTableEnricher 
createDefaultMaterializedTableEnricher() {
+            final Duration defaultDurationContinuous =
+                    catalogStoreHolder
+                            .config()
+                            .get(
+                                    MaterializedTableConfigOptions
+                                            
.MATERIALIZED_TABLE_DEFAULT_FRESHNESS_CONTINUOUS);
+            final Duration defaultDurationFull =
+                    catalogStoreHolder
+                            .config()
+                            .get(
+                                    MaterializedTableConfigOptions
+                                            
.MATERIALIZED_TABLE_DEFAULT_FRESHNESS_FULL);
+            final Duration freshnessThreshold =
+                    
catalogStoreHolder.config().get(MATERIALIZED_TABLE_FRESHNESS_THRESHOLD);
+            return DefaultMaterializedTableEnricher.create(
+                    defaultDurationContinuous, defaultDurationFull, 
freshnessThreshold);
         }
     }
 
@@ -1869,6 +1908,11 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
 
         final ResolvedSchema resolvedSchema = 
table.getUnresolvedSchema().resolve(schemaResolver);
 
+        final MaterializedTableEnrichmentResult enrichmentResult =
+                this.materializedTableEnricher.enrich(table);
+        IntervalFreshness freshness = enrichmentResult.getFreshness();
+        RefreshMode resolvedRefreshMode = enrichmentResult.getRefreshMode();
+
         // Validate partition keys are included in physical columns
         final List<String> physicalColumns =
                 resolvedSchema.getColumns().stream()
@@ -1888,7 +1932,8 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
                             }
                         });
 
-        return new ResolvedCatalogMaterializedTable(table, resolvedSchema);
+        return new ResolvedCatalogMaterializedTable(
+                table, resolvedSchema, resolvedRefreshMode, freshness);
     }
 
     /** Resolves a {@link CatalogView} to a validated {@link 
ResolvedCatalogView}. */
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
index 872dbb4ccd7..f567658c7c1 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
@@ -386,6 +386,8 @@ class ShowCreateUtilTest {
                         .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC)
                         .refreshStatus(RefreshStatus.ACTIVATED)
                         .build(),
-                resolvedSchema);
+                resolvedSchema,
+                refreshMode,
+                freshness);
     }
 }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index 9b8ab94a421..2ef20f7f80c 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -197,8 +197,8 @@ class CatalogBaseTableResolutionTest {
 
         assertThat(resolvedMaterializedTable.getDefinitionQuery())
                 .isEqualTo(materializedTable.getDefinitionQuery());
-        assertThat(resolvedMaterializedTable.getFreshness())
-                .isEqualTo(materializedTable.getFreshness());
+        assertThat(resolvedMaterializedTable.getDefinitionFreshness())
+                .isEqualTo(materializedTable.getDefinitionFreshness());
         assertThat(resolvedMaterializedTable.getLogicalRefreshMode())
                 .isEqualTo(materializedTable.getLogicalRefreshMode());
         assertThat(resolvedMaterializedTable.getRefreshMode())
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
index a2bbeba1c59..f0691d6252c 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
@@ -30,8 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-import static 
org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration;
-
 /**
  * Represents the unresolved metadata of a materialized table in a {@link 
Catalog}.
  *
@@ -126,20 +124,27 @@ public interface CatalogMaterializedTable extends 
CatalogBaseTable {
      * Get the definition freshness of materialized table which is used to 
determine the physical
      * refresh mode.
      */
+    @Nullable
     IntervalFreshness getDefinitionFreshness();
 
     /**
      * Get the {@link Duration} value of materialized table definition 
freshness, it is converted
      * from {@link IntervalFreshness}.
+     *
+     * @deprecated use {@link #getDefinitionFreshness()} together with {@link
+     *     IntervalFreshness#toDuration()} instead.
      */
-    default Duration getFreshness() {
-        return convertFreshnessToDuration(getDefinitionFreshness());
+    @Deprecated
+    default @Nullable Duration getFreshness() {
+        final IntervalFreshness definitionFreshness = getDefinitionFreshness();
+        return definitionFreshness == null ? null : 
definitionFreshness.toDuration();
     }
 
     /** Get the logical refresh mode of materialized table. */
     LogicalRefreshMode getLogicalRefreshMode();
 
     /** Get the physical refresh mode of materialized table. */
+    @Nullable
     RefreshMode getRefreshMode();
 
     /** Get the refresh status of materialized table. */
@@ -205,9 +210,9 @@ public interface CatalogMaterializedTable extends 
CatalogBaseTable {
         private Map<String, String> options = Collections.emptyMap();
         private @Nullable Long snapshot;
         private String definitionQuery;
-        private IntervalFreshness freshness;
+        private @Nullable IntervalFreshness freshness;
         private LogicalRefreshMode logicalRefreshMode;
-        private RefreshMode refreshMode;
+        private @Nullable RefreshMode refreshMode;
         private RefreshStatus refreshStatus;
         private @Nullable String refreshHandlerDescription;
         private @Nullable byte[] serializedRefreshHandler;
@@ -247,8 +252,8 @@ public interface CatalogMaterializedTable extends 
CatalogBaseTable {
             return this;
         }
 
-        public Builder freshness(IntervalFreshness freshness) {
-            this.freshness = Preconditions.checkNotNull(freshness, "Freshness 
must not be null.");
+        public Builder freshness(@Nullable IntervalFreshness freshness) {
+            this.freshness = freshness;
             return this;
         }
 
@@ -259,9 +264,8 @@ public interface CatalogMaterializedTable extends 
CatalogBaseTable {
             return this;
         }
 
-        public Builder refreshMode(RefreshMode refreshMode) {
-            this.refreshMode =
-                    Preconditions.checkNotNull(refreshMode, "Refresh mode must 
not be null.");
+        public Builder refreshMode(@Nullable RefreshMode refreshMode) {
+            this.refreshMode = refreshMode;
             return this;
         }
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
index 9378e862ab2..265e16fde93 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
@@ -60,9 +60,9 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
             Map<String, String> options,
             @Nullable Long snapshot,
             String definitionQuery,
-            IntervalFreshness freshness,
+            @Nullable IntervalFreshness freshness,
             LogicalRefreshMode logicalRefreshMode,
-            RefreshMode refreshMode,
+            @Nullable RefreshMode refreshMode,
             RefreshStatus refreshStatus,
             @Nullable String refreshHandlerDescription,
             @Nullable byte[] serializedRefreshHandler) {
@@ -73,10 +73,10 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
         this.options = checkNotNull(options, "Options must not be null.");
         this.snapshot = snapshot;
         this.definitionQuery = checkNotNull(definitionQuery, "Definition query 
must not be null.");
-        this.freshness = checkNotNull(freshness, "Freshness must not be 
null.");
+        this.freshness = freshness;
         this.logicalRefreshMode =
                 checkNotNull(logicalRefreshMode, "Logical refresh mode must 
not be null.");
-        this.refreshMode = checkNotNull(refreshMode, "Refresh mode must not be 
null.");
+        this.refreshMode = refreshMode;
         this.refreshStatus = checkNotNull(refreshStatus, "Refresh status must 
not be null.");
         this.refreshHandlerDescription = refreshHandlerDescription;
         this.serializedRefreshHandler = serializedRefreshHandler;
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java
new file mode 100644
index 00000000000..c9858dab7d0
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
+
+import java.time.Duration;
+
+/**
+ * Default implementation of {@link MaterializedTableEnricher}.
+ *
+ * <p>Applies default freshness values based on refresh mode and determines 
the physical refresh
+ * mode using freshness threshold comparison.
+ */
+@Internal
+public class DefaultMaterializedTableEnricher implements 
MaterializedTableEnricher {
+
+    private final IntervalFreshness defaultContinuousFreshness;
+    private final IntervalFreshness defaultFullFreshness;
+    private final Duration freshnessThreshold;
+
+    public static DefaultMaterializedTableEnricher create(
+            final Duration defaultContinuousFreshness,
+            final Duration defaultFullFreshness,
+            final Duration freshnessThreshold) {
+        final IntervalFreshness continuousFreshness =
+                IntervalFreshness.fromDuration(defaultContinuousFreshness);
+        final IntervalFreshness fullFreshness =
+                IntervalFreshness.fromDuration(defaultFullFreshness);
+        return new DefaultMaterializedTableEnricher(
+                continuousFreshness, fullFreshness, freshnessThreshold);
+    }
+
+    private DefaultMaterializedTableEnricher(
+            final IntervalFreshness defaultContinuousFreshness,
+            final IntervalFreshness defaultFullFreshness,
+            final Duration freshnessThreshold) {
+        this.defaultContinuousFreshness = defaultContinuousFreshness;
+        this.defaultFullFreshness = defaultFullFreshness;
+        this.freshnessThreshold = freshnessThreshold;
+    }
+
+    @Override
+    public MaterializedTableEnrichmentResult enrich(final 
CatalogMaterializedTable table) {
+        // Determine the final freshness value
+        final IntervalFreshness finalFreshness = deriveFreshness(table);
+
+        // Derive the final refresh mode using the freshness and threshold
+        final RefreshMode finalRefreshMode =
+                deriveRefreshMode(
+                        table.getLogicalRefreshMode(), finalFreshness, 
freshnessThreshold);
+
+        return new MaterializedTableEnrichmentResult(finalFreshness, 
finalRefreshMode);
+    }
+
+    /**
+     * Returns user-specified freshness or applies mode-specific defaults: 
FULL mode uses {@code
+     * defaultFullFreshness}, others use {@code defaultContinuousFreshness}.
+     */
+    private IntervalFreshness deriveFreshness(final CatalogMaterializedTable 
table) {
+        final IntervalFreshness finalFreshness;
+        if (table.getDefinitionFreshness() != null) {
+            // User provided an explicit freshness, use it
+            finalFreshness = table.getDefinitionFreshness();
+        } else {
+            // User omitted freshness, choose default based on logical mode
+            if (table.getLogicalRefreshMode() == LogicalRefreshMode.FULL) {
+                finalFreshness = defaultFullFreshness;
+            } else {
+                // For AUTOMATIC or CONTINUOUS modes, use the continuous 
default
+                finalFreshness = defaultContinuousFreshness;
+            }
+        }
+        return finalFreshness;
+    }
+
+    /**
+     * Determines physical refresh mode: CONTINUOUS if explicitly requested, 
or if FULL is not
+     * requested and freshness is below threshold; otherwise FULL (validated 
for cron conversion).
+     */
+    public RefreshMode deriveRefreshMode(
+            LogicalRefreshMode logicalRefreshMode,
+            IntervalFreshness freshness,
+            Duration threshold) {
+        if (logicalRefreshMode != LogicalRefreshMode.FULL) {
+            final Duration definedFreshness = freshness.toDuration();
+            if (logicalRefreshMode == LogicalRefreshMode.CONTINUOUS
+                    || definedFreshness.compareTo(threshold) < 0) {
+                return RefreshMode.CONTINUOUS;
+            }
+        }
+
+        // Validate that freshness can be converted to cron for FULL mode
+        IntervalFreshness.validateFreshnessForCron(freshness);
+        return RefreshMode.FULL;
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
index fdbb8e6139f..6548b149663 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
@@ -19,7 +19,10 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
 
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.Objects;
 
 /**
@@ -31,35 +34,186 @@ import java.util.Objects;
 @PublicEvolving
 public class IntervalFreshness {
 
-    private final String interval;
+    private static final String SECOND_CRON_EXPRESSION_TEMPLATE = "0/%s * * * 
* ? *";
+    private static final String MINUTE_CRON_EXPRESSION_TEMPLATE = "0 0/%s * * 
* ? *";
+    private static final String HOUR_CRON_EXPRESSION_TEMPLATE = "0 0 0/%s * * 
? *";
+    private static final String ONE_DAY_CRON_EXPRESSION_TEMPLATE = "0 0 0 * * 
? *";
+    private static final long SECOND_CRON_UPPER_BOUND = 60;
+    private static final long MINUTE_CRON_UPPER_BOUND = 60;
+    private static final long HOUR_CRON_UPPER_BOUND = 24;
+
+    private final int interval;
     private final TimeUnit timeUnit;
 
-    private IntervalFreshness(String interval, TimeUnit timeUnit) {
+    private IntervalFreshness(int interval, TimeUnit timeUnit) {
         this.interval = interval;
         this.timeUnit = timeUnit;
     }
 
     public static IntervalFreshness of(String interval, TimeUnit timeUnit) {
-        return new IntervalFreshness(interval, timeUnit);
+        final int validateIntervalInput = validateIntervalInput(interval);
+        return new IntervalFreshness(validateIntervalInput, timeUnit);
+    }
+
+    private static int validateIntervalInput(final String interval) {
+        final int parsedInt;
+        try {
+            parsedInt = Integer.parseInt(interval);
+        } catch (Exception e) {
+            final String errorMessage =
+                    String.format(
+                            "The freshness interval currently only supports 
positive integer type values. But was: %s",
+                            interval);
+            throw new ValidationException(errorMessage, e);
+        }
+
+        if (parsedInt <= 0) {
+            final String errorMessage =
+                    String.format(
+                            "The freshness interval currently only supports 
positive integer type values. But was: %s",
+                            interval);
+            throw new ValidationException(errorMessage);
+        }
+        return parsedInt;
     }
 
     public static IntervalFreshness ofSecond(String interval) {
-        return new IntervalFreshness(interval, TimeUnit.SECOND);
+        return IntervalFreshness.of(interval, TimeUnit.SECOND);
     }
 
     public static IntervalFreshness ofMinute(String interval) {
-        return new IntervalFreshness(interval, TimeUnit.MINUTE);
+        return IntervalFreshness.of(interval, TimeUnit.MINUTE);
     }
 
     public static IntervalFreshness ofHour(String interval) {
-        return new IntervalFreshness(interval, TimeUnit.HOUR);
+        return IntervalFreshness.of(interval, TimeUnit.HOUR);
     }
 
     public static IntervalFreshness ofDay(String interval) {
-        return new IntervalFreshness(interval, TimeUnit.DAY);
+        return IntervalFreshness.of(interval, TimeUnit.DAY);
+    }
+
+    /**
+     * Validates that the given freshness can be converted to a cron 
expression in full refresh
+     * mode. Since freshness and cron expression cannot be converted 
equivalently, only a
+     * limited set of freshness patterns is currently supported.
+     *
+     * @param intervalFreshness the freshness to validate
+     * @throws ValidationException if the freshness cannot be converted to a 
valid cron expression
+     */
+    public static void validateFreshnessForCron(IntervalFreshness 
intervalFreshness) {
+        switch (intervalFreshness.getTimeUnit()) {
+            case SECOND:
+                validateCronConstraints(intervalFreshness, 
SECOND_CRON_UPPER_BOUND);
+                break;
+            case MINUTE:
+                validateCronConstraints(intervalFreshness, 
MINUTE_CRON_UPPER_BOUND);
+                break;
+            case HOUR:
+                validateCronConstraints(intervalFreshness, 
HOUR_CRON_UPPER_BOUND);
+                break;
+            case DAY:
+                validateDayConstraints(intervalFreshness);
+                break;
+            default:
+                throw new ValidationException(
+                        String.format(
+                                "Unknown freshness time unit: %s.",
+                                intervalFreshness.getTimeUnit()));
+        }
+    }
+
+    /**
+     * Converts the freshness of materialized table to cron expression in full 
refresh mode. The
+     * freshness must first pass validation via {@link 
#validateFreshnessForCron}.
+     *
+     * @param intervalFreshness the freshness to convert
+     * @return the corresponding cron expression
+     * @throws ValidationException if the freshness cannot be converted to a 
valid cron expression
+     */
+    public static String convertFreshnessToCron(IntervalFreshness 
intervalFreshness) {
+        // First validate that conversion is possible
+        validateFreshnessForCron(intervalFreshness);
+
+        // Then perform the conversion
+        switch (intervalFreshness.getTimeUnit()) {
+            case SECOND:
+                return String.format(
+                        SECOND_CRON_EXPRESSION_TEMPLATE, 
intervalFreshness.getIntervalInt());
+            case MINUTE:
+                return String.format(
+                        MINUTE_CRON_EXPRESSION_TEMPLATE, 
intervalFreshness.getIntervalInt());
+            case HOUR:
+                return String.format(
+                        HOUR_CRON_EXPRESSION_TEMPLATE, 
intervalFreshness.getIntervalInt());
+            case DAY:
+                return ONE_DAY_CRON_EXPRESSION_TEMPLATE;
+            default:
+                throw new ValidationException(
+                        String.format(
+                                "Unknown freshness time unit: %s.",
+                                intervalFreshness.getTimeUnit()));
+        }
     }
 
+    private static void validateCronConstraints(
+            IntervalFreshness intervalFreshness, long cronUpperBound) {
+        int interval = intervalFreshness.getIntervalInt();
+        TimeUnit timeUnit = intervalFreshness.getTimeUnit();
+        // Freshness must be less than cronUpperBound for corresponding time 
unit when converting it
+        // to a cron expression
+        if (interval >= cronUpperBound) {
+            throw new ValidationException(
+                    String.format(
+                            "In full refresh mode, freshness must be less than 
%s when the time unit is %s.",
+                            cronUpperBound, timeUnit));
+        }
+        // Freshness must be a factor of cronUpperBound for corresponding time 
unit
+        if (cronUpperBound % interval != 0) {
+            throw new ValidationException(
+                    String.format(
+                            "In full refresh mode, only freshness that are 
factors of %s are currently supported when the time unit is %s.",
+                            cronUpperBound, timeUnit));
+        }
+    }
+
+    private static void validateDayConstraints(IntervalFreshness 
intervalFreshness) {
+        // Since the number of days in each month is different, only one day 
of freshness is
+        // currently supported when the time unit is DAY
+        int interval = intervalFreshness.getIntervalInt();
+        if (interval > 1) {
+            throw new ValidationException(
+                    "In full refresh mode, freshness must be 1 when the time 
unit is DAY.");
+        }
+    }
+
+    /**
+     * Creates an IntervalFreshness from a Duration, choosing the most 
appropriate time unit.
+     * Prefers larger units when possible (e.g., 60 seconds → 1 minute).
+     */
+    public static IntervalFreshness fromDuration(Duration duration) {
+        if (duration.equals(duration.truncatedTo(ChronoUnit.DAYS))) {
+            return new IntervalFreshness((int) duration.toDays(), 
TimeUnit.DAY);
+        }
+        if (duration.equals(duration.truncatedTo(ChronoUnit.HOURS))) {
+            return new IntervalFreshness((int) duration.toHours(), 
TimeUnit.HOUR);
+        }
+        if (duration.equals(duration.truncatedTo(ChronoUnit.MINUTES))) {
+            return new IntervalFreshness((int) duration.toMinutes(), 
TimeUnit.MINUTE);
+        }
+
+        return new IntervalFreshness((int) duration.getSeconds(), 
TimeUnit.SECOND);
+    }
+
+    /**
+     * @deprecated Use {@link #getIntervalInt()} instead.
+     */
+    @Deprecated
     public String getInterval() {
+        return String.valueOf(interval);
+    }
+
+    public int getIntervalInt() {
         return interval;
     }
 
@@ -67,6 +221,21 @@ public class IntervalFreshness {
         return timeUnit;
     }
 
+    public Duration toDuration() {
+        switch (timeUnit) {
+            case SECOND:
+                return Duration.ofSeconds(interval);
+            case MINUTE:
+                return Duration.ofMinutes(interval);
+            case HOUR:
+                return Duration.ofHours(interval);
+            case DAY:
+                return Duration.ofDays(interval);
+            default:
+                throw new IllegalStateException("Unexpected value: " + 
timeUnit);
+        }
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnricher.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnricher.java
new file mode 100644
index 00000000000..33cb9adf7b6
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnricher.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * Enricher interface for determining materialized table properties during 
catalog resolution.
+ *
+ * <p>This enricher resolves:
+ *
+ * <ul>
+ *   <li>Freshness intervals when not explicitly specified by the user
+ *   <li>Physical refresh modes (CONTINUOUS or FULL) based on logical 
preferences and configuration
+ * </ul>
+ *
+ * <p>Implementations can provide custom strategies tailored to different 
deployment environments or
+ * operational requirements.
+ */
+@Experimental
+public interface MaterializedTableEnricher {
+
+    /**
+     * Enriches a materialized table by determining its final freshness 
interval and refresh mode.
+     *
+     * @param catalogMaterializedTable the materialized table to enrich, which 
may have null
+     *     freshness
+     * @return the enrichment result with resolved, non-null freshness and 
refresh mode
+     */
+    MaterializedTableEnrichmentResult enrich(CatalogMaterializedTable 
catalogMaterializedTable);
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java
new file mode 100644
index 00000000000..dc19d82ea27
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
+
+/**
+ * Result of the enrichment process containing the resolved freshness interval 
and physical refresh
+ * mode for a {@link CatalogMaterializedTable}.
+ *
+ * <p>This object is returned by {@link MaterializedTableEnricher} after 
determining the final,
+ * non-null values for both properties.
+ */
+@Experimental
+public class MaterializedTableEnrichmentResult {
+
+    private final IntervalFreshness freshness;
+    private final RefreshMode refreshMode;
+
+    public MaterializedTableEnrichmentResult(
+            final IntervalFreshness freshness, final RefreshMode refreshMode) {
+        this.freshness = freshness;
+        this.refreshMode = refreshMode;
+    }
+
+    public IntervalFreshness getFreshness() {
+        return freshness;
+    }
+
+    public RefreshMode getRefreshMode() {
+        return refreshMode;
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
index 82ad39d0fc3..6f0bf960b18 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.List;
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A validated {@link CatalogMaterializedTable} that is backed by the original 
metadata coming from
  * the {@link Catalog} but resolved by the framework.
@@ -43,13 +45,19 @@ public class ResolvedCatalogMaterializedTable
 
     private final ResolvedSchema resolvedSchema;
 
+    private final RefreshMode refreshMode;
+
+    private final IntervalFreshness freshness;
+
     public ResolvedCatalogMaterializedTable(
-            CatalogMaterializedTable origin, ResolvedSchema resolvedSchema) {
-        this.origin =
-                Preconditions.checkNotNull(
-                        origin, "Original catalog materialized table must not 
be null.");
-        this.resolvedSchema =
-                Preconditions.checkNotNull(resolvedSchema, "Resolved schema 
must not be null.");
+            CatalogMaterializedTable origin,
+            ResolvedSchema resolvedSchema,
+            RefreshMode refreshMode,
+            IntervalFreshness freshness) {
+        this.origin = checkNotNull(origin, "Original catalog materialized 
table must not be null.");
+        this.resolvedSchema = checkNotNull(resolvedSchema, "Resolved schema 
must not be null.");
+        this.refreshMode = checkNotNull(refreshMode, "Refresh mode must not be 
null.");
+        this.freshness = checkNotNull(freshness, "Freshness must not be 
null.");
     }
 
     @Override
@@ -65,12 +73,13 @@ public class ResolvedCatalogMaterializedTable
     @Override
     public CatalogBaseTable copy() {
         return new ResolvedCatalogMaterializedTable(
-                (CatalogMaterializedTable) origin.copy(), resolvedSchema);
+                (CatalogMaterializedTable) origin.copy(), resolvedSchema, 
refreshMode, freshness);
     }
 
     @Override
     public ResolvedCatalogMaterializedTable copy(Map<String, String> options) {
-        return new ResolvedCatalogMaterializedTable(origin.copy(options), 
resolvedSchema);
+        return new ResolvedCatalogMaterializedTable(
+                origin.copy(options), resolvedSchema, refreshMode, freshness);
     }
 
     @Override
@@ -80,7 +89,9 @@ public class ResolvedCatalogMaterializedTable
             byte[] serializedRefreshHandler) {
         return new ResolvedCatalogMaterializedTable(
                 origin.copy(refreshStatus, refreshHandlerDescription, 
serializedRefreshHandler),
-                resolvedSchema);
+                resolvedSchema,
+                refreshMode,
+                freshness);
     }
 
     @Override
@@ -124,8 +135,8 @@ public class ResolvedCatalogMaterializedTable
     }
 
     @Override
-    public IntervalFreshness getDefinitionFreshness() {
-        return origin.getDefinitionFreshness();
+    public @Nonnull IntervalFreshness getDefinitionFreshness() {
+        return freshness;
     }
 
     @Override
@@ -134,8 +145,8 @@ public class ResolvedCatalogMaterializedTable
     }
 
     @Override
-    public RefreshMode getRefreshMode() {
-        return origin.getRefreshMode();
+    public @Nonnull RefreshMode getRefreshMode() {
+        return refreshMode;
     }
 
     @Override
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/IntervalFreshnessUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/IntervalFreshnessUtils.java
deleted file mode 100644
index cd58bff4d91..00000000000
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/IntervalFreshnessUtils.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.utils;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.IntervalFreshness;
-
-import org.apache.commons.lang3.math.NumberUtils;
-
-import java.time.Duration;
-
-/** Utilities to {@link IntervalFreshness}. */
-@Internal
-public class IntervalFreshnessUtils {
-
-    private static final String SECOND_CRON_EXPRESSION_TEMPLATE = "0/%s * * * 
* ? *";
-    private static final String MINUTE_CRON_EXPRESSION_TEMPLATE = "0 0/%s * * 
* ? *";
-    private static final String HOUR_CRON_EXPRESSION_TEMPLATE = "0 0 0/%s * * 
? *";
-    private static final String ONE_DAY_CRON_EXPRESSION_TEMPLATE = "0 0 0 * * 
? *";
-
-    private static final long SECOND_CRON_UPPER_BOUND = 60;
-    private static final long MINUTE_CRON_UPPER_BOUND = 60;
-    private static final long HOUR_CRON_UPPER_BOUND = 24;
-
-    private IntervalFreshnessUtils() {}
-
-    @VisibleForTesting
-    static void validateIntervalFreshness(IntervalFreshness intervalFreshness) 
{
-        if (!NumberUtils.isParsable(intervalFreshness.getInterval())) {
-            throw new ValidationException(
-                    String.format(
-                            "The interval freshness value '%s' is an illegal 
integer type value.",
-                            intervalFreshness.getInterval()));
-        }
-
-        if (!NumberUtils.isDigits(intervalFreshness.getInterval())) {
-            throw new ValidationException(
-                    "The freshness interval currently only supports integer 
type values.");
-        }
-    }
-
-    public static Duration convertFreshnessToDuration(IntervalFreshness 
intervalFreshness) {
-        // validate the freshness value firstly
-        validateIntervalFreshness(intervalFreshness);
-
-        long interval = Long.parseLong(intervalFreshness.getInterval());
-        switch (intervalFreshness.getTimeUnit()) {
-            case DAY:
-                return Duration.ofDays(interval);
-            case HOUR:
-                return Duration.ofHours(interval);
-            case MINUTE:
-                return Duration.ofMinutes(interval);
-            case SECOND:
-                return Duration.ofSeconds(interval);
-            default:
-                throw new ValidationException(
-                        String.format(
-                                "Unknown freshness time unit: %s.",
-                                intervalFreshness.getTimeUnit()));
-        }
-    }
-
-    /**
-     * This is an util method that is used to convert the freshness of 
materialized table to cron
-     * expression in full refresh mode. Since freshness and cron expression 
cannot be converted
-     * equivalently, there are currently only a limited patterns of freshness 
that can be converted
-     * to cron expression.
-     */
-    public static String convertFreshnessToCron(IntervalFreshness 
intervalFreshness) {
-        switch (intervalFreshness.getTimeUnit()) {
-            case SECOND:
-                return validateAndConvertCron(
-                        intervalFreshness,
-                        SECOND_CRON_UPPER_BOUND,
-                        SECOND_CRON_EXPRESSION_TEMPLATE);
-            case MINUTE:
-                return validateAndConvertCron(
-                        intervalFreshness,
-                        MINUTE_CRON_UPPER_BOUND,
-                        MINUTE_CRON_EXPRESSION_TEMPLATE);
-            case HOUR:
-                return validateAndConvertCron(
-                        intervalFreshness, HOUR_CRON_UPPER_BOUND, 
HOUR_CRON_EXPRESSION_TEMPLATE);
-            case DAY:
-                return validateAndConvertDayCron(intervalFreshness);
-            default:
-                throw new ValidationException(
-                        String.format(
-                                "Unknown freshness time unit: %s.",
-                                intervalFreshness.getTimeUnit()));
-        }
-    }
-
-    private static String validateAndConvertCron(
-            IntervalFreshness intervalFreshness, long cronUpperBound, String 
cronTemplate) {
-        long interval = Long.parseLong(intervalFreshness.getInterval());
-        IntervalFreshness.TimeUnit timeUnit = intervalFreshness.getTimeUnit();
-        // Freshness must be less than cronUpperBound for corresponding time 
unit when convert it
-        // to cron expression
-        if (interval >= cronUpperBound) {
-            throw new ValidationException(
-                    String.format(
-                            "In full refresh mode, freshness must be less than 
%s when the time unit is %s.",
-                            cronUpperBound, timeUnit));
-        }
-        // Freshness must be factors of cronUpperBound for corresponding time 
unit
-        if (cronUpperBound % interval != 0) {
-            throw new ValidationException(
-                    String.format(
-                            "In full refresh mode, only freshness that are 
factors of %s are currently supported when the time unit is %s.",
-                            cronUpperBound, timeUnit));
-        }
-
-        return String.format(cronTemplate, interval);
-    }
-
-    private static String validateAndConvertDayCron(IntervalFreshness 
intervalFreshness) {
-        // Since the number of days in each month is different, only one day 
of freshness is
-        // currently supported when the time unit is DAY
-        long interval = Long.parseLong(intervalFreshness.getInterval());
-        if (interval > 1) {
-            throw new ValidationException(
-                    "In full refresh mode, freshness must be 1 when the time 
unit is DAY.");
-        }
-        return ONE_DAY_CRON_EXPRESSION_TEMPLATE;
-    }
-}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
index bbe708e6509..adb8a845421 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
 import org.apache.flink.table.expressions.DefaultSqlFactory;
 
 import org.junit.jupiter.api.Test;
@@ -165,6 +166,8 @@ class CatalogPropertiesUtilTest {
                                 
.refreshStatus(CatalogMaterializedTable.RefreshStatus.ACTIVATED)
                                 .refreshHandlerDescription("description")
                                 .build(),
-                        resolvedSchema));
+                        resolvedSchema,
+                        RefreshMode.CONTINUOUS,
+                        IntervalFreshness.ofHour("123")));
     }
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/IntervalFreshnessUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java
similarity index 74%
rename from flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/IntervalFreshnessUtilsTest.java
rename to flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java
index 86e660d50b1..de794684956 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/IntervalFreshnessUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java
@@ -16,53 +16,69 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.utils;
+package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.IntervalFreshness;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
 
-import static 
org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron;
-import static 
org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration;
-import static 
org.apache.flink.table.utils.IntervalFreshnessUtils.validateIntervalFreshness;
+import static 
org.apache.flink.table.catalog.IntervalFreshness.convertFreshnessToCron;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Tests for {@link IntervalFreshnessUtils}. */
-public class IntervalFreshnessUtilsTest {
+/** Tests for {@link IntervalFreshness}. */
+public class IntervalFreshnessTest {
 
-    @Test
-    void testIllegalIntervalFreshness() {
-        assertThatThrownBy(() -> 
validateIntervalFreshness(IntervalFreshness.ofMinute("2efedd")))
+    @ParameterizedTest
+    @ValueSource(strings = {"2efedd", "2.5", "-2", "0", 
"12345678901234567890"})
+    void testIllegalIntervalFreshness(String invalidInput) {
+        assertThatThrownBy(() -> IntervalFreshness.ofMinute(invalidInput))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "The interval freshness value '2efedd' is an illegal 
integer type value.");
+                        String.format(
+                                "The freshness interval currently only 
supports positive integer type values. But was: %s",
+                                invalidInput));
+    }
 
-        assertThatThrownBy(() -> 
validateIntervalFreshness(IntervalFreshness.ofMinute("2.5")))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "The freshness interval currently only supports 
integer type values.");
+    @Test
+    void testConvertDurationToFreshnessInterval() {
+        // verify second
+        IntervalFreshness actualSeconds = 
IntervalFreshness.fromDuration(Duration.ofSeconds(20));
+        assertThat(actualSeconds).isEqualTo(IntervalFreshness.ofSecond("20"));
+
+        // verify minute
+        IntervalFreshness actualMinutes = 
IntervalFreshness.fromDuration(Duration.ofMinutes(3));
+        assertThat(actualMinutes).isEqualTo(IntervalFreshness.ofMinute("3"));
+
+        // verify hour
+        IntervalFreshness actualHour = 
IntervalFreshness.fromDuration(Duration.ofHours(1));
+        assertThat(actualHour).isEqualTo(IntervalFreshness.ofHour("1"));
+
+        // verify day
+        IntervalFreshness actualDay = 
IntervalFreshness.fromDuration(Duration.ofDays(2));
+        assertThat(actualDay).isEqualTo(IntervalFreshness.ofDay("2"));
     }
 
     @Test
     void testConvertFreshnessToDuration() {
         // verify second
-        Duration actualSecond = 
convertFreshnessToDuration(IntervalFreshness.ofSecond("20"));
+        Duration actualSecond = IntervalFreshness.ofSecond("20").toDuration();
         assertThat(actualSecond).isEqualTo(Duration.ofSeconds(20));
 
         // verify minute
-        Duration actualMinute = 
convertFreshnessToDuration(IntervalFreshness.ofMinute("3"));
+        Duration actualMinute = IntervalFreshness.ofMinute("3").toDuration();
         assertThat(actualMinute).isEqualTo(Duration.ofMinutes(3));
 
         // verify hour
-        Duration actualHour = 
convertFreshnessToDuration(IntervalFreshness.ofHour("3"));
+        Duration actualHour = IntervalFreshness.ofHour("3").toDuration();
         assertThat(actualHour).isEqualTo(Duration.ofHours(3));
 
         // verify day
-        Duration actualDay = 
convertFreshnessToDuration(IntervalFreshness.ofDay("3"));
+        Duration actualDay = IntervalFreshness.ofDay("3").toDuration();
         assertThat(actualDay).isEqualTo(Duration.ofDays(3));
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
index 46c6dd44fa7..2107b209b27 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
@@ -26,6 +26,8 @@ import org.apache.flink.sql.parser.error.SqlValidateException;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import 
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -50,10 +52,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.DATE_FORMATTER;
-import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD;
 import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.PARTITION_FIELDS;
-import static 
org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron;
-import static 
org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration;
+import static 
org.apache.flink.table.catalog.IntervalFreshness.validateFreshnessForCron;
 
 /** A converter for {@link SqlCreateMaterializedTable}. */
 public class SqlCreateMaterializedTableConverter
@@ -77,33 +77,27 @@ public class SqlCreateMaterializedTableConverter
 
         // get freshness
         IntervalFreshness intervalFreshness =
-                MaterializedTableUtils.getMaterializedTableFreshness(
-                        sqlCreateMaterializedTable.getFreshness());
+                Optional.ofNullable(sqlCreateMaterializedTable.getFreshness())
+                        
.map(MaterializedTableUtils::getMaterializedTableFreshness)
+                        .orElse(null);
 
-        // get refresh mode
-        SqlRefreshMode sqlRefreshMode = null;
-        if (sqlCreateMaterializedTable.getRefreshMode().isPresent()) {
-            sqlRefreshMode =
-                    sqlCreateMaterializedTable
-                            .getRefreshMode()
-                            .get()
-                            .getValueAs(SqlRefreshMode.class);
-        }
-        CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode =
+        // Get the logical refresh mode from SQL
+        SqlRefreshMode sqlRefreshMode =
+                
Optional.ofNullable(sqlCreateMaterializedTable.getRefreshMode())
+                        .map(mode -> mode.getValueAs(SqlRefreshMode.class))
+                        .orElse(null);
+
+        final LogicalRefreshMode logicalRefreshMode =
                 
MaterializedTableUtils.deriveLogicalRefreshMode(sqlRefreshMode);
-        // only MATERIALIZED_TABLE_FRESHNESS_THRESHOLD configured in flink 
conf yaml work, so we get
-        // it from rootConfiguration instead of table config
-        CatalogMaterializedTable.RefreshMode refreshMode =
-                MaterializedTableUtils.deriveRefreshMode(
-                        context.getTableConfig()
-                                .getRootConfiguration()
-                                .get(MATERIALIZED_TABLE_FRESHNESS_THRESHOLD),
-                        convertFreshnessToDuration(intervalFreshness),
-                        logicalRefreshMode);
-        // If the refresh mode is full, validate whether the freshness can 
convert to cron
-        // expression in advance
-        if (CatalogMaterializedTable.RefreshMode.FULL == refreshMode) {
-            convertFreshnessToCron(intervalFreshness);
+
+        // get the physical refresh mode from SQL
+        final RefreshMode refreshMode =
+                sqlRefreshMode == null
+                        ? null
+                        : 
MaterializedTableUtils.fromSqltoRefreshMode(sqlRefreshMode);
+
+        if (CatalogMaterializedTable.RefreshMode.FULL == refreshMode && 
intervalFreshness != null) {
+            validateFreshnessForCron(intervalFreshness);
         }
 
         // get query schema and definition query
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index ac41bfa58c1..ba765e40a69 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -22,13 +22,12 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
 import org.apache.flink.table.catalog.IntervalFreshness;
 
 import org.apache.calcite.sql.SqlIntervalLiteral;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 
-import java.time.Duration;
-
 /** The utils for materialized table. */
 @Internal
 public class MaterializedTableUtils {
@@ -79,22 +78,14 @@ public class MaterializedTableUtils {
         }
     }
 
-    public static CatalogMaterializedTable.RefreshMode deriveRefreshMode(
-            Duration threshold,
-            Duration definedFreshness,
-            CatalogMaterializedTable.LogicalRefreshMode definedRefreshMode) {
-        // If the refresh mode is specified manually, use it directly.
-        if (definedRefreshMode == 
CatalogMaterializedTable.LogicalRefreshMode.FULL) {
-            return CatalogMaterializedTable.RefreshMode.FULL;
-        } else if (definedRefreshMode == 
CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS) {
-            return CatalogMaterializedTable.RefreshMode.CONTINUOUS;
-        }
-
-        // derive the actual refresh mode via defined freshness
-        if (definedFreshness.compareTo(threshold) < 0) {
-            return CatalogMaterializedTable.RefreshMode.CONTINUOUS;
-        } else {
-            return CatalogMaterializedTable.RefreshMode.FULL;
+    public static RefreshMode fromSqltoRefreshMode(SqlRefreshMode 
sqlRefreshMode) {
+        switch (sqlRefreshMode) {
+            case FULL:
+                return RefreshMode.FULL;
+            case CONTINUOUS:
+                return RefreshMode.CONTINUOUS;
+            default:
+                throw new IllegalArgumentException("Unknown refresh mode: " + 
sqlRefreshMode);
         }
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index cdf6fbe4174..13835f16d61 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -85,6 +85,9 @@ import org.apache.calcite.sql.SqlNode;
 import org.assertj.core.api.HamcrestCondition;
 import org.assertj.core.api.InstanceOfAssertFactories;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import javax.annotation.Nullable;
 
@@ -96,7 +99,9 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.planner.utils.OperationMatchers.entry;
@@ -1404,53 +1409,93 @@ class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBas
         checkAlterNonExistTable("alter table %s nonexistent drop watermark");
     }
 
-    @Test
-    void createMaterializedTableWithDistribution() throws Exception {
-        final String sql =
-                "CREATE MATERIALIZED TABLE users_shops ("
-                        + " PRIMARY KEY (user_id) not enforced)"
-                        + " DISTRIBUTED BY HASH (user_id) INTO 7 BUCKETS\n"
-                        + " WITH(\n"
-                        + "   'format' = 'debezium-json'\n"
-                        + " )\n"
-                        + " FRESHNESS = INTERVAL '30' SECOND\n"
-                        + " AS SELECT 1 as shop_id, 2 as user_id ";
-
-        final String expectedSummaryString =
-                "CREATE MATERIALIZED TABLE: (materializedTable: "
-                        + 
"[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n"
-                        + "  `shop_id` INT NOT NULL,\n"
-                        + "  `user_id` INT NOT NULL,\n"
-                        + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) 
NOT ENFORCED\n"
-                        + "), comment='null', distribution=DISTRIBUTED BY 
HASH(`user_id`) INTO 7 BUCKETS, partitionKeys=[], "
-                        + "options={format=debezium-json}, snapshot=null, 
definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
-                        + "freshness=INTERVAL '30' SECOND, 
logicalRefreshMode=AUTOMATIC, refreshMode=CONTINUOUS, "
-                        + "refreshStatus=INITIALIZING, 
refreshHandlerDescription='null', serializedRefreshHandler=null}, 
resolvedSchema=(\n"
-                        + "  `shop_id` INT NOT NULL,\n"
-                        + "  `user_id` INT NOT NULL,\n"
-                        + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) 
NOT ENFORCED\n"
-                        + ")}], identifier: 
[`builtin`.`default`.`users_shops`])";
+    @ParameterizedTest(name = "[{index}] {0}")
+    @MethodSource("provideCreateMaterializedTableTestCases")
+    void createMaterializedTableWithVariousOptions(
+            String testName,
+            String sql,
+            String expectedSummaryString,
+            Consumer<CreateMaterializedTableOperation> additionalAssertions) {
 
         final Operation operation = parse(sql);
 
         
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
         
assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
-        assertThat(
-                        ((CreateMaterializedTableOperation) operation)
-                                .getCatalogMaterializedTable()
-                                .getDistribution()
-                                .get())
-                .isEqualTo(TableDistribution.of(Kind.HASH, 7, 
List.of("user_id")));
-
-        prepareMaterializedTable("tb2", false, 1, null, "SELECT 1");
 
-        assertThatThrownBy(
-                        () ->
-                                parse(
-                                        "alter MATERIALIZED table cat1.db1.tb2 
modify distribution into 3 buckets"))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Materialized table `cat1`.`db1`.`tb2` does not have a 
distribution to modify.");
+        final CreateMaterializedTableOperation 
createMaterializedTableOperation =
+                (CreateMaterializedTableOperation) operation;
+
+        additionalAssertions.accept(createMaterializedTableOperation);
+    }
+
+    private static Stream<Arguments> provideCreateMaterializedTableTestCases() 
{
+        return Stream.of(
+                Arguments.of(
+                        "with refresh mode continuous",
+                        "CREATE MATERIALIZED TABLE users_shops ("
+                                + " PRIMARY KEY (user_id) not enforced)"
+                                + " WITH(\n"
+                                + "   'format' = 'debezium-json'\n"
+                                + " )\n"
+                                + " FRESHNESS = INTERVAL '30' SECOND\n"
+                                + " REFRESH_MODE = CONTINUOUS\n"
+                                + " AS SELECT 1 as shop_id, 2 as user_id ",
+                        "CREATE MATERIALIZED TABLE: (materializedTable: "
+                                + 
"[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n"
+                                + "  `shop_id` INT NOT NULL,\n"
+                                + "  `user_id` INT NOT NULL,\n"
+                                + "  CONSTRAINT `PK_user_id` PRIMARY KEY 
(`user_id`) NOT ENFORCED\n"
+                                + "), comment='null', distribution=null, 
partitionKeys=[], "
+                                + "options={format=debezium-json}, 
snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
+                                + "freshness=INTERVAL '30' SECOND, 
logicalRefreshMode=CONTINUOUS, refreshMode=CONTINUOUS, "
+                                + "refreshStatus=INITIALIZING, 
refreshHandlerDescription='null', serializedRefreshHandler=null}, 
resolvedSchema=(\n"
+                                + "  `shop_id` INT NOT NULL,\n"
+                                + "  `user_id` INT NOT NULL,\n"
+                                + "  CONSTRAINT `PK_user_id` PRIMARY KEY 
(`user_id`) NOT ENFORCED\n"
+                                + ")}], identifier: 
[`builtin`.`default`.`users_shops`])",
+                        (Consumer<CreateMaterializedTableOperation>)
+                                op -> {
+                                    assertThat(
+                                                    
op.getCatalogMaterializedTable()
+                                                            
.getDefinitionFreshness())
+                                            
.isEqualTo(IntervalFreshness.ofSecond("30"));
+                                    
assertThat(op.getCatalogMaterializedTable().getRefreshMode())
+                                            .isSameAs(RefreshMode.CONTINUOUS);
+                                }),
+                Arguments.of(
+                        "with distribution",
+                        "CREATE MATERIALIZED TABLE users_shops ("
+                                + " PRIMARY KEY (user_id) not enforced)"
+                                + " DISTRIBUTED BY HASH (user_id) INTO 7 
BUCKETS\n"
+                                + " WITH(\n"
+                                + "   'format' = 'debezium-json'\n"
+                                + " )\n"
+                                + " FRESHNESS = INTERVAL '30' SECOND\n"
+                                + " AS SELECT 1 as shop_id, 2 as user_id ",
+                        "CREATE MATERIALIZED TABLE: (materializedTable: "
+                                + 
"[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n"
+                                + "  `shop_id` INT NOT NULL,\n"
+                                + "  `user_id` INT NOT NULL,\n"
+                                + "  CONSTRAINT `PK_user_id` PRIMARY KEY 
(`user_id`) NOT ENFORCED\n"
+                                + "), comment='null', distribution=DISTRIBUTED 
BY HASH(`user_id`) INTO 7 BUCKETS, partitionKeys=[], "
+                                + "options={format=debezium-json}, 
snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
+                                + "freshness=INTERVAL '30' SECOND, 
logicalRefreshMode=AUTOMATIC, refreshMode=null, "
+                                + "refreshStatus=INITIALIZING, 
refreshHandlerDescription='null', serializedRefreshHandler=null}, 
resolvedSchema=(\n"
+                                + "  `shop_id` INT NOT NULL,\n"
+                                + "  `user_id` INT NOT NULL,\n"
+                                + "  CONSTRAINT `PK_user_id` PRIMARY KEY 
(`user_id`) NOT ENFORCED\n"
+                                + ")}], identifier: 
[`builtin`.`default`.`users_shops`])",
+                        (Consumer<CreateMaterializedTableOperation>)
+                                op ->
+                                        assertThat(
+                                                        
op.getCatalogMaterializedTable()
+                                                                
.getDistribution()
+                                                                .get())
+                                                .isEqualTo(
+                                                        TableDistribution.of(
+                                                                Kind.HASH,
+                                                                7,
+                                                                
List.of("user_id")))));
     }
 
     @Test
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 39275541d8c..67a828d7fe0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.table.api.FunctionDescriptor;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import 
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.IntervalFreshness;
@@ -128,9 +130,7 @@ public class SqlMaterializedTableNodeToOperationConverterTest
         CreateMaterializedTableOperation op = 
(CreateMaterializedTableOperation) operation;
         ResolvedCatalogMaterializedTable materializedTable = 
op.getCatalogMaterializedTable();
 
-        Map<String, String> options = new HashMap<>();
-        options.put("connector", "filesystem");
-        options.put("format", "json");
+        Map<String, String> options = Map.of("connector", "filesystem", 
"format", "json");
         CatalogMaterializedTable expected =
                 CatalogMaterializedTable.newBuilder()
                         .schema(
@@ -146,13 +146,120 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
                         .partitionKeys(Arrays.asList("a", "d"))
                         .freshness(IntervalFreshness.ofSecond("30"))
                         
.logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.FULL)
-                        .refreshMode(CatalogMaterializedTable.RefreshMode.FULL)
+                        .refreshMode(RefreshMode.FULL)
+                        
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+                        .definitionQuery(
+                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`\n"
+                                        + "FROM `builtin`.`default`.`t1` AS 
`t1`")
+                        .build();
+
+        final IntervalFreshness resolvedFreshness = 
materializedTable.getDefinitionFreshness();
+        
assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofSecond("30"));
+
+        final RefreshMode resolvedRefreshMode = 
materializedTable.getRefreshMode();
+        assertThat(resolvedRefreshMode).isSameAs(RefreshMode.FULL);
+
+        assertThat(materializedTable.getOrigin()).isEqualTo(expected);
+    }
+
+    @Test
+    void testCreateMaterializedTableWithoutFreshness() {
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1 (\n"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+                        + ")\n"
+                        + "COMMENT 'materialized table comment'\n"
+                        + "PARTITIONED BY (a, d)\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'filesystem', \n"
+                        + "  'format' = 'json'\n"
+                        + ")\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation = parse(sql);
+        
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+        CreateMaterializedTableOperation op = 
(CreateMaterializedTableOperation) operation;
+        ResolvedCatalogMaterializedTable materializedTable = 
op.getCatalogMaterializedTable();
+        
assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+        Map<String, String> options = Map.of("connector", "filesystem", 
"format", "json");
+
+        CatalogMaterializedTable expected =
+                CatalogMaterializedTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("a", 
DataTypes.BIGINT().notNull())
+                                        .column("b", 
DataTypes.VARCHAR(Integer.MAX_VALUE))
+                                        .column("c", DataTypes.INT())
+                                        .column("d", 
DataTypes.VARCHAR(Integer.MAX_VALUE))
+                                        .primaryKeyNamed("ct1", 
Collections.singletonList("a"))
+                                        .build())
+                        .comment("materialized table comment")
+                        .options(options)
+                        .partitionKeys(Arrays.asList("a", "d"))
+                        .logicalRefreshMode(LogicalRefreshMode.FULL)
+                        .refreshMode(RefreshMode.FULL)
+                        
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+                        .definitionQuery(
+                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`\n"
+                                        + "FROM `builtin`.`default`.`t1` AS 
`t1`")
+                        .build();
+
+        // The resolved freshness should default to 1 hour
+        final IntervalFreshness resolvedFreshness = 
materializedTable.getDefinitionFreshness();
+        assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofHour("1"));
+
+        final RefreshMode resolvedRefreshMode = 
materializedTable.getRefreshMode();
+        assertThat(resolvedRefreshMode).isSameAs(RefreshMode.FULL);
+
+        assertThat(materializedTable.getOrigin()).isEqualTo(expected);
+    }
+
+    @Test
+    void testCreateMaterializedTableWithoutFreshnessAndRefreshMode() {
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1 (\n"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+                        + ")\n"
+                        + "COMMENT 'materialized table comment'\n"
+                        + "PARTITIONED BY (a, d)\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'filesystem', \n"
+                        + "  'format' = 'json'\n"
+                        + ")\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation = parse(sql);
+        
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+        CreateMaterializedTableOperation op = 
(CreateMaterializedTableOperation) operation;
+        ResolvedCatalogMaterializedTable materializedTable = 
op.getCatalogMaterializedTable();
+        
assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+        Map<String, String> options = Map.of("connector", "filesystem", 
"format", "json");
+
+        CatalogMaterializedTable expected =
+                CatalogMaterializedTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("a", 
DataTypes.BIGINT().notNull())
+                                        .column("b", 
DataTypes.VARCHAR(Integer.MAX_VALUE))
+                                        .column("c", DataTypes.INT())
+                                        .column("d", 
DataTypes.VARCHAR(Integer.MAX_VALUE))
+                                        .primaryKeyNamed("ct1", 
Collections.singletonList("a"))
+                                        .build())
+                        .comment("materialized table comment")
+                        .options(options)
+                        .partitionKeys(Arrays.asList("a", "d"))
+                        .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC)
                         
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
                         .definitionQuery(
                                 "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`\n"
                                         + "FROM `builtin`.`default`.`t1` AS 
`t1`")
                         .build();
 
+        final IntervalFreshness resolvedFreshness = 
materializedTable.getDefinitionFreshness();
+        
assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofMinute("3"));
         assertThat(materializedTable.getOrigin()).isEqualTo(expected);
     }
 
@@ -206,8 +313,7 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
 
         assertThat(materializedTable.getLogicalRefreshMode())
                 
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
-        assertThat(materializedTable.getRefreshMode())
-                .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
+        
assertThat(materializedTable.getRefreshMode()).isEqualTo(RefreshMode.CONTINUOUS);
 
         // test continuous mode by manual specify
         final String sql2 =
@@ -242,8 +348,7 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
 
         assertThat(materializedTable.getLogicalRefreshMode())
                 
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
-        assertThat(materializedTable.getRefreshMode())
-                .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL);
+        
assertThat(materializedTable.getRefreshMode()).isEqualTo(RefreshMode.FULL);
 
         // test full mode by manual specify
         final String sql2 =
@@ -259,17 +364,26 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
 
         assertThat(materializedTable2.getLogicalRefreshMode())
                 .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.FULL);
-        assertThat(materializedTable2.getRefreshMode())
-                .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL);
-
+        
assertThat(materializedTable2.getRefreshMode()).isEqualTo(RefreshMode.FULL);
         final String sql3 =
                 "CREATE MATERIALIZED TABLE mtbl1\n"
                         + "FRESHNESS = INTERVAL '40' MINUTE\n"
+                        + "REFRESH_MODE = FULL\n"
                         + "AS SELECT * FROM t1";
         assertThatThrownBy(() -> parse(sql3))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
                         "In full refresh mode, only freshness that are factors 
of 60 are currently supported when the time unit is MINUTE.");
+
+        final String sql4 =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "FRESHNESS = INTERVAL '40' MINUTE\n"
+                        + "AS SELECT * FROM t1";
+
+        assertThatThrownBy(() -> parse(sql4))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "In full refresh mode, only freshness that are factors 
of 60 are currently supported when the time unit is MINUTE.");
     }
 
     @Test
diff --git 
a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
 
b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
index 646b2e832c7..5a63cc2a2d8 100644
--- 
a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
+++ 
b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
@@ -110,7 +110,9 @@ public class TestFileSystemCatalogTest extends 
TestFileSystemCatalogTestBase {
                             
.refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS)
                             
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
                             .build(),
-                    CREATE_RESOLVED_SCHEMA);
+                    CREATE_RESOLVED_SCHEMA,
+                    CatalogMaterializedTable.RefreshMode.CONTINUOUS,
+                    FRESHNESS);
 
     private static final TestRefreshHandler REFRESH_HANDLER =
             new TestRefreshHandler("jobID: xxx, clusterId: yyy");

Reply via email to