This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new fecdc59 [FLINK-22997] Replace AkkaUtils#get*Timeout* fecdc59 is described below commit fecdc59b8603bc70ed1af33b206c60c56f5751fb Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Wed Jun 23 22:13:10 2021 +0200 [FLINK-22997] Replace AkkaUtils#get*Timeout* --- .../shortcodes/generated/akka_configuration.html | 8 ++--- .../apache/flink/client/program/ClientTest.java | 3 +- .../org/apache/flink/api/common/time/Time.java | 8 +++++ .../apache/flink/configuration/AkkaOptions.java | 34 ++++++++++++++++++---- .../DefaultSlotPoolServiceSchedulerFactory.java | 5 ++-- .../runtime/jobmaster/JobMasterConfiguration.java | 5 ++-- .../minicluster/MiniClusterConfiguration.java | 4 +-- .../StandaloneResourceManagerFactory.java | 4 +-- .../active/ActiveResourceManager.java | 6 ++-- .../slotmanager/SlotManagerConfiguration.java | 14 ++------- .../rpc/akka/AkkaRpcServiceConfiguration.java | 3 +- .../taskexecutor/TaskManagerConfiguration.java | 12 ++------ .../runtime/taskexecutor/TaskManagerRunner.java | 6 ++-- .../TaskManagerServicesConfiguration.java | 5 ++-- .../runtime/webmonitor/WebMonitorEndpoint.java | 10 ++----- .../org/apache/flink/runtime/akka/AkkaUtils.scala | 27 +---------------- .../executiongraph/ExecutionGraphTestUtils.java | 1 - .../TestingDefaultExecutionGraphBuilder.java | 10 ++----- .../PartialConsumePipelinedResultTest.java | 2 +- .../SlotCountExceedingParallelismTest.java | 2 +- .../jobmaster/slotpool/TestingSlotPoolImpl.java | 6 ++-- .../TestingResourceManagerFactory.java | 4 +-- .../flink/runtime/rpc/RpcConnectionTest.java | 6 ++-- .../apache/flink/runtime/rpc/RpcSSLAuthITCase.java | 3 +- .../TaskManagerRunnerConfigurationTest.java | 3 +- .../MiniClusterResourceConfiguration.java | 5 ++-- .../flink/runtime/testutils/TestingUtils.java | 2 +- .../runtime/testutils/ZooKeeperTestUtils.java | 4 ++- .../apache/flink/runtime/akka/AkkaUtilsTest.scala | 4 ++- .../util/MiniClusterResourceConfiguration.java | 5 ++-- .../test/accumulators/AccumulatorLiveITCase.java | 2 +- .../flink/test/cancelling/CancelingTestBase.java | 5 ++-- .../EventTimeAllWindowCheckpointingITCase.java | 6 ++-- .../checkpointing/UnalignedCheckpointTestBase.java | 2 +- .../jar/StreamingCustomInputSplitProgram.java | 3 +- ...tractTaskManagerProcessFailureRecoveryTest.java | 2 +- .../recovery/ProcessFailureCancelingITCase.java | 2 +- .../test/runtime/ShuffleCompressionITCase.java | 5 ++-- .../apache/flink/yarn/YARNApplicationITCase.java | 2 +- .../flink/yarn/YARNFileReplicationITCase.java | 2 +- .../java/org/apache/flink/yarn/YARNITCase.java | 2 +- .../apache/flink/yarn/FlinkYarnSessionCliTest.java | 5 ++-- 42 files changed, 121 insertions(+), 128 deletions(-) diff --git a/docs/layouts/shortcodes/generated/akka_configuration.html b/docs/layouts/shortcodes/generated/akka_configuration.html index 8b9cd9b..fc64414 100644 --- a/docs/layouts/shortcodes/generated/akka_configuration.html +++ b/docs/layouts/shortcodes/generated/akka_configuration.html @@ -16,8 +16,8 @@ </tr> <tr> <td><h5>akka.ask.timeout</h5></td> - <td style="word-wrap: break-word;">"10 s"</td> - <td>String</td> + <td style="word-wrap: break-word;">10 s</td> + <td>Duration</td> <td>Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d).</td> </tr> <tr> @@ -76,8 +76,8 @@ </tr> <tr> <td><h5>akka.lookup.timeout</h5></td> - <td style="word-wrap: break-word;">"10 s"</td> - <td>String</td> + <td style="word-wrap: break-word;">10 s</td> + <td>Duration</td> <td>Timeout used for the lookup of the JobManager. The timeout value has to contain a time-unit specifier (ms/s/min/h/d).</td> </tr> <tr> diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 4e5b80b..19d8e22 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -106,7 +106,8 @@ public class ClientTest extends TestLogger { config = new Configuration(); config.setString(JobManagerOptions.ADDRESS, "localhost"); config.setInteger(JobManagerOptions.PORT, freePort); - config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue()); + config.set( + AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue()); } private Configuration fromPackagedProgram( diff --git a/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java b/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java index 1a5c413..3e34123 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.time; import org.apache.flink.annotation.PublicEvolving; import java.io.Serializable; +import java.time.Duration; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -141,4 +142,11 @@ public final class Time implements Serializable { public static Time days(long days) { return of(days, TimeUnit.DAYS); } + + /** + * Creates a new {@link Time} that represents the number of milliseconds in the given duration. + */ + public static Time fromDuration(Duration duration) { + return milliseconds(duration.toMillis()); + } } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java index 0627327..abd4e73 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java @@ -20,6 +20,9 @@ package org.apache.flink.configuration; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.description.Description; +import org.apache.flink.util.TimeUtils; + +import java.time.Duration; import static org.apache.flink.configuration.description.LinkElement.link; @@ -39,14 +42,24 @@ public class AkkaOptions { + "memory footprint."); /** Timeout for akka ask calls. */ - public static final ConfigOption<String> ASK_TIMEOUT = + public static final ConfigOption<Duration> ASK_TIMEOUT_DURATION = ConfigOptions.key("akka.ask.timeout") - .stringType() - .defaultValue("10 s") + .durationType() + .defaultValue(Duration.ofSeconds(10)) .withDescription( "Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you" + " should try to increase this value. Timeouts can be caused by slow machines or a congested network. The" + " timeout value requires a time-unit specifier (ms/s/min/h/d)."); + + /** @deprecated Use {@link #ASK_TIMEOUT_DURATION} */ + @Deprecated + public static final ConfigOption<String> ASK_TIMEOUT = + ConfigOptions.key(ASK_TIMEOUT_DURATION.key()) + .stringType() + .defaultValue( + TimeUtils.formatWithHighestUnit(ASK_TIMEOUT_DURATION.defaultValue())) + .withDescription(ASK_TIMEOUT_DURATION.description()); + /** The Akka tcp connection timeout. */ public static final ConfigOption<String> TCP_TIMEOUT = ConfigOptions.key("akka.tcp.timeout") @@ -101,14 +114,23 @@ public class AkkaOptions { "Turns on the Akka’s remote logging of events. Set this value to 'true' in case of debugging."); /** Timeout for all blocking calls that look up remote actors. */ - public static final ConfigOption<String> LOOKUP_TIMEOUT = + public static final ConfigOption<Duration> LOOKUP_TIMEOUT_DURATION = ConfigOptions.key("akka.lookup.timeout") - .stringType() - .defaultValue("10 s") + .durationType() + .defaultValue(Duration.ofSeconds(10)) .withDescription( "Timeout used for the lookup of the JobManager. The timeout value has to contain a time-unit" + " specifier (ms/s/min/h/d)."); + /** @deprecated use {@link #LOOKUP_TIMEOUT_DURATION} */ + @Deprecated + public static final ConfigOption<String> LOOKUP_TIMEOUT = + ConfigOptions.key(LOOKUP_TIMEOUT_DURATION.key()) + .stringType() + .defaultValue( + TimeUtils.formatWithHighestUnit(LOOKUP_TIMEOUT_DURATION.defaultValue())) + .withDescription(LOOKUP_TIMEOUT_DURATION.description()); + /** * Timeout for all blocking calls on the client side. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java index 1c77aee..9612698 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java @@ -21,11 +21,11 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.SchedulerExecutionMode; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; @@ -137,7 +137,8 @@ public final class DefaultSlotPoolServiceSchedulerFactory public static DefaultSlotPoolServiceSchedulerFactory fromConfiguration( Configuration configuration, JobType jobType) { - final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration); + final Time rpcTimeout = + Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); final Time slotIdleTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT)); final Time batchSlotTimeout = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java index 16ba2dc..009318c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java @@ -19,10 +19,10 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration; import org.apache.flink.util.Preconditions; @@ -74,7 +74,8 @@ public class JobMasterConfiguration { public static JobMasterConfiguration fromConfiguration(Configuration configuration) { - final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration); + final Time rpcTimeout = + Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); final Time slotRequestTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java index 8bbe68a4..4e81d0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java @@ -19,12 +19,12 @@ package org.apache.flink.runtime.minicluster; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.UnmodifiableConfiguration; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils; import org.apache.flink.util.Preconditions; @@ -115,7 +115,7 @@ public class MiniClusterConfiguration { } public Time getRpcTimeout() { - return AkkaUtils.getTimeoutAsTime(configuration); + return Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); } public UnmodifiableConfiguration getConfiguration() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java index 4677aca..7119226 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java @@ -19,10 +19,10 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.ResourceManagerOptions; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.ClusterInformation; import org.apache.flink.runtime.heartbeat.HeartbeatServices; @@ -83,7 +83,7 @@ public final class StandaloneResourceManagerFactory extends ResourceManagerFacto fatalErrorHandler, resourceManagerMetricGroup, standaloneClusterStartupPeriodTime, - AkkaUtils.getTimeoutAsTime(configuration), + Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)), ioExecutor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java index 12304f6..79e9b85 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java @@ -20,8 +20,8 @@ package org.apache.flink.runtime.resourcemanager.active; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; @@ -128,7 +128,9 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable> clusterInformation, fatalErrorHandler, resourceManagerMetricGroup, - AkkaUtils.getTimeoutAsTime(Preconditions.checkNotNull(flinkConfig)), + Time.fromDuration( + Preconditions.checkNotNull(flinkConfig) + .get(AkkaOptions.ASK_TIMEOUT_DURATION)), ioExecutor); this.flinkConfig = flinkConfig; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java index ec05f92..5703a5a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java @@ -27,7 +27,6 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; import org.apache.flink.util.ConfigurationException; import org.apache.flink.util.Preconditions; @@ -129,17 +128,8 @@ public class SlotManagerConfiguration { Configuration configuration, WorkerResourceSpec defaultWorkerResourceSpec) throws ConfigurationException { - final Time rpcTimeout; - try { - rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration); - } catch (IllegalArgumentException e) { - throw new ConfigurationException( - "Could not parse the resource manager's timeout " - + "value " - + AkkaOptions.ASK_TIMEOUT - + '.', - e); - } + final Time rpcTimeout = + Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); final Time slotRequestTimeout = getSlotRequestTimeout(configuration); final Time taskManagerTimeout = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java index 5a3dad6..a6cabbc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.rpc.akka; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; import javax.annotation.Nonnull; @@ -69,7 +68,7 @@ public class AkkaRpcServiceConfiguration { } public static AkkaRpcServiceConfiguration fromConfiguration(Configuration configuration) { - final Time timeout = AkkaUtils.getTimeoutAsTime(configuration); + final Time timeout = Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); final long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java index 48a0bb2..bae9b60 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.UnmodifiableConfiguration; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; @@ -186,15 +185,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration); - final Time rpcTimeout; - try { - rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration); - } catch (Exception e) { - throw new IllegalArgumentException( - "Invalid format for '" - + AkkaOptions.ASK_TIMEOUT.key() - + "'. Use formats like '50 s' or '1 min' to specify the timeout."); - } + final Time rpcTimeout = + Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); LOG.debug("Messages have a max timeout of " + rpcTimeout); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 261009b..49a45e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JMXServerOptions; import org.apache.flink.configuration.TaskManagerOptions; @@ -30,7 +31,6 @@ import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.core.security.FlinkSecurityManager; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -132,7 +132,7 @@ public class TaskManagerRunner implements FatalErrorHandler { throws Exception { this.configuration = checkNotNull(configuration); - timeout = AkkaUtils.getTimeoutAsTime(configuration); + timeout = Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); this.executor = java.util.concurrent.Executors.newScheduledThreadPool( @@ -580,7 +580,7 @@ public class TaskManagerRunner implements FatalErrorHandler { final Configuration configuration, final HighAvailabilityServices haServices) throws LeaderRetrievalException { - final Duration lookupTimeout = AkkaUtils.getLookupTimeout(configuration); + final Duration lookupTimeout = configuration.get(AkkaOptions.LOOKUP_TIMEOUT_DURATION); final InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index fc90895..6596358 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; @@ -26,7 +27,6 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; @@ -260,7 +260,8 @@ public class TaskManagerServicesConfiguration { final QueryableStateConfiguration queryableStateConfig = QueryableStateConfiguration.fromConfiguration(configuration); - long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis(); + long timerServiceShutdownTimeout = + configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION).toMillis(); final RetryingRegistrationConfiguration retryingRegistrationConfiguration = RetryingRegistrationConfiguration.fromConfiguration(configuration); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 5704bcb..d5bec8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -20,11 +20,10 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.RestOptions; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.TransientBlobService; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.leaderelection.LeaderContender; @@ -230,12 +229,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> initializeThreadInfoTracker( ScheduledExecutorService executor) { - final Duration akkaTimeout; - try { - akkaTimeout = AkkaUtils.getTimeout(clusterConfiguration); - } catch (NumberFormatException e) { - throw new IllegalConfigurationException(AkkaUtils.formatDurationParsingErrorMessage()); - } + final Duration akkaTimeout = clusterConfiguration.get(AkkaOptions.ASK_TIMEOUT_DURATION); final Duration flameGraphCleanUpInterval = clusterConfiguration.get(RestOptions.FLAMEGRAPH_CLEANUP_INTERVAL); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala old mode 100755 new mode 100644 index b0ab135..a233122 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -408,7 +408,7 @@ object AkkaUtils { val normalizedExternalHostname = NetUtils.unresolvedHostToNormalizedString(externalHostname) - val akkaAskTimeout = getTimeout(configuration) + val akkaAskTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION) val startupTimeout = TimeUtils.getStringInMillis( TimeUtils.parseDuration( @@ -738,31 +738,6 @@ object AkkaUtils { } } - def getTimeout(config: Configuration): time.Duration = { - TimeUtils.parseDuration(config.getString(AkkaOptions.ASK_TIMEOUT)) - } - - def getTimeoutAsTime(config: Configuration): Time = { - try { - val duration = getTimeout(config) - - Time.milliseconds(duration.toMillis) - } catch { - case _: NumberFormatException => - throw new IllegalConfigurationException(AkkaUtils.formatDurationParsingErrorMessage) - } - } - - def getDefaultTimeout: Time = { - val duration = TimeUtils.parseDuration(AkkaOptions.ASK_TIMEOUT.defaultValue()) - - Time.milliseconds(duration.toMillis) - } - - def getLookupTimeout(config: Configuration): time.Duration = { - TimeUtils.parseDuration(config.getString(AkkaOptions.LOOKUP_TIMEOUT)) - } - /** Returns the address of the given [[ActorSystem]]. The [[Address]] object contains * the port and the host under which the actor system is reachable * diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 352a9fb..36e0275 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -336,7 +336,6 @@ public class ExecutionGraphTestUtils { .setJobGraph(JobGraphTestUtils.streamingJobGraph(vertices)) .setFutureExecutor(executor) .setIoExecutor(executor) - .setAllocationTimeout(timeout) .setRpcTimeout(timeout) .build(); executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java index 99f566b..5e291b7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java @@ -19,11 +19,11 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.JobException; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; @@ -62,10 +62,9 @@ public class TestingDefaultExecutionGraphBuilder { private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor(); private Executor ioExecutor = TestingUtils.defaultExecutor(); - private Time rpcTimeout = AkkaUtils.getDefaultTimeout(); + private Time rpcTimeout = Time.fromDuration(AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue()); private ClassLoader userClassLoader = DefaultExecutionGraph.class.getClassLoader(); private BlobWriter blobWriter = VoidBlobWriter.getInstance(); - private Time allocationTimeout = AkkaUtils.getDefaultTimeout(); private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE; private JobMasterPartitionTracker partitionTracker = NoOpJobMasterPartitionTracker.INSTANCE; private Configuration jobMasterConfig = new Configuration(); @@ -117,11 +116,6 @@ public class TestingDefaultExecutionGraphBuilder { return this; } - public TestingDefaultExecutionGraphBuilder setAllocationTimeout(Time allocationTimeout) { - this.allocationTimeout = allocationTimeout; - return this; - } - public TestingDefaultExecutionGraphBuilder setShuffleMaster(ShuffleMaster<?> shuffleMaster) { this.shuffleMaster = shuffleMaster; return this; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java index 3f458ff..6d1c2a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java @@ -63,7 +63,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger { private static Configuration getFlinkConfiguration() { final Configuration config = new Configuration(); - config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT); + config.set(AkkaOptions.ASK_TIMEOUT_DURATION, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT); config.setInteger( NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java index 47de001..6e2aa0a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java @@ -68,7 +68,7 @@ public class SlotCountExceedingParallelismTest extends TestLogger { private static Configuration getFlinkConfiguration() { final Configuration config = new Configuration(); - config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT); + config.set(AkkaOptions.ASK_TIMEOUT_DURATION, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT); return config; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java index 599c576..87bb9b1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java @@ -20,8 +20,8 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmaster.SlotRequestId; @@ -47,8 +47,8 @@ public class TestingSlotPoolImpl extends SlotPoolImpl { this( jobId, SystemClock.getInstance(), - AkkaUtils.getDefaultTimeout(), - AkkaUtils.getDefaultTimeout(), + Time.fromDuration(AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue()), + Time.fromDuration(AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue()), Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue())); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java index 377d102..be76ee6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.ClusterInformation; @@ -89,7 +89,7 @@ public class TestingResourceManagerFactory extends ResourceManagerFactory<Resour clusterInformation, fatalErrorHandler, resourceManagerMetricGroup, - AkkaUtils.getTimeoutAsTime(configuration), + Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)), ioExecutor); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java index 2ac9cbf..3969bf2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java @@ -33,6 +33,7 @@ import akka.actor.ActorSystem; import akka.actor.Terminated; import org.junit.Test; +import java.time.Duration; import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -42,7 +43,8 @@ import java.util.concurrent.TimeoutException; import scala.Option; import scala.Tuple2; -import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * This test validates that the RPC service gives a good message when it cannot connect to an @@ -63,7 +65,7 @@ public class RpcConnectionTest extends TestLogger { // we start the RPC service with a very long timeout to ensure that the test // can only pass if the connection problem is not recognized merely via a timeout Configuration configuration = new Configuration(); - configuration.setString(AkkaOptions.ASK_TIMEOUT, "10000000 s"); + configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(10000000)); rpcService = new AkkaRpcService( actorSystem, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java index 3d7a032..87cd5d2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java @@ -31,6 +31,7 @@ import org.apache.flink.util.TestLogger; import akka.actor.ActorSystem; import org.junit.Test; +import java.time.Duration; import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -96,7 +97,7 @@ public class RpcSSLAuthITCase extends TestLogger { // we start the RPC service with a very long timeout to ensure that the test // can only pass if the connection problem is not recognized merely via a timeout Configuration configuration = new Configuration(); - configuration.setString(AkkaOptions.ASK_TIMEOUT, "10000000 s"); + configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(10000000)); AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration); rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java index 5326135..0ffafb6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java @@ -49,6 +49,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.URI; +import java.time.Duration; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.containsString; @@ -221,7 +222,7 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger { final Configuration config = new Configuration(); config.setString(TaskManagerOptions.HOST_BIND_POLICY, bindPolicy.toString()); config.setString(JobManagerOptions.ADDRESS, "localhost"); - config.setString(AkkaOptions.LOOKUP_TIMEOUT, "10 ms"); + config.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMillis(10)); return new UnmodifiableConfiguration(config); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java index fefd657..7aec05c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java @@ -19,10 +19,10 @@ package org.apache.flink.runtime.testutils; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.UnmodifiableConfiguration; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.RpcServiceSharing; @@ -90,7 +90,8 @@ public class MiniClusterResourceConfiguration { private Configuration configuration = new Configuration(); private int numberTaskManagers = 1; private int numberSlotsPerTaskManager = 1; - private Time shutdownTimeout = AkkaUtils.getTimeoutAsTime(configuration); + private Time shutdownTimeout = + Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED; private MiniCluster.HaServices haServices = MiniCluster.HaServices.CONFIGURED; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingUtils.java index f097fec..f0b9b0d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingUtils.java @@ -30,7 +30,7 @@ public class TestingUtils { public static final Duration TESTING_DURATION = Duration.ofMinutes(2L); public static final Time TIMEOUT = Time.minutes(1L); - public static final String DEFAULT_AKKA_ASK_TIMEOUT = "200 s"; + public static final Duration DEFAULT_AKKA_ASK_TIMEOUT = Duration.ofSeconds(200); private static ScheduledExecutorService sharedExecutorInstance; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java index a0bd411..793c1f0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java @@ -28,6 +28,8 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; + import static org.apache.flink.util.Preconditions.checkNotNull; /** ZooKeeper test utilities. */ @@ -86,7 +88,7 @@ public class ZooKeeperTestUtils { CheckpointingOptions.CHECKPOINTS_DIRECTORY, fsStateHandlePath + "/checkpoints"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery"); - config.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); + config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100)); return config; } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala index 5ced3bc..891fc89 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala @@ -31,6 +31,8 @@ import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} +import java.time.Duration + @RunWith(classOf[JUnitRunner]) class AkkaUtilsTest extends FunSuite @@ -201,7 +203,7 @@ class AkkaUtilsTest test("getAkkaConfig should set startup timeout to be 10 times of ask timeout by default") { val configuration = new Configuration() - configuration.setString(AkkaOptions.ASK_TIMEOUT.key(), "100ms") + configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMillis(100)) val akkaConfig = AkkaUtils.getAkkaConfig(configuration, Some(("localhost", 31337))) diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java index a3ab193..0ce715d 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java @@ -19,8 +19,8 @@ package org.apache.flink.test.util; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.RpcServiceSharing; @@ -56,7 +56,8 @@ public class MiniClusterResourceConfiguration private Configuration configuration = new Configuration(); private int numberTaskManagers = 1; private int numberSlotsPerTaskManager = 1; - private Time shutdownTimeout = AkkaUtils.getTimeoutAsTime(configuration); + private Time shutdownTimeout = + Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED; diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index d924745..3d134a0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -95,7 +95,7 @@ public class AccumulatorLiveITCase extends TestLogger { private static Configuration getConfiguration() { Configuration config = new Configuration(); - config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT); + config.set(AkkaOptions.ASK_TIMEOUT_DURATION, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT); config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL); return config; diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java index 9538963..5702de2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java @@ -32,7 +32,6 @@ import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.runtime.testutils.TestingUtils; @@ -84,7 +83,7 @@ public abstract class CancelingTestBase extends TestLogger { verifyJvmOptions(); Configuration config = new Configuration(); config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true); - config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT); + config.set(AkkaOptions.ASK_TIMEOUT_DURATION, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT); config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4096")); config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 2048); @@ -98,7 +97,7 @@ public abstract class CancelingTestBase extends TestLogger { // submit job final JobGraph jobGraph = getJobGraph(plan); - final long rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration).toMilliseconds(); + final long rpcTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION).toMillis(); ClusterClient<?> client = CLUSTER.getClusterClient(); JobID jobID = client.submitJob(jobGraph).get(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index f5c1299..aa57007 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -43,6 +43,8 @@ import org.apache.flink.util.TestLogger; import org.junit.ClassRule; import org.junit.Test; +import java.time.Duration; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -69,8 +71,8 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { private static Configuration getConfiguration() { Configuration config = new Configuration(); config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("48m")); - config.setString(AkkaOptions.LOOKUP_TIMEOUT, "60 s"); - config.setString(AkkaOptions.ASK_TIMEOUT, "60 s"); + config.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMinutes(1)); + config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1)); return config; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java index cc6129a..8a3cf6b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java @@ -773,7 +773,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger { conf.set( NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, BUFFER_PER_CHANNEL); conf.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 60000); - conf.setString(AkkaOptions.ASK_TIMEOUT, "1 min"); + conf.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1)); return conf; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java index 26b6e51..db395ad 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java @@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import java.io.Serializable; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -45,7 +46,7 @@ public class StreamingCustomInputSplitProgram { public static void main(String[] args) throws Exception { Configuration config = new Configuration(); - config.setString(AkkaOptions.ASK_TIMEOUT, "5 s"); + config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(5)); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index 3e76660..a8ad37c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -91,7 +91,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test File coordinateTempDir = null; Configuration config = new Configuration(); - config.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); + config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100)); config.setString(JobManagerOptions.ADDRESS, "localhost"); config.setString(RestOptions.BIND_PORT, "0"); config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 500L); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index 407fe7a..2debe62 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -98,7 +98,7 @@ public class ProcessFailureCancelingITCase extends TestLogger { Configuration config = new Configuration(); config.setString(JobManagerOptions.ADDRESS, "localhost"); - config.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); + config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100)); config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); config.setString( HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java index e6931df..c14344c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java @@ -44,6 +44,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import static org.junit.Assert.assertEquals; @@ -85,7 +86,7 @@ public class ShuffleCompressionITCase { Configuration configuration = new Configuration(); configuration.setBoolean( NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, true); - configuration.setString(AkkaOptions.ASK_TIMEOUT, "60 s"); + configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1)); JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, ExecutionMode.BATCH); JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASKMANAGERS, NUM_SLOTS); @@ -98,7 +99,7 @@ public class ShuffleCompressionITCase { NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, true); configuration.setInteger( NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1); - configuration.setString(AkkaOptions.ASK_TIMEOUT, "60 s"); + configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1)); JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, ExecutionMode.BATCH); JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASKMANAGERS, NUM_SLOTS); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java index be88df5..32073f2 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java @@ -132,7 +132,7 @@ public class YARNApplicationITCase extends YarnTestBase { Configuration configuration = new Configuration(); configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768)); configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g")); - configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s"); + configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(30)); configuration.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName()); configuration.set(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion); configuration.set(PipelineOptions.JARS, Collections.singletonList(userJar.toString())); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java index 85cde8c..29a7635 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java @@ -149,7 +149,7 @@ public class YARNFileReplicationITCase extends YarnTestBase { final Configuration configuration = new Configuration(); configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768)); configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g")); - configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s"); + configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(30)); configuration.set(CLASSPATH_INCLUDE_USER_JAR, YarnConfigOptions.UserJarInclusion.DISABLED); return configuration; diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java index effc7cd..c2347ac 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java @@ -227,7 +227,7 @@ public class YARNITCase extends YarnTestBase { Configuration configuration = new Configuration(); configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768)); configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g")); - configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s"); + configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(30)); configuration.set(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion); return configuration; diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index 74c9f22..c99a68f 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -58,6 +58,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.StandardOpenOption; +import java.time.Duration; import java.util.List; import static org.hamcrest.Matchers.is; @@ -104,7 +105,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { "-j", "fake.jar", "-D", - AkkaOptions.ASK_TIMEOUT.key() + "=5 min", + AkkaOptions.ASK_TIMEOUT_DURATION.key() + "=5 min", "-D", CoreOptions.FLINK_JVM_OPTIONS.key() + "=-DappName=foobar", "-D", @@ -112,7 +113,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { }); Configuration executorConfig = cli.toConfiguration(cmd); - assertEquals("5 min", executorConfig.get(AkkaOptions.ASK_TIMEOUT)); + assertEquals(Duration.ofMinutes(5), executorConfig.get(AkkaOptions.ASK_TIMEOUT_DURATION)); assertEquals("-DappName=foobar", executorConfig.get(CoreOptions.FLINK_JVM_OPTIONS)); assertEquals("changeit", executorConfig.get(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD)); }