[GitHub] [flink] StephanEwen commented on a change in pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators
StephanEwen commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r426152842

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ##
@@ -186,10 +188,18 @@ private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState,
 private void handleTaskFailure(final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
 setGlobalFailureCause(error);
+ notifyCoordinatorsAboutTaskFailure(executionVertexId, error);

Review comment:
I think we may need to remove that check anyway. Tasks can quite possibly fail immediately after they are deployed, before they have had a chance to register. The `OperatorCoordinator` interface also has no notion of "registering", so it cannot decide which notifications to forward and which to drop. I'd rather err on the side of forwarding too many, because a missed notification easily leads to a stall or data loss (a split is not added back, or a result goes missing).

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
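A minimal sketch of why forwarding every failure is the safer default, assuming a simplified, hypothetical `SimpleCoordinator` interface rather than Flink's actual `OperatorCoordinator` API. A split-assigning coordinator hands a split to a subtask that has not registered yet; the subtask fails, and the split returns to the pending pool only because the failure notification is forwarded unconditionally. The names `SplitAssigningCoordinator`, `assignNext`, and `forwardFailure` are illustrative only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified coordinator callback (not Flink's OperatorCoordinator). */
interface SimpleCoordinator {
    void subtaskFailed(int subtask, Throwable reason);
}

/** Hypothetical coordinator that assigns splits and takes them back when a subtask fails. */
final class SplitAssigningCoordinator implements SimpleCoordinator {
    final Deque<String> pendingSplits = new ArrayDeque<>();
    final Map<Integer, List<String>> assigned = new HashMap<>();

    SplitAssigningCoordinator(List<String> splits) {
        pendingSplits.addAll(splits);
    }

    /** Assigns the next pending split; the subtask need not have "registered" first. */
    void assignNext(int subtask) {
        String split = pendingSplits.poll();
        if (split != null) {
            assigned.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
        }
    }

    @Override
    public void subtaskFailed(int subtask, Throwable reason) {
        // Put the failed subtask's splits back so they can be reassigned.
        // For a subtask that holds no splits this is a harmless no-op.
        List<String> lost = assigned.remove(subtask);
        if (lost != null) {
            pendingSplits.addAll(lost);
        }
    }
}

/** Hypothetical scheduler-side helper: forwards every task failure, with no registration check. */
final class FailureForwarding {
    static void forwardFailure(List<? extends SimpleCoordinator> coordinators, int subtask, Throwable error) {
        for (SimpleCoordinator coordinator : coordinators) {
            coordinator.subtaskFailed(subtask, error);
        }
    }
}
```

Notifications for subtasks that hold nothing cost almost nothing, whereas a dropped notification leaves the split stranded; that asymmetry is the reason to prefer over-forwarding.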
StephanEwen commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r426146031

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java ##
@@ -468,6 +468,7 @@ public static void serializeStreamStateHandle(
 dos.flush();
 }

+ @Nullable

Review comment:
Returning `Optional` here would mean reworking the method. I usually try to keep existing methods as they are unless they are in really bad shape.
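As a sketch of the trade-off, assuming a hypothetical handle encoding (a type byte, then a length-prefixed payload, with a `byte[]` standing in for a real state handle): the existing method keeps returning `null` for the "no handle" case, which the real code documents with `@Nullable` (omitted here to keep the example dependency-free), and callers that prefer an `Optional` can wrap the result with `Optional.ofNullable` without the method itself being reworked.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Optional;

final class NullableVsOptional {
    // Hypothetical type markers used only by this sketch.
    static final byte NULL_HANDLE = 0;
    static final byte BYTES_HANDLE = 1;

    /** Existing-style method: returns null when the stream encodes "no handle". */
    static byte[] deserializeHandle(DataInputStream dis) throws IOException {
        byte type = dis.readByte();
        if (type == NULL_HANDLE) {
            return null;
        }
        if (type != BYTES_HANDLE) {
            throw new IOException("Unknown handle type: " + type);
        }
        byte[] data = new byte[dis.readInt()];
        dis.readFully(data);
        return data;
    }

    /** Optional-returning wrapper for callers; the original method stays untouched. */
    static Optional<byte[]> deserializeHandleOptional(DataInputStream dis) throws IOException {
        return Optional.ofNullable(deserializeHandle(dis));
    }

    /** Helper that encodes a handle in the sketch's format (payload may be null). */
    static DataInputStream streamOf(byte type, byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        dos.writeByte(type);
        if (payload != null) {
            dos.writeInt(payload.length);
            dos.write(payload);
        }
        dos.flush();
        return new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
    }
}
```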
StephanEwen commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r426145923

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java ##
@@ -487,6 +488,15 @@ public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis
 }
 }

+ public static ByteStreamStateHandle deserializeAndCheckByteStreamStateHandle(DataInputStream dis) throws IOException {

Review comment:
True, will add that.
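A minimal sketch of the deserialize-and-check pattern that the method name suggests, using hypothetical stand-in types (`HandleSketch`, `BytesHandle`, `FileHandle`) and a hypothetical tag encoding instead of Flink's `StreamStateHandle` hierarchy. A generic deserializer may return any handle type; the checked variant rejects anything that is not a bytes handle with an `IOException`. Letting a `null` handle pass through unchanged is an assumption of this sketch, not something the thread states.

```java
import java.io.DataInputStream;
import java.io.IOException;

/** Hypothetical stand-in for a state handle type hierarchy. */
interface HandleSketch {}

final class BytesHandle implements HandleSketch {
    final byte[] data;
    BytesHandle(byte[] data) { this.data = data; }
}

final class FileHandle implements HandleSketch {
    final String path;
    FileHandle(String path) { this.path = path; }
}

final class HandleSerde {
    // Hypothetical tags used only by this sketch.
    static final byte NULL_TAG = 0;
    static final byte BYTES_TAG = 1;
    static final byte FILE_TAG = 2;

    /** Generic deserializer: may return any handle type, or null. */
    static HandleSketch deserializeHandle(DataInputStream dis) throws IOException {
        byte tag = dis.readByte();
        switch (tag) {
            case NULL_TAG:
                return null;
            case BYTES_TAG: {
                byte[] data = new byte[dis.readInt()];
                dis.readFully(data);
                return new BytesHandle(data);
            }
            case FILE_TAG:
                return new FileHandle(dis.readUTF());
            default:
                throw new IOException("Unknown handle tag: " + tag);
        }
    }

    /** Deserializes, then checks that the result is a bytes handle (null passes through). */
    static BytesHandle deserializeAndCheckBytesHandle(DataInputStream dis) throws IOException {
        HandleSketch handle = deserializeHandle(dis);
        if (handle == null || handle instanceof BytesHandle) {
            return (BytesHandle) handle;
        }
        throw new IOException("Expected a BytesHandle but found " + handle.getClass().getSimpleName());
    }
}
```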
StephanEwen commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r425829755

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java ##
@@ -122,7 +135,20 @@ public boolean canRestart() {
 * @return result of a set of tasks to restart to recover from the failure
 */
 public static FailureHandlingResult restartable(Set verticesToRestart, long restartDelayMS) {

Review comment:
Happy to do that; it is indeed simpler. Originally, I wanted to preserve the "spirit of the class", with its descriptive factory methods. But I think you are right: here the simplicity argument is stronger.
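The thread does not show the exact simplification the reviewer proposed. Purely for illustration, assuming the choice was between adding another named factory method and adding a parameter to an existing one, the sketch below contrasts the two styles on a hypothetical, simplified `RestartDecision` class; the `globalFailure` flag is likewise hypothetical.

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Set;

/** Hypothetical, simplified stand-in for a failure-handling result (not Flink's class). */
final class RestartDecision {
    private final Set<String> verticesToRestart;
    private final long restartDelayMs;
    private final boolean globalFailure; // hypothetical flag used only in this sketch
    private final Throwable error;       // non-null only if the failure is unrecoverable

    private RestartDecision(Set<String> verticesToRestart, long restartDelayMs, boolean globalFailure, Throwable error) {
        this.verticesToRestart = Collections.unmodifiableSet(verticesToRestart);
        this.restartDelayMs = restartDelayMs;
        this.globalFailure = globalFailure;
        this.error = error;
    }

    // Style 1: one descriptive factory method per case.
    static RestartDecision restartableRegional(Set<String> vertices, long delayMs) {
        return restartable(vertices, delayMs, false);
    }

    static RestartDecision restartableGlobal(Set<String> vertices, long delayMs) {
        return restartable(vertices, delayMs, true);
    }

    // Style 2: a single factory method that takes a flag.
    static RestartDecision restartable(Set<String> vertices, long delayMs, boolean globalFailure) {
        return new RestartDecision(vertices, delayMs, globalFailure, null);
    }

    static RestartDecision unrecoverable(Throwable error) {
        return new RestartDecision(Collections.emptySet(), -1L, false, Objects.requireNonNull(error));
    }

    boolean canRestart() { return error == null; }
    boolean isGlobalFailure() { return globalFailure; }
    Set<String> getVerticesToRestart() { return verticesToRestart; }
    long getRestartDelayMs() { return restartDelayMs; }
}
```

The named factories read well at call sites, but each new case adds another method that only forwards to the constructor; the flag form keeps a single entry point, which is one way to read the simplicity argument.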
StephanEwen commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r425829228

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -1127,16 +1127,44 @@ public boolean restoreLatestCheckpointedState(
 boolean errorIfNoCheckpoint,
 boolean allowNonRestoredState) throws Exception {
- return restoreLatestCheckpointedState(new HashSet<>(tasks.values()), errorIfNoCheckpoint, allowNonRestoredState);
+ return restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()), true, errorIfNoCheckpoint, allowNonRestoredState);
 }

 /**
-* Restores the latest checkpointed state.
+* Restores the latest checkpointed state to a set of subtasks. This method represents a "local"
+* or "regional" failover and does restore states to coordinators. Note that a regional failover
+* might still include all tasks.
+*
+* @param tasks Set of job vertices to restore. State for these vertices is
+* restored via {@link Execution#setInitialState(JobManagerTaskRestore)}.
+
+* @return true if state was restored, false otherwise.
+* @throws IllegalStateException If the CheckpointCoordinator is shut down.
+* @throws IllegalStateException If no completed checkpoint is available and
+* the failIfNoCheckpoint flag has been set.
+* @throws IllegalStateException If the checkpoint contains state that cannot be
+* mapped to any job vertex in tasks and the
+* allowNonRestoredState flag has not been set.
+* @throws IllegalStateException If the max parallelism changed for an operator
+* that restores state from this checkpoint.
+* @throws IllegalStateException If the parallelism changed for an operator
+* that restores non-partitioned state from this
+* checkpoint.
+*/
+ public boolean restoreLatestCheckpointedStateToSubtasks(final Set tasks) throws Exception {
+ // when restoring subtasks only we accept potentially unmatched state because what we

Review comment:
Thanks, will add this to the comment.
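The difference between a global restore and a restore to subtasks can be sketched under a hypothetical, heavily simplified model in which a checkpoint is a map from job vertex id to state; `RestoreSketch`, `restoreGlobal`, and `restoreToSubtasks` are illustrative names, not Flink API. One plausible reason for accepting unmatched state in the regional case, assumed here because the code comment is cut off in the diff, is that a region usually covers only some vertices, so checkpointed state for the remaining vertices is unmatched by design.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical, heavily simplified checkpoint restore (not Flink's CheckpointCoordinator). */
final class RestoreSketch {
    /** State of the latest completed checkpoint, keyed by job vertex id; null if none exists. */
    private final Map<String, String> latestCheckpoint;
    /** What each vertex was restored with, kept for inspection. */
    final Map<String, String> restored = new HashMap<>();

    RestoreSketch(Map<String, String> latestCheckpoint) {
        this.latestCheckpoint = latestCheckpoint;
    }

    /** Global failover: all vertices; unmatched state is accepted only if the user allows it. */
    boolean restoreGlobal(Set<String> allVertices, boolean allowNonRestoredState) {
        return restoreInternal(allVertices, allowNonRestoredState);
    }

    /** Regional failover: a subset of vertices, so unmatched state is always accepted. */
    boolean restoreToSubtasks(Set<String> regionVertices) {
        return restoreInternal(regionVertices, true);
    }

    private boolean restoreInternal(Set<String> vertices, boolean allowNonRestoredState) {
        if (latestCheckpoint == null) {
            return false; // no checkpoint: nothing was restored
        }
        // Reject state that maps to no target vertex, unless that is explicitly allowed.
        for (String vertexWithState : latestCheckpoint.keySet()) {
            if (!vertices.contains(vertexWithState) && !allowNonRestoredState) {
                throw new IllegalStateException("Checkpoint has state for unmatched vertex " + vertexWithState);
            }
        }
        for (String vertex : vertices) {
            String state = latestCheckpoint.get(vertex);
            if (state != null) {
                restored.put(vertex, state);
            }
        }
        return true;
    }
}
```

Routing both entry points through one internal method with explicit flags mirrors the shape of the diff above, where the public methods differ only in the arguments they pass to `restoreLatestCheckpointedStateInternal`.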