This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 39f0344a9585ff00064c8d632ff4222c4066c2bf
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Apr 2 14:51:49 2026 +0200

    [FLINK-39393][table] Add `TableChange` for `START_MODE`

    Co-authored-by: Ramin Gharib <[email protected]>
---
 .../org/apache/flink/table/catalog/StartMode.java  | 144 +++++++++++++++++++++
 .../apache/flink/table/catalog/TableChange.java    |  47 +++++++
 2 files changed, 191 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StartMode.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StartMode.java
new file mode 100644
index 00000000000..245e50a9ac7
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StartMode.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import javax.annotation.Nullable;
+
+import java.time.Instant;
+import java.util.Objects;
+
+/** The start mode of materialized table. */
+@PublicEvolving
+public class StartMode {
+    private final StartModeKind kind;
+    private final @Nullable Instant timestamp;
+    private final boolean localTimeZone;
+    private final @Nullable Interval interval;
+
+    @PublicEvolving
+    public enum StartModeKind {
+        FROM_BEGINNING,
+        FROM_NOW,
+        FROM_TIMESTAMP,
+        RESUME_OR_FROM_BEGINNING,
+        RESUME_OR_FROM_NOW,
+        RESUME_OR_FROM_TIMESTAMP;
+    }
+
+    private StartMode(
+            StartModeKind kind,
+            @Nullable Instant timestamp,
+            boolean localTimeZone,
+            @Nullable Interval interval) {
+        this.kind = kind;
+        this.timestamp = timestamp;
+        this.localTimeZone = localTimeZone;
+        this.interval = interval;
+    }
+
+    public static StartMode of(StartModeKind kind) {
+        return new StartMode(kind, null, false, null);
+    }
+
+    public static StartMode of(StartModeKind kind, Instant timestamp) {
+        return new StartMode(kind, timestamp, false, null);
+    }
+
+    public static StartMode of(StartModeKind kind, Instant timestamp, boolean localTimeZone) {
+        return new StartMode(kind, timestamp, localTimeZone, null);
+    }
+
+    public static StartMode of(StartModeKind kind, Interval interval) {
+        return new StartMode(kind, null, false, interval);
+    }
+
+    public static boolean requiresParameters(StartModeKind kind) {
+        return kind == StartModeKind.FROM_TIMESTAMP
+                || kind == StartModeKind.RESUME_OR_FROM_TIMESTAMP;
+    }
+
+    public StartModeKind getKind() {
+        return kind;
+    }
+
+    @Nullable
+    public Instant getTimestamp() {
+        return timestamp;
+    }
+
+    public boolean isLocalTimeZone() {
+        return localTimeZone;
+    }
+
+    @Nullable
+    public Interval getInterval() {
+        return interval;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        StartMode startMode = (StartMode) o;
+        return localTimeZone == startMode.localTimeZone
+                && kind == startMode.kind
+                && Objects.equals(timestamp, startMode.timestamp)
+                && Objects.equals(interval, startMode.interval);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(kind, timestamp, localTimeZone, interval);
+    }
+
+    public String asSummaryString() {
+        switch (kind) {
+            case FROM_BEGINNING:
+            case RESUME_OR_FROM_BEGINNING:
+                return kind.name();
+            case FROM_NOW:
+            case RESUME_OR_FROM_NOW:
+                if (interval == null) {
+                    return kind.name();
+                }
+
+                return kind.name() + "(" + interval + ")";
+
+            case FROM_TIMESTAMP:
+            case RESUME_OR_FROM_TIMESTAMP:
+                return kind.name()
+                        + "(TIMESTAMP "
+                        + (localTimeZone ? "WITH LOCAL TIME ZONE " : "")
+                        + "'"
+                        + timestamp
+                        + "')";
+
+            default:
+                throw new IllegalStateException("Unexpected StartModeKind: " + kind);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return asSummaryString();
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
index 67b4941a48a..c890b3b8adf 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
@@ -357,6 +357,16 @@ public interface TableChange {
         return new ResetOption(key);
     }
 
+    /**
+     * A table change to modify materialized table start mode.
+     *
+     * @param startMode the modified start mode.
+     * @return a TableChange represents the modification.
+     */
+    static ModifyStartMode modifyStartMode(StartMode startMode) {
+        return new ModifyStartMode(startMode);
+    }
+
     /**
      * A table change to modify materialized table refresh status.
      *
@@ -1470,4 +1480,41 @@ public interface TableChange {
             return Objects.hash(definitionQuery, originalQuery);
         }
     }
+
+    /** A table change to modify materialized table start mode. */
+    @PublicEvolving
+    class ModifyStartMode implements MaterializedTableChange {
+
+        private final StartMode startMode;
+
+        public ModifyStartMode(StartMode startMode) {
+            this.startMode = startMode;
+        }
+
+        public StartMode getStartMode() {
+            return startMode;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ModifyStartMode that = (ModifyStartMode) o;
+            return Objects.equals(startMode, that.startMode);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(startMode);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyStartMode{" + "startMode=" + startMode + '}';
+        }
+    }
 }
