[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341123#comment-16341123
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5308


> Extend OnCompletionActions to receive AccessExecutionGraph
> --
>
> Key: FLINK-8449
> URL: https://issues.apache.org/jira/browse/FLINK-8449
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{OnCompletionAction}} currently only receives the {{JobResult}} when the 
> job terminates. For further archiving of the {{ArchivedExecutionGraph}}, we 
> should change this to {{AccessExecutionGraph}}. The {{AccessExecutionGraph}} 
> contains all the information to derive the {{JobResult}} and additionally the 
> information needed for serving information about completed jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339430#comment-16339430
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5308
  
Thanks for the review @GJL. I've rebased onto the latest master and 
addressed your feedback in 72bebc0.




[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339386#comment-16339386
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163877367
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java ---
@@ -250,19 +255,15 @@ public void testRegainLeadership() throws Exception {
private volatile boolean finishedByOther;
 
@Override
-   public void jobFinished(JobResult result) {
+   public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
checkArgument(!isJobFinished(), "job finished already");
checkArgument(!isJobFailed(), "job failed already");
 
-   this.result = result;
-   }
-
-   @Override
-   public void jobFailed(JobResult result) {
-   checkArgument(!isJobFinished(), "job finished already");
-   checkArgument(!isJobFailed(), "job failed already");
+   this.result = JobResult.createFrom(executionGraph);
 
-   this.failedCause = result.getSerializedThrowable().get();
+   if (!result.isSuccess()) {
--- End diff --

Good idea. I'll leave it like it is though, because this test will be 
removed in one of my other PRs.




[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339383#comment-16339383
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163877071
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -1701,40 +1705,4 @@ void notifyExecutionChange(
}
}
}
-
-   @Override
-   public ArchivedExecutionGraph archive() {
--- End diff --

I think it's better that the `ExecutionGraph` does not know about the 
`ArchivedExecutionGraph`, because the dependency should point the other way: 
the `ArchivedExecutionGraph` should know about the `ExecutionGraph`.
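
The dependency direction argued for above can be illustrated with a minimal sketch. `LiveGraph` and `ArchivedGraph` are hypothetical, heavily simplified stand-ins invented for this example; they are not Flink's classes, and the fields are assumptions. The point is only that the snapshot type builds itself from the live type through a static factory, so the live type carries no `archive()` method and no reference to the snapshot type.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a live, mutable graph. It has no archive() method
// and does not mention ArchivedGraph anywhere.
final class LiveGraph {
    private final String jobName;
    private final List<String> vertexStates = new ArrayList<>();

    LiveGraph(String jobName) {
        this.jobName = jobName;
    }

    void addVertexState(String state) {
        vertexStates.add(state);
    }

    String getJobName() {
        return jobName;
    }

    List<String> getVertexStates() {
        return vertexStates;
    }
}

// Hypothetical immutable snapshot. The dependency points from the snapshot
// to the live graph, not the other way round.
final class ArchivedGraph {
    private final String jobName;
    private final List<String> vertexStates;

    private ArchivedGraph(String jobName, List<String> vertexStates) {
        this.jobName = jobName;
        this.vertexStates = vertexStates;
    }

    // Static factory: copies the state so later changes to the live graph
    // do not leak into the snapshot.
    static ArchivedGraph createFrom(LiveGraph graph) {
        List<String> copy = new ArrayList<>(graph.getVertexStates());
        return new ArchivedGraph(graph.getJobName(), Collections.unmodifiableList(copy));
    }

    String getJobName() {
        return jobName;
    }

    List<String> getVertexStates() {
        return vertexStates;
    }
}
```

With this layout the live graph can be compiled and tested without the snapshot class, while the snapshot class remains the single place that knows which fields are worth keeping.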




[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339374#comment-16339374
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163874024
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -752,16 +751,21 @@ public Executor getFutureExecutor() {
/**
 * Gets a serialized accumulator map.
 * @return The accumulator map with serialized accumulator values.
-* @throws IOException
 */
@Override
-   public Map getAccumulatorsSerialized() throws IOException {
+   public Map getAccumulatorsSerialized() {
 
Map accumulatorMap = aggregateUserAccumulators();
 
Map result = new HashMap<>(accumulatorMap.size());
for (Map.Entry entry : accumulatorMap.entrySet()) {
-   result.put(entry.getKey(), new SerializedValue<>(entry.getValue().getLocalValue()));
+
+   try {
+   final SerializedValue serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
+   result.put(entry.getKey(), serializedValue);
+   } catch (IOException ioe) {
+   LOG.info("Could not serialize accumulator " + entry.getKey() + '.', ioe);
--- End diff --

This is mainly a question of behaviour. If something goes wrong while 
serializing the accumulators, one can either fail completely or try to 
return as much as possible, as is done here.

I'll change the log level to error and instead insert a 
`FailedAccumulatorSerialization` entry, which throws the exception when it is 
accessed.
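
The "return as much as possible" option with a failure placeholder can be sketched roughly as follows. This is an illustration under stated assumptions, not the code from the pull request: `SerializedOrFailure` and `AccumulatorSerialization` are hypothetical names, plain Java serialization stands in for Flink's `SerializedValue`, and the accumulators are modelled as a simple `Map<String, Object>`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-entry holder: either the serialized bytes or the failure.
final class SerializedOrFailure {
    private final byte[] bytes;         // null if serialization failed
    private final IOException failure;  // null if serialization succeeded

    private SerializedOrFailure(byte[] bytes, IOException failure) {
        this.bytes = bytes;
        this.failure = failure;
    }

    // Serializes the value, capturing any IOException instead of propagating it.
    static SerializedOrFailure of(Object value) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
            oos.flush();
            return new SerializedOrFailure(bos.toByteArray(), null);
        } catch (IOException ioe) {
            return new SerializedOrFailure(null, ioe);
        }
    }

    boolean isFailure() {
        return failure != null;
    }

    // The captured exception surfaces only when this entry is accessed.
    byte[] getBytes() throws IOException {
        if (failure != null) {
            throw failure;
        }
        return bytes;
    }
}

final class AccumulatorSerialization {
    // Best-effort: one bad accumulator does not prevent the others from being returned.
    static Map<String, SerializedOrFailure> serializeAll(Map<String, Object> accumulators) {
        Map<String, SerializedOrFailure> result = new HashMap<>(accumulators.size());
        for (Map.Entry<String, Object> entry : accumulators.entrySet()) {
            result.put(entry.getKey(), SerializedOrFailure.of(entry.getValue()));
        }
        return result;
    }
}
```

The trade-off is that callers no longer see a failure at the call site of `serializeAll`; they see it only when they read the affected entry, which is why logging the problem at a visible level still matters.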




[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339353#comment-16339353
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163869963
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java ---
@@ -173,4 +173,23 @@ public static String getResultsFormatted(Map map) {
return accumulators;
}
 
+   /**
+* Serializes the given accumulators.
+*
+* @param accumulators to serialize
+* @return Map of serialized accumulators
+* @throws IOException if an accumulator could not be serialized
+*/
+   public static Map serializeAccumulators(Map accumulators) throws IOException {
--- End diff --

True, will remove it.




[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337672#comment-16337672
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163544619
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java ---
@@ -250,19 +255,15 @@ public void testRegainLeadership() throws Exception {
private volatile boolean finishedByOther;
 
@Override
-   public void jobFinished(JobResult result) {
+   public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
checkArgument(!isJobFinished(), "job finished already");
checkArgument(!isJobFailed(), "job failed already");
 
-   this.result = result;
-   }
-
-   @Override
-   public void jobFailed(JobResult result) {
-   checkArgument(!isJobFinished(), "job finished already");
-   checkArgument(!isJobFailed(), "job failed already");
+   this.result = JobResult.createFrom(executionGraph);
 
-   this.failedCause = result.getSerializedThrowable().get();
+   if (!result.isSuccess()) {
--- End diff --

Maybe 
```
result.getSerializedThrowable()
.ifPresent(serializedThrowable -> failedCause = 
serializedThrowable);
```
to avoid IntelliJ's inspection warning.
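
For readers unfamiliar with the idiom, here is a small self-contained sketch of the suggestion. `FailureRecorder` is a hypothetical class made up for this example, and a `String` stands in for the serialized throwable; only `java.util.Optional` is real API.

```java
import java.util.Optional;

// Hypothetical holder that records a failure cause only when one is present.
final class FailureRecorder {
    private String failedCause; // stays null when there was no failure

    void record(Optional<String> cause) {
        // ifPresent runs the lambda only for a non-empty Optional, so there is
        // no unchecked get() call for an IDE inspection to flag.
        cause.ifPresent(c -> failedCause = c);
    }

    String getFailedCause() {
        return failedCause;
    }
}
```

Compared with guarding `get()` behind a separate success check, this keeps the presence test and the use of the value in one expression.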




[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337670#comment-16337670
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163540049
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -1701,40 +1705,4 @@ void notifyExecutionChange(
}
}
}
-
-   @Override
-   public ArchivedExecutionGraph archive() {
--- End diff --

Why is it better to move this logic to `ArchivedExecutionGraph`?




[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337669#comment-16337669
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163536073
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java ---
@@ -173,4 +173,23 @@ public static String getResultsFormatted(Map map) {
return accumulators;
}
 
+   /**
+* Serializes the given accumulators.
+*
+* @param accumulators to serialize
+* @return Map of serialized accumulators
+* @throws IOException if an accumulator could not be serialized
+*/
+   public static Map serializeAccumulators(Map accumulators) throws IOException {
--- End diff --

I think you are not using this method anywhere.




[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337671#comment-16337671
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163538231
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -752,16 +751,21 @@ public Executor getFutureExecutor() {
/**
 * Gets a serialized accumulator map.
 * @return The accumulator map with serialized accumulator values.
-* @throws IOException
 */
@Override
-   public Map getAccumulatorsSerialized() throws IOException {
+   public Map getAccumulatorsSerialized() {
 
Map accumulatorMap = aggregateUserAccumulators();
 
Map result = new HashMap<>(accumulatorMap.size());
for (Map.Entry entry : accumulatorMap.entrySet()) {
-   result.put(entry.getKey(), new SerializedValue<>(entry.getValue().getLocalValue()));
+
+   try {
+   final SerializedValue serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
+   result.put(entry.getKey(), serializedValue);
+   } catch (IOException ioe) {
+   LOG.info("Could not serialize accumulator " + entry.getKey() + '.', ioe);
--- End diff --

Why is it acceptable to change the behavior, i.e., to ignore the exception? 
It is not even logged at `ERROR` level.
Also:
```
LOG.info("Could not serialize accumulator {}.", entry.getKey(), ioe);
```




[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16330267#comment-16330267
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5308

[FLINK-8449] [flip6] Extend OnCompletionActions to accept an 
SerializableExecutionGraph

## What is the purpose of the change

This commit introduces the SerializableExecutionGraph, which extends the
AccessExecutionGraph and adds serializability to it. Moreover, this commit
changes the OnCompletionActions interface so that it accepts a
SerializableExecutionGraph instead of a plain JobResult. This makes it
possible to archive the completed ExecutionGraph for further use in the
container component of the JobMasterRunner.
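
As a rough illustration of the interface change described above, the following sketch passes a read-only graph view to the completion callback and derives the compact result from it. All types here (`FinishedGraphView`, `DerivedResult`, `CompletionCallback`, `RecordingCallback`) are hypothetical simplifications invented for this example; they only mirror the roles of `AccessExecutionGraph`, `JobResult`, and `OnCompletionActions` and do not reproduce Flink's actual signatures.

```java
import java.util.Optional;

// Hypothetical read-only view of a finished job's graph.
interface FinishedGraphView {
    String getJobName();

    // Empty if the job finished successfully.
    Optional<String> getFailureCause();
}

// Hypothetical compact result that can be derived entirely from the graph view.
final class DerivedResult {
    private final String jobName;
    private final String failureCause; // null on success

    private DerivedResult(String jobName, String failureCause) {
        this.jobName = jobName;
        this.failureCause = failureCause;
    }

    static DerivedResult createFrom(FinishedGraphView graph) {
        return new DerivedResult(graph.getJobName(), graph.getFailureCause().orElse(null));
    }

    String getJobName() {
        return jobName;
    }

    boolean isSuccess() {
        return failureCause == null;
    }

    Optional<String> getFailureCause() {
        return Optional.ofNullable(failureCause);
    }
}

// One callback for every globally terminal state, receiving the whole graph view
// rather than only the derived result.
interface CompletionCallback {
    void jobReachedGloballyTerminalState(FinishedGraphView graph);
}

// Hypothetical receiver: keeps the graph view for later queries about the
// completed job and derives the result on the spot.
final class RecordingCallback implements CompletionCallback {
    private FinishedGraphView archivedGraph;
    private DerivedResult result;

    @Override
    public void jobReachedGloballyTerminalState(FinishedGraphView graph) {
        this.archivedGraph = graph;
        this.result = DerivedResult.createFrom(graph);
    }

    FinishedGraphView getArchivedGraph() {
        return archivedGraph;
    }

    DerivedResult getResult() {
        return result;
    }
}
```

Because the result is a pure function of the graph view, the receiver loses nothing by being handed the richer object, and it gains the data needed to answer later questions about the completed job.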

## Brief change log

- Introduce `SerializableExecutionGraph`
- Introduce `DummyExecutionGraph` for testing purposes
- Change `OnCompletionActions` to accept a `SerializableExecutionGraph`

## Verifying this change

This change is already covered by existing tests, such as `DispatcherTest`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink extendOnCompleteActions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5308


commit 9883bed1cca16a419a46fe09ad1498ada42d67e2
Author: Till Rohrmann 
Date:   2018-01-16T17:45:53Z

[FLINK-8449] [flip6] Extend OnCompletionActions to accept an 
SerializableExecutionGraph

This commit introduces the SerializableExecutionGraph which extends the
AccessExecutionGraph and adds serializability to it. Moreover, this commit
changes the OnCompletionActions interface such that it accepts a
SerializableExecutionGraph instead of a plain JobResult. This allows to
archive the completed ExecutionGraph for further usage in the container
component of the JobMasterRunner.



