This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c943ab49c4aa58286111e2b95e9580a16f4d6b4c
Author: Matthias Pohl <matthias.p...@aiven.io>
AuthorDate: Thu Jan 4 16:20:26 2024 +0100

    [FLINK-32570][core] Introduces Duration replacements for 
org.apache.flink.api.common.time.Time-related APIs and deprecates the 
corresponding methods/classes/constructors
---
 .../common/restartstrategy/RestartStrategies.java  | 146 +++++++++++++++++++--
 .../flink/api/common/state/StateTtlConfig.java     |  37 +++++-
 .../org/apache/flink/api/common/time/Time.java     |  14 ++
 .../main/java/org/apache/flink/util/TimeUtils.java |   2 +
 .../RestartBackoffTimeStrategyFactoryLoader.java   |  12 +-
 ...estartBackoffTimeStrategyFactoryLoaderTest.java |  10 +-
 .../environment/StreamExecutionEnvironment.java    |   5 +-
 .../flink/streaming/api/RestartStrategyTest.java   |   4 +-
 8 files changed, 200 insertions(+), 30 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
index 10363d77f65..9745e9ad1c1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
@@ -90,9 +90,23 @@ public class RestartStrategies {
      * @param restartAttempts Number of restart attempts for the 
FixedDelayRestartStrategy
      * @param delayInterval Delay in-between restart attempts for the 
FixedDelayRestartStrategy
      * @return FixedDelayRestartStrategy
+     * @deprecated Use {@link #fixedDelayRestart(int, Duration)}
      */
+    @Deprecated
     public static RestartStrategyConfiguration fixedDelayRestart(
             int restartAttempts, Time delayInterval) {
+        return fixedDelayRestart(restartAttempts, 
Time.toDuration(delayInterval));
+    }
+
+    /**
+     * Generates a FixedDelayRestartStrategyConfiguration.
+     *
+     * @param restartAttempts Number of restart attempts for the 
FixedDelayRestartStrategy
+     * @param delayInterval Delay in-between restart attempts for the 
FixedDelayRestartStrategy
+     * @return FixedDelayRestartStrategy
+     */
+    public static RestartStrategyConfiguration fixedDelayRestart(
+            int restartAttempts, Duration delayInterval) {
         return new FixedDelayRestartStrategyConfiguration(restartAttempts, 
delayInterval);
     }
 
@@ -103,9 +117,25 @@ public class RestartStrategies {
      *     before failing a job
      * @param failureInterval Time interval for failures
      * @param delayInterval Delay in-between restart attempts
+     * @deprecated Use {@link #failureRateRestart(int, Duration, Duration)}
      */
+    @Deprecated
     public static FailureRateRestartStrategyConfiguration failureRateRestart(
             int failureRate, Time failureInterval, Time delayInterval) {
+        return failureRateRestart(
+                failureRate, Time.toDuration(failureInterval), 
Time.toDuration(delayInterval));
+    }
+
+    /**
+     * Generates a FailureRateRestartStrategyConfiguration.
+     *
+     * @param failureRate Maximum number of restarts in given interval {@code 
failureInterval}
+     *     before failing a job
+     * @param failureInterval Time interval for failures
+     * @param delayInterval Delay in-between restart attempts
+     */
+    public static FailureRateRestartStrategyConfiguration failureRateRestart(
+            int failureRate, Duration failureInterval, Duration delayInterval) 
{
         return new FailureRateRestartStrategyConfiguration(
                 failureRate, failureInterval, delayInterval);
     }
@@ -118,13 +148,39 @@ public class RestartStrategies {
      * @param backoffMultiplier Delay multiplier how many times is the delay 
longer than before
      * @param resetBackoffThreshold How long the job must run smoothly to 
reset the time interval
      * @param jitterFactor How much the delay may differ (in percentage)
+     * @deprecated Use {@link #exponentialDelayRestart(Duration, Duration, 
double, Duration,
+     *     double)}
      */
+    @Deprecated
     public static ExponentialDelayRestartStrategyConfiguration 
exponentialDelayRestart(
             Time initialBackoff,
             Time maxBackoff,
             double backoffMultiplier,
             Time resetBackoffThreshold,
             double jitterFactor) {
+        return exponentialDelayRestart(
+                Time.toDuration(initialBackoff),
+                Time.toDuration(maxBackoff),
+                backoffMultiplier,
+                Time.toDuration(resetBackoffThreshold),
+                jitterFactor);
+    }
+
+    /**
+     * Generates a ExponentialDelayRestartStrategyConfiguration.
+     *
+     * @param initialBackoff Starting duration between restarts
+     * @param maxBackoff The highest possible duration between restarts
+     * @param backoffMultiplier Delay multiplier how many times is the delay 
longer than before
+     * @param resetBackoffThreshold How long the job must run smoothly to 
reset the time interval
+     * @param jitterFactor How much the delay may differ (in percentage)
+     */
+    public static ExponentialDelayRestartStrategyConfiguration 
exponentialDelayRestart(
+            Duration initialBackoff,
+            Duration maxBackoff,
+            double backoffMultiplier,
+            Duration resetBackoffThreshold,
+            double jitterFactor) {
         return new ExponentialDelayRestartStrategyConfiguration(
                 initialBackoff, maxBackoff, backoffMultiplier, 
resetBackoffThreshold, jitterFactor);
     }
@@ -177,10 +233,10 @@ public class RestartStrategies {
         private static final long serialVersionUID = 4149870149673363190L;
 
         private final int restartAttempts;
-        private final Time delayBetweenAttemptsInterval;
+        private final Duration delayBetweenAttemptsInterval;
 
         FixedDelayRestartStrategyConfiguration(
-                int restartAttempts, Time delayBetweenAttemptsInterval) {
+                int restartAttempts, Duration delayBetweenAttemptsInterval) {
             this.restartAttempts = restartAttempts;
             this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
         }
@@ -189,7 +245,13 @@ public class RestartStrategies {
             return restartAttempts;
         }
 
+        /** @deprecated Use {@link #getDurationBetweenAttempts()} */
+        @Deprecated
         public Time getDelayBetweenAttemptsInterval() {
+            return Time.fromDuration(getDurationBetweenAttempts());
+        }
+
+        public Duration getDurationBetweenAttempts() {
             return delayBetweenAttemptsInterval;
         }
 
@@ -230,18 +292,38 @@ public class RestartStrategies {
             extends RestartStrategyConfiguration {
         private static final long serialVersionUID = 1467941615941965194L;
 
-        private final Time initialBackoff;
-        private final Time maxBackoff;
+        private final Duration initialBackoff;
+        private final Duration maxBackoff;
         private final double backoffMultiplier;
-        private final Time resetBackoffThreshold;
+        private final Duration resetBackoffThreshold;
         private final double jitterFactor;
 
+        /**
+         * @deprecated Use {@link
+         *     
ExponentialDelayRestartStrategyConfiguration#ExponentialDelayRestartStrategyConfiguration(Duration,
+         *     Duration, double, Duration, double)}
+         */
+        @Deprecated
         public ExponentialDelayRestartStrategyConfiguration(
                 Time initialBackoff,
                 Time maxBackoff,
                 double backoffMultiplier,
                 Time resetBackoffThreshold,
                 double jitterFactor) {
+            this(
+                    Time.toDuration(initialBackoff),
+                    Time.toDuration(maxBackoff),
+                    backoffMultiplier,
+                    Time.toDuration(resetBackoffThreshold),
+                    jitterFactor);
+        }
+
+        public ExponentialDelayRestartStrategyConfiguration(
+                Duration initialBackoff,
+                Duration maxBackoff,
+                double backoffMultiplier,
+                Duration resetBackoffThreshold,
+                double jitterFactor) {
             this.initialBackoff = initialBackoff;
             this.maxBackoff = maxBackoff;
             this.backoffMultiplier = backoffMultiplier;
@@ -249,11 +331,23 @@ public class RestartStrategies {
             this.jitterFactor = jitterFactor;
         }
 
+        /** @deprecated Use {@link #getInitialBackoffDuration()} */
+        @Deprecated
         public Time getInitialBackoff() {
+            return Time.fromDuration(getInitialBackoffDuration());
+        }
+
+        public Duration getInitialBackoffDuration() {
             return initialBackoff;
         }
 
+        /** @deprecated Use {@link #getMaxBackoffDuration()} */
+        @Deprecated
         public Time getMaxBackoff() {
+            return Time.fromDuration(maxBackoff);
+        }
+
+        public Duration getMaxBackoffDuration() {
             return maxBackoff;
         }
 
@@ -261,7 +355,13 @@ public class RestartStrategies {
             return backoffMultiplier;
         }
 
+        /** @deprecated Use {@link #getResetBackoffDurationThreshold()} */
+        @Deprecated
         public Time getResetBackoffThreshold() {
+            return Time.fromDuration(resetBackoffThreshold);
+        }
+
+        public Duration getResetBackoffDurationThreshold() {
             return resetBackoffThreshold;
         }
 
@@ -315,11 +415,25 @@ public class RestartStrategies {
         private static final long serialVersionUID = 1195028697539661739L;
         private final int maxFailureRate;
 
-        private final Time failureInterval;
-        private final Time delayBetweenAttemptsInterval;
+        private final Duration failureInterval;
+        private final Duration delayBetweenAttemptsInterval;
 
+        /**
+         * @deprecated Use {@link 
#FailureRateRestartStrategyConfiguration(int, Duration, Duration)}
+         */
+        @Deprecated
         public FailureRateRestartStrategyConfiguration(
                 int maxFailureRate, Time failureInterval, Time 
delayBetweenAttemptsInterval) {
+            this(
+                    maxFailureRate,
+                    Time.toDuration(failureInterval),
+                    Time.toDuration(delayBetweenAttemptsInterval));
+        }
+
+        public FailureRateRestartStrategyConfiguration(
+                int maxFailureRate,
+                Duration failureInterval,
+                Duration delayBetweenAttemptsInterval) {
             this.maxFailureRate = maxFailureRate;
             this.failureInterval = failureInterval;
             this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
@@ -329,11 +443,23 @@ public class RestartStrategies {
             return maxFailureRate;
         }
 
+        /** @deprecated Use {@link #getFailureIntervalDuration()} */
+        @Deprecated
         public Time getFailureInterval() {
+            return Time.fromDuration(getFailureIntervalDuration());
+        }
+
+        public Duration getFailureIntervalDuration() {
             return failureInterval;
         }
 
+        /** @deprecated Use {@link #getDurationBetweenAttempts()} */
+        @Deprecated
         public Time getDelayBetweenAttemptsInterval() {
+            return Time.fromDuration(getDurationBetweenAttempts());
+        }
+
+        public Duration getDurationBetweenAttempts() {
             return delayBetweenAttemptsInterval;
         }
 
@@ -448,10 +574,10 @@ public class RestartStrategies {
                                 RestartStrategyOptions
                                         
.RESTART_STRATEGY_EXPONENTIAL_DELAY_JITTER_FACTOR);
                 return exponentialDelayRestart(
-                        Time.milliseconds(initialBackoff.toMillis()),
-                        Time.milliseconds(maxBackoff.toMillis()),
+                        initialBackoff,
+                        maxBackoff,
                         backoffMultiplier,
-                        Time.milliseconds(resetBackoffThreshold.toMillis()),
+                        resetBackoffThreshold,
                         jitter);
             case "failurerate":
             case "failure-rate":
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
index 9e443c42453..b062dd29f3e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
@@ -27,6 +27,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.EnumMap;
 
 import static 
org.apache.flink.api.common.state.StateTtlConfig.CleanupStrategies.EMPTY_STRATEGY;
@@ -53,7 +54,7 @@ public class StateTtlConfig implements Serializable {
     private static final long serialVersionUID = -7592693245044289793L;
 
     public static final StateTtlConfig DISABLED =
-            newBuilder(Time.milliseconds(Long.MAX_VALUE))
+            newBuilder(Duration.ofMillis(Long.MAX_VALUE))
                     .setUpdateType(UpdateType.Disabled)
                     .build();
 
@@ -92,21 +93,21 @@ public class StateTtlConfig implements Serializable {
     private final UpdateType updateType;
     private final StateVisibility stateVisibility;
     private final TtlTimeCharacteristic ttlTimeCharacteristic;
-    private final Time ttl;
+    private final Duration ttl;
     private final CleanupStrategies cleanupStrategies;
 
     private StateTtlConfig(
             UpdateType updateType,
             StateVisibility stateVisibility,
             TtlTimeCharacteristic ttlTimeCharacteristic,
-            Time ttl,
+            Duration ttl,
             CleanupStrategies cleanupStrategies) {
         this.updateType = checkNotNull(updateType);
         this.stateVisibility = checkNotNull(stateVisibility);
         this.ttlTimeCharacteristic = checkNotNull(ttlTimeCharacteristic);
         this.ttl = checkNotNull(ttl);
         this.cleanupStrategies = cleanupStrategies;
-        checkArgument(ttl.toMilliseconds() > 0, "TTL is expected to be 
positive.");
+        checkArgument(ttl.toMillis() > 0, "TTL is expected to be positive.");
     }
 
     @Nonnull
@@ -119,8 +120,14 @@ public class StateTtlConfig implements Serializable {
         return stateVisibility;
     }
 
+    /** @deprecated Use {@link #getTimeToLive()} */
+    @Deprecated
     @Nonnull
     public Time getTtl() {
+        return Time.fromDuration(getTimeToLive());
+    }
+
+    public Duration getTimeToLive() {
         return ttl;
     }
 
@@ -152,23 +159,35 @@ public class StateTtlConfig implements Serializable {
                 + '}';
     }
 
+    /** @deprecated Use {@link #newBuilder(Duration)} */
+    @Deprecated
     @Nonnull
     public static Builder newBuilder(@Nonnull Time ttl) {
         return new Builder(ttl);
     }
 
+    public static Builder newBuilder(Duration ttl) {
+        return new Builder(ttl);
+    }
+
     /** Builder for the {@link StateTtlConfig}. */
     public static class Builder {
 
         private UpdateType updateType = OnCreateAndWrite;
         private StateVisibility stateVisibility = NeverReturnExpired;
         private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;
-        private Time ttl;
+        private Duration ttl;
         private boolean isCleanupInBackground = true;
         private final EnumMap<CleanupStrategies.Strategies, 
CleanupStrategies.CleanupStrategy>
                 strategies = new EnumMap<>(CleanupStrategies.Strategies.class);
 
+        /** @deprecated Use {@link #newBuilder(Duration)} */
+        @Deprecated
         public Builder(@Nonnull Time ttl) {
+            this(Time.toDuration(ttl));
+        }
+
+        private Builder(Duration ttl) {
             this.ttl = ttl;
         }
 
@@ -343,10 +362,16 @@ public class StateTtlConfig implements Serializable {
          * Sets the ttl time.
          *
          * @param ttl The ttl time.
+         * @deprecated Use {@link #setTimeToLive(Duration)}
          */
+        @Deprecated
         @Nonnull
         public Builder setTtl(@Nonnull Time ttl) {
-            this.ttl = ttl;
+            return setTimeToLive(Time.toDuration(ttl));
+        }
+
+        public Builder setTimeToLive(Duration ttl) {
+            this.ttl = Preconditions.checkNotNull(ttl);
             return this;
         }
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java 
b/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java
index 803478aec91..2018ef4ecda 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java
@@ -20,6 +20,8 @@ package org.apache.flink.api.common.time;
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.Objects;
@@ -32,12 +34,20 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>Note: This class will fully replace 
org.apache.flink.streaming.api.windowing.time.Time in
  * Flink 2.0
+ *
+ * @deprecated Use {@link Duration}
  */
+@Deprecated
 @PublicEvolving
 public final class Time implements Serializable {
 
     private static final long serialVersionUID = -350254188460915999L;
 
+    @Nullable
+    public static Duration toDuration(@Nullable Time time) {
+        return time != null ? time.toDuration() : null;
+    }
+
     /** The time unit for this policy's time interval. */
     private final TimeUnit unit;
 
@@ -72,6 +82,10 @@ public final class Time implements Serializable {
         return size;
     }
 
+    public Duration toDuration() {
+        return Duration.ofMillis(this.toMilliseconds());
+    }
+
     /**
      * Converts the time interval to milliseconds.
      *
diff --git a/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
index f4b31cff35f..80d38c78078 100644
--- a/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
@@ -235,7 +235,9 @@ public class TimeUtils {
      *
      * @param time time to transform into duration
      * @return duration equal to the given time
+     * @deprecated Use {@link Duration} APIs
      */
+    @Deprecated
     public static Duration toDuration(Time time) {
         return Duration.of(time.getSize(), toChronoUnit(time.getUnit()));
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java
index 0d63962e6a9..6d38e472919 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java
@@ -91,7 +91,7 @@ public final class RestartBackoffTimeStrategyFactoryLoader {
                     new FixedDelayRestartBackoffTimeStrategy
                             .FixedDelayRestartBackoffTimeStrategyFactory(
                             fixedDelayConfig.getRestartAttempts(),
-                            
fixedDelayConfig.getDelayBetweenAttemptsInterval().toMilliseconds()));
+                            
fixedDelayConfig.getDurationBetweenAttempts().toMillis()));
         } else if (restartStrategyConfiguration
                 instanceof FailureRateRestartStrategyConfiguration) {
             final FailureRateRestartStrategyConfiguration failureRateConfig =
@@ -101,8 +101,8 @@ public final class RestartBackoffTimeStrategyFactoryLoader {
                     new FailureRateRestartBackoffTimeStrategy
                             .FailureRateRestartBackoffTimeStrategyFactory(
                             failureRateConfig.getMaxFailureRate(),
-                            
failureRateConfig.getFailureInterval().toMilliseconds(),
-                            
failureRateConfig.getDelayBetweenAttemptsInterval().toMilliseconds()));
+                            
failureRateConfig.getFailureIntervalDuration().toMillis(),
+                            
failureRateConfig.getDurationBetweenAttempts().toMillis()));
         } else if (restartStrategyConfiguration instanceof 
FallbackRestartStrategyConfiguration) {
             return Optional.empty();
         } else if (restartStrategyConfiguration
@@ -112,10 +112,10 @@ public final class 
RestartBackoffTimeStrategyFactoryLoader {
             return Optional.of(
                     new ExponentialDelayRestartBackoffTimeStrategy
                             .ExponentialDelayRestartBackoffTimeStrategyFactory(
-                            
exponentialDelayConfig.getInitialBackoff().toMilliseconds(),
-                            
exponentialDelayConfig.getMaxBackoff().toMilliseconds(),
+                            
exponentialDelayConfig.getInitialBackoffDuration().toMillis(),
+                            
exponentialDelayConfig.getMaxBackoffDuration().toMillis(),
                             exponentialDelayConfig.getBackoffMultiplier(),
-                            
exponentialDelayConfig.getResetBackoffThreshold().toMilliseconds(),
+                            
exponentialDelayConfig.getResetBackoffDurationThreshold().toMillis(),
                             exponentialDelayConfig.getJitterFactor(),
                             
RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_ATTEMPTS
                                     .defaultValue()));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoaderTest.java
index de503d35908..971cf54be18 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoaderTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.RestartStrategyOptions;
 
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -72,10 +74,10 @@ class RestartBackoffTimeStrategyFactoryLoaderTest {
         final RestartBackoffTimeStrategy.Factory factory =
                 
RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
                         RestartStrategies.exponentialDelayRestart(
-                                Time.milliseconds(1),
-                                Time.milliseconds(1000),
+                                Duration.ofMillis(1),
+                                Duration.ofMillis(1000),
                                 1.1,
-                                Time.milliseconds(2000),
+                                Duration.ofMillis(2000),
                                 0),
                         conf,
                         false);
@@ -94,7 +96,7 @@ class RestartBackoffTimeStrategyFactoryLoaderTest {
         final RestartBackoffTimeStrategy.Factory factory =
                 
RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
                         RestartStrategies.failureRateRestart(
-                                1, Time.milliseconds(1000), 
Time.milliseconds(1000)),
+                                1, Duration.ofMillis(1000), 
Duration.ofMillis(1000)),
                         conf,
                         false);
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 711dd59d423..2f92ab013d0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -35,7 +35,6 @@ import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.common.operators.util.SlotSharingGroupUtils;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
@@ -125,6 +124,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -860,7 +860,8 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
      * @param numberOfExecutionRetries The number of times the system will try 
to re-execute failed
      *     tasks.
      * @deprecated This method will be replaced by {@link 
#setRestartStrategy}. The {@link
-     *     RestartStrategies#fixedDelayRestart(int, Time)} contains the number 
of execution retries.
+     *     RestartStrategies#fixedDelayRestart(int, Duration)} contains the 
number of execution
+     *     retries.
      */
     @Deprecated
     @PublicEvolving
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
index 16d86f71735..e27065ed04b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
@@ -113,7 +113,7 @@ public class RestartStrategyTest extends TestLogger {
         Assert.assertEquals(
                 1337,
                 ((RestartStrategies.FixedDelayRestartStrategyConfiguration) 
restartStrategy)
-                        .getDelayBetweenAttemptsInterval()
-                        .toMilliseconds());
+                        .getDurationBetweenAttempts()
+                        .toMillis());
     }
 }

Reply via email to