This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c943ab49c4aa58286111e2b95e9580a16f4d6b4c Author: Matthias Pohl <matthias.p...@aiven.io> AuthorDate: Thu Jan 4 16:20:26 2024 +0100 [FLINK-32570][core] Introduces Duration replacements for org.apache.flink.api.common.time.Time-related APIs and deprecates the corresponding methods/classes/constructors --- .../common/restartstrategy/RestartStrategies.java | 146 +++++++++++++++++++-- .../flink/api/common/state/StateTtlConfig.java | 37 +++++- .../org/apache/flink/api/common/time/Time.java | 14 ++ .../main/java/org/apache/flink/util/TimeUtils.java | 2 + .../RestartBackoffTimeStrategyFactoryLoader.java | 12 +- ...estartBackoffTimeStrategyFactoryLoaderTest.java | 10 +- .../environment/StreamExecutionEnvironment.java | 5 +- .../flink/streaming/api/RestartStrategyTest.java | 4 +- 8 files changed, 200 insertions(+), 30 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java index 10363d77f65..9745e9ad1c1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java @@ -90,9 +90,23 @@ public class RestartStrategies { * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy * @param delayInterval Delay in-between restart attempts for the FixedDelayRestartStrategy * @return FixedDelayRestartStrategy + * @deprecated Use {@link #fixedDelayRestart(int, Duration)} */ + @Deprecated public static RestartStrategyConfiguration fixedDelayRestart( int restartAttempts, Time delayInterval) { + return fixedDelayRestart(restartAttempts, Time.toDuration(delayInterval)); + } + + /** + * Generates a FixedDelayRestartStrategyConfiguration. + * + * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy + * @param delayInterval Delay in-between restart attempts for the FixedDelayRestartStrategy + * @return FixedDelayRestartStrategy + */ + public static RestartStrategyConfiguration fixedDelayRestart( + int restartAttempts, Duration delayInterval) { return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayInterval); } @@ -103,9 +117,25 @@ public class RestartStrategies { * before failing a job * @param failureInterval Time interval for failures * @param delayInterval Delay in-between restart attempts + * @deprecated Use {@link #failureRateRestart(int, Duration, Duration)} */ + @Deprecated public static FailureRateRestartStrategyConfiguration failureRateRestart( int failureRate, Time failureInterval, Time delayInterval) { + return failureRateRestart( + failureRate, Time.toDuration(failureInterval), Time.toDuration(delayInterval)); + } + + /** + * Generates a FailureRateRestartStrategyConfiguration. + * + * @param failureRate Maximum number of restarts in given interval {@code failureInterval} + * before failing a job + * @param failureInterval Time interval for failures + * @param delayInterval Delay in-between restart attempts + */ + public static FailureRateRestartStrategyConfiguration failureRateRestart( + int failureRate, Duration failureInterval, Duration delayInterval) { return new FailureRateRestartStrategyConfiguration( failureRate, failureInterval, delayInterval); } @@ -118,13 +148,39 @@ public class RestartStrategies { * @param backoffMultiplier Delay multiplier how many times is the delay longer than before * @param resetBackoffThreshold How long the job must run smoothly to reset the time interval * @param jitterFactor How much the delay may differ (in percentage) + * @deprecated Use {@link #exponentialDelayRestart(Duration, Duration, double, Duration, + * double)} */ + @Deprecated public static ExponentialDelayRestartStrategyConfiguration exponentialDelayRestart( Time initialBackoff, Time maxBackoff, double backoffMultiplier, Time resetBackoffThreshold, double jitterFactor) { + return exponentialDelayRestart( + Time.toDuration(initialBackoff), + Time.toDuration(maxBackoff), + backoffMultiplier, + Time.toDuration(resetBackoffThreshold), + jitterFactor); + } + + /** + * Generates a ExponentialDelayRestartStrategyConfiguration. + * + * @param initialBackoff Starting duration between restarts + * @param maxBackoff The highest possible duration between restarts + * @param backoffMultiplier Delay multiplier how many times is the delay longer than before + * @param resetBackoffThreshold How long the job must run smoothly to reset the time interval + * @param jitterFactor How much the delay may differ (in percentage) + */ + public static ExponentialDelayRestartStrategyConfiguration exponentialDelayRestart( + Duration initialBackoff, + Duration maxBackoff, + double backoffMultiplier, + Duration resetBackoffThreshold, + double jitterFactor) { return new ExponentialDelayRestartStrategyConfiguration( initialBackoff, maxBackoff, backoffMultiplier, resetBackoffThreshold, jitterFactor); } @@ -177,10 +233,10 @@ public class RestartStrategies { private static final long serialVersionUID = 4149870149673363190L; private final int restartAttempts; - private final Time delayBetweenAttemptsInterval; + private final Duration delayBetweenAttemptsInterval; FixedDelayRestartStrategyConfiguration( - int restartAttempts, Time delayBetweenAttemptsInterval) { + int restartAttempts, Duration delayBetweenAttemptsInterval) { this.restartAttempts = restartAttempts; this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval; } @@ -189,7 +245,13 @@ public class RestartStrategies { return restartAttempts; } + /** @deprecated Use {@link #getDurationBetweenAttempts()} */ + @Deprecated public Time getDelayBetweenAttemptsInterval() { + return Time.fromDuration(getDurationBetweenAttempts()); + } + + public Duration getDurationBetweenAttempts() { return delayBetweenAttemptsInterval; } @@ -230,18 +292,38 @@ public class RestartStrategies { extends RestartStrategyConfiguration { private static final long serialVersionUID = 1467941615941965194L; - private final Time initialBackoff; - private final Time maxBackoff; + private final Duration initialBackoff; + private final Duration maxBackoff; private final double backoffMultiplier; - private final Time resetBackoffThreshold; + private final Duration resetBackoffThreshold; private final double jitterFactor; + /** + * @deprecated Use {@link + * ExponentialDelayRestartStrategyConfiguration#ExponentialDelayRestartStrategyConfiguration(Duration, + * Duration, double, Duration, double)} + */ + @Deprecated public ExponentialDelayRestartStrategyConfiguration( Time initialBackoff, Time maxBackoff, double backoffMultiplier, Time resetBackoffThreshold, double jitterFactor) { + this( + Time.toDuration(initialBackoff), + Time.toDuration(maxBackoff), + backoffMultiplier, + Time.toDuration(resetBackoffThreshold), + jitterFactor); + } + + public ExponentialDelayRestartStrategyConfiguration( + Duration initialBackoff, + Duration maxBackoff, + double backoffMultiplier, + Duration resetBackoffThreshold, + double jitterFactor) { this.initialBackoff = initialBackoff; this.maxBackoff = maxBackoff; this.backoffMultiplier = backoffMultiplier; @@ -249,11 +331,23 @@ public class RestartStrategies { this.jitterFactor = jitterFactor; } + /** @deprecated Use {@link #getInitialBackoffDuration()} */ + @Deprecated public Time getInitialBackoff() { + return Time.fromDuration(getInitialBackoffDuration()); + } + + public Duration getInitialBackoffDuration() { return initialBackoff; } + /** @deprecated Use {@link #getMaxBackoffDuration()} */ + @Deprecated public Time getMaxBackoff() { + return Time.fromDuration(maxBackoff); + } + + public Duration getMaxBackoffDuration() { return maxBackoff; } @@ -261,7 +355,13 @@ public class RestartStrategies { return backoffMultiplier; } + /** @deprecated Use {@link #getResetBackoffDurationThreshold()} */ + @Deprecated public Time getResetBackoffThreshold() { + return Time.fromDuration(resetBackoffThreshold); + } + + public Duration getResetBackoffDurationThreshold() { return resetBackoffThreshold; } @@ -315,11 +415,25 @@ public class RestartStrategies { private static final long serialVersionUID = 1195028697539661739L; private final int maxFailureRate; - private final Time failureInterval; - private final Time delayBetweenAttemptsInterval; + private final Duration failureInterval; + private final Duration delayBetweenAttemptsInterval; + /** + * @deprecated Use {@link #FailureRateRestartStrategyConfiguration(int, Duration, Duration)} + */ + @Deprecated public FailureRateRestartStrategyConfiguration( int maxFailureRate, Time failureInterval, Time delayBetweenAttemptsInterval) { + this( + maxFailureRate, + Time.toDuration(failureInterval), + Time.toDuration(delayBetweenAttemptsInterval)); + } + + public FailureRateRestartStrategyConfiguration( + int maxFailureRate, + Duration failureInterval, + Duration delayBetweenAttemptsInterval) { this.maxFailureRate = maxFailureRate; this.failureInterval = failureInterval; this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval; @@ -329,11 +443,23 @@ public class RestartStrategies { return maxFailureRate; } + /** @deprecated Use {@link #getFailureIntervalDuration()} */ + @Deprecated public Time getFailureInterval() { + return Time.fromDuration(getFailureIntervalDuration()); + } + + public Duration getFailureIntervalDuration() { return failureInterval; } + /** @deprecated Use {@link #getDurationBetweenAttempts()} */ + @Deprecated public Time getDelayBetweenAttemptsInterval() { + return Time.fromDuration(getDurationBetweenAttempts()); + } + + public Duration getDurationBetweenAttempts() { return delayBetweenAttemptsInterval; } @@ -448,10 +574,10 @@ public class RestartStrategies { RestartStrategyOptions .RESTART_STRATEGY_EXPONENTIAL_DELAY_JITTER_FACTOR); return exponentialDelayRestart( - Time.milliseconds(initialBackoff.toMillis()), - Time.milliseconds(maxBackoff.toMillis()), + initialBackoff, + maxBackoff, backoffMultiplier, - Time.milliseconds(resetBackoffThreshold.toMillis()), + resetBackoffThreshold, jitter); case "failurerate": case "failure-rate": diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java index 9e443c42453..b062dd29f3e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java @@ -27,6 +27,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.Serializable; +import java.time.Duration; import java.util.EnumMap; import static org.apache.flink.api.common.state.StateTtlConfig.CleanupStrategies.EMPTY_STRATEGY; @@ -53,7 +54,7 @@ public class StateTtlConfig implements Serializable { private static final long serialVersionUID = -7592693245044289793L; public static final StateTtlConfig DISABLED = - newBuilder(Time.milliseconds(Long.MAX_VALUE)) + newBuilder(Duration.ofMillis(Long.MAX_VALUE)) .setUpdateType(UpdateType.Disabled) .build(); @@ -92,21 +93,21 @@ public class StateTtlConfig implements Serializable { private final UpdateType updateType; private final StateVisibility stateVisibility; private final TtlTimeCharacteristic ttlTimeCharacteristic; - private final Time ttl; + private final Duration ttl; private final CleanupStrategies cleanupStrategies; private StateTtlConfig( UpdateType updateType, StateVisibility stateVisibility, TtlTimeCharacteristic ttlTimeCharacteristic, - Time ttl, + Duration ttl, CleanupStrategies cleanupStrategies) { this.updateType = checkNotNull(updateType); this.stateVisibility = checkNotNull(stateVisibility); this.ttlTimeCharacteristic = checkNotNull(ttlTimeCharacteristic); this.ttl = checkNotNull(ttl); this.cleanupStrategies = cleanupStrategies; - checkArgument(ttl.toMilliseconds() > 0, "TTL is expected to be positive."); + checkArgument(ttl.toMillis() > 0, "TTL is expected to be positive."); } @Nonnull @@ -119,8 +120,14 @@ public class StateTtlConfig implements Serializable { return stateVisibility; } + /** @deprecated Use {@link #getTimeToLive()} */ + @Deprecated @Nonnull public Time getTtl() { + return Time.fromDuration(getTimeToLive()); + } + + public Duration getTimeToLive() { return ttl; } @@ -152,23 +159,35 @@ public class StateTtlConfig implements Serializable { + '}'; } + /** @deprecated Use {@link #newBuilder(Duration)} */ + @Deprecated @Nonnull public static Builder newBuilder(@Nonnull Time ttl) { return new Builder(ttl); } + public static Builder newBuilder(Duration ttl) { + return new Builder(ttl); + } + /** Builder for the {@link StateTtlConfig}. */ public static class Builder { private UpdateType updateType = OnCreateAndWrite; private StateVisibility stateVisibility = NeverReturnExpired; private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime; - private Time ttl; + private Duration ttl; private boolean isCleanupInBackground = true; private final EnumMap<CleanupStrategies.Strategies, CleanupStrategies.CleanupStrategy> strategies = new EnumMap<>(CleanupStrategies.Strategies.class); + /** @deprecated Use {@link #newBuilder(Duration)} */ + @Deprecated public Builder(@Nonnull Time ttl) { + this(Time.toDuration(ttl)); + } + + private Builder(Duration ttl) { this.ttl = ttl; } @@ -343,10 +362,16 @@ public class StateTtlConfig implements Serializable { * Sets the ttl time. * * @param ttl The ttl time. + * @deprecated Use {@link #setTimeToLive(Duration)} */ + @Deprecated @Nonnull public Builder setTtl(@Nonnull Time ttl) { - this.ttl = ttl; + return setTimeToLive(Time.toDuration(ttl)); + } + + public Builder setTimeToLive(Duration ttl) { + this.ttl = Preconditions.checkNotNull(ttl); return this; } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java b/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java index 803478aec91..2018ef4ecda 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java @@ -20,6 +20,8 @@ package org.apache.flink.api.common.time; import org.apache.flink.annotation.PublicEvolving; +import javax.annotation.Nullable; + import java.io.Serializable; import java.time.Duration; import java.util.Objects; @@ -32,12 +34,20 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * * <p>Note: This class will fully replace org.apache.flink.streaming.api.windowing.time.Time in * Flink 2.0 + * + * @deprecated Use {@link Duration} */ +@Deprecated @PublicEvolving public final class Time implements Serializable { private static final long serialVersionUID = -350254188460915999L; + @Nullable + public static Duration toDuration(@Nullable Time time) { + return time != null ? time.toDuration() : null; + } + /** The time unit for this policy's time interval. */ private final TimeUnit unit; @@ -72,6 +82,10 @@ public final class Time implements Serializable { return size; } + public Duration toDuration() { + return Duration.ofMillis(this.toMilliseconds()); + } + /** * Converts the time interval to milliseconds. * diff --git a/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java index f4b31cff35f..80d38c78078 100644 --- a/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java @@ -235,7 +235,9 @@ public class TimeUtils { * * @param time time to transform into duration * @return duration equal to the given time + * @deprecated Use {@link Duration} APIs */ + @Deprecated public static Duration toDuration(Time time) { return Duration.of(time.getSize(), toChronoUnit(time.getUnit())); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java index 0d63962e6a9..6d38e472919 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java @@ -91,7 +91,7 @@ public final class RestartBackoffTimeStrategyFactoryLoader { new FixedDelayRestartBackoffTimeStrategy .FixedDelayRestartBackoffTimeStrategyFactory( fixedDelayConfig.getRestartAttempts(), - fixedDelayConfig.getDelayBetweenAttemptsInterval().toMilliseconds())); + fixedDelayConfig.getDurationBetweenAttempts().toMillis())); } else if (restartStrategyConfiguration instanceof FailureRateRestartStrategyConfiguration) { final FailureRateRestartStrategyConfiguration failureRateConfig = @@ -101,8 +101,8 @@ public final class RestartBackoffTimeStrategyFactoryLoader { new FailureRateRestartBackoffTimeStrategy .FailureRateRestartBackoffTimeStrategyFactory( failureRateConfig.getMaxFailureRate(), - failureRateConfig.getFailureInterval().toMilliseconds(), - failureRateConfig.getDelayBetweenAttemptsInterval().toMilliseconds())); + failureRateConfig.getFailureIntervalDuration().toMillis(), + failureRateConfig.getDurationBetweenAttempts().toMillis())); } else if (restartStrategyConfiguration instanceof FallbackRestartStrategyConfiguration) { return Optional.empty(); } else if (restartStrategyConfiguration @@ -112,10 +112,10 @@ public final class RestartBackoffTimeStrategyFactoryLoader { return Optional.of( new ExponentialDelayRestartBackoffTimeStrategy .ExponentialDelayRestartBackoffTimeStrategyFactory( - exponentialDelayConfig.getInitialBackoff().toMilliseconds(), - exponentialDelayConfig.getMaxBackoff().toMilliseconds(), + exponentialDelayConfig.getInitialBackoffDuration().toMillis(), + exponentialDelayConfig.getMaxBackoffDuration().toMillis(), exponentialDelayConfig.getBackoffMultiplier(), - exponentialDelayConfig.getResetBackoffThreshold().toMilliseconds(), + exponentialDelayConfig.getResetBackoffDurationThreshold().toMillis(), exponentialDelayConfig.getJitterFactor(), RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_ATTEMPTS .defaultValue())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoaderTest.java index de503d35908..971cf54be18 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoaderTest.java @@ -25,6 +25,8 @@ import org.apache.flink.configuration.RestartStrategyOptions; import org.junit.jupiter.api.Test; +import java.time.Duration; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -72,10 +74,10 @@ class RestartBackoffTimeStrategyFactoryLoaderTest { final RestartBackoffTimeStrategy.Factory factory = RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory( RestartStrategies.exponentialDelayRestart( - Time.milliseconds(1), - Time.milliseconds(1000), + Duration.ofMillis(1), + Duration.ofMillis(1000), 1.1, - Time.milliseconds(2000), + Duration.ofMillis(2000), 0), conf, false); @@ -94,7 +96,7 @@ class RestartBackoffTimeStrategyFactoryLoaderTest { final RestartBackoffTimeStrategy.Factory factory = RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory( RestartStrategies.failureRateRestart( - 1, Time.milliseconds(1000), Time.milliseconds(1000)), + 1, Duration.ofMillis(1000), Duration.ofMillis(1000)), conf, false); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 711dd59d423..2f92ab013d0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -35,7 +35,6 @@ import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.common.operators.util.SlotSharingGroupUtils; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; @@ -125,6 +124,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; import java.net.URI; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -860,7 +860,8 @@ public class StreamExecutionEnvironment implements AutoCloseable { * @param numberOfExecutionRetries The number of times the system will try to re-execute failed * tasks. * @deprecated This method will be replaced by {@link #setRestartStrategy}. The {@link - * RestartStrategies#fixedDelayRestart(int, Time)} contains the number of execution retries. + * RestartStrategies#fixedDelayRestart(int, Duration)} contains the number of execution + * retries. */ @Deprecated @PublicEvolving diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java index 16d86f71735..e27065ed04b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java @@ -113,7 +113,7 @@ public class RestartStrategyTest extends TestLogger { Assert.assertEquals( 1337, ((RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategy) - .getDelayBetweenAttemptsInterval() - .toMilliseconds()); + .getDurationBetweenAttempts() + .toMillis()); } }