This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 64a21e7fff8 [FLINK-39348][table] Add `Interval` to reuse code for 
`FRESHNESS` and `START MODE`
64a21e7fff8 is described below

commit 64a21e7fff838bb5ac8a7f80beb2137285dc6ab7
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Mar 31 12:33:25 2026 +0200

    [FLINK-39348][table] Add `Interval` to reuse code for `FRESHNESS` and 
`START MODE`
---
 .../service/MaterializedTableStatementITCase.java  |   2 +-
 .../MaterializedTableManagerTest.java              |  74 ++++----
 .../table/api/internal/ShowCreateUtilTest.java     |  10 +-
 .../catalog/CatalogBaseTableResolutionTest.java    |   4 +-
 .../flink/table/catalog/CatalogPropertiesUtil.java |   4 +-
 .../org/apache/flink/table/catalog/Interval.java   | 173 +++++++++++++++++
 .../flink/table/catalog/IntervalFreshness.java     | 207 +++++++++++----------
 .../apache/flink/table/utils/DateTimeUtils.java    |  12 +-
 .../table/catalog/CatalogPropertiesUtilTest.java   |   4 +-
 .../flink/table/catalog/IntervalFreshnessTest.java |  48 ++---
 .../planner/utils/MaterializedTableUtils.java      | 133 +++++++++++--
 .../operations/SqlDdlToOperationConverterTest.java |   4 +-
 ...erializedTableNodeToOperationConverterTest.java |  16 +-
 .../catalog/TestFileSystemCatalogTest.java         |   2 +-
 14 files changed, 491 insertions(+), 202 deletions(-)

diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index 31564138b61..bf484e57c7d 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -288,7 +288,7 @@ class MaterializedTableStatementITCase extends 
AbstractMaterializedTableStatemen
 
         
assertThat(actualMaterializedTable.getResolvedSchema()).isEqualTo(expectedSchema);
         assertThat(actualMaterializedTable.getDefinitionFreshness())
-                .isEqualTo(IntervalFreshness.ofMinute("3"));
+                .isEqualTo(IntervalFreshness.ofMinute(3));
         assertThat(actualMaterializedTable.getLogicalRefreshMode())
                 .isSameAs(LogicalRefreshMode.AUTOMATIC);
         
assertThat(actualMaterializedTable.getRefreshMode()).isSameAs(RefreshMode.CONTINUOUS);
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java
index 082766cd298..993e60db156 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java
@@ -136,66 +136,66 @@ class MaterializedTableManagerTest {
                 // The interval of freshness match the partition specified by 
the 'date-formatter'.
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofDay("1"))
+                        .freshness(IntervalFreshness.ofDay(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .expectedRefreshPartition("day", "2023-12-31"),
                 TestSpec.create()
                         .schedulerTime("2024-01-02 00:00:00")
-                        .freshness(IntervalFreshness.ofDay("1"))
+                        .freshness(IntervalFreshness.ofDay(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .expectedRefreshPartition("day", "2024-01-01"),
                 TestSpec.create()
                         .schedulerTime("2024-01-02 00:00:00")
-                        .freshness(IntervalFreshness.ofHour("1"))
+                        .freshness(IntervalFreshness.ofHour(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         .expectedRefreshPartition("day", "2024-01-01")
                         .expectedRefreshPartition("hour", "23"),
                 TestSpec.create()
                         .schedulerTime("2024-01-02 01:00:00")
-                        .freshness(IntervalFreshness.ofHour("1"))
+                        .freshness(IntervalFreshness.ofHour(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         .expectedRefreshPartition("day", "2024-01-02")
                         .expectedRefreshPartition("hour", "00"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofHour("2"))
+                        .freshness(IntervalFreshness.ofHour(2))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         .expectedRefreshPartition("day", "2023-12-31")
                         .expectedRefreshPartition("hour", "22"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofHour("4"))
+                        .freshness(IntervalFreshness.ofHour(4))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         .expectedRefreshPartition("day", "2023-12-31")
                         .expectedRefreshPartition("hour", "20"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofHour("8"))
+                        .freshness(IntervalFreshness.ofHour(8))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         .expectedRefreshPartition("day", "2023-12-31")
                         .expectedRefreshPartition("hour", "16"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofHour("12"))
+                        .freshness(IntervalFreshness.ofHour(12))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         .expectedRefreshPartition("day", "2023-12-31")
                         .expectedRefreshPartition("hour", "12"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 12:00:00")
-                        .freshness(IntervalFreshness.ofHour("12"))
+                        .freshness(IntervalFreshness.ofHour(12))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         .expectedRefreshPartition("day", "2024-01-01")
                         .expectedRefreshPartition("hour", "00"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofMinute("1"))
+                        .freshness(IntervalFreshness.ofMinute(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -204,7 +204,7 @@ class MaterializedTableManagerTest {
                         .expectedRefreshPartition("minute", "59"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofMinute("2"))
+                        .freshness(IntervalFreshness.ofMinute(2))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -213,7 +213,7 @@ class MaterializedTableManagerTest {
                         .expectedRefreshPartition("minute", "58"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofMinute("4"))
+                        .freshness(IntervalFreshness.ofMinute(4))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -222,7 +222,7 @@ class MaterializedTableManagerTest {
                         .expectedRefreshPartition("minute", "56"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofMinute("5"))
+                        .freshness(IntervalFreshness.ofMinute(5))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -231,7 +231,7 @@ class MaterializedTableManagerTest {
                         .expectedRefreshPartition("minute", "55"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofMinute("6"))
+                        .freshness(IntervalFreshness.ofMinute(6))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -240,7 +240,7 @@ class MaterializedTableManagerTest {
                         .expectedRefreshPartition("minute", "54"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofMinute("10"))
+                        .freshness(IntervalFreshness.ofMinute(10))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -249,7 +249,7 @@ class MaterializedTableManagerTest {
                         .expectedRefreshPartition("minute", "50"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofMinute("12"))
+                        .freshness(IntervalFreshness.ofMinute(12))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -258,7 +258,7 @@ class MaterializedTableManagerTest {
                         .expectedRefreshPartition("minute", "48"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofMinute("15"))
+                        .freshness(IntervalFreshness.ofMinute(15))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -267,7 +267,7 @@ class MaterializedTableManagerTest {
                         .expectedRefreshPartition("minute", "45"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofMinute("30"))
+                        .freshness(IntervalFreshness.ofMinute(30))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -276,7 +276,7 @@ class MaterializedTableManagerTest {
                         .expectedRefreshPartition("minute", "30"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:30:00")
-                        .freshness(IntervalFreshness.ofMinute("30"))
+                        .freshness(IntervalFreshness.ofMinute(30))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -288,14 +288,14 @@ class MaterializedTableManagerTest {
                 // 'date-formatter'.
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofDay("1"))
+                        .freshness(IntervalFreshness.ofDay(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         .expectedRefreshPartition("day", "2023-12-31")
                         .expectedRefreshPartition("hour", "00"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofDay("1"))
+                        .freshness(IntervalFreshness.ofDay(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -304,7 +304,7 @@ class MaterializedTableManagerTest {
                         .expectedRefreshPartition("minute", "00"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofHour("1"))
+                        .freshness(IntervalFreshness.ofHour(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -313,7 +313,7 @@ class MaterializedTableManagerTest {
                         .expectedRefreshPartition("minute", "00"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 01:00:00")
-                        .freshness(IntervalFreshness.ofHour("1"))
+                        .freshness(IntervalFreshness.ofHour(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -324,84 +324,84 @@ class MaterializedTableManagerTest {
                 // 'date-formatter'.
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofHour("1"))
+                        .freshness(IntervalFreshness.ofHour(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .expectedRefreshPartition("day", "2023-12-31"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 01:00:00")
-                        .freshness(IntervalFreshness.ofHour("1"))
+                        .freshness(IntervalFreshness.ofHour(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .expectedRefreshPartition("day", "2024-01-01"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofHour("2"))
+                        .freshness(IntervalFreshness.ofHour(2))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .expectedRefreshPartition("day", "2023-12-31"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 02:00:00")
-                        .freshness(IntervalFreshness.ofHour("2"))
+                        .freshness(IntervalFreshness.ofHour(2))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .expectedRefreshPartition("day", "2024-01-01"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofHour("4"))
+                        .freshness(IntervalFreshness.ofHour(4))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .expectedRefreshPartition("day", "2023-12-31"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 04:00:00")
-                        .freshness(IntervalFreshness.ofHour("4"))
+                        .freshness(IntervalFreshness.ofHour(4))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .expectedRefreshPartition("day", "2024-01-01"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofMinute("1"))
+                        .freshness(IntervalFreshness.ofMinute(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         .expectedRefreshPartition("day", "2023-12-31")
                         .expectedRefreshPartition("hour", "23"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofMinute("2"))
+                        .freshness(IntervalFreshness.ofMinute(2))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         .expectedRefreshPartition("day", "2023-12-31")
                         .expectedRefreshPartition("hour", "23"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofMinute("4"))
+                        .freshness(IntervalFreshness.ofMinute(4))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         .expectedRefreshPartition("day", "2023-12-31")
                         .expectedRefreshPartition("hour", "23"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofMinute("15"))
+                        .freshness(IntervalFreshness.ofMinute(15))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         .expectedRefreshPartition("day", "2023-12-31")
                         .expectedRefreshPartition("hour", "23"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:00:00")
-                        .freshness(IntervalFreshness.ofMinute("1"))
+                        .freshness(IntervalFreshness.ofMinute(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .expectedRefreshPartition("day", "2023-12-31"),
                 TestSpec.create()
                         .schedulerTime("2024-01-01 00:01:00")
-                        .freshness(IntervalFreshness.ofMinute("1"))
+                        .freshness(IntervalFreshness.ofMinute(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .expectedRefreshPartition("day", "2024-01-01"),
 
                 // Invalid test case.
                 TestSpec.create()
                         .schedulerTime(null)
-                        .freshness(IntervalFreshness.ofDay("1"))
+                        .freshness(IntervalFreshness.ofDay(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         .errorMessage(
                                 "The scheduler time must not be null during 
the periodic refresh of the materialized table `catalog`.`database`.`table`."),
                 TestSpec.create()
                         .schedulerTime("2024-01-01")
-                        .freshness(IntervalFreshness.ofDay("1"))
+                        .freshness(IntervalFreshness.ofDay(1))
                         .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                         .tableOptions("partition.fields.hour.date-formatter", 
"HH")
                         .errorMessage(
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
index 51e1a5479fd..6b50542898b 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
@@ -313,7 +313,7 @@ class ShowCreateUtilTest {
                                 null,
                                 List.of(),
                                 null,
-                                IntervalFreshness.ofMinute("1"),
+                                IntervalFreshness.ofMinute(1),
                                 RefreshMode.CONTINUOUS,
                                 "SELECT 1",
                                 "SELECT 1"),
@@ -331,7 +331,7 @@ class ShowCreateUtilTest {
                                 null,
                                 List.of(),
                                 null,
-                                IntervalFreshness.ofMinute("1"),
+                                IntervalFreshness.ofMinute(1),
                                 RefreshMode.CONTINUOUS,
                                 "SELECT 1",
                                 "SELECT 1"),
@@ -351,7 +351,7 @@ class ShowCreateUtilTest {
                                 null,
                                 List.of(),
                                 null,
-                                IntervalFreshness.ofMinute("1"),
+                                IntervalFreshness.ofMinute(1),
                                 RefreshMode.CONTINUOUS,
                                 "SELECT 1",
                                 "SELECT 1"),
@@ -371,7 +371,7 @@ class ShowCreateUtilTest {
                                 "Materialized table comment",
                                 List.of("id"),
                                 
TableDistribution.of(TableDistribution.Kind.HASH, 5, List.of("id")),
-                                IntervalFreshness.ofMinute("3"),
+                                IntervalFreshness.ofMinute(3),
                                 RefreshMode.FULL,
                                 "SELECT id, name FROM tbl_a",
                                 "SELECT id, name FROM 
`catalogName`.`dbName`.`tbl_a`"),
@@ -393,7 +393,7 @@ class ShowCreateUtilTest {
                                 "Materialized table comment",
                                 List.of("id"),
                                 
TableDistribution.of(TableDistribution.Kind.HASH, 5, List.of("id")),
-                                IntervalFreshness.ofMinute("3"),
+                                IntervalFreshness.ofMinute(3),
                                 RefreshMode.FULL,
                                 "SELECT * FROM tbl_a",
                                 "SELECT id, name FROM 
`catalogName`.`dbName`.`tbl_a`"),
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index cc94ab86fbf..0d26b59db6a 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -257,7 +257,7 @@ class CatalogBaseTableResolutionTest {
         assertThat(resolvedCatalogMaterializedTable.getResolvedSchema())
                 .isEqualTo(RESOLVED_MATERIALIZED_TABLE_SCHEMA);
         assertThat(resolvedCatalogMaterializedTable.getDefinitionFreshness())
-                .isEqualTo(IntervalFreshness.ofSecond("30"));
+                .isEqualTo(IntervalFreshness.ofSecond(30));
         assertThat(resolvedCatalogMaterializedTable.getOriginalQuery())
                 .isEqualTo(DEFAULT_ORIGINAL_QUERY);
         assertThat(resolvedCatalogMaterializedTable.getExpandedQuery())
@@ -502,7 +502,7 @@ class CatalogBaseTableResolutionTest {
                 .options(Collections.emptyMap())
                 .originalQuery(DEFAULT_ORIGINAL_QUERY)
                 .expandedQuery(DEFAULT_EXPANDED_QUERY)
-                .freshness(IntervalFreshness.ofSecond("30"))
+                .freshness(IntervalFreshness.ofSecond(30))
                 .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC)
                 .refreshMode(RefreshMode.CONTINUOUS)
                 .refreshStatus(RefreshStatus.INITIALIZING)
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
index 67b72bdeb68..58c8d76d858 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
@@ -288,8 +288,8 @@ public final class CatalogPropertiesUtil {
             final String expandedQuery = properties.get(EXPANDED_QUERY);
 
             final String freshnessInterval = 
properties.get(FRESHNESS_INTERVAL);
-            final IntervalFreshness.TimeUnit timeUnit =
-                    
IntervalFreshness.TimeUnit.valueOf(properties.get(FRESHNESS_UNIT));
+            final Interval.TimeUnit timeUnit =
+                    Interval.TimeUnit.valueOf(properties.get(FRESHNESS_UNIT));
             final IntervalFreshness freshness = 
IntervalFreshness.of(freshnessInterval, timeUnit);
 
             final LogicalRefreshMode logicalRefreshMode =
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Interval.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Interval.java
new file mode 100644
index 00000000000..8f6621c969d
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Interval.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import java.time.Duration;
+import java.time.Period;
+import java.util.Objects;
+
+/**
+ * A class representing day-time and year-month intervals.
+ *
+ * <p>Depending on the time unit, the interval is backed day-time or 
year-month. See {@link
+ * TimeUnit} for more details.
+ */
+@PublicEvolving
+public class Interval {
+    private final Duration duration;
+    private final Period period;
+    private final TimeUnit timeUnit;
+
+    private Interval(TimeUnit timeUnit, Duration duration, Period period) {
+        this.timeUnit = timeUnit;
+        this.duration = duration;
+        this.period = period;
+    }
+
+    public static Interval of(Duration duration, TimeUnit timeUnit) {
+        return new Interval(timeUnit, duration, null);
+    }
+
+    public static Interval of(Period period, TimeUnit timeUnit) {
+        return new Interval(timeUnit, null, period);
+    }
+
+    public static Interval of(int duration, TimeUnit timeUnit) {
+        if (timeUnit.isDayTime) {
+            return of(toDuration(duration, timeUnit), timeUnit);
+        } else {
+            return of(toPeriod(duration, timeUnit), timeUnit);
+        }
+    }
+
+    public Duration getDuration() {
+        return duration;
+    }
+
+    public Period getPeriod() {
+        return period;
+    }
+
+    public TimeUnit getTimeUnit() {
+        return timeUnit;
+    }
+
+    public int getInterval() {
+        switch (timeUnit) {
+            case SECOND:
+                return (int) duration.toSeconds();
+            case MINUTE:
+                return (int) duration.toMinutes();
+            case HOUR:
+                return (int) duration.toHours();
+            case DAY:
+                return (int) duration.toDays();
+            case WEEK:
+                return (int) (duration.toDays() / DateTimeUtils.DAYS_PER_WEEK);
+            case MONTH:
+                return period.getMonths();
+            case QUARTER:
+                return period.getMonths() / DateTimeUtils.MONTHS_PER_QUARTER;
+            case YEAR:
+                return period.getYears();
+            default:
+                // Could happen if new TimeUnit is introduced and not 
supported here
+                throw new ValidationException(
+                        String.format("TimeUnit %s is not supported", 
timeUnit));
+        }
+    }
+
+    private static Duration toDuration(int interval, TimeUnit timeUnit) {
+        switch (timeUnit) {
+            case SECOND:
+                return Duration.ofSeconds(interval);
+            case MINUTE:
+                return Duration.ofMinutes(interval);
+            case HOUR:
+                return Duration.ofHours(interval);
+            case DAY:
+                return Duration.ofDays(interval);
+            case WEEK:
+                return Duration.ofDays((long) interval * 
DateTimeUtils.DAYS_PER_WEEK);
+            default:
+                throw new IllegalArgumentException("Unsupported time unit: " + 
timeUnit);
+        }
+    }
+
+    private static Period toPeriod(int interval, TimeUnit timeUnit) {
+        switch (timeUnit) {
+            case MONTH:
+                return Period.ofMonths(interval);
+            case QUARTER:
+                return Period.ofMonths(interval * 
DateTimeUtils.MONTHS_PER_QUARTER);
+            case YEAR:
+                return Period.ofYears(interval);
+            default:
+                throw new IllegalArgumentException("Unsupported time unit: " + 
timeUnit);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Interval interval = (Interval) o;
+        return Objects.equals(duration, interval.duration)
+                && Objects.equals(period, interval.period)
+                && timeUnit == interval.timeUnit;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(duration, period, timeUnit);
+    }
+
+    @Override
+    public String toString() {
+        return "INTERVAL '" + getInterval() + "' " + timeUnit;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // TimeUnit enums
+    // 
--------------------------------------------------------------------------------------------
+
+    /** An enumeration of time unit representing the unit of interval. */
+    @PublicEvolving
+    public enum TimeUnit {
+        SECOND(true),
+        MINUTE(true),
+        HOUR(true),
+        DAY(true),
+        WEEK(true),
+        MONTH(false),
+        QUARTER(false),
+        YEAR(false);
+
+        private final boolean isDayTime;
+
+        TimeUnit(boolean isDayTime) {
+            this.isDayTime = isDayTime;
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
index 6548b149663..5c41984c006 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Interval.TimeUnit;
 
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
@@ -42,57 +43,66 @@ public class IntervalFreshness {
     private static final long MINUTE_CRON_UPPER_BOUND = 60;
     private static final long HOUR_CRON_UPPER_BOUND = 24;
 
-    private final int interval;
-    private final TimeUnit timeUnit;
+    private final Interval interval;
 
-    private IntervalFreshness(int interval, TimeUnit timeUnit) {
+    private IntervalFreshness(Interval interval) {
         this.interval = interval;
-        this.timeUnit = timeUnit;
+    }
+
+    public static IntervalFreshness of(Interval interval) {
+        return new IntervalFreshness(interval);
     }
 
     public static IntervalFreshness of(String interval, TimeUnit timeUnit) {
-        final int validateIntervalInput = validateIntervalInput(interval);
-        return new IntervalFreshness(validateIntervalInput, timeUnit);
+        final int validateIntervalInput = validateIntervalValue(interval);
+        return new IntervalFreshness(Interval.of(validateIntervalInput, 
timeUnit));
     }
 
-    private static int validateIntervalInput(final String interval) {
-        final int parsedInt;
-        try {
-            parsedInt = Integer.parseInt(interval);
-        } catch (Exception e) {
-            final String errorMessage =
-                    String.format(
-                            "The freshness interval currently only supports 
positive integer type values. But was: %s",
-                            interval);
-            throw new ValidationException(errorMessage, e);
-        }
+    public static IntervalFreshness of(int intervalDuration, TimeUnit 
timeUnit) {
+        validateIntervalPositiveValue(intervalDuration);
+        return new IntervalFreshness(Interval.of(intervalDuration, timeUnit));
+    }
 
-        if (parsedInt <= 0) {
-            final String errorMessage =
-                    String.format(
-                            "The freshness interval currently only supports 
positive integer type values. But was: %s",
-                            interval);
-            throw new ValidationException(errorMessage);
-        }
-        return parsedInt;
+    public static IntervalFreshness of(Duration duration, TimeUnit timeUnit) {
+        // just check it is positive
+        validateIntervalPositiveValue((int) duration.toSeconds());
+        return new IntervalFreshness(Interval.of(duration, timeUnit));
     }
 
+    @Deprecated
     public static IntervalFreshness ofSecond(String interval) {
         return IntervalFreshness.of(interval, TimeUnit.SECOND);
     }
 
+    public static IntervalFreshness ofSecond(int interval) {
+        return IntervalFreshness.of(interval, TimeUnit.SECOND);
+    }
+
+    @Deprecated
     public static IntervalFreshness ofMinute(String interval) {
         return IntervalFreshness.of(interval, TimeUnit.MINUTE);
     }
 
+    public static IntervalFreshness ofMinute(int interval) {
+        return IntervalFreshness.of(interval, TimeUnit.MINUTE);
+    }
+
     public static IntervalFreshness ofHour(String interval) {
         return IntervalFreshness.of(interval, TimeUnit.HOUR);
     }
 
+    public static IntervalFreshness ofHour(int interval) {
+        return IntervalFreshness.of(interval, TimeUnit.HOUR);
+    }
+
     public static IntervalFreshness ofDay(String interval) {
         return IntervalFreshness.of(interval, TimeUnit.DAY);
     }
 
+    public static IntervalFreshness ofDay(int interval) {
+        return IntervalFreshness.of(interval, TimeUnit.DAY);
+    }
+
     /**
      * Validates that the given freshness can be converted to a cron 
expression in full refresh
      * mode. Since freshness and cron expression cannot be converted 
equivalently, there are
@@ -156,53 +166,30 @@ public class IntervalFreshness {
         }
     }
 
-    private static void validateCronConstraints(
-            IntervalFreshness intervalFreshness, long cronUpperBound) {
-        int interval = intervalFreshness.getIntervalInt();
-        TimeUnit timeUnit = intervalFreshness.getTimeUnit();
-        // Freshness must be less than cronUpperBound for corresponding time 
unit when convert it
-        // to cron expression
-        if (interval >= cronUpperBound) {
-            throw new ValidationException(
-                    String.format(
-                            "In full refresh mode, freshness must be less than 
%s when the time unit is %s.",
-                            cronUpperBound, timeUnit));
-        }
-        // Freshness must be factors of cronUpperBound for corresponding time 
unit
-        if (cronUpperBound % interval != 0) {
-            throw new ValidationException(
-                    String.format(
-                            "In full refresh mode, only freshness that are 
factors of %s are currently supported when the time unit is %s.",
-                            cronUpperBound, timeUnit));
-        }
-    }
-
-    private static void validateDayConstraints(IntervalFreshness 
intervalFreshness) {
-        // Since the number of days in each month is different, only one day 
of freshness is
-        // currently supported when the time unit is DAY
-        int interval = intervalFreshness.getIntervalInt();
-        if (interval > 1) {
-            throw new ValidationException(
-                    "In full refresh mode, freshness must be 1 when the time 
unit is DAY.");
-        }
-    }
-
     /**
      * Creates an IntervalFreshness from a Duration, choosing the most 
appropriate time unit.
      * Prefers larger units when possible (e.g., 60 seconds → 1 minute).
      */
     public static IntervalFreshness fromDuration(Duration duration) {
         if (duration.equals(duration.truncatedTo(ChronoUnit.DAYS))) {
-            return new IntervalFreshness((int) duration.toDays(), 
TimeUnit.DAY);
+            return IntervalFreshness.ofDay((int) duration.toDays());
         }
         if (duration.equals(duration.truncatedTo(ChronoUnit.HOURS))) {
-            return new IntervalFreshness((int) duration.toHours(), 
TimeUnit.HOUR);
+            return IntervalFreshness.ofHour((int) duration.toHours());
         }
         if (duration.equals(duration.truncatedTo(ChronoUnit.MINUTES))) {
-            return new IntervalFreshness((int) duration.toMinutes(), 
TimeUnit.MINUTE);
+            return IntervalFreshness.ofMinute((int) duration.toMinutes());
         }
 
-        return new IntervalFreshness((int) duration.getSeconds(), 
TimeUnit.SECOND);
+        return IntervalFreshness.ofSecond((int) duration.getSeconds());
+    }
+
+    public Duration toDuration() {
+        return interval.getDuration();
+    }
+
+    public TimeUnit getTimeUnit() {
+        return interval.getTimeUnit();
     }
 
     /**
@@ -210,64 +197,84 @@ public class IntervalFreshness {
      */
     @Deprecated
     public String getInterval() {
-        return String.valueOf(interval);
+        return String.valueOf(getIntervalInt());
     }
 
     public int getIntervalInt() {
-        return interval;
-    }
-
-    public TimeUnit getTimeUnit() {
-        return timeUnit;
-    }
-
-    public Duration toDuration() {
-        switch (timeUnit) {
-            case SECOND:
-                return Duration.ofSeconds(interval);
-            case MINUTE:
-                return Duration.ofMinutes(interval);
-            case HOUR:
-                return Duration.ofHours(interval);
-            case DAY:
-                return Duration.ofDays(interval);
-            default:
-                throw new IllegalStateException("Unexpected value: " + 
timeUnit);
-        }
+        return interval.getInterval();
     }
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
         IntervalFreshness that = (IntervalFreshness) o;
-        return Objects.equals(interval, that.interval) && timeUnit == 
that.timeUnit;
+        return Objects.equals(interval, that.interval);
     }
 
     @Override
-    public String toString() {
-        return "INTERVAL '" + interval + "' " + timeUnit;
+    public int hashCode() {
+        return Objects.hashCode(interval);
     }
 
     @Override
-    public int hashCode() {
-        return Objects.hash(interval, timeUnit);
+    public String toString() {
+        return interval.toString();
     }
 
-    // 
--------------------------------------------------------------------------------------------
-    // TimeUnit enums
-    // 
--------------------------------------------------------------------------------------------
-
-    /** An enumeration of time unit representing the unit of interval 
freshness. */
-    @PublicEvolving
-    public enum TimeUnit {
-        SECOND,
-        MINUTE,
-        HOUR,
-        DAY
+    private static int validateIntervalValue(final String interval) {
+        final int parsedInt;
+        try {
+            parsedInt = Integer.parseInt(interval);
+        } catch (Exception e) {
+            final String errorMessage =
+                    String.format(
+                            "The freshness interval currently only supports 
positive integer type values. But was: %s",
+                            interval);
+            throw new ValidationException(errorMessage, e);
+        }
+        validateIntervalPositiveValue(parsedInt);
+        return parsedInt;
+    }
+
+    private static void validateIntervalPositiveValue(final int interval) {
+        if (interval <= 0) {
+            throw new ValidationException(
+                    String.format(
+                            "The freshness interval currently only supports 
positive integer type values. But was: %d",
+                            interval));
+        }
+    }
+
+    private static void validateCronConstraints(
+            IntervalFreshness intervalFreshness, long cronUpperBound) {
+        int interval = intervalFreshness.getIntervalInt();
+        TimeUnit timeUnit = intervalFreshness.getTimeUnit();
+        // Freshness must be less than cronUpperBound for corresponding time 
unit when convert it
+        // to cron expression
+        if (interval >= cronUpperBound) {
+            throw new ValidationException(
+                    String.format(
+                            "In full refresh mode, freshness must be less than 
%s when the time unit is %s.",
+                            cronUpperBound, timeUnit));
+        }
+        // Freshness must be factors of cronUpperBound for corresponding time 
unit
+        if (cronUpperBound % interval != 0) {
+            throw new ValidationException(
+                    String.format(
+                            "In full refresh mode, only freshness that are 
factors of %s are currently supported when the time unit is %s.",
+                            cronUpperBound, timeUnit));
+        }
+    }
+
+    private static void validateDayConstraints(IntervalFreshness 
intervalFreshness) {
+        // Since the number of days in each month is different, only one day 
of freshness is
+        // currently supported when the time unit is DAY
+        int interval = intervalFreshness.getIntervalInt();
+        if (interval > 1) {
+            throw new ValidationException(
+                    "In full refresh mode, freshness must be 1 when the time 
unit is DAY.");
+        }
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
index 754e694c53a..9c0071b6bd9 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
@@ -90,13 +90,13 @@ public class DateTimeUtils {
     public static final int EPOCH_JULIAN = 2440588;
 
     /** The number of milliseconds in a second. */
-    private static final long MILLIS_PER_SECOND = 1000L;
+    public static final long MILLIS_PER_SECOND = 1000L;
 
     /** The number of milliseconds in a minute. */
-    private static final long MILLIS_PER_MINUTE = 60000L;
+    public static final long MILLIS_PER_MINUTE = 60000L;
 
     /** The number of milliseconds in an hour. */
-    private static final long MILLIS_PER_HOUR = 3600000L; // = 60 * 60 * 1000
+    public static final long MILLIS_PER_HOUR = 3600000L; // = 60 * 60 * 1000
 
     /**
      * The number of milliseconds in a day.
@@ -105,6 +105,12 @@ public class DateTimeUtils {
      */
     public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 
1000
 
+    /** The number of days in a week. */
+    public static final int DAYS_PER_WEEK = 7;
+
+    /** The number of months in a month. */
+    public static final int MONTHS_PER_QUARTER = 3;
+
     /** The SimpleDateFormat string for ISO dates, "yyyy-MM-dd". */
     private static final String DATE_FORMAT_STRING = "yyyy-MM-dd";
 
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
index 43eb735c4c9..f66f4d52251 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
@@ -167,7 +167,7 @@ class CatalogPropertiesUtilTest {
                                 .distribution(unknownDist)
                                 .originalQuery("SELECT 1, 'two'")
                                 .expandedQuery("SELECT 1, 'two'")
-                                .freshness(IntervalFreshness.ofHour("123"))
+                                .freshness(IntervalFreshness.ofHour(123))
                                 
.logicalRefreshMode(LogicalRefreshMode.CONTINUOUS)
                                 .refreshMode(RefreshMode.CONTINUOUS)
                                 .refreshStatus(RefreshStatus.ACTIVATED)
@@ -175,6 +175,6 @@ class CatalogPropertiesUtilTest {
                                 .build(),
                         resolvedSchema,
                         RefreshMode.CONTINUOUS,
-                        IntervalFreshness.ofHour("123")));
+                        IntervalFreshness.ofHour(123)));
     }
 }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java
index de794684956..1b331376bdf 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java
@@ -48,114 +48,114 @@ public class IntervalFreshnessTest {
     void testConvertDurationToFreshnessInterval() {
         // verify second
         IntervalFreshness actualSeconds = 
IntervalFreshness.fromDuration(Duration.ofSeconds(20));
-        assertThat(actualSeconds).isEqualTo(IntervalFreshness.ofSecond("20"));
+        assertThat(actualSeconds).isEqualTo(IntervalFreshness.ofSecond(20));
 
         // verify minute
         IntervalFreshness actualMinutes = 
IntervalFreshness.fromDuration(Duration.ofMinutes(3));
-        assertThat(actualMinutes).isEqualTo(IntervalFreshness.ofMinute("3"));
+        assertThat(actualMinutes).isEqualTo(IntervalFreshness.ofMinute(3));
 
         // verify hour
         IntervalFreshness actualHour = 
IntervalFreshness.fromDuration(Duration.ofHours(1));
-        assertThat(actualHour).isEqualTo(IntervalFreshness.ofHour("1"));
+        assertThat(actualHour).isEqualTo(IntervalFreshness.ofHour(1));
 
         // verify day
         IntervalFreshness actualDay = 
IntervalFreshness.fromDuration(Duration.ofDays(2));
-        assertThat(actualDay).isEqualTo(IntervalFreshness.ofDay("2"));
+        assertThat(actualDay).isEqualTo(IntervalFreshness.ofDay(2));
     }
 
     @Test
     void testConvertFreshnessToDuration() {
         // verify second
-        Duration actualSecond = IntervalFreshness.ofSecond("20").toDuration();
+        Duration actualSecond = IntervalFreshness.ofSecond(20).toDuration();
         assertThat(actualSecond).isEqualTo(Duration.ofSeconds(20));
 
         // verify minute
-        Duration actualMinute = IntervalFreshness.ofMinute("3").toDuration();
+        Duration actualMinute = IntervalFreshness.ofMinute(3).toDuration();
         assertThat(actualMinute).isEqualTo(Duration.ofMinutes(3));
 
         // verify hour
-        Duration actualHour = IntervalFreshness.ofHour("3").toDuration();
+        Duration actualHour = IntervalFreshness.ofHour(3).toDuration();
         assertThat(actualHour).isEqualTo(Duration.ofHours(3));
 
         // verify day
-        Duration actualDay = IntervalFreshness.ofDay("3").toDuration();
+        Duration actualDay = IntervalFreshness.ofDay(3).toDuration();
         assertThat(actualDay).isEqualTo(Duration.ofDays(3));
     }
 
     @Test
     void testConvertSecondFreshnessToCronExpression() {
         // verify illegal freshness
-        assertThatThrownBy(() -> 
convertFreshnessToCron(IntervalFreshness.ofSecond("90")))
+        assertThatThrownBy(() -> 
convertFreshnessToCron(IntervalFreshness.ofSecond(90)))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
                         "In full refresh mode, freshness must be less than 60 
when the time unit is SECOND.");
 
-        assertThatThrownBy(() -> 
convertFreshnessToCron(IntervalFreshness.ofSecond("32")))
+        assertThatThrownBy(() -> 
convertFreshnessToCron(IntervalFreshness.ofSecond(32)))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
                         "In full refresh mode, only freshness that are factors 
of 60 are currently supported when the time unit is SECOND.");
 
-        String actual1 = 
convertFreshnessToCron(IntervalFreshness.ofSecond("30"));
+        String actual1 = 
convertFreshnessToCron(IntervalFreshness.ofSecond(30));
         assertThat(actual1).isEqualTo("0/30 * * * * ? *");
 
-        String actual2 = 
convertFreshnessToCron(IntervalFreshness.ofSecond("5"));
+        String actual2 = convertFreshnessToCron(IntervalFreshness.ofSecond(5));
         assertThat(actual2).isEqualTo("0/5 * * * * ? *");
     }
 
     @Test
     void testConvertMinuteFreshnessToCronExpression() {
         // verify illegal freshness
-        assertThatThrownBy(() -> 
convertFreshnessToCron(IntervalFreshness.ofMinute("90")))
+        assertThatThrownBy(() -> 
convertFreshnessToCron(IntervalFreshness.ofMinute(90)))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
                         "In full refresh mode, freshness must be less than 60 
when the time unit is MINUTE.");
 
-        assertThatThrownBy(() -> 
convertFreshnessToCron(IntervalFreshness.ofMinute("32")))
+        assertThatThrownBy(() -> 
convertFreshnessToCron(IntervalFreshness.ofMinute(32)))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
                         "In full refresh mode, only freshness that are factors 
of 60 are currently supported when the time unit is MINUTE.");
 
-        String actual1 = 
convertFreshnessToCron(IntervalFreshness.ofMinute("30"));
+        String actual1 = 
convertFreshnessToCron(IntervalFreshness.ofMinute(30));
         assertThat(actual1).isEqualTo("0 0/30 * * * ? *");
 
-        String actual2 = 
convertFreshnessToCron(IntervalFreshness.ofMinute("5"));
+        String actual2 = convertFreshnessToCron(IntervalFreshness.ofMinute(5));
         assertThat(actual2).isEqualTo("0 0/5 * * * ? *");
 
-        String actual3 = 
convertFreshnessToCron(IntervalFreshness.ofMinute("1"));
+        String actual3 = convertFreshnessToCron(IntervalFreshness.ofMinute(1));
         assertThat(actual3).isEqualTo("0 0/1 * * * ? *");
     }
 
     @Test
     void testConvertHourFreshnessToCronExpression() {
         // verify illegal freshness
-        assertThatThrownBy(() -> 
convertFreshnessToCron(IntervalFreshness.ofHour("24")))
+        assertThatThrownBy(() -> 
convertFreshnessToCron(IntervalFreshness.ofHour(24)))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
                         "In full refresh mode, freshness must be less than 24 
when the time unit is HOUR.");
 
-        assertThatThrownBy(() -> 
convertFreshnessToCron(IntervalFreshness.ofHour("14")))
+        assertThatThrownBy(() -> 
convertFreshnessToCron(IntervalFreshness.ofHour(14)))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
                         "In full refresh mode, only freshness that are factors 
of 24 are currently supported when the time unit is HOUR.");
 
-        String actual1 = 
convertFreshnessToCron(IntervalFreshness.ofHour("12"));
+        String actual1 = convertFreshnessToCron(IntervalFreshness.ofHour(12));
         assertThat(actual1).isEqualTo("0 0 0/12 * * ? *");
 
-        String actual2 = convertFreshnessToCron(IntervalFreshness.ofHour("4"));
+        String actual2 = convertFreshnessToCron(IntervalFreshness.ofHour(4));
         assertThat(actual2).isEqualTo("0 0 0/4 * * ? *");
 
-        String actual3 = convertFreshnessToCron(IntervalFreshness.ofHour("1"));
+        String actual3 = convertFreshnessToCron(IntervalFreshness.ofHour(1));
         assertThat(actual3).isEqualTo("0 0 0/1 * * ? *");
     }
 
     @Test
     void testConvertDayFreshnessToCronExpression() {
         // verify illegal freshness
-        assertThatThrownBy(() -> 
convertFreshnessToCron(IntervalFreshness.ofDay("2")))
+        assertThatThrownBy(() -> 
convertFreshnessToCron(IntervalFreshness.ofDay(2)))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
                         "In full refresh mode, freshness must be 1 when the 
time unit is DAY.");
-        String actual1 = convertFreshnessToCron(IntervalFreshness.ofDay("1"));
+        String actual1 = convertFreshnessToCron(IntervalFreshness.ofDay(1));
         assertThat(actual1).isEqualTo("0 0 0 * * ? *");
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index 8cc5ff524e8..6f50a3d36e7 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -32,6 +32,8 @@ import 
org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.Column.ComputedColumn;
 import org.apache.flink.table.catalog.Column.MetadataColumn;
+import org.apache.flink.table.catalog.Interval;
+import org.apache.flink.table.catalog.Interval.TimeUnit;
 import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
@@ -39,14 +41,16 @@ import 
org.apache.flink.table.catalog.TableChange.ColumnPosition;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
 import 
org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.DateTimeUtils;
 
 import org.apache.calcite.sql.SqlIntervalLiteral;
 import org.apache.calcite.sql.SqlIntervalLiteral.IntervalValue;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.commons.lang3.StringUtils;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -56,6 +60,8 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static java.time.temporal.ChronoField.MONTH_OF_YEAR;
+
 /** The utils for materialized table. */
 @Internal
 public class MaterializedTableUtils {
@@ -68,29 +74,109 @@ public class MaterializedTableUtils {
 
     public static IntervalFreshness getMaterializedTableFreshness(
             SqlIntervalLiteral sqlIntervalLiteral) {
-        if (sqlIntervalLiteral.signum() < 0) {
-            throw new ValidationException(
-                    "Materialized table freshness doesn't support negative 
value.");
+        return IntervalFreshness.of(getFreshnessInterval(sqlIntervalLiteral));
+    }
+
+    private static Interval getFreshnessInterval(SqlIntervalLiteral 
sqlIntervalLiteral) {
+        final IntervalValue intervalValue = 
sqlIntervalLiteral.getValueAs(IntervalValue.class);
+        final SqlTypeName typeName = 
intervalValue.getIntervalQualifier().typeName();
+
+        if (isDateTimeInterval(typeName)) {
+            final Interval freshnessInterval =
+                    getDayTimeInterval(
+                            intervalValue,
+                            typeName,
+                            sqlIntervalLiteral.getValueAs(BigDecimal.class),
+                            "freshness");
+            final int interval = freshnessInterval.getInterval();
+            // Freshness interval might be only positive
+            validateIntervalValuePositive(interval, "freshness");
+            return freshnessInterval;
         }
-        if (sqlIntervalLiteral.getTypeName().getFamily() != 
SqlTypeFamily.INTERVAL_DAY_TIME) {
-            throw new ValidationException(
-                    "Materialized table freshness only support SECOND, MINUTE, 
HOUR, DAY as the time unit.");
+
+        throw new ValidationException(
+                "Materialized table freshness only supports SECOND, MINUTE, 
HOUR, DAY, WEEK as the time unit.");
+    }
+
+    private static Interval intervalFrom(
+            SqlIntervalLiteral sqlIntervalLiteral, String intervalDescription) 
{
+
+        final IntervalValue intervalValue = 
sqlIntervalLiteral.getValueAs(IntervalValue.class);
+        final SqlTypeName typeName = 
intervalValue.getIntervalQualifier().typeName();
+        if (intervalValue.getIntervalQualifier().isYearMonth()) {
+            return getYearMonthInterval(
+                    intervalValue,
+                    typeName,
+                    sqlIntervalLiteral.getValueAs(BigDecimal.class),
+                    intervalDescription);
+        }
+
+        if (isDateTimeInterval(typeName)) {
+            return getDayTimeInterval(
+                    intervalValue,
+                    typeName,
+                    sqlIntervalLiteral.getValueAs(BigDecimal.class),
+                    intervalDescription);
+        }
+
+        throw new ValidationException(
+                String.format(
+                        "Materialized table %s only supports SECOND, MINUTE, 
HOUR, DAY, WEEK, MONTH, QUARTER, YEAR as the time unit.",
+                        intervalDescription));
+    }
+
+    private static Interval getYearMonthInterval(
+            final IntervalValue intervalValue,
+            final SqlTypeName typeName,
+            final BigDecimal interval,
+            final String intervalDescription) {
+        final int intervalInt = interval.intValue();
+        switch (typeName) {
+            case INTERVAL_MONTH:
+                if 
(intervalValue.getIntervalQualifier().timeUnitRange.startUnit
+                        == org.apache.calcite.avatica.util.TimeUnit.QUARTER) {
+                    return Interval.of(
+                            intervalInt / DateTimeUtils.MONTHS_PER_QUARTER, 
TimeUnit.QUARTER);
+                }
+                return Interval.of(intervalInt, TimeUnit.MONTH);
+            case INTERVAL_YEAR:
+                return Interval.of(
+                        (int) (intervalInt / 
MONTH_OF_YEAR.range().getMaximum()), TimeUnit.YEAR);
+            default:
+                throw new ValidationException(
+                        String.format(
+                                "Materialized table %s only supports MONTH, 
QUARTER, YEAR as the time unit.",
+                                intervalDescription));
         }
+    }
 
-        IntervalValue intervalValue = 
sqlIntervalLiteral.getValueAs(IntervalValue.class);
-        String interval = intervalValue.getIntervalLiteral();
-        switch (intervalValue.getIntervalQualifier().typeName()) {
+    private static Interval getDayTimeInterval(
+            final IntervalValue intervalValue,
+            final SqlTypeName typeName,
+            final BigDecimal interval,
+            final String intervalDescription) {
+        final long millis = interval.longValue();
+        switch (typeName) {
             case INTERVAL_DAY:
-                return IntervalFreshness.ofDay(interval);
+                final int amountOfDays = (int) (millis / 
DateTimeUtils.MILLIS_PER_DAY);
+                if 
(intervalValue.getIntervalQualifier().timeUnitRange.startUnit
+                        == org.apache.calcite.avatica.util.TimeUnit.WEEK) {
+                    return Interval.of(amountOfDays / 
DateTimeUtils.DAYS_PER_WEEK, TimeUnit.WEEK);
+                }
+                return Interval.of(amountOfDays, TimeUnit.DAY);
             case INTERVAL_HOUR:
-                return IntervalFreshness.ofHour(interval);
+                return Interval.of((int) (millis / 
DateTimeUtils.MILLIS_PER_HOUR), TimeUnit.HOUR);
             case INTERVAL_MINUTE:
-                return IntervalFreshness.ofMinute(interval);
+                return Interval.of(
+                        (int) (millis / DateTimeUtils.MILLIS_PER_MINUTE), 
TimeUnit.MINUTE);
             case INTERVAL_SECOND:
-                return IntervalFreshness.ofSecond(interval);
+                return Interval.of(
+                        (int) (millis / DateTimeUtils.MILLIS_PER_SECOND), 
TimeUnit.SECOND);
             default:
                 throw new ValidationException(
-                        "Materialized table freshness only support SECOND, 
MINUTE, HOUR, DAY as the time unit.");
+                        String.format(
+                                "Materialized table %s only supports SECOND, 
MINUTE, HOUR, DAY, WEEK as the time unit.",
+                                intervalDescription));
         }
     }
 
@@ -210,6 +296,13 @@ public class MaterializedTableUtils {
         return changes;
     }
 
+    private static boolean isDateTimeInterval(SqlTypeName typeName) {
+        return typeName == SqlTypeName.INTERVAL_DAY
+                || typeName == SqlTypeName.INTERVAL_HOUR
+                || typeName == SqlTypeName.INTERVAL_MINUTE
+                || typeName == SqlTypeName.INTERVAL_SECOND;
+    }
+
     // Since it is only for query change, then check only persisted columns 
which could be
     // changed/added/dropped with such change
     private static boolean isSchemaChanged(ResolvedSchema oldSchema, 
ResolvedSchema newSchema) {
@@ -408,4 +501,14 @@ public class MaterializedTableUtils {
         throw new ValidationException(
                 String.format(PERSISTED_COLUMN_NOT_USED_IN_QUERY, type, 
columnName));
     }
+
+    private static void validateIntervalValuePositive(
+            final int interval, final String description) {
+        if (interval <= 0) {
+            throw new ValidationException(
+                    String.format(
+                            "The %s interval currently only supports positive 
integer type values. But was: %d",
+                            description, interval));
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 2d5aecfff4a..3344616c798 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -1471,7 +1471,7 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
                                     assertThat(
                                                     
op.getCatalogMaterializedTable()
                                                             
.getDefinitionFreshness())
-                                            
.isEqualTo(IntervalFreshness.ofSecond("30"));
+                                            
.isEqualTo(IntervalFreshness.ofSecond(30));
                                     
assertThat(op.getCatalogMaterializedTable().getRefreshMode())
                                             .isSameAs(RefreshMode.CONTINUOUS);
                                 }),
@@ -2876,7 +2876,7 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
                         .partitionKeys(
                                 hasPartition ? Arrays.asList("b", "c") : 
Collections.emptyList())
                         .options(Collections.unmodifiableMap(options))
-                        .freshness(IntervalFreshness.ofHour("2"))
+                        .freshness(IntervalFreshness.ofHour(2))
                         .logicalRefreshMode(LogicalRefreshMode.CONTINUOUS)
                         .refreshMode(RefreshMode.CONTINUOUS)
                         .refreshStatus(RefreshStatus.ACTIVATED)
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 32281b83936..8f8dfb85618 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -214,14 +214,14 @@ class SqlMaterializedTableNodeToOperationConverterTest
                 createResolvedCatalogMaterializedTable(sql);
 
         final IntervalFreshness resolvedFreshness = 
materializedTable.getDefinitionFreshness();
-        
assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofSecond("30"));
+        
assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofSecond(30));
 
         final RefreshMode resolvedRefreshMode = 
materializedTable.getRefreshMode();
         assertThat(resolvedRefreshMode).isSameAs(RefreshMode.FULL);
 
         final CatalogMaterializedTable expected =
                 getDefaultMaterializedTableBuilder()
-                        .freshness(IntervalFreshness.ofSecond("30"))
+                        .freshness(IntervalFreshness.ofSecond(30))
                         .logicalRefreshMode(LogicalRefreshMode.FULL)
                         .refreshMode(RefreshMode.FULL)
                         .refreshStatus(RefreshStatus.INITIALIZING)
@@ -248,7 +248,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
 
         // The resolved freshness should default to 1 minute
         final IntervalFreshness resolvedFreshness = 
materializedTable.getDefinitionFreshness();
-        assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofHour("1"));
+        assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofHour(1));
 
         final RefreshMode resolvedRefreshMode = 
materializedTable.getRefreshMode();
         assertThat(resolvedRefreshMode).isSameAs(RefreshMode.FULL);
@@ -279,7 +279,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                 createResolvedCatalogMaterializedTable(sql);
 
         final IntervalFreshness resolvedFreshness = 
materializedTable.getDefinitionFreshness();
-        
assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofMinute("3"));
+        assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofMinute(3));
         final CatalogMaterializedTable expected =
                 getDefaultMaterializedTableBuilder()
                         .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC)
@@ -641,7 +641,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
 
         final CatalogMaterializedTable expected =
                 getDefaultMaterializedTableBuilder()
-                        .freshness(IntervalFreshness.ofSecond("30"))
+                        .freshness(IntervalFreshness.ofSecond(30))
                         .logicalRefreshMode(LogicalRefreshMode.FULL)
                         .refreshMode(RefreshMode.FULL)
                         .refreshStatus(RefreshStatus.INITIALIZING)
@@ -936,17 +936,17 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                 + "FRESHNESS = INTERVAL -'30' SECOND\n"
                                 + "REFRESH_MODE = FULL\n"
                                 + "AS SELECT * FROM t1",
-                        "Materialized table freshness doesn't support negative 
value."),
+                        "The freshness interval currently only supports 
positive integer type values. But was: -30"),
                 TestSpec.of(
                         "CREATE MATERIALIZED TABLE mtbl1\n"
                                 + "FRESHNESS = INTERVAL '30' YEAR\n"
                                 + "AS SELECT * FROM t1",
-                        "Materialized table freshness only support SECOND, 
MINUTE, HOUR, DAY as the time unit."),
+                        "Materialized table freshness only supports SECOND, 
MINUTE, HOUR, DAY, WEEK as the time unit."),
                 TestSpec.of(
                         "CREATE MATERIALIZED TABLE mtbl1\n"
                                 + "FRESHNESS = INTERVAL '30' DAY TO HOUR\n"
                                 + "AS SELECT * FROM t1",
-                        "Materialized table freshness only support SECOND, 
MINUTE, HOUR, DAY as the time unit."));
+                        "Materialized table freshness only supports SECOND, 
MINUTE, HOUR, DAY, WEEK as the time unit."));
     }
 
     private static Collection<TestSpec> alterDrop() {
diff --git 
a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
 
b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
index 7a117ef58c2..122ac8a4bb0 100644
--- 
a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
+++ 
b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
@@ -109,7 +109,7 @@ public class TestFileSystemCatalogTest extends 
TestFileSystemCatalogTestBase {
     private static final String DEFAULT_EXPANDED_QUERY =
             String.format(
                     "SELECT id, region, county FROM %s.%s.T", TEST_CATALOG, 
TEST_DEFAULT_DATABASE);
-    private static final IntervalFreshness FRESHNESS = 
IntervalFreshness.ofMinute("3");
+    private static final IntervalFreshness FRESHNESS = 
IntervalFreshness.ofMinute(3);
     private static final ResolvedCatalogMaterializedTable 
EXPECTED_CATALOG_MATERIALIZED_TABLE =
             new ResolvedCatalogMaterializedTable(
                     CatalogMaterializedTable.newBuilder()

Reply via email to