This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b0d0d2e0c2978d48e53478356df6fce1be795b4d Author: Sergey Nuyanzin <[email protected]> AuthorDate: Thu Apr 2 16:16:39 2026 +0200 [FLINK-39323][table] Support `SHOW CREATE [OR ALTER]` with relative time intervals for `FROM_NOW(<interval_expression>)` This closes #27889. --- .../materialized_table_config_configuration.html | 6 ++ .../api/config/MaterializedTableConfigOptions.java | 11 +++ .../flink/table/api/internal/ShowCreateUtil.java | 110 +++++++++++++++++++-- .../apache/flink/table/catalog/CatalogManager.java | 17 +++- .../ShowCreateMaterializedTableOperation.java | 1 + .../table/api/internal/ShowCreateUtilTest.java | 50 +++++++++- .../table/catalog/CatalogMaterializedTable.java | 5 + .../catalog/DefaultMaterializedTableEnricher.java | 17 +++- .../catalog/MaterializedTableEnrichmentResult.java | 10 +- .../catalog/ResolvedCatalogMaterializedTable.java | 21 +++- .../table/catalog/CatalogPropertiesUtilTest.java | 3 +- .../catalog/TestFileSystemCatalogTest.java | 5 +- 12 files changed, 234 insertions(+), 22 deletions(-) diff --git a/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html b/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html index f3cb3972a73..10b6105338a 100644 --- a/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html +++ b/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html @@ -20,6 +20,12 @@ <td>Duration</td> <td>The default freshness interval for full refresh mode when the FRESHNESS clause is omitted in a materialized table definition.</td> </tr> + <tr> + <td><h5>materialized-table.default-start-mode</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">FROM_BEGINNING</td> + <td><p>Enum</p></td> + <td>The default start mode for materialized tables.Supported values: FROM_BEGINNING, FROM_NOW, RESUME_OR_FROM_BEGINNING, RESUME_OR_FROM_NOW.<br /><br 
/>Possible values:<ul><li>"FROM_BEGINNING"</li><li>"FROM_NOW"</li><li>"FROM_TIMESTAMP"</li><li>"RESUME_OR_FROM_BEGINNING"</li><li>"RESUME_OR_FROM_NOW"</li><li>"RESUME_OR_FROM_TIMESTAMP"</li></ul></td> + </tr> <tr> <td><h5>materialized-table.refresh-mode.freshness-threshold</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">30 min</td> diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java index e387705e1a3..f876ec2034e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java @@ -21,6 +21,7 @@ package org.apache.flink.table.api.config; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.docs.Documentation; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.StartMode.StartModeKind; import java.time.Duration; @@ -74,4 +75,14 @@ public class MaterializedTableConfigOptions { .defaultValue(Duration.ofHours(1)) .withDescription( "The default freshness interval for full refresh mode when the FRESHNESS clause is omitted in a materialized table definition."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<StartModeKind> MATERIALIZED_TABLE_DEFAULT_START_MODE = + key("materialized-table.default-start-mode") + .enumType(StartModeKind.class) + .defaultValue(StartModeKind.FROM_BEGINNING) + .withDescription( + "The default start mode for materialized tables." 
+ + "Supported values: FROM_BEGINNING, FROM_NOW, " + + "RESUME_OR_FROM_BEGINNING, RESUME_OR_FROM_NOW."); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java index c62cf638494..74f17c06807 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java @@ -25,6 +25,7 @@ import org.apache.flink.table.catalog.CatalogBaseTable.TableKind; import org.apache.flink.table.catalog.CatalogDescriptor; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.Interval; import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.QueryOperationCatalogView; @@ -33,6 +34,7 @@ import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedCatalogModel; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.StartMode; import org.apache.flink.table.catalog.TableDistribution; import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.expressions.SqlFactory; @@ -40,17 +42,26 @@ import org.apache.flink.table.utils.EncodingUtils; import org.apache.commons.lang3.StringUtils; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAmount; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.TreeSet; 
import java.util.stream.Collectors; /** SHOW CREATE statement Util. */ @Internal public class ShowCreateUtil { + private static final DateTimeFormatter TIMESTAMP_FORMATTER = + DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss"); private static final String PRINT_INDENT = " "; private ShowCreateUtil() {} @@ -118,6 +129,7 @@ public class ShowCreateUtil { ObjectIdentifier tableIdentifier, boolean isTemporary, boolean createOrAlter, + ZoneId timeZoneId, SqlFactory sqlFactory) { validateTableKind(table, tableIdentifier, TableKind.MATERIALIZED_TABLE); StringBuilder sb = @@ -143,6 +155,7 @@ public class ShowCreateUtil { .ifPresent(partitionedBy -> sb.append(formatPartitionedBy(partitionedBy))); extractFormattedOptions(table.getOptions(), PRINT_INDENT) .ifPresent(v -> sb.append("WITH (\n").append(v).append("\n)\n")); + sb.append(extractStartMode(table, timeZoneId)).append("\n"); sb.append(extractFreshness(table)) .append("\n") .append(extractRefreshMode(table)) @@ -277,7 +290,7 @@ public class ShowCreateUtil { watermarkSpec.getRowtimeAttribute()), watermarkSpec .getWatermarkExpression() - .asSerializableString(sqlFactory))) + .asSerializableString())) .collect(Collectors.joining("\n"))); } @@ -337,21 +350,81 @@ public class ShowCreateUtil { return String.format("REFRESH_MODE = %s", materializedTable.getRefreshMode()); } + static String extractStartMode( + ResolvedCatalogMaterializedTable materializedTable, ZoneId timeZoneId) { + StringBuilder sb = new StringBuilder("START_MODE = "); + StartMode startMode = materializedTable.getStartMode().get(); + switch (startMode.getKind()) { + case FROM_BEGINNING: + case RESUME_OR_FROM_BEGINNING: + sb.append(startMode.getKind().name()); + break; + + case FROM_NOW: + case RESUME_OR_FROM_NOW: + Interval interval = startMode.getInterval(); + if (interval == null) { + sb.append(startMode.getKind().name()); + break; + } + + final TemporalAmount amount = + interval.getDuration() == null + ? 
interval.getPeriod() + : interval.getDuration(); + sb.append(startMode.getKind().name()) + .append("(") + .append(interval) + .append(")") + .append(" /* Evaluated to FROM_TIMESTAMP(TIMESTAMP '") + .append( + getFormattedLocalDateTime( + LocalDateTime.now().plus(amount).toInstant(ZoneOffset.UTC), + ZoneOffset.UTC)) + .append("') at execution */"); + break; + + case FROM_TIMESTAMP: + case RESUME_OR_FROM_TIMESTAMP: + sb.append(startMode.getKind().name()); + sb.append("(TIMESTAMP"); + if (startMode.isLocalTimeZone()) { + sb.append(" WITH LOCAL TIME ZONE"); + } + sb.append(" '") + .append( + getFormattedLocalDateTime( + startMode.getTimestamp(), + startMode.isLocalTimeZone() ? timeZoneId : ZoneOffset.UTC)) + .append("')"); + break; + + default: + throw new TableException(String.format("Unsupported start mode '%s'.", startMode)); + } + return sb.toString(); + } + + private static String getFormattedLocalDateTime(Instant instant, ZoneId timeZone) { + return TIMESTAMP_FORMATTER.format(LocalDateTime.ofInstant(instant, timeZone)); + } + static Optional<String> extractFormattedOptions(Map<String, String> conf, String printIndent) { if (Objects.isNull(conf) || conf.isEmpty()) { return Optional.empty(); } + TreeSet<String> treeSet = new TreeSet<>(conf.keySet()); + return Optional.of( - conf.entrySet().stream() + treeSet.stream() .map( entry -> String.format( "%s'%s' = '%s'", printIndent, - EncodingUtils.escapeSingleQuotes(entry.getKey()), - EncodingUtils.escapeSingleQuotes(entry.getValue()))) - .sorted() - .collect(Collectors.joining("," + System.lineSeparator()))); + EncodingUtils.escapeSingleQuotes(entry), + EncodingUtils.escapeSingleQuotes(conf.get(entry)))) + .collect(Collectors.joining(",\n"))); } static String extractFormattedColumnNames( @@ -366,6 +439,31 @@ public class ShowCreateUtil { .collect(Collectors.joining(",\n")); } + private static String maybeLowerCaseKey(String key, boolean lowerCaseKey) { + return lowerCaseKey ? 
key.toLowerCase(Locale.ROOT) : key; + } + + static Optional<String> extractFormattedOptions( + Map<String, String> conf, String printIndent, boolean lowerCaseKeys) { + if (Objects.isNull(conf) || conf.isEmpty()) { + return Optional.empty(); + } + return Optional.of( + conf.entrySet().stream() + .map( + entry -> + String.format( + "%s'%s' = '%s'", + printIndent, + maybeLowerCaseKey( + EncodingUtils.escapeSingleQuotes( + entry.getKey()), + lowerCaseKeys), + EncodingUtils.escapeSingleQuotes(entry.getValue()))) + .sorted() + .collect(Collectors.joining("," + System.lineSeparator()))); + } + private static void validateTableKind( ResolvedCatalogBaseTable<?> table, ObjectIdentifier tableIdentifier, diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 3bba4c05f4e..047d2c89d5b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -31,6 +31,7 @@ import org.apache.flink.table.api.config.MaterializedTableConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.catalog.CatalogBaseTable.TableKind; import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; +import org.apache.flink.table.catalog.StartMode.StartModeKind; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; @@ -86,6 +87,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import static java.lang.String.format; +import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_DEFAULT_START_MODE; import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -287,8 +289,18 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { .MATERIALIZED_TABLE_DEFAULT_FRESHNESS_FULL); final Duration freshnessThreshold = catalogStoreHolder.config().get(MATERIALIZED_TABLE_FRESHNESS_THRESHOLD); + final StartModeKind startModeKind = + catalogStoreHolder.config().get(MATERIALIZED_TABLE_DEFAULT_START_MODE); + if (StartMode.requiresParameters(startModeKind)) { + throw new ValidationException( + String.format( + "Invalid default start mode '%s'. " + + "Only parameterless start modes are supported as defaults.", startModeKind)); + } + + final StartMode startMode = StartMode.of(startModeKind); return DefaultMaterializedTableEnricher.create( - defaultDurationContinuous, defaultDurationFull, freshnessThreshold); + defaultDurationContinuous, defaultDurationFull, freshnessThreshold, startMode); } } @@ -1912,6 +1924,7 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { this.materializedTableEnricher.enrich(table); IntervalFreshness freshness = enrichmentResult.getFreshness(); RefreshMode resolvedRefreshMode = enrichmentResult.getRefreshMode(); + StartMode startMode = enrichmentResult.getStartMode(); // Validate partition keys are included in physical columns final List<String> physicalColumns = @@ -1933,7 +1946,7 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { }); return new ResolvedCatalogMaterializedTable( - table, resolvedSchema, resolvedRefreshMode, freshness); + table, resolvedSchema, resolvedRefreshMode, freshness, startMode); } /** Resolves a {@link CatalogView} to a validated {@link ResolvedCatalogView}. 
*/ diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java index 22383d39eb3..0ca8a1475ce 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java @@ -64,6 +64,7 @@ public class ShowCreateMaterializedTableOperation implements ShowOperation { tableIdentifier, table.isTemporary(), createOrAlter, + ctx.getTableConfig().getLocalTimeZone(), ctx.getCatalogManager().getSqlFactory()); return buildStringArrayResult("result", new String[] {resultRow}); diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java index 93b4683f966..ff79a0c8f15 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java @@ -31,12 +31,16 @@ import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ImmutableColumnsConstraint; +import org.apache.flink.table.catalog.Interval; +import org.apache.flink.table.catalog.Interval.TimeUnit; import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedCatalogView; 
import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.StartMode; +import org.apache.flink.table.catalog.StartMode.StartModeKind; import org.apache.flink.table.catalog.TableDistribution; import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.expressions.DefaultSqlFactory; @@ -45,11 +49,15 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import java.time.Instant; +import java.time.Period; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; import static org.assertj.core.api.Assertions.assertThat; @@ -62,6 +70,10 @@ class ShowCreateUtilTest { private static final ObjectIdentifier MATERIALIZED_TABLE_IDENTIFIER = ObjectIdentifier.of("catalogName", "dbName", "materializedTableName"); + private static final Pattern START_MODE_EVALUATED_TIMESTAMP = + Pattern.compile( + "/\\* Evaluated to FROM_TIMESTAMP\\(TIMESTAMP '[^']*'\\) at execution \\*/"); + private static final ResolvedSchema ONE_COLUMN_SCHEMA = ResolvedSchema.of(Column.physical("id", DataTypes.INT())); @@ -113,7 +125,7 @@ class ShowCreateUtilTest { assertThat(createViewString).isEqualTo(expected); } - @ParameterizedTest(name = "{index}: {1}") + @ParameterizedTest(name = "{index}: {2}") @MethodSource("argsForShowCreateMaterializedTable") void showCreateMaterializedTable( ResolvedCatalogMaterializedTable materializedTable, @@ -125,8 +137,13 @@ class ShowCreateUtilTest { MATERIALIZED_TABLE_IDENTIFIER, false, createOrAlter, + ZoneOffset.UTC, DefaultSqlFactory.INSTANCE); - assertThat(createMaterializedTableString).isEqualTo(expected); + final String fixedTimestamp = "1970-01-02 12:34:56"; + final String normalizedMTString = + setFixedTimestamp(createMaterializedTableString, fixedTimestamp); + final 
String normalizedExpected = setFixedTimestamp(expected, fixedTimestamp); + assertThat(normalizedMTString).isEqualTo(normalizedExpected); } @ParameterizedTest(name = "{index}: {1}") @@ -304,6 +321,7 @@ class ShowCreateUtilTest { null, List.of(), null, + StartMode.of(StartModeKind.FROM_NOW, Interval.of(3, TimeUnit.MINUTE)), IntervalFreshness.ofMinute(1), RefreshMode.CONTINUOUS, "SELECT 1", @@ -311,6 +329,7 @@ class ShowCreateUtilTest { "%sMATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n" + " `id` INT\n" + ")\n" + + "START_MODE = FROM_NOW(INTERVAL '3' MINUTE) /* Evaluated to FROM_TIMESTAMP(TIMESTAMP '2020-12-12 23:21:12') at execution */\n" + "FRESHNESS = INTERVAL '1' MINUTE\n" + "REFRESH_MODE = CONTINUOUS\n" + "AS SELECT 1\n"); @@ -322,6 +341,7 @@ class ShowCreateUtilTest { null, List.of(), null, + StartMode.of(StartModeKind.FROM_BEGINNING), IntervalFreshness.ofMinute(1), RefreshMode.CONTINUOUS, "SELECT 1", @@ -331,6 +351,7 @@ class ShowCreateUtilTest { + " `mt_column` VARCHAR(2147483647) METADATA VIRTUAL,\n" + " CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED\n" + ")\n" + + "START_MODE = FROM_BEGINNING\n" + "FRESHNESS = INTERVAL '1' MINUTE\n" + "REFRESH_MODE = CONTINUOUS\n" + "AS SELECT 1\n"); @@ -342,6 +363,8 @@ class ShowCreateUtilTest { null, List.of(), null, + StartMode.of( + StartModeKind.FROM_TIMESTAMP, Instant.ofEpochSecond(1740000000)), IntervalFreshness.ofMinute(1), RefreshMode.CONTINUOUS, "SELECT 1", @@ -351,6 +374,7 @@ class ShowCreateUtilTest { + " `name` VARCHAR(2147483647),\n" + " CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED\n" + ")\n" + + "START_MODE = FROM_TIMESTAMP(TIMESTAMP '2025-02-19 21:20:00')\n" + "FRESHNESS = INTERVAL '1' MINUTE\n" + "REFRESH_MODE = CONTINUOUS\n" + "AS SELECT 1\n"); @@ -362,6 +386,10 @@ class ShowCreateUtilTest { "Materialized table comment", List.of("id"), TableDistribution.of(TableDistribution.Kind.HASH, 5, List.of("id")), + StartMode.of( + StartModeKind.FROM_TIMESTAMP, + 
Instant.ofEpochSecond(1740000000), + true), IntervalFreshness.ofMinute(3), RefreshMode.FULL, "SELECT id, name FROM tbl_a", @@ -373,6 +401,7 @@ class ShowCreateUtilTest { + "COMMENT 'Materialized table comment'\n" + "DISTRIBUTED BY HASH(`id`) INTO 5 BUCKETS\n" + "PARTITIONED BY (`id`)\n" + + "START_MODE = FROM_TIMESTAMP(TIMESTAMP WITH LOCAL TIME ZONE '2025-02-19 21:20:00')\n" + "FRESHNESS = INTERVAL '3' MINUTE\n" + "REFRESH_MODE = FULL\n" + "AS SELECT id, name FROM `catalogName`.`dbName`.`tbl_a`\n"); @@ -384,6 +413,9 @@ class ShowCreateUtilTest { "Materialized table comment", List.of("id"), TableDistribution.of(TableDistribution.Kind.HASH, 5, List.of("id")), + StartMode.of( + StartModeKind.FROM_NOW, + Interval.of(Period.of(0, 1, 2), TimeUnit.MONTH)), IntervalFreshness.ofMinute(3), RefreshMode.FULL, "SELECT * FROM tbl_a", @@ -395,6 +427,7 @@ class ShowCreateUtilTest { + "COMMENT 'Materialized table comment'\n" + "DISTRIBUTED BY HASH(`id`) INTO 5 BUCKETS\n" + "PARTITIONED BY (`id`)\n" + + "START_MODE = FROM_NOW(INTERVAL '1' MONTH) /* Evaluated to FROM_TIMESTAMP(TIMESTAMP '1970-01-02 12:34:56') at execution */\n" + "FRESHNESS = INTERVAL '3' MINUTE\n" + "REFRESH_MODE = FULL\n" + "AS SELECT id, name FROM `catalogName`.`dbName`.`tbl_a`\n"); @@ -452,6 +485,7 @@ class ShowCreateUtilTest { String comment, List<String> partitionBy, TableDistribution distribution, + StartMode startMode, IntervalFreshness freshness, RefreshMode refreshMode, String originalQuery, @@ -471,6 +505,16 @@ class ShowCreateUtilTest { .build(), resolvedSchema, refreshMode, - freshness); + freshness, + startMode); + } + + private static String setFixedTimestamp(String sql, String fixedTimestamp) { + return START_MODE_EVALUATED_TIMESTAMP + .matcher(sql) + .replaceAll( + "/* Evaluated to FROM_TIMESTAMP(TIMESTAMP '" + + fixedTimestamp + + "') at execution */"); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java index 0597fdd3a50..b4a5084e9f6 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java @@ -173,6 +173,11 @@ public interface CatalogMaterializedTable extends CatalogBaseTable { /** Return summary description of refresh handler. */ Optional<String> getRefreshHandlerDescription(); + /** Get the start mode of materialized table. */ + default Optional<StartMode> getStartMode() { + return Optional.empty(); + } + /** * Return the serialized refresh handler of materialized table. This will not be used for * describe table. diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java index c9858dab7d0..86a51490852 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java @@ -36,26 +36,30 @@ public class DefaultMaterializedTableEnricher implements MaterializedTableEnrich private final IntervalFreshness defaultContinuousFreshness; private final IntervalFreshness defaultFullFreshness; private final Duration freshnessThreshold; + private final StartMode defaultStartMode; public static DefaultMaterializedTableEnricher create( final Duration defaultContinuousFreshness, final Duration defaultFullFreshness, - final Duration freshnessThreshold) { + final Duration freshnessThreshold, + final StartMode defaultStartMode) { final IntervalFreshness continuousFreshness = IntervalFreshness.fromDuration(defaultContinuousFreshness); final 
IntervalFreshness fullFreshness = IntervalFreshness.fromDuration(defaultFullFreshness); return new DefaultMaterializedTableEnricher( - continuousFreshness, fullFreshness, freshnessThreshold); + continuousFreshness, fullFreshness, freshnessThreshold, defaultStartMode); } private DefaultMaterializedTableEnricher( final IntervalFreshness defaultContinuousFreshness, final IntervalFreshness defaultFullFreshness, - final Duration freshnessThreshold) { + final Duration freshnessThreshold, + final StartMode startMode) { this.defaultContinuousFreshness = defaultContinuousFreshness; this.defaultFullFreshness = defaultFullFreshness; this.freshnessThreshold = freshnessThreshold; + this.defaultStartMode = startMode; } @Override @@ -68,7 +72,8 @@ public class DefaultMaterializedTableEnricher implements MaterializedTableEnrich deriveRefreshMode( table.getLogicalRefreshMode(), finalFreshness, freshnessThreshold); - return new MaterializedTableEnrichmentResult(finalFreshness, finalRefreshMode); + final StartMode startMode = deriveStartMode(table); + return new MaterializedTableEnrichmentResult(finalFreshness, finalRefreshMode, startMode); } /** @@ -112,4 +117,8 @@ public class DefaultMaterializedTableEnricher implements MaterializedTableEnrich IntervalFreshness.validateFreshnessForCron(freshness); return RefreshMode.FULL; } + + private StartMode deriveStartMode(final CatalogMaterializedTable table) { + return table.getStartMode().orElse(defaultStartMode); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java index dc19d82ea27..94ebcc45de9 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java @@ 
-33,11 +33,15 @@ public class MaterializedTableEnrichmentResult { private final IntervalFreshness freshness; private final RefreshMode refreshMode; + private final StartMode startMode; public MaterializedTableEnrichmentResult( - final IntervalFreshness freshness, final RefreshMode refreshMode) { + final IntervalFreshness freshness, + final RefreshMode refreshMode, + final StartMode startMode) { this.freshness = freshness; this.refreshMode = refreshMode; + this.startMode = startMode; } public IntervalFreshness getFreshness() { @@ -47,4 +51,8 @@ public class MaterializedTableEnrichmentResult { public RefreshMode getRefreshMode() { return refreshMode; } + + public StartMode getStartMode() { + return startMode; + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java index 722627d74ec..5fd3efb62ad 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java @@ -49,15 +49,19 @@ public class ResolvedCatalogMaterializedTable private final IntervalFreshness freshness; + private final StartMode startMode; + public ResolvedCatalogMaterializedTable( CatalogMaterializedTable origin, ResolvedSchema resolvedSchema, RefreshMode refreshMode, - IntervalFreshness freshness) { + IntervalFreshness freshness, + StartMode startMode) { this.origin = checkNotNull(origin, "Original catalog materialized table must not be null."); this.resolvedSchema = checkNotNull(resolvedSchema, "Resolved schema must not be null."); this.refreshMode = checkNotNull(refreshMode, "Refresh mode must not be null."); this.freshness = checkNotNull(freshness, "Freshness must not be null."); + this.startMode = checkNotNull(startMode, "Start 
mode must not be null."); } @Override @@ -73,13 +77,17 @@ public class ResolvedCatalogMaterializedTable @Override public CatalogBaseTable copy() { return new ResolvedCatalogMaterializedTable( - (CatalogMaterializedTable) origin.copy(), resolvedSchema, refreshMode, freshness); + (CatalogMaterializedTable) origin.copy(), + resolvedSchema, + refreshMode, + freshness, + startMode); } @Override public ResolvedCatalogMaterializedTable copy(Map<String, String> options) { return new ResolvedCatalogMaterializedTable( - origin.copy(options), resolvedSchema, refreshMode, freshness); + origin.copy(options), resolvedSchema, refreshMode, freshness, startMode); } @Override @@ -91,7 +99,8 @@ public class ResolvedCatalogMaterializedTable origin.copy(refreshStatus, refreshHandlerDescription, serializedRefreshHandler), resolvedSchema, refreshMode, - freshness); + freshness, + startMode); } @Override @@ -175,6 +184,10 @@ public class ResolvedCatalogMaterializedTable return origin.getSerializedRefreshHandler(); } + public Optional<StartMode> getStartMode() { + return Optional.of(startMode); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java index f66f4d52251..3712ff2057f 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java @@ -175,6 +175,7 @@ class CatalogPropertiesUtilTest { .build(), resolvedSchema, RefreshMode.CONTINUOUS, - IntervalFreshness.ofHour(123))); + IntervalFreshness.ofHour(123), + StartMode.of(StartMode.StartModeKind.FROM_BEGINNING))); } } diff --git 
a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java index 122ac8a4bb0..d6a07593948 100644 --- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java @@ -35,6 +35,8 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.StartMode; +import org.apache.flink.table.catalog.StartMode.StartModeKind; import org.apache.flink.table.catalog.TestSchemaResolver; import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.catalog.exceptions.CatalogException; @@ -126,7 +128,8 @@ public class TestFileSystemCatalogTest extends TestFileSystemCatalogTestBase { .build(), CREATE_RESOLVED_SCHEMA, RefreshMode.CONTINUOUS, - FRESHNESS); + FRESHNESS, + StartMode.of(StartModeKind.FROM_BEGINNING)); private static final TestRefreshHandler REFRESH_HANDLER = new TestRefreshHandler("jobID: xxx, clusterId: yyy");
