[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph
    [ https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341123#comment-16341123 ]

ASF GitHub Bot commented on FLINK-8449:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5308

> Extend OnCompletionActions to receive AccessExecutionGraph
> ----------------------------------------------------------
>
>                 Key: FLINK-8449
>                 URL: https://issues.apache.org/jira/browse/FLINK-8449
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> The {{OnCompletionAction}} currently only receives the {{JobResult}} when the
> job terminates. For further archiving of the {{ArchivedExecutionGraph}}, we
> should change this to {{AccessExecutionGraph}}. The {{AccessExecutionGraph}}
> contains all the information to derive the {{JobResult}} and additionally the
> information needed for serving information about completed jobs.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph
    [ https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339430#comment-16339430 ]

ASF GitHub Bot commented on FLINK-8449:
---------------------------------------

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5308

    Thanks for the review @GJL. I've rebased onto the latest master and addressed your feedback in 72bebc0.
[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph
    [ https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339386#comment-16339386 ]

ASF GitHub Bot commented on FLINK-8449:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163877367

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java ---
    @@ -250,19 +255,15 @@ public void testRegainLeadership() throws Exception {
             private volatile boolean finishedByOther;

             @Override
    -        public void jobFinished(JobResult result) {
    +        public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
                 checkArgument(!isJobFinished(), "job finished already");
                 checkArgument(!isJobFailed(), "job failed already");

    -            this.result = result;
    -        }
    -
    -        @Override
    -        public void jobFailed(JobResult result) {
    -            checkArgument(!isJobFinished(), "job finished already");
    -            checkArgument(!isJobFailed(), "job failed already");
    +            this.result = JobResult.createFrom(executionGraph);

    -            this.failedCause = result.getSerializedThrowable().get();
    +            if (!result.isSuccess()) {
    --- End diff --

    Good idea. I'll leave it like it is though, because this test will be removed in one of my other PRs.
[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph
    [ https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339383#comment-16339383 ]

ASF GitHub Bot commented on FLINK-8449:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163877071

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -1701,40 +1705,4 @@ void notifyExecutionChange(
                 }
             }
         }
    -
    -    @Override
    -    public ArchivedExecutionGraph archive() {
    --- End diff --

    I think it's better that the `ExecutionGraph` does not know about the `ArchivedExecutionGraph` because the coupling should be vice versa.
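
The coupling direction argued for here can be illustrated with a minimal sketch. Everything below is hypothetical: `LiveGraph` and `ArchivedGraph` are heavily reduced stand-ins, not Flink's `ExecutionGraph` and `ArchivedExecutionGraph`, and the factory name `createFrom` is only an assumption modelled on the `JobResult.createFrom(executionGraph)` call that appears in this pull request. The point is that the live graph has no `archive()` method and no reference to the snapshot type; only the snapshot depends on the live graph.

```java
import java.io.Serializable;

public class ArchiveCouplingSketch {

    /** Hypothetical stand-in for the live, mutable graph. It knows nothing about archiving. */
    static final class LiveGraph {
        private final String jobName;
        private String state = "RUNNING";

        LiveGraph(String jobName) {
            this.jobName = jobName;
        }

        String getJobName() {
            return jobName;
        }

        String getState() {
            return state;
        }

        void transitionTo(String newState) {
            this.state = newState;
        }
    }

    /** Hypothetical immutable snapshot. The dependency points from here to LiveGraph only. */
    static final class ArchivedGraph implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String jobName;
        private final String state;

        private ArchivedGraph(String jobName, String state) {
            this.jobName = jobName;
            this.state = state;
        }

        /** Copies the current values out of the live graph. */
        static ArchivedGraph createFrom(LiveGraph graph) {
            return new ArchivedGraph(graph.getJobName(), graph.getState());
        }

        String getJobName() {
            return jobName;
        }

        String getState() {
            return state;
        }
    }

    public static void main(String[] args) {
        LiveGraph live = new LiveGraph("wordcount");
        live.transitionTo("FINISHED");

        ArchivedGraph archived = ArchivedGraph.createFrom(live);

        // Later changes to the live graph do not leak into the snapshot.
        live.transitionTo("RESTARTING");
        System.out.println(archived.getJobName() + " " + archived.getState());
    }
}
```

With this layout the snapshot type can change or be replaced without touching the live graph class.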
[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph
    [ https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339374#comment-16339374 ]

ASF GitHub Bot commented on FLINK-8449:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163874024

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -752,16 +751,21 @@ public Executor getFutureExecutor() {
         /**
          * Gets a serialized accumulator map.
          * @return The accumulator map with serialized accumulator values.
    -     * @throws IOException
          */
         @Override
    -    public Map getAccumulatorsSerialized() throws IOException {
    +    public Map getAccumulatorsSerialized() {

             Map accumulatorMap = aggregateUserAccumulators();

             Map result = new HashMap<>(accumulatorMap.size());
             for (Map.Entry entry : accumulatorMap.entrySet()) {
    -            result.put(entry.getKey(), new SerializedValue<>(entry.getValue().getLocalValue()));
    +
    +            try {
    +                final SerializedValue serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
    +                result.put(entry.getKey(), serializedValue);
    +            } catch (IOException ioe) {
    +                LOG.info("Could not serialize accumulator " + entry.getKey() + '.', ioe);
    --- End diff --

    This is mainly a question of behaviour. In case that something goes wrong while serializing the accumulators one can either completely fail or try to return as much as possible, as it is done here. I'll change the log level to error and insert a `FailedAccumulatorSerialization` entry which throws the exception when being accessed instead.
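
The "return as much as possible" behaviour discussed in this comment can be sketched as follows. This is only an illustration under stated assumptions: `SerializedOrFailure` is a hypothetical holder type invented for this example (the comment names a `FailedAccumulatorSerialization` entry, but its shape is not shown in the thread), and plain Java serialization stands in for Flink's `SerializedValue`. Each entry is serialized on its own; a failing entry is replaced by a placeholder that rethrows the original exception when it is accessed.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

public class AccumulatorSerializationSketch {

    /** Hypothetical holder: either the serialized bytes or the failure that prevented serialization. */
    static final class SerializedOrFailure {
        private final byte[] bytes;
        private final IOException failure;

        private SerializedOrFailure(byte[] bytes, IOException failure) {
            this.bytes = bytes;
            this.failure = failure;
        }

        static SerializedOrFailure of(byte[] bytes) {
            return new SerializedOrFailure(bytes, null);
        }

        static SerializedOrFailure failed(IOException failure) {
            return new SerializedOrFailure(null, failure);
        }

        boolean isFailure() {
            return failure != null;
        }

        /** Returns the bytes, or rethrows the original serialization failure. */
        byte[] getBytes() throws IOException {
            if (failure != null) {
                throw failure;
            }
            return bytes;
        }
    }

    /** Serializes a single value with standard Java serialization. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    /** Serializes every entry independently, so one bad value does not hide the others. */
    static Map<String, SerializedOrFailure> serializeAll(Map<String, Object> accumulators) {
        Map<String, SerializedOrFailure> result = new HashMap<>(accumulators.size());
        for (Map.Entry<String, Object> entry : accumulators.entrySet()) {
            try {
                result.put(entry.getKey(), SerializedOrFailure.of(serialize(entry.getValue())));
            } catch (IOException ioe) {
                // Keep the failure in place of the value; it surfaces when the entry is read.
                result.put(entry.getKey(), SerializedOrFailure.failed(ioe));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> accumulators = new HashMap<>();
        accumulators.put("count", 42L);
        accumulators.put("broken", new Object()); // java.lang.Object is not Serializable

        Map<String, SerializedOrFailure> serialized = serializeAll(accumulators);
        System.out.println("count failed: " + serialized.get("count").isFailure());
        System.out.println("broken failed: " + serialized.get("broken").isFailure());
    }
}
```

The trade-off is that callers no longer see a failure at serialization time; they see it only when they read the affected entry.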
[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph
    [ https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339353#comment-16339353 ]

ASF GitHub Bot commented on FLINK-8449:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163869963

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java ---
    @@ -173,4 +173,23 @@ public static String getResultsFormatted(Map map) {
             return accumulators;
         }

    +    /**
    +     * Serializes the given accumulators.
    +     *
    +     * @param accumulators to serialize
    +     * @return Map of serialized accumulators
    +     * @throws IOException if an accumulator could not be serialized
    +     */
    +    public static Map serializeAccumulators(Map accumulators) throws IOException {
    --- End diff --

    True, will remove it.
[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph
    [ https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337672#comment-16337672 ]

ASF GitHub Bot commented on FLINK-8449:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163544619

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java ---
    @@ -250,19 +255,15 @@ public void testRegainLeadership() throws Exception {
             private volatile boolean finishedByOther;

             @Override
    -        public void jobFinished(JobResult result) {
    +        public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
                 checkArgument(!isJobFinished(), "job finished already");
                 checkArgument(!isJobFailed(), "job failed already");

    -            this.result = result;
    -        }
    -
    -        @Override
    -        public void jobFailed(JobResult result) {
    -            checkArgument(!isJobFinished(), "job finished already");
    -            checkArgument(!isJobFailed(), "job failed already");
    +            this.result = JobResult.createFrom(executionGraph);

    -            this.failedCause = result.getSerializedThrowable().get();
    +            if (!result.isSuccess()) {
    --- End diff --

    Maybe
    ```
    result.getSerializedThrowable()
        .ifPresent(serializedThrowable -> failedCause = serializedThrowable);
    ```
    to avoid IntelliJ's inspection warning.
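
The suggestion relies on the standard `java.util.Optional#ifPresent`. A small self-contained sketch of the two styles follows; the class and the `String`-typed `failedCause` field are hypothetical stand-ins for the test's real fields, which are not fully visible in the diff.

```java
import java.util.Optional;

public class OptionalIfPresentSketch {

    // Hypothetical field standing in for the test's failedCause.
    private String failedCause;

    /** Guarded style: an explicit presence check followed by get(). */
    void recordGuarded(Optional<String> cause) {
        if (cause.isPresent()) {
            failedCause = cause.get();
        }
    }

    /** Suggested style: the assignment only runs when a value is present, with no bare get(). */
    void record(Optional<String> cause) {
        cause.ifPresent(value -> failedCause = value);
    }

    String getFailedCause() {
        return failedCause;
    }

    public static void main(String[] args) {
        OptionalIfPresentSketch sketch = new OptionalIfPresentSketch();

        sketch.record(Optional.empty());
        System.out.println(sketch.getFailedCause());

        sketch.record(Optional.of("task failed"));
        System.out.println(sketch.getFailedCause());
    }
}
```

Both methods behave the same way; the second simply avoids calling `get()` on an `Optional` that a static analyzer cannot prove to be non-empty.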
[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph
    [ https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337670#comment-16337670 ]

ASF GitHub Bot commented on FLINK-8449:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163540049

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -1701,40 +1705,4 @@ void notifyExecutionChange(
                 }
             }
         }
    -
    -    @Override
    -    public ArchivedExecutionGraph archive() {
    --- End diff --

    Why is it better to move this logic to `ArchivedExecutionGraph`?
[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph
    [ https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337669#comment-16337669 ]

ASF GitHub Bot commented on FLINK-8449:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163536073

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java ---
    @@ -173,4 +173,23 @@ public static String getResultsFormatted(Map map) {
             return accumulators;
         }

    +    /**
    +     * Serializes the given accumulators.
    +     *
    +     * @param accumulators to serialize
    +     * @return Map of serialized accumulators
    +     * @throws IOException if an accumulator could not be serialized
    +     */
    +    public static Map serializeAccumulators(Map accumulators) throws IOException {
    --- End diff --

    I think you are not using this method anywhere.
[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph
    [ https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337671#comment-16337671 ]

ASF GitHub Bot commented on FLINK-8449:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163538231

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -752,16 +751,21 @@ public Executor getFutureExecutor() {
         /**
          * Gets a serialized accumulator map.
          * @return The accumulator map with serialized accumulator values.
    -     * @throws IOException
          */
         @Override
    -    public Map getAccumulatorsSerialized() throws IOException {
    +    public Map getAccumulatorsSerialized() {

             Map accumulatorMap = aggregateUserAccumulators();

             Map result = new HashMap<>(accumulatorMap.size());
             for (Map.Entry entry : accumulatorMap.entrySet()) {
    -            result.put(entry.getKey(), new SerializedValue<>(entry.getValue().getLocalValue()));
    +
    +            try {
    +                final SerializedValue serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
    +                result.put(entry.getKey(), serializedValue);
    +            } catch (IOException ioe) {
    +                LOG.info("Could not serialize accumulator " + entry.getKey() + '.', ioe);
    --- End diff --

    Why is it acceptable to change the behavior, i.e., to ignore the exception? It is not even logged on `ERROR` level.

    Also:
    ```
    LOG.info("Could not serialize accumulator {}.", entry.getKey(), ioe);
    ```
[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph
    [ https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16330267#comment-16330267 ]

ASF GitHub Bot commented on FLINK-8449:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/5308

    [FLINK-8449] [flip6] Extend OnCompletionActions to accept an SerializableExecutionGraph

    ## What is the purpose of the change

    This commit introduces the SerializableExecutionGraph which extends the
    AccessExecutionGraph and adds serializability to it. Moreover, this commit
    changes the OnCompletionActions interface such that it accepts a
    SerializableExecutionGraph instead of a plain JobResult. This allows to
    archive the completed ExecutionGraph for further usage in the container
    component of the JobMasterRunner.

    ## Brief change log

    - Introduce `SerializableExecutionGraph`
    - Introduce `DummyExecutionGraph` for testing purposes
    - Change `OnCompletionActions` to accept a `SerializableExecutionGraph`

    ## Verifying this change

    This change is already covered by existing tests, such as `DispatcherTest`.

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (no)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
    - The serializers: (no)
    - The runtime per-record code paths (performance sensitive): (no)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    - The S3 file system connector: (no)

    ## Documentation

    - Does this pull request introduce a new feature? (no)
    - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink extendOnCompleteActions

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5308

commit 9883bed1cca16a419a46fe09ad1498ada42d67e2
Author: Till Rohrmann
Date:   2018-01-16T17:45:53Z

    [FLINK-8449] [flip6] Extend OnCompletionActions to accept an SerializableExecutionGraph

    This commit introduces the SerializableExecutionGraph which extends the
    AccessExecutionGraph and adds serializability to it. Moreover, this commit
    changes the OnCompletionActions interface such that it accepts a
    SerializableExecutionGraph instead of a plain JobResult. This allows to
    archive the completed ExecutionGraph for further usage in the container
    component of the JobMasterRunner.
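
The interface change described in this pull request can be pictured with a reduced sketch. All types here are hypothetical simplifications written for illustration (`GraphSnapshot`, `Result`, `CompletionActions`, `RecordingActions`); only the method name `jobReachedGloballyTerminalState` and the idea of deriving the result via a `createFrom` factory are taken from the diffs in this thread. The sketch shows a single callback that receives the whole serializable graph, so the receiver can both archive it and derive the job result from it.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class OnCompletionSketch {

    /** Hypothetical, heavily reduced stand-in for a serializable execution graph. */
    static final class GraphSnapshot implements Serializable {
        private static final long serialVersionUID = 1L;

        final String jobId;
        final String state;        // for example "FINISHED" or "FAILED"
        final String failureCause; // null unless the job failed

        GraphSnapshot(String jobId, String state, String failureCause) {
            this.jobId = jobId;
            this.state = state;
            this.failureCause = failureCause;
        }
    }

    /** Hypothetical reduced job result, derived entirely from the snapshot. */
    static final class Result {
        private final boolean success;
        private final String failureCause;

        private Result(boolean success, String failureCause) {
            this.success = success;
            this.failureCause = failureCause;
        }

        static Result createFrom(GraphSnapshot graph) {
            return new Result("FINISHED".equals(graph.state), graph.failureCause);
        }

        boolean isSuccess() {
            return success;
        }

        Optional<String> getFailureCause() {
            return Optional.ofNullable(failureCause);
        }
    }

    /** One callback for every globally terminal state, instead of separate finished/failed hooks. */
    interface CompletionActions {
        void jobReachedGloballyTerminalState(GraphSnapshot graph);
    }

    /** Example receiver: keeps the graph for later queries and derives the result on the spot. */
    static final class RecordingActions implements CompletionActions {
        final List<GraphSnapshot> archived = new ArrayList<>();
        Result lastResult;

        @Override
        public void jobReachedGloballyTerminalState(GraphSnapshot graph) {
            archived.add(graph);
            lastResult = Result.createFrom(graph);
        }
    }

    public static void main(String[] args) {
        RecordingActions actions = new RecordingActions();
        actions.jobReachedGloballyTerminalState(new GraphSnapshot("job-1", "FAILED", "task lost"));

        System.out.println("archived graphs: " + actions.archived.size());
        System.out.println("success: " + actions.lastResult.isSuccess());
    }
}
```

Passing the graph rather than the result keeps the richer information available to the receiver, which matches the motivation given in the issue description.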