Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170934128
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
    @@ -104,175 +91,97 @@ public static void beforeClass() {
                SavepointSerializers.setFailWhenLegacyStateDetected(false);
        }
     
    -   @BeforeClass
    -   public static void setupCluster() throws Exception {
    -           final Configuration configuration = new Configuration();
    -
    -           FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
    -
    -           actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    -
    -           highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
    -                   configuration,
    -                   TestingUtils.defaultExecutor());
    -
    -           Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    -                   configuration,
    -                   actorSystem,
    -                   TestingUtils.defaultExecutor(),
    -                   TestingUtils.defaultExecutor(),
    -                   highAvailabilityServices,
    -                   NoOpMetricRegistry.INSTANCE,
    -                   Option.empty(),
    -                   Option.apply("jm"),
    -                   Option.apply("arch"),
    -                   TestingJobManager.class,
    -                   TestingMemoryArchivist.class);
    -
    -           jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
    -                   highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    -                   actorSystem,
    -                   timeout);
    -
    -           archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
    -
    -           Configuration tmConfig = new Configuration();
    -           tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
    -
    -           ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    -                   tmConfig,
    -                   ResourceID.generate(),
    -                   actorSystem,
    -                   highAvailabilityServices,
    -                   NoOpMetricRegistry.INSTANCE,
    -                   "localhost",
    -                   Option.apply("tm"),
    -                   true,
    -                   TestingTaskManager.class);
    -
    -           taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
    -
    -           // Wait until connected
    -           Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
    -           Await.ready(taskManager.ask(msg, timeout), timeout);
    -   }
    -
    -   @AfterClass
    -   public static void tearDownCluster() throws Exception {
    -           if (highAvailabilityServices != null) {
    -                   highAvailabilityServices.closeAndCleanupAllData();
    -           }
    -
    -           if (actorSystem != null) {
    -                   actorSystem.shutdown();
    -           }
    -
    -           if (archiver != null) {
    -                   archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -           }
    -
    -           if (jobManager != null) {
    -                   jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -           }
    -
    -           if (taskManager != null) {
    -                   taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -           }
    -   }
    -
        @Test
        public void testMigrationAndRestore() throws Throwable {
    +           ClassLoader classLoader = this.getClass().getClassLoader();
    +           ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
    +           clusterClient.setDetached(true);
    +           final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
                // submit job with old version savepoint and create a migrated savepoint in the new version
    -           String savepointPath = migrateJob();
    +           String savepointPath = migrateJob(classLoader, clusterClient, deadline);
                // restore from migrated new version savepoint
    -           restoreJob(savepointPath);
    +           restoreJob(classLoader, clusterClient, deadline, savepointPath);
        }
     
    -   private String migrateJob() throws Throwable {
    +   private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
    +
                URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
                if (savepointResource == null) {
                        throw new IllegalArgumentException("Savepoint file does not exist.");
                }
                JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
                jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
     
    -           Object msg;
    -           Object result;
    +           assertNotNull(jobToMigrate.getJobID());
     
    -           // Submit job graph
    -           msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
    -           result = Await.result(jobManager.ask(msg, timeout), timeout);
    +           clusterClient.submitJob(jobToMigrate, classLoader);
     
    -           if (result instanceof JobManagerMessages.JobResultFailure) {
    -                   JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
    -                   throw new Exception(failure.cause());
    -           }
    -           Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
    +           CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +                   () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +                   deadline.timeLeft().toMillis() / 50,
    +                   Time.milliseconds(50),
    +                   (jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
    +                   TestingUtils.defaultScheduledExecutor());
     
    -           // Wait for all tasks to be running
    -           msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
    -           Await.result(jobManager.ask(msg, timeout), timeout);
    +           assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
     
                // Trigger savepoint
                File targetDirectory = tmpFolder.newFolder();
    -           msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
    +           String savepointPath = null;
     
                // FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
                // TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
    -           boolean retry = true;
    -           for (int i = 0; retry && i < 10; i++) {
    -                   Future<Object> future = jobManager.ask(msg, timeout);
    -                   result = Await.result(future, timeout);
    -
    -                   if (result instanceof JobManagerMessages.CancellationFailure) {
    -                           Thread.sleep(50L);
    -                   } else {
    -                           retry = false;
    +           while (deadline.hasTimeLeft() && savepointPath == null) {
    +                   try {
    +                           savepointPath = clusterClient.cancelWithSavepoint(
    +                                   jobToMigrate.getJobID(),
    +                                   targetDirectory.getAbsolutePath());
    +                   } catch (Exception e) {
    +                           if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
    +                                   throw e;
    +                           }
                        }
                }
     
    -           if (result instanceof JobManagerMessages.CancellationFailure) {
    -                   JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
    -                   throw new Exception(failure.cause());
    -           }
    +           assertNotNull(savepointPath);
     
    -           String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
    +           jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +                   () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +                   deadline.timeLeft().toMillis() / 50,
    +                   Time.milliseconds(50),
    +                   (jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
    +                   TestingUtils.defaultScheduledExecutor());
     
    -           // Wait until canceled
    -           msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
    -           Await.ready(jobManager.ask(msg, timeout), timeout);
    +           assertEquals(JobStatus.CANCELED, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
     
                return savepointPath;
        }
     
    -   private void restoreJob(String savepointPath) throws Exception {
    +   private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
                JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
                jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
     
    -           Object msg;
    -           Object result;
    +           assertNotNull(jobToRestore.getJobID());
     
    -           // Submit job graph
    -           msg = new JobManagerMessages.SubmitJob(jobToRestore, ListeningBehaviour.DETACHED);
    -           result = Await.result(jobManager.ask(msg, timeout), timeout);
    +           clusterClient.submitJob(jobToRestore, classLoader);
     
    -           if (result instanceof JobManagerMessages.JobResultFailure) {
    -                   JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
    -                   throw new Exception(failure.cause());
    -           }
    -           Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
    +           CompletableFuture<JobStatus> jobStatusFuture =
    +                   clusterClient.getJobStatus(jobToRestore.getJobID());
    +
    +           while (deadline.hasTimeLeft()
    --- End diff ---
    
    dammit, I had this fixed but didn't commit it...
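
    The quoted diff cuts off mid-statement at "while (deadline.hasTimeLeft()",
    which begins a deadline-bounded status poll. Below is a minimal,
    self-contained sketch of that pattern only; PollUntil and the stand-in
    status supplier are hypothetical names for illustration, not Flink API,
    and this is not necessarily the fix that was eventually committed:

        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.TimeoutException;
        import java.util.function.Supplier;

        /** Hypothetical illustration of a deadline-bounded status poll. */
        public final class PollUntil {

            /** Polls {@code status} every {@code pollMillis} until it returns
             *  {@code expected} or {@code deadlineNanos} (System.nanoTime) passes. */
            static <T> T pollUntil(
                    Supplier<T> status,
                    T expected,
                    long pollMillis,
                    long deadlineNanos) throws InterruptedException, TimeoutException {
                while (System.nanoTime() < deadlineNanos) {
                    T current = status.get();
                    if (expected.equals(current)) {
                        return current;
                    }
                    Thread.sleep(pollMillis);
                }
                throw new TimeoutException("Did not reach " + expected + " before the deadline.");
            }

            public static void main(String[] args) throws Exception {
                long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
                long start = System.nanoTime();
                // Stand-in for clusterClient.getJobStatus(...): reports RUNNING after ~200 ms.
                Supplier<String> status = () ->
                        (System.nanoTime() - start) > TimeUnit.MILLISECONDS.toNanos(200)
                                ? "RUNNING"
                                : "CREATED";
                System.out.println(pollUntil(status, "RUNNING", 50L, deadlineNanos));
            }
        }

    The migrateJob() changes earlier in the diff get the same effect with
    FutureUtils.retrySuccesfulWithDelay(...) followed by an assertEquals,
    which avoids a hand-rolled sleep loop.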

