This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new b1f32c6 [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout b1f32c6 is described below commit b1f32c6efd732787e27f37fbe18fe88f9e38f7c2 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Mon Jun 8 16:05:51 2020 +0200 [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout This commit hardens all CancelingTestBase tests by using the configured Akka ask timeout of 200s as the rpc timeout. This closes #12531. --- .../apache/flink/test/cancelling/CancelingTestBase.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java index b3905c0..03bc6eb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java @@ -33,6 +33,7 @@ import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; @@ -56,14 +57,14 @@ public abstract class CancelingTestBase extends TestLogger { protected static final int PARALLELISM = 4; - protected static final long GET_FUTURE_TIMEOUT = 1000; // 1000 milliseconds + private static final Configuration configuration = getConfiguration(); // -------------------------------------------------------------------------------------------- @ClassRule public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() - .setConfiguration(getConfiguration()) + .setConfiguration(configuration) .setNumberTaskManagers(2) .setNumberSlotsPerTaskManager(4) .build()); @@ -93,15 +94,17 @@ public abstract class CancelingTestBase extends TestLogger { // submit job final JobGraph jobGraph = getJobGraph(plan); + final long rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration).toMilliseconds(); + ClusterClient<?> client = CLUSTER.getClusterClient(); JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph); Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); - JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS); + JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS); while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) { Thread.sleep(50); - jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS); + jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS); } if (jobStatus != JobStatus.RUNNING) { Assert.fail("Job not in state RUNNING."); @@ -113,10 +116,10 @@ public abstract class CancelingTestBase extends TestLogger { Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow(); - JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS); + JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS); while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) { Thread.sleep(50); - jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS); + jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS); } if (jobStatusAfterCancel != JobStatus.CANCELED) { Assert.fail("Failed to cancel job with ID " + jobSubmissionResult.getJobID() + '.');