This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a0147374ee0 [FLINK-39303][table] Support `START_MODE` for `CREATE` path for `MATERIALIZED TABLE`
a0147374ee0 is described below

commit a0147374ee09fcd6a8e7c492fc9333784501bf90
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Apr 23 17:41:38 2026 +0200

    [FLINK-39303][table] Support `START_MODE` for `CREATE` path for `MATERIALIZED TABLE`
---
 .../AlterMaterializedTableChangeOperation.java     |   4 +
 .../MaterializedTableChangeHandler.java            |  15 +++
 .../table/catalog/CatalogMaterializedTable.java    |  14 ++-
 .../catalog/DefaultCatalogMaterializedTable.java   |  19 +++-
 .../AbstractCreateMaterializedTableConverter.java  |  18 +++
 ...SqlCreateOrAlterMaterializedTableConverter.java |  24 +++-
 .../planner/utils/MaterializedTableUtils.java      |  72 ++++++++++++
 ...reateOrAlterMaterializedTableConverterTest.java | 124 +++++++++++++++++++++
 8 files changed, 276 insertions(+), 14 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
index c6d42012565..13a3019a5c7 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.TableChange.ModifyDefinitionQuery;
 import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler;
 import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
+import org.apache.flink.table.catalog.TableChange.ModifyStartMode;
 import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
 
 import java.util.List;
@@ -120,6 +121,9 @@ public class AlterMaterializedTableChangeOperation extends 
AlterMaterializedTabl
             ModifyDefinitionQuery definitionQuery = (ModifyDefinitionQuery) 
tableChange;
             return String.format(
                     "  MODIFY DEFINITION QUERY TO '%s'", 
definitionQuery.getDefinitionQuery());
+        } else if (tableChange instanceof ModifyStartMode) {
+            ModifyStartMode startMode = (ModifyStartMode) tableChange;
+            return String.format("  MODIFY START_MODE TO '%s'", 
startMode.getStartMode());
         } else {
             return AlterTableChangeOperation.toString(tableChange);
         }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
index 8602812b6c3..54a55414a8e 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.Column.MetadataColumn;
+import org.apache.flink.table.catalog.StartMode;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.TableChange.AddColumn;
 import org.apache.flink.table.catalog.TableChange.AddDistribution;
@@ -44,6 +45,7 @@ import 
org.apache.flink.table.catalog.TableChange.ModifyDistribution;
 import org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType;
 import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler;
 import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
+import org.apache.flink.table.catalog.TableChange.ModifyStartMode;
 import org.apache.flink.table.catalog.TableChange.ModifyUniqueConstraint;
 import org.apache.flink.table.catalog.TableChange.ModifyWatermark;
 import org.apache.flink.table.catalog.TableChange.ResetOption;
@@ -84,6 +86,7 @@ public class MaterializedTableChangeHandler {
     private int droppedPersistedCnt = 0;
     private String originalQuery;
     private String expandedQuery;
+    private StartMode startMode;
     private final Map<String, String> options;
     private final List<String> validationErrors = new ArrayList<>();
 
@@ -102,6 +105,7 @@ public class MaterializedTableChangeHandler {
         }
         originalQuery = oldTable.getOriginalQuery();
         expandedQuery = oldTable.getExpandedQuery();
+        startMode = oldTable.getStartMode().orElse(null);
         this.oldTable = oldTable;
         this.options = new HashMap<>(oldTable.getOptions());
     }
@@ -170,6 +174,7 @@ public class MaterializedTableChangeHandler {
                 .refreshStatus(context.getRefreshStatus())
                 .refreshHandlerDescription(context.getRefreshHandlerDesc())
                 .serializedRefreshHandler(context.getRefreshHandlerBytes())
+                .startMode(context.getStartMode())
                 .build();
     }
 
@@ -222,6 +227,8 @@ public class MaterializedTableChangeHandler {
         registry.register(SetOption.class, 
MaterializedTableChangeHandler::setTableOption);
         registry.register(ResetOption.class, 
MaterializedTableChangeHandler::resetTableOption);
 
+        registry.register(ModifyStartMode.class, 
MaterializedTableChangeHandler::modifyStartMode);
+
         return registry;
     }
 
@@ -284,6 +291,10 @@ public class MaterializedTableChangeHandler {
         return refreshHandlerBytes;
     }
 
+    /**
+     * Returns the start mode tracked by this handler.
+     *
+     * <p>The value is seeded from the original table and overwritten by any applied {@link
+     * ModifyStartMode} change; it is {@code null} when the original table carried no start mode
+     * and no such change was applied.
+     */
+    @Nullable
+    public StartMode getStartMode() {
+        return startMode;
+    }
+
     @Nullable
     public String getRefreshHandlerDesc() {
         return refreshHandlerDesc;
@@ -409,6 +420,10 @@ public class MaterializedTableChangeHandler {
         refreshStatus = modifyRefreshStatus.getRefreshStatus();
     }
 
+    /**
+     * Applies a {@link ModifyStartMode} change by recording the start mode it carries as the
+     * current one.
+     */
+    private void modifyStartMode(ModifyStartMode change) {
+        this.startMode = change.getStartMode();
+    }
+
     private void addDistribution(AddDistribution addDistribution) {
         distribution = addDistribution.getDistribution();
     }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
index b4a5084e9f6..f05ebc1e089 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
@@ -25,7 +25,6 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -243,8 +242,8 @@ public interface CatalogMaterializedTable extends 
CatalogBaseTable {
         private Schema schema;
         private String comment;
         private TableDistribution distribution = null;
-        private List<String> partitionKeys = Collections.emptyList();
-        private Map<String, String> options = Collections.emptyMap();
+        private List<String> partitionKeys = List.of();
+        private Map<String, String> options = Map.of();
         private @Nullable Long snapshot;
         private String originalQuery;
         private String expandedQuery;
@@ -254,6 +253,7 @@ public interface CatalogMaterializedTable extends 
CatalogBaseTable {
         private RefreshStatus refreshStatus;
         private @Nullable String refreshHandlerDescription;
         private @Nullable byte[] serializedRefreshHandler;
+        private StartMode startMode;
 
         private Builder() {}
 
@@ -341,6 +341,11 @@ public interface CatalogMaterializedTable extends 
CatalogBaseTable {
             return this;
         }
 
+        /**
+         * Sets the start mode of the materialized table.
+         *
+         * @param startMode the start mode, or {@code null} to leave it unset
+         * @return this builder
+         */
+        public Builder startMode(@Nullable StartMode startMode) {
+            this.startMode = startMode;
+            return this;
+        }
+
         public CatalogMaterializedTable build() {
             return new DefaultCatalogMaterializedTable(
                     schema,
@@ -356,7 +361,8 @@ public interface CatalogMaterializedTable extends 
CatalogBaseTable {
                     refreshMode,
                     refreshStatus,
                     refreshHandlerDescription,
-                    serializedRefreshHandler);
+                    serializedRefreshHandler,
+                    startMode);
         }
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
index 78eca0ce897..7b7439994fc 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
@@ -52,6 +52,7 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
     private final RefreshStatus refreshStatus;
     private final @Nullable String refreshHandlerDescription;
     private final @Nullable byte[] serializedRefreshHandler;
+    private final @Nullable StartMode startMode;
 
     protected DefaultCatalogMaterializedTable(
             Schema schema,
@@ -67,7 +68,8 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
             @Nullable RefreshMode refreshMode,
             RefreshStatus refreshStatus,
             @Nullable String refreshHandlerDescription,
-            @Nullable byte[] serializedRefreshHandler) {
+            @Nullable byte[] serializedRefreshHandler,
+            @Nullable StartMode startMode) {
         this.schema = checkNotNull(schema, "Schema must not be null.");
         this.comment = comment;
         this.distribution = distribution;
@@ -83,6 +85,7 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
         this.refreshStatus = checkNotNull(refreshStatus, "Refresh status must 
not be null.");
         this.refreshHandlerDescription = refreshHandlerDescription;
         this.serializedRefreshHandler = serializedRefreshHandler;
+        this.startMode = startMode;
 
         checkArgument(
                 options.entrySet().stream()
@@ -136,7 +139,8 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
                 refreshMode,
                 refreshStatus,
                 refreshHandlerDescription,
-                serializedRefreshHandler);
+                serializedRefreshHandler,
+                startMode);
     }
 
     @Override
@@ -155,7 +159,8 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
                 refreshMode,
                 refreshStatus,
                 refreshHandlerDescription,
-                serializedRefreshHandler);
+                serializedRefreshHandler,
+                startMode);
     }
 
     @Override
@@ -177,7 +182,8 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
                 refreshMode,
                 refreshStatus,
                 refreshHandlerDescription,
-                serializedRefreshHandler);
+                serializedRefreshHandler,
+                startMode);
     }
 
     @Override
@@ -225,6 +231,11 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
         return refreshStatus;
     }
 
+    /** Returns the start mode, or an empty {@link Optional} when the stored value is null. */
+    @Override
+    public Optional<StartMode> getStartMode() {
+        return Optional.ofNullable(startMode);
+    }
+
     @Override
     public Optional<String> getRefreshHandlerDescription() {
         return Optional.ofNullable(refreshHandlerDescription);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
index 9c4d1969601..2d6d10bdc53 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
@@ -21,6 +21,7 @@ package 
org.apache.flink.table.planner.operations.converters.materializedtable;
 import 
org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateMaterializedTable;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.MaterializedTableConfigOptions;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import 
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
 import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
@@ -29,6 +30,7 @@ import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.StartMode;
 import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
@@ -78,6 +80,8 @@ public abstract class 
AbstractCreateMaterializedTableConverter<T extends SqlCrea
         ResolvedSchema getMergedQuerySchema();
 
         RefreshMode getMergedRefreshMode();
+
+        StartMode getMergedStartMode();
     }
 
     protected abstract MergeContext getMergeContext(
@@ -99,6 +103,17 @@ public abstract class 
AbstractCreateMaterializedTableConverter<T extends SqlCrea
                 .orElse(null);
     }
 
+    /**
+     * Resolves the start mode for the given statement.
+     *
+     * <p>The start mode declared on the statement wins when it is present; otherwise the value of
+     * {@link MaterializedTableConfigOptions#MATERIALIZED_TABLE_DEFAULT_START_MODE} read from the
+     * table config is used.
+     */
+    protected final StartMode getStartMode(T sqlCreateMaterializedTable, ConvertContext context) {
+        final StartMode declared =
+                MaterializedTableUtils.getStartMode(sqlCreateMaterializedTable.getStartMode());
+        if (declared == null) {
+            // Nothing declared on the statement: fall back to the configured default.
+            return StartMode.of(
+                    context.getTableConfig()
+                            .get(MaterializedTableConfigOptions.MATERIALIZED_TABLE_DEFAULT_START_MODE));
+        }
+        return declared;
+    }
+
     protected final ResolvedSchema getQueryResolvedSchema(
             T sqlCreateMaterializedTable, ConvertContext context) {
         SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
@@ -160,6 +175,8 @@ public abstract class 
AbstractCreateMaterializedTableConverter<T extends SqlCrea
 
         final RefreshMode refreshMode = 
getDerivedRefreshMode(logicalRefreshMode);
 
+        final StartMode startMode = mergeContext.getMergedStartMode();
+
         return context.getCatalogManager()
                 .resolveCatalogMaterializedTable(
                         CatalogMaterializedTable.newBuilder()
@@ -174,6 +191,7 @@ public abstract class 
AbstractCreateMaterializedTableConverter<T extends SqlCrea
                                 .logicalRefreshMode(logicalRefreshMode)
                                 .refreshMode(refreshMode)
                                 .refreshStatus(RefreshStatus.INITIALIZING)
+                                .startMode(startMode)
                                 .build());
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index 198a747406b..0d81346262c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.SchemaResolver;
+import org.apache.flink.table.catalog.StartMode;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.catalog.UniqueConstraint;
@@ -137,6 +138,15 @@ public class SqlCreateOrAlterMaterializedTableConverter
                 throw new ValidationException("Changing of REFRESH MODE is 
unsupported");
             }
 
+            final StartMode newStartMode = mergeContext.getMergedStartMode();
+            final StartMode oldStartMode =
+                    oldTable.getStartMode()
+                            .orElseThrow(
+                                    () -> new ValidationException("START_MODE 
must not be null"));
+            if (!Objects.equals(oldStartMode, newStartMode)) {
+                changes.add(TableChange.modifyStartMode(newStartMode));
+            }
+
             return changes;
         };
     }
@@ -213,7 +223,7 @@ public class SqlCreateOrAlterMaterializedTableConverter
     private Optional<TableChange> getConstraintChange(
             final ResolvedSchema oldSchema,
             final ResolvedSchema newSchema,
-            boolean hasConstraintDefinition) {
+            final boolean hasConstraintDefinition) {
         final UniqueConstraint oldConstraint = 
oldSchema.getPrimaryKey().orElse(null);
         final UniqueConstraint newConstraint = 
newSchema.getPrimaryKey().orElse(null);
         if (hasConstraintDefinition && !Objects.equals(oldConstraint, 
newConstraint)) {
@@ -279,11 +289,8 @@ public class SqlCreateOrAlterMaterializedTableConverter
 
             @Override
             public boolean hasConstraintDefinition() {
-                if 
(!sqlCreateMaterializedTable.getTableConstraints().isEmpty()) {
-                    return true;
-                }
-
-                return hasSchemaDefinition();
+                return 
!sqlCreateMaterializedTable.getTableConstraints().isEmpty()
+                        || hasSchemaDefinition();
             }
 
             @Override
@@ -354,6 +361,11 @@ public class SqlCreateOrAlterMaterializedTableConverter
                 return getDerivedRefreshMode(
                         
getDerivedLogicalRefreshMode(sqlCreateMaterializedTable));
             }
+
+            // Delegates to getStartMode(...) for the statement and context being converted.
+            @Override
+            public StartMode getMergedStartMode() {
+                return getStartMode(sqlCreateMaterializedTable, context);
+            }
         };
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index 6f50a3d36e7..99fd660e836 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -24,6 +24,8 @@ import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlMetadataColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
 import 
org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema;
+import org.apache.flink.sql.parser.ddl.materializedtable.SqlStartMode;
+import 
org.apache.flink.sql.parser.ddl.materializedtable.SqlStartMode.SqlStartModeKind;
 import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
@@ -36,6 +38,8 @@ import org.apache.flink.table.catalog.Interval;
 import org.apache.flink.table.catalog.Interval.TimeUnit;
 import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.StartMode;
+import org.apache.flink.table.catalog.StartMode.StartModeKind;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.TableChange.ColumnPosition;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
@@ -47,10 +51,13 @@ import org.apache.calcite.sql.SqlIntervalLiteral;
 import org.apache.calcite.sql.SqlIntervalLiteral.IntervalValue;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlTimestampLiteral;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.TimestampString;
 import org.apache.commons.lang3.StringUtils;
 
 import java.math.BigDecimal;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -77,6 +84,51 @@ public class MaterializedTableUtils {
         return IntervalFreshness.of(getFreshnessInterval(sqlIntervalLiteral));
     }
 
+    /**
+     * Converts the parser-level {@link SqlStartMode} into a catalog {@link StartMode}.
+     *
+     * @param sqlStartMode start mode node of the parsed statement, may be {@code null}
+     * @return the converted start mode, or {@code null} if {@code sqlStartMode} is {@code null}
+     * @throws ValidationException if the start mode kind is not supported
+     */
+    public static StartMode getStartMode(SqlStartMode sqlStartMode) {
+        if (sqlStartMode == null) {
+            return null;
+        }
+
+        final SqlStartModeKind sqlKind = sqlStartMode.getKind();
+        final StartModeKind kind = deriveStartModeKind(sqlKind);
+        switch (sqlKind) {
+            case FROM_BEGINNING:
+            case RESUME_OR_FROM_BEGINNING:
+                return StartMode.of(kind);
+            case FROM_NOW:
+            case RESUME_OR_FROM_NOW:
+                return startModeWithOptionalInterval(kind, sqlStartMode.getIntervalLiteral());
+            case FROM_TIMESTAMP:
+            case RESUME_OR_FROM_TIMESTAMP:
+                return startModeWithOptionalTimestamp(kind, sqlStartMode.getTimestampLiteral());
+            default:
+                throw new ValidationException(
+                        String.format("Unsupported start mode: %s.", sqlKind));
+        }
+    }
+
+    /**
+     * Builds a start mode for the NOW-based kinds; a given interval is converted with {@code
+     * intervalFrom} and passed through {@code validateIntervalValuePositive} first.
+     */
+    private static StartMode startModeWithOptionalInterval(
+            StartModeKind kind, SqlIntervalLiteral intervalLiteral) {
+        if (intervalLiteral == null) {
+            return StartMode.of(kind, null, false);
+        }
+
+        final Interval interval = intervalFrom(intervalLiteral, "start mode");
+        validateIntervalValuePositive(interval.getInterval(), "start mode");
+        return StartMode.of(kind, interval);
+    }
+
+    /**
+     * Builds a start mode for the TIMESTAMP-based kinds from the literal's epoch milliseconds,
+     * flagging literals typed as {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE}.
+     */
+    private static StartMode startModeWithOptionalTimestamp(
+            StartModeKind kind, SqlTimestampLiteral timestampLiteral) {
+        if (timestampLiteral == null) {
+            return StartMode.of(kind, null, false);
+        }
+
+        final long epochMillis =
+                timestampLiteral.getValueAs(TimestampString.class).getMillisSinceEpoch();
+        final boolean withLocalTimeZone =
+                timestampLiteral.getTypeName() == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+        return StartMode.of(kind, Instant.ofEpochMilli(epochMillis), withLocalTimeZone);
+    }
+
     private static Interval getFreshnessInterval(SqlIntervalLiteral 
sqlIntervalLiteral) {
         final IntervalValue intervalValue = 
sqlIntervalLiteral.getValueAs(IntervalValue.class);
         final SqlTypeName typeName = 
intervalValue.getIntervalQualifier().typeName();
@@ -303,6 +355,26 @@ public class MaterializedTableUtils {
                 || typeName == SqlTypeName.INTERVAL_SECOND;
     }
 
+    /**
+     * Maps the parser-level start mode kind onto the catalog {@link StartModeKind} constant of
+     * the same name.
+     *
+     * @throws ValidationException if there is no mapping for the given kind
+     */
+    private static StartModeKind deriveStartModeKind(SqlStartModeKind sqlStartModeKind) {
+        final StartModeKind derived;
+        switch (sqlStartModeKind) {
+            case FROM_BEGINNING:
+                derived = StartModeKind.FROM_BEGINNING;
+                break;
+            case RESUME_OR_FROM_BEGINNING:
+                derived = StartModeKind.RESUME_OR_FROM_BEGINNING;
+                break;
+            case FROM_NOW:
+                derived = StartModeKind.FROM_NOW;
+                break;
+            case RESUME_OR_FROM_NOW:
+                derived = StartModeKind.RESUME_OR_FROM_NOW;
+                break;
+            case FROM_TIMESTAMP:
+                derived = StartModeKind.FROM_TIMESTAMP;
+                break;
+            case RESUME_OR_FROM_TIMESTAMP:
+                derived = StartModeKind.RESUME_OR_FROM_TIMESTAMP;
+                break;
+            default:
+                throw new ValidationException(
+                        String.format("Unsupported start mode: %s.", sqlStartModeKind));
+        }
+        return derived;
+    }
+
     // Since it is only for query change, then check only persisted columns 
which could be
     // changed/added/dropped with such change
     private static boolean isSchemaChanged(ResolvedSchema oldSchema, 
ResolvedSchema newSchema) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
index 9ba20168766..4082919ef02 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
@@ -23,7 +23,11 @@ import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.Interval;
+import org.apache.flink.table.catalog.Interval.TimeUnit;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.StartMode;
+import org.apache.flink.table.catalog.StartMode.StartModeKind;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.catalog.UniqueConstraint;
@@ -37,8 +41,11 @@ import 
org.apache.flink.table.operations.materializedtable.FullAlterMaterialized
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.time.Instant;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -521,6 +528,123 @@ class 
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
                                 + "  MODIFY `t` COMMENT 'Timestamp Comment'");
     }
 
+    /** Re-issuing the statement with an identical START_MODE must produce no table changes. */
+    @ParameterizedTest
+    @EnumSource(
+            value = StartModeKind.class,
+            names = {
+                "FROM_NOW",
+                "RESUME_OR_FROM_NOW",
+                "RESUME_OR_FROM_BEGINNING",
+                "FROM_BEGINNING"
+            })
+    void testCreateOrAlterMaterializedTableWithNotChangedStartMode(StartModeKind kind)
+            throws TableAlreadyExistException, DatabaseNotExistException {
+        // Both statements share the same START_MODE clause and query.
+        final String startModeAndQuery = "START_MODE=" + kind + "\nAS SELECT * FROM t1";
+        createMaterializedTableInCatalog(
+                "CREATE MATERIALIZED TABLE mt1\n" + startModeAndQuery, "mt1");
+
+        final Operation operation =
+                parse("CREATE OR ALTER MATERIALIZED TABLE mt1\n" + startModeAndQuery);
+
+        assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+        final FullAlterMaterializedTableOperation alterOperation =
+                (FullAlterMaterializedTableOperation) operation;
+        assertThat(alterOperation.getTableChanges()).isEmpty();
+    }
+
+    /**
+     * Omitting START_MODE on CREATE OR ALTER makes the converter use the default start mode,
+     * which this test expects to be FROM_BEGINNING: a start mode change is reported only when
+     * the existing table was created with a different kind.
+     */
+    @ParameterizedTest
+    @EnumSource(
+            value = StartModeKind.class,
+            names = {
+                "FROM_NOW",
+                "RESUME_OR_FROM_NOW",
+                "RESUME_OR_FROM_BEGINNING",
+                "FROM_BEGINNING"
+            })
+    void testCreateOrAlterMaterializedTableWithFinalDefaultStartMode(StartModeKind kind)
+            throws TableAlreadyExistException, DatabaseNotExistException {
+        createMaterializedTableInCatalog(
+                "CREATE MATERIALIZED TABLE mt1\nSTART_MODE=" + kind + "\nAS SELECT * FROM t1",
+                "mt1");
+
+        final Operation operation =
+                parse("CREATE OR ALTER MATERIALIZED TABLE mt1\nAS SELECT * FROM t1");
+
+        assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+        final FullAlterMaterializedTableOperation alterOperation =
+                (FullAlterMaterializedTableOperation) operation;
+        if (kind != StartModeKind.FROM_BEGINNING) {
+            final StartMode expectedDefault = StartMode.of(StartModeKind.FROM_BEGINNING);
+            assertThat(alterOperation.getTableChanges())
+                    .contains(TableChange.modifyStartMode(expectedDefault));
+        } else {
+            assertThat(alterOperation.getTableChanges()).isEmpty();
+        }
+    }
+
+    /**
+     * Altering a table created with START_MODE=FROM_BEGINNING to a different start mode must
+     * yield exactly one table change carrying the new start mode.
+     */
+    @ParameterizedTest
+    @MethodSource("startModeAlterCases")
+    void testCreateOrAlterMaterializedTableWithChangedStartMode(
+            String newStartModeClause, StartMode newStartMode)
+            throws TableAlreadyExistException, DatabaseNotExistException {
+        createMaterializedTableInCatalog(
+                "CREATE MATERIALIZED TABLE mt1\nSTART_MODE=FROM_BEGINNING\nAS SELECT * FROM t1",
+                "mt1");
+
+        final String alterSql =
+                String.join(
+                        "\n",
+                        "CREATE OR ALTER MATERIALIZED TABLE mt1",
+                        "START_MODE = " + newStartModeClause,
+                        "AS SELECT * FROM t1");
+        final Operation operation = parse(alterSql);
+
+        assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+        final FullAlterMaterializedTableOperation alterOperation =
+                (FullAlterMaterializedTableOperation) operation;
+        assertThat(alterOperation.getTableChanges())
+                .containsExactly(TableChange.modifyStartMode(newStartMode));
+    }
+
+    /**
+     * Cases for {@code testCreateOrAlterMaterializedTableWithChangedStartMode}.
+     *
+     * <p>Each entry holds exactly two arguments, matching the test's two parameters: the {@code
+     * START_MODE} clause to put into the statement and the {@link StartMode} expected in the
+     * resulting table change. Surplus arguments would be ignored by JUnit and silently turn the
+     * entry into a different case.
+     */
+    private static Collection<Arguments> startModeAlterCases() {
+        return List.of(
+                Arguments.of(
+                        "RESUME_OR_FROM_BEGINNING",
+                        StartMode.of(StartModeKind.RESUME_OR_FROM_BEGINNING)),
+                Arguments.of("FROM_NOW", StartMode.of(StartModeKind.FROM_NOW)),
+                Arguments.of(
+                        "FROM_NOW(INTERVAL '2' HOUR)",
+                        StartMode.of(StartModeKind.FROM_NOW, Interval.of(2, TimeUnit.HOUR))),
+                Arguments.of(
+                        "FROM_TIMESTAMP(TIMESTAMP '1234-12-10 11:22:33')",
+                        StartMode.of(
+                                StartModeKind.FROM_TIMESTAMP,
+                                Instant.parse("1234-12-10T11:22:33Z"))),
+                Arguments.of("RESUME_OR_FROM_NOW", StartMode.of(StartModeKind.RESUME_OR_FROM_NOW)),
+                Arguments.of(
+                        "RESUME_OR_FROM_TIMESTAMP(TIMESTAMP '2025-01-15 10:00:00')",
+                        StartMode.of(
+                                StartModeKind.RESUME_OR_FROM_TIMESTAMP,
+                                Instant.parse("2025-01-15T10:00:00Z"))));
+    }
+
     private void createMaterializedTableInCatalog(String sql, String 
materializedTableName)
             throws TableAlreadyExistException, DatabaseNotExistException {
         final ObjectPath objectPath =

Reply via email to