This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b0d0d2e0c2978d48e53478356df6fce1be795b4d
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Apr 2 16:16:39 2026 +0200

    [FLINK-39323][table] Support `SHOW CREATE [OR ALTER]` with relative time intervals for `FROM_NOW(<interval_expression>)`
    
    This closes #27889.
---
 .../materialized_table_config_configuration.html   |   6 ++
 .../api/config/MaterializedTableConfigOptions.java |  11 +++
 .../flink/table/api/internal/ShowCreateUtil.java   | 110 +++++++++++++++++++--
 .../apache/flink/table/catalog/CatalogManager.java |  17 +++-
 .../ShowCreateMaterializedTableOperation.java      |   1 +
 .../table/api/internal/ShowCreateUtilTest.java     |  50 +++++++++-
 .../table/catalog/CatalogMaterializedTable.java    |   5 +
 .../catalog/DefaultMaterializedTableEnricher.java  |  17 +++-
 .../catalog/MaterializedTableEnrichmentResult.java |  10 +-
 .../catalog/ResolvedCatalogMaterializedTable.java  |  21 +++-
 .../table/catalog/CatalogPropertiesUtilTest.java   |   3 +-
 .../catalog/TestFileSystemCatalogTest.java         |   5 +-
 12 files changed, 234 insertions(+), 22 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html
 
b/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html
index f3cb3972a73..10b6105338a 100644
--- 
a/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html
@@ -20,6 +20,12 @@
             <td>Duration</td>
             <td>The default freshness interval for full refresh mode when the 
FRESHNESS clause is omitted in a materialized table definition.</td>
         </tr>
+        <tr>
+            <td><h5>materialized-table.default-start-mode</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">FROM_BEGINNING</td>
+            <td><p>Enum</p></td>
+            <td>The default start mode for materialized tables.Supported 
values: FROM_BEGINNING, FROM_NOW, RESUME_OR_FROM_BEGINNING, 
RESUME_OR_FROM_NOW.<br /><br />Possible 
values:<ul><li>"FROM_BEGINNING"</li><li>"FROM_NOW"</li><li>"FROM_TIMESTAMP"</li><li>"RESUME_OR_FROM_BEGINNING"</li><li>"RESUME_OR_FROM_NOW"</li><li>"RESUME_OR_FROM_TIMESTAMP"</li></ul></td>
+        </tr>
         <tr>
             
<td><h5>materialized-table.refresh-mode.freshness-threshold</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">30 min</td>
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java
index e387705e1a3..f876ec2034e 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.api.config;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.StartMode.StartModeKind;
 
 import java.time.Duration;
 
@@ -74,4 +75,14 @@ public class MaterializedTableConfigOptions {
                     .defaultValue(Duration.ofHours(1))
                     .withDescription(
                             "The default freshness interval for full refresh 
mode when the FRESHNESS clause is omitted in a materialized table definition.");
+
+    @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<StartModeKind> 
MATERIALIZED_TABLE_DEFAULT_START_MODE =
+            key("materialized-table.default-start-mode")
+                    .enumType(StartModeKind.class)
+                    .defaultValue(StartModeKind.FROM_BEGINNING)
+                    .withDescription(
+                            "The default start mode for materialized tables."
+                                    + "Supported values: FROM_BEGINNING, 
FROM_NOW, "
+                                    + "RESUME_OR_FROM_BEGINNING, 
RESUME_OR_FROM_NOW.");
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
index c62cf638494..74f17c06807 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.CatalogDescriptor;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.Interval;
 import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.QueryOperationCatalogView;
@@ -33,6 +34,7 @@ import 
org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedCatalogModel;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.StartMode;
 import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.expressions.SqlFactory;
@@ -40,17 +42,26 @@ import org.apache.flink.table.utils.EncodingUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalAmount;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.TreeSet;
 import java.util.stream.Collectors;
 
 /** SHOW CREATE statement Util. */
 @Internal
 public class ShowCreateUtil {
 
+    private static final DateTimeFormatter TIMESTAMP_FORMATTER =
+            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");
     private static final String PRINT_INDENT = "  ";
 
     private ShowCreateUtil() {}
@@ -118,6 +129,7 @@ public class ShowCreateUtil {
             ObjectIdentifier tableIdentifier,
             boolean isTemporary,
             boolean createOrAlter,
+            ZoneId timeZoneId,
             SqlFactory sqlFactory) {
         validateTableKind(table, tableIdentifier, 
TableKind.MATERIALIZED_TABLE);
         StringBuilder sb =
@@ -143,6 +155,7 @@ public class ShowCreateUtil {
                 .ifPresent(partitionedBy -> 
sb.append(formatPartitionedBy(partitionedBy)));
         extractFormattedOptions(table.getOptions(), PRINT_INDENT)
                 .ifPresent(v -> sb.append("WITH 
(\n").append(v).append("\n)\n"));
+        sb.append(extractStartMode(table, timeZoneId)).append("\n");
         sb.append(extractFreshness(table))
                 .append("\n")
                 .append(extractRefreshMode(table))
@@ -277,7 +290,7 @@ public class ShowCreateUtil {
                                                         
watermarkSpec.getRowtimeAttribute()),
                                                 watermarkSpec
                                                         
.getWatermarkExpression()
-                                                        
.asSerializableString(sqlFactory)))
+                                                        
.asSerializableString()))
                         .collect(Collectors.joining("\n")));
     }
 
@@ -337,21 +350,81 @@ public class ShowCreateUtil {
         return String.format("REFRESH_MODE = %s", 
materializedTable.getRefreshMode());
     }
 
+    static String extractStartMode(
+            ResolvedCatalogMaterializedTable materializedTable, ZoneId 
timeZoneId) {
+        StringBuilder sb = new StringBuilder("START_MODE = ");
+        StartMode startMode = materializedTable.getStartMode().get();
+        switch (startMode.getKind()) {
+            case FROM_BEGINNING:
+            case RESUME_OR_FROM_BEGINNING:
+                sb.append(startMode.getKind().name());
+                break;
+
+            case FROM_NOW:
+            case RESUME_OR_FROM_NOW:
+                Interval interval = startMode.getInterval();
+                if (interval == null) {
+                    sb.append(startMode.getKind().name());
+                    break;
+                }
+
+                final TemporalAmount amount =
+                        interval.getDuration() == null
+                                ? interval.getPeriod()
+                                : interval.getDuration();
+                sb.append(startMode.getKind().name())
+                        .append("(")
+                        .append(interval)
+                        .append(")")
+                        .append(" /* Evaluated to FROM_TIMESTAMP(TIMESTAMP '")
+                        .append(
+                                getFormattedLocalDateTime(
+                                        
LocalDateTime.now().plus(amount).toInstant(ZoneOffset.UTC),
+                                        ZoneOffset.UTC))
+                        .append("') at execution */");
+                break;
+
+            case FROM_TIMESTAMP:
+            case RESUME_OR_FROM_TIMESTAMP:
+                sb.append(startMode.getKind().name());
+                sb.append("(TIMESTAMP");
+                if (startMode.isLocalTimeZone()) {
+                    sb.append(" WITH LOCAL TIME ZONE");
+                }
+                sb.append(" '")
+                        .append(
+                                getFormattedLocalDateTime(
+                                        startMode.getTimestamp(),
+                                        startMode.isLocalTimeZone() ? 
timeZoneId : ZoneOffset.UTC))
+                        .append("')");
+                break;
+
+            default:
+                throw new TableException(String.format("Unsupported start mode 
'%s'.", startMode));
+        }
+        return sb.toString();
+    }
+
+    private static String getFormattedLocalDateTime(Instant instant, ZoneId 
timeZone) {
+        return TIMESTAMP_FORMATTER.format(LocalDateTime.ofInstant(instant, 
timeZone));
+    }
+
     static Optional<String> extractFormattedOptions(Map<String, String> conf, 
String printIndent) {
         if (Objects.isNull(conf) || conf.isEmpty()) {
             return Optional.empty();
         }
+        TreeSet<String> treeSet = new TreeSet<>(conf.keySet());
+
         return Optional.of(
-                conf.entrySet().stream()
+                treeSet.stream()
                         .map(
                                 entry ->
                                         String.format(
                                                 "%s'%s' = '%s'",
                                                 printIndent,
-                                                
EncodingUtils.escapeSingleQuotes(entry.getKey()),
-                                                
EncodingUtils.escapeSingleQuotes(entry.getValue())))
-                        .sorted()
-                        .collect(Collectors.joining("," + 
System.lineSeparator())));
+                                                
EncodingUtils.escapeSingleQuotes(entry),
+                                                
EncodingUtils.escapeSingleQuotes(conf.get(entry))))
+                        .collect(Collectors.joining(",\n")));
     }
 
     static String extractFormattedColumnNames(
@@ -366,6 +439,31 @@ public class ShowCreateUtil {
                 .collect(Collectors.joining(",\n"));
     }
 
+    private static String maybeLowerCaseKey(String key, boolean lowerCaseKey) {
+        return lowerCaseKey ? key.toLowerCase() : key;
+    }
+
+    static Optional<String> extractFormattedOptions(
+            Map<String, String> conf, String printIndent, boolean 
lowerCaseKeys) {
+        if (Objects.isNull(conf) || conf.isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(
+                conf.entrySet().stream()
+                        .map(
+                                entry ->
+                                        String.format(
+                                                "%s'%s' = '%s'",
+                                                printIndent,
+                                                maybeLowerCaseKey(
+                                                        
EncodingUtils.escapeSingleQuotes(
+                                                                
entry.getKey()),
+                                                        lowerCaseKeys),
+                                                
EncodingUtils.escapeSingleQuotes(entry.getValue())))
+                        .sorted()
+                        .collect(Collectors.joining("," + 
System.lineSeparator())));
+    }
+
     private static void validateTableKind(
             ResolvedCatalogBaseTable<?> table,
             ObjectIdentifier tableIdentifier,
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 3bba4c05f4e..047d2c89d5b 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.table.api.config.MaterializedTableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
+import org.apache.flink.table.catalog.StartMode.StartModeKind;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -86,6 +87,7 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static java.lang.String.format;
+import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_DEFAULT_START_MODE;
 import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -287,8 +289,18 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
                                             
.MATERIALIZED_TABLE_DEFAULT_FRESHNESS_FULL);
             final Duration freshnessThreshold =
                     
catalogStoreHolder.config().get(MATERIALIZED_TABLE_FRESHNESS_THRESHOLD);
+            final StartModeKind startModeKind =
+                    
catalogStoreHolder.config().get(MATERIALIZED_TABLE_DEFAULT_START_MODE);
+            if (StartMode.requiresParameters(startModeKind)) {
+                throw new ValidationException(
+                        String.format(
+                                "Invalid default start mode '%s'. "
+                                        + "Only parameterless start modes are 
supported as defaults."));
+            }
+
+            final StartMode startMode = StartMode.of(startModeKind);
             return DefaultMaterializedTableEnricher.create(
-                    defaultDurationContinuous, defaultDurationFull, 
freshnessThreshold);
+                    defaultDurationContinuous, defaultDurationFull, 
freshnessThreshold, startMode);
         }
     }
 
@@ -1912,6 +1924,7 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
                 this.materializedTableEnricher.enrich(table);
         IntervalFreshness freshness = enrichmentResult.getFreshness();
         RefreshMode resolvedRefreshMode = enrichmentResult.getRefreshMode();
+        StartMode startMode = enrichmentResult.getStartMode();
 
         // Validate partition keys are included in physical columns
         final List<String> physicalColumns =
@@ -1933,7 +1946,7 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
                         });
 
         return new ResolvedCatalogMaterializedTable(
-                table, resolvedSchema, resolvedRefreshMode, freshness);
+                table, resolvedSchema, resolvedRefreshMode, freshness, 
startMode);
     }
 
     /** Resolves a {@link CatalogView} to a validated {@link 
ResolvedCatalogView}. */
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java
index 22383d39eb3..0ca8a1475ce 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java
@@ -64,6 +64,7 @@ public class ShowCreateMaterializedTableOperation implements 
ShowOperation {
                         tableIdentifier,
                         table.isTemporary(),
                         createOrAlter,
+                        ctx.getTableConfig().getLocalTimeZone(),
                         ctx.getCatalogManager().getSqlFactory());
 
         return buildStringArrayResult("result", new String[] {resultRow});
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
index 93b4683f966..ff79a0c8f15 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
@@ -31,12 +31,16 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
+import org.apache.flink.table.catalog.Interval;
+import org.apache.flink.table.catalog.Interval.TimeUnit;
 import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedCatalogView;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.StartMode;
+import org.apache.flink.table.catalog.StartMode.StartModeKind;
 import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.expressions.DefaultSqlFactory;
@@ -45,11 +49,15 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.time.Instant;
+import java.time.Period;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -62,6 +70,10 @@ class ShowCreateUtilTest {
     private static final ObjectIdentifier MATERIALIZED_TABLE_IDENTIFIER =
             ObjectIdentifier.of("catalogName", "dbName", 
"materializedTableName");
 
+    private static final Pattern START_MODE_EVALUATED_TIMESTAMP =
+            Pattern.compile(
+                    "/\\* Evaluated to FROM_TIMESTAMP\\(TIMESTAMP '[^']*'\\) 
at execution \\*/");
+
     private static final ResolvedSchema ONE_COLUMN_SCHEMA =
             ResolvedSchema.of(Column.physical("id", DataTypes.INT()));
 
@@ -113,7 +125,7 @@ class ShowCreateUtilTest {
         assertThat(createViewString).isEqualTo(expected);
     }
 
-    @ParameterizedTest(name = "{index}: {1}")
+    @ParameterizedTest(name = "{index}: {2}")
     @MethodSource("argsForShowCreateMaterializedTable")
     void showCreateMaterializedTable(
             ResolvedCatalogMaterializedTable materializedTable,
@@ -125,8 +137,13 @@ class ShowCreateUtilTest {
                         MATERIALIZED_TABLE_IDENTIFIER,
                         false,
                         createOrAlter,
+                        ZoneOffset.UTC,
                         DefaultSqlFactory.INSTANCE);
-        assertThat(createMaterializedTableString).isEqualTo(expected);
+        final String fixedTimestamp = "1970-01-02 12:34:56";
+        final String normalizedMTString =
+                setFixedTimestamp(createMaterializedTableString, 
fixedTimestamp);
+        final String normalizedExpected = setFixedTimestamp(expected, 
fixedTimestamp);
+        assertThat(normalizedMTString).isEqualTo(normalizedExpected);
     }
 
     @ParameterizedTest(name = "{index}: {1}")
@@ -304,6 +321,7 @@ class ShowCreateUtilTest {
                         null,
                         List.of(),
                         null,
+                        StartMode.of(StartModeKind.FROM_NOW, Interval.of(3, 
TimeUnit.MINUTE)),
                         IntervalFreshness.ofMinute(1),
                         RefreshMode.CONTINUOUS,
                         "SELECT 1",
@@ -311,6 +329,7 @@ class ShowCreateUtilTest {
                 "%sMATERIALIZED TABLE 
`catalogName`.`dbName`.`materializedTableName` (\n"
                         + "  `id` INT\n"
                         + ")\n"
+                        + "START_MODE = FROM_NOW(INTERVAL '3' MINUTE) /* 
Evaluated to FROM_TIMESTAMP(TIMESTAMP '2020-12-12 23:21:12') at execution */\n"
                         + "FRESHNESS = INTERVAL '1' MINUTE\n"
                         + "REFRESH_MODE = CONTINUOUS\n"
                         + "AS SELECT 1\n");
@@ -322,6 +341,7 @@ class ShowCreateUtilTest {
                         null,
                         List.of(),
                         null,
+                        StartMode.of(StartModeKind.FROM_BEGINNING),
                         IntervalFreshness.ofMinute(1),
                         RefreshMode.CONTINUOUS,
                         "SELECT 1",
@@ -331,6 +351,7 @@ class ShowCreateUtilTest {
                         + "  `mt_column` VARCHAR(2147483647) METADATA 
VIRTUAL,\n"
                         + "  CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED\n"
                         + ")\n"
+                        + "START_MODE = FROM_BEGINNING\n"
                         + "FRESHNESS = INTERVAL '1' MINUTE\n"
                         + "REFRESH_MODE = CONTINUOUS\n"
                         + "AS SELECT 1\n");
@@ -342,6 +363,8 @@ class ShowCreateUtilTest {
                         null,
                         List.of(),
                         null,
+                        StartMode.of(
+                                StartModeKind.FROM_TIMESTAMP, 
Instant.ofEpochSecond(1740000000)),
                         IntervalFreshness.ofMinute(1),
                         RefreshMode.CONTINUOUS,
                         "SELECT 1",
@@ -351,6 +374,7 @@ class ShowCreateUtilTest {
                         + "  `name` VARCHAR(2147483647),\n"
                         + "  CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED\n"
                         + ")\n"
+                        + "START_MODE = FROM_TIMESTAMP(TIMESTAMP '2025-02-19 
21:20:00')\n"
                         + "FRESHNESS = INTERVAL '1' MINUTE\n"
                         + "REFRESH_MODE = CONTINUOUS\n"
                         + "AS SELECT 1\n");
@@ -362,6 +386,10 @@ class ShowCreateUtilTest {
                         "Materialized table comment",
                         List.of("id"),
                         TableDistribution.of(TableDistribution.Kind.HASH, 5, 
List.of("id")),
+                        StartMode.of(
+                                StartModeKind.FROM_TIMESTAMP,
+                                Instant.ofEpochSecond(1740000000),
+                                true),
                         IntervalFreshness.ofMinute(3),
                         RefreshMode.FULL,
                         "SELECT id, name FROM tbl_a",
@@ -373,6 +401,7 @@ class ShowCreateUtilTest {
                         + "COMMENT 'Materialized table comment'\n"
                         + "DISTRIBUTED BY HASH(`id`) INTO 5 BUCKETS\n"
                         + "PARTITIONED BY (`id`)\n"
+                        + "START_MODE = FROM_TIMESTAMP(TIMESTAMP WITH LOCAL 
TIME ZONE '2025-02-19 21:20:00')\n"
                         + "FRESHNESS = INTERVAL '3' MINUTE\n"
                         + "REFRESH_MODE = FULL\n"
                         + "AS SELECT id, name FROM 
`catalogName`.`dbName`.`tbl_a`\n");
@@ -384,6 +413,9 @@ class ShowCreateUtilTest {
                         "Materialized table comment",
                         List.of("id"),
                         TableDistribution.of(TableDistribution.Kind.HASH, 5, 
List.of("id")),
+                        StartMode.of(
+                                StartModeKind.FROM_NOW,
+                                Interval.of(Period.of(0, 1, 2), 
TimeUnit.MONTH)),
                         IntervalFreshness.ofMinute(3),
                         RefreshMode.FULL,
                         "SELECT * FROM tbl_a",
@@ -395,6 +427,7 @@ class ShowCreateUtilTest {
                         + "COMMENT 'Materialized table comment'\n"
                         + "DISTRIBUTED BY HASH(`id`) INTO 5 BUCKETS\n"
                         + "PARTITIONED BY (`id`)\n"
+                        + "START_MODE = FROM_NOW(INTERVAL '1' MONTH) /* 
Evaluated to FROM_TIMESTAMP(TIMESTAMP '1970-01-02 12:34:56') at execution */\n"
                         + "FRESHNESS = INTERVAL '3' MINUTE\n"
                         + "REFRESH_MODE = FULL\n"
                         + "AS SELECT id, name FROM 
`catalogName`.`dbName`.`tbl_a`\n");
@@ -452,6 +485,7 @@ class ShowCreateUtilTest {
             String comment,
             List<String> partitionBy,
             TableDistribution distribution,
+            StartMode startMode,
             IntervalFreshness freshness,
             RefreshMode refreshMode,
             String originalQuery,
@@ -471,6 +505,16 @@ class ShowCreateUtilTest {
                         .build(),
                 resolvedSchema,
                 refreshMode,
-                freshness);
+                freshness,
+                startMode);
+    }
+
+    private static String setFixedTimestamp(String sql, String fixedTimestamp) 
{
+        return START_MODE_EVALUATED_TIMESTAMP
+                .matcher(sql)
+                .replaceAll(
+                        "/* Evaluated to FROM_TIMESTAMP(TIMESTAMP '"
+                                + fixedTimestamp
+                                + "') at execution */");
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
index 0597fdd3a50..b4a5084e9f6 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
@@ -173,6 +173,11 @@ public interface CatalogMaterializedTable extends 
CatalogBaseTable {
     /** Return summary description of refresh handler. */
     Optional<String> getRefreshHandlerDescription();
 
+    /** Get the start mode of materialized table. */
+    default Optional<StartMode> getStartMode() {
+        return Optional.empty();
+    }
+
     /**
      * Return the serialized refresh handler of materialized table. This will 
not be used for
      * describe table.
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java
index c9858dab7d0..86a51490852 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java
@@ -36,26 +36,30 @@ public class DefaultMaterializedTableEnricher implements 
MaterializedTableEnrich
     private final IntervalFreshness defaultContinuousFreshness;
     private final IntervalFreshness defaultFullFreshness;
     private final Duration freshnessThreshold;
+    private final StartMode defaultStartMode;
 
     public static DefaultMaterializedTableEnricher create(
             final Duration defaultContinuousFreshness,
             final Duration defaultFullFreshness,
-            final Duration freshnessThreshold) {
+            final Duration freshnessThreshold,
+            final StartMode defaultStartMode) {
         final IntervalFreshness continuousFreshness =
                 IntervalFreshness.fromDuration(defaultContinuousFreshness);
         final IntervalFreshness fullFreshness =
                 IntervalFreshness.fromDuration(defaultFullFreshness);
         return new DefaultMaterializedTableEnricher(
-                continuousFreshness, fullFreshness, freshnessThreshold);
+                continuousFreshness, fullFreshness, freshnessThreshold, 
defaultStartMode);
     }
 
     private DefaultMaterializedTableEnricher(
             final IntervalFreshness defaultContinuousFreshness,
             final IntervalFreshness defaultFullFreshness,
-            final Duration freshnessThreshold) {
+            final Duration freshnessThreshold,
+            final StartMode startMode) {
         this.defaultContinuousFreshness = defaultContinuousFreshness;
         this.defaultFullFreshness = defaultFullFreshness;
         this.freshnessThreshold = freshnessThreshold;
+        this.defaultStartMode = startMode;
     }
 
     @Override
@@ -68,7 +72,8 @@ public class DefaultMaterializedTableEnricher implements 
MaterializedTableEnrich
                 deriveRefreshMode(
                         table.getLogicalRefreshMode(), finalFreshness, 
freshnessThreshold);
 
-        return new MaterializedTableEnrichmentResult(finalFreshness, 
finalRefreshMode);
+        final StartMode startMode = deriveStartMode(table);
+        return new MaterializedTableEnrichmentResult(finalFreshness, 
finalRefreshMode, startMode);
     }
 
     /**
@@ -112,4 +117,8 @@ public class DefaultMaterializedTableEnricher implements MaterializedTableEnrich
         IntervalFreshness.validateFreshnessForCron(freshness);
         return RefreshMode.FULL;
     }
+
+    private StartMode deriveStartMode(final CatalogMaterializedTable table) {
+        return table.getStartMode().orElse(defaultStartMode);
+    }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java
index dc19d82ea27..94ebcc45de9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java
@@ -33,11 +33,15 @@ public class MaterializedTableEnrichmentResult {
 
     private final IntervalFreshness freshness;
     private final RefreshMode refreshMode;
+    private final StartMode startMode;
 
     public MaterializedTableEnrichmentResult(
-            final IntervalFreshness freshness, final RefreshMode refreshMode) {
+            final IntervalFreshness freshness,
+            final RefreshMode refreshMode,
+            final StartMode startMode) {
         this.freshness = freshness;
         this.refreshMode = refreshMode;
+        this.startMode = startMode;
     }
 
     public IntervalFreshness getFreshness() {
@@ -47,4 +51,8 @@ public class MaterializedTableEnrichmentResult {
     public RefreshMode getRefreshMode() {
         return refreshMode;
     }
+
+    public StartMode getStartMode() {
+        return startMode;
+    }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
index 722627d74ec..5fd3efb62ad 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
@@ -49,15 +49,19 @@ public class ResolvedCatalogMaterializedTable
 
     private final IntervalFreshness freshness;
 
+    private final StartMode startMode;
+
     public ResolvedCatalogMaterializedTable(
             CatalogMaterializedTable origin,
             ResolvedSchema resolvedSchema,
             RefreshMode refreshMode,
-            IntervalFreshness freshness) {
+            IntervalFreshness freshness,
+            StartMode startMode) {
         this.origin = checkNotNull(origin, "Original catalog materialized table must not be null.");
         this.resolvedSchema = checkNotNull(resolvedSchema, "Resolved schema must not be null.");
         this.refreshMode = checkNotNull(refreshMode, "Refresh mode must not be null.");
         this.freshness = checkNotNull(freshness, "Freshness must not be null.");
+        this.startMode = checkNotNull(startMode, "Start mode must not be null.");
     }
 
     @Override
@@ -73,13 +77,17 @@ public class ResolvedCatalogMaterializedTable
     @Override
     public CatalogBaseTable copy() {
         return new ResolvedCatalogMaterializedTable(
-                (CatalogMaterializedTable) origin.copy(), resolvedSchema, refreshMode, freshness);
+                (CatalogMaterializedTable) origin.copy(),
+                resolvedSchema,
+                refreshMode,
+                freshness,
+                startMode);
     }
 
     @Override
     public ResolvedCatalogMaterializedTable copy(Map<String, String> options) {
         return new ResolvedCatalogMaterializedTable(
-                origin.copy(options), resolvedSchema, refreshMode, freshness);
+                origin.copy(options), resolvedSchema, refreshMode, freshness, startMode);
     }
 
     @Override
@@ -91,7 +99,8 @@ public class ResolvedCatalogMaterializedTable
                 origin.copy(refreshStatus, refreshHandlerDescription, serializedRefreshHandler),
                 resolvedSchema,
                 refreshMode,
-                freshness);
+                freshness,
+                startMode);
     }
 
     @Override
@@ -175,6 +184,10 @@ public class ResolvedCatalogMaterializedTable
         return origin.getSerializedRefreshHandler();
     }
 
+    public Optional<StartMode> getStartMode() {
+        return Optional.of(startMode);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
index f66f4d52251..3712ff2057f 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
@@ -175,6 +175,7 @@ class CatalogPropertiesUtilTest {
                                 .build(),
                         resolvedSchema,
                         RefreshMode.CONTINUOUS,
-                        IntervalFreshness.ofHour(123)));
+                        IntervalFreshness.ofHour(123),
+                        StartMode.of(StartMode.StartModeKind.FROM_BEGINNING)));
     }
 }
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
index 122ac8a4bb0..d6a07593948 100644
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
@@ -35,6 +35,8 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.StartMode;
+import org.apache.flink.table.catalog.StartMode.StartModeKind;
 import org.apache.flink.table.catalog.TestSchemaResolver;
 import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -126,7 +128,8 @@ public class TestFileSystemCatalogTest extends TestFileSystemCatalogTestBase {
                             .build(),
                     CREATE_RESOLVED_SCHEMA,
                     RefreshMode.CONTINUOUS,
-                    FRESHNESS);
+                    FRESHNESS,
+                    StartMode.of(StartModeKind.FROM_BEGINNING));
 
     private static final TestRefreshHandler REFRESH_HANDLER =
             new TestRefreshHandler("jobID: xxx, clusterId: yyy");


Reply via email to