[GitHub] [flink] zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable

2019-05-16 Thread GitBox
zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] 
Backtrack failover regions if intermediate results are unavailable
URL: https://github.com/apache/flink/pull/8430#discussion_r284602469
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.java
 ##
 @@ -30,37 +38,245 @@
 public class RestartPipelinedRegionStrategyTest extends TestLogger {
 
 Review comment:
   I'll add tests for them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable

2019-05-16 Thread GitBox
zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] 
Backtrack failover regions if intermediate results are unavailable
URL: https://github.com/apache/flink/pull/8430#discussion_r284684306
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 ##
 @@ -165,6 +181,33 @@ private void buildOneRegionForAllVertices() {
 		for (FailoverVertex vertex : topology.getFailoverVertices()) {
 			vertexToRegionMap.put(vertex.getExecutionVertexID(), region);
 		}
+
+		buildRegionInputsAndOutputs();
+	}
+
+	private void buildRegionInputsAndOutputs() {
+		for (FailoverRegion region : regions.keySet()) {
+			IdentityHashMap consumers = new IdentityHashMap<>();
+			Set inputs = new HashSet<>();
+			Set consumerVertices = new HashSet<>();
+			Set regionVertices = region.getAllExecutionVertices();
+			regionVertices.forEach(v -> {
 
 Review comment:
   I tried the merged logic and found that region building becomes even slower.
   I think this is because the merged logic saves some iteration overhead but adds more work, including building legacy relations and later deprecating them during region merging.
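The snippet above computes, for each region, its inputs (results produced outside the region) and its consumer vertices. A minimal sketch of that computation, using simplified stand-in types rather than the actual Flink `FailoverRegion`/`FailoverVertex` classes:

```java
import java.util.*;

public class RegionInputsDemo {

	// Simplified stand-in for a failover vertex: an id plus the ids of the
	// vertices producing its inputs. Not the real Flink FailoverVertex.
	static class Vertex {
		final String id;
		final List<String> inputProducers = new ArrayList<>();
		Vertex(String id) { this.id = id; }
	}

	// Inputs of a region = producers of its vertices that live outside the region.
	static Set<String> regionInputs(Set<Vertex> regionVertices) {
		Set<String> inRegion = new HashSet<>();
		for (Vertex v : regionVertices) {
			inRegion.add(v.id);
		}
		Set<String> inputs = new HashSet<>();
		for (Vertex v : regionVertices) {
			for (String producer : v.inputProducers) {
				// edges staying inside the region are not region inputs
				if (!inRegion.contains(producer)) {
					inputs.add(producer);
				}
			}
		}
		return inputs;
	}

	public static void main(String[] args) {
		Vertex v1 = new Vertex("v1");
		Vertex v2 = new Vertex("v2");
		v2.inputProducers.add("v1");       // in-region edge, ignored
		v2.inputProducers.add("external"); // cross-region edge, a region input
		System.out.println(regionInputs(new HashSet<>(Arrays.asList(v1, v2))));
	}
}
```

This is a sketch of the idea only; the real implementation also tracks consumer regions and result partitions, which is where the extra work discussed above comes from.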




[GitHub] [flink] zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable

2019-05-16 Thread GitBox
zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] 
Backtrack failover regions if intermediate results are unavailable
URL: https://github.com/apache/flink/pull/8430#discussion_r284599683
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 ##
 @@ -57,7 +66,9 @@
 	 */
 	public RestartPipelinedRegionStrategy(FailoverTopology topology) {
 		this.topology = checkNotNull(topology);
-		this.regions = new HashMap<>();
+		this.regions = new IdentityHashMap<>();
+		this.vertexToRegionMap = new HashMap<>();
+		this.resultPartitionAvailabilityChecker = new RegionFailoverResultPartitionAvailabilityChecker();
 
 Review comment:
   Good idea.




[GitHub] [flink] zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable

2019-05-16 Thread GitBox
zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] 
Backtrack failover regions if intermediate results are unavailable
URL: https://github.com/apache/flink/pull/8430#discussion_r284585195
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 ##
 @@ -162,40 +176,96 @@ private void buildOneRegionForAllVertices() {
 	 * In this strategy, all task vertices in 'involved' regions are proposed to be restarted.
 	 * The 'involved' regions are calculated with rules below:
 	 * 1. The region containing the failed task is always involved
-	 * 2. TODO: If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
+	 * 2. If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
 	 *    the region containing the partition producer task is involved
-	 * 3. TODO: If a region is involved, all of its consumer regions are involved
+	 * 3. If a region is involved, all of its consumer regions are involved
 	 *
 	 * @param executionVertexId ID of the failed task
 	 * @param cause cause of the failure
 	 * @return set of IDs of vertices to restart
 	 */
 	@Override
 	public Set getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause) {
-		final FailoverRegion failedRegion = regions.get(executionVertexId);
+		LOG.info("Calculating tasks to restart to recover the failed task {}.", executionVertexId);
+
+		final FailoverRegion failedRegion = vertexToRegionMap.get(executionVertexId);
 		if (failedRegion == null) {
 			// TODO: show the task name in the log
 			throw new IllegalStateException("Can not find the failover region for task " + executionVertexId, cause);
 		}
 
-		// TODO: if the failure cause is data consumption error, mark the corresponding data partition to be unavailable
+		// if the failure cause is data consumption error, mark the corresponding data partition to be failed,
+		// so that the failover process will try to recover it
+		Optional dataConsumptionException = ExceptionUtils.findThrowable(cause, DataConsumptionException.class);
+		if (dataConsumptionException.isPresent()) {
+			resultPartitionAvailabilityChecker.markResultPartitionFailed(
+				dataConsumptionException.get().getPartitionId().getPartitionId());
+		}
 
-		return getRegionsToRestart(failedRegion).stream().flatMap(
+		// calculate the tasks to restart based on the result of regions to restart
+		Set regionsToRestart = getRegionsToRestart(failedRegion);
+		Set tasksToRestart = regionsToRestart.stream().flatMap(
 			r -> r.getAllExecutionVertexIDs().stream()).collect(Collectors.toSet());
+
+		// the previous failed partition will be recovered. remove its failed state from the checker
+		if (dataConsumptionException.isPresent()) {
+			resultPartitionAvailabilityChecker.removeResultPartitionFromFailedState(
+				dataConsumptionException.get().getPartitionId().getPartitionId());
+		}
+
+		LOG.info("{} tasks should be restarted to recover the failed task {}. ", tasksToRestart.size(), executionVertexId);
+		return tasksToRestart;
 	}
 
 	/**
 	 * All 'involved' regions are proposed to be restarted.
 	 * The 'involved' regions are calculated with rules below:
 	 * 1. The region containing the failed task is always involved
-	 * 2. TODO: If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
+	 * 2. If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
 	 *    the region containing the partition producer task is involved
-	 * 3. TODO: If a region is involved, all of its consumer regions are involved
+	 * 3. If a region is involved, all of its consumer regions are involved
 	 */
-	private Set getRegionsToRestart(FailoverRegion regionToRestart) {
-		return Collections.singleton(regionToRestart);
+	private Set getRegionsToRestart(FailoverRegion failedRegion) {
+		IdentityHashMap regionsToRestart = new IdentityHashMap<>();
+		IdentityHashMap visitedRegions = new IdentityHashMap<>();
+
+		// start from the failed region to visit all involved regions
+		Queue regionsToVisit = new ArrayDeque<>();
+		visitedRegions.put(failedRegion, null);
+		regions
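The diff above looks up the failure cause with `ExceptionUtils.findThrowable(cause, DataConsumptionException.class)`, which, unlike a plain `instanceof` check on the top-level exception, also matches a `DataConsumptionException` nested anywhere in the cause chain. A simplified re-implementation of that lookup for illustration (not the actual `org.apache.flink.util.ExceptionUtils`):

```java
import java.util.Optional;

public class FindThrowableDemo {

	// Walk the cause chain looking for a throwable of the given type,
	// in the spirit of ExceptionUtils.findThrowable used in the diff above.
	static <T extends Throwable> Optional<T> findThrowable(Throwable top, Class<T> type) {
		Throwable t = top;
		while (t != null) {
			if (type.isInstance(t)) {
				return Optional.of(type.cast(t));
			}
			t = t.getCause();
		}
		return Optional.empty();
	}

	public static void main(String[] args) {
		// the interesting exception is wrapped, so instanceof on the top-level
		// throwable would miss it, while the cause-chain walk finds it
		Throwable wrapped = new RuntimeException(new IllegalStateException("boom"));
		System.out.println(findThrowable(wrapped, IllegalStateException.class).isPresent());
		System.out.println(findThrowable(wrapped, IllegalArgumentException.class).isPresent());
	}
}
```

The real Flink helper additionally guards against cycles in the cause chain; this sketch omits that.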

[GitHub] [flink] zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable

2019-05-16 Thread GitBox
zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] 
Backtrack failover regions if intermediate results are unavailable
URL: https://github.com/apache/flink/pull/8430#discussion_r284569230
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
 ##
 @@ -33,18 +30,21 @@
  */
 public class FailoverRegion {
 
+   /** All vertex IDs in this region. */
+   private final Set executionVertexIDs;
+
/** All vertices in this region. */
-   private final Map executionVertices;
+   private final Set executionVertices;
 
 Review comment:
   This is to avoid the cost of converting a `Collection` to a `Set`.
   We need a `Set` of vertex IDs and a `Set` of vertices, but the map only provides a `Set` of keys and a `Collection` of values.
   
   Though I think this may not make a big difference, as it is only vertex-scale complexity.
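The conversion cost mentioned above comes from `Map.values()` returning a `Collection`, not a `Set`, so every caller wanting a `Set` must copy. A minimal sketch with placeholder element types (not the actual Flink vertex classes):

```java
import java.util.*;

public class RegionSetDemo {

	// values() only gives a Collection, so exposing the values as a Set
	// requires an O(n) copy; storing a Set directly avoids this per-call cost.
	static Set<Integer> copyToSet(Map<String, Integer> vertexMap) {
		return new HashSet<>(vertexMap.values());
	}

	public static void main(String[] args) {
		Map<String, Integer> vertexMap = new HashMap<>();
		vertexMap.put("v1", 1);
		vertexMap.put("v2", 2);

		// keySet() is already a Set...
		Set<String> ids = vertexMap.keySet();
		// ...but the values must be copied to obtain a Set.
		Set<Integer> vertices = copyToSet(vertexMap);

		System.out.println(ids.size() + " " + vertices.size()); // 2 2
	}
}
```

Keeping two parallel fields (a `Set` of IDs and a `Set` of vertices), as the diff does, trades a little memory for avoiding that copy.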




[GitHub] [flink] zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable

2019-05-16 Thread GitBox
zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] 
Backtrack failover regions if intermediate results are unavailable
URL: https://github.com/apache/flink/pull/8430#discussion_r284579572
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 ##
 @@ -162,40 +201,90 @@ private void buildOneRegionForAllVertices() {
 	 * In this strategy, all task vertices in 'involved' regions are proposed to be restarted.
 	 * The 'involved' regions are calculated with rules below:
 	 * 1. The region containing the failed task is always involved
-	 * 2. TODO: If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
+	 * 2. If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
 	 *    the region containing the partition producer task is involved
-	 * 3. TODO: If a region is involved, all of its consumer regions are involved
+	 * 3. If a region is involved, all of its consumer regions are involved
 	 *
 	 * @param executionVertexId ID of the failed task
 	 * @param cause cause of the failure
 	 * @return set of IDs of vertices to restart
 	 */
 	@Override
 	public Set getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause) {
-		final FailoverRegion failedRegion = regions.get(executionVertexId);
+		LOG.info("Calculating tasks to restart to recover the failed task {}.", executionVertexId);
+
+		final FailoverRegion failedRegion = vertexToRegionMap.get(executionVertexId);
 		if (failedRegion == null) {
 			// TODO: show the task name in the log
 			throw new IllegalStateException("Can not find the failover region for task " + executionVertexId, cause);
 		}
 
-		// TODO: if the failure cause is data consumption error, mark the corresponding data partition to be unavailable
+		// if the failure cause is data consumption error, mark the corresponding data partition to be failed,
+		// so that the failover process will try to recover it
+		if (cause instanceof DataConsumptionException) {
+			resultPartitionAvailabilityChecker.markResultPartitionFailed(
+				((DataConsumptionException) cause).getPartitionId().getPartitionId());
+		}
 
-		return getRegionsToRestart(failedRegion).stream().flatMap(
+		// calculate the tasks to restart based on the result of regions to restart
+		Set regionsToRestart = getRegionsToRestart(failedRegion);
+		Set tasksToRestart = regionsToRestart.stream().flatMap(
 			r -> r.getAllExecutionVertexIDs().stream()).collect(Collectors.toSet());
+
+		// the previous failed partition will be recovered. remove its failed state from the checker
+		if (cause instanceof DataConsumptionException) {
+			resultPartitionAvailabilityChecker.removeResultPartitionFromFailedState(
+				((DataConsumptionException) cause).getPartitionId().getPartitionId());
+		}
+
+		LOG.info("{} tasks should be restarted to recover the failed task {}. ", tasksToRestart.size(), executionVertexId);
+		return tasksToRestart;
 	}
 
 	/**
 	 * All 'involved' regions are proposed to be restarted.
 	 * The 'involved' regions are calculated with rules below:
 	 * 1. The region containing the failed task is always involved
-	 * 2. TODO: If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
+	 * 2. If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
 	 *    the region containing the partition producer task is involved
-	 * 3. TODO: If a region is involved, all of its consumer regions are involved
+	 * 3. If a region is involved, all of its consumer regions are involved
 	 */
-	private Set getRegionsToRestart(FailoverRegion regionToRestart) {
-		return Collections.singleton(regionToRestart);
+	private Set getRegionsToRestart(FailoverRegion failedRegion) {
+		IdentityHashMap regionsToRestart = new IdentityHashMap<>();
+		IdentityHashMap visitedRegions = new IdentityHashMap<>();
+
+		// start from the failed region to visit all involved regions
+		Queue regionsToVisit = new ArrayDeque<>();
+		regionsToVisit.add(failedRegion);
+		while (!regionsToVisit.isEmpty()) {
+			FailoverRegion regionToRestart = regionsToVisit.poll();
+
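The snippet above begins a breadth-first traversal over regions; the full logic implements the three rules stated in the javadoc. A minimal sketch of that backtracking, using simplified stand-in types rather than the actual Flink `FailoverRegion` (result availability is modeled here as a plain set of regions whose output is still available):

```java
import java.util.*;

public class RegionBacktrackDemo {

	// Simplified stand-in for a failover region: the regions producing its
	// inputs and the regions consuming its outputs.
	static class Region {
		final String name;
		final List<Region> producers = new ArrayList<>();
		final List<Region> consumers = new ArrayList<>();
		Region(String name) { this.name = name; }
	}

	// Rules from the javadoc above:
	// 1. the failed region is involved;
	// 2. producers whose results are unavailable are involved;
	// 3. consumers of any involved region are involved.
	static Set<Region> regionsToRestart(Region failed, Set<Region> availableResults) {
		// identity semantics, matching the IdentityHashMap usage in the diff
		Set<Region> involved = Collections.newSetFromMap(new IdentityHashMap<>());
		Queue<Region> toVisit = new ArrayDeque<>();
		involved.add(failed);
		toVisit.add(failed);
		while (!toVisit.isEmpty()) {
			Region r = toVisit.poll();
			for (Region p : r.producers) {
				// rule 2: backtrack to producers of unavailable inputs
				if (!availableResults.contains(p) && involved.add(p)) {
					toVisit.add(p);
				}
			}
			for (Region c : r.consumers) {
				// rule 3: all downstream consumers restart as well
				if (involved.add(c)) {
					toVisit.add(c);
				}
			}
		}
		return involved;
	}

	public static void main(String[] args) {
		Region a = new Region("A"), b = new Region("B"), c = new Region("C");
		a.consumers.add(b); b.producers.add(a);
		b.consumers.add(c); c.producers.add(b);
		// B fails and A's result is gone: all three regions restart.
		System.out.println(regionsToRestart(b, Collections.emptySet()).size());
	}
}
```

If A's result were still available, only B and C would restart, which is exactly the saving this PR's backtracking is designed to preserve.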

[GitHub] [flink] zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable

2019-05-16 Thread GitBox
zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] 
Backtrack failover regions if intermediate results are unavailable
URL: https://github.com/apache/flink/pull/8430#discussion_r284573458
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 ##
 @@ -162,40 +201,90 @@ private void buildOneRegionForAllVertices() {
 	 * In this strategy, all task vertices in 'involved' regions are proposed to be restarted.
 	 * The 'involved' regions are calculated with rules below:
 	 * 1. The region containing the failed task is always involved
-	 * 2. TODO: If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
+	 * 2. If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
 	 *    the region containing the partition producer task is involved
-	 * 3. TODO: If a region is involved, all of its consumer regions are involved
+	 * 3. If a region is involved, all of its consumer regions are involved
 	 *
 	 * @param executionVertexId ID of the failed task
 	 * @param cause cause of the failure
 	 * @return set of IDs of vertices to restart
 	 */
 	@Override
 	public Set getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause) {
-		final FailoverRegion failedRegion = regions.get(executionVertexId);
+		LOG.info("Calculating tasks to restart to recover the failed task {}.", executionVertexId);
+
+		final FailoverRegion failedRegion = vertexToRegionMap.get(executionVertexId);
 		if (failedRegion == null) {
 			// TODO: show the task name in the log
 			throw new IllegalStateException("Can not find the failover region for task " + executionVertexId, cause);
 		}
 
-		// TODO: if the failure cause is data consumption error, mark the corresponding data partition to be unavailable
+		// if the failure cause is data consumption error, mark the corresponding data partition to be failed,
+		// so that the failover process will try to recover it
+		if (cause instanceof DataConsumptionException) {
+			resultPartitionAvailabilityChecker.markResultPartitionFailed(
+				((DataConsumptionException) cause).getPartitionId().getPartitionId());
+		}
 
-		return getRegionsToRestart(failedRegion).stream().flatMap(
+		// calculate the tasks to restart based on the result of regions to restart
+		Set regionsToRestart = getRegionsToRestart(failedRegion);
+		Set tasksToRestart = regionsToRestart.stream().flatMap(
 
 Review comment:
   Yes, we can.
   It may not make a big difference though, as this is vertex-scale complexity (O(N)), small compared to the edge-scale operations (O(N^2)).




[GitHub] [flink] zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable

2019-05-16 Thread GitBox
zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] 
Backtrack failover regions if intermediate results are unavailable
URL: https://github.com/apache/flink/pull/8430#discussion_r284569287
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 ##
 @@ -47,8 +50,14 @@
 	/** The topology containing info about all the vertices and edges. */
 	private final FailoverTopology topology;
 
+	/** All failover regions. */
+	private final IdentityHashMap regions;
+
 	/** Maps execution vertex id to failover region. */
-	private final Map regions;
+	private final Map vertexToRegionMap;
+
+	/** The checker helps to query result partition availability. */
+	private RegionFailoverResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker;
 
 Review comment:
   Yes.

