This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 64a21e7fff8 [FLINK-39348][table] Add `Interval` to reuse code for
`FRESHNESS` and `START MODE`
64a21e7fff8 is described below
commit 64a21e7fff838bb5ac8a7f80beb2137285dc6ab7
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Mar 31 12:33:25 2026 +0200
[FLINK-39348][table] Add `Interval` to reuse code for `FRESHNESS` and
`START MODE`
---
.../service/MaterializedTableStatementITCase.java | 2 +-
.../MaterializedTableManagerTest.java | 74 ++++----
.../table/api/internal/ShowCreateUtilTest.java | 10 +-
.../catalog/CatalogBaseTableResolutionTest.java | 4 +-
.../flink/table/catalog/CatalogPropertiesUtil.java | 4 +-
.../org/apache/flink/table/catalog/Interval.java | 173 +++++++++++++++++
.../flink/table/catalog/IntervalFreshness.java | 207 +++++++++++----------
.../apache/flink/table/utils/DateTimeUtils.java | 12 +-
.../table/catalog/CatalogPropertiesUtilTest.java | 4 +-
.../flink/table/catalog/IntervalFreshnessTest.java | 48 ++---
.../planner/utils/MaterializedTableUtils.java | 133 +++++++++++--
.../operations/SqlDdlToOperationConverterTest.java | 4 +-
...erializedTableNodeToOperationConverterTest.java | 16 +-
.../catalog/TestFileSystemCatalogTest.java | 2 +-
14 files changed, 491 insertions(+), 202 deletions(-)
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index 31564138b61..bf484e57c7d 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -288,7 +288,7 @@ class MaterializedTableStatementITCase extends
AbstractMaterializedTableStatemen
assertThat(actualMaterializedTable.getResolvedSchema()).isEqualTo(expectedSchema);
assertThat(actualMaterializedTable.getDefinitionFreshness())
- .isEqualTo(IntervalFreshness.ofMinute("3"));
+ .isEqualTo(IntervalFreshness.ofMinute(3));
assertThat(actualMaterializedTable.getLogicalRefreshMode())
.isSameAs(LogicalRefreshMode.AUTOMATIC);
assertThat(actualMaterializedTable.getRefreshMode()).isSameAs(RefreshMode.CONTINUOUS);
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java
index 082766cd298..993e60db156 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java
@@ -136,66 +136,66 @@ class MaterializedTableManagerTest {
// The interval of freshness matches the partition specified by
the 'date-formatter'.
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofDay("1"))
+ .freshness(IntervalFreshness.ofDay(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.expectedRefreshPartition("day", "2023-12-31"),
TestSpec.create()
.schedulerTime("2024-01-02 00:00:00")
- .freshness(IntervalFreshness.ofDay("1"))
+ .freshness(IntervalFreshness.ofDay(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.expectedRefreshPartition("day", "2024-01-01"),
TestSpec.create()
.schedulerTime("2024-01-02 00:00:00")
- .freshness(IntervalFreshness.ofHour("1"))
+ .freshness(IntervalFreshness.ofHour(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.expectedRefreshPartition("day", "2024-01-01")
.expectedRefreshPartition("hour", "23"),
TestSpec.create()
.schedulerTime("2024-01-02 01:00:00")
- .freshness(IntervalFreshness.ofHour("1"))
+ .freshness(IntervalFreshness.ofHour(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.expectedRefreshPartition("day", "2024-01-02")
.expectedRefreshPartition("hour", "00"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofHour("2"))
+ .freshness(IntervalFreshness.ofHour(2))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.expectedRefreshPartition("day", "2023-12-31")
.expectedRefreshPartition("hour", "22"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofHour("4"))
+ .freshness(IntervalFreshness.ofHour(4))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.expectedRefreshPartition("day", "2023-12-31")
.expectedRefreshPartition("hour", "20"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofHour("8"))
+ .freshness(IntervalFreshness.ofHour(8))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.expectedRefreshPartition("day", "2023-12-31")
.expectedRefreshPartition("hour", "16"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofHour("12"))
+ .freshness(IntervalFreshness.ofHour(12))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.expectedRefreshPartition("day", "2023-12-31")
.expectedRefreshPartition("hour", "12"),
TestSpec.create()
.schedulerTime("2024-01-01 12:00:00")
- .freshness(IntervalFreshness.ofHour("12"))
+ .freshness(IntervalFreshness.ofHour(12))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.expectedRefreshPartition("day", "2024-01-01")
.expectedRefreshPartition("hour", "00"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofMinute("1"))
+ .freshness(IntervalFreshness.ofMinute(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -204,7 +204,7 @@ class MaterializedTableManagerTest {
.expectedRefreshPartition("minute", "59"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofMinute("2"))
+ .freshness(IntervalFreshness.ofMinute(2))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -213,7 +213,7 @@ class MaterializedTableManagerTest {
.expectedRefreshPartition("minute", "58"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofMinute("4"))
+ .freshness(IntervalFreshness.ofMinute(4))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -222,7 +222,7 @@ class MaterializedTableManagerTest {
.expectedRefreshPartition("minute", "56"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofMinute("5"))
+ .freshness(IntervalFreshness.ofMinute(5))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -231,7 +231,7 @@ class MaterializedTableManagerTest {
.expectedRefreshPartition("minute", "55"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofMinute("6"))
+ .freshness(IntervalFreshness.ofMinute(6))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -240,7 +240,7 @@ class MaterializedTableManagerTest {
.expectedRefreshPartition("minute", "54"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofMinute("10"))
+ .freshness(IntervalFreshness.ofMinute(10))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -249,7 +249,7 @@ class MaterializedTableManagerTest {
.expectedRefreshPartition("minute", "50"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofMinute("12"))
+ .freshness(IntervalFreshness.ofMinute(12))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -258,7 +258,7 @@ class MaterializedTableManagerTest {
.expectedRefreshPartition("minute", "48"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofMinute("15"))
+ .freshness(IntervalFreshness.ofMinute(15))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -267,7 +267,7 @@ class MaterializedTableManagerTest {
.expectedRefreshPartition("minute", "45"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofMinute("30"))
+ .freshness(IntervalFreshness.ofMinute(30))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -276,7 +276,7 @@ class MaterializedTableManagerTest {
.expectedRefreshPartition("minute", "30"),
TestSpec.create()
.schedulerTime("2024-01-01 00:30:00")
- .freshness(IntervalFreshness.ofMinute("30"))
+ .freshness(IntervalFreshness.ofMinute(30))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -288,14 +288,14 @@ class MaterializedTableManagerTest {
// 'date-formatter'.
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofDay("1"))
+ .freshness(IntervalFreshness.ofDay(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.expectedRefreshPartition("day", "2023-12-31")
.expectedRefreshPartition("hour", "00"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofDay("1"))
+ .freshness(IntervalFreshness.ofDay(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -304,7 +304,7 @@ class MaterializedTableManagerTest {
.expectedRefreshPartition("minute", "00"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofHour("1"))
+ .freshness(IntervalFreshness.ofHour(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -313,7 +313,7 @@ class MaterializedTableManagerTest {
.expectedRefreshPartition("minute", "00"),
TestSpec.create()
.schedulerTime("2024-01-01 01:00:00")
- .freshness(IntervalFreshness.ofHour("1"))
+ .freshness(IntervalFreshness.ofHour(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.tableOptions("partition.fields.minute.date-formatter", "mm")
@@ -324,84 +324,84 @@ class MaterializedTableManagerTest {
// 'date-formatter'.
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofHour("1"))
+ .freshness(IntervalFreshness.ofHour(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.expectedRefreshPartition("day", "2023-12-31"),
TestSpec.create()
.schedulerTime("2024-01-01 01:00:00")
- .freshness(IntervalFreshness.ofHour("1"))
+ .freshness(IntervalFreshness.ofHour(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.expectedRefreshPartition("day", "2024-01-01"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofHour("2"))
+ .freshness(IntervalFreshness.ofHour(2))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.expectedRefreshPartition("day", "2023-12-31"),
TestSpec.create()
.schedulerTime("2024-01-01 02:00:00")
- .freshness(IntervalFreshness.ofHour("2"))
+ .freshness(IntervalFreshness.ofHour(2))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.expectedRefreshPartition("day", "2024-01-01"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofHour("4"))
+ .freshness(IntervalFreshness.ofHour(4))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.expectedRefreshPartition("day", "2023-12-31"),
TestSpec.create()
.schedulerTime("2024-01-01 04:00:00")
- .freshness(IntervalFreshness.ofHour("4"))
+ .freshness(IntervalFreshness.ofHour(4))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.expectedRefreshPartition("day", "2024-01-01"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofMinute("1"))
+ .freshness(IntervalFreshness.ofMinute(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.expectedRefreshPartition("day", "2023-12-31")
.expectedRefreshPartition("hour", "23"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofMinute("2"))
+ .freshness(IntervalFreshness.ofMinute(2))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.expectedRefreshPartition("day", "2023-12-31")
.expectedRefreshPartition("hour", "23"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofMinute("4"))
+ .freshness(IntervalFreshness.ofMinute(4))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.expectedRefreshPartition("day", "2023-12-31")
.expectedRefreshPartition("hour", "23"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofMinute("15"))
+ .freshness(IntervalFreshness.ofMinute(15))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.expectedRefreshPartition("day", "2023-12-31")
.expectedRefreshPartition("hour", "23"),
TestSpec.create()
.schedulerTime("2024-01-01 00:00:00")
- .freshness(IntervalFreshness.ofMinute("1"))
+ .freshness(IntervalFreshness.ofMinute(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.expectedRefreshPartition("day", "2023-12-31"),
TestSpec.create()
.schedulerTime("2024-01-01 00:01:00")
- .freshness(IntervalFreshness.ofMinute("1"))
+ .freshness(IntervalFreshness.ofMinute(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.expectedRefreshPartition("day", "2024-01-01"),
// Invalid test case.
TestSpec.create()
.schedulerTime(null)
- .freshness(IntervalFreshness.ofDay("1"))
+ .freshness(IntervalFreshness.ofDay(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.errorMessage(
"The scheduler time must not be null during
the periodic refresh of the materialized table `catalog`.`database`.`table`."),
TestSpec.create()
.schedulerTime("2024-01-01")
- .freshness(IntervalFreshness.ofDay("1"))
+ .freshness(IntervalFreshness.ofDay(1))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter",
"HH")
.errorMessage(
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
index 51e1a5479fd..6b50542898b 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
@@ -313,7 +313,7 @@ class ShowCreateUtilTest {
null,
List.of(),
null,
- IntervalFreshness.ofMinute("1"),
+ IntervalFreshness.ofMinute(1),
RefreshMode.CONTINUOUS,
"SELECT 1",
"SELECT 1"),
@@ -331,7 +331,7 @@ class ShowCreateUtilTest {
null,
List.of(),
null,
- IntervalFreshness.ofMinute("1"),
+ IntervalFreshness.ofMinute(1),
RefreshMode.CONTINUOUS,
"SELECT 1",
"SELECT 1"),
@@ -351,7 +351,7 @@ class ShowCreateUtilTest {
null,
List.of(),
null,
- IntervalFreshness.ofMinute("1"),
+ IntervalFreshness.ofMinute(1),
RefreshMode.CONTINUOUS,
"SELECT 1",
"SELECT 1"),
@@ -371,7 +371,7 @@ class ShowCreateUtilTest {
"Materialized table comment",
List.of("id"),
TableDistribution.of(TableDistribution.Kind.HASH, 5, List.of("id")),
- IntervalFreshness.ofMinute("3"),
+ IntervalFreshness.ofMinute(3),
RefreshMode.FULL,
"SELECT id, name FROM tbl_a",
"SELECT id, name FROM
`catalogName`.`dbName`.`tbl_a`"),
@@ -393,7 +393,7 @@ class ShowCreateUtilTest {
"Materialized table comment",
List.of("id"),
TableDistribution.of(TableDistribution.Kind.HASH, 5, List.of("id")),
- IntervalFreshness.ofMinute("3"),
+ IntervalFreshness.ofMinute(3),
RefreshMode.FULL,
"SELECT * FROM tbl_a",
"SELECT id, name FROM
`catalogName`.`dbName`.`tbl_a`"),
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index cc94ab86fbf..0d26b59db6a 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -257,7 +257,7 @@ class CatalogBaseTableResolutionTest {
assertThat(resolvedCatalogMaterializedTable.getResolvedSchema())
.isEqualTo(RESOLVED_MATERIALIZED_TABLE_SCHEMA);
assertThat(resolvedCatalogMaterializedTable.getDefinitionFreshness())
- .isEqualTo(IntervalFreshness.ofSecond("30"));
+ .isEqualTo(IntervalFreshness.ofSecond(30));
assertThat(resolvedCatalogMaterializedTable.getOriginalQuery())
.isEqualTo(DEFAULT_ORIGINAL_QUERY);
assertThat(resolvedCatalogMaterializedTable.getExpandedQuery())
@@ -502,7 +502,7 @@ class CatalogBaseTableResolutionTest {
.options(Collections.emptyMap())
.originalQuery(DEFAULT_ORIGINAL_QUERY)
.expandedQuery(DEFAULT_EXPANDED_QUERY)
- .freshness(IntervalFreshness.ofSecond("30"))
+ .freshness(IntervalFreshness.ofSecond(30))
.logicalRefreshMode(LogicalRefreshMode.AUTOMATIC)
.refreshMode(RefreshMode.CONTINUOUS)
.refreshStatus(RefreshStatus.INITIALIZING)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
index 67b72bdeb68..58c8d76d858 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
@@ -288,8 +288,8 @@ public final class CatalogPropertiesUtil {
final String expandedQuery = properties.get(EXPANDED_QUERY);
final String freshnessInterval =
properties.get(FRESHNESS_INTERVAL);
- final IntervalFreshness.TimeUnit timeUnit =
-
IntervalFreshness.TimeUnit.valueOf(properties.get(FRESHNESS_UNIT));
+ final Interval.TimeUnit timeUnit =
+ Interval.TimeUnit.valueOf(properties.get(FRESHNESS_UNIT));
final IntervalFreshness freshness =
IntervalFreshness.of(freshnessInterval, timeUnit);
final LogicalRefreshMode logicalRefreshMode =
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Interval.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Interval.java
new file mode 100644
index 00000000000..8f6621c969d
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Interval.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import java.time.Duration;
+import java.time.Period;
+import java.util.Objects;
+
+/**
+ * A class representing day-time and year-month intervals.
+ *
+ * <p>Depending on the time unit, the interval is backed by either a day-time
+ * {@link Duration} or a year-month {@link Period}. See {@link TimeUnit} for more details.
+ */
+@PublicEvolving
+public class Interval {
+ private final Duration duration;
+ private final Period period;
+ private final TimeUnit timeUnit;
+
+ private Interval(TimeUnit timeUnit, Duration duration, Period period) {
+ this.timeUnit = timeUnit;
+ this.duration = duration;
+ this.period = period;
+ }
+
+ public static Interval of(Duration duration, TimeUnit timeUnit) {
+ return new Interval(timeUnit, duration, null);
+ }
+
+ public static Interval of(Period period, TimeUnit timeUnit) {
+ return new Interval(timeUnit, null, period);
+ }
+
+ public static Interval of(int duration, TimeUnit timeUnit) {
+ if (timeUnit.isDayTime) {
+ return of(toDuration(duration, timeUnit), timeUnit);
+ } else {
+ return of(toPeriod(duration, timeUnit), timeUnit);
+ }
+ }
+
+ public Duration getDuration() {
+ return duration;
+ }
+
+ public Period getPeriod() {
+ return period;
+ }
+
+ public TimeUnit getTimeUnit() {
+ return timeUnit;
+ }
+
+ public int getInterval() {
+ switch (timeUnit) {
+ case SECOND:
+ return (int) duration.toSeconds();
+ case MINUTE:
+ return (int) duration.toMinutes();
+ case HOUR:
+ return (int) duration.toHours();
+ case DAY:
+ return (int) duration.toDays();
+ case WEEK:
+ return (int) (duration.toDays() / DateTimeUtils.DAYS_PER_WEEK);
+ case MONTH:
+ return period.getMonths();
+ case QUARTER:
+ return period.getMonths() / DateTimeUtils.MONTHS_PER_QUARTER;
+ case YEAR:
+ return period.getYears();
+ default:
+ // Could happen if new TimeUnit is introduced and not
supported here
+ throw new ValidationException(
+ String.format("TimeUnit %s is not supported",
timeUnit));
+ }
+ }
+
+ private static Duration toDuration(int interval, TimeUnit timeUnit) {
+ switch (timeUnit) {
+ case SECOND:
+ return Duration.ofSeconds(interval);
+ case MINUTE:
+ return Duration.ofMinutes(interval);
+ case HOUR:
+ return Duration.ofHours(interval);
+ case DAY:
+ return Duration.ofDays(interval);
+ case WEEK:
+ return Duration.ofDays((long) interval *
DateTimeUtils.DAYS_PER_WEEK);
+ default:
+ throw new IllegalArgumentException("Unsupported time unit: " +
timeUnit);
+ }
+ }
+
+ private static Period toPeriod(int interval, TimeUnit timeUnit) {
+ switch (timeUnit) {
+ case MONTH:
+ return Period.ofMonths(interval);
+ case QUARTER:
+ return Period.ofMonths(interval *
DateTimeUtils.MONTHS_PER_QUARTER);
+ case YEAR:
+ return Period.ofYears(interval);
+ default:
+ throw new IllegalArgumentException("Unsupported time unit: " +
timeUnit);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Interval interval = (Interval) o;
+ return Objects.equals(duration, interval.duration)
+ && Objects.equals(period, interval.period)
+ && timeUnit == interval.timeUnit;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(duration, period, timeUnit);
+ }
+
+ @Override
+ public String toString() {
+ return "INTERVAL '" + getInterval() + "' " + timeUnit;
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // TimeUnit enums
+ //
--------------------------------------------------------------------------------------------
+
+ /** An enumeration of time unit representing the unit of interval. */
+ @PublicEvolving
+ public enum TimeUnit {
+ SECOND(true),
+ MINUTE(true),
+ HOUR(true),
+ DAY(true),
+ WEEK(true),
+ MONTH(false),
+ QUARTER(false),
+ YEAR(false);
+
+ private final boolean isDayTime;
+
+ TimeUnit(boolean isDayTime) {
+ this.isDayTime = isDayTime;
+ }
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
index 6548b149663..5c41984c006 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Interval.TimeUnit;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
@@ -42,57 +43,66 @@ public class IntervalFreshness {
private static final long MINUTE_CRON_UPPER_BOUND = 60;
private static final long HOUR_CRON_UPPER_BOUND = 24;
- private final int interval;
- private final TimeUnit timeUnit;
+ private final Interval interval;
- private IntervalFreshness(int interval, TimeUnit timeUnit) {
+ private IntervalFreshness(Interval interval) {
this.interval = interval;
- this.timeUnit = timeUnit;
+ }
+
+ public static IntervalFreshness of(Interval interval) {
+ return new IntervalFreshness(interval);
}
public static IntervalFreshness of(String interval, TimeUnit timeUnit) {
- final int validateIntervalInput = validateIntervalInput(interval);
- return new IntervalFreshness(validateIntervalInput, timeUnit);
+ final int validateIntervalInput = validateIntervalValue(interval);
+ return new IntervalFreshness(Interval.of(validateIntervalInput,
timeUnit));
}
- private static int validateIntervalInput(final String interval) {
- final int parsedInt;
- try {
- parsedInt = Integer.parseInt(interval);
- } catch (Exception e) {
- final String errorMessage =
- String.format(
- "The freshness interval currently only supports
positive integer type values. But was: %s",
- interval);
- throw new ValidationException(errorMessage, e);
- }
+ public static IntervalFreshness of(int intervalDuration, TimeUnit
timeUnit) {
+ validateIntervalPositiveValue(intervalDuration);
+ return new IntervalFreshness(Interval.of(intervalDuration, timeUnit));
+ }
- if (parsedInt <= 0) {
- final String errorMessage =
- String.format(
- "The freshness interval currently only supports
positive integer type values. But was: %s",
- interval);
- throw new ValidationException(errorMessage);
- }
- return parsedInt;
+ public static IntervalFreshness of(Duration duration, TimeUnit timeUnit) {
+ // just check it is positive
+ validateIntervalPositiveValue((int) duration.toSeconds());
+ return new IntervalFreshness(Interval.of(duration, timeUnit));
}
+ @Deprecated
public static IntervalFreshness ofSecond(String interval) {
return IntervalFreshness.of(interval, TimeUnit.SECOND);
}
+ public static IntervalFreshness ofSecond(int interval) {
+ return IntervalFreshness.of(interval, TimeUnit.SECOND);
+ }
+
+ @Deprecated
public static IntervalFreshness ofMinute(String interval) {
return IntervalFreshness.of(interval, TimeUnit.MINUTE);
}
+ public static IntervalFreshness ofMinute(int interval) {
+ return IntervalFreshness.of(interval, TimeUnit.MINUTE);
+ }
+
public static IntervalFreshness ofHour(String interval) {
return IntervalFreshness.of(interval, TimeUnit.HOUR);
}
+ public static IntervalFreshness ofHour(int interval) {
+ return IntervalFreshness.of(interval, TimeUnit.HOUR);
+ }
+
public static IntervalFreshness ofDay(String interval) {
return IntervalFreshness.of(interval, TimeUnit.DAY);
}
+ public static IntervalFreshness ofDay(int interval) {
+ return IntervalFreshness.of(interval, TimeUnit.DAY);
+ }
+
/**
* Validates that the given freshness can be converted to a cron
expression in full refresh
* mode. Since freshness and cron expression cannot be converted
equivalently, there are
@@ -156,53 +166,30 @@ public class IntervalFreshness {
}
}
- private static void validateCronConstraints(
- IntervalFreshness intervalFreshness, long cronUpperBound) {
- int interval = intervalFreshness.getIntervalInt();
- TimeUnit timeUnit = intervalFreshness.getTimeUnit();
- // Freshness must be less than cronUpperBound for corresponding time
unit when convert it
- // to cron expression
- if (interval >= cronUpperBound) {
- throw new ValidationException(
- String.format(
- "In full refresh mode, freshness must be less than
%s when the time unit is %s.",
- cronUpperBound, timeUnit));
- }
- // Freshness must be factors of cronUpperBound for corresponding time
unit
- if (cronUpperBound % interval != 0) {
- throw new ValidationException(
- String.format(
- "In full refresh mode, only freshness that are
factors of %s are currently supported when the time unit is %s.",
- cronUpperBound, timeUnit));
- }
- }
-
- private static void validateDayConstraints(IntervalFreshness
intervalFreshness) {
- // Since the number of days in each month is different, only one day
of freshness is
- // currently supported when the time unit is DAY
- int interval = intervalFreshness.getIntervalInt();
- if (interval > 1) {
- throw new ValidationException(
- "In full refresh mode, freshness must be 1 when the time
unit is DAY.");
- }
- }
-
/**
* Creates an IntervalFreshness from a Duration, choosing the most
appropriate time unit.
* Prefers larger units when possible (e.g., 60 seconds → 1 minute).
*/
public static IntervalFreshness fromDuration(Duration duration) {
if (duration.equals(duration.truncatedTo(ChronoUnit.DAYS))) {
- return new IntervalFreshness((int) duration.toDays(),
TimeUnit.DAY);
+ return IntervalFreshness.ofDay((int) duration.toDays());
}
if (duration.equals(duration.truncatedTo(ChronoUnit.HOURS))) {
- return new IntervalFreshness((int) duration.toHours(),
TimeUnit.HOUR);
+ return IntervalFreshness.ofHour((int) duration.toHours());
}
if (duration.equals(duration.truncatedTo(ChronoUnit.MINUTES))) {
- return new IntervalFreshness((int) duration.toMinutes(),
TimeUnit.MINUTE);
+ return IntervalFreshness.ofMinute((int) duration.toMinutes());
}
- return new IntervalFreshness((int) duration.getSeconds(),
TimeUnit.SECOND);
+ return IntervalFreshness.ofSecond((int) duration.getSeconds());
+ }
+
+ public Duration toDuration() {
+ return interval.getDuration();
+ }
+
+ public TimeUnit getTimeUnit() {
+ return interval.getTimeUnit();
}
/**
@@ -210,64 +197,84 @@ public class IntervalFreshness {
*/
@Deprecated
public String getInterval() {
- return String.valueOf(interval);
+ return String.valueOf(getIntervalInt());
}
public int getIntervalInt() {
- return interval;
- }
-
- public TimeUnit getTimeUnit() {
- return timeUnit;
- }
-
- public Duration toDuration() {
- switch (timeUnit) {
- case SECOND:
- return Duration.ofSeconds(interval);
- case MINUTE:
- return Duration.ofMinutes(interval);
- case HOUR:
- return Duration.ofHours(interval);
- case DAY:
- return Duration.ofDays(interval);
- default:
- throw new IllegalStateException("Unexpected value: " +
timeUnit);
- }
+ return interval.getInterval();
}
@Override
public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
if (o == null || getClass() != o.getClass()) {
return false;
}
IntervalFreshness that = (IntervalFreshness) o;
- return Objects.equals(interval, that.interval) && timeUnit ==
that.timeUnit;
+ return Objects.equals(interval, that.interval);
}
@Override
- public String toString() {
- return "INTERVAL '" + interval + "' " + timeUnit;
+ public int hashCode() {
+ return Objects.hashCode(interval);
}
@Override
- public int hashCode() {
- return Objects.hash(interval, timeUnit);
+ public String toString() {
+ return interval.toString();
}
- //
--------------------------------------------------------------------------------------------
- // TimeUnit enums
- //
--------------------------------------------------------------------------------------------
-
- /** An enumeration of time unit representing the unit of interval
freshness. */
- @PublicEvolving
- public enum TimeUnit {
- SECOND,
- MINUTE,
- HOUR,
- DAY
+ private static int validateIntervalValue(final String interval) {
+ final int parsedInt;
+ try {
+ parsedInt = Integer.parseInt(interval);
+ } catch (Exception e) {
+ final String errorMessage =
+ String.format(
+ "The freshness interval currently only supports
positive integer type values. But was: %s",
+ interval);
+ throw new ValidationException(errorMessage, e);
+ }
+ validateIntervalPositiveValue(parsedInt);
+ return parsedInt;
+ }
+
+ private static void validateIntervalPositiveValue(final int interval) {
+ if (interval <= 0) {
+ throw new ValidationException(
+ String.format(
+ "The freshness interval currently only supports
positive integer type values. But was: %d",
+ interval));
+ }
+ }
+
+ private static void validateCronConstraints(
+ IntervalFreshness intervalFreshness, long cronUpperBound) {
+ int interval = intervalFreshness.getIntervalInt();
+ TimeUnit timeUnit = intervalFreshness.getTimeUnit();
+ // Freshness must be less than cronUpperBound for corresponding time
unit when converting it
+ // to cron expression
+ if (interval >= cronUpperBound) {
+ throw new ValidationException(
+ String.format(
+ "In full refresh mode, freshness must be less than
%s when the time unit is %s.",
+ cronUpperBound, timeUnit));
+ }
+ // Freshness must be factors of cronUpperBound for corresponding time
unit
+ if (cronUpperBound % interval != 0) {
+ throw new ValidationException(
+ String.format(
+ "In full refresh mode, only freshness that are
factors of %s are currently supported when the time unit is %s.",
+ cronUpperBound, timeUnit));
+ }
+ }
+
+ private static void validateDayConstraints(IntervalFreshness
intervalFreshness) {
+ // Since the number of days in each month is different, only one day
of freshness is
+ // currently supported when the time unit is DAY
+ int interval = intervalFreshness.getIntervalInt();
+ if (interval > 1) {
+ throw new ValidationException(
+ "In full refresh mode, freshness must be 1 when the time
unit is DAY.");
+ }
}
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
index 754e694c53a..9c0071b6bd9 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
@@ -90,13 +90,13 @@ public class DateTimeUtils {
public static final int EPOCH_JULIAN = 2440588;
/** The number of milliseconds in a second. */
- private static final long MILLIS_PER_SECOND = 1000L;
+ public static final long MILLIS_PER_SECOND = 1000L;
/** The number of milliseconds in a minute. */
- private static final long MILLIS_PER_MINUTE = 60000L;
+ public static final long MILLIS_PER_MINUTE = 60000L;
/** The number of milliseconds in an hour. */
- private static final long MILLIS_PER_HOUR = 3600000L; // = 60 * 60 * 1000
+ public static final long MILLIS_PER_HOUR = 3600000L; // = 60 * 60 * 1000
/**
* The number of milliseconds in a day.
@@ -105,6 +105,12 @@ public class DateTimeUtils {
*/
public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 *
1000
+ /** The number of days in a week. */
+ public static final int DAYS_PER_WEEK = 7;
+
+ /** The number of months in a quarter. */
+ public static final int MONTHS_PER_QUARTER = 3;
+
/** The SimpleDateFormat string for ISO dates, "yyyy-MM-dd". */
private static final String DATE_FORMAT_STRING = "yyyy-MM-dd";
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
index 43eb735c4c9..f66f4d52251 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
@@ -167,7 +167,7 @@ class CatalogPropertiesUtilTest {
.distribution(unknownDist)
.originalQuery("SELECT 1, 'two'")
.expandedQuery("SELECT 1, 'two'")
- .freshness(IntervalFreshness.ofHour("123"))
+ .freshness(IntervalFreshness.ofHour(123))
.logicalRefreshMode(LogicalRefreshMode.CONTINUOUS)
.refreshMode(RefreshMode.CONTINUOUS)
.refreshStatus(RefreshStatus.ACTIVATED)
@@ -175,6 +175,6 @@ class CatalogPropertiesUtilTest {
.build(),
resolvedSchema,
RefreshMode.CONTINUOUS,
- IntervalFreshness.ofHour("123")));
+ IntervalFreshness.ofHour(123)));
}
}
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java
index de794684956..1b331376bdf 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java
@@ -48,114 +48,114 @@ public class IntervalFreshnessTest {
void testConvertDurationToFreshnessInterval() {
// verify second
IntervalFreshness actualSeconds =
IntervalFreshness.fromDuration(Duration.ofSeconds(20));
- assertThat(actualSeconds).isEqualTo(IntervalFreshness.ofSecond("20"));
+ assertThat(actualSeconds).isEqualTo(IntervalFreshness.ofSecond(20));
// verify minute
IntervalFreshness actualMinutes =
IntervalFreshness.fromDuration(Duration.ofMinutes(3));
- assertThat(actualMinutes).isEqualTo(IntervalFreshness.ofMinute("3"));
+ assertThat(actualMinutes).isEqualTo(IntervalFreshness.ofMinute(3));
// verify hour
IntervalFreshness actualHour =
IntervalFreshness.fromDuration(Duration.ofHours(1));
- assertThat(actualHour).isEqualTo(IntervalFreshness.ofHour("1"));
+ assertThat(actualHour).isEqualTo(IntervalFreshness.ofHour(1));
// verify day
IntervalFreshness actualDay =
IntervalFreshness.fromDuration(Duration.ofDays(2));
- assertThat(actualDay).isEqualTo(IntervalFreshness.ofDay("2"));
+ assertThat(actualDay).isEqualTo(IntervalFreshness.ofDay(2));
}
@Test
void testConvertFreshnessToDuration() {
// verify second
- Duration actualSecond = IntervalFreshness.ofSecond("20").toDuration();
+ Duration actualSecond = IntervalFreshness.ofSecond(20).toDuration();
assertThat(actualSecond).isEqualTo(Duration.ofSeconds(20));
// verify minute
- Duration actualMinute = IntervalFreshness.ofMinute("3").toDuration();
+ Duration actualMinute = IntervalFreshness.ofMinute(3).toDuration();
assertThat(actualMinute).isEqualTo(Duration.ofMinutes(3));
// verify hour
- Duration actualHour = IntervalFreshness.ofHour("3").toDuration();
+ Duration actualHour = IntervalFreshness.ofHour(3).toDuration();
assertThat(actualHour).isEqualTo(Duration.ofHours(3));
// verify day
- Duration actualDay = IntervalFreshness.ofDay("3").toDuration();
+ Duration actualDay = IntervalFreshness.ofDay(3).toDuration();
assertThat(actualDay).isEqualTo(Duration.ofDays(3));
}
@Test
void testConvertSecondFreshnessToCronExpression() {
// verify illegal freshness
- assertThatThrownBy(() ->
convertFreshnessToCron(IntervalFreshness.ofSecond("90")))
+ assertThatThrownBy(() ->
convertFreshnessToCron(IntervalFreshness.ofSecond(90)))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"In full refresh mode, freshness must be less than 60
when the time unit is SECOND.");
- assertThatThrownBy(() ->
convertFreshnessToCron(IntervalFreshness.ofSecond("32")))
+ assertThatThrownBy(() ->
convertFreshnessToCron(IntervalFreshness.ofSecond(32)))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"In full refresh mode, only freshness that are factors
of 60 are currently supported when the time unit is SECOND.");
- String actual1 =
convertFreshnessToCron(IntervalFreshness.ofSecond("30"));
+ String actual1 =
convertFreshnessToCron(IntervalFreshness.ofSecond(30));
assertThat(actual1).isEqualTo("0/30 * * * * ? *");
- String actual2 =
convertFreshnessToCron(IntervalFreshness.ofSecond("5"));
+ String actual2 = convertFreshnessToCron(IntervalFreshness.ofSecond(5));
assertThat(actual2).isEqualTo("0/5 * * * * ? *");
}
@Test
void testConvertMinuteFreshnessToCronExpression() {
// verify illegal freshness
- assertThatThrownBy(() ->
convertFreshnessToCron(IntervalFreshness.ofMinute("90")))
+ assertThatThrownBy(() ->
convertFreshnessToCron(IntervalFreshness.ofMinute(90)))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"In full refresh mode, freshness must be less than 60
when the time unit is MINUTE.");
- assertThatThrownBy(() ->
convertFreshnessToCron(IntervalFreshness.ofMinute("32")))
+ assertThatThrownBy(() ->
convertFreshnessToCron(IntervalFreshness.ofMinute(32)))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"In full refresh mode, only freshness that are factors
of 60 are currently supported when the time unit is MINUTE.");
- String actual1 =
convertFreshnessToCron(IntervalFreshness.ofMinute("30"));
+ String actual1 =
convertFreshnessToCron(IntervalFreshness.ofMinute(30));
assertThat(actual1).isEqualTo("0 0/30 * * * ? *");
- String actual2 =
convertFreshnessToCron(IntervalFreshness.ofMinute("5"));
+ String actual2 = convertFreshnessToCron(IntervalFreshness.ofMinute(5));
assertThat(actual2).isEqualTo("0 0/5 * * * ? *");
- String actual3 =
convertFreshnessToCron(IntervalFreshness.ofMinute("1"));
+ String actual3 = convertFreshnessToCron(IntervalFreshness.ofMinute(1));
assertThat(actual3).isEqualTo("0 0/1 * * * ? *");
}
@Test
void testConvertHourFreshnessToCronExpression() {
// verify illegal freshness
- assertThatThrownBy(() ->
convertFreshnessToCron(IntervalFreshness.ofHour("24")))
+ assertThatThrownBy(() ->
convertFreshnessToCron(IntervalFreshness.ofHour(24)))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"In full refresh mode, freshness must be less than 24
when the time unit is HOUR.");
- assertThatThrownBy(() ->
convertFreshnessToCron(IntervalFreshness.ofHour("14")))
+ assertThatThrownBy(() ->
convertFreshnessToCron(IntervalFreshness.ofHour(14)))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"In full refresh mode, only freshness that are factors
of 24 are currently supported when the time unit is HOUR.");
- String actual1 =
convertFreshnessToCron(IntervalFreshness.ofHour("12"));
+ String actual1 = convertFreshnessToCron(IntervalFreshness.ofHour(12));
assertThat(actual1).isEqualTo("0 0 0/12 * * ? *");
- String actual2 = convertFreshnessToCron(IntervalFreshness.ofHour("4"));
+ String actual2 = convertFreshnessToCron(IntervalFreshness.ofHour(4));
assertThat(actual2).isEqualTo("0 0 0/4 * * ? *");
- String actual3 = convertFreshnessToCron(IntervalFreshness.ofHour("1"));
+ String actual3 = convertFreshnessToCron(IntervalFreshness.ofHour(1));
assertThat(actual3).isEqualTo("0 0 0/1 * * ? *");
}
@Test
void testConvertDayFreshnessToCronExpression() {
// verify illegal freshness
- assertThatThrownBy(() ->
convertFreshnessToCron(IntervalFreshness.ofDay("2")))
+ assertThatThrownBy(() ->
convertFreshnessToCron(IntervalFreshness.ofDay(2)))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"In full refresh mode, freshness must be 1 when the
time unit is DAY.");
- String actual1 = convertFreshnessToCron(IntervalFreshness.ofDay("1"));
+ String actual1 = convertFreshnessToCron(IntervalFreshness.ofDay(1));
assertThat(actual1).isEqualTo("0 0 0 * * ? *");
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index 8cc5ff524e8..6f50a3d36e7 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -32,6 +32,8 @@ import
org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.Column.ComputedColumn;
import org.apache.flink.table.catalog.Column.MetadataColumn;
+import org.apache.flink.table.catalog.Interval;
+import org.apache.flink.table.catalog.Interval.TimeUnit;
import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.TableChange;
@@ -39,14 +41,16 @@ import
org.apache.flink.table.catalog.TableChange.ColumnPosition;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
import
org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.calcite.sql.SqlIntervalLiteral;
import org.apache.calcite.sql.SqlIntervalLiteral.IntervalValue;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.commons.lang3.StringUtils;
+import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -56,6 +60,8 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
+import static java.time.temporal.ChronoField.MONTH_OF_YEAR;
+
/** The utils for materialized table. */
@Internal
public class MaterializedTableUtils {
@@ -68,29 +74,109 @@ public class MaterializedTableUtils {
public static IntervalFreshness getMaterializedTableFreshness(
SqlIntervalLiteral sqlIntervalLiteral) {
- if (sqlIntervalLiteral.signum() < 0) {
- throw new ValidationException(
- "Materialized table freshness doesn't support negative
value.");
+ return IntervalFreshness.of(getFreshnessInterval(sqlIntervalLiteral));
+ }
+
+ private static Interval getFreshnessInterval(SqlIntervalLiteral
sqlIntervalLiteral) {
+ final IntervalValue intervalValue =
sqlIntervalLiteral.getValueAs(IntervalValue.class);
+ final SqlTypeName typeName =
intervalValue.getIntervalQualifier().typeName();
+
+ if (isDateTimeInterval(typeName)) {
+ final Interval freshnessInterval =
+ getDayTimeInterval(
+ intervalValue,
+ typeName,
+ sqlIntervalLiteral.getValueAs(BigDecimal.class),
+ "freshness");
+ final int interval = freshnessInterval.getInterval();
+ // Freshness interval must be strictly positive
+ validateIntervalValuePositive(interval, "freshness");
+ return freshnessInterval;
}
- if (sqlIntervalLiteral.getTypeName().getFamily() !=
SqlTypeFamily.INTERVAL_DAY_TIME) {
- throw new ValidationException(
- "Materialized table freshness only support SECOND, MINUTE,
HOUR, DAY as the time unit.");
+
+ throw new ValidationException(
+ "Materialized table freshness only supports SECOND, MINUTE,
HOUR, DAY, WEEK as the time unit.");
+ }
+
+ private static Interval intervalFrom(
+ SqlIntervalLiteral sqlIntervalLiteral, String intervalDescription)
{
+
+ final IntervalValue intervalValue =
sqlIntervalLiteral.getValueAs(IntervalValue.class);
+ final SqlTypeName typeName =
intervalValue.getIntervalQualifier().typeName();
+ if (intervalValue.getIntervalQualifier().isYearMonth()) {
+ return getYearMonthInterval(
+ intervalValue,
+ typeName,
+ sqlIntervalLiteral.getValueAs(BigDecimal.class),
+ intervalDescription);
+ }
+
+ if (isDateTimeInterval(typeName)) {
+ return getDayTimeInterval(
+ intervalValue,
+ typeName,
+ sqlIntervalLiteral.getValueAs(BigDecimal.class),
+ intervalDescription);
+ }
+
+ throw new ValidationException(
+ String.format(
+ "Materialized table %s only supports SECOND, MINUTE,
HOUR, DAY, WEEK, MONTH, QUARTER, YEAR as the time unit.",
+ intervalDescription));
+ }
+
+ private static Interval getYearMonthInterval(
+ final IntervalValue intervalValue,
+ final SqlTypeName typeName,
+ final BigDecimal interval,
+ final String intervalDescription) {
+ final int intervalInt = interval.intValue();
+ switch (typeName) {
+ case INTERVAL_MONTH:
+ if
(intervalValue.getIntervalQualifier().timeUnitRange.startUnit
+ == org.apache.calcite.avatica.util.TimeUnit.QUARTER) {
+ return Interval.of(
+ intervalInt / DateTimeUtils.MONTHS_PER_QUARTER,
TimeUnit.QUARTER);
+ }
+ return Interval.of(intervalInt, TimeUnit.MONTH);
+ case INTERVAL_YEAR:
+ return Interval.of(
+ (int) (intervalInt /
MONTH_OF_YEAR.range().getMaximum()), TimeUnit.YEAR);
+ default:
+ throw new ValidationException(
+ String.format(
+ "Materialized table %s only supports MONTH,
QUARTER, YEAR as the time unit.",
+ intervalDescription));
}
+ }
- IntervalValue intervalValue =
sqlIntervalLiteral.getValueAs(IntervalValue.class);
- String interval = intervalValue.getIntervalLiteral();
- switch (intervalValue.getIntervalQualifier().typeName()) {
+ private static Interval getDayTimeInterval(
+ final IntervalValue intervalValue,
+ final SqlTypeName typeName,
+ final BigDecimal interval,
+ final String intervalDescription) {
+ final long millis = interval.longValue();
+ switch (typeName) {
case INTERVAL_DAY:
- return IntervalFreshness.ofDay(interval);
+ final int amountOfDays = (int) (millis /
DateTimeUtils.MILLIS_PER_DAY);
+ if
(intervalValue.getIntervalQualifier().timeUnitRange.startUnit
+ == org.apache.calcite.avatica.util.TimeUnit.WEEK) {
+ return Interval.of(amountOfDays /
DateTimeUtils.DAYS_PER_WEEK, TimeUnit.WEEK);
+ }
+ return Interval.of(amountOfDays, TimeUnit.DAY);
case INTERVAL_HOUR:
- return IntervalFreshness.ofHour(interval);
+ return Interval.of((int) (millis /
DateTimeUtils.MILLIS_PER_HOUR), TimeUnit.HOUR);
case INTERVAL_MINUTE:
- return IntervalFreshness.ofMinute(interval);
+ return Interval.of(
+ (int) (millis / DateTimeUtils.MILLIS_PER_MINUTE),
TimeUnit.MINUTE);
case INTERVAL_SECOND:
- return IntervalFreshness.ofSecond(interval);
+ return Interval.of(
+ (int) (millis / DateTimeUtils.MILLIS_PER_SECOND),
TimeUnit.SECOND);
default:
throw new ValidationException(
- "Materialized table freshness only support SECOND,
MINUTE, HOUR, DAY as the time unit.");
+ String.format(
+ "Materialized table %s only supports SECOND,
MINUTE, HOUR, DAY, WEEK as the time unit.",
+ intervalDescription));
}
}
@@ -210,6 +296,13 @@ public class MaterializedTableUtils {
return changes;
}
+ private static boolean isDateTimeInterval(SqlTypeName typeName) {
+ return typeName == SqlTypeName.INTERVAL_DAY
+ || typeName == SqlTypeName.INTERVAL_HOUR
+ || typeName == SqlTypeName.INTERVAL_MINUTE
+ || typeName == SqlTypeName.INTERVAL_SECOND;
+ }
+
// Since it is only for query change, then check only persisted columns
which could be
// changed/added/dropped with such change
private static boolean isSchemaChanged(ResolvedSchema oldSchema,
ResolvedSchema newSchema) {
@@ -408,4 +501,14 @@ public class MaterializedTableUtils {
throw new ValidationException(
String.format(PERSISTED_COLUMN_NOT_USED_IN_QUERY, type,
columnName));
}
+
+ private static void validateIntervalValuePositive(
+ final int interval, final String description) {
+ if (interval <= 0) {
+ throw new ValidationException(
+ String.format(
+ "The %s interval currently only supports positive
integer type values. But was: %d",
+ description, interval));
+ }
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 2d5aecfff4a..3344616c798 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -1471,7 +1471,7 @@ class SqlDdlToOperationConverterTest extends
SqlNodeToOperationConversionTestBas
assertThat(
op.getCatalogMaterializedTable()
.getDefinitionFreshness())
-
.isEqualTo(IntervalFreshness.ofSecond("30"));
+
.isEqualTo(IntervalFreshness.ofSecond(30));
assertThat(op.getCatalogMaterializedTable().getRefreshMode())
.isSameAs(RefreshMode.CONTINUOUS);
}),
@@ -2876,7 +2876,7 @@ class SqlDdlToOperationConverterTest extends
SqlNodeToOperationConversionTestBas
.partitionKeys(
hasPartition ? Arrays.asList("b", "c") :
Collections.emptyList())
.options(Collections.unmodifiableMap(options))
- .freshness(IntervalFreshness.ofHour("2"))
+ .freshness(IntervalFreshness.ofHour(2))
.logicalRefreshMode(LogicalRefreshMode.CONTINUOUS)
.refreshMode(RefreshMode.CONTINUOUS)
.refreshStatus(RefreshStatus.ACTIVATED)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 32281b83936..8f8dfb85618 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -214,14 +214,14 @@ class SqlMaterializedTableNodeToOperationConverterTest
createResolvedCatalogMaterializedTable(sql);
final IntervalFreshness resolvedFreshness =
materializedTable.getDefinitionFreshness();
-
assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofSecond("30"));
+
assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofSecond(30));
final RefreshMode resolvedRefreshMode =
materializedTable.getRefreshMode();
assertThat(resolvedRefreshMode).isSameAs(RefreshMode.FULL);
final CatalogMaterializedTable expected =
getDefaultMaterializedTableBuilder()
- .freshness(IntervalFreshness.ofSecond("30"))
+ .freshness(IntervalFreshness.ofSecond(30))
.logicalRefreshMode(LogicalRefreshMode.FULL)
.refreshMode(RefreshMode.FULL)
.refreshStatus(RefreshStatus.INITIALIZING)
@@ -248,7 +248,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
// The resolved freshness should default to 1 minute
final IntervalFreshness resolvedFreshness =
materializedTable.getDefinitionFreshness();
- assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofHour("1"));
+ assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofHour(1));
final RefreshMode resolvedRefreshMode =
materializedTable.getRefreshMode();
assertThat(resolvedRefreshMode).isSameAs(RefreshMode.FULL);
@@ -279,7 +279,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
createResolvedCatalogMaterializedTable(sql);
final IntervalFreshness resolvedFreshness =
materializedTable.getDefinitionFreshness();
-
assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofMinute("3"));
+ assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofMinute(3));
final CatalogMaterializedTable expected =
getDefaultMaterializedTableBuilder()
.logicalRefreshMode(LogicalRefreshMode.AUTOMATIC)
@@ -641,7 +641,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
final CatalogMaterializedTable expected =
getDefaultMaterializedTableBuilder()
- .freshness(IntervalFreshness.ofSecond("30"))
+ .freshness(IntervalFreshness.ofSecond(30))
.logicalRefreshMode(LogicalRefreshMode.FULL)
.refreshMode(RefreshMode.FULL)
.refreshStatus(RefreshStatus.INITIALIZING)
@@ -936,17 +936,17 @@ class SqlMaterializedTableNodeToOperationConverterTest
+ "FRESHNESS = INTERVAL -'30' SECOND\n"
+ "REFRESH_MODE = FULL\n"
+ "AS SELECT * FROM t1",
- "Materialized table freshness doesn't support negative
value."),
+ "The freshness interval currently only supports
positive integer type values. But was: -30"),
TestSpec.of(
"CREATE MATERIALIZED TABLE mtbl1\n"
+ "FRESHNESS = INTERVAL '30' YEAR\n"
+ "AS SELECT * FROM t1",
- "Materialized table freshness only support SECOND,
MINUTE, HOUR, DAY as the time unit."),
+ "Materialized table freshness only supports SECOND,
MINUTE, HOUR, DAY, WEEK as the time unit."),
TestSpec.of(
"CREATE MATERIALIZED TABLE mtbl1\n"
+ "FRESHNESS = INTERVAL '30' DAY TO HOUR\n"
+ "AS SELECT * FROM t1",
- "Materialized table freshness only support SECOND,
MINUTE, HOUR, DAY as the time unit."));
+ "Materialized table freshness only supports SECOND,
MINUTE, HOUR, DAY, WEEK as the time unit."));
}
private static Collection<TestSpec> alterDrop() {
diff --git
a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
index 7a117ef58c2..122ac8a4bb0 100644
---
a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
+++
b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
@@ -109,7 +109,7 @@ public class TestFileSystemCatalogTest extends
TestFileSystemCatalogTestBase {
private static final String DEFAULT_EXPANDED_QUERY =
String.format(
"SELECT id, region, county FROM %s.%s.T", TEST_CATALOG,
TEST_DEFAULT_DATABASE);
- private static final IntervalFreshness FRESHNESS =
IntervalFreshness.ofMinute("3");
+ private static final IntervalFreshness FRESHNESS =
IntervalFreshness.ofMinute(3);
private static final ResolvedCatalogMaterializedTable
EXPECTED_CATALOG_MATERIALIZED_TABLE =
new ResolvedCatalogMaterializedTable(
CatalogMaterializedTable.newBuilder()