This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 6a66f7a [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class. 6a66f7a is described below commit 6a66f7acf370e12ad65ee24293ed47d2c5db225c Author: klion26 <qcx978132...@gmail.com> AuthorDate: Thu Mar 7 15:07:19 2019 +0800 [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class. We can not remove the class StateTtlConfig#TimeCharacteristic and use org.apache.flink.streaming.api.TimeCharacteristic directly, because StateTtlConfig locates in module flink-core and org.apache.flink.streaming.api.TimeCharacteristic locates in flink-streaming-java, so we choice to rename StateTtlConfig#TimeCharacteristic. changes include: - Deprecated the StateTtlConfig#TimeCharacteristic class (for backward-compatibility). - Introduce a new class named StateTtlConfig#TtlTimeCharacteristic. --- .../flink/api/common/state/StateTtlConfig.java | 62 ++++++++++++++++------ 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java index 5bb44d1..2a78f19 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.state; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.time.Time; import org.apache.flink.util.Preconditions; @@ -29,8 +30,10 @@ import java.io.Serializable; import java.util.EnumMap; import static org.apache.flink.api.common.state.StateTtlConfig.StateVisibility.NeverReturnExpired; -import static org.apache.flink.api.common.state.StateTtlConfig.TimeCharacteristic.ProcessingTime; +import static org.apache.flink.api.common.state.StateTtlConfig.TtlTimeCharacteristic.ProcessingTime; import static org.apache.flink.api.common.state.StateTtlConfig.UpdateType.OnCreateAndWrite; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Configuration of state TTL logic. @@ -41,6 +44,7 @@ import static org.apache.flink.api.common.state.StateTtlConfig.UpdateType.OnCrea * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer} * at the cost of an extra byte in the serialized form. */ +@PublicEvolving public class StateTtlConfig implements Serializable { private static final long serialVersionUID = -7592693245044289793L; @@ -72,31 +76,41 @@ public class StateTtlConfig implements Serializable { /** * This option configures time scale to use for ttl. + * + * @deprecated will be removed in a future version in favor of {@link TtlTimeCharacteristic} */ + @Deprecated public enum TimeCharacteristic { /** Processing time, see also <code>TimeCharacteristic.ProcessingTime</code>. */ ProcessingTime } + /** + * This option configures time scale to use for ttl. + */ + public enum TtlTimeCharacteristic { + /** Processing time, see also <code>org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime</code>. */ + ProcessingTime + } + private final UpdateType updateType; private final StateVisibility stateVisibility; - private final TimeCharacteristic timeCharacteristic; + private final TtlTimeCharacteristic ttlTimeCharacteristic; private final Time ttl; private final CleanupStrategies cleanupStrategies; private StateTtlConfig( UpdateType updateType, StateVisibility stateVisibility, - TimeCharacteristic timeCharacteristic, + TtlTimeCharacteristic ttlTimeCharacteristic, Time ttl, CleanupStrategies cleanupStrategies) { - this.updateType = Preconditions.checkNotNull(updateType); - this.stateVisibility = Preconditions.checkNotNull(stateVisibility); - this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic); - this.ttl = Preconditions.checkNotNull(ttl); + this.updateType = checkNotNull(updateType); + this.stateVisibility = checkNotNull(stateVisibility); + this.ttlTimeCharacteristic = checkNotNull(ttlTimeCharacteristic); + this.ttl = checkNotNull(ttl); this.cleanupStrategies = cleanupStrategies; - Preconditions.checkArgument(ttl.toMilliseconds() > 0, - "TTL is expected to be positive"); + checkArgument(ttl.toMilliseconds() > 0, "TTL is expected to be positive."); } @Nonnull @@ -115,8 +129,8 @@ public class StateTtlConfig implements Serializable { } @Nonnull - public TimeCharacteristic getTimeCharacteristic() { - return timeCharacteristic; + public TtlTimeCharacteristic getTtlTimeCharacteristic() { + return ttlTimeCharacteristic; } public boolean isEnabled() { @@ -133,7 +147,7 @@ public class StateTtlConfig implements Serializable { return "StateTtlConfig{" + "updateType=" + updateType + ", stateVisibility=" + stateVisibility + - ", timeCharacteristic=" + timeCharacteristic + + ", ttlTimeCharacteristic=" + ttlTimeCharacteristic + ", ttl=" + ttl + '}'; } @@ -150,7 +164,7 @@ public class StateTtlConfig implements Serializable { private UpdateType updateType = OnCreateAndWrite; private StateVisibility stateVisibility = NeverReturnExpired; - private TimeCharacteristic timeCharacteristic = ProcessingTime; + private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime; private Time ttl; private CleanupStrategies cleanupStrategies = new CleanupStrategies(); @@ -204,16 +218,32 @@ public class StateTtlConfig implements Serializable { * Sets the time characteristic. * * @param timeCharacteristic The time characteristic configures time scale to use for ttl. + * + * @deprecated will be removed in a future version in favor of {@link #setTtlTimeCharacteristic} */ + @Deprecated @Nonnull public Builder setTimeCharacteristic(@Nonnull TimeCharacteristic timeCharacteristic) { - this.timeCharacteristic = timeCharacteristic; + checkArgument(timeCharacteristic.equals(TimeCharacteristic.ProcessingTime), + "Only support TimeCharacteristic.ProcessingTime, this function has replaced by setTtlTimeCharacteristic."); + setTtlTimeCharacteristic(TtlTimeCharacteristic.ProcessingTime); + return this; + } + + /** + * Sets the time characteristic. + * + * @param ttlTimeCharacteristic The time characteristic configures time scale to use for ttl. + */ + @Nonnull + public Builder setTtlTimeCharacteristic(@Nonnull TtlTimeCharacteristic ttlTimeCharacteristic) { + this.ttlTimeCharacteristic = ttlTimeCharacteristic; return this; } @Nonnull public Builder useProcessingTime() { - return setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + return setTtlTimeCharacteristic(ProcessingTime); } /** Cleanup expired state in full snapshot on checkpoint. */ @@ -312,7 +342,7 @@ public class StateTtlConfig implements Serializable { return new StateTtlConfig( updateType, stateVisibility, - timeCharacteristic, + ttlTimeCharacteristic, ttl, cleanupStrategies); }