[FLINK-5969] Augment SavepointMigrationTestBase to catch failed jobs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1882c905 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1882c905 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1882c905 Branch: refs/heads/master Commit: 1882c90505b0d25775b969cc025c8a3087b82f37 Parents: 6f8b3c6 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Thu Apr 20 14:48:22 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Wed May 3 16:24:26 2017 +0200 ---------------------------------------------------------------------- .../flink/client/program/ClusterClient.java | 32 +++++++++++++++++++ .../utils/SavepointMigrationTestBase.java | 33 +++++++++++++++++++- 2 files changed, 64 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1882c905/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index 8d0e841..ab4daa9 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -489,6 +489,38 @@ public abstract class ClusterClient { } /** + * Reattaches to a running job with the given job id. 
+ * + * @param jobID The job id of the job to attach to + * @return The JobListeningContext for the jobID + * @throws JobExecutionException if an error occurs during monitoring the job execution + */ + public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException { + final LeaderRetrievalService leaderRetrievalService; + try { + leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e); + } + + ActorGateway jobManagerGateway; + try { + jobManagerGateway = getJobManagerGateway(); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway", e); + } + + return JobClient.attachToRunningJob( + jobID, + jobManagerGateway, + flinkConfig, + actorSystemLoader.get(), + leaderRetrievalService, + timeout, + printStatusDuringExecution); + } + + /** + * Cancels a job identified by the job id. + * @param jobId the job id + * @throws Exception In case an error occurred. 
http://git-wip-us.apache.org/repos/asf/flink/blob/1882c905/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java index fced68c..301fc72 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java @@ -25,14 +25,17 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobListeningContext; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; @@ -49,6 +52,7 @@ import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import static junit.framework.Assert.fail; @@ -207,11 +211,38 @@ public class SavepointMigrationTestBase extends TestBaseUtils { JobSubmissionResult jobSubmissionResult = 
cluster.submitJobDetached(jobGraph); StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration()); + JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID()); boolean done = false; while (DEADLINE.hasTimeLeft()) { + + // try and get a job result, this will fail if the job already failed. Use this + // to get out of this loop + JobID jobId = jobSubmissionResult.getJobID(); + FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS); + + try { + + Future<Object> future = clusterClient + .getJobManagerGateway() + .ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout); + + Object result = Await.result(future, timeout); + + if (result instanceof JobManagerMessages.CurrentJobStatus) { + if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) { + Object jobResult = Await.result( + jobListeningContext.getJobResultFuture(), + Duration.apply(5, TimeUnit.SECONDS)); + fail("Job failed: " + jobResult); + } + } + } catch (Exception e) { + fail("Could not connect to job: " + e); + } + Thread.sleep(100); - Map<String, Object> accumulators = clusterClient.getAccumulators(jobSubmissionResult.getJobID()); + Map<String, Object> accumulators = clusterClient.getAccumulators(jobId); boolean allDone = true; for (Tuple2<String, Integer> acc : expectedAccumulators) {