This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fecdc59  [FLINK-22997] Replace AkkaUtils#get*Timeout*
fecdc59 is described below

commit fecdc59b8603bc70ed1af33b206c60c56f5751fb
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Wed Jun 23 22:13:10 2021 +0200

    [FLINK-22997] Replace AkkaUtils#get*Timeout*
---
 .../shortcodes/generated/akka_configuration.html   |  8 ++---
 .../apache/flink/client/program/ClientTest.java    |  3 +-
 .../org/apache/flink/api/common/time/Time.java     |  8 +++++
 .../apache/flink/configuration/AkkaOptions.java    | 34 ++++++++++++++++++----
 .../DefaultSlotPoolServiceSchedulerFactory.java    |  5 ++--
 .../runtime/jobmaster/JobMasterConfiguration.java  |  5 ++--
 .../minicluster/MiniClusterConfiguration.java      |  4 +--
 .../StandaloneResourceManagerFactory.java          |  4 +--
 .../active/ActiveResourceManager.java              |  6 ++--
 .../slotmanager/SlotManagerConfiguration.java      | 14 ++-------
 .../rpc/akka/AkkaRpcServiceConfiguration.java      |  3 +-
 .../taskexecutor/TaskManagerConfiguration.java     | 12 ++------
 .../runtime/taskexecutor/TaskManagerRunner.java    |  6 ++--
 .../TaskManagerServicesConfiguration.java          |  5 ++--
 .../runtime/webmonitor/WebMonitorEndpoint.java     | 10 ++-----
 .../org/apache/flink/runtime/akka/AkkaUtils.scala  | 27 +----------------
 .../executiongraph/ExecutionGraphTestUtils.java    |  1 -
 .../TestingDefaultExecutionGraphBuilder.java       | 10 ++-----
 .../PartialConsumePipelinedResultTest.java         |  2 +-
 .../SlotCountExceedingParallelismTest.java         |  2 +-
 .../jobmaster/slotpool/TestingSlotPoolImpl.java    |  6 ++--
 .../TestingResourceManagerFactory.java             |  4 +--
 .../flink/runtime/rpc/RpcConnectionTest.java       |  6 ++--
 .../apache/flink/runtime/rpc/RpcSSLAuthITCase.java |  3 +-
 .../TaskManagerRunnerConfigurationTest.java        |  3 +-
 .../MiniClusterResourceConfiguration.java          |  5 ++--
 .../flink/runtime/testutils/TestingUtils.java      |  2 +-
 .../runtime/testutils/ZooKeeperTestUtils.java      |  4 ++-
 .../apache/flink/runtime/akka/AkkaUtilsTest.scala  |  4 ++-
 .../util/MiniClusterResourceConfiguration.java     |  5 ++--
 .../test/accumulators/AccumulatorLiveITCase.java   |  2 +-
 .../flink/test/cancelling/CancelingTestBase.java   |  5 ++--
 .../EventTimeAllWindowCheckpointingITCase.java     |  6 ++--
 .../checkpointing/UnalignedCheckpointTestBase.java |  2 +-
 .../jar/StreamingCustomInputSplitProgram.java      |  3 +-
 ...tractTaskManagerProcessFailureRecoveryTest.java |  2 +-
 .../recovery/ProcessFailureCancelingITCase.java    |  2 +-
 .../test/runtime/ShuffleCompressionITCase.java     |  5 ++--
 .../apache/flink/yarn/YARNApplicationITCase.java   |  2 +-
 .../flink/yarn/YARNFileReplicationITCase.java      |  2 +-
 .../java/org/apache/flink/yarn/YARNITCase.java     |  2 +-
 .../apache/flink/yarn/FlinkYarnSessionCliTest.java |  5 ++--
 42 files changed, 121 insertions(+), 128 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/akka_configuration.html b/docs/layouts/shortcodes/generated/akka_configuration.html
index 8b9cd9b..fc64414 100644
--- a/docs/layouts/shortcodes/generated/akka_configuration.html
+++ b/docs/layouts/shortcodes/generated/akka_configuration.html
@@ -16,8 +16,8 @@
         </tr>
         <tr>
             <td><h5>akka.ask.timeout</h5></td>
-            <td style="word-wrap: break-word;">"10 s"</td>
-            <td>String</td>
+            <td style="word-wrap: break-word;">10 s</td>
+            <td>Duration</td>
             <td>Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d).</td>
         </tr>
         <tr>
@@ -76,8 +76,8 @@
         </tr>
         <tr>
             <td><h5>akka.lookup.timeout</h5></td>
-            <td style="word-wrap: break-word;">"10 s"</td>
-            <td>String</td>
+            <td style="word-wrap: break-word;">10 s</td>
+            <td>Duration</td>
             <td>Timeout used for the lookup of the JobManager. The timeout 
value has to contain a time-unit specifier (ms/s/min/h/d).</td>
         </tr>
         <tr>
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 4e5b80b..19d8e22 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -106,7 +106,8 @@ public class ClientTest extends TestLogger {
         config = new Configuration();
         config.setString(JobManagerOptions.ADDRESS, "localhost");
         config.setInteger(JobManagerOptions.PORT, freePort);
-        config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());
+        config.set(
+                AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
     }
 
     private Configuration fromPackagedProgram(
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java 
b/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java
index 1a5c413..3e34123 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.time;
 import org.apache.flink.annotation.PublicEvolving;
 
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
@@ -141,4 +142,11 @@ public final class Time implements Serializable {
     public static Time days(long days) {
         return of(days, TimeUnit.DAYS);
     }
+
+    /**
+     * Creates a new {@link Time} that represents the number of milliseconds in the given duration.
+     */
+    public static Time fromDuration(Duration duration) {
+        return milliseconds(duration.toMillis());
+    }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 0627327..abd4e73 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -20,6 +20,9 @@ package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.description.Description;
+import org.apache.flink.util.TimeUtils;
+
+import java.time.Duration;
 
 import static org.apache.flink.configuration.description.LinkElement.link;
 
@@ -39,14 +42,24 @@ public class AkkaOptions {
                                     + "memory footprint.");
 
     /** Timeout for akka ask calls. */
-    public static final ConfigOption<String> ASK_TIMEOUT =
+    public static final ConfigOption<Duration> ASK_TIMEOUT_DURATION =
             ConfigOptions.key("akka.ask.timeout")
-                    .stringType()
-                    .defaultValue("10 s")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
                     .withDescription(
                             "Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you"
                                     + " should try to increase this value. Timeouts can be caused by slow machines or a congested network. The"
                                     + " timeout value requires a time-unit specifier (ms/s/min/h/d).");
+
+    /** @deprecated Use {@link #ASK_TIMEOUT_DURATION} */
+    @Deprecated
+    public static final ConfigOption<String> ASK_TIMEOUT =
+            ConfigOptions.key(ASK_TIMEOUT_DURATION.key())
+                    .stringType()
+                    .defaultValue(
+                            TimeUtils.formatWithHighestUnit(ASK_TIMEOUT_DURATION.defaultValue()))
+                    .withDescription(ASK_TIMEOUT_DURATION.description());
+
     /** The Akka tcp connection timeout. */
     public static final ConfigOption<String> TCP_TIMEOUT =
             ConfigOptions.key("akka.tcp.timeout")
@@ -101,14 +114,23 @@ public class AkkaOptions {
                             "Turns on the Akka’s remote logging of events. Set 
this value to 'true' in case of debugging.");
 
     /** Timeout for all blocking calls that look up remote actors. */
-    public static final ConfigOption<String> LOOKUP_TIMEOUT =
+    public static final ConfigOption<Duration> LOOKUP_TIMEOUT_DURATION =
             ConfigOptions.key("akka.lookup.timeout")
-                    .stringType()
-                    .defaultValue("10 s")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
                     .withDescription(
                             "Timeout used for the lookup of the JobManager. 
The timeout value has to contain a time-unit"
                                     + " specifier (ms/s/min/h/d).");
 
+    /** @deprecated use {@link #LOOKUP_TIMEOUT_DURATION} */
+    @Deprecated
+    public static final ConfigOption<String> LOOKUP_TIMEOUT =
+            ConfigOptions.key(LOOKUP_TIMEOUT_DURATION.key())
+                    .stringType()
+                    .defaultValue(
+                            TimeUtils.formatWithHighestUnit(LOOKUP_TIMEOUT_DURATION.defaultValue()))
+                    .withDescription(LOOKUP_TIMEOUT_DURATION.description());
+
     /**
      * Timeout for all blocking calls on the client side.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index 1c77aee..9612698 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -137,7 +137,8 @@ public final class DefaultSlotPoolServiceSchedulerFactory
     public static DefaultSlotPoolServiceSchedulerFactory fromConfiguration(
             Configuration configuration, JobType jobType) {
 
-        final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
+        final Time rpcTimeout =
+                
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
         final Time slotIdleTimeout =
                 
Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
         final Time batchSlotTimeout =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
index 16ba2dc..009318c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
 import org.apache.flink.util.Preconditions;
 
@@ -74,7 +74,8 @@ public class JobMasterConfiguration {
 
     public static JobMasterConfiguration fromConfiguration(Configuration 
configuration) {
 
-        final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
+        final Time rpcTimeout =
+                
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
 
         final Time slotRequestTimeout =
                 
Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 8bbe68a4..4e81d0e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.minicluster;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -115,7 +115,7 @@ public class MiniClusterConfiguration {
     }
 
     public Time getRpcTimeout() {
-        return AkkaUtils.getTimeoutAsTime(configuration);
+        return 
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
     }
 
     public UnmodifiableConfiguration getConfiguration() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
index 4677aca..7119226 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.ResourceManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -83,7 +83,7 @@ public final class StandaloneResourceManagerFactory extends 
ResourceManagerFacto
                 fatalErrorHandler,
                 resourceManagerMetricGroup,
                 standaloneClusterStartupPeriodTime,
-                AkkaUtils.getTimeoutAsTime(configuration),
+                
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)),
                 ioExecutor);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
index 12304f6..79e9b85 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.resourcemanager.active;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
@@ -128,7 +128,9 @@ public class ActiveResourceManager<WorkerType extends 
ResourceIDRetrievable>
                 clusterInformation,
                 fatalErrorHandler,
                 resourceManagerMetricGroup,
-                
AkkaUtils.getTimeoutAsTime(Preconditions.checkNotNull(flinkConfig)),
+                Time.fromDuration(
+                        Preconditions.checkNotNull(flinkConfig)
+                                .get(AkkaOptions.ASK_TIMEOUT_DURATION)),
                 ioExecutor);
 
         this.flinkConfig = flinkConfig;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
index ec05f92..5703a5a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
@@ -129,17 +128,8 @@ public class SlotManagerConfiguration {
             Configuration configuration, WorkerResourceSpec 
defaultWorkerResourceSpec)
             throws ConfigurationException {
 
-        final Time rpcTimeout;
-        try {
-            rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
-        } catch (IllegalArgumentException e) {
-            throw new ConfigurationException(
-                    "Could not parse the resource manager's timeout "
-                            + "value "
-                            + AkkaOptions.ASK_TIMEOUT
-                            + '.',
-                    e);
-        }
+        final Time rpcTimeout =
+                
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
 
         final Time slotRequestTimeout = getSlotRequestTimeout(configuration);
         final Time taskManagerTimeout =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
index 5a3dad6..a6cabbc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rpc.akka;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 
 import javax.annotation.Nonnull;
 
@@ -69,7 +68,7 @@ public class AkkaRpcServiceConfiguration {
     }
 
     public static AkkaRpcServiceConfiguration fromConfiguration(Configuration 
configuration) {
-        final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
+        final Time timeout = 
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
 
         final long maximumFramesize = 
AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 48a0bb2..bae9b60 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -186,15 +185,8 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
 
         final String[] tmpDirPaths = 
ConfigurationUtils.parseTempDirectories(configuration);
 
-        final Time rpcTimeout;
-        try {
-            rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
-        } catch (Exception e) {
-            throw new IllegalArgumentException(
-                    "Invalid format for '"
-                            + AkkaOptions.ASK_TIMEOUT.key()
-                            + "'. Use formats like '50 s' or '1 min' to 
specify the timeout.");
-        }
+        final Time rpcTimeout =
+                
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
 
         LOG.debug("Messages have a max timeout of " + rpcTimeout);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 261009b..49a45e0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JMXServerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -30,7 +31,6 @@ import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.core.security.FlinkSecurityManager;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -132,7 +132,7 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
             throws Exception {
         this.configuration = checkNotNull(configuration);
 
-        timeout = AkkaUtils.getTimeoutAsTime(configuration);
+        timeout = 
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
 
         this.executor =
                 java.util.concurrent.Executors.newScheduledThreadPool(
@@ -580,7 +580,7 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
             final Configuration configuration, final HighAvailabilityServices 
haServices)
             throws LeaderRetrievalException {
 
-        final Duration lookupTimeout = 
AkkaUtils.getLookupTimeout(configuration);
+        final Duration lookupTimeout = 
configuration.get(AkkaOptions.LOOKUP_TIMEOUT_DURATION);
 
         final InetAddress taskManagerAddress =
                 LeaderRetrievalUtils.findConnectingAddress(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index fc90895..6596358 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
@@ -26,7 +27,6 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
@@ -260,7 +260,8 @@ public class TaskManagerServicesConfiguration {
         final QueryableStateConfiguration queryableStateConfig =
                 QueryableStateConfiguration.fromConfiguration(configuration);
 
-        long timerServiceShutdownTimeout = 
AkkaUtils.getTimeout(configuration).toMillis();
+        long timerServiceShutdownTimeout =
+                configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION).toMillis();
 
         final RetryingRegistrationConfiguration 
retryingRegistrationConfiguration =
                 
RetryingRegistrationConfiguration.fromConfiguration(configuration);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 5704bcb..d5bec8c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -20,11 +20,10 @@ package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
@@ -230,12 +229,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
 
     private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> 
initializeThreadInfoTracker(
             ScheduledExecutorService executor) {
-        final Duration akkaTimeout;
-        try {
-            akkaTimeout = AkkaUtils.getTimeout(clusterConfiguration);
-        } catch (NumberFormatException e) {
-            throw new 
IllegalConfigurationException(AkkaUtils.formatDurationParsingErrorMessage());
-        }
+        final Duration akkaTimeout = 
clusterConfiguration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
 
         final Duration flameGraphCleanUpInterval =
                 
clusterConfiguration.get(RestOptions.FLAMEGRAPH_CLEANUP_INTERVAL);
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
old mode 100755
new mode 100644
index b0ab135..a233122
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -408,7 +408,7 @@ object AkkaUtils {
 
     val normalizedExternalHostname = 
NetUtils.unresolvedHostToNormalizedString(externalHostname)
 
-    val akkaAskTimeout = getTimeout(configuration)
+    val akkaAskTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)
 
     val startupTimeout = TimeUtils.getStringInMillis(
       TimeUtils.parseDuration(
@@ -738,31 +738,6 @@ object AkkaUtils {
     }
   }
 
-  def getTimeout(config: Configuration): time.Duration = {
-    TimeUtils.parseDuration(config.getString(AkkaOptions.ASK_TIMEOUT))
-  }
-
-  def getTimeoutAsTime(config: Configuration): Time = {
-    try {
-      val duration = getTimeout(config)
-
-      Time.milliseconds(duration.toMillis)
-    } catch {
-      case _: NumberFormatException =>
-        throw new 
IllegalConfigurationException(AkkaUtils.formatDurationParsingErrorMessage)
-    }
-  }
-
-  def getDefaultTimeout: Time = {
-    val duration = 
TimeUtils.parseDuration(AkkaOptions.ASK_TIMEOUT.defaultValue())
-
-    Time.milliseconds(duration.toMillis)
-  }
-
-  def getLookupTimeout(config: Configuration): time.Duration = {
-    TimeUtils.parseDuration(config.getString(AkkaOptions.LOOKUP_TIMEOUT))
-  }
-
   /** Returns the address of the given [[ActorSystem]]. The [[Address]] object 
contains
     * the port and the host under which the actor system is reachable
     *
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 352a9fb..36e0275 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -336,7 +336,6 @@ public class ExecutionGraphTestUtils {
                         
.setJobGraph(JobGraphTestUtils.streamingJobGraph(vertices))
                         .setFutureExecutor(executor)
                         .setIoExecutor(executor)
-                        .setAllocationTimeout(timeout)
                         .setRpcTimeout(timeout)
                         .build();
         
executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
index 99f566b..5e291b7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -62,10 +62,9 @@ public class TestingDefaultExecutionGraphBuilder {
 
     private ScheduledExecutorService futureExecutor = 
TestingUtils.defaultExecutor();
     private Executor ioExecutor = TestingUtils.defaultExecutor();
-    private Time rpcTimeout = AkkaUtils.getDefaultTimeout();
+    private Time rpcTimeout = 
Time.fromDuration(AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
     private ClassLoader userClassLoader = 
DefaultExecutionGraph.class.getClassLoader();
     private BlobWriter blobWriter = VoidBlobWriter.getInstance();
-    private Time allocationTimeout = AkkaUtils.getDefaultTimeout();
     private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
     private JobMasterPartitionTracker partitionTracker = 
NoOpJobMasterPartitionTracker.INSTANCE;
     private Configuration jobMasterConfig = new Configuration();
@@ -117,11 +116,6 @@ public class TestingDefaultExecutionGraphBuilder {
         return this;
     }
 
-    public TestingDefaultExecutionGraphBuilder setAllocationTimeout(Time 
allocationTimeout) {
-        this.allocationTimeout = allocationTimeout;
-        return this;
-    }
-
     public TestingDefaultExecutionGraphBuilder 
setShuffleMaster(ShuffleMaster<?> shuffleMaster) {
         this.shuffleMaster = shuffleMaster;
         return this;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 3f458ff..6d1c2a0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -63,7 +63,7 @@ public class PartialConsumePipelinedResultTest extends 
TestLogger {
 
     private static Configuration getFlinkConfiguration() {
         final Configuration config = new Configuration();
-        config.setString(AkkaOptions.ASK_TIMEOUT, 
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT);
+        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT);
         config.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 
NUMBER_OF_NETWORK_BUFFERS);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index 47de001..6e2aa0a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -68,7 +68,7 @@ public class SlotCountExceedingParallelismTest extends 
TestLogger {
 
     private static Configuration getFlinkConfiguration() {
         final Configuration config = new Configuration();
-        config.setString(AkkaOptions.ASK_TIMEOUT, 
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT);
+        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT);
 
         return config;
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
index 599c576..87bb9b1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
@@ -47,8 +47,8 @@ public class TestingSlotPoolImpl extends SlotPoolImpl {
         this(
                 jobId,
                 SystemClock.getInstance(),
-                AkkaUtils.getDefaultTimeout(),
-                AkkaUtils.getDefaultTimeout(),
+                
Time.fromDuration(AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue()),
+                
Time.fromDuration(AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue()),
                 
Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue()));
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
index 377d102..be76ee6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -89,7 +89,7 @@ public class TestingResourceManagerFactory extends 
ResourceManagerFactory<Resour
                 clusterInformation,
                 fatalErrorHandler,
                 resourceManagerMetricGroup,
-                AkkaUtils.getTimeoutAsTime(configuration),
+                
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)),
                 ioExecutor);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index 2ac9cbf..3969bf2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -33,6 +33,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Terminated;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -42,7 +43,8 @@ import java.util.concurrent.TimeoutException;
 import scala.Option;
 import scala.Tuple2;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * This test validates that the RPC service gives a good message when it cannot connect to an
@@ -63,7 +65,7 @@ public class RpcConnectionTest extends TestLogger {
             // we start the RPC service with a very long timeout to ensure that the test
             // can only pass if the connection problem is not recognized merely via a timeout
             Configuration configuration = new Configuration();
-            configuration.setString(AkkaOptions.ASK_TIMEOUT, "10000000 s");
+            configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofSeconds(10000000));
             rpcService =
                     new AkkaRpcService(
                             actorSystem,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
index 3d7a032..87cd5d2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.util.TestLogger;
 import akka.actor.ActorSystem;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -96,7 +97,7 @@ public class RpcSSLAuthITCase extends TestLogger {
             // we start the RPC service with a very long timeout to ensure that the test
             // can only pass if the connection problem is not recognized merely via a timeout
             Configuration configuration = new Configuration();
-            configuration.setString(AkkaOptions.ASK_TIMEOUT, "10000000 s");
+            configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofSeconds(10000000));
             AkkaRpcServiceConfiguration akkaRpcServiceConfig =
                     
AkkaRpcServiceConfiguration.fromConfiguration(configuration);
             rpcService1 = new AkkaRpcService(actorSystem1, 
akkaRpcServiceConfig);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
index 5326135..0ffafb6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
@@ -49,6 +49,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.ServerSocket;
 import java.net.URI;
+import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.Matchers.containsString;
@@ -221,7 +222,7 @@ public class TaskManagerRunnerConfigurationTest extends 
TestLogger {
         final Configuration config = new Configuration();
         config.setString(TaskManagerOptions.HOST_BIND_POLICY, 
bindPolicy.toString());
         config.setString(JobManagerOptions.ADDRESS, "localhost");
-        config.setString(AkkaOptions.LOOKUP_TIMEOUT, "10 ms");
+        config.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMillis(10));
         return new UnmodifiableConfiguration(config);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
index fefd657..7aec05c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import 
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
@@ -90,7 +90,8 @@ public class MiniClusterResourceConfiguration {
         private Configuration configuration = new Configuration();
         private int numberTaskManagers = 1;
         private int numberSlotsPerTaskManager = 1;
-        private Time shutdownTimeout = 
AkkaUtils.getTimeoutAsTime(configuration);
+        private Time shutdownTimeout =
+                
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
 
         private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED;
         private MiniCluster.HaServices haServices = 
MiniCluster.HaServices.CONFIGURED;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingUtils.java
index f097fec..f0b9b0d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingUtils.java
@@ -30,7 +30,7 @@ public class TestingUtils {
 
     public static final Duration TESTING_DURATION = Duration.ofMinutes(2L);
     public static final Time TIMEOUT = Time.minutes(1L);
-    public static final String DEFAULT_AKKA_ASK_TIMEOUT = "200 s";
+    public static final Duration DEFAULT_AKKA_ASK_TIMEOUT = 
Duration.ofSeconds(200);
 
     private static ScheduledExecutorService sharedExecutorInstance;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index a0bd411..793c1f0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -28,6 +28,8 @@ import 
org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** ZooKeeper test utilities. */
@@ -86,7 +88,7 @@ public class ZooKeeperTestUtils {
                 CheckpointingOptions.CHECKPOINTS_DIRECTORY, fsStateHandlePath 
+ "/checkpoints");
         config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
fsStateHandlePath + "/recovery");
 
-        config.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100));
 
         return config;
     }
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index 5ced3bc..891fc89 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -31,6 +31,8 @@ import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
+import java.time.Duration
+
 @RunWith(classOf[JUnitRunner])
 class AkkaUtilsTest
   extends FunSuite
@@ -201,7 +203,7 @@ class AkkaUtilsTest
 
   test("getAkkaConfig should set startup timeout to be 10 times of ask timeout 
by default") {
     val configuration = new Configuration()
-    configuration.setString(AkkaOptions.ASK_TIMEOUT.key(), "100ms")
+    configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMillis(100))
 
     val akkaConfig = AkkaUtils.getAkkaConfig(configuration, Some(("localhost", 
31337)))
 
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
index a3ab193..0ce715d 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
@@ -19,8 +19,8 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 
@@ -56,7 +56,8 @@ public class MiniClusterResourceConfiguration
         private Configuration configuration = new Configuration();
         private int numberTaskManagers = 1;
         private int numberSlotsPerTaskManager = 1;
-        private Time shutdownTimeout = 
AkkaUtils.getTimeoutAsTime(configuration);
+        private Time shutdownTimeout =
+                
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
 
         private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED;
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index d924745..3d134a0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -95,7 +95,7 @@ public class AccumulatorLiveITCase extends TestLogger {
 
     private static Configuration getConfiguration() {
         Configuration config = new Configuration();
-        config.setString(AkkaOptions.ASK_TIMEOUT, 
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT);
+        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT);
         config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 
HEARTBEAT_INTERVAL);
 
         return config;
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 9538963..5702de2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -32,7 +32,6 @@ import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.runtime.testutils.TestingUtils;
@@ -84,7 +83,7 @@ public abstract class CancelingTestBase extends TestLogger {
         verifyJvmOptions();
         Configuration config = new Configuration();
         config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
-        config.setString(AkkaOptions.ASK_TIMEOUT, 
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT);
+        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT);
         config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 
MemorySize.parse("4096"));
         config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 
2048);
 
@@ -98,7 +97,7 @@ public abstract class CancelingTestBase extends TestLogger {
         // submit job
         final JobGraph jobGraph = getJobGraph(plan);
 
-        final long rpcTimeout = 
AkkaUtils.getTimeoutAsTime(configuration).toMilliseconds();
+        final long rpcTimeout = 
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION).toMillis();
 
         ClusterClient<?> client = CLUSTER.getClusterClient();
         JobID jobID = client.submitJob(jobGraph).get();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index f5c1299..aa57007 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -43,6 +43,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.ClassRule;
 import org.junit.Test;
 
+import java.time.Duration;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -69,8 +71,8 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
     private static Configuration getConfiguration() {
         Configuration config = new Configuration();
         config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
MemorySize.parse("48m"));
-        config.setString(AkkaOptions.LOOKUP_TIMEOUT, "60 s");
-        config.setString(AkkaOptions.ASK_TIMEOUT, "60 s");
+        config.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMinutes(1));
+        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
         return config;
     }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index cc6129a..8a3cf6b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -773,7 +773,7 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
             conf.set(
                     
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, BUFFER_PER_CHANNEL);
             
conf.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 60000);
-            conf.setString(AkkaOptions.ASK_TIMEOUT, "1 min");
+            conf.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
             return conf;
         }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
index 26b6e51..db395ad 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -45,7 +46,7 @@ public class StreamingCustomInputSplitProgram {
     public static void main(String[] args) throws Exception {
         Configuration config = new Configuration();
 
-        config.setString(AkkaOptions.ASK_TIMEOUT, "5 s");
+        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(5));
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 3e76660..a8ad37c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -91,7 +91,7 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
         File coordinateTempDir = null;
 
         Configuration config = new Configuration();
-        config.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100));
         config.setString(JobManagerOptions.ADDRESS, "localhost");
         config.setString(RestOptions.BIND_PORT, "0");
         config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 500L);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 407fe7a..2debe62 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -98,7 +98,7 @@ public class ProcessFailureCancelingITCase extends TestLogger 
{
 
         Configuration config = new Configuration();
         config.setString(JobManagerOptions.ADDRESS, "localhost");
-        config.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100));
         config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
         config.setString(
                 HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zooKeeperResource.getConnectString());
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
index e6931df..c14344c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
@@ -44,6 +44,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 
 import static org.junit.Assert.assertEquals;
@@ -85,7 +86,7 @@ public class ShuffleCompressionITCase {
         Configuration configuration = new Configuration();
         configuration.setBoolean(
                 
NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, true);
-        configuration.setString(AkkaOptions.ASK_TIMEOUT, "60 s");
+        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofMinutes(1));
 
         JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, 
ExecutionMode.BATCH);
         JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASKMANAGERS, 
NUM_SLOTS);
@@ -98,7 +99,7 @@ public class ShuffleCompressionITCase {
                 
NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, true);
         configuration.setInteger(
                 
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
-        configuration.setString(AkkaOptions.ASK_TIMEOUT, "60 s");
+        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofMinutes(1));
 
         JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, 
ExecutionMode.BATCH);
         JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASKMANAGERS, 
NUM_SLOTS);
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java
index be88df5..32073f2 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java
@@ -132,7 +132,7 @@ public class YARNApplicationITCase extends YarnTestBase {
         Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(768));
         configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.parse("1g"));
-        configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
+        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofSeconds(30));
         configuration.set(DeploymentOptions.TARGET, 
YarnDeploymentTarget.APPLICATION.getName());
         configuration.set(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion);
         configuration.set(PipelineOptions.JARS, 
Collections.singletonList(userJar.toString()));
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
index 85cde8c..29a7635 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
@@ -149,7 +149,7 @@ public class YARNFileReplicationITCase extends YarnTestBase 
{
         final Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(768));
         configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.parse("1g"));
-        configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
+        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofSeconds(30));
         configuration.set(CLASSPATH_INCLUDE_USER_JAR, 
YarnConfigOptions.UserJarInclusion.DISABLED);
 
         return configuration;
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index effc7cd..c2347ac 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -227,7 +227,7 @@ public class YARNITCase extends YarnTestBase {
         Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(768));
         configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.parse("1g"));
-        configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
+        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofSeconds(30));
         configuration.set(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion);
 
         return configuration;
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 74c9f22..c99a68f 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -58,6 +58,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
+import java.time.Duration;
 import java.util.List;
 
 import static org.hamcrest.Matchers.is;
@@ -104,7 +105,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                             "-j",
                             "fake.jar",
                             "-D",
-                            AkkaOptions.ASK_TIMEOUT.key() + "=5 min",
+                            AkkaOptions.ASK_TIMEOUT_DURATION.key() + "=5 min",
                             "-D",
                             CoreOptions.FLINK_JVM_OPTIONS.key() + 
"=-DappName=foobar",
                             "-D",
@@ -112,7 +113,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                         });
 
         Configuration executorConfig = cli.toConfiguration(cmd);
-        assertEquals("5 min", executorConfig.get(AkkaOptions.ASK_TIMEOUT));
+        assertEquals(Duration.ofMinutes(5), 
executorConfig.get(AkkaOptions.ASK_TIMEOUT_DURATION));
         assertEquals("-DappName=foobar", 
executorConfig.get(CoreOptions.FLINK_JVM_OPTIONS));
         assertEquals("changeit", 
executorConfig.get(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD));
     }

Reply via email to