Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170934128

--- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
@@ -104,175 +91,97 @@ public static void beforeClass() {
 		SavepointSerializers.setFailWhenLegacyStateDetected(false);
 	}
 
-	@BeforeClass
-	public static void setupCluster() throws Exception {
-		final Configuration configuration = new Configuration();
-
-		FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
-
-		actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-
-		highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
-			configuration,
-			TestingUtils.defaultExecutor());
-
-		Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
-			configuration,
-			actorSystem,
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			highAvailabilityServices,
-			NoOpMetricRegistry.INSTANCE,
-			Option.empty(),
-			Option.apply("jm"),
-			Option.apply("arch"),
-			TestingJobManager.class,
-			TestingMemoryArchivist.class);
-
-		jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
-			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-			actorSystem,
-			timeout);
-
-		archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
-
-		Configuration tmConfig = new Configuration();
-		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
-		ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
-			tmConfig,
-			ResourceID.generate(),
-			actorSystem,
-			highAvailabilityServices,
-			NoOpMetricRegistry.INSTANCE,
-			"localhost",
-			Option.apply("tm"),
-			true,
-			TestingTaskManager.class);
-
-		taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
-
-		// Wait until connected
-		Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
-		Await.ready(taskManager.ask(msg, timeout), timeout);
-	}
-
-	@AfterClass
-	public static void tearDownCluster() throws Exception {
-		if (highAvailabilityServices != null) {
-			highAvailabilityServices.closeAndCleanupAllData();
-		}
-
-		if (actorSystem != null) {
-			actorSystem.shutdown();
-		}
-
-		if (archiver != null) {
-			archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-		}
-
-		if (jobManager != null) {
-			jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-		}
-
-		if (taskManager != null) {
-			taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-		}
-	}
 	@Test
 	public void testMigrationAndRestore() throws Throwable {
+		ClassLoader classLoader = this.getClass().getClassLoader();
+		ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
+		clusterClient.setDetached(true);
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
 		// submit job with old version savepoint and create a migrated savepoint in the new version
-		String savepointPath = migrateJob();
+		String savepointPath = migrateJob(classLoader, clusterClient, deadline);
 		// restore from migrated new version savepoint
-		restoreJob(savepointPath);
+		restoreJob(classLoader, clusterClient, deadline, savepointPath);
 	}
 
-	private String migrateJob() throws Throwable {
+	private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
+
 		URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
 		if (savepointResource == null) {
 			throw new IllegalArgumentException("Savepoint file does not exist.");
 		}
 
 		JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
 		jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
 
-		Object msg;
-		Object result;
+		assertNotNull(jobToMigrate.getJobID());
 
-		// Submit job graph
-		msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
-		result = Await.result(jobManager.ask(msg, timeout), timeout);
+		clusterClient.submitJob(jobToMigrate, classLoader);
 
-		if (result instanceof JobManagerMessages.JobResultFailure) {
-			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
-			throw new Exception(failure.cause());
-		}
-		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+			deadline.timeLeft().toMillis() / 50,
+			Time.milliseconds(50),
+			(jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
+			TestingUtils.defaultScheduledExecutor());
 
-		// Wait for all tasks to be running
-		msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
-		Await.result(jobManager.ask(msg, timeout), timeout);
+		assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 
 		// Trigger savepoint
 		File targetDirectory = tmpFolder.newFolder();
-		msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
+		String savepointPath = null;
 
 		// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
 		// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
-		boolean retry = true;
-		for (int i = 0; retry && i < 10; i++) {
-			Future<Object> future = jobManager.ask(msg, timeout);
-			result = Await.result(future, timeout);
-
-			if (result instanceof JobManagerMessages.CancellationFailure) {
-				Thread.sleep(50L);
-			} else {
-				retry = false;
+		while (deadline.hasTimeLeft() && savepointPath == null) {
+			try {
+				savepointPath = clusterClient.cancelWithSavepoint(
+					jobToMigrate.getJobID(),
+					targetDirectory.getAbsolutePath());
+			} catch (Exception e) {
+				if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
+					throw e;
+				}
 			}
 		}
 
-		if (result instanceof JobManagerMessages.CancellationFailure) {
-			JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
-			throw new Exception(failure.cause());
-		}
+		assertNotNull(savepointPath);
 
-		String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
+		jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+			deadline.timeLeft().toMillis() / 50,
+			Time.milliseconds(50),
+			(jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
+			TestingUtils.defaultScheduledExecutor());
 
-		// Wait until canceled
-		msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
-		Await.ready(jobManager.ask(msg, timeout), timeout);
+		assertEquals(JobStatus.CANCELED, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 
 		return savepointPath;
 	}
 
-	private void restoreJob(String savepointPath) throws Exception {
+	private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
 		JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
 		jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
 
-		Object msg;
-		Object result;
+		assertNotNull(jobToRestore.getJobID());
 
-		// Submit job graph
-		msg = new JobManagerMessages.SubmitJob(jobToRestore, ListeningBehaviour.DETACHED);
-		result = Await.result(jobManager.ask(msg, timeout), timeout);
+		clusterClient.submitJob(jobToRestore, classLoader);
 
-		if (result instanceof JobManagerMessages.JobResultFailure) {
-			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
-			throw new Exception(failure.cause());
-		}
-		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+		CompletableFuture<JobStatus> jobStatusFuture =
+			clusterClient.getJobStatus(jobToRestore.getJobID());
+
+		while (deadline.hasTimeLeft()
--- End diff --

dammit, I had this fixed but didn't commit it...
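
(Aside, not part of the diff: the new test code above repeatedly polls the job status until it reaches RUNNING or CANCELED, or a deadline expires. Below is a minimal, self-contained sketch of that poll-until-condition pattern; the `PollUntil` class, its `pollUntil` helper, and all names in it are hypothetical illustration only and are not the Flink `FutureUtils` API.)

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

public final class PollUntil {

	// Hypothetical stand-in for the retry/poll helpers used in the diff:
	// call statusSupplier every `interval` until `until` accepts the value
	// or `timeout` runs out, then fail with a TimeoutException.
	public static <T> T pollUntil(
			Callable<T> statusSupplier,
			Predicate<T> until,
			Duration interval,
			Duration timeout) throws Exception {

		final long deadlineNanos = System.nanoTime() + timeout.toNanos();
		T last = null;
		while (System.nanoTime() < deadlineNanos) {
			last = statusSupplier.call();
			if (until.test(last)) {
				return last;
			}
			Thread.sleep(interval.toMillis());
		}
		throw new TimeoutException("Condition not met before the deadline; last observed value: " + last);
	}

	public static void main(String[] args) throws Exception {
		// Toy usage: a fake "job status" that becomes RUNNING on the third poll.
		final int[] calls = {0};
		String status = pollUntil(
			() -> ++calls[0] < 3 ? "CREATED" : "RUNNING",
			"RUNNING"::equals,
			Duration.ofMillis(50),
			Duration.ofSeconds(5));
		System.out.println("Reached status: " + status);
	}
}
```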
---