This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1839fa5  [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout
1839fa5 is described below

commit 1839fa57a91723f8ef10bcbd2c271366b5509b0b
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Mon Jun 8 16:05:51 2020 +0200

    [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout
    
    This commit hardens all CancelingTestBase tests by using the configured Akka ask timeout of
    200s as the rpc timeout.
    
    This closes #12531.
---
 .../apache/flink/test/cancelling/CancelingTestBase.java   | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index b3905c0..03bc6eb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -33,6 +33,7 @@ import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -56,14 +57,14 @@ public abstract class CancelingTestBase extends TestLogger {
 
        protected static final int PARALLELISM = 4;
 
-       protected static final long GET_FUTURE_TIMEOUT = 1000; // 1000 
milliseconds
+       private static final Configuration configuration = getConfiguration();
 
        // 
--------------------------------------------------------------------------------------------
 
        @ClassRule
        public static final MiniClusterWithClientResource CLUSTER = new 
MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
-                       .setConfiguration(getConfiguration())
+                       .setConfiguration(configuration)
                        .setNumberTaskManagers(2)
                        .setNumberSlotsPerTaskManager(4)
                        .build());
@@ -93,15 +94,17 @@ public abstract class CancelingTestBase extends TestLogger {
                // submit job
                final JobGraph jobGraph = getJobGraph(plan);
 
+               final long rpcTimeout = 
AkkaUtils.getTimeoutAsTime(configuration).toMilliseconds();
+
                ClusterClient<?> client = CLUSTER.getClusterClient();
                JobSubmissionResult jobSubmissionResult = 
ClientUtils.submitJob(client, jobGraph);
 
                Deadline submissionDeadLine = new FiniteDuration(2, 
TimeUnit.MINUTES).fromNow();
 
-               JobStatus jobStatus = 
client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, 
TimeUnit.MILLISECONDS);
+               JobStatus jobStatus = 
client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, 
TimeUnit.MILLISECONDS);
                while (jobStatus != JobStatus.RUNNING && 
submissionDeadLine.hasTimeLeft()) {
                        Thread.sleep(50);
-                       jobStatus = 
client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, 
TimeUnit.MILLISECONDS);
+                       jobStatus = 
client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, 
TimeUnit.MILLISECONDS);
                }
                if (jobStatus != JobStatus.RUNNING) {
                        Assert.fail("Job not in state RUNNING.");
@@ -113,10 +116,10 @@ public abstract class CancelingTestBase extends 
TestLogger {
 
                Deadline cancelDeadline = new 
FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
 
-               JobStatus jobStatusAfterCancel = 
client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, 
TimeUnit.MILLISECONDS);
+               JobStatus jobStatusAfterCancel = 
client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, 
TimeUnit.MILLISECONDS);
                while (jobStatusAfterCancel != JobStatus.CANCELED && 
cancelDeadline.hasTimeLeft()) {
                        Thread.sleep(50);
-                       jobStatusAfterCancel = 
client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, 
TimeUnit.MILLISECONDS);
+                       jobStatusAfterCancel = 
client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, 
TimeUnit.MILLISECONDS);
                }
                if (jobStatusAfterCancel != JobStatus.CANCELED) {
                        Assert.fail("Failed to cancel job with ID " + 
jobSubmissionResult.getJobID() + '.');

Reply via email to