[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5308 ---
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163877367

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java ---
    @@ -250,19 +255,15 @@ public void testRegainLeadership() throws Exception {
         private volatile boolean finishedByOther;
         @Override
    -    public void jobFinished(JobResult result) {
    +    public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
             checkArgument(!isJobFinished(), "job finished already");
             checkArgument(!isJobFailed(), "job failed already");
    -        this.result = result;
    -    }
    -
    -    @Override
    -    public void jobFailed(JobResult result) {
    -        checkArgument(!isJobFinished(), "job finished already");
    -        checkArgument(!isJobFailed(), "job failed already");
    +        this.result = JobResult.createFrom(executionGraph);
    -        this.failedCause = result.getSerializedThrowable().get();
    +        if (!result.isSuccess()) {
    --- End diff --

    Good idea. I'll leave it like it is though, because this test will be removed in one of my other PRs.
---
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163877071

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -1701,40 +1705,4 @@ void notifyExecutionChange(
                 }
             }
         }
    -
    -    @Override
    -    public ArchivedExecutionGraph archive() {
    --- End diff --

    I think it's better that the `ExecutionGraph` does not know about the `ArchivedExecutionGraph` because the coupling should be vice versa.
---
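The coupling argument in the comment above can be illustrated with a minimal sketch. `LiveGraph` and `ArchivedGraph` are hypothetical stand-ins, not Flink's `ExecutionGraph` and `ArchivedExecutionGraph`. The only point shown is the direction of the dependency: the snapshot type builds itself from the live type through a static factory, and the live type never references the snapshot type.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ArchiveCouplingSketch {

    /** Hypothetical live, mutable graph. It knows nothing about its archived form. */
    static final class LiveGraph {
        private final String jobName;
        private final List<String> vertexNames = new ArrayList<>();

        LiveGraph(String jobName) {
            this.jobName = jobName;
        }

        void addVertex(String name) {
            vertexNames.add(name);
        }

        String getJobName() {
            return jobName;
        }

        List<String> getVertexNames() {
            return Collections.unmodifiableList(vertexNames);
        }
    }

    /** Hypothetical immutable snapshot. The dependency points from here to LiveGraph. */
    static final class ArchivedGraph {
        private final String jobName;
        private final List<String> vertexNames;

        private ArchivedGraph(String jobName, List<String> vertexNames) {
            this.jobName = jobName;
            this.vertexNames = vertexNames;
        }

        /** Static factory that copies the state of the live graph at call time. */
        static ArchivedGraph createFrom(LiveGraph graph) {
            return new ArchivedGraph(
                graph.getJobName(),
                Collections.unmodifiableList(new ArrayList<>(graph.getVertexNames())));
        }

        String getJobName() {
            return jobName;
        }

        List<String> getVertexNames() {
            return vertexNames;
        }
    }

    public static void main(String[] args) {
        LiveGraph live = new LiveGraph("wordcount");
        live.addVertex("source");
        ArchivedGraph archived = ArchivedGraph.createFrom(live);
        live.addVertex("sink"); // later changes to the live graph do not reach the snapshot
        System.out.println(archived.getJobName() + " " + archived.getVertexNames());
    }
}
```

With this layout the live class can be compiled and tested without the archive class on its classpath, which is the property the comment asks for.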
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163874024

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -752,16 +751,21 @@ public Executor getFutureExecutor() {
         /**
          * Gets a serialized accumulator map.
          * @return The accumulator map with serialized accumulator values.
    -     * @throws IOException
          */
         @Override
    -    public Map> getAccumulatorsSerialized() throws IOException {
    +    public Map> getAccumulatorsSerialized() {
             Map> accumulatorMap = aggregateUserAccumulators();
             Map> result = new HashMap<>(accumulatorMap.size());
             for (Map.Entry> entry : accumulatorMap.entrySet()) {
    -            result.put(entry.getKey(), new SerializedValue<>(entry.getValue().getLocalValue()));
    +
    +            try {
    +                final SerializedValue serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
    +                result.put(entry.getKey(), serializedValue);
    +            } catch (IOException ioe) {
    +                LOG.info("Could not serialize accumulator " + entry.getKey() + '.', ioe);
    --- End diff --

    This is mainly a question of behaviour. If something goes wrong while serializing the accumulators, one can either fail completely or try to return as much as possible, as is done here. I'll change the log level to error and instead insert a `FailedAccumulatorSerialization` entry that throws the exception when it is accessed.
---
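The "return as much as possible" behaviour described above, combined with an entry that rethrows on access, could look roughly like the following sketch. `SerializedOrFailure` and `serializeAll` are hypothetical names chosen for illustration. This is not the `FailedAccumulatorSerialization` class mentioned in the comment, whose actual shape does not appear in this thread, and plain Java serialization stands in for Flink's `SerializedValue`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

public class AccumulatorSerializationSketch {

    /** Hypothetical holder: either serialized bytes or the failure that prevented serialization. */
    static final class SerializedOrFailure {
        private final byte[] bytes;
        private final IOException failure;

        private SerializedOrFailure(byte[] bytes, IOException failure) {
            this.bytes = bytes;
            this.failure = failure;
        }

        static SerializedOrFailure of(byte[] bytes) {
            return new SerializedOrFailure(bytes, null);
        }

        static SerializedOrFailure failed(IOException failure) {
            return new SerializedOrFailure(null, failure);
        }

        /** Returns the bytes, or rethrows the recorded failure when the entry is accessed. */
        byte[] getBytes() throws IOException {
            if (failure != null) {
                throw failure;
            }
            return bytes;
        }
    }

    /** Serializes every value; a failing entry is recorded instead of aborting the whole map. */
    static Map<String, SerializedOrFailure> serializeAll(Map<String, Object> values) {
        Map<String, SerializedOrFailure> result = new HashMap<>(values.size());
        for (Map.Entry<String, Object> entry : values.entrySet()) {
            try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(entry.getValue());
                oos.flush();
                result.put(entry.getKey(), SerializedOrFailure.of(bos.toByteArray()));
            } catch (IOException ioe) {
                // NotSerializableException is an IOException, so it is recorded here as well.
                result.put(entry.getKey(), SerializedOrFailure.failed(ioe));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> values = new HashMap<>();
        values.put("count", 42L);
        values.put("broken", new Object()); // java.lang.Object does not implement Serializable
        Map<String, SerializedOrFailure> serialized = serializeAll(values);
        System.out.println("entries: " + serialized.size());
    }
}
```

The trade-off is that a caller only learns about a broken accumulator when it reads that particular entry, while all healthy entries stay available.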
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163869963

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java ---
    @@ -173,4 +173,23 @@ public static String getResultsFormatted(Map map) {
             return accumulators;
         }
    +    /**
    +     * Serializes the given accumulators.
    +     *
    +     * @param accumulators to serialize
    +     * @return Map of serialized accumulators
    +     * @throws IOException if an accumulator could not be serialized
    +     */
    +    public static Map> serializeAccumulators(Map> accumulators) throws IOException {
    --- End diff --

    True, will remove it.
---
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163538231

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -752,16 +751,21 @@ public Executor getFutureExecutor() {
         /**
          * Gets a serialized accumulator map.
          * @return The accumulator map with serialized accumulator values.
    -     * @throws IOException
          */
         @Override
    -    public Map> getAccumulatorsSerialized() throws IOException {
    +    public Map> getAccumulatorsSerialized() {
             Map> accumulatorMap = aggregateUserAccumulators();
             Map> result = new HashMap<>(accumulatorMap.size());
             for (Map.Entry> entry : accumulatorMap.entrySet()) {
    -            result.put(entry.getKey(), new SerializedValue<>(entry.getValue().getLocalValue()));
    +
    +            try {
    +                final SerializedValue serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
    +                result.put(entry.getKey(), serializedValue);
    +            } catch (IOException ioe) {
    +                LOG.info("Could not serialize accumulator " + entry.getKey() + '.', ioe);
    --- End diff --

    Why is it acceptable to change the behavior, i.e., to ignore the exception? It is not even logged at `ERROR` level.

    Also:
    ```
    LOG.info("Could not serialize accumulator {}.", entry.getKey(), ioe);
    ```
---
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163536073

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java ---
    @@ -173,4 +173,23 @@ public static String getResultsFormatted(Map map) {
             return accumulators;
         }
    +    /**
    +     * Serializes the given accumulators.
    +     *
    +     * @param accumulators to serialize
    +     * @return Map of serialized accumulators
    +     * @throws IOException if an accumulator could not be serialized
    +     */
    +    public static Map> serializeAccumulators(Map> accumulators) throws IOException {
    --- End diff --

    I think you are not using this method anywhere.
---
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163540049

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -1701,40 +1705,4 @@ void notifyExecutionChange(
                 }
             }
         }
    -
    -    @Override
    -    public ArchivedExecutionGraph archive() {
    --- End diff --

    Why is it better to move this logic to `ArchivedExecutionGraph`?
---
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163544619

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java ---
    @@ -250,19 +255,15 @@ public void testRegainLeadership() throws Exception {
         private volatile boolean finishedByOther;
         @Override
    -    public void jobFinished(JobResult result) {
    +    public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
             checkArgument(!isJobFinished(), "job finished already");
             checkArgument(!isJobFailed(), "job failed already");
    -        this.result = result;
    -    }
    -
    -    @Override
    -    public void jobFailed(JobResult result) {
    -        checkArgument(!isJobFinished(), "job finished already");
    -        checkArgument(!isJobFailed(), "job failed already");
    +        this.result = JobResult.createFrom(executionGraph);
    -        this.failedCause = result.getSerializedThrowable().get();
    +        if (!result.isSuccess()) {
    --- End diff --

    Maybe
    ```
    result.getSerializedThrowable()
        .ifPresent(serializedThrowable -> failedCause = serializedThrowable);
    ```
    to avoid IntelliJ's inspection warning.
---
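The suggestion above replaces an explicit success check followed by `Optional.get()` with `Optional.ifPresent`. A minimal self-contained sketch of that pattern follows; the `Result` class and its `getFailureCause()` accessor are hypothetical stand-ins for `JobResult` and `getSerializedThrowable()`, and a `String` stands in for the serialized throwable.

```java
import java.util.Optional;

public class OptionalIfPresentSketch {

    /** Hypothetical stand-in for a job result that may carry a failure cause. */
    static final class Result {
        private final String failureCause; // null when the job succeeded

        Result(String failureCause) {
            this.failureCause = failureCause;
        }

        boolean isSuccess() {
            return failureCause == null;
        }

        Optional<String> getFailureCause() {
            return Optional.ofNullable(failureCause);
        }
    }

    private String failedCause;

    /** Stores the failure cause only when one is present, without an unguarded get() call. */
    void onTerminalState(Result result) {
        result.getFailureCause().ifPresent(cause -> failedCause = cause);
    }

    String getFailedCause() {
        return failedCause;
    }

    public static void main(String[] args) {
        OptionalIfPresentSketch listener = new OptionalIfPresentSketch();
        listener.onTerminalState(new Result(null));       // success: field stays unset
        listener.onTerminalState(new Result("task lost")); // failure: field is assigned
        System.out.println(listener.getFailedCause());
    }
}
```

Because the presence check and the access happen in one call, static analysis has no `get()` without a preceding `isPresent()` to flag.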
GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/5308

    [FLINK-8449] [flip6] Extend OnCompletionActions to accept an SerializableExecutionGraph

    ## What is the purpose of the change

    This commit introduces the SerializableExecutionGraph, which extends the AccessExecutionGraph and adds serializability to it. Moreover, this commit changes the OnCompletionActions interface so that it accepts a SerializableExecutionGraph instead of a plain JobResult. This makes it possible to archive the completed ExecutionGraph for further use in the container component of the JobMasterRunner.

    ## Brief change log

    - Introduce `SerializableExecutionGraph`
    - Introduce `DummyExecutionGraph` for testing purposes
    - Change `OnCompletionActions` to accept a `SerializableExecutionGraph`

    ## Verifying this change

    This change is already covered by existing tests, such as `DispatcherTest`.

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (no)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
    - The serializers: (no)
    - The runtime per-record code paths (performance sensitive): (no)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    - The S3 file system connector: (no)

    ## Documentation

    - Does this pull request introduce a new feature? (no)
    - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink extendOnCompleteActions

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5308

commit 9883bed1cca16a419a46fe09ad1498ada42d67e2
Author: Till Rohrmann
Date:   2018-01-16T17:45:53Z

    [FLINK-8449] [flip6] Extend OnCompletionActions to accept an SerializableExecutionGraph

    This commit introduces the SerializableExecutionGraph, which extends the
    AccessExecutionGraph and adds serializability to it. Moreover, this commit
    changes the OnCompletionActions interface so that it accepts a
    SerializableExecutionGraph instead of a plain JobResult. This makes it possible to
    archive the completed ExecutionGraph for further use in the container
    component of the JobMasterRunner.
---
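The interface change described in this pull request, a completion callback that receives a snapshot of the whole graph rather than a condensed result, can be sketched as follows. Every type here (`GraphSnapshot`, `CompletionActions`, `ArchivingContainer`, `Status`) is a hypothetical simplification for illustration and does not reproduce Flink's `OnCompletionActions` or its execution graph classes; only the method name `jobReachedGloballyTerminalState` is taken from the diff quoted in the review comments.

```java
import java.util.ArrayList;
import java.util.List;

public class CompletionCallbackSketch {

    /** Hypothetical terminal states of a job. */
    enum Status { FINISHED, FAILED, CANCELED }

    /** Hypothetical immutable snapshot of a completed job. */
    static final class GraphSnapshot {
        private final String jobId;
        private final Status status;

        GraphSnapshot(String jobId, Status status) {
            this.jobId = jobId;
            this.status = status;
        }

        String getJobId() {
            return jobId;
        }

        Status getStatus() {
            return status;
        }
    }

    /** Hypothetical single callback that replaces separate finished and failed callbacks. */
    interface CompletionActions {
        void jobReachedGloballyTerminalState(GraphSnapshot snapshot);
    }

    /** Container component that archives each snapshot and derives the outcome on demand. */
    static final class ArchivingContainer implements CompletionActions {
        private final List<GraphSnapshot> archive = new ArrayList<>();

        @Override
        public void jobReachedGloballyTerminalState(GraphSnapshot snapshot) {
            archive.add(snapshot);
        }

        int archivedCount() {
            return archive.size();
        }

        /** Success is computed from the archived snapshot instead of being passed in separately. */
        boolean succeeded(String jobId) {
            for (GraphSnapshot snapshot : archive) {
                if (snapshot.getJobId().equals(jobId)) {
                    return snapshot.getStatus() == Status.FINISHED;
                }
            }
            throw new IllegalArgumentException("unknown job: " + jobId);
        }
    }

    public static void main(String[] args) {
        ArchivingContainer container = new ArchivingContainer();
        container.jobReachedGloballyTerminalState(new GraphSnapshot("job-1", Status.FINISHED));
        container.jobReachedGloballyTerminalState(new GraphSnapshot("job-2", Status.FAILED));
        System.out.println(container.archivedCount() + " " + container.succeeded("job-1"));
    }
}
```

Passing the snapshot keeps the richer information available to the receiver, which can still compute a condensed result from it when it needs one.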