[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r171209553

--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java ---
@@ -18,28 +18,99 @@
 package org.apache.flink.queryablestate.itcases;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 /**
  * Several integration tests for queryable state using the {@link FsStateBackend}.
  */
-public class HAQueryableStateFsBackendITCase extends HAAbstractQueryableStateTestBase {
+public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestBase {
--- End diff --

I tend to agree. I created an updated version of this in #5595. It still uses inheritance, but I changed it so that the base class now always calls `setDetached(true)`. I don't want to start refactoring those tests now because there is already enough to do. Plus, I did remove one layer of inheritance.

---
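A minimal sketch of the idea of letting the base class enforce detached submission itself, so that no subclass has to remember it. All types here (`FakeClusterClient`, `QueryableStateTestBase`, `FsBackendTest`) are hypothetical stand-ins with no Flink dependency; the thread does not show where the real base class makes the `setDetached(true)` call, so the constructor is only an assumption.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins; none of these are Flink classes.
final class DetachedBaseSketch {

    /** Stand-in for a cluster client with a detached-submission flag. */
    static final class FakeClusterClient {
        private boolean detached;

        void setDetached(boolean detached) {
            this.detached = detached;
        }

        boolean isDetached() {
            return detached;
        }
    }

    /** The base class enforces the invariant its tests rely on. */
    abstract static class QueryableStateTestBase {
        protected final FakeClusterClient clusterClient;

        protected QueryableStateTestBase(Supplier<FakeClusterClient> clientFactory) {
            this.clusterClient = clientFactory.get();
            // Set once here, so no subclass can forget it.
            this.clusterClient.setDetached(true);
        }
    }

    /** A subclass only decides how the client is created. */
    static final class FsBackendTest extends QueryableStateTestBase {
        FsBackendTest() {
            super(FakeClusterClient::new);
        }
    }
}
```

The design choice is that the invariant lives next to the code that depends on it, which addresses the concern that subclass authors may not know about it.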
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/5579 ---
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170913598

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -255,14 +258,24 @@ protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoad
         }
     }
-    /**
-     * Requests the {@link JobResult} for the given {@link JobID}. The method retries multiple
-     * times to poll the {@link JobResult} before giving up.
-     *
-     * @param jobId specifying the job for which to retrieve the {@link JobResult}
-     * @return Future which is completed with the {@link JobResult} once the job has completed or
-     * with a failure if the {@link JobResult} could not be retrieved.
-     */
+    @Override
+    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+        JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
+        final JobMessageParameters params = new JobMessageParameters();
+        params.jobPathParameter.resolve(jobId);
+
+        CompletableFuture<JobDetailsInfo> responseFuture = sendRequest(detailsHeaders, params);
+
+        return responseFuture.thenApply((JobDetailsInfo jobDetailsInfo) -> {
+            if (jobDetailsInfo != null) {
+                return jobDetailsInfo.getJobStatus();
+            }
+
+            throw new RuntimeException("Unknown JobStatus.");
--- End diff --

OK, I'm simplifying to

```
responseFuture.thenApply(JobDetailsInfo::getJobStatus);
```

which lets the error response propagate if the job isn't found.

---
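A minimal sketch of why the method-reference version lets the error through: `thenApply` only runs its function when the source future completes normally, so if the response future fails (for example because the server answered with an error for an unknown job), the returned future fails with the same cause. `JobDetailsInfo` and `JobStatus` below are simplified, hypothetical stand-ins, not Flink's classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class JobStatusLookupSketch {

    enum JobStatus { RUNNING, FINISHED, FAILED, CANCELED }

    /** Hypothetical stand-in for the REST response body. */
    static final class JobDetailsInfo {
        private final JobStatus jobStatus;

        JobDetailsInfo(JobStatus jobStatus) {
            this.jobStatus = jobStatus;
        }

        JobStatus getJobStatus() {
            return jobStatus;
        }
    }

    /**
     * Maps a successful response to its status. If responseFuture fails,
     * the mapping is skipped and the returned future fails with the same cause.
     */
    static CompletableFuture<JobStatus> getJobStatus(CompletableFuture<JobDetailsInfo> responseFuture) {
        return responseFuture.thenApply(JobDetailsInfo::getJobStatus);
    }
}
```

If a response future were ever completed with `null`, the method reference would throw a `NullPointerException` inside `thenApply`, which also ends up as an exceptional completion of the returned future rather than an exception thrown at the caller.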
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170911860

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -255,14 +258,24 @@ protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoad
         }
     }
-    /**
-     * Requests the {@link JobResult} for the given {@link JobID}. The method retries multiple
-     * times to poll the {@link JobResult} before giving up.
-     *
-     * @param jobId specifying the job for which to retrieve the {@link JobResult}
-     * @return Future which is completed with the {@link JobResult} once the job has completed or
-     * with a failure if the {@link JobResult} could not be retrieved.
-     */
+    @Override
+    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+        JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
+        final JobMessageParameters params = new JobMessageParameters();
+        params.jobPathParameter.resolve(jobId);
+
+        CompletableFuture<JobDetailsInfo> responseFuture = sendRequest(detailsHeaders, params);
+
+        return responseFuture.thenApply((JobDetailsInfo jobDetailsInfo) -> {
+            if (jobDetailsInfo != null) {
+                return jobDetailsInfo.getJobStatus();
+            }
+
+            throw new RuntimeException("Unknown JobStatus.");
--- End diff --

The alternative I had in mind was to extend `JobStatus` with `UNKNOWN`. WDYT?

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170866059

--- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java ---
@@ -62,6 +68,8 @@
     private TestEnvironment executionEnvironment;
+    private ClusterClient clusterClient;
--- End diff --

Raw usage.

---
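A short sketch of what the raw-usage remark refers to, assuming the client type is generic (here a hypothetical `Client<T>` parameterized by the type of its cluster id, not Flink's actual class). Declaring the field with a wildcard keeps compile-time type checking and avoids raw-type warnings while still accepting any concrete client.

```java
// Hypothetical generic client; not Flink's ClusterClient.
final class WildcardFieldSketch {

    /** A client parameterized by the type of its cluster id. */
    static final class Client<T> {
        private final T clusterId;

        Client(T clusterId) {
            this.clusterId = clusterId;
        }

        T getClusterId() {
            return clusterId;
        }
    }

    // A raw declaration ("private static Client clusterClient;") compiles, but it
    // opts out of generic type checking and triggers rawtypes/unchecked lint warnings.
    // The wildcard keeps the field generic while accepting any id type.
    private static Client<?> clusterClient = new Client<>("standalone");

    static String describe() {
        // Through Client<?>, the id is only known to be an Object.
        Object id = clusterClient.getClusterId();
        return "cluster " + id;
    }
}
```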
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170868446

--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java ---
@@ -18,28 +18,99 @@
 package org.apache.flink.queryablestate.itcases;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 /**
  * Several integration tests for queryable state using the {@link FsStateBackend}.
  */
-public class HAQueryableStateFsBackendITCase extends HAAbstractQueryableStateTestBase {
+public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestBase {
--- End diff --

To be honest, I'm not a huge fan of inheritance when writing tests. It is really hard to understand what's going on, and it is hard to enforce invariants that the base class assumes. For example, `AbstractQueryableStateTestBase` requires that the `ClusterClient` is set to detached job submission. A user who does not know this and extends the test base will almost certainly stumble over it. Ideally, tests are succinct enough that everything fits in a single class.

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170863098

--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
@@ -304,47 +296,18 @@ public Integer getKey(Tuple2<Integer, Long> value) {
         final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
         final JobID jobId = jobGraph.getJobID();
-        final CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture =
-                notifyWhenJobStatusIs(jobId, JobStatus.FAILED, deadline);
-
-        final CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture =
-                notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline);
+        clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader());
--- End diff --

Same here.

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170863015

--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
@@ -188,12 +179,12 @@ public Integer getKey(Tuple2<Integer, Long> value) {
                 }
             }).asQueryableState(queryName, reducingState);
-        try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+        try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(clusterClient, env)) {
             final JobID jobId = autoCancellableJob.getJobId();
             final JobGraph jobGraph = autoCancellableJob.getJobGraph();
-            cluster.submitJobDetached(jobGraph);
+            clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader());
--- End diff --

How is this change equivalent? This could easily be a blocking job submission.

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170864042

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---
@@ -577,6 +583,81 @@ public JobListeningContext connectToJob(JobID jobID) throws JobExecutionExceptio
                 printStatusDuringExecution);
     }
+    /**
+     * Requests the {@link JobStatus} of the job with the given {@link JobID}.
+     */
+    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+        final ActorGateway jobManager;
+        try {
+            jobManager = getJobManagerGateway();
+        } catch (FlinkException e) {
+            throw new RuntimeException("Could not retrieve JobManage gateway.", e);
+        }
+
+        Future<Object> response = jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
+
+        CompletableFuture<Object> javaFuture = FutureUtils.toJava(response);
+
+        return javaFuture.thenApply((responseMessage) -> {
+            if (responseMessage instanceof JobManagerMessages.CurrentJobStatus) {
+                return ((JobManagerMessages.CurrentJobStatus) responseMessage).status();
+            } else if (responseMessage instanceof JobManagerMessages.JobNotFound) {
+                throw new CompletionException(
+                    new IllegalStateException("Could not find job with JobId " + jobId));
+            } else {
+                throw new CompletionException(
+                    new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
+            }
+        });
+    }
+
+    /**
+     * Requests the {@link JobResult} for the given {@link JobID}. The method retries multiple
+     * times to poll the {@link JobResult} before giving up.
+     *
+     * @param jobId specifying the job for which to retrieve the {@link JobResult}
+     * @return Future which is completed with the {@link JobResult} once the job has completed or
+     * with a failure if the {@link JobResult} could not be retrieved.
+     */
+    public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
+
+        CompletableFuture<JobResult> result = new CompletableFuture<>();
+
+        try {
+            JobExecutionResult jobExecutionResult = retrieveJob(jobId);
+            Map<String, Object> allAccumulatorResults = jobExecutionResult.getAllAccumulatorResults();
+            Map<String, SerializedValue<Object>> allAccumulatorResultsSerialized = new HashMap<>();
+
+            for (Map.Entry<String, Object> acc : allAccumulatorResults.entrySet()) {
+                SerializedValue<Object> objectSerializedValue = null;
+                try {
+                    objectSerializedValue = new SerializedValue<>(acc.getValue());
+                } catch (IOException e) {
+                    throw new RuntimeException("Could not serialize accumulator result.", e);
--- End diff --

Let's fail the returned future exceptionally in this case.

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170864686

--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
@@ -425,18 +388,22 @@ public Integer getKey(Tuple2<Integer, Long> value) {
                 }
             }).asQueryableState("hakuna", valueState);
-        try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(cluster, env, deadline)) {
+        try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(clusterClient, env)) {
+
+            clusterClient.submitJob(
+                closableJobGraph.getJobGraph(), AbstractQueryableStateTestBase.class.getClassLoader());
+
-            // register to be notified when the job is running.
-            CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture =
-                notifyWhenJobStatusIs(closableJobGraph.getJobId(), JobStatus.RUNNING, deadline);
+            CompletableFuture<JobStatus> jobStatusFuture =
+                clusterClient.getJobStatus(closableJobGraph.getJobId());
-            cluster.submitJobDetached(closableJobGraph.getJobGraph());
+            while (deadline.hasTimeLeft() && !jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).equals(JobStatus.RUNNING)) {
--- End diff --

Why do we have to do something like this? Wouldn't it be better to simply issue the QS request and handle the failure if the cluster is not yet running?

---
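One way to follow this suggestion, sketched under assumptions: instead of polling the job status, retry the request itself until it succeeds or a deadline passes. `retryUntil` is a hypothetical helper, not an existing Flink utility; in the test, the `Supplier` would wrap the queryable state request.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class RetryUntilDeadlineSketch {

    /**
     * Hypothetical helper: re-issues an asynchronous request until it succeeds or
     * the deadline passes. Each wait is bounded by the time left.
     */
    static <T> T retryUntil(Supplier<CompletableFuture<T>> request, Instant deadline, Duration pause)
            throws InterruptedException, TimeoutException {
        Throwable lastFailure = null;
        while (Instant.now().isBefore(deadline)) {
            long leftMillis = Math.max(1, Duration.between(Instant.now(), deadline).toMillis());
            try {
                return request.get().get(leftMillis, TimeUnit.MILLISECONDS);
            } catch (ExecutionException e) {
                // For example, the job is not running yet: remember why and retry.
                lastFailure = e.getCause();
            } catch (TimeoutException e) {
                lastFailure = e;
            }
            Thread.sleep(pause.toMillis());
        }
        TimeoutException timeout = new TimeoutException("Request did not succeed before the deadline.");
        if (lastFailure != null) {
            // Keep the last failure so the test output shows why it gave up.
            timeout.initCause(lastFailure);
        }
        throw timeout;
    }
}
```

Compared with polling for `RUNNING`, this checks the thing the test actually cares about (the query answers) and needs no separate status call.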
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170862163

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---
@@ -577,6 +583,81 @@ public JobListeningContext connectToJob(JobID jobID) throws JobExecutionExceptio
                 printStatusDuringExecution);
     }
+    /**
+     * Requests the {@link JobStatus} of the job with the given {@link JobID}.
+     */
+    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+        final ActorGateway jobManager;
+        try {
+            jobManager = getJobManagerGateway();
+        } catch (FlinkException e) {
+            throw new RuntimeException("Could not retrieve JobManage gateway.", e);
+        }
+
+        Future<Object> response = jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
+
+        CompletableFuture<Object> javaFuture = FutureUtils.toJava(response);
+
+        return javaFuture.thenApply((responseMessage) -> {
+            if (responseMessage instanceof JobManagerMessages.CurrentJobStatus) {
+                return ((JobManagerMessages.CurrentJobStatus) responseMessage).status();
+            } else if (responseMessage instanceof JobManagerMessages.JobNotFound) {
+                throw new CompletionException(
+                    new IllegalStateException("Could not find job with JobId " + jobId));
+            } else {
+                throw new CompletionException(
+                    new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
+            }
+        });
+    }
+
+    /**
+     * Requests the {@link JobResult} for the given {@link JobID}. The method retries multiple
+     * times to poll the {@link JobResult} before giving up.
+     *
+     * @param jobId specifying the job for which to retrieve the {@link JobResult}
+     * @return Future which is completed with the {@link JobResult} once the job has completed or
+     * with a failure if the {@link JobResult} could not be retrieved.
+     */
+    public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
+
+        CompletableFuture<JobResult> result = new CompletableFuture<>();
+
+        try {
+            JobExecutionResult jobExecutionResult = retrieveJob(jobId);
+            Map<String, Object> allAccumulatorResults = jobExecutionResult.getAllAccumulatorResults();
+            Map<String, SerializedValue<Object>> allAccumulatorResultsSerialized = new HashMap<>();
+
+            for (Map.Entry<String, Object> acc : allAccumulatorResults.entrySet()) {
+                SerializedValue<Object> objectSerializedValue = null;
+                try {
+                    objectSerializedValue = new SerializedValue<>(acc.getValue());
+                } catch (IOException e) {
+                    throw new RuntimeException("Could not serialize accumulator result.", e);
+                }
+                allAccumulatorResultsSerialized.put(acc.getKey(), objectSerializedValue);
+            }
+            JobResult jobResult = new JobResult.Builder()
+                .jobId(jobId)
+                .netRuntime(jobExecutionResult.getNetRuntime())
+                .accumulatorResults(allAccumulatorResultsSerialized)
+                .build();
+            result.complete(jobResult);
--- End diff --

Completing `result` both here and in the catch block duplicates logic. It would be better to assign a `JobResult` variable in the `try` block and return `CompletableFuture.completedFuture(jobResult)` once, after the `try-catch` block.

---
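A sketch of both suggestions on `requestJobResult` (fail the returned future exceptionally, and return the successful result once after the `try-catch`), using plain Java serialization and a `Map<String, byte[]>` as hypothetical stand-ins for `SerializedValue` and `JobResult`. It assumes Java 8, which has no `CompletableFuture.failedFuture`, so the failed future is built by hand.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class JobResultFutureSketch {

    /** Java serialization, standing in for SerializedValue. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /**
     * Builds the serialized accumulators and returns the successful result in one
     * place; a serialization failure fails the future instead of being thrown.
     */
    static CompletableFuture<Map<String, byte[]>> requestJobResult(Map<String, Object> accumulators) {
        final Map<String, byte[]> serialized = new HashMap<>();
        try {
            for (Map.Entry<String, Object> acc : accumulators.entrySet()) {
                serialized.put(acc.getKey(), serialize(acc.getValue()));
            }
        } catch (IOException e) {
            // Java 8 has no CompletableFuture.failedFuture, so build the failed future by hand.
            CompletableFuture<Map<String, byte[]>> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
        return CompletableFuture.completedFuture(serialized);
    }
}
```

Callers then see every failure through the future (for example via `join()` or `exceptionally`), never as an unchecked exception thrown from the method itself.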
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170861383

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -255,14 +258,24 @@ protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoad
         }
     }
-    /**
-     * Requests the {@link JobResult} for the given {@link JobID}. The method retries multiple
-     * times to poll the {@link JobResult} before giving up.
-     *
-     * @param jobId specifying the job for which to retrieve the {@link JobResult}
-     * @return Future which is completed with the {@link JobResult} once the job has completed or
-     * with a failure if the {@link JobResult} could not be retrieved.
-     */
+    @Override
+    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+        JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
+        final JobMessageParameters params = new JobMessageParameters();
+        params.jobPathParameter.resolve(jobId);
+
+        CompletableFuture<JobDetailsInfo> responseFuture = sendRequest(detailsHeaders, params);
+
+        return responseFuture.thenApply((JobDetailsInfo jobDetailsInfo) -> {
+            if (jobDetailsInfo != null) {
--- End diff --

`responseFuture` shouldn't be completed with `null`.

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170866611

--- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java ---
@@ -62,6 +68,8 @@
     private TestEnvironment executionEnvironment;
+    private ClusterClient clusterClient;
--- End diff --

The cluster client needs to be shut down as well.

---
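A minimal sketch of the lifecycle the comment asks for: whatever creates the client in `before()` also shuts it down in `after()`. The types are hypothetical stand-ins loosely modeled on the `before()`/`after()` hooks of a JUnit `ExternalResource`; the real client's shutdown method is not shown in the thread, so `close()` here is an assumption.

```java
// Hypothetical stand-ins; not Flink's MiniClusterResource or ClusterClient.
final class ClientLifecycleSketch {

    /** Stand-in for a cluster client that holds resources. */
    static final class FakeClusterClient implements AutoCloseable {
        private boolean closed;

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Creates the client in before() and shuts it down again in after(). */
    static final class MiniClusterResourceSketch {
        private FakeClusterClient clusterClient;

        void before() {
            clusterClient = new FakeClusterClient();
        }

        FakeClusterClient getClusterClient() {
            return clusterClient;
        }

        void after() {
            if (clusterClient != null) {
                try {
                    // Shut the client down first; stopping the cluster would follow.
                    clusterClient.close();
                } finally {
                    clusterClient = null;
                }
            }
        }
    }
}
```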
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170861537

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -255,14 +258,24 @@ protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoad
         }
     }
-    /**
-     * Requests the {@link JobResult} for the given {@link JobID}. The method retries multiple
-     * times to poll the {@link JobResult} before giving up.
-     *
-     * @param jobId specifying the job for which to retrieve the {@link JobResult}
-     * @return Future which is completed with the {@link JobResult} once the job has completed or
-     * with a failure if the {@link JobResult} could not be retrieved.
-     */
+    @Override
+    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+        JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
+        final JobMessageParameters params = new JobMessageParameters();
+        params.jobPathParameter.resolve(jobId);
+
+        CompletableFuture<JobDetailsInfo> responseFuture = sendRequest(detailsHeaders, params);
+
+        return responseFuture.thenApply((JobDetailsInfo jobDetailsInfo) -> {
+            if (jobDetailsInfo != null) {
+                return jobDetailsInfo.getJobStatus();
+            }
+
+            throw new RuntimeException("Unknown JobStatus.");
--- End diff --

Let's not throw unchecked exceptions, and especially not a `RuntimeException` when the only reason is that the user requested a wrong `JobID`.

---
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170854988

--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
@@ -1307,24 +1268,13 @@ JobID getJobId() {
         public void close() throws Exception {
             // Free cluster resources
             if (jobId != null) {
-                cluster.getLeaderGateway(deadline.timeLeft())
-                    .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                    .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class));
-
-                cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                clusterClient.cancel(jobId);
+                // cancel() is non-blocking so do this to make sure the job finished
+                clusterClient.requestJobResult(jobId).get();
--- End diff --

couldn't we query the job status instead?

---
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170854288

--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
@@ -1307,24 +1268,13 @@ JobID getJobId() {
         public void close() throws Exception {
             // Free cluster resources
             if (jobId != null) {
-                cluster.getLeaderGateway(deadline.timeLeft())
-                    .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                    .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class));
-
-                cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                clusterClient.cancel(jobId);
+                // cancel() is non-blocking so do this to make sure the job finished
+                clusterClient.requestJobResult(jobId).get();
--- End diff --

Unfortunately, we can't determine from that whether the job has finished. But the result is only available once the job has finished somehow: naturally, because of a failure, or because it was cancelled.

---
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170576150

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java ---
@@ -50,7 +50,7 @@
     public static final String FIELD_NAME_JOB_NAME = "name";
-    public static final String FIELD_NAME_IS_STOPPABLE = "isStoppable";
+    public static final String FIELD_NAME_IS_STOPPABLE = "stoppable";
--- End diff --

I know, I want @GJL to have a look at this.

---
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170575275

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java ---
@@ -50,7 +50,7 @@
     public static final String FIELD_NAME_JOB_NAME = "name";
-    public static final String FIELD_NAME_IS_STOPPABLE = "isStoppable";
+    public static final String FIELD_NAME_IS_STOPPABLE = "stoppable";
--- End diff --

We should figure out why it returned "stoppable"; as far as I can tell, it should be "isStoppable". This change would break the API.

---
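The thread leaves open why the server produced `stoppable`. One possible factor, offered only as an assumption and not confirmed here: under the JavaBeans naming convention, which Jackson's default getter detection also follows, a boolean getter named `isStoppable()` describes a property called `stoppable`. The sketch checks the convention with `java.beans.Introspector` on a hypothetical `JobInfo` class.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

final class BooleanPropertyNameSketch {

    /** Hypothetical class with a boolean getter that uses the "is" prefix. */
    public static final class JobInfo {
        public boolean isStoppable() {
            return true;
        }
    }

    /** Lists bean property names, ignoring those inherited from Object (e.g. "class"). */
    static List<String> propertyNames(Class<?> type) throws IntrospectionException {
        BeanInfo info = Introspector.getBeanInfo(type, Object.class);
        List<String> names = new ArrayList<>();
        for (PropertyDescriptor descriptor : info.getPropertyDescriptors()) {
            names.add(descriptor.getName());
        }
        return names;
    }
}
```

If this is the cause, pinning the JSON name explicitly on the getter (or keeping the getter out of serialization) would be an alternative to renaming the constant, which would avoid the API break.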
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170573131

--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
@@ -425,18 +385,22 @@ public Integer getKey(Tuple2<Integer, Long> value) {
                 }
             }).asQueryableState("hakuna", valueState);
-        try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(cluster, env, deadline)) {
+        try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(clusterClient, env)) {
-            // register to be notified when the job is running.
-            CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture =
-                notifyWhenJobStatusIs(closableJobGraph.getJobId(), JobStatus.RUNNING, deadline);
+            clusterClient.submitJob(
+                closableJobGraph.getJobGraph(), AbstractQueryableStateTestBase.class.getClassLoader());
-            cluster.submitJobDetached(closableJobGraph.getJobGraph());
-            // expect for the job to be running
-            TestingJobManagerMessages.JobStatusIs jobStatus =
-                runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-            assertEquals(JobStatus.RUNNING, jobStatus.state());
+            CompletableFuture<JobStatus> jobStatusFuture =
+                clusterClient.getJobStatus(closableJobGraph.getJobId());
+
+            while (deadline.hasTimeLeft() && !jobStatusFuture.get().equals(JobStatus.RUNNING)) {
+                Thread.sleep(50);
+                jobStatusFuture =
+                    clusterClient.getJobStatus(closableJobGraph.getJobId());
+            }
+
+            assertEquals(JobStatus.RUNNING, jobStatusFuture.get());
--- End diff --

add timeout

---
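A sketch of the polling loop with a timeout added, under assumptions: every `get()` is bounded by the time left until the deadline, so a status request that never completes cannot hang the test. `awaitStatus` and the `JobStatus` enum are hypothetical stand-ins; in the test, the `Supplier` would wrap the `getJobStatus` call.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class AwaitStatusSketch {

    enum JobStatus { CREATED, RUNNING, FINISHED, FAILED, CANCELED }

    /**
     * Polls until the status equals the expected one. Each get() waits at most the
     * time left, and passing the deadline raises a TimeoutException.
     */
    static void awaitStatus(Supplier<CompletableFuture<JobStatus>> statusRequest, JobStatus expected,
            Instant deadline, Duration pause)
            throws InterruptedException, ExecutionException, TimeoutException {
        while (true) {
            long leftMillis = Duration.between(Instant.now(), deadline).toMillis();
            if (leftMillis <= 0) {
                throw new TimeoutException("Job did not reach " + expected + " before the deadline.");
            }
            JobStatus current = statusRequest.get().get(leftMillis, TimeUnit.MILLISECONDS);
            if (current == expected) {
                return;
            }
            Thread.sleep(pause.toMillis());
        }
    }
}
```

Unlike the loop in the diff, this one fails loudly when the deadline passes instead of falling through to an unbounded `get()`.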
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170573280

--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
@@ -1307,24 +1268,13 @@ JobID getJobId() {
         public void close() throws Exception {
             // Free cluster resources
             if (jobId != null) {
-                cluster.getLeaderGateway(deadline.timeLeft())
-                    .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                    .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class));
-
-                cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                clusterClient.cancel(jobId);
+                // cancel() is non-blocking so do this to make sure the job finished
+                clusterClient.requestJobResult(jobId).get();
--- End diff --

add timeout. We should also check the returned status to make sure it is actually finished.

---
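A sketch of `close()` with both suggestions applied, under assumptions: wait a bounded time after cancelling, then check how the job ended. The `Client` interface and its `requestTerminalStatus` method are hypothetical; here the terminal status stands in for whatever the job result reports, and expecting `CANCELED` is one reading of "check the returned status".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CancelAndAwaitSketch {

    enum JobStatus { RUNNING, FINISHED, FAILED, CANCELED }

    /** Hypothetical client whose cancel() only triggers cancellation. */
    interface Client {
        void cancel(String jobId);

        /** Completes with the terminal status once the job has finished. */
        CompletableFuture<JobStatus> requestTerminalStatus(String jobId);
    }

    /** Cancels the job, waits a bounded time for it to terminate, and checks how it ended. */
    static void cancelAndAwait(Client client, String jobId, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        client.cancel(jobId);
        JobStatus terminal = client.requestTerminalStatus(jobId).get(timeoutMillis, TimeUnit.MILLISECONDS);
        if (terminal != JobStatus.CANCELED) {
            throw new IllegalStateException("Expected CANCELED, but the job ended as " + terminal);
        }
    }
}
```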
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170572839

--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
@@ -304,47 +294,17 @@ public Integer getKey(Tuple2<Integer, Long> value) {
         final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
         final JobID jobId = jobGraph.getJobID();
-        final CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture =
-                notifyWhenJobStatusIs(jobId, JobStatus.FAILED, deadline);
-
-        final CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture =
-                notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline);
-
-        cluster.submitJobDetached(jobGraph);
-
-        try {
-            final TestingJobManagerMessages.JobStatusIs jobStatus =
-                    failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-            assertEquals(JobStatus.FAILED, jobStatus.state());
-        } catch (Exception e) {
-
-            // if the assertion fails, it means that the job was (falsely) not cancelled.
-            // in this case, and given that the mini-cluster is shared with other tests,
-            // we cancel the job and wait for the cancellation so that the resources are freed.
-
-            if (jobId != null) {
-                cluster.getLeaderGateway(deadline.timeLeft())
-                        .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                        .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class));
-
-                cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-            }
+        clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader());
-            // and we re-throw the exception.
-            throw e;
-        }
+        CompletableFuture<JobResult> jobResultFuture = clusterClient.requestJobResult(jobId);
-        // Get the job and check the cause
-        JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
-                cluster.getLeaderGateway(deadline.timeLeft())
-                        .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
-                        .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.JobFound.class)))
-                .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+        JobResult jobResult = jobResultFuture.get();
--- End diff --

This should have a timeout. If the timeout is hit, we should try to cancel the job.

---
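A minimal sketch of the suggested behavior, assuming the caller passes a `Runnable` that cancels the job: wait a bounded time for the result and, if that times out, cancel the job before rethrowing so it does not keep occupying a mini cluster shared with other tests. `awaitOrCancel` is a hypothetical helper, not an existing API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AwaitOrCancelSketch {

    /**
     * Waits at most timeoutMillis for the result. On timeout, runs cancelJob
     * so the job frees its resources, then rethrows the TimeoutException.
     */
    static <R> R awaitOrCancel(CompletableFuture<R> resultFuture, long timeoutMillis, Runnable cancelJob)
            throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return resultFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            cancelJob.run();
            throw e;
        }
    }
}
```

This keeps the intent of the removed `catch` block (cancel the job so the shared cluster is freed, then rethrow) without the JobManager-specific messaging.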
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170572908

--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
@@ -304,47 +294,17 @@ public Integer getKey(Tuple2<Integer, Long> value) {
         final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
         final JobID jobId = jobGraph.getJobID();
-        final CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture =
-                notifyWhenJobStatusIs(jobId, JobStatus.FAILED, deadline);
-
-        final CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture =
-                notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline);
-
-        cluster.submitJobDetached(jobGraph);
-
-        try {
-            final TestingJobManagerMessages.JobStatusIs jobStatus =
-                    failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-            assertEquals(JobStatus.FAILED, jobStatus.state());
-        } catch (Exception e) {
-
-            // if the assertion fails, it means that the job was (falsely) not cancelled.
-            // in this case, and given that the mini-cluster is shared with other tests,
-            // we cancel the job and wait for the cancellation so that the resources are freed.
-
-            if (jobId != null) {
-                cluster.getLeaderGateway(deadline.timeLeft())
-                        .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                        .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class));
-
-                cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-            }
+        clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader());
-            // and we re-throw the exception.
-            throw e;
-        }
+        CompletableFuture<JobResult> jobResultFuture = clusterClient.requestJobResult(jobId);
-        // Get the job and check the cause
-        JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
-                cluster.getLeaderGateway(deadline.timeLeft())
-                        .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
-                        .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.JobFound.class)))
-                .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+        JobResult jobResult = jobResultFuture.get();
+        assertFalse(jobResult.isSuccess());
-        String failureCause = jobFound.executionGraph().getFailureInfo().getExceptionAsString();
+        String failureCause = jobResult.getSerializedThrowable().get().getFullStringifiedStackTrace();
-        assertEquals(JobStatus.FAILED, jobFound.executionGraph().getState());
+        CompletableFuture<JobStatus> jobStatusFuture = clusterClient.getJobStatus(jobId);
+        assertEquals(JobStatus.FAILED, jobStatusFuture.get());
--- End diff --

add timeout

---
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170573094

--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
@@ -425,18 +385,22 @@ public Integer getKey(Tuple2<Integer, Long> value) {
                 }
             }).asQueryableState("hakuna", valueState);
-        try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(cluster, env, deadline)) {
+        try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(clusterClient, env)) {
-            // register to be notified when the job is running.
-            CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture =
-                notifyWhenJobStatusIs(closableJobGraph.getJobId(), JobStatus.RUNNING, deadline);
+            clusterClient.submitJob(
+                closableJobGraph.getJobGraph(), AbstractQueryableStateTestBase.class.getClassLoader());
-            cluster.submitJobDetached(closableJobGraph.getJobGraph());
-            // expect for the job to be running
-            TestingJobManagerMessages.JobStatusIs jobStatus =
-                runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-            assertEquals(JobStatus.RUNNING, jobStatus.state());
+            CompletableFuture<JobStatus> jobStatusFuture =
+                clusterClient.getJobStatus(closableJobGraph.getJobId());
+
+            while (deadline.hasTimeLeft() && !jobStatusFuture.get().equals(JobStatus.RUNNING)) {
--- End diff --

add timeout

---
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5579

[FLINK-8778] Migrate queryable state ITCases to use MiniClusterResource

This also covers https://issues.apache.org/jira/browse/FLINK-8757 and https://issues.apache.org/jira/browse/FLINK-8758.

R: @tillrohrmann or @GJL

CC: @zentol because of the whole test-porting business, @kl0u because it's QS and because of test porting

For now, the tests don't pass in FLIP-6 mode (you have to activate the `flip-6` Maven profile to run them in that mode). When you start a single test in IntelliJ, it succeeds; when you start the complete suite, only the first test succeeds and the others stall.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-8757-port-it-cases-flip-6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5579.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #5579

commit db5e474fe3cf9045f291f654a2bca025806f500c
Author: Aljoscha Krettek
Date: 2018-02-26T09:12:44Z

    [FLINK-8758] Add getters to JobDetailsInfo

commit fc23247596432c5ed025c6532afd5a55283f780b
Author: Aljoscha Krettek
Date: 2018-02-26T11:28:59Z

    [hotfix] Fix stoppable field in JobDetailsInfo

    For some reason, the server was sending JSON with a stoppable field, not the isStoppable field.

commit bdabaecaceb608d57cea2ac21374c86f6f1eeae5
Author: Aljoscha Krettek
Date: 2018-02-26T10:44:57Z

    [FLINK-8757] Add MiniClusterResource.getClusterClient()

commit c739a73d80d975586b58d82d8eb7ef2ec44ef275
Author: Aljoscha Krettek
Date: 2018-02-26T11:29:23Z

    Add proper toString() on JsonResponse in RestClient

commit 01cc7ce9d732cba7b0c51ca0dab87f1e655e09fc
Author: Aljoscha Krettek
Date: 2018-02-26T10:52:50Z

    [FLINK-8758] Make non-blocking ClusterClient.submitJob() public

commit b4388ac50cd8ba6d3ff8fd44e5159320576f9e65
Author: Aljoscha Krettek
Date: 2018-02-26T10:53:47Z

    [FLINK-8758] Add methods for job status/result to ClusterClient

commit 63eccd66217b7a5ec8d399e2bf22378adda8bdb2
Author: Aljoscha Krettek
Date: 2018-02-26T10:55:14Z

    [FLINK-8778] Migrate queryable state ITCases to use MiniClusterResource

---