GitHub user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170573131

--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
@@ -425,18 +385,22 @@ public Integer getKey(Tuple2<Integer, Long> value) {
 }
 }).asQueryableState("hakuna", valueState);
- try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(cluster, env, deadline)) {
+ try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(clusterClient, env)) {
- // register to be notified when the job is running.
- CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture =
- notifyWhenJobStatusIs(closableJobGraph.getJobId(), JobStatus.RUNNING, deadline);
+ clusterClient.submitJob(
+ closableJobGraph.getJobGraph(), AbstractQueryableStateTestBase.class.getClassLoader());
- cluster.submitJobDetached(closableJobGraph.getJobGraph());
- // expect for the job to be running
- TestingJobManagerMessages.JobStatusIs jobStatus =
- runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- assertEquals(JobStatus.RUNNING, jobStatus.state());
+ CompletableFuture<JobStatus> jobStatusFuture =
+ clusterClient.getJobStatus(closableJobGraph.getJobId());
+
+ while (deadline.hasTimeLeft() && !jobStatusFuture.get().equals(JobStatus.RUNNING)) {
+ Thread.sleep(50);
+ jobStatusFuture =
+ clusterClient.getJobStatus(closableJobGraph.getJobId());
+ }
+
+ assertEquals(JobStatus.RUNNING, jobStatusFuture.get());
--- End diff --

Add a timeout to this `jobStatusFuture.get()` call.
---