[FLINK-5969] Augment SavepointMigrationTestBase to catch failed jobs

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1882c905
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1882c905
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1882c905

Branch: refs/heads/master
Commit: 1882c90505b0d25775b969cc025c8a3087b82f37
Parents: 6f8b3c6
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Thu Apr 20 14:48:22 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed May 3 16:24:26 2017 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     | 32 +++++++++++++++++++
 .../utils/SavepointMigrationTestBase.java       | 33 +++++++++++++++++++-
 2 files changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1882c905/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 8d0e841..ab4daa9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -489,6 +489,38 @@ public abstract class ClusterClient {
        }
 
        /**
+        * Reattaches to a running job with the given job id.
+        *
+        * @param jobID The job id of the job to attach to
+        * @return The JobExecutionResult for the jobID
+        * @throws JobExecutionException if an error occurs during monitoring the job execution
+        */
+       public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException {
+               final LeaderRetrievalService leaderRetrievalService;
+               try {
+                       leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+               } catch (Exception e) {
+                       throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
+               }
+
+               ActorGateway jobManagerGateway;
+               try {
+                       jobManagerGateway = getJobManagerGateway();
+               } catch (Exception e) {
+                       throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway", e);
+               }
+
+               return JobClient.attachToRunningJob(
+                               jobID,
+                               jobManagerGateway,
+                               flinkConfig,
+                               actorSystemLoader.get(),
+                               leaderRetrievalService,
+                               timeout,
+                               printStatusDuringExecution);
+       }
+
+       /**
         * Cancels a job identified by the job id.
         * @param jobId the job id
         * @throws Exception In case an error occurred.

http://git-wip-us.apache.org/repos/asf/flink/blob/1882c905/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index fced68c..301fc72 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -25,14 +25,17 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobListeningContext;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -49,6 +52,7 @@ import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import static junit.framework.Assert.fail;
@@ -207,11 +211,38 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
                JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
 
                StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
+               JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
 
                boolean done = false;
                while (DEADLINE.hasTimeLeft()) {
+
+                       // try and get a job result, this will fail if the job already failed. Use this
+                       // to get out of this loop
+                       JobID jobId = jobSubmissionResult.getJobID();
+                       FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
+
+                       try {
+
+                               Future<Object> future = clusterClient
+                                               .getJobManagerGateway()
+                                               .ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
+
+                               Object result = Await.result(future, timeout);
+
+                               if (result instanceof JobManagerMessages.CurrentJobStatus) {
+                                       if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
+                                               Object jobResult = Await.result(
+                                                               jobListeningContext.getJobResultFuture(),
+                                                               Duration.apply(5, TimeUnit.SECONDS));
+                                               fail("Job failed: " + jobResult);
+                                       }
+                               }
+                       } catch (Exception e) {
+                               fail("Could not connect to job: " + e);
+                       }
+
                        Thread.sleep(100);
-                       Map<String, Object> accumulators = clusterClient.getAccumulators(jobSubmissionResult.getJobID());
+                       Map<String, Object> accumulators = clusterClient.getAccumulators(jobId);
 
                        boolean allDone = true;
                        for (Tuple2<String, Integer> acc : expectedAccumulators) {

Reply via email to