[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...

2018-01-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5308


---


[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...

2018-01-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163877367
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
 ---
@@ -250,19 +255,15 @@ public void testRegainLeadership() throws Exception {
private volatile boolean finishedByOther;
 
@Override
-   public void jobFinished(JobResult result) {
+   public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
checkArgument(!isJobFinished(), "job finished already");
checkArgument(!isJobFailed(), "job failed already");
 
-   this.result = result;
-   }
-
-   @Override
-   public void jobFailed(JobResult result) {
-   checkArgument(!isJobFinished(), "job finished already");
-   checkArgument(!isJobFailed(), "job failed already");
+   this.result = JobResult.createFrom(executionGraph);
 
-   this.failedCause = result.getSerializedThrowable().get();
+   if (!result.isSuccess()) {
--- End diff --

Good idea. I'll leave it as is, though, because this test will be 
removed in one of my other PRs.


---


[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...

2018-01-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163877071
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -1701,40 +1705,4 @@ void notifyExecutionChange(
}
}
}
-
-   @Override
-   public ArchivedExecutionGraph archive() {
--- End diff --

I think it's better that `ExecutionGraph` does not know about 
`ArchivedExecutionGraph`, because the dependency should point the other way: 
the archived form should know about the live graph.
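
A minimal sketch of that dependency direction, using hypothetical stand-in classes (`LiveGraph`, `ArchivedGraph`) rather than Flink's real types: the archived type offers a static factory that reads from the live graph, so the live graph has no `archive()` method and no reference to its archived form.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, heavily simplified stand-in for a mutable, running graph.
class LiveGraph {
    private final String jobName;
    private final List<String> vertexStates = new ArrayList<>();

    LiveGraph(String jobName) {
        this.jobName = jobName;
    }

    void addVertexState(String state) {
        vertexStates.add(state);
    }

    String getJobName() {
        return jobName;
    }

    List<String> getVertexStates() {
        return vertexStates;
    }
    // No archive() method: LiveGraph never mentions ArchivedGraph.
}

// Hypothetical immutable snapshot; it depends on LiveGraph, not vice versa.
final class ArchivedGraph {
    private final String jobName;
    private final List<String> vertexStates;

    private ArchivedGraph(String jobName, List<String> vertexStates) {
        this.jobName = jobName;
        this.vertexStates = vertexStates;
    }

    // Static factory that copies the state it needs out of the live graph.
    static ArchivedGraph createFrom(LiveGraph graph) {
        List<String> copy = new ArrayList<>(graph.getVertexStates());
        return new ArchivedGraph(graph.getJobName(), Collections.unmodifiableList(copy));
    }

    String getJobName() {
        return jobName;
    }

    List<String> getVertexStates() {
        return vertexStates;
    }
}

class CouplingSketch {
    public static void main(String[] args) {
        LiveGraph live = new LiveGraph("wordcount");
        live.addVertexState("FINISHED");
        ArchivedGraph archived = ArchivedGraph.createFrom(live);
        // Later changes to the live graph do not leak into the snapshot.
        live.addVertexState("CANCELED");
        System.out.println(archived.getJobName() + " " + archived.getVertexStates());
    }
}
```

With this layout the archived type can be changed or removed without touching the live graph class.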


---


[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...

2018-01-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163874024
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -752,16 +751,21 @@ public Executor getFutureExecutor() {
/**
 * Gets a serialized accumulator map.
 * @return The accumulator map with serialized accumulator values.
-* @throws IOException
 */
@Override
-   public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
+   public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() {
 
Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
 
Map<String, SerializedValue<Object>> result = new HashMap<>(accumulatorMap.size());
for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
-   result.put(entry.getKey(), new SerializedValue<>(entry.getValue().getLocalValue()));
+
+   try {
+   final SerializedValue<Object> serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
+   result.put(entry.getKey(), serializedValue);
+   } catch (IOException ioe) {
+   LOG.info("Could not serialize accumulator " + entry.getKey() + '.', ioe);
--- End diff --

This is mainly a question of behaviour. If something goes wrong while 
serializing the accumulators, one can either fail completely or try to 
return as much as possible, as is done here.

I'll change the log level to error and insert a 
`FailedAccumulatorSerialization` entry instead, which throws the exception 
when it is accessed.
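
The "return as much as possible" option with a failing placeholder could be sketched as follows. This is only an illustration under stated assumptions: `SerializedOrFailed` and `serializeAll` are hypothetical names, plain Java serialization stands in for Flink's `SerializedValue`, and the error-level logging is left out for brevity.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder: either the serialized bytes or the failure that prevented serialization.
final class SerializedOrFailed {
    private final byte[] bytes;        // null if serialization failed
    private final IOException failure; // null if serialization succeeded

    private SerializedOrFailed(byte[] bytes, IOException failure) {
        this.bytes = bytes;
        this.failure = failure;
    }

    static SerializedOrFailed of(Object value) {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
            out.flush();
            return new SerializedOrFailed(buffer.toByteArray(), null);
        } catch (IOException ioe) {
            // Keep the failure so it surfaces when this entry is accessed.
            return new SerializedOrFailed(null, ioe);
        }
    }

    boolean isFailed() {
        return failure != null;
    }

    // Rethrows the stored failure on access instead of hiding it.
    byte[] getBytes() throws IOException {
        if (failure != null) {
            throw failure;
        }
        return bytes;
    }
}

class AccumulatorSerializationSketch {
    // One bad value no longer fails the whole map; it only poisons its own entry.
    static Map<String, SerializedOrFailed> serializeAll(Map<String, Object> values) {
        Map<String, SerializedOrFailed> result = new HashMap<>(values.size());
        for (Map.Entry<String, Object> entry : values.entrySet()) {
            result.put(entry.getKey(), SerializedOrFailed.of(entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> values = new HashMap<>();
        values.put("count", 42L);
        values.put("broken", new Object()); // java.lang.Object is not Serializable
        for (Map.Entry<String, SerializedOrFailed> e : serializeAll(values).entrySet()) {
            System.out.println(e.getKey() + " failed=" + e.getValue().isFailed());
        }
    }
}
```

Callers that only read healthy entries are unaffected, while anyone who touches the failed entry still sees the original exception.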


---


[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...

2018-01-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163869963
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
 ---
@@ -173,4 +173,23 @@ public static String getResultsFormatted(Map map) {
return accumulators;
}
 
+   /**
+* Serializes the given accumulators.
+*
+* @param accumulators to serialize
+* @return Map of serialized accumulators
+* @throws IOException if an accumulator could not be serialized
+*/
+   public static Map> 
serializeAccumulators(Map> accumulators) throws 
IOException {
--- End diff --

True, will remove it.


---


[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...

2018-01-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163538231
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -752,16 +751,21 @@ public Executor getFutureExecutor() {
/**
 * Gets a serialized accumulator map.
 * @return The accumulator map with serialized accumulator values.
-* @throws IOException
 */
@Override
-   public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
+   public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() {
 
Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
 
Map<String, SerializedValue<Object>> result = new HashMap<>(accumulatorMap.size());
for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
-   result.put(entry.getKey(), new SerializedValue<>(entry.getValue().getLocalValue()));
+
+   try {
+   final SerializedValue<Object> serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
+   result.put(entry.getKey(), serializedValue);
+   } catch (IOException ioe) {
+   LOG.info("Could not serialize accumulator " + entry.getKey() + '.', ioe);
--- End diff --

Why is it acceptable to change the behavior, i.e., to ignore the exception? 
It is not even logged at `ERROR` level.
Also, use a parameterized log message:
```
LOG.info("Could not serialize accumulator {}.", entry.getKey(), ioe);
```


---


[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...

2018-01-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163536073
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
 ---
@@ -173,4 +173,23 @@ public static String getResultsFormatted(Map map) {
return accumulators;
}
 
+   /**
+* Serializes the given accumulators.
+*
+* @param accumulators to serialize
+* @return Map of serialized accumulators
+* @throws IOException if an accumulator could not be serialized
+*/
+   public static Map> 
serializeAccumulators(Map> accumulators) throws 
IOException {
--- End diff --

I think you are not using this method anywhere.


---


[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...

2018-01-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163540049
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -1701,40 +1705,4 @@ void notifyExecutionChange(
}
}
}
-
-   @Override
-   public ArchivedExecutionGraph archive() {
--- End diff --

Why is it better to move this logic to `ArchivedExecutionGraph`?


---


[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...

2018-01-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5308#discussion_r163544619
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
 ---
@@ -250,19 +255,15 @@ public void testRegainLeadership() throws Exception {
private volatile boolean finishedByOther;
 
@Override
-   public void jobFinished(JobResult result) {
+   public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
checkArgument(!isJobFinished(), "job finished already");
checkArgument(!isJobFailed(), "job failed already");
 
-   this.result = result;
-   }
-
-   @Override
-   public void jobFailed(JobResult result) {
-   checkArgument(!isJobFinished(), "job finished already");
-   checkArgument(!isJobFailed(), "job failed already");
+   this.result = JobResult.createFrom(executionGraph);
 
-   this.failedCause = result.getSerializedThrowable().get();
+   if (!result.isSuccess()) {
--- End diff --

Maybe 
```
result.getSerializedThrowable()
    .ifPresent(serializedThrowable -> failedCause = serializedThrowable);
```
to avoid IntelliJ's inspection warning.
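
For illustration, here is a self-contained sketch of the suggested `Optional.ifPresent` pattern. The `Result` class and the `String` failure cause are hypothetical simplifications, not the types used in the test under review.

```java
import java.util.Optional;

class FailureCauseSketch {
    // Hypothetical stand-in for a job result that may carry a failure cause.
    static final class Result {
        private final String failureCause; // null means the job succeeded

        Result(String failureCause) {
            this.failureCause = failureCause;
        }

        Optional<String> getFailureCause() {
            return Optional.ofNullable(failureCause);
        }
    }

    private String failedCause;

    void onResult(Result result) {
        // Assigns only when a cause is present, so no unguarded Optional.get() is needed.
        result.getFailureCause().ifPresent(cause -> failedCause = cause);
    }

    String getFailedCause() {
        return failedCause;
    }

    public static void main(String[] args) {
        FailureCauseSketch sketch = new FailureCauseSketch();
        sketch.onResult(new Result(null));
        System.out.println("after success: " + sketch.getFailedCause());
        sketch.onResult(new Result("boom"));
        System.out.println("after failure: " + sketch.getFailedCause());
    }
}
```

The lambda may assign `failedCause` because it is an instance field; a local variable would have to be effectively final.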


---


[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...

2018-01-18 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5308

[FLINK-8449] [flip6] Extend OnCompletionActions to accept an 
SerializableExecutionGraph

## What is the purpose of the change

This commit introduces the SerializableExecutionGraph which extends the
AccessExecutionGraph and adds serializability to it. Moreover, this commit
changes the OnCompletionActions interface such that it accepts a
SerializableExecutionGraph instead of a plain JobResult. This allows the
completed ExecutionGraph to be archived for further use in the container
component of the JobMasterRunner.

## Brief change log

- Introduce `SerializableExecutionGraph`
- Introduce `DummyExecutionGraph` for testing purposes
- Change `OnCompletionActions` to accept a `SerializableExecutionGraph`
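
The shape of the interface change can be sketched roughly as follows. All types here (`GraphSnapshot`, `CompletionActions`, `RecordingCompletionActions`) are hypothetical simplifications for illustration only. The single callback name mirrors the `jobReachedGloballyTerminalState` method visible in the reviewed diff, but the real signatures in Flink differ.

```java
import java.io.Serializable;
import java.util.Optional;

// Hypothetical, simplified stand-in for a serializable view of a finished job.
final class GraphSnapshot implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String jobName;
    private final String failureMessage; // null if the job finished successfully

    GraphSnapshot(String jobName, String failureMessage) {
        this.jobName = jobName;
        this.failureMessage = failureMessage;
    }

    String getJobName() {
        return jobName;
    }

    Optional<String> getFailureMessage() {
        return Optional.ofNullable(failureMessage);
    }
}

// One callback that receives the whole snapshot instead of a plain result.
interface CompletionActions {
    void jobReachedGloballyTerminalState(GraphSnapshot snapshot);
}

// Example receiver: keeps the snapshot and derives success or failure from it.
class RecordingCompletionActions implements CompletionActions {
    private GraphSnapshot archived;

    @Override
    public void jobReachedGloballyTerminalState(GraphSnapshot snapshot) {
        if (archived != null) {
            throw new IllegalStateException("job reached a terminal state already");
        }
        archived = snapshot;
    }

    boolean isSuccess() {
        return archived != null && !archived.getFailureMessage().isPresent();
    }

    Optional<GraphSnapshot> getArchived() {
        return Optional.ofNullable(archived);
    }

    public static void main(String[] args) {
        RecordingCompletionActions actions = new RecordingCompletionActions();
        actions.jobReachedGloballyTerminalState(new GraphSnapshot("wordcount", null));
        System.out.println("success=" + actions.isSuccess());
    }
}
```

Because the receiver gets the full snapshot, it can both archive it and derive the job outcome without a second callback.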

## Verifying this change

This change is already covered by existing tests, such as `DispatcherTest`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink extendOnCompleteActions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5308


commit 9883bed1cca16a419a46fe09ad1498ada42d67e2
Author: Till Rohrmann 
Date:   2018-01-16T17:45:53Z

[FLINK-8449] [flip6] Extend OnCompletionActions to accept an 
SerializableExecutionGraph

This commit introduces the SerializableExecutionGraph which extends the
AccessExecutionGraph and adds serializability to it. Moreover, this commit
changes the OnCompletionActions interface such that it accepts a
SerializableExecutionGraph instead of a plain JobResult. This allows the
completed ExecutionGraph to be archived for further use in the container
component of the JobMasterRunner.




---