[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5207

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r161010273

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -148,6 +181,39 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
         }
     }
+    private JobExecutionResult waitForJobExecutionResult(
+            final JobID jobId) throws ProgramInvocationException {
+
+        final JobMessageParameters messageParameters = new JobMessageParameters();
+        messageParameters.jobPathParameter.resolve(jobId);
+        JobExecutionResultResponseBody jobExecutionResultResponseBody;
+        try {
+            long attempt = 0;
+            do {
+                final CompletableFuture responseFuture =
+                    restClient.sendRequest(
+                        restClusterClientConfiguration.getRestServerAddress(),
+                        restClusterClientConfiguration.getRestServerPort(),
+                        JobExecutionResultHeaders.getInstance(),
+                        messageParameters);
+                jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+                Thread.sleep(waitStrategy.sleepTime(attempt));
+                attempt++;
+            }
+            while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED);
--- End diff --

Alright, then let's do it there.

---
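A minimal, hypothetical sketch of the polling loop in the diff above, written against plain JDK types rather than Flink's `RestClient`. The names `PollUntilComplete`, `sendRequest`, `isComplete`, and `sleepTime` are illustrative assumptions, not Flink API. One design difference is deliberate: the sketch checks the status before sleeping, whereas the `do`/`while` loop in the diff calls `Thread.sleep` once more after the `COMPLETED` response has already arrived.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical helper: poll an asynchronous request until the response reports completion. */
final class PollUntilComplete {

    private PollUntilComplete() {
    }

    static <T> T poll(
            final Supplier<CompletableFuture<T>> sendRequest, // e.g. a lambda wrapping the REST call
            final Predicate<T> isComplete,                    // e.g. "status id == COMPLETED"
            final LongUnaryOperator sleepTime,                // attempt -> millis, like a wait strategy
            final long timeoutMillis) {
        long attempt = 0;
        try {
            while (true) {
                final T response = sendRequest.get().get(timeoutMillis, TimeUnit.MILLISECONDS);
                if (isComplete.test(response)) {
                    return response; // no extra sleep once the result is available
                }
                Thread.sleep(sleepTime.applyAsLong(attempt));
                attempt++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
            throw new CompletionException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new CompletionException(e);
        }
    }
}
```

Wrapping failures in the unchecked `CompletionException` is only a choice made for this sketch; the method in the diff declares `throws ProgramInvocationException` instead.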
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160997382

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -148,6 +181,39 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
         }
     }
+    private JobExecutionResult waitForJobExecutionResult(
+            final JobID jobId) throws ProgramInvocationException {
+
+        final JobMessageParameters messageParameters = new JobMessageParameters();
+        messageParameters.jobPathParameter.resolve(jobId);
+        JobExecutionResultResponseBody jobExecutionResultResponseBody;
+        try {
+            long attempt = 0;
+            do {
+                final CompletableFuture responseFuture =
+                    restClient.sendRequest(
+                        restClusterClientConfiguration.getRestServerAddress(),
+                        restClusterClientConfiguration.getRestServerPort(),
+                        JobExecutionResultHeaders.getInstance(),
+                        messageParameters);
+                jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+                Thread.sleep(waitStrategy.sleepTime(attempt));
+                attempt++;
+            }
+            while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED);
--- End diff --

I can change it to `getStatus().getId()` to avoid the redundancy. However, this code will be touched again in #5223, so I can do it there.

---
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160980332

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -104,11 +122,26 @@ protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoad
         } catch (JobSubmissionException e) {
             throw new ProgramInvocationException(e);
         }
-        // don't return just a JobSubmissionResult here, the signature is lying
-        // The CliFrontend expects this to be a JobExecutionResult
-        // TOOD: do not exit this method until job is finished
-        return new JobExecutionResult(jobGraph.getJobID(), 1, Collections.emptyMap());
+        final JobExecutionResult jobExecutionResult = waitForJobExecutionResult(jobGraph.getJobID());
+
+        if (jobExecutionResult.getSerializedThrowable().isPresent()) {
+            final SerializedThrowable serializedThrowable = jobExecutionResult.getSerializedThrowable().get();
+            final Throwable throwable = serializedThrowable.deserializeError(classLoader);
+            throw new ProgramInvocationException(throwable);
+        }
+
+        try {
+            // don't return just a JobSubmissionResult here, the signature is lying
+            // The CliFrontend expects this to be a JobExecutionResult
+            this.lastJobExecutionResult = new SerializedJobExecutionResult(
+                jobExecutionResult.getJobId(),
+                jobExecutionResult.getNetRuntime(),
+                jobExecutionResult.getAccumulatorResults()).toJobExecutionResult(classLoader);
--- End diff --

I need the `JobExecutionResult` here, not only the accumulators.

---
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160979808

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/retry/ExponentialWaitStrategy.java ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest.retry;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * {@link WaitStrategy} with exponentially increasing sleep time.
+ */
+public class ExponentialWaitStrategy implements WaitStrategy {
+
+    private final long initialWait;
+
+    private final long maxWait;
+
+    public ExponentialWaitStrategy(final long initialWait, final long maxWait) {
+        checkArgument(initialWait > 0, "initialWait must be positive, was %s", initialWait);
+        checkArgument(maxWait > 0, "maxWait must be positive, was %s", maxWait);
+        this.initialWait = initialWait;
+        this.maxWait = maxWait;
--- End diff --

Ok, will add a check.

---
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160976558

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/retry/ExponentialWaitStrategy.java ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest.retry;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * {@link WaitStrategy} with exponentially increasing sleep time.
+ */
+public class ExponentialWaitStrategy implements WaitStrategy {
+
+    private final long initialWait;
+
+    private final long maxWait;
+
+    public ExponentialWaitStrategy(final long initialWait, final long maxWait) {
+        checkArgument(initialWait > 0, "initialWait must be positive, was %s", initialWait);
+        checkArgument(maxWait > 0, "maxWait must be positive, was %s", maxWait);
+        this.initialWait = initialWait;
+        this.maxWait = maxWait;
+    }
+
+    @Override
+    public long sleepTime(final long attempt) {
+        final long exponentialSleepTime = initialWait * Math.round(Math.pow(2, attempt));
--- End diff --

`1L` can only be shifted left 62 times while staying positive: `1L << 63` wraps to `Long.MIN_VALUE`, and `1L << 64` yields `1` again because Java uses only the low six bits of a `long` shift distance:

```
public static void main(String[] args) {
    System.out.println(1L << 62);
    System.out.println(1L << 63);
    System.out.println(1L << 64);
}
```

output

```
4611686018427387904
-9223372036854775808
1
```

---
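One way to address the overflow discussed above, sketched under assumptions: the class name `CappedExponentialWaitStrategy` is hypothetical, and capping the result at `maxWait` is assumed because the diff is cut off before the end of `sleepTime`. The sketch clamps the shift distance at 62 and checks the multiplication by `initialWait` before performing it, since `initialWait * 2^attempt` can overflow well before the shift itself does.

```java
/**
 * Hypothetical exponential wait strategy (not the code merged in #5207):
 * sleep time is initialWait * 2^attempt, capped at maxWait, without overflow.
 */
final class CappedExponentialWaitStrategy {

    private final long initialWait;
    private final long maxWait;

    CappedExponentialWaitStrategy(final long initialWait, final long maxWait) {
        if (initialWait <= 0 || maxWait <= 0) {
            throw new IllegalArgumentException("initialWait and maxWait must be positive");
        }
        this.initialWait = initialWait;
        this.maxWait = maxWait;
    }

    long sleepTime(final long attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be non-negative, was " + attempt);
        }
        // 1L << 62 is the largest positive power of two a long can hold.
        final long factor = 1L << Math.min(attempt, 62L);
        // If initialWait > maxWait / factor, then initialWait * factor > maxWait:
        // return the cap without computing the (possibly overflowing) product.
        if (initialWait > maxWait / factor) {
            return maxWait;
        }
        return initialWait * factor; // cannot overflow here, and is <= maxWait
    }
}
```

For example, with `initialWait = 100` and `maxWait = 10_000`, attempts 0 to 6 yield 100, 200, ..., 6400 and every later attempt yields 10000, including very large attempt counts.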
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160125564

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/retry/ExponentialWaitStrategy.java ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest.retry;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * {@link WaitStrategy} with exponentially increasing sleep time.
+ */
+public class ExponentialWaitStrategy implements WaitStrategy {
+
+    private final long initialWait;
+
+    private final long maxWait;
+
+    public ExponentialWaitStrategy(final long initialWait, final long maxWait) {
+        checkArgument(initialWait > 0, "initialWait must be positive, was %s", initialWait);
+        checkArgument(maxWait > 0, "maxWait must be positive, was %s", maxWait);
+        this.initialWait = initialWait;
+        this.maxWait = maxWait;
--- End diff --

Should `maxWait` be larger than `initialWait`? Otherwise one would see a decrease in the waiting time between the first and the second waiting attempt.

---
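The additional constructor check agreed on in this thread could look like the following hypothetical sketch (`ValidatedWaitBounds` and `requireArgument` are illustrative names). It throws a plain `IllegalArgumentException` so that it compiles without Flink on the classpath; inside Flink, `Preconditions.checkArgument`, which the diff already uses, would be the natural choice. Allowing `maxWait == initialWait` is an assumption of the sketch.

```java
/** Hypothetical constructor validation for an exponential wait strategy. */
final class ValidatedWaitBounds {

    final long initialWait;
    final long maxWait;

    ValidatedWaitBounds(final long initialWait, final long maxWait) {
        requireArgument(initialWait > 0, "initialWait must be positive, was %s", initialWait);
        requireArgument(maxWait > 0, "maxWait must be positive, was %s", maxWait);
        // Per the review: a cap below initialWait would make a later wait shorter than the first one.
        requireArgument(maxWait >= initialWait,
            "maxWait must be at least initialWait, was maxWait=%s, initialWait=%s", maxWait, initialWait);
        this.initialWait = initialWait;
        this.maxWait = maxWait;
    }

    /** Minimal stand-in for a checkArgument-style precondition helper. */
    private static void requireArgument(final boolean condition, final String template, final Object... args) {
        if (!condition) {
            throw new IllegalArgumentException(String.format(template, args));
        }
    }
}
```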
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160128690

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/retry/ExponentialWaitStrategy.java ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest.retry;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * {@link WaitStrategy} with exponentially increasing sleep time.
+ */
+public class ExponentialWaitStrategy implements WaitStrategy {
+
+    private final long initialWait;
+
+    private final long maxWait;
+
+    public ExponentialWaitStrategy(final long initialWait, final long maxWait) {
+        checkArgument(initialWait > 0, "initialWait must be positive, was %s", initialWait);
+        checkArgument(maxWait > 0, "maxWait must be positive, was %s", maxWait);
+        this.initialWait = initialWait;
+        this.maxWait = maxWait;
+    }
+
+    @Override
+    public long sleepTime(final long attempt) {
+        final long exponentialSleepTime = initialWait * Math.round(Math.pow(2, attempt));
--- End diff --

`Math.pow(2, attempt)` could also be computed more cheaply with a bit shift.

---
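A small sketch comparing the two ways of computing the factor (the class `PowerOfTwo` and its methods are hypothetical names). For `0 <= attempt <= 62` both return the same `long`, since powers of two are exactly representable as `double` and `Math.pow` is exact in that case. From 63 on they diverge: `Math.round` saturates at `Long.MAX_VALUE`, while `1L << 63` wraps to `Long.MIN_VALUE`.

```java
/** Hypothetical comparison of Math.pow-based and shift-based powers of two. */
final class PowerOfTwo {

    private PowerOfTwo() {
    }

    /** As in the diff: floating-point power rounded to long (saturates at Long.MAX_VALUE). */
    static long viaPow(final long attempt) {
        return Math.round(Math.pow(2, attempt));
    }

    /** Integer-only version; only meaningful for 0 <= attempt <= 62. */
    static long viaShift(final long attempt) {
        if (attempt < 0 || attempt > 62) {
            throw new IllegalArgumentException("attempt must be in [0, 62], was " + attempt);
        }
        return 1L << attempt;
    }

    /** Returns true if both variants agree for every attempt in [0, 62]. */
    static boolean agreeUpTo62() {
        for (long attempt = 0; attempt <= 62; attempt++) {
            if (viaPow(attempt) != viaShift(attempt)) {
                return false;
            }
        }
        return true;
    }
}
```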
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160127783

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -104,11 +122,26 @@ protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoad
         } catch (JobSubmissionException e) {
             throw new ProgramInvocationException(e);
         }
-        // don't return just a JobSubmissionResult here, the signature is lying
-        // The CliFrontend expects this to be a JobExecutionResult
-        // TOOD: do not exit this method until job is finished
-        return new JobExecutionResult(jobGraph.getJobID(), 1, Collections.emptyMap());
+        final JobExecutionResult jobExecutionResult = waitForJobExecutionResult(jobGraph.getJobID());
+
+        if (jobExecutionResult.getSerializedThrowable().isPresent()) {
+            final SerializedThrowable serializedThrowable = jobExecutionResult.getSerializedThrowable().get();
+            final Throwable throwable = serializedThrowable.deserializeError(classLoader);
+            throw new ProgramInvocationException(throwable);
+        }
+
+        try {
+            // don't return just a JobSubmissionResult here, the signature is lying
+            // The CliFrontend expects this to be a JobExecutionResult
+            this.lastJobExecutionResult = new SerializedJobExecutionResult(
+                jobExecutionResult.getJobId(),
+                jobExecutionResult.getNetRuntime(),
+                jobExecutionResult.getAccumulatorResults()).toJobExecutionResult(classLoader);
--- End diff --

We could also directly use `AccumulatorHelper#deserializeAccumulators`.

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160125031

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -148,6 +181,39 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
         }
     }
+    private JobExecutionResult waitForJobExecutionResult(
+            final JobID jobId) throws ProgramInvocationException {
+
+        final JobMessageParameters messageParameters = new JobMessageParameters();
+        messageParameters.jobPathParameter.resolve(jobId);
+        JobExecutionResultResponseBody jobExecutionResultResponseBody;
+        try {
+            long attempt = 0;
+            do {
+                final CompletableFuture responseFuture =
+                    restClient.sendRequest(
+                        restClusterClientConfiguration.getRestServerAddress(),
+                        restClusterClientConfiguration.getRestServerPort(),
+                        JobExecutionResultHeaders.getInstance(),
+                        messageParameters);
+                jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+                Thread.sleep(waitStrategy.sleepTime(attempt));
+                attempt++;
+            }
+            while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED);
--- End diff --

`getStatus().getStatusId()` doesn't look very nice, tbh.

---
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5207

[FLINK-8299][flip6] Poll JobExecutionResult after job submission

## What is the purpose of the change

*Poll JobExecutionResult after job submission. This is needed, for example, to enable `collect()` calls from the job in FLIP-6 mode. This PR is based on #5194.*

CC: @tillrohrmann

## Brief change log

- *Retrieve JobExecutionResult after job submission in `RestClusterClient`*

## Verifying this change

This change added tests and can be verified as follows:

- *Added unit tests for all new classes and changed classes.*
- *Manually ran the job in examples/batch/WordCount.jar and verified that the results are correctly collected/printed.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8299

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5207.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #5207

commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d
Author: gyao
Date: 2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the information in the already existing JobExecutionResult.
- Always cache a JobExecutionResult. Even in case of job failures. In case of job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults

commit 748745ac3521a20040cbda4056dfd9c53bc24a82
Author: gyao
Date: 2017-12-20T13:44:03Z

[FLINK-8233][flip6] Add JobExecutionResultHandler

- Allow retrieval of the JobExecutionResult cached in Dispatcher.
- Implement serializer and deserializer for JobExecutionResult.

commit adf091a2770f42d6f8a0c19ab88cc7a208943a32
Author: gyao
Date: 2017-12-20T13:44:26Z

[hotfix] Clean up ExecutionGraph

- Remove unnecessary throws clause.
- Format whitespace.

commit f5c28527b3a1a0c8ec52f2a5616ebb634397b69c
Author: gyao
Date: 2017-12-22T23:02:10Z

[FLINK-8299][flip6] Retrieve JobExecutionResult after job submission

---