This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a0147374ee0 [FLINK-39303][table] Support `START_MODE` for `CREATE`
path for `MATERIALIZED TABLE`
a0147374ee0 is described below
commit a0147374ee09fcd6a8e7c492fc9333784501bf90
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Apr 23 17:41:38 2026 +0200
[FLINK-39303][table] Support `START_MODE` for `CREATE` path for
`MATERIALIZED TABLE`
---
.../AlterMaterializedTableChangeOperation.java | 4 +
.../MaterializedTableChangeHandler.java | 15 +++
.../table/catalog/CatalogMaterializedTable.java | 14 ++-
.../catalog/DefaultCatalogMaterializedTable.java | 19 +++-
.../AbstractCreateMaterializedTableConverter.java | 18 +++
...SqlCreateOrAlterMaterializedTableConverter.java | 24 +++-
.../planner/utils/MaterializedTableUtils.java | 72 ++++++++++++
...reateOrAlterMaterializedTableConverterTest.java | 124 +++++++++++++++++++++
8 files changed, 276 insertions(+), 14 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
index c6d42012565..13a3019a5c7 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableChange.ModifyDefinitionQuery;
import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler;
import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
+import org.apache.flink.table.catalog.TableChange.ModifyStartMode;
import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
import java.util.List;
@@ -120,6 +121,9 @@ public class AlterMaterializedTableChangeOperation extends
AlterMaterializedTabl
ModifyDefinitionQuery definitionQuery = (ModifyDefinitionQuery)
tableChange;
return String.format(
" MODIFY DEFINITION QUERY TO '%s'",
definitionQuery.getDefinitionQuery());
+ } else if (tableChange instanceof ModifyStartMode) {
+ ModifyStartMode startMode = (ModifyStartMode) tableChange;
+ return String.format(" MODIFY START_MODE TO '%s'",
startMode.getStartMode());
} else {
return AlterTableChangeOperation.toString(tableChange);
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
index 8602812b6c3..54a55414a8e 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.Column.MetadataColumn;
+import org.apache.flink.table.catalog.StartMode;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableChange.AddColumn;
import org.apache.flink.table.catalog.TableChange.AddDistribution;
@@ -44,6 +45,7 @@ import
org.apache.flink.table.catalog.TableChange.ModifyDistribution;
import org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType;
import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler;
import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
+import org.apache.flink.table.catalog.TableChange.ModifyStartMode;
import org.apache.flink.table.catalog.TableChange.ModifyUniqueConstraint;
import org.apache.flink.table.catalog.TableChange.ModifyWatermark;
import org.apache.flink.table.catalog.TableChange.ResetOption;
@@ -84,6 +86,7 @@ public class MaterializedTableChangeHandler {
private int droppedPersistedCnt = 0;
private String originalQuery;
private String expandedQuery;
+ private StartMode startMode;
private final Map<String, String> options;
private final List<String> validationErrors = new ArrayList<>();
@@ -102,6 +105,7 @@ public class MaterializedTableChangeHandler {
}
originalQuery = oldTable.getOriginalQuery();
expandedQuery = oldTable.getExpandedQuery();
+ startMode = oldTable.getStartMode().orElse(null);
this.oldTable = oldTable;
this.options = new HashMap<>(oldTable.getOptions());
}
@@ -170,6 +174,7 @@ public class MaterializedTableChangeHandler {
.refreshStatus(context.getRefreshStatus())
.refreshHandlerDescription(context.getRefreshHandlerDesc())
.serializedRefreshHandler(context.getRefreshHandlerBytes())
+ .startMode(context.getStartMode())
.build();
}
@@ -222,6 +227,8 @@ public class MaterializedTableChangeHandler {
registry.register(SetOption.class,
MaterializedTableChangeHandler::setTableOption);
registry.register(ResetOption.class,
MaterializedTableChangeHandler::resetTableOption);
+ registry.register(ModifyStartMode.class,
MaterializedTableChangeHandler::modifyStartMode);
+
return registry;
}
@@ -284,6 +291,10 @@ public class MaterializedTableChangeHandler {
return refreshHandlerBytes;
}
+ public StartMode getStartMode() {
+ return startMode;
+ }
+
@Nullable
public String getRefreshHandlerDesc() {
return refreshHandlerDesc;
@@ -409,6 +420,10 @@ public class MaterializedTableChangeHandler {
refreshStatus = modifyRefreshStatus.getRefreshStatus();
}
+ private void modifyStartMode(ModifyStartMode modifyStartMode) {
+ startMode = modifyStartMode.getStartMode();
+ }
+
private void addDistribution(AddDistribution addDistribution) {
distribution = addDistribution.getDistribution();
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
index b4a5084e9f6..f05ebc1e089 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
@@ -25,7 +25,6 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.time.Duration;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -243,8 +242,8 @@ public interface CatalogMaterializedTable extends
CatalogBaseTable {
private Schema schema;
private String comment;
private TableDistribution distribution = null;
- private List<String> partitionKeys = Collections.emptyList();
- private Map<String, String> options = Collections.emptyMap();
+ private List<String> partitionKeys = List.of();
+ private Map<String, String> options = Map.of();
private @Nullable Long snapshot;
private String originalQuery;
private String expandedQuery;
@@ -254,6 +253,7 @@ public interface CatalogMaterializedTable extends
CatalogBaseTable {
private RefreshStatus refreshStatus;
private @Nullable String refreshHandlerDescription;
private @Nullable byte[] serializedRefreshHandler;
+ private StartMode startMode;
private Builder() {}
@@ -341,6 +341,11 @@ public interface CatalogMaterializedTable extends
CatalogBaseTable {
return this;
}
+ public Builder startMode(StartMode startMode) {
+ this.startMode = startMode;
+ return this;
+ }
+
public CatalogMaterializedTable build() {
return new DefaultCatalogMaterializedTable(
schema,
@@ -356,7 +361,8 @@ public interface CatalogMaterializedTable extends
CatalogBaseTable {
refreshMode,
refreshStatus,
refreshHandlerDescription,
- serializedRefreshHandler);
+ serializedRefreshHandler,
+ startMode);
}
}
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
index 78eca0ce897..7b7439994fc 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
@@ -52,6 +52,7 @@ public class DefaultCatalogMaterializedTable implements
CatalogMaterializedTable
private final RefreshStatus refreshStatus;
private final @Nullable String refreshHandlerDescription;
private final @Nullable byte[] serializedRefreshHandler;
+ private final @Nullable StartMode startMode;
protected DefaultCatalogMaterializedTable(
Schema schema,
@@ -67,7 +68,8 @@ public class DefaultCatalogMaterializedTable implements
CatalogMaterializedTable
@Nullable RefreshMode refreshMode,
RefreshStatus refreshStatus,
@Nullable String refreshHandlerDescription,
- @Nullable byte[] serializedRefreshHandler) {
+ @Nullable byte[] serializedRefreshHandler,
+ @Nullable StartMode startMode) {
this.schema = checkNotNull(schema, "Schema must not be null.");
this.comment = comment;
this.distribution = distribution;
@@ -83,6 +85,7 @@ public class DefaultCatalogMaterializedTable implements
CatalogMaterializedTable
this.refreshStatus = checkNotNull(refreshStatus, "Refresh status must
not be null.");
this.refreshHandlerDescription = refreshHandlerDescription;
this.serializedRefreshHandler = serializedRefreshHandler;
+ this.startMode = startMode;
checkArgument(
options.entrySet().stream()
@@ -136,7 +139,8 @@ public class DefaultCatalogMaterializedTable implements
CatalogMaterializedTable
refreshMode,
refreshStatus,
refreshHandlerDescription,
- serializedRefreshHandler);
+ serializedRefreshHandler,
+ startMode);
}
@Override
@@ -155,7 +159,8 @@ public class DefaultCatalogMaterializedTable implements
CatalogMaterializedTable
refreshMode,
refreshStatus,
refreshHandlerDescription,
- serializedRefreshHandler);
+ serializedRefreshHandler,
+ startMode);
}
@Override
@@ -177,7 +182,8 @@ public class DefaultCatalogMaterializedTable implements
CatalogMaterializedTable
refreshMode,
refreshStatus,
refreshHandlerDescription,
- serializedRefreshHandler);
+ serializedRefreshHandler,
+ startMode);
}
@Override
@@ -225,6 +231,11 @@ public class DefaultCatalogMaterializedTable implements
CatalogMaterializedTable
return refreshStatus;
}
+ @Override
+ public Optional<StartMode> getStartMode() {
+ return Optional.ofNullable(startMode);
+ }
+
@Override
public Optional<String> getRefreshHandlerDescription() {
return Optional.ofNullable(refreshHandlerDescription);
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
index 9c4d1969601..2d6d10bdc53 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
@@ -21,6 +21,7 @@ package
org.apache.flink.table.planner.operations.converters.materializedtable;
import
org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateMaterializedTable;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.MaterializedTableConfigOptions;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
@@ -29,6 +30,7 @@ import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.StartMode;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
@@ -78,6 +80,8 @@ public abstract class
AbstractCreateMaterializedTableConverter<T extends SqlCrea
ResolvedSchema getMergedQuerySchema();
RefreshMode getMergedRefreshMode();
+
+ StartMode getMergedStartMode();
}
protected abstract MergeContext getMergeContext(
@@ -99,6 +103,17 @@ public abstract class
AbstractCreateMaterializedTableConverter<T extends SqlCrea
.orElse(null);
}
+ protected final StartMode getStartMode(T sqlCreateMaterializedTable,
ConvertContext context) {
+ StartMode startMode =
+
MaterializedTableUtils.getStartMode(sqlCreateMaterializedTable.getStartMode());
+ if (startMode != null) {
+ return startMode;
+ }
+ return StartMode.of(
+ context.getTableConfig()
+
.get(MaterializedTableConfigOptions.MATERIALIZED_TABLE_DEFAULT_START_MODE));
+ }
+
protected final ResolvedSchema getQueryResolvedSchema(
T sqlCreateMaterializedTable, ConvertContext context) {
SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
@@ -160,6 +175,8 @@ public abstract class
AbstractCreateMaterializedTableConverter<T extends SqlCrea
final RefreshMode refreshMode =
getDerivedRefreshMode(logicalRefreshMode);
+ final StartMode startMode = mergeContext.getMergedStartMode();
+
return context.getCatalogManager()
.resolveCatalogMaterializedTable(
CatalogMaterializedTable.newBuilder()
@@ -174,6 +191,7 @@ public abstract class
AbstractCreateMaterializedTableConverter<T extends SqlCrea
.logicalRefreshMode(logicalRefreshMode)
.refreshMode(refreshMode)
.refreshStatus(RefreshStatus.INITIALIZING)
+ .startMode(startMode)
.build());
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index 198a747406b..0d81346262c 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -29,6 +29,7 @@ import
org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.SchemaResolver;
+import org.apache.flink.table.catalog.StartMode;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.catalog.UniqueConstraint;
@@ -137,6 +138,15 @@ public class SqlCreateOrAlterMaterializedTableConverter
throw new ValidationException("Changing of REFRESH MODE is
unsupported");
}
+ final StartMode newStartMode = mergeContext.getMergedStartMode();
+ final StartMode oldStartMode =
+ oldTable.getStartMode()
+ .orElseThrow(
+ () -> new ValidationException("START_MODE
must not be null"));
+ if (!Objects.equals(oldStartMode, newStartMode)) {
+ changes.add(TableChange.modifyStartMode(newStartMode));
+ }
+
return changes;
};
}
@@ -213,7 +223,7 @@ public class SqlCreateOrAlterMaterializedTableConverter
private Optional<TableChange> getConstraintChange(
final ResolvedSchema oldSchema,
final ResolvedSchema newSchema,
- boolean hasConstraintDefinition) {
+ final boolean hasConstraintDefinition) {
final UniqueConstraint oldConstraint =
oldSchema.getPrimaryKey().orElse(null);
final UniqueConstraint newConstraint =
newSchema.getPrimaryKey().orElse(null);
if (hasConstraintDefinition && !Objects.equals(oldConstraint,
newConstraint)) {
@@ -279,11 +289,8 @@ public class SqlCreateOrAlterMaterializedTableConverter
@Override
public boolean hasConstraintDefinition() {
- if
(!sqlCreateMaterializedTable.getTableConstraints().isEmpty()) {
- return true;
- }
-
- return hasSchemaDefinition();
+ return
!sqlCreateMaterializedTable.getTableConstraints().isEmpty()
+ || hasSchemaDefinition();
}
@Override
@@ -354,6 +361,11 @@ public class SqlCreateOrAlterMaterializedTableConverter
return getDerivedRefreshMode(
getDerivedLogicalRefreshMode(sqlCreateMaterializedTable));
}
+
+ @Override
+ public StartMode getMergedStartMode() {
+ return getStartMode(sqlCreateMaterializedTable, context);
+ }
};
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index 6f50a3d36e7..99fd660e836 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -24,6 +24,8 @@ import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlMetadataColumn;
import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
import
org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema;
+import org.apache.flink.sql.parser.ddl.materializedtable.SqlStartMode;
+import
org.apache.flink.sql.parser.ddl.materializedtable.SqlStartMode.SqlStartModeKind;
import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
@@ -36,6 +38,8 @@ import org.apache.flink.table.catalog.Interval;
import org.apache.flink.table.catalog.Interval.TimeUnit;
import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.StartMode;
+import org.apache.flink.table.catalog.StartMode.StartModeKind;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableChange.ColumnPosition;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
@@ -47,10 +51,13 @@ import org.apache.calcite.sql.SqlIntervalLiteral;
import org.apache.calcite.sql.SqlIntervalLiteral.IntervalValue;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlTimestampLiteral;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.TimestampString;
import org.apache.commons.lang3.StringUtils;
import java.math.BigDecimal;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -77,6 +84,51 @@ public class MaterializedTableUtils {
return IntervalFreshness.of(getFreshnessInterval(sqlIntervalLiteral));
}
+ public static StartMode getStartMode(SqlStartMode sqlStartMode) {
+ if (sqlStartMode == null) {
+ return null;
+ }
+
+ SqlStartModeKind sqlStartModeKind = sqlStartMode.getKind();
+ StartModeKind startModeKind = deriveStartModeKind(sqlStartModeKind);
+ switch (sqlStartModeKind) {
+ case FROM_NOW:
+ case RESUME_OR_FROM_NOW:
+ SqlIntervalLiteral intervalLiteral =
sqlStartMode.getIntervalLiteral();
+ if (intervalLiteral == null) {
+ return StartMode.of(startModeKind, null, false);
+ }
+
+ Interval interval = intervalFrom(intervalLiteral, "start
mode");
+ validateIntervalValuePositive(interval.getInterval(), "start
mode");
+ return StartMode.of(startModeKind, interval);
+
+ case RESUME_OR_FROM_BEGINNING:
+ case FROM_BEGINNING:
+ return StartMode.of(startModeKind);
+ case RESUME_OR_FROM_TIMESTAMP:
+ case FROM_TIMESTAMP:
+ SqlTimestampLiteral timestampLiteral =
sqlStartMode.getTimestampLiteral();
+ if (timestampLiteral == null) {
+ return StartMode.of(startModeKind, null, false);
+ }
+
+ TimestampString timestampString =
+ timestampLiteral.getValueAs(TimestampString.class);
+ SqlTypeName timestampTypeName = timestampLiteral.getTypeName();
+ long millis = timestampString.getMillisSinceEpoch();
+ Instant timestamp = Instant.ofEpochMilli(millis);
+ return StartMode.of(
+ startModeKind,
+ timestamp,
+ timestampTypeName ==
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+
+ default:
+ throw new ValidationException(
+ String.format("Unsupported start mode: %s.",
sqlStartModeKind));
+ }
+ }
+
private static Interval getFreshnessInterval(SqlIntervalLiteral
sqlIntervalLiteral) {
final IntervalValue intervalValue =
sqlIntervalLiteral.getValueAs(IntervalValue.class);
final SqlTypeName typeName =
intervalValue.getIntervalQualifier().typeName();
@@ -303,6 +355,26 @@ public class MaterializedTableUtils {
|| typeName == SqlTypeName.INTERVAL_SECOND;
}
+ private static StartModeKind deriveStartModeKind(SqlStartModeKind
sqlStartModeKind) {
+ switch (sqlStartModeKind) {
+ case FROM_NOW:
+ return StartModeKind.FROM_NOW;
+ case RESUME_OR_FROM_NOW:
+ return StartModeKind.RESUME_OR_FROM_NOW;
+ case RESUME_OR_FROM_BEGINNING:
+ return StartModeKind.RESUME_OR_FROM_BEGINNING;
+ case FROM_BEGINNING:
+ return StartModeKind.FROM_BEGINNING;
+ case RESUME_OR_FROM_TIMESTAMP:
+ return StartModeKind.RESUME_OR_FROM_TIMESTAMP;
+ case FROM_TIMESTAMP:
+ return StartModeKind.FROM_TIMESTAMP;
+ default:
+ throw new ValidationException(
+ String.format("Unsupported start mode: %s.",
sqlStartModeKind));
+ }
+ }
+
// Since it is only for query change, then check only persisted columns
which could be
// changed/added/dropped with such change
private static boolean isSchemaChanged(ResolvedSchema oldSchema,
ResolvedSchema newSchema) {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
index 9ba20168766..4082919ef02 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
@@ -23,7 +23,11 @@ import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.Interval;
+import org.apache.flink.table.catalog.Interval.TimeUnit;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.StartMode;
+import org.apache.flink.table.catalog.StartMode.StartModeKind;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.catalog.UniqueConstraint;
@@ -37,8 +41,11 @@ import
org.apache.flink.table.operations.materializedtable.FullAlterMaterialized
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
+import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -521,6 +528,123 @@ class
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
+ " MODIFY `t` COMMENT 'Timestamp Comment'");
}
+ @ParameterizedTest
+ @EnumSource(
+ value = StartModeKind.class,
+ names = {
+ "FROM_NOW",
+ "RESUME_OR_FROM_NOW",
+ "RESUME_OR_FROM_BEGINNING",
+ "FROM_BEGINNING"
+ })
+ void
testCreateOrAlterMaterializedTableWithNotChangedStartMode(StartModeKind kind)
+ throws TableAlreadyExistException, DatabaseNotExistException {
+ final String prepSql =
+ String.format(
+ "CREATE MATERIALIZED TABLE mt1\n"
+ + "START_MODE=%s\n"
+ + "AS SELECT * FROM t1",
+ kind.toString());
+ createMaterializedTableInCatalog(prepSql, "mt1");
+
+ final String sql =
+ String.format(
+ "CREATE OR ALTER MATERIALIZED TABLE mt1\n"
+ + "START_MODE=%s\n"
+ + "AS SELECT * FROM t1",
+ kind);
+ Operation operation = parse(sql);
+
+
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+ FullAlterMaterializedTableOperation op =
(FullAlterMaterializedTableOperation) operation;
+ assertThat(op.getTableChanges()).isEmpty();
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = StartModeKind.class,
+ names = {
+ "FROM_NOW",
+ "RESUME_OR_FROM_NOW",
+ "RESUME_OR_FROM_BEGINNING",
+ "FROM_BEGINNING"
+ })
+ void
testCreateOrAlterMaterializedTableWithFinalDefaultStartMode(StartModeKind kind)
+ throws TableAlreadyExistException, DatabaseNotExistException {
+ final String prepSql =
+ String.format(
+ "CREATE MATERIALIZED TABLE mt1\n"
+ + "START_MODE=%s\n"
+ + "AS SELECT * FROM t1",
+ kind.toString());
+ createMaterializedTableInCatalog(prepSql, "mt1");
+
+ final String sql = "CREATE OR ALTER MATERIALIZED TABLE mt1\n" + "AS
SELECT * FROM t1";
+ Operation operation = parse(sql);
+
+
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+ FullAlterMaterializedTableOperation op =
(FullAlterMaterializedTableOperation) operation;
+ if (kind == StartModeKind.FROM_BEGINNING) {
+ assertThat(op.getTableChanges()).isEmpty();
+ } else {
+ assertThat(op.getTableChanges())
+ .contains(
+ TableChange.modifyStartMode(
+
StartMode.of(StartModeKind.FROM_BEGINNING)));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("startModeAlterCases")
+ void testCreateOrAlterMaterializedTableWithChangedStartMode(
+ String newStartModeClause, StartMode newStartMode)
+ throws TableAlreadyExistException, DatabaseNotExistException {
+ final String prepSql =
+ "CREATE MATERIALIZED TABLE mt1\n"
+ + "START_MODE=FROM_BEGINNING\n"
+ + "AS SELECT * FROM t1";
+ createMaterializedTableInCatalog(prepSql, "mt1");
+
+ final String sql =
+ "CREATE OR ALTER MATERIALIZED TABLE mt1\n"
+ + "START_MODE = "
+ + newStartModeClause
+ + "\n"
+ + "AS SELECT * FROM t1";
+ Operation operation = parse(sql);
+
+
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+ FullAlterMaterializedTableOperation op =
(FullAlterMaterializedTableOperation) operation;
+
assertThat(op.getTableChanges()).containsExactly(TableChange.modifyStartMode(newStartMode));
+ }
+
+ private static Collection<Arguments> startModeAlterCases() {
+ return List.of(
+ Arguments.of(
+ "RESUME_OR_FROM_BEGINNING",
+ StartMode.of(StartModeKind.RESUME_OR_FROM_BEGINNING)),
+ Arguments.of("FROM_NOW", StartMode.of(StartModeKind.FROM_NOW)),
+ Arguments.of(
+ "FROM_NOW(INTERVAL '2' HOUR)",
+ StartMode.of(StartModeKind.FROM_NOW, Interval.of(2,
TimeUnit.HOUR))),
+ Arguments.of(
+ "FROM_NOW",
+ StartMode.of(StartModeKind.FROM_NOW),
+ "FROM_TIMESTAMP(TIMESTAMP '1234-12-10 11:22:33')",
+ StartMode.of(
+ StartModeKind.FROM_TIMESTAMP,
+ Instant.parse("1234-12-10T11:22:33Z"))),
+ Arguments.of("RESUME_OR_FROM_NOW",
StartMode.of(StartModeKind.RESUME_OR_FROM_NOW)),
+ Arguments.of(
+ "RESUME_OR_FROM_TIMESTAMP(TIMESTAMP '2025-01-15
10:00:00')",
+ StartMode.of(
+ StartModeKind.RESUME_OR_FROM_TIMESTAMP,
+ Instant.parse("2025-01-15T10:00:00Z"))));
+ }
+
private void createMaterializedTableInCatalog(String sql, String
materializedTableName)
throws TableAlreadyExistException, DatabaseNotExistException {
final ObjectPath objectPath =