[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state
[ https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714574#comment-16714574 ]

ASF GitHub Bot commented on FLINK-10712:

StefanRRichter commented on issue #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#issuecomment-445776062

Ok, sounds good, looking forward to the new changes!

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> RestartPipelinedRegionStrategy does not restore state
> -----------------------------------------------------
>
>                 Key: FLINK-10712
>                 URL: https://issues.apache.org/jira/browse/FLINK-10712
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0
>            Reporter: Stefan Richter
>            Assignee: Yun Tang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>
> RestartPipelinedRegionStrategy does not perform any state restore. This is
> big problem because all restored regions will be restarted with empty state.
> We need to take checkpoints into account when restoring.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state
[ https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704290#comment-16704290 ]

ASF GitHub Bot commented on FLINK-10712:

Myasuka commented on issue #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#issuecomment-443096765

@StefanRRichter Thanks for your comments, I will refactor this PR.
BTW, I found that region failover does not guarantee `EXACTLY_ONCE` semantics unless the checkpoint coordinator restarts its `checkpointScheduler`. I'll include this change in the next commits.
[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state
[ https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699078#comment-16699078 ]

ASF GitHub Bot commented on FLINK-10712:

StefanRRichter commented on a change in pull request #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#discussion_r236284582

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##
@@ -1073,6 +1073,108 @@ public boolean restoreLatestCheckpointedState(
         }
     }

+    /**
+     * Restores the latest checkpointed state at the granularity of execution vertex.
+     *
+     * @param executionVertices Set of execution vertices to restore. State for these vertices is
+     * restored via {@link Execution#setInitialState(JobManagerTaskRestore)}.
+     * @param errorIfNoCheckpoint Fail if no completed checkpoint is available to
+     * restore from.
+     * @param allowNonRestoredState Allow checkpoint state that cannot be mapped
+     * to any jobID vertex in tasks.
+     * @return true if state was restored, false otherwise.
+     * @throws IllegalStateException If the CheckpointCoordinator is shut down.
+     * @throws IllegalStateException If no completed checkpoint is available and
+     * the failIfNoCheckpoint flag has been set.
+     * @throws IllegalStateException If the checkpoint contains state that cannot be
+     * mapped to any jobID vertex in tasks and the
+     * allowNonRestoredState flag has not been set.
+     * @throws IllegalStateException If the max parallelism changed for an operator
+     * that restores state from this checkpoint.
+     * @throws IllegalStateException If the parallelism changed for an operator
+     * that restores non-partitioned state from this
+     * checkpoint.
+     */
+    public boolean restoreLatestCheckpointedState(

Review comment:
Again, this is almost a complete duplication of the original method. We should unify both methods to keep this maintainable.
[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state
[ https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699077#comment-16699077 ]

ASF GitHub Bot commented on FLINK-10712:

StefanRRichter commented on a change in pull request #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#discussion_r236283201

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ##
@@ -201,31 +261,33 @@ private void assignTaskStateToExecutionJobVertices(
         for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {

-            Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[subTaskIndex]
-                .getCurrentExecutionAttempt();
+            if (subTaskIndices.contains(subTaskIndex)) {

Review comment:
Instead of `for i in (0 .. newParallelism)` -> `contains(i)`, why not supply an `Iterable<Integer> subtaskIDs` and then `for (int subtask : subtaskIDs)`? The old code path would just pass in an iterable from 0 to the new parallelism.
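The loop shape suggested in this review comment can be sketched as follows. This is only an illustration under stated assumptions, not code from the PR: the class `SubtaskIterationSketch`, the methods `assign` and `allSubtasks`, and the use of plain strings in place of Flink's per-subtask state are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical stand-in for the assignment loop; strings replace real subtask state.
public class SubtaskIterationSketch {

    /** Visits exactly the supplied subtask indices, so no contains() filter is needed. */
    public static List<String> assign(String[] subtaskStates, Iterable<Integer> subtaskIds) {
        List<String> assigned = new ArrayList<>();
        for (int subtask : subtaskIds) {
            assigned.add(subtask + ":" + subtaskStates[subtask]);
        }
        return assigned;
    }

    /** The "restore everything" path: an iterable over 0 (inclusive) to parallelism (exclusive). */
    public static Iterable<Integer> allSubtasks(int parallelism) {
        return () -> IntStream.range(0, parallelism).iterator();
    }

    public static void main(String[] args) {
        String[] states = {"s0", "s1", "s2", "s3"};
        // Full restore and a partial (region-style) restore share one loop.
        System.out.println(assign(states, allSubtasks(states.length)));
        System.out.println(assign(states, Arrays.asList(1, 3)));
    }
}
```

With this shape, the cost of a partial restore is proportional to the number of subtasks being restored rather than to the operator's full parallelism.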
[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state
[ https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699080#comment-16699080 ]

ASF GitHub Bot commented on FLINK-10712:

StefanRRichter commented on a change in pull request #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#discussion_r236285508

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ##
@@ -105,13 +106,70 @@ public boolean assignStates() throws Exception {
                 continue;
             }

-            assignAttemptState(task.getValue(), operatorStates);
+            Set<Integer> executionVertexIndices = new HashSet<>();
+            for (ExecutionVertex executionVertex : task.getValue().getTaskVertices()) {
+                executionVertexIndices.add(executionVertex.getParallelSubtaskIndex());
+            }
+            assignAttemptState(task.getValue(), operatorStates, executionVertexIndices);
+        }
+
+        return true;
+    }
+
+    /**
+     * Assign states to given execution vertices.
+     */
+    public boolean assignStates(List<ExecutionVertex> executionVertices) throws Exception {
+        Map<OperatorID, OperatorState> localOperators = new HashMap<>(operatorStates);
+        Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
+
+        checkStateMappingCompleteness(allowNonRestoredState, operatorStates, tasks);
+
+        // get job vertex and its subTaskIndex from given executionVertices.
+        Map<JobVertexID, Set<Integer>> jobVertexIDSetMap = new HashMap<>();
+        for (ExecutionVertex executionVertex : executionVertices) {
+            JobVertexID jobvertexId = executionVertex.getJobvertexId();
+            jobVertexIDSetMap.putIfAbsent(jobvertexId, new HashSet<>());
+            jobVertexIDSetMap.get(jobvertexId).add(executionVertex.getParallelSubtaskIndex());
+        }
+
+        for (Map.Entry<JobVertexID, ExecutionJobVertex> task : localTasks.entrySet()) {
+            final ExecutionJobVertex executionJobVertex = task.getValue();
+
+            // find the states of all operators belonging to this task
+            List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
+            List<OperatorID> altOperatorIDs = executionJobVertex.getUserDefinedOperatorIDs();
+            List<OperatorState> operatorStates = new ArrayList<>();
+            boolean statelessTask = true;
+            for (int x = 0; x < operatorIDs.size(); x++) {
+                OperatorID operatorID = altOperatorIDs.get(x) == null
+                    ? operatorIDs.get(x)
+                    : altOperatorIDs.get(x);
+
+                OperatorState operatorState = localOperators.remove(operatorID);
+                if (operatorState == null) {
+                    operatorState = new OperatorState(
+                        operatorID,
+                        executionJobVertex.getParallelism(),
+                        executionJobVertex.getMaxParallelism());
+                } else {
+                    statelessTask = false;
+                }
+                operatorStates.add(operatorState);
+            }
+            if (statelessTask) { // skip tasks where no operator has any state
+                continue;
+            }
+
+            if (jobVertexIDSetMap.containsKey(executionJobVertex.getJobVertexId())) {
+                assignAttemptState(executionJobVertex, operatorStates, jobVertexIDSetMap.get(executionJobVertex.getJobVertexId()));
+            }
         }

         return true;
     }

-    private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> operatorStates) {
+    private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> operatorStates, Set<Integer> subTaskIndices) {

Review comment:
I doubt that `Set<Integer>` is the best representation of subtask indexes. At least at the interface level, an `Iterable<Integer>` could do the job if we rewrite the loop as I suggested. Furthermore, we can have a more memory-friendly implementation to back this, for example `boolean[]` or `BitSet`.
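One way to read the `BitSet` suggestion is an `Iterable<Integer>` backed by a `java.util.BitSet`, which stores one bit per possible subtask index instead of one boxed `Integer` per entry. The sketch below is an assumption about what such a helper could look like; the class name `BitSetSubtaskIndices` and its `add` method are hypothetical and do not appear in the PR.

```java
import java.util.BitSet;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical helper: subtask indices held in a BitSet, exposed as Iterable<Integer>.
public class BitSetSubtaskIndices implements Iterable<Integer> {

    private final BitSet bits = new BitSet();

    /** Marks a subtask index as selected; adding the same index twice has no extra effect. */
    public void add(int subtaskIndex) {
        bits.set(subtaskIndex);
    }

    /** Iterates the selected indices in ascending order. */
    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            // Index of the next set bit, or -1 when none is left.
            private int next = bits.nextSetBit(0);

            @Override
            public boolean hasNext() {
                return next >= 0;
            }

            @Override
            public Integer next() {
                if (next < 0) {
                    throw new NoSuchElementException();
                }
                int current = next;
                next = bits.nextSetBit(current + 1);
                return current;
            }
        };
    }

    public static void main(String[] args) {
        BitSetSubtaskIndices indices = new BitSetSubtaskIndices();
        indices.add(4);
        indices.add(1);
        for (int subtask : indices) {
            System.out.println("restore subtask " + subtask);
        }
    }
}
```

Because callers only see `Iterable<Integer>`, the backing structure could later be swapped (for a `boolean[]`, say) without touching the assignment loop.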
[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state
[ https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699079#comment-16699079 ]

ASF GitHub Bot commented on FLINK-10712:

StefanRRichter commented on a change in pull request #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#discussion_r236283793

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ##
@@ -105,13 +106,70 @@ public boolean assignStates() throws Exception {
                 continue;
             }

-            assignAttemptState(task.getValue(), operatorStates);
+            Set<Integer> executionVertexIndices = new HashSet<>();
+            for (ExecutionVertex executionVertex : task.getValue().getTaskVertices()) {
+                executionVertexIndices.add(executionVertex.getParallelSubtaskIndex());
+            }
+            assignAttemptState(task.getValue(), operatorStates, executionVertexIndices);
+        }
+
+        return true;
+    }
+
+    /**
+     * Assign states to given execution vertices.
+     */
+    public boolean assignStates(List<ExecutionVertex> executionVertices) throws Exception {

Review comment:
There is a lot of duplicated code between this method and the original `assignStates()`. I am sure that this is not required, and if we rethink this a bit, the old method should just be able to call the new one. Duplicating most of the code is not very maintainable. I suggest rethinking what we give to the state assignment operation in the constructor or as a parameter to this method, and unifying it. From a quick look, I am very sure this is easily possible.
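The unification proposed in this review comment (the old method simply calling the new one) could take roughly the following shape. This is a simplified sketch under loud assumptions: `UnifiedAssignmentSketch`, its fields, and the use of task names and `"task#subtask"` strings are hypothetical stand-ins for `StateAssignmentOperation`, `ExecutionJobVertex` and real state handles, and the merged Flink code may look different.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model: one code path serves both full and partial restore.
public class UnifiedAssignmentSketch {

    // Hypothetical: task name -> parallelism, standing in for the job's tasks.
    private final Map<String, Integer> taskParallelism;
    // Records "task#subtask" for every subtask that received state.
    private final List<String> assigned = new ArrayList<>();

    public UnifiedAssignmentSketch(Map<String, Integer> taskParallelism) {
        this.taskParallelism = new LinkedHashMap<>(taskParallelism);
    }

    /** Old entry point: selects every subtask of every task, then delegates. */
    public boolean assignStates() {
        Map<String, List<Integer>> allSubtasks = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> task : taskParallelism.entrySet()) {
            List<Integer> indices = new ArrayList<>();
            for (int i = 0; i < task.getValue(); i++) {
                indices.add(i);
            }
            allSubtasks.put(task.getKey(), indices);
        }
        return assignStates(allSubtasks);
    }

    /** New entry point: the only place that holds the assignment logic. */
    public boolean assignStates(Map<String, List<Integer>> subtasksPerTask) {
        for (Map.Entry<String, List<Integer>> entry : subtasksPerTask.entrySet()) {
            Integer parallelism = taskParallelism.get(entry.getKey());
            if (parallelism == null) {
                continue; // task is not part of this job
            }
            for (int subtask : entry.getValue()) {
                if (subtask < 0 || subtask >= parallelism) {
                    throw new IllegalArgumentException("Invalid subtask index " + subtask);
                }
                assigned.add(entry.getKey() + "#" + subtask);
            }
        }
        return true;
    }

    public List<String> assigned() {
        return Collections.unmodifiableList(assigned);
    }

    public static void main(String[] args) {
        Map<String, Integer> tasks = new LinkedHashMap<>();
        tasks.put("source", 2);
        tasks.put("sink", 1);
        UnifiedAssignmentSketch region = new UnifiedAssignmentSketch(tasks);
        region.assignStates(Collections.singletonMap("source", Arrays.asList(1)));
        System.out.println(region.assigned());
    }
}
```

The same delegation pattern would presumably apply to the two `restoreLatestCheckpointedState` overloads in `CheckpointCoordinator` that the earlier review comment flags as duplicated.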
[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state
[ https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673402#comment-16673402 ]

ASF GitHub Bot commented on FLINK-10712:

Myasuka opened a new pull request #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009

## What is the purpose of the change

Currently, RestartPipelinedRegionStrategy does not perform any state restore. This is a big problem because all restored regions will be restarted with empty state. This PR adds support for restoring state when using RestartPipelinedRegionStrategy.

## Brief change log

  - Implement a new `restoreLatestCheckpointedState` API for region-based failover in `CheckpointCoordinator`.
  - Reload checkpointed state when `FailoverRegion` calls its `restart` method.
  - `StateAssignmentOperation` can assign state to a given set of execution vertices.

## Verifying this change

This change added tests and can be verified as follows:

  - Added unit tests for `FailoverRegion` to ensure the failover region calls the new `restoreLatestCheckpointedState` API in `CheckpointCoordinator`.
  - Added unit tests in `CheckpointCoordinatorTest` to ensure `CheckpointCoordinator` can restore state with `RestartPipelinedRegionStrategy`.
  - Added unit tests in `CheckpointStateRestoreTest` to ensure `RestartPipelinedRegionStrategy` correctly restores state from a checkpoint to the task executions.
  - Added a new integration test, `RegionFailoverITCase`, to verify that state is restored properly when the job consists of multiple regions.
  - Refactored `StreamFaultToleranceTestBase` so that all ITs in its subclasses can fail over with state using RestartPipelinedRegionStrategy.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): don't know
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs

> RestartPipelinedRegionStrategy does not restore state
> -----------------------------------------------------
>
>                 Key: FLINK-10712
>                 URL: https://issues.apache.org/jira/browse/FLINK-10712
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0
>            Reporter: Stefan Richter
>            Assignee: Yun Tang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> RestartPipelinedRegionStrategy does not perform any state restore. This is
> big problem because all restored regions will be restarted with empty state.
> We need to take checkpoints into account when restoring.
[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state
[ https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671867#comment-16671867 ]

Till Rohrmann commented on FLINK-10712:
---

Thanks a lot for contributing [~yunta].

[~aljoscha] I don't think that this is a release blocker since it has been broken for quite some time. However, we should fix it soon.

> RestartPipelinedRegionStrategy does not restore state
> -----------------------------------------------------
>
>                 Key: FLINK-10712
>                 URL: https://issues.apache.org/jira/browse/FLINK-10712
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0
>            Reporter: Stefan Richter
>            Assignee: Yun Tang
>            Priority: Critical
>             Fix For: 1.7.0
>
>
> RestartPipelinedRegionStrategy does not perform any state restore. This is
> big problem because all restored regions will be restarted with empty state.
> We need to take checkpoints into account when restoring.
[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state
[ https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668964#comment-16668964 ]

Yun Tang commented on FLINK-10712:
--

We have refactored _FailoverRegion.java_ to support failover with state when using the region-failover strategy. I'll organize the related code and create a new PR in the next few days.

> RestartPipelinedRegionStrategy does not restore state
> -----------------------------------------------------
>
>                 Key: FLINK-10712
>                 URL: https://issues.apache.org/jira/browse/FLINK-10712
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0
>            Reporter: Stefan Richter
>            Priority: Critical
>             Fix For: 1.7.0
>
>
> RestartPipelinedRegionStrategy does not perform any state restore. This is
> big problem because all restored regions will be restarted with empty state.
> We need to take checkpoints into account when restoring.
[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state
[ https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668962#comment-16668962 ]

Aljoscha Krettek commented on FLINK-10712:
--

Isn't this a blocker?