[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-28 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r171209553
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
 ---
@@ -18,28 +18,99 @@
 
 package org.apache.flink.queryablestate.itcases;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.test.util.MiniClusterResource;
 
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 
 /**
  * Several integration tests for queryable state using the {@link 
FsStateBackend}.
  */
-public class HAQueryableStateFsBackendITCase extends 
HAAbstractQueryableStateTestBase {
+public class HAQueryableStateFsBackendITCase extends 
AbstractQueryableStateTestBase {
--- End diff --

I tend to agree. I created an updated version of this here: #5595. It's 
still using inheritance but I changed it such that the base class now always 
calls `setDetached(true)`. I don't want to go into refactoring those tests now 
because there is already enough to do. Plus, I did remove one layer of 
inheritance. 😅 
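
As a rough illustration of letting the base class enforce the detached-submission invariant itself, consider the sketch below. Everything except the `setDetached(true)` call is an assumption: `FakeClusterClient`, `DetachedTestBase` and `FsBackendTestSketch` are hypothetical stand-ins, not the Flink classes touched by this PR.

```java
// Hypothetical concrete test; it inherits the invariant without having to know about it.
class FsBackendTestSketch extends DetachedTestBase {
    FsBackendTestSketch(FakeClusterClient clusterClient) {
        super(clusterClient);
    }

    public static void main(String[] args) {
        FakeClusterClient client = new FakeClusterClient();
        new FsBackendTestSketch(client);
        System.out.println("detached = " + client.isDetached());
    }
}

// Hypothetical base class: it owns the invariant instead of relying on each subclass.
abstract class DetachedTestBase {
    protected final FakeClusterClient clusterClient;

    protected DetachedTestBase(FakeClusterClient clusterClient) {
        this.clusterClient = clusterClient;
        // Enforced once, here, so a subclass cannot accidentally submit in blocking mode.
        this.clusterClient.setDetached(true);
    }
}

// Hypothetical stand-in for a cluster client; not the Flink ClusterClient API.
class FakeClusterClient {
    private boolean detached;

    void setDetached(boolean detached) {
        this.detached = detached;
    }

    boolean isDetached() {
        return detached;
    }
}
```

This keeps the inheritance, but a new subclass no longer has to remember the requirement.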


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-27 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/5579


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170913598
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -255,14 +258,24 @@ protected JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoad
}
}
 
-   /**
-* Requests the {@link JobResult} for the given {@link JobID}. The 
method retries multiple
-* times to poll the {@link JobResult} before giving up.
-*
-* @param jobId specifying the job for which to retrieve the {@link 
JobResult}
-* @return Future which is completed with the {@link JobResult} once 
the job has completed or
-* with a failure if the {@link JobResult} could not be retrieved.
-*/
+   @Override
+   public CompletableFuture getJobStatus(JobID jobId) {
+   JobDetailsHeaders detailsHeaders = 
JobDetailsHeaders.getInstance();
+   final JobMessageParameters  params = new JobMessageParameters();
+   params.jobPathParameter.resolve(jobId);
+
+   CompletableFuture responseFuture = 
sendRequest(detailsHeaders, params);
+
+   return responseFuture.thenApply((JobDetailsInfo jobDetailsInfo) 
-> {
+   if (jobDetailsInfo != null) {
+   return jobDetailsInfo.getJobStatus();
+   }
+
+   throw new RuntimeException("Unknown JobStatus.");
--- End diff --

OK, I'm simplifying to

```
responseFuture.thenApply(JobDetailsInfo::getJobStatus);
```

which will let the error response go out in case the job wasn't found.
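
A small self-contained sketch of why the method reference is enough: `thenApply` only runs its function on success, and an exceptionally completed source future fails the dependent future too. `Details` is a hypothetical stand-in for the REST response type, and the "job not found" failure is simulated by hand.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class ThenApplyPropagationSketch {

    // Hypothetical stand-in for the REST response type; not Flink's JobDetailsInfo.
    static final class Details {
        private final String status;

        Details(String status) {
            this.status = status;
        }

        String getJobStatus() {
            return status;
        }
    }

    // Maps the response to its status; a failed response future is passed through untouched.
    static CompletableFuture<String> statusOf(CompletableFuture<Details> responseFuture) {
        return responseFuture.thenApply(Details::getJobStatus);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(statusOf(CompletableFuture.completedFuture(new Details("RUNNING"))).get());

        // Simulated error response, e.g. for an unknown job.
        CompletableFuture<Details> notFound = new CompletableFuture<>();
        notFound.completeExceptionally(new IllegalStateException("job not found"));
        try {
            statusOf(notFound).get();
        } catch (ExecutionException e) {
            // The original failure arrives as the cause; the mapping function never ran.
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```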


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170911860
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -255,14 +258,24 @@ protected JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoad
}
}
 
-   /**
-* Requests the {@link JobResult} for the given {@link JobID}. The 
method retries multiple
-* times to poll the {@link JobResult} before giving up.
-*
-* @param jobId specifying the job for which to retrieve the {@link 
JobResult}
-* @return Future which is completed with the {@link JobResult} once 
the job has completed or
-* with a failure if the {@link JobResult} could not be retrieved.
-*/
+   @Override
+   public CompletableFuture getJobStatus(JobID jobId) {
+   JobDetailsHeaders detailsHeaders = 
JobDetailsHeaders.getInstance();
+   final JobMessageParameters  params = new JobMessageParameters();
+   params.jobPathParameter.resolve(jobId);
+
+   CompletableFuture responseFuture = 
sendRequest(detailsHeaders, params);
+
+   return responseFuture.thenApply((JobDetailsInfo jobDetailsInfo) 
-> {
+   if (jobDetailsInfo != null) {
+   return jobDetailsInfo.getJobStatus();
+   }
+
+   throw new RuntimeException("Unknown JobStatus.");
--- End diff --

The alternative I had in mind was to extend `JobStatus` with `UNKNOWN`. 
WDYT?


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170866059
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -62,6 +68,8 @@
 
private TestEnvironment executionEnvironment;
 
+   private ClusterClient clusterClient;
--- End diff --

Raw type usage.


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170868446
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
 ---
@@ -18,28 +18,99 @@
 
 package org.apache.flink.queryablestate.itcases;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.test.util.MiniClusterResource;
 
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 
 /**
  * Several integration tests for queryable state using the {@link 
FsStateBackend}.
  */
-public class HAQueryableStateFsBackendITCase extends 
HAAbstractQueryableStateTestBase {
+public class HAQueryableStateFsBackendITCase extends 
AbstractQueryableStateTestBase {
--- End diff --

To be honest, I'm not a huge fan of inheritance when writing tests. It is 
really hard to understand what's going on and it is hard to enforce invariants 
which are assumed by the base class. For example, 
`AbstractQueryableStateTestBase` requires that the `ClusterClient` is set to 
detached job submission. A user who does not know it and extends this test base 
will almost certainly stumble across this. Ideally tests are succinct enough 
that you have everything in a single class.


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170863098
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -304,47 +296,18 @@ public Integer getKey(Tuple2 value) {
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
final JobID jobId = jobGraph.getJobID();
 
-   final CompletableFuture 
failedFuture =
-   notifyWhenJobStatusIs(jobId, JobStatus.FAILED, 
deadline);
-
-   final CompletableFuture 
cancellationFuture =
-   notifyWhenJobStatusIs(jobId, 
JobStatus.CANCELED, deadline);
+   clusterClient.submitJob(jobGraph, 
AbstractQueryableStateTestBase.class.getClassLoader());
--- End diff --

Same here.


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170863015
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -188,12 +179,12 @@ public Integer getKey(Tuple2 value) {
}
}).asQueryableState(queryName, reducingState);
 
-   try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(cluster, env, deadline)) {
+   try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(clusterClient, env)) {
 
final JobID jobId = autoCancellableJob.getJobId();
final JobGraph jobGraph = 
autoCancellableJob.getJobGraph();
 
-   cluster.submitJobDetached(jobGraph);
+   clusterClient.submitJob(jobGraph, 
AbstractQueryableStateTestBase.class.getClassLoader());
--- End diff --

How is this change equivalent? This could easily be a blocking job 
submission.


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170864042
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -577,6 +583,81 @@ public JobListeningContext connectToJob(JobID jobID) 
throws JobExecutionExceptio
printStatusDuringExecution);
}
 
+   /**
+* Requests the {@link JobStatus} of the job with the given {@link 
JobID}.
+*/
+   public CompletableFuture getJobStatus(JobID jobId) {
+   final ActorGateway jobManager;
+   try {
+   jobManager = getJobManagerGateway();
+   } catch (FlinkException e) {
+   throw new RuntimeException("Could not retrieve 
JobManage gateway.", e);
+   }
+
+   Future response = 
jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
+
+   CompletableFuture javaFuture = 
FutureUtils.toJava(response);
+
+   return javaFuture.thenApply((responseMessage) -> {
+   if (responseMessage instanceof 
JobManagerMessages.CurrentJobStatus) {
+   return ((JobManagerMessages.CurrentJobStatus) 
responseMessage).status();
+   } else if (responseMessage instanceof 
JobManagerMessages.JobNotFound) {
+   throw new CompletionException(
+   new IllegalStateException("Could not 
find job with JobId " + jobId));
+   } else {
+   throw new CompletionException(
+   new IllegalStateException("Unknown 
JobManager response of type " + responseMessage.getClass()));
+   }
+   });
+   }
+
+   /**
+* Requests the {@link JobResult} for the given {@link JobID}. The 
method retries multiple
+* times to poll the {@link JobResult} before giving up.
+*
+* @param jobId specifying the job for which to retrieve the {@link 
JobResult}
+* @return Future which is completed with the {@link JobResult} once 
the job has completed or
+* with a failure if the {@link JobResult} could not be retrieved.
+*/
+   public CompletableFuture requestJobResult(JobID jobId) {
+
+   CompletableFuture result = new CompletableFuture<>();
+
+   try {
+   JobExecutionResult jobExecutionResult = 
retrieveJob(jobId);
+   Map allAccumulatorResults = 
jobExecutionResult.getAllAccumulatorResults();
+   Map 
allAccumulatorResultsSerialized = new HashMap<>();
+
+   for (Map.Entry acc : 
allAccumulatorResults.entrySet()) {
+   SerializedValue objectSerializedValue = 
null;
+   try {
+   objectSerializedValue = new 
SerializedValue<>(acc.getValue());
+   } catch (IOException e) {
+   throw new RuntimeException("Could not 
serialize accumulator result.", e);
--- End diff --

Let's fail the returned future exceptionally in this case.


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170864686
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -425,18 +388,22 @@ public Integer getKey(Tuple2 value) {
}
}).asQueryableState("hakuna", valueState);
 
-   try (AutoCancellableJob closableJobGraph = new 
AutoCancellableJob(cluster, env, deadline)) {
+   try (AutoCancellableJob closableJobGraph = new 
AutoCancellableJob(clusterClient, env)) {
+
+   clusterClient.submitJob(
+   closableJobGraph.getJobGraph(), 
AbstractQueryableStateTestBase.class.getClassLoader());
+
 
-   // register to be notified when the job is running.
-   
CompletableFuture runningFuture =
-   
notifyWhenJobStatusIs(closableJobGraph.getJobId(), JobStatus.RUNNING, deadline);
+   CompletableFuture jobStatusFuture =
+   
clusterClient.getJobStatus(closableJobGraph.getJobId());
 
-   
cluster.submitJobDetached(closableJobGraph.getJobGraph());
+   while (deadline.hasTimeLeft() && 
!jobStatusFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS).equals(JobStatus.RUNNING)) {
--- End diff --

Why do we have to do something like this? Wouldn't it be better to simply 
issue the QS request and handle the failure if the cluster is not yet 
running?


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170862163
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -577,6 +583,81 @@ public JobListeningContext connectToJob(JobID jobID) 
throws JobExecutionExceptio
printStatusDuringExecution);
}
 
+   /**
+* Requests the {@link JobStatus} of the job with the given {@link 
JobID}.
+*/
+   public CompletableFuture getJobStatus(JobID jobId) {
+   final ActorGateway jobManager;
+   try {
+   jobManager = getJobManagerGateway();
+   } catch (FlinkException e) {
+   throw new RuntimeException("Could not retrieve 
JobManage gateway.", e);
+   }
+
+   Future response = 
jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
+
+   CompletableFuture javaFuture = 
FutureUtils.toJava(response);
+
+   return javaFuture.thenApply((responseMessage) -> {
+   if (responseMessage instanceof 
JobManagerMessages.CurrentJobStatus) {
+   return ((JobManagerMessages.CurrentJobStatus) 
responseMessage).status();
+   } else if (responseMessage instanceof 
JobManagerMessages.JobNotFound) {
+   throw new CompletionException(
+   new IllegalStateException("Could not 
find job with JobId " + jobId));
+   } else {
+   throw new CompletionException(
+   new IllegalStateException("Unknown 
JobManager response of type " + responseMessage.getClass()));
+   }
+   });
+   }
+
+   /**
+* Requests the {@link JobResult} for the given {@link JobID}. The 
method retries multiple
+* times to poll the {@link JobResult} before giving up.
+*
+* @param jobId specifying the job for which to retrieve the {@link 
JobResult}
+* @return Future which is completed with the {@link JobResult} once 
the job has completed or
+* with a failure if the {@link JobResult} could not be retrieved.
+*/
+   public CompletableFuture requestJobResult(JobID jobId) {
+
+   CompletableFuture result = new CompletableFuture<>();
+
+   try {
+   JobExecutionResult jobExecutionResult = 
retrieveJob(jobId);
+   Map allAccumulatorResults = 
jobExecutionResult.getAllAccumulatorResults();
+   Map 
allAccumulatorResultsSerialized = new HashMap<>();
+
+   for (Map.Entry acc : 
allAccumulatorResults.entrySet()) {
+   SerializedValue objectSerializedValue = 
null;
+   try {
+   objectSerializedValue = new 
SerializedValue<>(acc.getValue());
+   } catch (IOException e) {
+   throw new RuntimeException("Could not 
serialize accumulator result.", e);
+   }
+   
allAccumulatorResultsSerialized.put(acc.getKey(), objectSerializedValue);
+   }
+   JobResult jobResult = new JobResult.Builder()
+   .jobId(jobId)
+   .netRuntime(jobExecutionResult.getNetRuntime())
+   
.accumulatorResults(allAccumulatorResultsSerialized)
+   .build();
+   result.complete(jobResult);
--- End diff --

Completing `result` here and in the catch block is duplicate logic. Better 
to create a `JobResult` variable which is assigned after the `try-catch` block 
and then return `CompletableFuture.completedFuture(jobResult)`.
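
One possible shape for this, as a minimal sketch rather than the actual `requestJobResult` code: build the value first, report a serialization problem through an exceptionally completed future, and wrap the finished value once with the JDK's `CompletableFuture.completedFuture`. The `serialize` helper and the plain `Map` result are assumptions standing in for `SerializedValue` and `JobResult`.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class SingleCompletionSketch {

    // Hypothetical serialization step; stands in for wrapping a value in a SerializedValue.
    static String serialize(Object value) throws IOException {
        if (value == null) {
            throw new IOException("Could not serialize accumulator result.");
        }
        return value.toString();
    }

    static CompletableFuture<Map<String, String>> requestResult(Map<String, Object> accumulators) {
        final Map<String, String> serialized = new HashMap<>();
        try {
            for (Map.Entry<String, Object> acc : accumulators.entrySet()) {
                serialized.put(acc.getKey(), serialize(acc.getValue()));
            }
        } catch (IOException e) {
            // Failure path: hand the exception to the caller through the future.
            CompletableFuture<Map<String, String>> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
        // Success path: the value is finished, so it is wrapped exactly once.
        return CompletableFuture.completedFuture(serialized);
    }

    public static void main(String[] args) {
        Map<String, Object> ok = new HashMap<>();
        ok.put("count", 42);
        System.out.println(requestResult(ok).join());

        Map<String, Object> bad = new HashMap<>();
        bad.put("broken", null);
        System.out.println(requestResult(bad).isCompletedExceptionally());
    }
}
```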


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170861383
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -255,14 +258,24 @@ protected JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoad
}
}
 
-   /**
-* Requests the {@link JobResult} for the given {@link JobID}. The 
method retries multiple
-* times to poll the {@link JobResult} before giving up.
-*
-* @param jobId specifying the job for which to retrieve the {@link 
JobResult}
-* @return Future which is completed with the {@link JobResult} once 
the job has completed or
-* with a failure if the {@link JobResult} could not be retrieved.
-*/
+   @Override
+   public CompletableFuture getJobStatus(JobID jobId) {
+   JobDetailsHeaders detailsHeaders = 
JobDetailsHeaders.getInstance();
+   final JobMessageParameters  params = new JobMessageParameters();
+   params.jobPathParameter.resolve(jobId);
+
+   CompletableFuture responseFuture = 
sendRequest(detailsHeaders, params);
+
+   return responseFuture.thenApply((JobDetailsInfo jobDetailsInfo) 
-> {
+   if (jobDetailsInfo != null) {
--- End diff --

`responseFuture` shouldn't be completed with `null`


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170866611
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -62,6 +68,8 @@
 
private TestEnvironment executionEnvironment;
 
+   private ClusterClient clusterClient;
--- End diff --

The cluster client needs to be shut down as well.


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170861537
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -255,14 +258,24 @@ protected JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoad
}
}
 
-   /**
-* Requests the {@link JobResult} for the given {@link JobID}. The 
method retries multiple
-* times to poll the {@link JobResult} before giving up.
-*
-* @param jobId specifying the job for which to retrieve the {@link 
JobResult}
-* @return Future which is completed with the {@link JobResult} once 
the job has completed or
-* with a failure if the {@link JobResult} could not be retrieved.
-*/
+   @Override
+   public CompletableFuture getJobStatus(JobID jobId) {
+   JobDetailsHeaders detailsHeaders = 
JobDetailsHeaders.getInstance();
+   final JobMessageParameters  params = new JobMessageParameters();
+   params.jobPathParameter.resolve(jobId);
+
+   CompletableFuture responseFuture = 
sendRequest(detailsHeaders, params);
+
+   return responseFuture.thenApply((JobDetailsInfo jobDetailsInfo) 
-> {
+   if (jobDetailsInfo != null) {
+   return jobDetailsInfo.getJobStatus();
+   }
+
+   throw new RuntimeException("Unknown JobStatus.");
--- End diff --

Let's not throw unchecked exceptions, especially not a runtime exception if 
the only reason is that the user requested a wrong `JobID`.


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-27 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170854988
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -1307,24 +1268,13 @@ JobID getJobId() {
public void close() throws Exception {
// Free cluster resources
if (jobId != null) {
-   cluster.getLeaderGateway(deadline.timeLeft())
-   .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-   
.mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class));
-
-   
cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+   clusterClient.cancel(jobId);
+   // cancel() is non-blocking so do this to make 
sure the job finished
+   clusterClient.requestJobResult(jobId).get();
--- End diff --

couldn't we query the job status instead?


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170854288
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -1307,24 +1268,13 @@ JobID getJobId() {
public void close() throws Exception {
// Free cluster resources
if (jobId != null) {
-   cluster.getLeaderGateway(deadline.timeLeft())
-   .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-   
.mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class));
-
-   
cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+   clusterClient.cancel(jobId);
+   // cancel() is non-blocking so do this to make 
sure the job finished
+   clusterClient.requestJobResult(jobId).get();
--- End diff --

Unfortunately, we can't determine whether the job finished from that. But 
the result is only available if the job finished somehow, either naturally, or 
because of failure or because of cancel.


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-26 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170576150
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
 ---
@@ -50,7 +50,7 @@
 
public static final String FIELD_NAME_JOB_NAME = "name";
 
-   public static final String FIELD_NAME_IS_STOPPABLE = "isStoppable";
+   public static final String FIELD_NAME_IS_STOPPABLE = "stoppable";
--- End diff --

I know, I want @GJL to have a look at this. 😅 


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170575275
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
 ---
@@ -50,7 +50,7 @@
 
public static final String FIELD_NAME_JOB_NAME = "name";
 
-   public static final String FIELD_NAME_IS_STOPPABLE = "isStoppable";
+   public static final String FIELD_NAME_IS_STOPPABLE = "stoppable";
--- End diff --

We should figure out why it returned "stoppable"; as far as I can tell it 
should be "isStoppable". This would be an API breaking change.


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170573131
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -425,18 +385,22 @@ public Integer getKey(Tuple2 value) {
}
}).asQueryableState("hakuna", valueState);
 
-   try (AutoCancellableJob closableJobGraph = new 
AutoCancellableJob(cluster, env, deadline)) {
+   try (AutoCancellableJob closableJobGraph = new 
AutoCancellableJob(clusterClient, env)) {
 
-   // register to be notified when the job is running.
-   
CompletableFuture runningFuture =
-   
notifyWhenJobStatusIs(closableJobGraph.getJobId(), JobStatus.RUNNING, deadline);
+   clusterClient.submitJob(
+   closableJobGraph.getJobGraph(), 
AbstractQueryableStateTestBase.class.getClassLoader());
 
-   
cluster.submitJobDetached(closableJobGraph.getJobGraph());
 
-   // expect for the job to be running
-   TestingJobManagerMessages.JobStatusIs jobStatus =
-   
runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-   assertEquals(JobStatus.RUNNING, jobStatus.state());
+   CompletableFuture jobStatusFuture =
+   
clusterClient.getJobStatus(closableJobGraph.getJobId());
+
+   while (deadline.hasTimeLeft() && 
!jobStatusFuture.get().equals(JobStatus.RUNNING)) {
+   Thread.sleep(50);
+   jobStatusFuture =
+   
clusterClient.getJobStatus(closableJobGraph.getJobId());
+   }
+
+   assertEquals(JobStatus.RUNNING, jobStatusFuture.get());
--- End diff --

add timeout


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170573280
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -1307,24 +1268,13 @@ JobID getJobId() {
public void close() throws Exception {
// Free cluster resources
if (jobId != null) {
-   cluster.getLeaderGateway(deadline.timeLeft())
-   .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-   
.mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class));
-
-   
cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+   clusterClient.cancel(jobId);
+   // cancel() is non-blocking so do this to make 
sure the job finished
+   clusterClient.requestJobResult(jobId).get();
--- End diff --

add timeout. We should also check the returned status to make sure it is 
actually finished.


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170572839
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -304,47 +294,17 @@ public Integer getKey(Tuple2 value) {
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
final JobID jobId = jobGraph.getJobID();
 
-   final CompletableFuture 
failedFuture =
-   notifyWhenJobStatusIs(jobId, JobStatus.FAILED, 
deadline);
-
-   final CompletableFuture 
cancellationFuture =
-   notifyWhenJobStatusIs(jobId, 
JobStatus.CANCELED, deadline);
-
-   cluster.submitJobDetached(jobGraph);
-
-   try {
-   final TestingJobManagerMessages.JobStatusIs jobStatus =
-   
failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-   assertEquals(JobStatus.FAILED, jobStatus.state());
-   } catch (Exception e) {
-
-   // if the assertion fails, it means that the job was 
(falsely) not cancelled.
-   // in this case, and given that the mini-cluster is 
shared with other tests,
-   // we cancel the job and wait for the cancellation so 
that the resources are freed.
-
-   if (jobId != null) {
-   cluster.getLeaderGateway(deadline.timeLeft())
-   .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-   
.mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class));
-
-   
cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-   }
+   clusterClient.submitJob(jobGraph, 
AbstractQueryableStateTestBase.class.getClassLoader());
 
-   // and we re-throw the exception.
-   throw e;
-   }
+   CompletableFuture jobResultFuture = 
clusterClient.requestJobResult(jobId);
 
-   // Get the job and check the cause
-   JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
-   cluster.getLeaderGateway(deadline.timeLeft())
-   .ask(new 
JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
-   
.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.JobFound.class)))
-   .get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+   JobResult jobResult = jobResultFuture.get();
--- End diff --

This should have a timeout. If the timeout is hit we should try to cancel 
the job.
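
A bounded wait that cancels on timeout could look roughly like this sketch. The `getOrCancel` helper and the `Runnable` cancel hook are hypothetical; in the test the hook would presumably wrap something like `clusterClient.cancel(jobId)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutThenCancelSketch {

    // Waits for the result for a bounded time; on timeout, runs the cancel action and rethrows.
    static <T> T getOrCancel(CompletableFuture<T> resultFuture, long timeoutMillis, Runnable cancelJob)
            throws Exception {
        try {
            return resultFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Free the shared cluster's resources before letting the test fail.
            cancelJob.run();
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(getOrCancel(CompletableFuture.completedFuture("done"), 100, () -> { }));

        // A future that is never completed simulates a job that does not finish in time.
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        try {
            getOrCancel(neverCompletes, 50, () -> System.out.println("cancelling job"));
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```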


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170572908
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -304,47 +294,17 @@ public Integer getKey(Tuple2 value) {
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
final JobID jobId = jobGraph.getJobID();
 
-   final CompletableFuture 
failedFuture =
-   notifyWhenJobStatusIs(jobId, JobStatus.FAILED, 
deadline);
-
-   final CompletableFuture 
cancellationFuture =
-   notifyWhenJobStatusIs(jobId, 
JobStatus.CANCELED, deadline);
-
-   cluster.submitJobDetached(jobGraph);
-
-   try {
-   final TestingJobManagerMessages.JobStatusIs jobStatus =
-   
failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-   assertEquals(JobStatus.FAILED, jobStatus.state());
-   } catch (Exception e) {
-
-   // if the assertion fails, it means that the job was 
(falsely) not cancelled.
-   // in this case, and given that the mini-cluster is 
shared with other tests,
-   // we cancel the job and wait for the cancellation so 
that the resources are freed.
-
-   if (jobId != null) {
-   cluster.getLeaderGateway(deadline.timeLeft())
-   .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-   
.mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class));
-
-   
cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-   }
+   clusterClient.submitJob(jobGraph, 
AbstractQueryableStateTestBase.class.getClassLoader());
 
-   // and we re-throw the exception.
-   throw e;
-   }
+   CompletableFuture jobResultFuture = clusterClient.requestJobResult(jobId);
 
-   // Get the job and check the cause
-   JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
-   cluster.getLeaderGateway(deadline.timeLeft())
-   .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
-   .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.JobFound.class)))
-   .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+   JobResult jobResult = jobResultFuture.get();
+   assertFalse(jobResult.isSuccess());
 
-   String failureCause = jobFound.executionGraph().getFailureInfo().getExceptionAsString();
+   String failureCause = jobResult.getSerializedThrowable().get().getFullStringifiedStackTrace();
 
-   assertEquals(JobStatus.FAILED, jobFound.executionGraph().getState());
+   CompletableFuture jobStatusFuture = clusterClient.getJobStatus(jobId);
+   assertEquals(JobStatus.FAILED, jobStatusFuture.get());
--- End diff --

add timeout
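
As a rough illustration of the suggestion, the unbounded `get()` calls in the diff could be replaced with the timed overload of `CompletableFuture.get`. The sketch below uses only the JDK; the class `BoundedGet` and the helper `getWithTimeout` are hypothetical names, and the string future merely stands in for the job result and job status futures returned by `clusterClient`.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedGet {

    /**
     * Hypothetical helper: waits for the future, but throws TimeoutException
     * once the timeout elapses instead of blocking the test forever.
     */
    static <T> T getWithTimeout(CompletableFuture<T> future, Duration timeout)
            throws InterruptedException, ExecutionException, TimeoutException {
        return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a future that completes, e.g. a job status.
        CompletableFuture<String> done = CompletableFuture.completedFuture("FAILED");
        System.out.println(getWithTimeout(done, Duration.ofSeconds(1)));

        // Stand-in for a future that never completes, e.g. a stalled job.
        CompletableFuture<String> never = new CompletableFuture<>();
        try {
            getWithTimeout(never, Duration.ofMillis(50));
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

In the test itself, the remaining time of the existing `deadline` could serve as the timeout, as the removed lines did with `deadline.timeLeft().toMillis()`.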


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5579#discussion_r170573094
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -425,18 +385,22 @@ public Integer getKey(Tuple2 value) {
}
}).asQueryableState("hakuna", valueState);
 
-   try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(cluster, env, deadline)) {
+   try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(clusterClient, env)) {
 
-   // register to be notified when the job is running.
-   CompletableFuture runningFuture =
-   notifyWhenJobStatusIs(closableJobGraph.getJobId(), JobStatus.RUNNING, deadline);
+   clusterClient.submitJob(
+   closableJobGraph.getJobGraph(), AbstractQueryableStateTestBase.class.getClassLoader());
 
-   cluster.submitJobDetached(closableJobGraph.getJobGraph());
 
-   // expect for the job to be running
-   TestingJobManagerMessages.JobStatusIs jobStatus =
-   runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-   assertEquals(JobStatus.RUNNING, jobStatus.state());
+   CompletableFuture jobStatusFuture =
+   clusterClient.getJobStatus(closableJobGraph.getJobId());
+
+   while (deadline.hasTimeLeft() && !jobStatusFuture.get().equals(JobStatus.RUNNING)) {
--- End diff --

add timeout
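
One possible shape for a bounded polling loop, sketched with JDK types only: each status query gets at most the time remaining until the deadline, and the loop fails with a `TimeoutException` rather than spinning or blocking. The names `StatusPolling`, `Status` and `waitForStatus` are hypothetical; the supplier stands in for a call such as `clusterClient.getJobStatus(jobId)`, and the 10 ms pause between polls is an arbitrary choice.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class StatusPolling {

    /** Hypothetical stand-in for Flink's JobStatus. */
    enum Status { CREATED, RUNNING, FAILED }

    /**
     * Polls statusQuery until it reports the expected status. Every blocking
     * get() is bounded by the time left until the overall deadline.
     */
    static void waitForStatus(
            Supplier<CompletableFuture<Status>> statusQuery,
            Status expected,
            Duration timeout) throws Exception {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            long remaining = deadlineNanos - System.nanoTime();
            if (remaining <= 0) {
                throw new TimeoutException("Status " + expected + " not reached in " + timeout);
            }
            // Issue a fresh query per iteration; a completed future never changes its value.
            Status current = statusQuery.get().get(remaining, TimeUnit.NANOSECONDS);
            if (current == expected) {
                return;
            }
            Thread.sleep(10); // short pause between polls
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Reports CREATED for the first two queries, RUNNING afterwards.
        Supplier<CompletableFuture<Status>> query = () ->
                CompletableFuture.completedFuture(
                        calls.incrementAndGet() < 3 ? Status.CREATED : Status.RUNNING);

        waitForStatus(query, Status.RUNNING, Duration.ofSeconds(5));
        System.out.println("running after " + calls.get() + " queries");
    }
}
```

Requesting a new future on every iteration matters here: calling `get()` repeatedly on the same completed future would keep returning the first status it saw.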


---


[GitHub] flink pull request #5579: [FLINK-8778] Migrate queryable state ITCases to us...

2018-02-26 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5579

[FLINK-8778] Migrate queryable state ITCases to use MiniClusterResource

This also covers https://issues.apache.org/jira/browse/FLINK-8757 and 
https://issues.apache.org/jira/browse/FLINK-8758.

R: @tillrohrmann or @GJL 

CC: @zentol because of the ongoing test-porting work, @kl0u because this 
touches queryable state (QS) and because of the test porting

For now, the tests don't succeed in FLIP-6 mode (which requires activating the 
`flip-6` Maven profile). A single test started in IntelliJ succeeds, but when 
the complete suite runs, only the first test succeeds and the others stall.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-8757-port-it-cases-flip-6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5579.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5579


commit db5e474fe3cf9045f291f654a2bca025806f500c
Author: Aljoscha Krettek 
Date:   2018-02-26T09:12:44Z

[FLINK-8758] Add getters to JobDetailsInfo

commit fc23247596432c5ed025c6532afd5a55283f780b
Author: Aljoscha Krettek 
Date:   2018-02-26T11:28:59Z

[hotfix] Fix stoppable field in JobDetailsInfo

For some reason, the server was sending JSON with a stoppable field, not
the isStoppable field.

commit bdabaecaceb608d57cea2ac21374c86f6f1eeae5
Author: Aljoscha Krettek 
Date:   2018-02-26T10:44:57Z

[FLINK-8757] Add MiniClusterResource.getClusterClient()

commit c739a73d80d975586b58d82d8eb7ef2ec44ef275
Author: Aljoscha Krettek 
Date:   2018-02-26T11:29:23Z

Add proper toString() on JsonResponse in RestClient

commit 01cc7ce9d732cba7b0c51ca0dab87f1e655e09fc
Author: Aljoscha Krettek 
Date:   2018-02-26T10:52:50Z

[FLINK-8758] Make non-blocking ClusterClient.submitJob() public

commit b4388ac50cd8ba6d3ff8fd44e5159320576f9e65
Author: Aljoscha Krettek 
Date:   2018-02-26T10:53:47Z

[FLINK-8758] Add methods for job status/result to ClusterClient

commit 63eccd66217b7a5ec8d399e2bf22378adda8bdb2
Author: Aljoscha Krettek 
Date:   2018-02-26T10:55:14Z

[FLINK-8778] Migrate queryable state ITCases to use MiniClusterResource




---