[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714574#comment-16714574
 ] 

ASF GitHub Bot commented on FLINK-10712:


StefanRRichter commented on issue #7009: [FLINK-10712] Support to restore state 
when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#issuecomment-445776062
 
 
   Ok, sounds good, looking forward to the new changes!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RestartPipelinedRegionStrategy does not restore state
> -
>
> Key: FLINK-10712
> URL: https://issues.apache.org/jira/browse/FLINK-10712
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0
>Reporter: Stefan Richter
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> RestartPipelinedRegionStrategy does not perform any state restore. This is 
> a big problem because all restored regions will be restarted with empty state. 
> We need to take checkpoints into account when restoring.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704290#comment-16704290
 ] 

ASF GitHub Bot commented on FLINK-10712:


Myasuka commented on issue #7009: [FLINK-10712] Support to restore state when 
using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#issuecomment-443096765
 
 
   @StefanRRichter Thanks for your comments, I will refactor this PR.
   BTW, I found that region failover does not guarantee `EXACTLY_ONCE` 
semantics unless the checkpoint coordinator restarts its `checkpointScheduler`. 
I'll include this change in the next commits.




[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699078#comment-16699078
 ] 

ASF GitHub Bot commented on FLINK-10712:


StefanRRichter commented on a change in pull request #7009: [FLINK-10712] 
Support to restore state when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#discussion_r236284582
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -1073,6 +1073,108 @@ public boolean restoreLatestCheckpointedState(
         }
     }
 
+    /**
+     * Restores the latest checkpointed state at the granularity of execution vertex.
+     *
+     * @param executionVertices Set of execution vertices to restore. State for these vertices is
+     * restored via {@link Execution#setInitialState(JobManagerTaskRestore)}.
+     * @param errorIfNoCheckpoint Fail if no completed checkpoint is available to
+     * restore from.
+     * @param allowNonRestoredState Allow checkpoint state that cannot be mapped
+     * to any jobID vertex in tasks.
+     * @return true if state was restored, false otherwise.
+     * @throws IllegalStateException If the CheckpointCoordinator is shut down.
+     * @throws IllegalStateException If no completed checkpoint is available and
+     *                               the failIfNoCheckpoint flag has been set.
+     * @throws IllegalStateException If the checkpoint contains state that cannot be
+     *                               mapped to any jobID vertex in tasks and the
+     *                               allowNonRestoredState flag has not been set.
+     * @throws IllegalStateException If the max parallelism changed for an operator
+     *                               that restores state from this checkpoint.
+     * @throws IllegalStateException If the parallelism changed for an operator
+     *                               that restores non-partitioned state from this
+     *                               checkpoint.
+     */
+    public boolean restoreLatestCheckpointedState(
 
 Review comment:
   Again, this is almost a complete duplication of the original method. We 
should unify both methods to keep this maintainable.




[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699077#comment-16699077
 ] 

ASF GitHub Bot commented on FLINK-10712:


StefanRRichter commented on a change in pull request #7009: [FLINK-10712] 
Support to restore state when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#discussion_r236283201
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ##
 @@ -201,31 +261,33 @@ private void assignTaskStateToExecutionJobVertices(
 
         for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
 
-            Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[subTaskIndex]
-                .getCurrentExecutionAttempt();
+            if (subTaskIndices.contains(subTaskIndex)) {
 
 Review comment:
   Instead of `for i in (0 .. newParallelism)` -> `contains(i)`, why not supply 
an `Iterable<Integer> subtaskIDs` and then use `for (int subtask : 
subtaskIDs)`? The old code path would just pass in an iterable from 0 to the 
new parallelism.
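
   A minimal sketch of that loop shape, for illustration only. The class 
`SubtaskIteration` and the helpers `assignTo` and `allSubtasks` are 
hypothetical names and are not part of the Flink code base; the list returned 
by `assignTo` merely stands in for the per-subtask assignment work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

public class SubtaskIteration {

    // Hypothetical stand-in for the per-subtask work: records which
    // subtask indexes were visited, in iteration order.
    public static List<Integer> assignTo(Iterable<Integer> subtaskIds) {
        List<Integer> visited = new ArrayList<>();
        for (int subtask : subtaskIds) {
            visited.add(subtask);
        }
        return visited;
    }

    // Old code path: all subtasks from 0 (inclusive) to newParallelism (exclusive).
    public static Iterable<Integer> allSubtasks(int newParallelism) {
        return () -> IntStream.range(0, newParallelism).iterator();
    }

    public static void main(String[] args) {
        // Full restore visits every subtask.
        System.out.println(assignTo(allSubtasks(4)));
        // Region restore visits only the subtasks that were passed in.
        System.out.println(assignTo(Arrays.asList(1, 3)));
    }
}
```

   With this shape the loop body never needs a `contains(i)` check, because the 
caller decides up front which indexes are iterated.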




[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699080#comment-16699080
 ] 

ASF GitHub Bot commented on FLINK-10712:


StefanRRichter commented on a change in pull request #7009: [FLINK-10712] 
Support to restore state when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#discussion_r236285508
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ##
 @@ -105,13 +106,70 @@ public boolean assignStates() throws Exception {
                 continue;
             }
 
-            assignAttemptState(task.getValue(), operatorStates);
+            Set<Integer> executionVertexIndices = new HashSet<>();
+            for (ExecutionVertex executionVertex : task.getValue().getTaskVertices()) {
+                executionVertexIndices.add(executionVertex.getParallelSubtaskIndex());
+            }
+            assignAttemptState(task.getValue(), operatorStates, executionVertexIndices);
+        }
+
+        return true;
+    }
+
+    /**
+     * Assign states to given execution vertices.
+     */
+    public boolean assignStates(List<ExecutionVertex> executionVertices) throws Exception {
+        Map<OperatorID, OperatorState> localOperators = new HashMap<>(operatorStates);
+        Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
+
+        checkStateMappingCompleteness(allowNonRestoredState, operatorStates, tasks);
+
+        // get job vertex and its subTaskIndex from given executionVertices.
+        Map<JobVertexID, Set<Integer>> jobVertexIDSetMap = new HashMap<>();
+        for (ExecutionVertex executionVertex : executionVertices) {
+            JobVertexID jobvertexId = executionVertex.getJobvertexId();
+            jobVertexIDSetMap.putIfAbsent(jobvertexId, new HashSet<>());
+            jobVertexIDSetMap.get(jobvertexId).add(executionVertex.getParallelSubtaskIndex());
+        }
+
+        for (Map.Entry<JobVertexID, ExecutionJobVertex> task : localTasks.entrySet()) {
+            final ExecutionJobVertex executionJobVertex = task.getValue();
+
+            // find the states of all operators belonging to this task
+            List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
+            List<OperatorID> altOperatorIDs = executionJobVertex.getUserDefinedOperatorIDs();
+            List<OperatorState> operatorStates = new ArrayList<>();
+            boolean statelessTask = true;
+            for (int x = 0; x < operatorIDs.size(); x++) {
+                OperatorID operatorID = altOperatorIDs.get(x) == null
+                    ? operatorIDs.get(x)
+                    : altOperatorIDs.get(x);
+
+                OperatorState operatorState = localOperators.remove(operatorID);
+                if (operatorState == null) {
+                    operatorState = new OperatorState(
+                        operatorID,
+                        executionJobVertex.getParallelism(),
+                        executionJobVertex.getMaxParallelism());
+                } else {
+                    statelessTask = false;
+                }
+                operatorStates.add(operatorState);
+            }
+            if (statelessTask) { // skip tasks where no operator has any state
+                continue;
+            }
+
+            if (jobVertexIDSetMap.containsKey(executionJobVertex.getJobVertexId())) {
+                assignAttemptState(executionJobVertex, operatorStates, jobVertexIDSetMap.get(executionJobVertex.getJobVertexId()));
+            }
         }
 
         return true;
     }
 
-    private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> operatorStates) {
+    private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> operatorStates, Set<Integer> subTaskIndices) {
 
 Review comment:
   I doubt that `Set<Integer>` is the best representation of subtask indexes. 
At least at the interface level, an `Iterable<Integer>` could do the job if 
we rewrite the loop as I suggested. Furthermore, we can have a more 
memory-friendly implementation to back this, for example `boolean[]` or `BitSet`. 
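
   One way this could look, as a sketch under stated assumptions: the class 
`SubtaskIndexSet` below is a hypothetical name and does not exist in Flink. It 
exposes subtask indexes as an `Iterable<Integer>` while storing them in a 
`java.util.BitSet`, so each index costs one bit instead of a boxed `Integer` 
in a `HashSet`.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of Flink: a BitSet-backed set of subtask indexes.
public class SubtaskIndexSet implements Iterable<Integer> {

    private final BitSet bits;

    public SubtaskIndexSet(int parallelism) {
        // Pre-size for the known parallelism; the BitSet still grows on demand.
        this.bits = new BitSet(parallelism);
    }

    public void add(int subtaskIndex) {
        bits.set(subtaskIndex);
    }

    public boolean contains(int subtaskIndex) {
        return bits.get(subtaskIndex);
    }

    @Override
    public Iterator<Integer> iterator() {
        // BitSet.stream() yields the set bit indexes in ascending order.
        return bits.stream().iterator();
    }

    // Convenience for callers that want a snapshot of the indexes.
    public List<Integer> toList() {
        List<Integer> result = new ArrayList<>();
        for (int index : this) {
            result.add(index);
        }
        return result;
    }

    public static void main(String[] args) {
        SubtaskIndexSet indexes = new SubtaskIndexSet(8);
        indexes.add(5);
        indexes.add(0);
        indexes.add(2);
        System.out.println(indexes.toList());
    }
}
```

   Iteration order is ascending regardless of insertion order, which a 
`HashSet<Integer>` does not guarantee.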



[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699079#comment-16699079
 ] 

ASF GitHub Bot commented on FLINK-10712:


StefanRRichter commented on a change in pull request #7009: [FLINK-10712] 
Support to restore state when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#discussion_r236283793
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ##
 @@ -105,13 +106,70 @@ public boolean assignStates() throws Exception {
                 continue;
             }
 
-            assignAttemptState(task.getValue(), operatorStates);
+            Set<Integer> executionVertexIndices = new HashSet<>();
+            for (ExecutionVertex executionVertex : task.getValue().getTaskVertices()) {
+                executionVertexIndices.add(executionVertex.getParallelSubtaskIndex());
+            }
+            assignAttemptState(task.getValue(), operatorStates, executionVertexIndices);
+        }
+
+        return true;
+    }
+
+    /**
+     * Assign states to given execution vertices.
+     */
+    public boolean assignStates(List<ExecutionVertex> executionVertices) throws Exception {
 
 Review comment:
   There is a lot of duplicated code between this method and the original 
`assignStates()`. I am sure that this is not required, and if we rethink this a 
bit, the old method should just be able to call the new one. Duplicating most of 
the code is not very maintainable. I suggest rethinking what we give to the 
state assignment operation in the constructor or as a parameter to this method, 
and unifying it. From a quick look I am very sure this is easily possible.
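
   The delegation idea can be sketched as follows. This is only an 
illustration with simplified, hypothetical types: `StateAssignmentSketch` is 
not the real `StateAssignmentOperation`, vertices are plain strings, and the 
`assigned` list stands in for the actual state assignment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of "the old method calls the new one".
public class StateAssignmentSketch {

    private final List<String> allVertices;
    private final List<String> assigned = new ArrayList<>();

    public StateAssignmentSketch(List<String> allVertices) {
        this.allVertices = allVertices;
    }

    // Old entry point: a full restore is the subset restore applied to every vertex.
    public boolean assignStates() {
        return assignStates(allVertices);
    }

    // New entry point: the single place that holds the assignment logic.
    public boolean assignStates(List<String> verticesToRestore) {
        for (String vertex : verticesToRestore) {
            if (!allVertices.contains(vertex)) {
                throw new IllegalArgumentException("Unknown vertex: " + vertex);
            }
            assigned.add(vertex);
        }
        return true;
    }

    public List<String> getAssigned() {
        return assigned;
    }

    public static void main(String[] args) {
        StateAssignmentSketch full = new StateAssignmentSketch(Arrays.asList("a", "b", "c"));
        full.assignStates();
        System.out.println(full.getAssigned());

        StateAssignmentSketch region = new StateAssignmentSketch(Arrays.asList("a", "b", "c"));
        region.assignStates(Arrays.asList("b"));
        System.out.println(region.getAssigned());
    }
}
```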




[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673402#comment-16673402
 ] 

ASF GitHub Bot commented on FLINK-10712:


Myasuka opened a new pull request #7009: [FLINK-10712] Support to restore state 
when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009
 
 
   ## What is the purpose of the change
   
   Currently, RestartPipelinedRegionStrategy does not perform any state 
restore. This is a big problem because all restored regions will be restarted 
with empty state. This PR adds support for restoring state when using 
RestartPipelinedRegionStrategy.
   
   
   ## Brief change log
   
 - Implement a new `restoreLatestCheckpointedState` API for region-based 
failover in `CheckpointCoordinator`.
 - Reload checkpointed state when `FailoverRegion` calls its `restart` method.
 - `StateAssignmentOperation` can assign state to the given 
executionVertices.
   
   
   ## Verifying this change
   This change added tests and can be verified as follows:
   
 - Added unit tests for `FailoverRegion` to ensure the failover region 
calls the new `restoreLatestCheckpointedState` API in `CheckpointCoordinator`.
 - Added unit tests to `CheckpointCoordinatorTest` to ensure 
`CheckpointCoordinator` can restore with `RestartPipelinedRegionStrategy`.
 - Added unit tests to `CheckpointStateRestoreTest` to ensure 
`RestartPipelinedRegionStrategy` correctly restores state from a 
checkpoint to the task executions.
 - Added a new integration test, `RegionFailoverITCase`, to verify that state 
is restored properly when the job consists of multiple regions.
 - Refactored `StreamFaultToleranceTestBase` so that all subclass ITs 
can fail over with state using RestartPipelinedRegionStrategy.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): don't know
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs
   




> RestartPipelinedRegionStrategy does not restore state
> -
>
> Key: FLINK-10712
> URL: https://issues.apache.org/jira/browse/FLINK-10712
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0
>Reporter: Stefan Richter
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> RestartPipelinedRegionStrategy does not perform any state restore. This is 
> a big problem because all restored regions will be restarted with empty state. 
> We need to take checkpoints into account when restoring.





[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state

2018-11-01 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671867#comment-16671867
 ] 

Till Rohrmann commented on FLINK-10712:
---

Thanks a lot for contributing [~yunta]. 

[~aljoscha] I don't think that this is a release blocker since it has been 
broken for quite some time. However, we should fix it soon.

> RestartPipelinedRegionStrategy does not restore state
> -
>
> Key: FLINK-10712
> URL: https://issues.apache.org/jira/browse/FLINK-10712
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0
>Reporter: Stefan Richter
>Assignee: Yun Tang
>Priority: Critical
> Fix For: 1.7.0
>
>
> RestartPipelinedRegionStrategy does not perform any state restore. This is 
> a big problem because all restored regions will be restarted with empty state. 
> We need to take checkpoints into account when restoring.





[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state

2018-10-30 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668964#comment-16668964
 ] 

Yun Tang commented on FLINK-10712:
--

We have refactored _FailoverRegion.java_ to support failover with state when 
using the region-failover strategy. I'll organize the related code and create a 
new PR in the next few days.

> RestartPipelinedRegionStrategy does not restore state
> -
>
> Key: FLINK-10712
> URL: https://issues.apache.org/jira/browse/FLINK-10712
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0
>Reporter: Stefan Richter
>Priority: Critical
> Fix For: 1.7.0
>
>
> RestartPipelinedRegionStrategy does not perform any state restore. This is 
> a big problem because all restored regions will be restarted with empty state. 
> We need to take checkpoints into account when restoring.





[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state

2018-10-30 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668962#comment-16668962
 ] 

Aljoscha Krettek commented on FLINK-10712:
--

Isn't this a blocker?
