[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...

2018-01-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5207


---


[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...

2018-01-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r161010273
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -148,6 +181,39 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
        }
    }
 
+   private JobExecutionResult waitForJobExecutionResult(
+           final JobID jobId) throws ProgramInvocationException {
+
+       final JobMessageParameters messageParameters = new JobMessageParameters();
+       messageParameters.jobPathParameter.resolve(jobId);
+       JobExecutionResultResponseBody jobExecutionResultResponseBody;
+       try {
+           long attempt = 0;
+           do {
+               final CompletableFuture responseFuture =
+                   restClient.sendRequest(
+                       restClusterClientConfiguration.getRestServerAddress(),
+                       restClusterClientConfiguration.getRestServerPort(),
+                       JobExecutionResultHeaders.getInstance(),
+                       messageParameters);
+               jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+               Thread.sleep(waitStrategy.sleepTime(attempt));
+               attempt++;
+           }
+           while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED);
--- End diff --

Alright, then let's do it there.


---


[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...

2018-01-11 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160997382
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -148,6 +181,39 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
        }
    }
 
+   private JobExecutionResult waitForJobExecutionResult(
+           final JobID jobId) throws ProgramInvocationException {
+
+       final JobMessageParameters messageParameters = new JobMessageParameters();
+       messageParameters.jobPathParameter.resolve(jobId);
+       JobExecutionResultResponseBody jobExecutionResultResponseBody;
+       try {
+           long attempt = 0;
+           do {
+               final CompletableFuture responseFuture =
+                   restClient.sendRequest(
+                       restClusterClientConfiguration.getRestServerAddress(),
+                       restClusterClientConfiguration.getRestServerPort(),
+                       JobExecutionResultHeaders.getInstance(),
+                       messageParameters);
+               jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+               Thread.sleep(waitStrategy.sleepTime(attempt));
+               attempt++;
+           }
+           while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED);
--- End diff --

I can change it to `getStatus().getId()` to avoid the redundancy. However, the 
code will be touched once more in #5223, so I can do it there.


---


[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...

2018-01-11 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160980332
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -104,11 +122,26 @@ protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoad
        } catch (JobSubmissionException e) {
            throw new ProgramInvocationException(e);
        }
-       // don't return just a JobSubmissionResult here, the signature is lying
-       // The CliFrontend expects this to be a JobExecutionResult
 
-       // TOOD: do not exit this method until job is finished
-       return new JobExecutionResult(jobGraph.getJobID(), 1, Collections.emptyMap());
+       final JobExecutionResult jobExecutionResult = waitForJobExecutionResult(jobGraph.getJobID());
+
+       if (jobExecutionResult.getSerializedThrowable().isPresent()) {
+           final SerializedThrowable serializedThrowable = jobExecutionResult.getSerializedThrowable().get();
+           final Throwable throwable = serializedThrowable.deserializeError(classLoader);
+           throw new ProgramInvocationException(throwable);
+       }
+
+       try {
+           // don't return just a JobSubmissionResult here, the signature is lying
+           // The CliFrontend expects this to be a JobExecutionResult
+           this.lastJobExecutionResult = new SerializedJobExecutionResult(
+               jobExecutionResult.getJobId(),
+               jobExecutionResult.getNetRuntime(),
+               jobExecutionResult.getAccumulatorResults()).toJobExecutionResult(classLoader);
--- End diff --

I need the `JobExecutionResult` here, not only the accumulators.


---


[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...

2018-01-11 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160979808
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/retry/ExponentialWaitStrategy.java ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest.retry;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * {@link WaitStrategy} with exponentially increasing sleep time.
+ */
+public class ExponentialWaitStrategy implements WaitStrategy {
+
+   private final long initialWait;
+
+   private final long maxWait;
+
+   public ExponentialWaitStrategy(final long initialWait, final long maxWait) {
+       checkArgument(initialWait > 0, "initialWait must be positive, was %s", initialWait);
+       checkArgument(maxWait > 0, "maxWait must be positive, was %s", maxWait);
+       this.initialWait = initialWait;
+       this.maxWait = maxWait;
--- End diff --

Ok, will add a check.


---


[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...

2018-01-11 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160976558
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/retry/ExponentialWaitStrategy.java ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest.retry;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * {@link WaitStrategy} with exponentially increasing sleep time.
+ */
+public class ExponentialWaitStrategy implements WaitStrategy {
+
+   private final long initialWait;
+
+   private final long maxWait;
+
+   public ExponentialWaitStrategy(final long initialWait, final long maxWait) {
+       checkArgument(initialWait > 0, "initialWait must be positive, was %s", initialWait);
+       checkArgument(maxWait > 0, "maxWait must be positive, was %s", maxWait);
+       this.initialWait = initialWait;
+       this.maxWait = maxWait;
+   }
+
+   @Override
+   public long sleepTime(final long attempt) {
+       final long exponentialSleepTime = initialWait * Math.round(Math.pow(2, attempt));
--- End diff --

`1L` can only be shifted left 62 times before the result overflows:

```
public class ShiftDemo {
    public static void main(String[] args) {
        System.out.println(1L << 62);
        System.out.println(1L << 63);
        System.out.println(1L << 64);
    }
}
```
Output:
```
4611686018427387904
-9223372036854775808
1
```
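
The wrap-around in the last line follows from how Java treats shift distances on `long` operands: only the six lowest-order bits of the distance are used, so `1L << 64` behaves like `1L << 0`. A minimal sketch of a guarded power-of-two helper follows; the class and method names are hypothetical and not part of this PR:

```java
/** Hypothetical helper, not part of the PR: illustrates the limits of shifting a long. */
final class ShiftLimits {

    /** Returns 2^exponent, saturating at Long.MAX_VALUE once the shift would overflow. */
    static long saturatingPowerOfTwo(final long exponent) {
        if (exponent < 0) {
            throw new IllegalArgumentException("exponent must not be negative, was " + exponent);
        }
        // 1L << 62 is the largest power of two that fits into a signed long.
        return exponent > 62 ? Long.MAX_VALUE : 1L << (int) exponent;
    }

    public static void main(String[] args) {
        // For long operands only the six lowest-order bits of the shift distance are used.
        System.out.println((1L << 64) == (1L << (64 & 63)));
        System.out.println(saturatingPowerOfTwo(62));
        System.out.println(saturatingPowerOfTwo(63));
    }
}
```

Saturating instead of wrapping keeps the value monotonic in `exponent`, which is what a capped backoff needs.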


---


[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...

2018-01-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160125564
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/retry/ExponentialWaitStrategy.java ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest.retry;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * {@link WaitStrategy} with exponentially increasing sleep time.
+ */
+public class ExponentialWaitStrategy implements WaitStrategy {
+
+   private final long initialWait;
+
+   private final long maxWait;
+
+   public ExponentialWaitStrategy(final long initialWait, final long maxWait) {
+       checkArgument(initialWait > 0, "initialWait must be positive, was %s", initialWait);
+       checkArgument(maxWait > 0, "maxWait must be positive, was %s", maxWait);
+       this.initialWait = initialWait;
+       this.maxWait = maxWait;
--- End diff --

Should `maxWait` be larger than `initialWait`? Otherwise one would see a 
decrease in the waiting time between the first and the second waiting attempt.
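
One way such a check could look is sketched below. This is an assumption about the shape of the fix, not the code that was merged; the class `BoundedWait` and its local `checkArgument` stand-in are hypothetical and only keep the example self-contained:

```java
/** Hypothetical sketch of the constructor checks; not the code that was merged. */
final class BoundedWait {

    final long initialWait;
    final long maxWait;

    BoundedWait(final long initialWait, final long maxWait) {
        checkArgument(initialWait > 0, "initialWait must be positive, was " + initialWait);
        checkArgument(maxWait > 0, "maxWait must be positive, was " + maxWait);
        // Additional check raised in the review: the cap must not undercut the first wait.
        checkArgument(initialWait <= maxWait,
            "initialWait must be lower than or equal to maxWait, was " + initialWait + " > " + maxWait);
        this.initialWait = initialWait;
        this.maxWait = maxWait;
    }

    // Local stand-in for a precondition helper so that the sketch compiles on its own.
    private static void checkArgument(final boolean condition, final String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        new BoundedWait(100, 2000); // accepted
        try {
            new BoundedWait(2000, 100);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```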


---


[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...

2018-01-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160128690
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/retry/ExponentialWaitStrategy.java ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest.retry;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * {@link WaitStrategy} with exponentially increasing sleep time.
+ */
+public class ExponentialWaitStrategy implements WaitStrategy {
+
+   private final long initialWait;
+
+   private final long maxWait;
+
+   public ExponentialWaitStrategy(final long initialWait, final long maxWait) {
+       checkArgument(initialWait > 0, "initialWait must be positive, was %s", initialWait);
+       checkArgument(maxWait > 0, "maxWait must be positive, was %s", maxWait);
+       this.initialWait = initialWait;
+       this.maxWait = maxWait;
+   }
+
+   @Override
+   public long sleepTime(final long attempt) {
+       final long exponentialSleepTime = initialWait * Math.round(Math.pow(2, attempt));
--- End diff --

`Math.pow(2, attempt)` could also be computed more cheaply with a bit-shift 
operation.
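
A rough sketch of what a shift-based `sleepTime` might look like, including a guard so that large attempt counts fall back to the cap instead of overflowing. The class name `ShiftBackoff` is hypothetical and this is not the implementation from the PR:

```java
/** Hypothetical sketch of a shift-based backoff; not the implementation from the PR. */
final class ShiftBackoff {

    private final long initialWait;
    private final long maxWait;

    ShiftBackoff(final long initialWait, final long maxWait) {
        if (initialWait <= 0 || maxWait < initialWait) {
            throw new IllegalArgumentException("expected 0 < initialWait <= maxWait");
        }
        this.initialWait = initialWait;
        this.maxWait = maxWait;
    }

    long sleepTime(final long attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must not be negative, was " + attempt);
        }
        // Largest number of doublings of initialWait that still fits into a signed long.
        final int maxShift = Long.numberOfLeadingZeros(initialWait) - 1;
        if (attempt > maxShift) {
            // The exact value would exceed Long.MAX_VALUE and therefore the cap as well.
            return maxWait;
        }
        // initialWait * 2^attempt, computed with a shift instead of Math.pow.
        return Math.min(initialWait << (int) attempt, maxWait);
    }

    public static void main(String[] args) {
        final ShiftBackoff backoff = new ShiftBackoff(100, 2000);
        for (long attempt = 0; attempt < 7; attempt++) {
            System.out.println(attempt + " -> " + backoff.sleepTime(attempt));
        }
    }
}
```

Deriving the shift limit from `initialWait` rather than hard-coding 62 matters because the product overflows earlier when `initialWait` is larger than 1.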


---


[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...

2018-01-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160127783
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -104,11 +122,26 @@ protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoad
        } catch (JobSubmissionException e) {
            throw new ProgramInvocationException(e);
        }
-       // don't return just a JobSubmissionResult here, the signature is lying
-       // The CliFrontend expects this to be a JobExecutionResult
 
-       // TOOD: do not exit this method until job is finished
-       return new JobExecutionResult(jobGraph.getJobID(), 1, Collections.emptyMap());
+       final JobExecutionResult jobExecutionResult = waitForJobExecutionResult(jobGraph.getJobID());
+
+       if (jobExecutionResult.getSerializedThrowable().isPresent()) {
+           final SerializedThrowable serializedThrowable = jobExecutionResult.getSerializedThrowable().get();
+           final Throwable throwable = serializedThrowable.deserializeError(classLoader);
+           throw new ProgramInvocationException(throwable);
+       }
+
+       try {
+           // don't return just a JobSubmissionResult here, the signature is lying
+           // The CliFrontend expects this to be a JobExecutionResult
+           this.lastJobExecutionResult = new SerializedJobExecutionResult(
+               jobExecutionResult.getJobId(),
+               jobExecutionResult.getNetRuntime(),
+               jobExecutionResult.getAccumulatorResults()).toJobExecutionResult(classLoader);
--- End diff --

We could also directly use `AccumulatorHelper#deserializeAccumulators`.


---


[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...

2018-01-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160125031
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -148,6 +181,39 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
        }
    }
 
+   private JobExecutionResult waitForJobExecutionResult(
+           final JobID jobId) throws ProgramInvocationException {
+
+       final JobMessageParameters messageParameters = new JobMessageParameters();
+       messageParameters.jobPathParameter.resolve(jobId);
+       JobExecutionResultResponseBody jobExecutionResultResponseBody;
+       try {
+           long attempt = 0;
+           do {
+               final CompletableFuture responseFuture =
+                   restClient.sendRequest(
+                       restClusterClientConfiguration.getRestServerAddress(),
+                       restClusterClientConfiguration.getRestServerPort(),
+                       JobExecutionResultHeaders.getInstance(),
+                       messageParameters);
+               jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+               Thread.sleep(waitStrategy.sleepTime(attempt));
+               attempt++;
+           }
+           while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED);
--- End diff --

`getStatus().getStatusId()` doesn't look so nice, to be honest.


---


[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...

2017-12-24 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5207

[FLINK-8299][flip6] Poll JobExecutionResult after job submission

## What is the purpose of the change

*Poll JobExecutionResult after job submission. This is needed, for example, 
to enable `collect()` calls from the job in FLIP-6 mode. This PR is based on 
#5194.*

CC: @tillrohrmann 


## Brief change log

  - *Retrieve JobExecutionResult after job submission in 
`RestClusterClient`*
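
The general shape of such a polling loop can be sketched without any Flink classes. Everything in the following example is hypothetical (the `Status` enum, `pollUntilCompleted`, and the use of a `Supplier` in place of the REST request); it only illustrates the idea of repeating a request with a growing wait until the result is reported as completed:

```java
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;

/** Hypothetical sketch of polling until completion; none of these names are Flink APIs. */
final class PollingSketch {

    enum Status { IN_PROGRESS, COMPLETED }

    /** Repeats the request until it reports COMPLETED and returns the number of sleeps taken. */
    static long pollUntilCompleted(final Supplier<Status> request, final LongUnaryOperator sleepTime) {
        long attempt = 0;
        while (request.get() != Status.COMPLETED) {
            try {
                // The wait between requests is chosen per attempt, e.g. exponentially growing.
                Thread.sleep(sleepTime.applyAsLong(attempt));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while polling", e);
            }
            attempt++;
        }
        return attempt;
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // Stand-in for the REST request: reports COMPLETED on the third call.
        final long sleeps = pollUntilCompleted(
            () -> ++calls[0] < 3 ? Status.IN_PROGRESS : Status.COMPLETED,
            attempt -> Math.min(10L << attempt, 50L));
        System.out.println("completed after " + sleeps + " sleeps");
    }
}
```

This variant checks the status before sleeping, so no wait is incurred once the final response has arrived.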

## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests for all new classes and changed classes.*
  - *Manually ran the job in examples/batch/WordCount.jar and verified that the 
results are correctly collected/printed.*
  
## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8299

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5207.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5207


commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d
Author: gyao 
Date:   2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the 
information in
  the already existing JobExecutionResult.
- Always cache a JobExecutionResult. Even in case of job failures. In case 
of
  job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults

commit 748745ac3521a20040cbda4056dfd9c53bc24a82
Author: gyao 
Date:   2017-12-20T13:44:03Z

[FLINK-8233][flip6] Add JobExecutionResultHandler

- Allow retrieval of the JobExecutionResult cached in Dispatcher.
- Implement serializer and deserializer for JobExecutionResult.

commit adf091a2770f42d6f8a0c19ab88cc7a208943a32
Author: gyao 
Date:   2017-12-20T13:44:26Z

[hotfix] Clean up ExecutionGraph

- Remove unnecessary throws clause.
- Format whitespace.

commit f5c28527b3a1a0c8ec52f2a5616ebb634397b69c
Author: gyao 
Date:   2017-12-22T23:02:10Z

[FLINK-8299][flip6] Retrieve JobExecutionResult after job submission




---