This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c9aa9a170ff657198c4710706bc3802de42063ca
Author: Gary Yao <g...@apache.org>
AuthorDate: Wed Jun 19 15:06:51 2019 +0200

    [FLINK-12883][runtime] Introduce PartitionReleaseStrategy
    
    - Introduce interface PartitionReleaseStrategy.
    - Introduce RegionPartitionReleaseStrategy and
      NotReleasingPartitionReleaseStrategy implementations, which can be 
configured
      via a new config option.
    - Add unit tests for new classes.
    - Increase visibility of methods in TestingSchedulingTopology for unit tests
      outside of its package.
---
 .../flink/configuration/JobManagerOptions.java     |   9 +
 .../runtime/executiongraph/ExecutionGraph.java     | 132 +++++++++++---
 .../executiongraph/ExecutionGraphBuilder.java      |   6 +
 .../runtime/executiongraph/ExecutionVertex.java    |   1 +
 .../failover/flip1/PipelinedRegionComputeUtil.java |  20 +-
 .../NotReleasingPartitionReleaseStrategy.java      |  56 ++++++
 .../partitionrelease/PartitionReleaseStrategy.java |  58 ++++++
 .../PartitionReleaseStrategyFactoryLoader.java     |  41 +++++
 .../flip1/partitionrelease/PipelinedRegion.java    |  69 +++++++
 .../PipelinedRegionConsumedBlockingPartitions.java |  51 ++++++
 .../PipelinedRegionExecutionView.java              |  64 +++++++
 .../RegionPartitionReleaseStrategy.java            | 190 +++++++++++++++++++
 .../ExecutionGraphPartitionReleaseTest.java        | 202 +++++++++++++++++++++
 .../RegionPartitionReleaseStrategyTest.java        | 149 +++++++++++++++
 .../PartitionReleaseStrategyFactoryLoaderTest.java |  55 ++++++
 .../PipelinedRegionExecutionViewTest.java          |  75 ++++++++
 .../strategy/TestingSchedulingTopology.java        |  14 +-
 17 files changed, 1154 insertions(+), 38 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 89515fd..0f9e18d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -173,6 +173,15 @@ public class JobManagerOptions {
                                        text("'legacy': legacy scheduler"),
                                        text("'ng': new generation scheduler"))
                                .build());
+       /**
+        * Config parameter controlling whether partitions should already be 
released during the job execution.
+        */
+       @Documentation.ExcludeFromDocumentation("User normally should not be 
expected to deactivate this feature. " +
+               "We aim at removing this flag eventually.")
+       public static final ConfigOption<Boolean> 
PARTITION_RELEASE_DURING_JOB_EXECUTION =
+               key("jobmanager.partition.release-during-job-execution")
+                       .defaultValue(true)
+                       .withDescription("Controls whether partitions should 
already be released during the job execution.");
 
        @Documentation.ExcludeFromDocumentation("dev use only; likely 
temporary")
        public static final ConfigOption<Boolean> 
FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 514121e..ce65b68 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -48,6 +48,9 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.NotReleasingPartitionReleaseStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
 import 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
@@ -55,6 +58,7 @@ import 
org.apache.flink.runtime.io.network.partition.PartitionTracker;
 import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -66,6 +70,11 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import 
org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -250,6 +259,12 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
        /** The total number of vertices currently in the execution graph. */
        private int numVerticesTotal;
 
+       private final PartitionReleaseStrategy.Factory 
partitionReleaseStrategyFactory;
+
+       private PartitionReleaseStrategy partitionReleaseStrategy;
+
+       private SchedulingTopology schedulingTopology;
+
        // ------ Configuration of the Execution -------
 
        /** Flag to indicate whether the scheduler may queue tasks for 
execution, or needs to be able
@@ -413,6 +428,7 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
                        userClassLoader,
                        blobWriter,
                        allocationTimeout,
+                       new NotReleasingPartitionReleaseStrategy.Factory(),
                        NettyShuffleMaster.INSTANCE,
                        true,
                        new PartitionTrackerImpl(
@@ -433,6 +449,7 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
                        ClassLoader userClassLoader,
                        BlobWriter blobWriter,
                        Time allocationTimeout,
+                       PartitionReleaseStrategy.Factory 
partitionReleaseStrategyFactory,
                        ShuffleMaster<?> shuffleMaster,
                        boolean forcePartitionReleaseOnConsumption,
                        PartitionTracker partitionTracker) throws IOException {
@@ -464,6 +481,8 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
                this.rpcTimeout = checkNotNull(rpcTimeout);
                this.allocationTimeout = checkNotNull(allocationTimeout);
 
+               this.partitionReleaseStrategyFactory = 
checkNotNull(partitionReleaseStrategyFactory);
+
                this.restartStrategy = restartStrategy;
                this.kvStateLocationRegistry = new 
KvStateLocationRegistry(jobInformation.getJobId(), getAllVertices());
 
@@ -913,6 +932,11 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                }
 
                failoverStrategy.notifyNewVertices(newExecJobVertices);
+
+               schedulingTopology = new 
ExecutionGraphToSchedulingTopologyAdapter(this);
+               partitionReleaseStrategy = 
partitionReleaseStrategyFactory.createInstance(
+                       schedulingTopology,
+                       new DefaultFailoverTopology(this));
        }
 
        public void scheduleForExecution() throws JobException {
@@ -1605,36 +1629,9 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
 
                if (attempt != null) {
                        try {
-                               Map<String, Accumulator<?, ?>> accumulators;
-
-                               switch (state.getExecutionState()) {
-                                       case RUNNING:
-                                               return 
attempt.switchToRunning();
-
-                                       case FINISHED:
-                                               // this deserialization is 
exception-free
-                                               accumulators = 
deserializeAccumulators(state);
-                                               
attempt.markFinished(accumulators, state.getIOMetrics());
-                                               return true;
-
-                                       case CANCELED:
-                                               // this deserialization is 
exception-free
-                                               accumulators = 
deserializeAccumulators(state);
-                                               
attempt.completeCancelling(accumulators, state.getIOMetrics());
-                                               return true;
-
-                                       case FAILED:
-                                               // this deserialization is 
exception-free
-                                               accumulators = 
deserializeAccumulators(state);
-                                               
attempt.markFailed(state.getError(userClassLoader), accumulators, 
state.getIOMetrics());
-                                               return true;
-
-                                       default:
-                                               // we mark as failed and return 
false, which triggers the TaskManager
-                                               // to remove the task
-                                               attempt.fail(new 
Exception("TaskManager sent illegal state update: " + 
state.getExecutionState()));
-                                               return false;
-                               }
+                               final boolean stateUpdated = 
updateStateInternal(state, attempt);
+                               maybeReleasePartitions(attempt);
+                               return stateUpdated;
                        }
                        catch (Throwable t) {
                                ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
@@ -1649,6 +1646,77 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                }
        }
 
+       private boolean updateStateInternal(final TaskExecutionState state, 
final Execution attempt) {
+               Map<String, Accumulator<?, ?>> accumulators;
+
+               switch (state.getExecutionState()) {
+                       case RUNNING:
+                               return attempt.switchToRunning();
+
+                       case FINISHED:
+                               // this deserialization is exception-free
+                               accumulators = deserializeAccumulators(state);
+                               attempt.markFinished(accumulators, 
state.getIOMetrics());
+                               return true;
+
+                       case CANCELED:
+                               // this deserialization is exception-free
+                               accumulators = deserializeAccumulators(state);
+                               attempt.completeCancelling(accumulators, 
state.getIOMetrics());
+                               return true;
+
+                       case FAILED:
+                               // this deserialization is exception-free
+                               accumulators = deserializeAccumulators(state);
+                               
attempt.markFailed(state.getError(userClassLoader), accumulators, 
state.getIOMetrics());
+                               return true;
+
+                       default:
+                               // we mark as failed and return false, which 
triggers the TaskManager
+                               // to remove the task
+                               attempt.fail(new Exception("TaskManager sent 
illegal state update: " + state.getExecutionState()));
+                               return false;
+               }
+       }
+
+       private void maybeReleasePartitions(final Execution attempt) {
+               final ExecutionVertexID finishedExecutionVertex = 
attempt.getVertex().getID();
+
+               if (attempt.getState() == ExecutionState.FINISHED) {
+                       final List<IntermediateResultPartitionID> 
releasablePartitions = 
partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
+                       releasePartitions(releasablePartitions);
+               } else {
+                       
partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
+               }
+       }
+
+       private void releasePartitions(final 
List<IntermediateResultPartitionID> releasablePartitions) {
+               if (releasablePartitions.size() > 0) {
+                       final List<ResultPartitionID> partitionIds = 
releasablePartitions.stream()
+                               .map(this::createResultPartitionId)
+                               .collect(Collectors.toList());
+
+                       
partitionTracker.stopTrackingAndReleasePartitions(partitionIds);
+               }
+       }
+
+       private ResultPartitionID createResultPartitionId(final 
IntermediateResultPartitionID resultPartitionId) {
+               final SchedulingResultPartition schedulingResultPartition = 
schedulingTopology.getResultPartitionOrThrow(resultPartitionId);
+               final SchedulingExecutionVertex producer = 
schedulingResultPartition.getProducer();
+               final ExecutionVertexID producerId = producer.getId();
+               final JobVertexID jobVertexId = producerId.getJobVertexId();
+               final ExecutionJobVertex jobVertex = getJobVertex(jobVertexId);
+               checkNotNull(jobVertex, "Unknown job vertex %s", jobVertexId);
+
+               final ExecutionVertex[] taskVertices = 
jobVertex.getTaskVertices();
+               final int subtaskIndex = producerId.getSubtaskIndex();
+               checkState(subtaskIndex < taskVertices.length, "Invalid subtask 
index %d for job vertex %s", subtaskIndex, jobVertexId);
+
+               final ExecutionVertex taskVertex = taskVertices[subtaskIndex];
+               final Execution execution = 
taskVertex.getCurrentExecutionAttempt();
+               return new ResultPartitionID(resultPartitionId, 
execution.getAttemptId());
+       }
+
        /**
         * Deserializes accumulators from a task state update.
         *
@@ -1835,4 +1903,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
        public PartitionTracker getPartitionTracker() {
                return partitionTracker;
        }
+
+       PartitionReleaseStrategy getPartitionReleaseStrategy() {
+               return partitionReleaseStrategy;
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index f21d703..1b9d7a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
 import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
 import 
org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
 import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
@@ -117,6 +119,9 @@ public class ExecutionGraphBuilder {
                final int maxPriorAttemptsHistoryLength =
                                
jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
 
+               final PartitionReleaseStrategy.Factory 
partitionReleaseStrategyFactory =
+                       
PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(jobManagerConfig);
+
                final boolean forcePartitionReleaseOnConsumption =
                        
jobManagerConfig.getBoolean(JobManagerOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
 
@@ -136,6 +141,7 @@ public class ExecutionGraphBuilder {
                                        classLoader,
                                        blobWriter,
                                        allocationTimeout,
+                                       partitionReleaseStrategyFactory,
                                        shuffleMaster,
                                        forcePartitionReleaseOnConsumption,
                                        partitionTracker);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 5fc0b72..349aaff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -601,6 +601,7 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                        if (oldState.isTerminal()) {
                                if (oldState == FINISHED) {
                                        
oldExecution.stopTrackingAndReleasePartitions();
+                                       
getExecutionGraph().getPartitionReleaseStrategy().vertexUnfinished(executionVertexId);
                                }
 
                                priorExecutions.add(oldExecution.archive());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
index 14d28b7..8f19ed9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
@@ -19,6 +19,9 @@
 
 package org.apache.flink.runtime.executiongraph.failover.flip1;
 
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,14 +30,29 @@ import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
- * Utility for computing pipeliend regions.
+ * Utility for computing pipelined regions.
  */
 public final class PipelinedRegionComputeUtil {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(PipelinedRegionComputeUtil.class);
 
+       public static Set<PipelinedRegion> toPipelinedRegionsSet(final 
Set<Set<FailoverVertex>> distinctRegions) {
+               return distinctRegions.stream()
+                       .map(toExecutionVertexIdSet())
+                       .map(PipelinedRegion::from)
+                       .collect(Collectors.toSet());
+       }
+
+       private static Function<Set<FailoverVertex>, Set<ExecutionVertexID>> 
toExecutionVertexIdSet() {
+               return failoverVertices -> failoverVertices.stream()
+                       .map(FailoverVertex::getExecutionVertexID)
+                       .collect(Collectors.toSet());
+       }
+
        public static Set<Set<FailoverVertex>> computePipelinedRegions(final 
FailoverTopology topology) {
                // currently we let a job with co-location constraints fail as 
one region
                // putting co-located vertices in the same region with each 
other can be a future improvement
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java
new file mode 100644
index 0000000..e386870
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Does not release intermediate result partitions during job execution. 
Relies on partitions being
+ * released at the end of the job.
+ */
+public class NotReleasingPartitionReleaseStrategy implements 
PartitionReleaseStrategy {
+
+       @Override
+       public List<IntermediateResultPartitionID> vertexFinished(final 
ExecutionVertexID finishedVertex) {
+               return Collections.emptyList();
+       }
+
+       @Override
+       public void vertexUnfinished(final ExecutionVertexID executionVertexID) 
{
+       }
+
+       /**
+        * Factory for {@link NotReleasingPartitionReleaseStrategy}.
+        */
+       public static class Factory implements PartitionReleaseStrategy.Factory 
{
+
+               @Override
+               public PartitionReleaseStrategy createInstance(final 
SchedulingTopology schedulingStrategy, final FailoverTopology failoverTopology) 
{
+                       return new NotReleasingPartitionReleaseStrategy();
+               }
+       }
+
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
new file mode 100644
index 0000000..d7b317e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.List;
+
+/**
+ * Interface for strategies that decide when to release
+ * {@link IntermediateResultPartition IntermediateResultPartitions}.
+ */
+public interface PartitionReleaseStrategy {
+
+       /**
+        * Calling this method informs the strategy that a vertex finished.
+        *
+        * @param finishedVertex Id of the vertex that finished the execution
+        * @return A list of result partitions that can be released
+        */
+       List<IntermediateResultPartitionID> vertexFinished(ExecutionVertexID 
finishedVertex);
+
+       /**
+        * Calling this method informs the strategy that a vertex is no longer 
in finished state, e.g.,
+        * when a vertex is re-executed.
+        *
+        * @param executionVertexID Id of the vertex that is no longer in 
finished state.
+        */
+       void vertexUnfinished(ExecutionVertexID executionVertexID);
+
+       /**
+        * Factory for {@link PartitionReleaseStrategy}.
+        */
+       interface Factory {
+               PartitionReleaseStrategy createInstance(SchedulingTopology 
schedulingStrategy, FailoverTopology failoverTopology);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoader.java
new file mode 100644
index 0000000..744a270
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+
+/**
+ * Instantiates a {@link RegionPartitionReleaseStrategy}.
+ */
+public final class PartitionReleaseStrategyFactoryLoader {
+
+       public static PartitionReleaseStrategy.Factory 
loadPartitionReleaseStrategyFactory(final Configuration configuration) {
+               final boolean partitionReleaseDuringJobExecution = 
configuration.getBoolean(JobManagerOptions.PARTITION_RELEASE_DURING_JOB_EXECUTION);
+               if (partitionReleaseDuringJobExecution) {
+                       return new RegionPartitionReleaseStrategy.Factory();
+               } else {
+                       return new 
NotReleasingPartitionReleaseStrategy.Factory();
+               }
+       }
+
+       private PartitionReleaseStrategyFactoryLoader() {
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java
new file mode 100644
index 0000000..36c042e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Set of execution vertices that are connected through pipelined intermediate 
result partitions.
+ */
+public class PipelinedRegion implements Iterable<ExecutionVertexID> {
+
+       private final Set<ExecutionVertexID> executionVertexIds;
+
+       private PipelinedRegion(final Set<ExecutionVertexID> 
executionVertexIds) {
+               this.executionVertexIds = new 
HashSet<>(checkNotNull(executionVertexIds));
+       }
+
+       public static PipelinedRegion from(final Set<ExecutionVertexID> 
executionVertexIds) {
+               return new PipelinedRegion(executionVertexIds);
+       }
+
+       public static PipelinedRegion from(final ExecutionVertexID... 
executionVertexIds) {
+               return new PipelinedRegion(new 
HashSet<>(Arrays.asList(executionVertexIds)));
+       }
+
+       public Set<ExecutionVertexID> getExecutionVertexIds() {
+               return executionVertexIds;
+       }
+
+       public boolean contains(final ExecutionVertexID executionVertexId) {
+               return executionVertexIds.contains(executionVertexId);
+       }
+
+       @Override
+       public Iterator<ExecutionVertexID> iterator() {
+               return executionVertexIds.iterator();
+       }
+
+       @Override
+       public String toString() {
+               return "PipelinedRegion{" +
+                       "executionVertexIds=" + executionVertexIds +
+                       '}';
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java
new file mode 100644
index 0000000..5cbce45
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A set of intermediate result partitions that are incident to one {@link 
PipelinedRegion}.
+ */
+class PipelinedRegionConsumedBlockingPartitions {
+
+       private final PipelinedRegion pipelinedRegion;
+
+       private final Set<IntermediateResultPartitionID> 
consumedBlockingPartitions;
+
+       PipelinedRegionConsumedBlockingPartitions(
+                       final PipelinedRegion pipelinedRegion,
+                       final Set<IntermediateResultPartitionID> 
consumedBlockingPartitions) {
+               this.pipelinedRegion = checkNotNull(pipelinedRegion);
+               this.consumedBlockingPartitions = 
checkNotNull(consumedBlockingPartitions);
+       }
+
+       public Set<IntermediateResultPartitionID> 
getConsumedBlockingPartitions() {
+               return consumedBlockingPartitions;
+       }
+
+       public PipelinedRegion getPipelinedRegion() {
+               return pipelinedRegion;
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionView.java
new file mode 100644
index 0000000..c92fa8b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionView.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides a virtual execution state of a {@link PipelinedRegion}.
+ *
+ * <p>A pipelined region can be either finished or unfinished. It is finished 
iff. all its
+ * executions have reached the finished state.
+ */
+class PipelinedRegionExecutionView {
+
+       private final PipelinedRegion pipelinedRegion;
+
+       private final Set<ExecutionVertexID> unfinishedVertices;
+
+       PipelinedRegionExecutionView(final PipelinedRegion pipelinedRegion) {
+               this.pipelinedRegion = checkNotNull(pipelinedRegion);
+               this.unfinishedVertices = new 
HashSet<>(pipelinedRegion.getExecutionVertexIds());
+       }
+
+       public boolean isFinished() {
+               return unfinishedVertices.isEmpty();
+       }
+
+       public void vertexFinished(final ExecutionVertexID executionVertexId) {
+               checkArgument(pipelinedRegion.contains(executionVertexId));
+               unfinishedVertices.remove(executionVertexId);
+       }
+
+       public void vertexUnfinished(final ExecutionVertexID executionVertexId) 
{
+               checkArgument(pipelinedRegion.contains(executionVertexId));
+               unfinishedVertices.add(executionVertexId);
+       }
+
+       public PipelinedRegion getPipelinedRegion() {
+               return pipelinedRegion;
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
new file mode 100644
index 0000000..b930e10
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Releases blocking intermediate result partitions that are incident to a 
{@link PipelinedRegion},
+ * as soon as the region's execution vertices are finished.
+ */
+public class RegionPartitionReleaseStrategy implements 
PartitionReleaseStrategy {
+
+       private final SchedulingTopology schedulingTopology;
+
+       private final Map<PipelinedRegion, 
PipelinedRegionConsumedBlockingPartitions> consumedBlockingPartitionsByRegion = 
new IdentityHashMap<>();
+
+       private final Map<ExecutionVertexID, PipelinedRegionExecutionView> 
regionExecutionViewByVertex = new HashMap<>();
+
+       public RegionPartitionReleaseStrategy(
+                       final SchedulingTopology schedulingTopology,
+                       final Set<PipelinedRegion> pipelinedRegions) {
+
+               this.schedulingTopology = checkNotNull(schedulingTopology);
+
+               checkNotNull(pipelinedRegions);
+               initConsumedBlockingPartitionsByRegion(pipelinedRegions);
+               initRegionExecutionViewByVertex(pipelinedRegions);
+       }
+
+       private void initConsumedBlockingPartitionsByRegion(final 
Set<PipelinedRegion> pipelinedRegions) {
+               for (PipelinedRegion pipelinedRegion : pipelinedRegions) {
+                       final PipelinedRegionConsumedBlockingPartitions 
consumedPartitions = computeConsumedPartitionsOfVertexRegion(pipelinedRegion);
+                       consumedBlockingPartitionsByRegion.put(pipelinedRegion, 
consumedPartitions);
+               }
+       }
+
+       private void initRegionExecutionViewByVertex(final Set<PipelinedRegion> 
pipelinedRegions) {
+               for (PipelinedRegion pipelinedRegion : pipelinedRegions) {
+                       final PipelinedRegionExecutionView regionExecutionView 
= new PipelinedRegionExecutionView(pipelinedRegion);
+                       for (ExecutionVertexID executionVertexId : 
pipelinedRegion) {
+                               
regionExecutionViewByVertex.put(executionVertexId, regionExecutionView);
+                       }
+               }
+       }
+
+       private PipelinedRegionConsumedBlockingPartitions 
computeConsumedPartitionsOfVertexRegion(final PipelinedRegion pipelinedRegion) {
+               final Set<IntermediateResultPartitionID> 
resultPartitionsOutsideOfRegion = 
findResultPartitionsOutsideOfRegion(pipelinedRegion);
+               return new 
PipelinedRegionConsumedBlockingPartitions(pipelinedRegion, 
resultPartitionsOutsideOfRegion);
+       }
+
+       private Set<IntermediateResultPartitionID> 
findResultPartitionsOutsideOfRegion(final PipelinedRegion pipelinedRegion) {
+               final Set<SchedulingResultPartition> 
allConsumedPartitionsInRegion = pipelinedRegion
+                       .getExecutionVertexIds()
+                       .stream()
+                       .map(schedulingTopology::getVertexOrThrow)
+                       .flatMap(schedulingExecutionVertex -> 
schedulingExecutionVertex.getConsumedResultPartitions().stream())
+                       .collect(Collectors.toSet());
+
+               return 
filterResultPartitionsOutsideOfRegion(allConsumedPartitionsInRegion, 
pipelinedRegion);
+       }
+
+       private static Set<IntermediateResultPartitionID> 
filterResultPartitionsOutsideOfRegion(
+                       final Collection<SchedulingResultPartition> 
resultPartitions,
+                       final PipelinedRegion pipelinedRegion) {
+
+               final Set<IntermediateResultPartitionID> result = new 
HashSet<>();
+               for (final SchedulingResultPartition maybeOutsidePartition : 
resultPartitions) {
+                       final SchedulingExecutionVertex producer = 
maybeOutsidePartition.getProducer();
+                       if (!pipelinedRegion.contains(producer.getId())) {
+                               result.add(maybeOutsidePartition.getId());
+                       }
+               }
+               return result;
+       }
+
+       @Override
+       public List<IntermediateResultPartitionID> vertexFinished(final 
ExecutionVertexID finishedVertex) {
+               final PipelinedRegionExecutionView regionExecutionView = 
getPipelinedRegionExecutionViewForVertex(finishedVertex);
+               regionExecutionView.vertexFinished(finishedVertex);
+
+               if (regionExecutionView.isFinished()) {
+                       final PipelinedRegion pipelinedRegion = 
getPipelinedRegionForVertex(finishedVertex);
+                       final PipelinedRegionConsumedBlockingPartitions 
consumedPartitionsOfVertexRegion = 
getConsumedBlockingPartitionsForRegion(pipelinedRegion);
+                       return 
filterReleasablePartitions(consumedPartitionsOfVertexRegion);
+               }
+               return Collections.emptyList();
+       }
+
+       @Override
+       public void vertexUnfinished(final ExecutionVertexID executionVertexId) 
{
+               final PipelinedRegionExecutionView regionExecutionView = 
getPipelinedRegionExecutionViewForVertex(executionVertexId);
+               regionExecutionView.vertexUnfinished(executionVertexId);
+       }
+
+       private PipelinedRegionExecutionView 
getPipelinedRegionExecutionViewForVertex(final ExecutionVertexID 
executionVertexId) {
+               final PipelinedRegionExecutionView pipelinedRegionExecutionView 
= regionExecutionViewByVertex.get(executionVertexId);
+               checkState(pipelinedRegionExecutionView != null,
+                       "PipelinedRegionExecutionView not found for execution 
vertex %s", executionVertexId);
+               return pipelinedRegionExecutionView;
+       }
+
+       private PipelinedRegion getPipelinedRegionForVertex(final 
ExecutionVertexID executionVertexId) {
+               final PipelinedRegionExecutionView pipelinedRegionExecutionView 
= getPipelinedRegionExecutionViewForVertex(executionVertexId);
+               return pipelinedRegionExecutionView.getPipelinedRegion();
+       }
+
+       private PipelinedRegionConsumedBlockingPartitions 
getConsumedBlockingPartitionsForRegion(final PipelinedRegion pipelinedRegion) {
+               final PipelinedRegionConsumedBlockingPartitions 
pipelinedRegionConsumedBlockingPartitions = 
consumedBlockingPartitionsByRegion.get(pipelinedRegion);
+               checkState(pipelinedRegionConsumedBlockingPartitions != null,
+                       "Consumed partitions not found for pipelined region 
%s", pipelinedRegion);
+               
checkState(pipelinedRegionConsumedBlockingPartitions.getPipelinedRegion() == 
pipelinedRegion);
+               return pipelinedRegionConsumedBlockingPartitions;
+       }
+
+       private List<IntermediateResultPartitionID> 
filterReleasablePartitions(final PipelinedRegionConsumedBlockingPartitions 
consumedPartitionsOfVertexRegion) {
+               return consumedPartitionsOfVertexRegion
+                       .getConsumedBlockingPartitions()
+                       .stream()
+                       .filter(this::areConsumerRegionsFinished)
+                       .collect(Collectors.toList());
+       }
+
+       private boolean areConsumerRegionsFinished(final 
IntermediateResultPartitionID resultPartitionId) {
+               final SchedulingResultPartition resultPartition = 
schedulingTopology.getResultPartitionOrThrow(resultPartitionId);
+               final Collection<SchedulingExecutionVertex> consumers = 
resultPartition.getConsumers();
+               return consumers
+                       .stream()
+                       .map(SchedulingExecutionVertex::getId)
+                       .allMatch(this::isRegionOfVertexFinished);
+       }
+
+       private boolean isRegionOfVertexFinished(final ExecutionVertexID 
executionVertexId) {
+               final PipelinedRegionExecutionView regionExecutionView = 
getPipelinedRegionExecutionViewForVertex(executionVertexId);
+               return regionExecutionView.isFinished();
+       }
+
+       /**
+        * Factory for {@link PartitionReleaseStrategy}.
+        */
+       public static class Factory implements PartitionReleaseStrategy.Factory 
{
+
+               @Override
+               public PartitionReleaseStrategy createInstance(
+                               final SchedulingTopology schedulingStrategy,
+                               final FailoverTopology failoverTopology) {
+
+                       final Set<Set<FailoverVertex>> distinctRegions = 
PipelinedRegionComputeUtil.computePipelinedRegions(failoverTopology);
+                       return new RegionPartitionReleaseStrategy(
+                               schedulingStrategy,
+                               
PipelinedRegionComputeUtil.toPipelinedRegionsSet(distinctRegions));
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
new file mode 100644
index 0000000..0110642
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.core.IsEqual.equalTo;
+
/**
 * Tests for the interactions of the {@link ExecutionGraph} and {@link PartitionReleaseStrategy}.
 */
public class ExecutionGraphPartitionReleaseTest extends TestLogger {

	// single-threaded executor so that all ExecutionGraph interactions happen on one "main" thread
	private static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
	private static final TestingComponentMainThreadExecutor mainThreadExecutor =
		new TestingComponentMainThreadExecutor(
			TestingComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService));

	@Test
	public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws Exception {
		// setup a simple pipeline of 3 operators with blocking partitions
		final JobVertex sourceVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
		final JobVertex operatorVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
		final JobVertex sinkVertex = ExecutionGraphTestUtils.createNoOpVertex(1);

		operatorVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
		sinkVertex.connectNewDataSetAsInput(operatorVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

		// setup partition tracker to intercept partition release calls
		final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
		final Queue<ResultPartitionID> releasedPartitions = new ArrayDeque<>();
		partitionTracker.setStopTrackingAndReleasePartitionsConsumer(
			partitionIds -> releasedPartitions.add(partitionIds.iterator().next()));

		final ExecutionGraph executionGraph = createExecutionGraph(partitionTracker, sourceVertex, operatorVertex, sinkVertex);

		// finish vertices one after another, and verify that the appropriate partitions are released
		mainThreadExecutor.execute(() -> {
			final Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph);
			// finish the source; nothing may be released yet since its consumer has not finished
			executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), sourceExecution.getAttemptId(), ExecutionState.FINISHED));
			assertThat(releasedPartitions, empty());
		});

		mainThreadExecutor.execute(() -> {
			final Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph);
			final Execution operatorExecution = getCurrentExecution(operatorVertex, executionGraph);
			// finish the operator; the source's partition should now be released
			executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operatorExecution.getAttemptId(), ExecutionState.FINISHED));
			assertThat(releasedPartitions, hasSize(1));
			assertThat(releasedPartitions.remove(), equalTo(new ResultPartitionID(
				sourceExecution.getVertex().getProducedPartitions().keySet().iterator().next(),
				sourceExecution.getAttemptId())));
		});

		mainThreadExecutor.execute(() -> {
			final Execution operatorExecution = getCurrentExecution(operatorVertex, executionGraph);
			final Execution sinkExecution = getCurrentExecution(sinkVertex, executionGraph);
			// finish the sink; the operator's partition should now be released
			executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), sinkExecution.getAttemptId(), ExecutionState.FINISHED));

			assertThat(releasedPartitions, hasSize(1));
			assertThat(releasedPartitions.remove(), equalTo(new ResultPartitionID(
				operatorExecution.getVertex().getProducedPartitions().keySet().iterator().next(),
				operatorExecution.getAttemptId())));
		});
	}

	@Test
	public void testStrategyNotifiedOfUnFinishedVertices() throws Exception {
		// setup a pipeline of 2 failover regions (f1 -> f2), where
		// f1 is just a source
		// f2 consists of 3 operators (o1,o2,o3), where o1 consumes f1, and o2/o3 consume o1
		final JobVertex sourceVertex = ExecutionGraphTestUtils.createNoOpVertex("source", 1);
		final JobVertex operator1Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator1", 1);
		final JobVertex operator2Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator2", 1);
		final JobVertex operator3Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator3", 1);

		operator1Vertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
		operator2Vertex.connectNewDataSetAsInput(operator1Vertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
		operator3Vertex.connectNewDataSetAsInput(operator1Vertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

		// setup partition tracker to intercept partition release calls
		final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
		final Queue<ResultPartitionID> releasedPartitions = new ArrayDeque<>();
		partitionTracker.setStopTrackingAndReleasePartitionsConsumer(
			partitionIds -> releasedPartitions.add(partitionIds.iterator().next()));

		final ExecutionGraph executionGraph = createExecutionGraph(
			partitionTracker, sourceVertex, operator1Vertex, operator2Vertex, operator3Vertex);

		mainThreadExecutor.execute(() -> {
			final Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph);
			// finish the source; this should not result in any release calls since the consumer o1 was not finished
			executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), sourceExecution.getAttemptId(), ExecutionState.FINISHED));
			assertThat(releasedPartitions, empty());
		});

		mainThreadExecutor.execute(() -> {
			final Execution operator1Execution = getCurrentExecution(operator1Vertex, executionGraph);
			// finish o1 and schedule the consumers (o2,o3); this should not result in any release calls since not all operators of the pipelined region are finished
			for (final IntermediateResultPartitionID partitionId : operator1Execution.getVertex().getProducedPartitions().keySet()) {
				executionGraph.scheduleOrUpdateConsumers(new ResultPartitionID(partitionId, operator1Execution.getAttemptId()));
			}
			executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operator1Execution.getAttemptId(), ExecutionState.FINISHED));
			assertThat(releasedPartitions, empty());
		});

		mainThreadExecutor.execute(() -> {
			final Execution operator2Execution = getCurrentExecution(operator2Vertex, executionGraph);
			// finish o2; this should not result in any release calls since o3 was not finished
			executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operator2Execution.getAttemptId(), ExecutionState.FINISHED));
			assertThat(releasedPartitions, empty());
		});

		mainThreadExecutor.execute(() -> {
			final Execution operator2Execution = getCurrentExecution(operator2Vertex, executionGraph);
			// reset o2
			operator2Execution.getVertex().resetForNewExecution(0L, 1L);
			assertThat(releasedPartitions, empty());
		});

		mainThreadExecutor.execute(() -> {
			final Execution operator3Execution = getCurrentExecution(operator3Vertex, executionGraph);
			// finish o3; this should not result in any release calls since o2 was reset
			executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operator3Execution.getAttemptId(), ExecutionState.FINISHED));
			assertThat(releasedPartitions, empty());
		});
	}

	/** Returns the current execution attempt of the first (and only) subtask of the given vertex. */
	private static Execution getCurrentExecution(final JobVertex jobVertex, final ExecutionGraph executionGraph) {
		return executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
	}

	/**
	 * Builds, starts, and schedules an {@link ExecutionGraph} over the given vertices,
	 * wired to the given partition tracker so that release calls can be observed.
	 */
	private ExecutionGraph createExecutionGraph(final PartitionTracker partitionTracker, final JobVertex... vertices) throws Exception {
		final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
			null,
			new JobGraph(new JobID(), "test job", vertices),
			new Configuration(),
			scheduledExecutorService,
			mainThreadExecutor.getMainThreadExecutor(),
			new TestingSlotProvider(ignored -> CompletableFuture.completedFuture(new TestingLogicalSlot())),
			ExecutionGraphPartitionReleaseTest.class.getClassLoader(),
			new StandaloneCheckpointRecoveryFactory(),
			AkkaUtils.getDefaultTimeout(),
			new NoRestartStrategy(),
			new UnregisteredMetricsGroup(),
			VoidBlobWriter.getInstance(),
			AkkaUtils.getDefaultTimeout(),
			log,
			NettyShuffleMaster.INSTANCE,
			partitionTracker);

		executionGraph.start(mainThreadExecutor.getMainThreadExecutor());
		mainThreadExecutor.execute(executionGraph::scheduleForExecution);

		return executionGraph;
	}
}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java
new file mode 100644
index 0000000..1b9ca7a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.RegionPartitionReleaseStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import 
org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
+import 
org.apache.flink.runtime.scheduler.strategy.TestingSchedulingResultPartition;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link RegionPartitionReleaseStrategy}.
+ */
+public class RegionPartitionReleaseStrategyTest extends TestLogger {
+
+       private TestingSchedulingTopology testingSchedulingTopology;
+
+       @Before
+       public void setUp() throws Exception {
+               testingSchedulingTopology = new TestingSchedulingTopology();
+       }
+
+       @Test
+       public void releasePartitionsIfDownstreamRegionIsFinished() {
+               final List<TestingSchedulingExecutionVertex> producers = 
testingSchedulingTopology.addExecutionVertices().finish();
+               final List<TestingSchedulingExecutionVertex> consumers = 
testingSchedulingTopology.addExecutionVertices().finish();
+               final List<TestingSchedulingResultPartition> resultPartitions = 
testingSchedulingTopology.connectPointwise(producers, consumers).finish();
+
+               final ExecutionVertexID onlyProducerVertexId = 
producers.get(0).getId();
+               final ExecutionVertexID onlyConsumerVertexId = 
consumers.get(0).getId();
+               final IntermediateResultPartitionID onlyResultPartitionId = 
resultPartitions.get(0).getId();
+
+               final Set<PipelinedRegion> pipelinedRegions = 
pipelinedRegionsSet(
+                       PipelinedRegion.from(onlyProducerVertexId),
+                       PipelinedRegion.from(onlyConsumerVertexId));
+
+               final RegionPartitionReleaseStrategy 
regionPartitionReleaseStrategy = new 
RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions);
+
+               final List<IntermediateResultPartitionID> partitionsToRelease = 
regionPartitionReleaseStrategy.vertexFinished(onlyConsumerVertexId);
+               assertThat(partitionsToRelease, 
contains(onlyResultPartitionId));
+       }
+
+       @Test
+       public void 
releasePartitionsIfDownstreamRegionWithMultipleOperatorsIsFinished() {
+               final List<TestingSchedulingExecutionVertex> sourceVertices = 
testingSchedulingTopology.addExecutionVertices().finish();
+               final List<TestingSchedulingExecutionVertex> 
intermediateVertices = 
testingSchedulingTopology.addExecutionVertices().finish();
+               final List<TestingSchedulingExecutionVertex> sinkVertices = 
testingSchedulingTopology.addExecutionVertices().finish();
+               final List<TestingSchedulingResultPartition> 
sourceResultPartitions = 
testingSchedulingTopology.connectAllToAll(sourceVertices, 
intermediateVertices).finish();
+               testingSchedulingTopology.connectAllToAll(intermediateVertices, 
sinkVertices).withResultPartitionType(ResultPartitionType.PIPELINED).finish();
+
+               final ExecutionVertexID onlySourceVertexId = 
sourceVertices.get(0).getId();
+               final ExecutionVertexID onlyIntermediateVertexId = 
intermediateVertices.get(0).getId();
+               final ExecutionVertexID onlySinkVertexId = 
sinkVertices.get(0).getId();
+               final IntermediateResultPartitionID onlySourceResultPartitionId 
= sourceResultPartitions.get(0).getId();
+
+               final Set<PipelinedRegion> pipelinedRegions = 
pipelinedRegionsSet(
+                       PipelinedRegion.from(onlySourceVertexId),
+                       PipelinedRegion.from(onlyIntermediateVertexId, 
onlySinkVertexId));
+
+               final RegionPartitionReleaseStrategy 
regionPartitionReleaseStrategy = new 
RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions);
+
+               
regionPartitionReleaseStrategy.vertexFinished(onlyIntermediateVertexId);
+               final List<IntermediateResultPartitionID> partitionsToRelease = 
regionPartitionReleaseStrategy.vertexFinished(onlySinkVertexId);
+               assertThat(partitionsToRelease, 
contains(onlySourceResultPartitionId));
+       }
+
+       @Test
+       public void notReleasePartitionsIfDownstreamRegionIsNotFinished() {
+               final List<TestingSchedulingExecutionVertex> producers = 
testingSchedulingTopology.addExecutionVertices().finish();
+               final List<TestingSchedulingExecutionVertex> consumers = 
testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
+               testingSchedulingTopology.connectAllToAll(producers, 
consumers).finish();
+
+               final ExecutionVertexID onlyProducerVertexId = 
producers.get(0).getId();
+               final ExecutionVertexID consumerVertex1 = 
consumers.get(0).getId();
+               final ExecutionVertexID consumerVertex2 = 
consumers.get(1).getId();
+
+               final Set<PipelinedRegion> pipelinedRegions = 
pipelinedRegionsSet(
+                       PipelinedRegion.from(onlyProducerVertexId),
+                       PipelinedRegion.from(consumerVertex1, consumerVertex2));
+
+               final RegionPartitionReleaseStrategy 
regionPartitionReleaseStrategy = new 
RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions);
+
+               final List<IntermediateResultPartitionID> partitionsToRelease = 
regionPartitionReleaseStrategy.vertexFinished(consumerVertex1);
+               assertThat(partitionsToRelease, is(empty()));
+       }
+
+       @Test
+       public void toggleVertexFinishedUnfinished() {
+               final List<TestingSchedulingExecutionVertex> producers = 
testingSchedulingTopology.addExecutionVertices().finish();
+               final List<TestingSchedulingExecutionVertex> consumers = 
testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
+               testingSchedulingTopology.connectAllToAll(producers, 
consumers).finish();
+
+               final ExecutionVertexID onlyProducerVertexId = 
producers.get(0).getId();
+               final ExecutionVertexID consumerVertex1 = 
consumers.get(0).getId();
+               final ExecutionVertexID consumerVertex2 = 
consumers.get(1).getId();
+
+               final Set<PipelinedRegion> pipelinedRegions = 
pipelinedRegionsSet(
+                       PipelinedRegion.from(onlyProducerVertexId),
+                       PipelinedRegion.from(consumerVertex1, consumerVertex2));
+
+               final RegionPartitionReleaseStrategy 
regionPartitionReleaseStrategy = new 
RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions);
+
+               regionPartitionReleaseStrategy.vertexFinished(consumerVertex1);
+               regionPartitionReleaseStrategy.vertexFinished(consumerVertex2);
+               
regionPartitionReleaseStrategy.vertexUnfinished(consumerVertex2);
+
+               final List<IntermediateResultPartitionID> partitionsToRelease = 
regionPartitionReleaseStrategy.vertexFinished(consumerVertex1);
+               assertThat(partitionsToRelease, is(empty()));
+       }
+
+       private static Set<PipelinedRegion> pipelinedRegionsSet(final 
PipelinedRegion... pipelinedRegions) {
+               return new HashSet<>(Arrays.asList(pipelinedRegions));
+       }
+
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoaderTest.java
new file mode 100644
index 0000000..c108938
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoaderTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link PartitionReleaseStrategyFactoryLoader}.
+ */
+public class PartitionReleaseStrategyFactoryLoaderTest {
+
+       @Test
+       public void featureEnabledByDefault() {
+               final Configuration emptyConfiguration = new Configuration();
+               final PartitionReleaseStrategy.Factory factory =
+                       
PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(emptyConfiguration);
+
+               assertThat(factory, 
is(instanceOf(RegionPartitionReleaseStrategy.Factory.class)));
+       }
+
+       @Test
+       public void featureCanBeDisabled() {
+               final Configuration emptyConfiguration = new Configuration();
+               
emptyConfiguration.setBoolean(JobManagerOptions.PARTITION_RELEASE_DURING_JOB_EXECUTION,
 false);
+
+               final PartitionReleaseStrategy.Factory factory =
+                       
PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(emptyConfiguration);
+
+               assertThat(factory, 
is(instanceOf(NotReleasingPartitionReleaseStrategy.Factory.class)));
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionViewTest.java
new file mode 100644
index 0000000..a6960ef
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionViewTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link PipelinedRegionExecutionView}.
+ */
+public class PipelinedRegionExecutionViewTest extends TestLogger {
+
+       private static final ExecutionVertexID TEST_EXECUTION_VERTEX_ID = new 
ExecutionVertexID(new JobVertexID(), 0);
+
+       @Test
+       public void regionIsUnfinishedIfNotAllVerticesAreFinished() {
+               final PipelinedRegion pipelinedRegion = 
PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID);
+               final PipelinedRegionExecutionView pipelinedRegionExecutionView 
= new PipelinedRegionExecutionView(pipelinedRegion);
+
+               assertFalse(pipelinedRegionExecutionView.isFinished());
+       }
+
+       @Test
+       public void regionIsFinishedIfAllVerticesAreFinished() {
+               final PipelinedRegion pipelinedRegion = 
PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID);
+               final PipelinedRegionExecutionView pipelinedRegionExecutionView 
= new PipelinedRegionExecutionView(pipelinedRegion);
+
+               
pipelinedRegionExecutionView.vertexFinished(TEST_EXECUTION_VERTEX_ID);
+
+               assertTrue(pipelinedRegionExecutionView.isFinished());
+       }
+
+       @Test
+       public void vertexCanBeUnfinished() {
+               final PipelinedRegion pipelinedRegion = 
PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID);
+               final PipelinedRegionExecutionView pipelinedRegionExecutionView 
= new PipelinedRegionExecutionView(pipelinedRegion);
+
+               
pipelinedRegionExecutionView.vertexFinished(TEST_EXECUTION_VERTEX_ID);
+               
pipelinedRegionExecutionView.vertexUnfinished(TEST_EXECUTION_VERTEX_ID);
+
+               assertFalse(pipelinedRegionExecutionView.isFinished());
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void finishingUnknownVertexThrowsException() {
+               final PipelinedRegion from = 
PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID);
+               final PipelinedRegionExecutionView pipelinedRegionExecutionView 
= new PipelinedRegionExecutionView(from);
+
+               final ExecutionVertexID unknownVertexId = new 
ExecutionVertexID(new JobVertexID(), 0);
+               pipelinedRegionExecutionView.vertexFinished(unknownVertexId);
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
index 1aaac58..e0ea6c4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
@@ -79,18 +79,18 @@ public class TestingSchedulingTopology implements 
SchedulingTopology {
                }
        }
 
-       SchedulingExecutionVerticesBuilder addExecutionVertices() {
+       public SchedulingExecutionVerticesBuilder addExecutionVertices() {
                return new SchedulingExecutionVerticesBuilder();
        }
 
-       ProducerConsumerConnectionBuilder connectPointwise(
+       public ProducerConsumerConnectionBuilder connectPointwise(
                final List<TestingSchedulingExecutionVertex> producers,
                final List<TestingSchedulingExecutionVertex> consumers) {
 
                return new 
ProducerConsumerPointwiseConnectionBuilder(producers, consumers);
        }
 
-       ProducerConsumerConnectionBuilder connectAllToAll(
+       public ProducerConsumerConnectionBuilder connectAllToAll(
                final List<TestingSchedulingExecutionVertex> producers,
                final List<TestingSchedulingExecutionVertex> consumers) {
 
@@ -117,12 +117,12 @@ public class TestingSchedulingTopology implements 
SchedulingTopology {
                        this.consumers = consumers;
                }
 
-               ProducerConsumerConnectionBuilder withResultPartitionType(final 
ResultPartitionType resultPartitionType) {
+               public ProducerConsumerConnectionBuilder 
withResultPartitionType(final ResultPartitionType resultPartitionType) {
                        this.resultPartitionType = resultPartitionType;
                        return this;
                }
 
-               ProducerConsumerConnectionBuilder 
withResultPartitionState(final SchedulingResultPartition.ResultPartitionState 
state) {
+               public ProducerConsumerConnectionBuilder 
withResultPartitionState(final SchedulingResultPartition.ResultPartitionState 
state) {
                        this.resultPartitionState = state;
                        return this;
                }
@@ -229,12 +229,12 @@ public class TestingSchedulingTopology implements 
SchedulingTopology {
 
                private InputDependencyConstraint inputDependencyConstraint = 
InputDependencyConstraint.ANY;
 
-               SchedulingExecutionVerticesBuilder withParallelism(final int 
parallelism) {
+               public SchedulingExecutionVerticesBuilder withParallelism(final 
int parallelism) {
                        this.parallelism = parallelism;
                        return this;
                }
 
-               SchedulingExecutionVerticesBuilder 
withInputDependencyConstraint(final InputDependencyConstraint 
inputDependencyConstraint) {
+               public SchedulingExecutionVerticesBuilder 
withInputDependencyConstraint(final InputDependencyConstraint 
inputDependencyConstraint) {
                        this.inputDependencyConstraint = 
inputDependencyConstraint;
                        return this;
                }

Reply via email to