[GitHub] [flink] StephanEwen commented on a change in pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

2020-05-16 Thread GitBox


StephanEwen commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r426152842



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##
@@ -186,10 +188,18 @@ private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState,
 
private void handleTaskFailure(final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
setGlobalFailureCause(error);
+   notifyCoordinatorsAboutTaskFailure(executionVertexId, error);

Review comment:
   I think we may need to remove that check anyway. It is quite possible 
that tasks fail immediately after they are deployed, before they get a chance 
to register.
   
   The `OperatorCoordinator` interface also has no notion of "registering", so 
it cannot decide which notifications to forward and which to drop. I'd rather 
err on the side of forwarding too many, because a missed notification can 
easily lead to a stall or data loss (for example, a split is never added back, 
or a result goes missing).
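A minimal sketch of why forwarding "too many" failure notifications is the safer default. It assumes a hypothetical `SplitTracker` on the coordinator side (not Flink's `OperatorCoordinator` API) that records which splits went to which subtask. A notification for a subtask it knows nothing about is a harmless no-op, while a dropped notification would leave that subtask's splits stranded.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical coordinator-side split bookkeeping; not Flink's API. */
class SplitTracker {
    // Splits that are waiting to be handed out.
    private final Deque<String> unassigned = new ArrayDeque<>();
    // Splits handed out per subtask index that would be lost if that subtask fails.
    private final Map<Integer, List<String>> assigned = new HashMap<>();

    SplitTracker(List<String> splits) {
        unassigned.addAll(splits);
    }

    /** Assigns the next split to a subtask, or returns null if none is left. */
    String assignNext(int subtask) {
        String split = unassigned.pollFirst();
        if (split != null) {
            assigned.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
        }
        return split;
    }

    /**
     * Handles a failure notification. There is no "registered" check: a subtask
     * with no assigned splits (or a repeated notification) is simply a no-op.
     */
    void subtaskFailed(int subtask) {
        List<String> lost = assigned.remove(subtask);
        if (lost != null) {
            // Put the splits back at the front, preserving order, so they are reassigned first.
            for (int i = lost.size() - 1; i >= 0; i--) {
                unassigned.addFirst(lost.get(i));
            }
        }
    }

    int pendingSplits() {
        return unassigned.size();
    }
}
```

Because `subtaskFailed` is idempotent for unknown subtasks, the caller does not need to know whether a task ever registered before it forwards the failure.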





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





2020-05-16 Thread GitBox


StephanEwen commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r426146031



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
##
@@ -468,6 +468,7 @@ public static void serializeStreamStateHandle(
dos.flush();
}
 
+   @Nullable

Review comment:
   Returning `Optional` here would mean reworking the method. I usually try 
to leave existing methods as they are unless they are in really bad shape.
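The trade-off can be sketched as keeping the `null`-returning method and layering an `Optional` view on top of it, instead of rewriting it. The class `NullableHandleSerde`, its string "handles", and the type tags are hypothetical; they only mimic the shape of a serializer that writes a marker for an absent handle.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Optional;

/** Hypothetical serializer; a string stands in for a real state handle. */
final class NullableHandleSerde {
    // Hypothetical type tags, not Flink's constants.
    static final byte NULL_HANDLE = 0;
    static final byte STRING_HANDLE = 1;

    static void serialize(String handle, DataOutputStream out) throws IOException {
        if (handle == null) {
            out.writeByte(NULL_HANDLE);
        } else {
            out.writeByte(STRING_HANDLE);
            out.writeUTF(handle);
        }
    }

    /**
     * Existing style: returns null if an absent handle was written.
     * Real code would mark this @Nullable; the annotation is omitted here
     * only to keep the sketch free of external dependencies.
     */
    static String deserialize(DataInputStream in) throws IOException {
        byte type = in.readByte();
        if (type == NULL_HANDLE) {
            return null;
        }
        if (type == STRING_HANDLE) {
            return in.readUTF();
        }
        throw new IOException("Unknown handle type: " + type);
    }

    /** Optional-returning variant that wraps, rather than rewrites, the method above. */
    static Optional<String> deserializeOptional(DataInputStream in) throws IOException {
        return Optional.ofNullable(deserialize(in));
    }
}
```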







2020-05-16 Thread GitBox


StephanEwen commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r426145923



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
##
@@ -487,6 +488,15 @@ public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis
}
}
 
+   public static ByteStreamStateHandle deserializeAndCheckByteStreamStateHandle(DataInputStream dis) throws IOException {

Review comment:
   True, will add that.
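A minimal sketch of what a "deserialize and check" helper like the one in this hunk might do: read a handle through the general path, then fail fast with an `IOException` if it is not the expected byte-backed kind. All types here (`SketchHandle`, `SketchByteHandle`, `SketchFileHandle`, `SketchHandleSerde`) and the tag values are hypothetical stand-ins, not Flink's classes; letting `null` pass through is also an assumption, matching a `@Nullable` contract.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical handle hierarchy standing in for real state handles. */
interface SketchHandle {}

final class SketchByteHandle implements SketchHandle {
    final byte[] data;
    SketchByteHandle(byte[] data) { this.data = data; }
}

final class SketchFileHandle implements SketchHandle {
    final String path;
    SketchFileHandle(String path) { this.path = path; }
}

final class SketchHandleSerde {
    // Hypothetical type tags.
    static final byte NULL_HANDLE = 0;
    static final byte BYTE_HANDLE = 1;
    static final byte FILE_HANDLE = 2;

    static void serialize(SketchHandle handle, DataOutputStream out) throws IOException {
        if (handle == null) {
            out.writeByte(NULL_HANDLE);
        } else if (handle instanceof SketchByteHandle) {
            byte[] data = ((SketchByteHandle) handle).data;
            out.writeByte(BYTE_HANDLE);
            out.writeInt(data.length);
            out.write(data);
        } else if (handle instanceof SketchFileHandle) {
            out.writeByte(FILE_HANDLE);
            out.writeUTF(((SketchFileHandle) handle).path);
        } else {
            throw new IOException("Unsupported handle: " + handle.getClass().getName());
        }
    }

    /** General path: returns whatever kind of handle was written (or null). */
    static SketchHandle deserialize(DataInputStream in) throws IOException {
        byte type = in.readByte();
        switch (type) {
            case NULL_HANDLE:
                return null;
            case BYTE_HANDLE: {
                byte[] data = new byte[in.readInt()];
                in.readFully(data);
                return new SketchByteHandle(data);
            }
            case FILE_HANDLE:
                return new SketchFileHandle(in.readUTF());
            default:
                throw new IOException("Unknown handle type: " + type);
        }
    }

    /** Deserializes a handle and fails fast unless it is byte-backed (or null). */
    static SketchByteHandle deserializeAndCheckByteHandle(DataInputStream in) throws IOException {
        SketchHandle handle = deserialize(in);
        if (handle == null || handle instanceof SketchByteHandle) {
            return (SketchByteHandle) handle;
        }
        throw new IOException("Expected a SketchByteHandle but found " + handle.getClass().getName());
    }
}
```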







2020-05-15 Thread GitBox


StephanEwen commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r425829755



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##
@@ -122,7 +135,20 @@ public boolean canRestart() {
 * @return result of a set of tasks to restart to recover from the failure
 */
public static FailureHandlingResult restartable(Set verticesToRestart, long restartDelayMS) {

Review comment:
   Happy to do that; it is indeed simpler. Originally, I wanted to preserve 
the "spirit of the class" with its descriptive factory methods, but I think 
you are right that the simplicity argument is stronger here.
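The thread does not show the exact simplification that was proposed. Assuming it means passing an extra flag to one factory method instead of adding another descriptively named one, a sketch with a hypothetical `RestartDecision` class (a simplified stand-in for `FailureHandlingResult`, with vertex IDs modeled as strings) could look like this:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified stand-in for a failure-handling result. */
final class RestartDecision {
    private final Set<String> verticesToRestart; // empty if not restartable
    private final long restartDelayMs;           // -1 if not restartable
    private final Throwable error;               // null if restartable
    private final boolean globalFailure;

    private RestartDecision(
            Set<String> verticesToRestart, long restartDelayMs, Throwable error, boolean globalFailure) {
        this.verticesToRestart = verticesToRestart;
        this.restartDelayMs = restartDelayMs;
        this.error = error;
        this.globalFailure = globalFailure;
    }

    /**
     * One factory with an explicit flag, instead of separate descriptively named
     * factories (e.g. hypothetical restartableLocal / restartableGlobal).
     */
    static RestartDecision restartable(Set<String> verticesToRestart, long restartDelayMs, boolean globalFailure) {
        if (restartDelayMs < 0) {
            throw new IllegalArgumentException("restartDelayMs must be non-negative");
        }
        return new RestartDecision(
                Collections.unmodifiableSet(new HashSet<>(verticesToRestart)), restartDelayMs, null, globalFailure);
    }

    static RestartDecision unrecoverable(Throwable error, boolean globalFailure) {
        if (error == null) {
            throw new NullPointerException("error");
        }
        return new RestartDecision(Collections.emptySet(), -1, error, globalFailure);
    }

    boolean canRestart() { return error == null; }

    boolean isGlobalFailure() { return globalFailure; }

    Set<String> getVerticesToRestart() { return verticesToRestart; }

    long getRestartDelayMs() { return restartDelayMs; }
}
```

The cost of the flag is a less self-describing call site (`restartable(vertices, delay, true)`); the benefit is one factory per outcome instead of one per outcome and failure kind.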







2020-05-15 Thread GitBox


StephanEwen commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r425829228



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##
@@ -1127,16 +1127,44 @@ public boolean restoreLatestCheckpointedState(
boolean errorIfNoCheckpoint,
boolean allowNonRestoredState) throws Exception {
 
-   return restoreLatestCheckpointedState(new HashSet<>(tasks.values()), errorIfNoCheckpoint, allowNonRestoredState);
+   return restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()), true, errorIfNoCheckpoint, allowNonRestoredState);
}
 
/**
-* Restores the latest checkpointed state.
+* Restores the latest checkpointed state to a set of subtasks. This method represents a "local"
+* or "regional" failover and does restore states to coordinators. Note that a regional failover
+* might still include all tasks.
+*
+* @param tasks Set of job vertices to restore. State for these vertices is
+* restored via {@link Execution#setInitialState(JobManagerTaskRestore)}.
+
+* @return true if state was restored, false otherwise.
+* @throws IllegalStateException If the CheckpointCoordinator is shut down.
+* @throws IllegalStateException If no completed checkpoint is available and
+*   the failIfNoCheckpoint flag has been set.
+* @throws IllegalStateException If the checkpoint contains state that cannot be
+*   mapped to any job vertex in tasks and the
+*   allowNonRestoredState flag has not been set.
+* @throws IllegalStateException If the max parallelism changed for an operator
+*   that restores state from this checkpoint.
+* @throws IllegalStateException If the parallelism changed for an operator
+*   that restores non-partitioned state from this
+*   checkpoint.
+*/
+   public boolean restoreLatestCheckpointedStateToSubtasks(final Set tasks) throws Exception {
+   // when restoring subtasks only we accept potentially unmatched state because what we

Review comment:
   Thanks, will add this to the comment.
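A minimal sketch of the regional-restore rule described in the Javadoc above: when only a subset of vertices is restored, checkpointed state for vertices outside that subset is expected and therefore tolerated, whereas a full restore tolerates it only if `allowNonRestoredState` is set. The map-of-strings checkpoint model and the `RegionalRestoreSketch` class are hypothetical simplifications, not the `CheckpointCoordinator` implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical model: a checkpoint maps job-vertex IDs to opaque state. */
final class RegionalRestoreSketch {

    /** Assigns checkpointed state to the given vertices; rejects leftovers unless allowed. */
    static Map<String, String> restore(
            Map<String, String> checkpoint, Set<String> vertices, boolean allowNonRestoredState) {
        Map<String, String> restored = new HashMap<>();
        for (Map.Entry<String, String> entry : checkpoint.entrySet()) {
            if (vertices.contains(entry.getKey())) {
                restored.put(entry.getKey(), entry.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "Checkpoint state for vertex " + entry.getKey() + " cannot be mapped");
            }
        }
        return restored;
    }

    /** Regional failover: state of vertices outside the region is expected, so it is accepted. */
    static Map<String, String> restoreToSubtasks(Map<String, String> checkpoint, Set<String> region) {
        return restore(checkpoint, region, true);
    }

    /** Full restore: unmatched state is accepted only if the caller allows it. */
    static Map<String, String> restoreAll(
            Map<String, String> checkpoint, Set<String> allVertices, boolean allowNonRestoredState) {
        return restore(checkpoint, allVertices, allowNonRestoredState);
    }
}
```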




