This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cc30a4ea080abda6bc58515e95fdf74f24792ec8
Author: Zhu Zhu <reed...@gmail.com>
AuthorDate: Thu Apr 9 15:58:21 2020 +0800

    [FLINK-17181][runtime] Drop generic Types in Topology Interface
    
    Co-authored-by: Gary Yao <g...@apache.org>
---
 .../runtime/executiongraph/ExecutionGraph.java     |  6 ++--
 .../failover/flip1/ExecutionFailureHandler.java    |  4 +--
 .../failover/flip1/FailoverRegion.java             |  6 ++--
 .../failover/flip1/FailoverStrategy.java           |  2 +-
 .../failover/flip1/PipelinedRegionComputeUtil.java |  4 +--
 .../failover/flip1/RestartAllFailoverStrategy.java |  6 ++--
 .../RestartPipelinedRegionFailoverStrategy.java    | 24 ++++++++--------
 .../NotReleasingPartitionReleaseStrategy.java      |  2 +-
 .../partitionrelease/PartitionReleaseStrategy.java |  2 +-
 .../RegionPartitionReleaseStrategy.java            | 18 ++++++------
 .../flink/runtime/scheduler/SchedulerBase.java     |  4 +--
 .../adapter/DefaultExecutionTopology.java          |  2 +-
 .../scheduler/adapter/DefaultExecutionVertex.java  |  2 +-
 .../scheduler/adapter/DefaultResultPartition.java  |  2 +-
 .../strategy/EagerSchedulingStrategy.java          |  6 ++--
 .../strategy/InputDependencyConstraintChecker.java | 32 +++++++++++-----------
 .../LazyFromSourcesSchedulingStrategy.java         | 20 +++++++-------
 .../strategy/SchedulingExecutionVertex.java        |  4 +--
 .../strategy/SchedulingPipelinedRegion.java        |  2 +-
 .../strategy/SchedulingResultPartition.java        |  4 +--
 .../strategy/SchedulingStrategyFactory.java        |  2 +-
 .../strategy/SchedulingStrategyUtils.java          |  8 +++---
 .../scheduler/strategy/SchedulingTopology.java     |  8 +++---
 .../flink/runtime/topology/BaseTopology.java       |  2 +-
 .../flink/runtime/topology/PipelinedRegion.java    |  4 +--
 .../org/apache/flink/runtime/topology/Result.java  |  2 +-
 .../apache/flink/runtime/topology/Topology.java    |  2 +-
 .../org/apache/flink/runtime/topology/Vertex.java  |  4 +--
 .../flip1/ExecutionFailureHandlerTest.java         |  2 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    |  8 +++---
 .../InputDependencyConstraintCheckerTest.java      |  2 +-
 .../scheduler/strategy/TestSchedulingStrategy.java |  8 +++---
 .../strategy/TestingSchedulingExecutionVertex.java |  3 +-
 .../strategy/TestingSchedulingResultPartition.java |  3 +-
 .../strategy/TestingSchedulingTopology.java        |  3 +-
 35 files changed, 105 insertions(+), 108 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 190259d..e348059 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -410,7 +410,7 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
                return this.verticesInCreationOrder.size();
        }
 
-       public SchedulingTopology<?, ?> getSchedulingTopology() {
+       public SchedulingTopology getSchedulingTopology() {
                return executionTopology;
        }
 
@@ -1541,9 +1541,9 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
        }
 
        ResultPartitionID createResultPartitionId(final 
IntermediateResultPartitionID resultPartitionId) {
-               final SchedulingResultPartition<?, ?> schedulingResultPartition 
=
+               final SchedulingResultPartition schedulingResultPartition =
                        
getSchedulingTopology().getResultPartition(resultPartitionId);
-               final SchedulingExecutionVertex<?, ?> producer = 
schedulingResultPartition.getProducer();
+               final SchedulingExecutionVertex producer = 
schedulingResultPartition.getProducer();
                final ExecutionVertexID producerId = producer.getId();
                final JobVertexID jobVertexId = producerId.getJobVertexId();
                final ExecutionJobVertex jobVertex = getJobVertex(jobVertexId);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
index c89717e..63d5e88 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
@@ -38,7 +38,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ExecutionFailureHandler {
 
-       private final SchedulingTopology<?, ?> schedulingTopology;
+       private final SchedulingTopology schedulingTopology;
 
        /** Strategy to judge which tasks should be restarted. */
        private final FailoverStrategy failoverStrategy;
@@ -57,7 +57,7 @@ public class ExecutionFailureHandler {
         * @param restartBackoffTimeStrategy helps to decide whether to restart 
failed tasks and the restarting delay
         */
        public ExecutionFailureHandler(
-                       final SchedulingTopology<?, ?> schedulingTopology,
+                       final SchedulingTopology schedulingTopology,
                        final FailoverStrategy failoverStrategy,
                        final RestartBackoffTimeStrategy 
restartBackoffTimeStrategy) {
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
index 9e0369b..d1efb6f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
@@ -35,14 +35,14 @@ public class FailoverRegion {
        private final Set<ExecutionVertexID> executionVertexIDs;
 
        /** All vertices in this region. */
-       private final Set<? extends SchedulingExecutionVertex<?, ?>> 
executionVertices;
+       private final Set<? extends SchedulingExecutionVertex> 
executionVertices;
 
        /**
         * Creates a new failover region containing a set of vertices.
         *
         * @param executionVertices to be contained in this region
         */
-       public FailoverRegion(Set<? extends SchedulingExecutionVertex<?, ?>> 
executionVertices) {
+       public FailoverRegion(Set<? extends SchedulingExecutionVertex> 
executionVertices) {
                this.executionVertices = checkNotNull(executionVertices);
                this.executionVertexIDs = new HashSet<>();
                executionVertices.forEach(v -> 
this.executionVertexIDs.add(v.getId()));
@@ -62,7 +62,7 @@ public class FailoverRegion {
         *
         * @return all vertices in this region
         */
-       public Set<? extends SchedulingExecutionVertex<?, ?>> 
getAllExecutionVertices() {
+       public Set<? extends SchedulingExecutionVertex> 
getAllExecutionVertices() {
                return executionVertices;
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java
index 11e9431..981b198 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java
@@ -53,7 +53,7 @@ public interface FailoverStrategy {
                 * @return The instantiated failover strategy.
                 */
                FailoverStrategy create(
-                       SchedulingTopology<?, ?> topology,
+                       SchedulingTopology topology,
                        ResultPartitionAvailabilityChecker 
resultPartitionAvailabilityChecker);
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
index 10a5342..aa94841 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
@@ -45,7 +45,7 @@ public final class PipelinedRegionComputeUtil {
        private static final Logger LOG = 
LoggerFactory.getLogger(PipelinedRegionComputeUtil.class);
 
        public static Set<PipelinedRegion> toPipelinedRegionsSet(
-                       final Set<? extends Set<? extends 
SchedulingExecutionVertex<?, ?>>> distinctRegions) {
+                       final Set<? extends Set<? extends 
SchedulingExecutionVertex>> distinctRegions) {
 
                return distinctRegions.stream()
                        .map(toExecutionVertexIdSet())
@@ -53,7 +53,7 @@ public final class PipelinedRegionComputeUtil {
                        .collect(Collectors.toSet());
        }
 
-       private static Function<Set<? extends SchedulingExecutionVertex<?, ?>>, 
Set<ExecutionVertexID>> toExecutionVertexIdSet() {
+       private static Function<Set<? extends SchedulingExecutionVertex>, 
Set<ExecutionVertexID>> toExecutionVertexIdSet() {
                return failoverVertices -> failoverVertices.stream()
                        .map(SchedulingExecutionVertex::getId)
                        .collect(Collectors.toSet());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartAllFailoverStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartAllFailoverStrategy.java
index fef759d..2bd09da 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartAllFailoverStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartAllFailoverStrategy.java
@@ -32,9 +32,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class RestartAllFailoverStrategy implements FailoverStrategy {
 
-       private final SchedulingTopology<?, ?> topology;
+       private final SchedulingTopology topology;
 
-       public RestartAllFailoverStrategy(final SchedulingTopology<?, ?> 
topology) {
+       public RestartAllFailoverStrategy(final SchedulingTopology topology) {
                this.topology = checkNotNull(topology);
        }
 
@@ -59,7 +59,7 @@ public class RestartAllFailoverStrategy implements 
FailoverStrategy {
 
                @Override
                public FailoverStrategy create(
-                               final SchedulingTopology<?, ?> topology,
+                               final SchedulingTopology topology,
                                final ResultPartitionAvailabilityChecker 
resultPartitionAvailabilityChecker) {
 
                        return new RestartAllFailoverStrategy(topology);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
index b099021..3c158f0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
@@ -51,7 +51,7 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
        private static final Logger LOG = 
LoggerFactory.getLogger(RestartPipelinedRegionFailoverStrategy.class);
 
        /** The topology containing info about all the vertices and result 
partitions. */
-       private final SchedulingTopology<?, ?> topology;
+       private final SchedulingTopology topology;
 
        /** All failover regions. */
        private final Set<FailoverRegion> regions;
@@ -69,7 +69,7 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
         * @param topology containing info about all the vertices and result 
partitions
         */
        @VisibleForTesting
-       public RestartPipelinedRegionFailoverStrategy(SchedulingTopology<?, ?> 
topology) {
+       public RestartPipelinedRegionFailoverStrategy(SchedulingTopology 
topology) {
                this(topology, resultPartitionID -> true);
        }
 
@@ -80,7 +80,7 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
         * @param resultPartitionAvailabilityChecker helps to query result 
partition availability
         */
        public RestartPipelinedRegionFailoverStrategy(
-               SchedulingTopology<?, ?> topology,
+               SchedulingTopology topology,
                ResultPartitionAvailabilityChecker 
resultPartitionAvailabilityChecker) {
 
                this.topology = checkNotNull(topology);
@@ -98,15 +98,15 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
        // 
------------------------------------------------------------------------
 
        private void buildFailoverRegions() {
-               final Set<? extends Set<? extends SchedulingExecutionVertex<?, 
?>>> distinctRegions =
+               final Set<Set<SchedulingExecutionVertex>> distinctRegions =
                        
PipelinedRegionComputeUtil.computePipelinedRegions(topology);
 
                // creating all the failover regions and register them
-               for (Set<? extends SchedulingExecutionVertex<?, ?>> 
regionVertices : distinctRegions) {
+               for (Set<SchedulingExecutionVertex> regionVertices : 
distinctRegions) {
                        LOG.debug("Creating a failover region with {} 
vertices.", regionVertices.size());
                        final FailoverRegion failoverRegion = new 
FailoverRegion(regionVertices);
                        regions.add(failoverRegion);
-                       for (SchedulingExecutionVertex<?, ?> vertex : 
regionVertices) {
+                       for (SchedulingExecutionVertex vertex : regionVertices) 
{
                                vertexToRegionMap.put(vertex.getId(), 
failoverRegion);
                        }
                }
@@ -190,8 +190,8 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
                        regionsToRestart.add(regionToRestart);
 
                        // if a needed input result partition is not available, 
its producer region is involved
-                       for (SchedulingExecutionVertex<?, ?> vertex : 
regionToRestart.getAllExecutionVertices()) {
-                               for (SchedulingResultPartition<?, ?> 
consumedPartition : vertex.getConsumedResults()) {
+                       for (SchedulingExecutionVertex vertex : 
regionToRestart.getAllExecutionVertices()) {
+                               for (SchedulingResultPartition 
consumedPartition : vertex.getConsumedResults()) {
                                        if 
(!resultPartitionAvailabilityChecker.isAvailable(consumedPartition.getId())) {
                                                FailoverRegion producerRegion = 
vertexToRegionMap.get(consumedPartition.getProducer().getId());
                                                if 
(!visitedRegions.contains(producerRegion)) {
@@ -203,9 +203,9 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
                        }
 
                        // all consumer regions of an involved region should be 
involved
-                       for (SchedulingExecutionVertex<?, ?> vertex : 
regionToRestart.getAllExecutionVertices()) {
-                               for (SchedulingResultPartition<?, ?> 
producedPartition : vertex.getProducedResults()) {
-                                       for (SchedulingExecutionVertex<?, ?> 
consumerVertex : producedPartition.getConsumers()) {
+                       for (SchedulingExecutionVertex vertex : 
regionToRestart.getAllExecutionVertices()) {
+                               for (SchedulingResultPartition 
producedPartition : vertex.getProducedResults()) {
+                                       for (SchedulingExecutionVertex 
consumerVertex : producedPartition.getConsumers()) {
                                                FailoverRegion consumerRegion = 
vertexToRegionMap.get(consumerVertex.getId());
                                                if 
(!visitedRegions.contains(consumerRegion)) {
                                                        
visitedRegions.add(consumerRegion);
@@ -271,7 +271,7 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
 
                @Override
                public FailoverStrategy create(
-                               final SchedulingTopology<?, ?> topology,
+                               final SchedulingTopology topology,
                                final ResultPartitionAvailabilityChecker 
resultPartitionAvailabilityChecker) {
 
                        return new 
RestartPipelinedRegionFailoverStrategy(topology, 
resultPartitionAvailabilityChecker);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java
index 019a595..7f2eaa8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java
@@ -47,7 +47,7 @@ public class NotReleasingPartitionReleaseStrategy implements 
PartitionReleaseStr
        public static class Factory implements PartitionReleaseStrategy.Factory 
{
 
                @Override
-               public PartitionReleaseStrategy createInstance(final 
SchedulingTopology<?, ?> schedulingStrategy) {
+               public PartitionReleaseStrategy createInstance(final 
SchedulingTopology schedulingStrategy) {
                        return new NotReleasingPartitionReleaseStrategy();
                }
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
index 7002ced..2f590c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
@@ -52,6 +52,6 @@ public interface PartitionReleaseStrategy {
         * Factory for {@link PartitionReleaseStrategy}.
         */
        interface Factory {
-               PartitionReleaseStrategy createInstance(SchedulingTopology<?, 
?> schedulingStrategy);
+               PartitionReleaseStrategy createInstance(SchedulingTopology 
schedulingStrategy);
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
index d5529b7..18d9a4e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
@@ -46,14 +46,14 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  */
 public class RegionPartitionReleaseStrategy implements 
PartitionReleaseStrategy {
 
-       private final SchedulingTopology<?, ?> schedulingTopology;
+       private final SchedulingTopology schedulingTopology;
 
        private final Map<PipelinedRegion, 
PipelinedRegionConsumedBlockingPartitions> consumedBlockingPartitionsByRegion = 
new IdentityHashMap<>();
 
        private final Map<ExecutionVertexID, PipelinedRegionExecutionView> 
regionExecutionViewByVertex = new HashMap<>();
 
        public RegionPartitionReleaseStrategy(
-                       final SchedulingTopology<?, ?> schedulingTopology,
+                       final SchedulingTopology schedulingTopology,
                        final Set<PipelinedRegion> pipelinedRegions) {
 
                this.schedulingTopology = checkNotNull(schedulingTopology);
@@ -85,7 +85,7 @@ public class RegionPartitionReleaseStrategy implements 
PartitionReleaseStrategy
        }
 
        private Set<IntermediateResultPartitionID> 
findResultPartitionsOutsideOfRegion(final PipelinedRegion pipelinedRegion) {
-               final Set<SchedulingResultPartition<?, ?>> 
allConsumedPartitionsInRegion = pipelinedRegion
+               final Set<SchedulingResultPartition> 
allConsumedPartitionsInRegion = pipelinedRegion
                        .getExecutionVertexIds()
                        .stream()
                        .map(schedulingTopology::getVertex)
@@ -96,12 +96,12 @@ public class RegionPartitionReleaseStrategy implements 
PartitionReleaseStrategy
        }
 
        private static Set<IntermediateResultPartitionID> 
filterResultPartitionsOutsideOfRegion(
-                       final Collection<SchedulingResultPartition<?, ?>> 
resultPartitions,
+                       final Collection<SchedulingResultPartition> 
resultPartitions,
                        final PipelinedRegion pipelinedRegion) {
 
                final Set<IntermediateResultPartitionID> result = new 
HashSet<>();
-               for (final SchedulingResultPartition<?, ?> 
maybeOutsidePartition : resultPartitions) {
-                       final SchedulingExecutionVertex<?, ?> producer = 
maybeOutsidePartition.getProducer();
+               for (final SchedulingResultPartition maybeOutsidePartition : 
resultPartitions) {
+                       final SchedulingExecutionVertex producer = 
maybeOutsidePartition.getProducer();
                        if (!pipelinedRegion.contains(producer.getId())) {
                                result.add(maybeOutsidePartition.getId());
                        }
@@ -157,7 +157,7 @@ public class RegionPartitionReleaseStrategy implements 
PartitionReleaseStrategy
        }
 
        private boolean areConsumerRegionsFinished(final 
IntermediateResultPartitionID resultPartitionId) {
-               final SchedulingResultPartition<?, ?> resultPartition = 
schedulingTopology.getResultPartition(resultPartitionId);
+               final SchedulingResultPartition resultPartition = 
schedulingTopology.getResultPartition(resultPartitionId);
                return IterableUtils.toStream(resultPartition.getConsumers())
                        .map(SchedulingExecutionVertex::getId)
                        .allMatch(this::isRegionOfVertexFinished);
@@ -174,9 +174,9 @@ public class RegionPartitionReleaseStrategy implements 
PartitionReleaseStrategy
        public static class Factory implements PartitionReleaseStrategy.Factory 
{
 
                @Override
-               public PartitionReleaseStrategy createInstance(final 
SchedulingTopology<?, ?> schedulingStrategy) {
+               public PartitionReleaseStrategy createInstance(final 
SchedulingTopology schedulingStrategy) {
 
-                       final Set<? extends Set<? extends 
SchedulingExecutionVertex<?, ?>>> distinctRegions =
+                       final Set<? extends Set<SchedulingExecutionVertex>> 
distinctRegions =
                                
PipelinedRegionComputeUtil.computePipelinedRegions(schedulingStrategy);
 
                        return new RegionPartitionReleaseStrategy(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 9fc9b45..aa38b95 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -132,7 +132,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 
        private final ExecutionGraph executionGraph;
 
-       private final SchedulingTopology<?, ?> schedulingTopology;
+       private final SchedulingTopology schedulingTopology;
 
        private final InputsLocationsRetriever inputsLocationsRetriever;
 
@@ -380,7 +380,7 @@ public abstract class SchedulerBase implements SchedulerNG {
                executionGraph.failJob(cause);
        }
 
-       protected final SchedulingTopology<?, ?> getSchedulingTopology() {
+       protected final SchedulingTopology getSchedulingTopology() {
                return schedulingTopology;
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
index a535fdc..63bd39f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
@@ -39,7 +39,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Adapter of {@link ExecutionGraph} to {@link SchedulingTopology}.
  */
-public class DefaultExecutionTopology implements 
SchedulingTopology<DefaultExecutionVertex, DefaultResultPartition> {
+public class DefaultExecutionTopology implements SchedulingTopology {
 
        private final boolean containsCoLocationConstraints;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java
index 4527d0b..0c5ffd2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java
@@ -32,7 +32,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Default implementation of {@link SchedulingExecutionVertex}.
  */
-class DefaultExecutionVertex implements 
SchedulingExecutionVertex<DefaultExecutionVertex, DefaultResultPartition> {
+class DefaultExecutionVertex implements SchedulingExecutionVertex {
 
        private final ExecutionVertexID executionVertexId;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java
index 7f4ebaa..1b1bdde 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java
@@ -33,7 +33,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Default implementation of {@link SchedulingResultPartition}.
  */
-class DefaultResultPartition implements 
SchedulingResultPartition<DefaultExecutionVertex, DefaultResultPartition> {
+class DefaultResultPartition implements SchedulingResultPartition {
 
        private final IntermediateResultPartitionID resultPartitionId;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
index 79ca452..b0f6188 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
@@ -36,13 +36,13 @@ public class EagerSchedulingStrategy implements 
SchedulingStrategy {
 
        private final SchedulerOperations schedulerOperations;
 
-       private final SchedulingTopology<?, ?> schedulingTopology;
+       private final SchedulingTopology schedulingTopology;
 
        private final DeploymentOption deploymentOption = new 
DeploymentOption(false);
 
        public EagerSchedulingStrategy(
                        SchedulerOperations schedulerOperations,
-                       SchedulingTopology<?, ?> schedulingTopology) {
+                       SchedulingTopology schedulingTopology) {
                this.schedulerOperations = checkNotNull(schedulerOperations);
                this.schedulingTopology = checkNotNull(schedulingTopology);
        }
@@ -84,7 +84,7 @@ public class EagerSchedulingStrategy implements 
SchedulingStrategy {
                @Override
                public SchedulingStrategy createInstance(
                                SchedulerOperations schedulerOperations,
-                               SchedulingTopology<?, ?> schedulingTopology) {
+                               SchedulingTopology schedulingTopology) {
                        return new EagerSchedulingStrategy(schedulerOperations, 
schedulingTopology);
                }
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java
index d210907..8382190 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java
@@ -42,7 +42,7 @@ public class InputDependencyConstraintChecker {
        private final SchedulingIntermediateDataSetManager 
intermediateDataSetManager =
                new SchedulingIntermediateDataSetManager();
 
-       public boolean check(final SchedulingExecutionVertex<?, ?> 
schedulingExecutionVertex) {
+       public boolean check(final SchedulingExecutionVertex 
schedulingExecutionVertex) {
                if 
(Iterables.isEmpty(schedulingExecutionVertex.getConsumedResults())) {
                        return true;
                }
@@ -58,20 +58,20 @@ public class InputDependencyConstraintChecker {
                }
        }
 
-       List<SchedulingResultPartition<?, ?>> 
markSchedulingResultPartitionFinished(SchedulingResultPartition<?, ?> srp) {
+       List<SchedulingResultPartition> 
markSchedulingResultPartitionFinished(SchedulingResultPartition srp) {
                return 
intermediateDataSetManager.markSchedulingResultPartitionFinished(srp);
        }
 
-       void resetSchedulingResultPartition(SchedulingResultPartition<?, ?> 
srp) {
+       void resetSchedulingResultPartition(SchedulingResultPartition srp) {
                intermediateDataSetManager.resetSchedulingResultPartition(srp);
        }
 
-       void addSchedulingResultPartition(SchedulingResultPartition<?, ?> srp) {
+       void addSchedulingResultPartition(SchedulingResultPartition srp) {
                intermediateDataSetManager.addSchedulingResultPartition(srp);
        }
 
-       private boolean checkAll(final SchedulingExecutionVertex<?, ?> 
schedulingExecutionVertex) {
-               for (SchedulingResultPartition<?, ?> consumedResultPartition : 
schedulingExecutionVertex.getConsumedResults()) {
+       private boolean checkAll(final SchedulingExecutionVertex 
schedulingExecutionVertex) {
+               for (SchedulingResultPartition consumedResultPartition : 
schedulingExecutionVertex.getConsumedResults()) {
                        if (!partitionConsumable(consumedResultPartition)) {
                                return false;
                        }
@@ -79,8 +79,8 @@ public class InputDependencyConstraintChecker {
                return true;
        }
 
-       private boolean checkAny(final SchedulingExecutionVertex<?, ?> 
schedulingExecutionVertex) {
-               for (SchedulingResultPartition<?, ?> consumedResultPartition : 
schedulingExecutionVertex.getConsumedResults()) {
+       private boolean checkAny(final SchedulingExecutionVertex 
schedulingExecutionVertex) {
+               for (SchedulingResultPartition consumedResultPartition : 
schedulingExecutionVertex.getConsumedResults()) {
                        if (partitionConsumable(consumedResultPartition)) {
                                return true;
                        }
@@ -88,7 +88,7 @@ public class InputDependencyConstraintChecker {
                return false;
        }
 
-       private boolean partitionConsumable(SchedulingResultPartition<?, ?> 
partition) {
+       private boolean partitionConsumable(SchedulingResultPartition 
partition) {
                if (BLOCKING.equals(partition.getResultType())) {
                        return 
intermediateDataSetManager.allPartitionsFinished(partition);
                } else {
@@ -101,7 +101,7 @@ public class InputDependencyConstraintChecker {
 
                private final Map<IntermediateDataSetID, 
SchedulingIntermediateDataSet> intermediateDataSets = new HashMap<>();
 
-               List<SchedulingResultPartition<?, ?>> 
markSchedulingResultPartitionFinished(SchedulingResultPartition<?, ?> srp) {
+               List<SchedulingResultPartition> 
markSchedulingResultPartitionFinished(SchedulingResultPartition srp) {
                        SchedulingIntermediateDataSet intermediateDataSet = 
getSchedulingIntermediateDataSet(srp.getResultId());
                        if 
(intermediateDataSet.markPartitionFinished(srp.getId())) {
                                return 
intermediateDataSet.getSchedulingResultPartitions();
@@ -109,17 +109,17 @@ public class InputDependencyConstraintChecker {
                        return Collections.emptyList();
                }
 
-               void 
resetSchedulingResultPartition(SchedulingResultPartition<?, ?> srp) {
+               void resetSchedulingResultPartition(SchedulingResultPartition 
srp) {
                        SchedulingIntermediateDataSet sid = 
getSchedulingIntermediateDataSet(srp.getResultId());
                        sid.resetPartition(srp.getId());
                }
 
-               void addSchedulingResultPartition(SchedulingResultPartition<?, 
?> srp) {
+               void addSchedulingResultPartition(SchedulingResultPartition 
srp) {
                        SchedulingIntermediateDataSet sid = 
getOrCreateSchedulingIntermediateDataSetIfAbsent(srp.getResultId());
                        sid.addSchedulingResultPartition(srp);
                }
 
-               boolean allPartitionsFinished(SchedulingResultPartition<?, ?> 
srp) {
+               boolean allPartitionsFinished(SchedulingResultPartition srp) {
                        SchedulingIntermediateDataSet sid = 
getSchedulingIntermediateDataSet(srp.getResultId());
                        return sid.allPartitionsFinished();
                }
@@ -155,7 +155,7 @@ public class InputDependencyConstraintChecker {
         */
        private static class SchedulingIntermediateDataSet {
 
-               private final List<SchedulingResultPartition<?, ?>> partitions;
+               private final List<SchedulingResultPartition> partitions;
 
                private final Set<IntermediateResultPartitionID> 
producingPartitionIds;
 
@@ -177,12 +177,12 @@ public class InputDependencyConstraintChecker {
                        return producingPartitionIds.isEmpty();
                }
 
-               void addSchedulingResultPartition(SchedulingResultPartition<?, 
?> partition) {
+               void addSchedulingResultPartition(SchedulingResultPartition 
partition) {
                        partitions.add(partition);
                        producingPartitionIds.add(partition.getId());
                }
 
-               List<SchedulingResultPartition<?, ?>> 
getSchedulingResultPartitions() {
+               List<SchedulingResultPartition> getSchedulingResultPartitions() 
{
                        return Collections.unmodifiableList(partitions);
                }
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
index 6520100..5e36b16 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
@@ -42,11 +42,11 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
 
-       private static final Predicate<SchedulingExecutionVertex<?, ?>> 
IS_IN_CREATED_EXECUTION_STATE = schedulingExecutionVertex -> CREATED == 
schedulingExecutionVertex.getState();
+       private static final Predicate<SchedulingExecutionVertex> 
IS_IN_CREATED_EXECUTION_STATE = schedulingExecutionVertex -> CREATED == 
schedulingExecutionVertex.getState();
 
        private final SchedulerOperations schedulerOperations;
 
-       private final SchedulingTopology<?, ?> schedulingTopology;
+       private final SchedulingTopology schedulingTopology;
 
        private final Map<ExecutionVertexID, DeploymentOption> 
deploymentOptions;
 
@@ -54,7 +54,7 @@ public class LazyFromSourcesSchedulingStrategy implements 
SchedulingStrategy {
 
        public LazyFromSourcesSchedulingStrategy(
                        SchedulerOperations schedulerOperations,
-                       SchedulingTopology<?, ?> schedulingTopology) {
+                       SchedulingTopology schedulingTopology) {
 
                this.schedulerOperations = checkNotNull(schedulerOperations);
                this.schedulingTopology = checkNotNull(schedulingTopology);
@@ -67,9 +67,9 @@ public class LazyFromSourcesSchedulingStrategy implements 
SchedulingStrategy {
                final DeploymentOption updateOption = new 
DeploymentOption(true);
                final DeploymentOption nonUpdateOption = new 
DeploymentOption(false);
 
-               for (SchedulingExecutionVertex<?, ?> schedulingVertex : 
schedulingTopology.getVertices()) {
+               for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
                        DeploymentOption option = nonUpdateOption;
-                       for (SchedulingResultPartition<?, ?> srp : 
schedulingVertex.getProducedResults()) {
+                       for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResults()) {
                                if (srp.getResultType().isPipelined()) {
                                        option = updateOption;
                                }
@@ -100,7 +100,7 @@ public class LazyFromSourcesSchedulingStrategy implements 
SchedulingStrategy {
                        return;
                }
 
-               final Set<SchedulingExecutionVertex<?, ?>> verticesToSchedule = 
IterableUtils
+               final Set<SchedulingExecutionVertex> verticesToSchedule = 
IterableUtils
                        
.toStream(schedulingTopology.getVertex(executionVertexId).getProducedResults())
                        .filter(partition -> 
partition.getResultType().isBlocking())
                        .flatMap(partition -> 
inputConstraintChecker.markSchedulingResultPartitionFinished(partition).stream())
@@ -112,7 +112,7 @@ public class LazyFromSourcesSchedulingStrategy implements 
SchedulingStrategy {
 
        @Override
        public void onPartitionConsumable(IntermediateResultPartitionID 
resultPartitionId) {
-               final SchedulingResultPartition<?, ?> resultPartition = 
schedulingTopology
+               final SchedulingResultPartition resultPartition = 
schedulingTopology
                        .getResultPartition(resultPartitionId);
 
                if (!resultPartition.getResultType().isPipelined()) {
@@ -123,7 +123,7 @@ public class LazyFromSourcesSchedulingStrategy implements 
SchedulingStrategy {
        }
 
        private void allocateSlotsAndDeployExecutionVertices(
-                       final Iterable<? extends SchedulingExecutionVertex<?, 
?>> vertices) {
+                       final Iterable<? extends SchedulingExecutionVertex> 
vertices) {
 
                final Set<ExecutionVertexID> verticesToDeploy = 
IterableUtils.toStream(vertices)
                        
.filter(IS_IN_CREATED_EXECUTION_STATE.and(isInputConstraintSatisfied()))
@@ -141,7 +141,7 @@ public class LazyFromSourcesSchedulingStrategy implements 
SchedulingStrategy {
                }
        }
 
-       private Predicate<SchedulingExecutionVertex<?, ?>> 
isInputConstraintSatisfied() {
+       private Predicate<SchedulingExecutionVertex> 
isInputConstraintSatisfied() {
                return inputConstraintChecker::check;
        }
 
@@ -152,7 +152,7 @@ public class LazyFromSourcesSchedulingStrategy implements 
SchedulingStrategy {
                @Override
                public SchedulingStrategy createInstance(
                                SchedulerOperations schedulerOperations,
-                               SchedulingTopology<?, ?> schedulingTopology) {
+                               SchedulingTopology schedulingTopology) {
                        return new 
LazyFromSourcesSchedulingStrategy(schedulerOperations, schedulingTopology);
                }
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
index 5c5cb31..9a822e3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
@@ -27,8 +27,8 @@ import org.apache.flink.runtime.topology.Vertex;
 /**
  * Scheduling representation of {@link ExecutionVertex}.
  */
-public interface SchedulingExecutionVertex<V extends 
SchedulingExecutionVertex<V, R>, R extends SchedulingResultPartition<V, R>>
-       extends Vertex<ExecutionVertexID, IntermediateResultPartitionID, V, R> {
+public interface SchedulingExecutionVertex
+       extends Vertex<ExecutionVertexID, IntermediateResultPartitionID, 
SchedulingExecutionVertex, SchedulingResultPartition> {
 
        /**
         * Gets the state of the execution vertex.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java
index a4a05d5..759e82e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java
@@ -26,5 +26,5 @@ import org.apache.flink.runtime.topology.PipelinedRegion;
 /**
  * Pipelined region on execution level, i.e., {@link ExecutionGraph} level.
  */
-public interface SchedulingPipelinedRegion<V extends 
SchedulingExecutionVertex<V, R>, R extends SchedulingResultPartition<V, R>> 
extends PipelinedRegion<ExecutionVertexID, IntermediateResultPartitionID, V, R> 
{
+public interface SchedulingPipelinedRegion extends 
PipelinedRegion<ExecutionVertexID, IntermediateResultPartitionID, 
SchedulingExecutionVertex, SchedulingResultPartition> {
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java
index 25f6fb6..0d36127 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java
@@ -26,8 +26,8 @@ import org.apache.flink.runtime.topology.Result;
 /**
  * Representation of {@link IntermediateResultPartition}.
  */
-public interface SchedulingResultPartition<V extends 
SchedulingExecutionVertex<V, R>, R extends SchedulingResultPartition<V, R>>
-       extends Result<ExecutionVertexID, IntermediateResultPartitionID, V, R> {
+public interface SchedulingResultPartition
+       extends Result<ExecutionVertexID, IntermediateResultPartitionID, 
SchedulingExecutionVertex, SchedulingResultPartition> {
 
        /**
         * Gets id of the intermediate result.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java
index 90cd7d4..aa634f7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java
@@ -27,5 +27,5 @@ public interface SchedulingStrategyFactory {
 
        SchedulingStrategy createInstance(
                        SchedulerOperations schedulerOperations,
-                       SchedulingTopology<?, ?> schedulingTopology);
+                       SchedulingTopology schedulingTopology);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java
index b1e9dd9..d24cbd0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java
@@ -32,14 +32,14 @@ import java.util.stream.Collectors;
  */
 class SchedulingStrategyUtils {
 
-       static Set<ExecutionVertexID> getAllVertexIdsFromTopology(final 
SchedulingTopology<?, ?> topology) {
+       static Set<ExecutionVertexID> getAllVertexIdsFromTopology(final 
SchedulingTopology topology) {
                return IterableUtils.toStream(topology.getVertices())
                        .map(SchedulingExecutionVertex::getId)
                        .collect(Collectors.toSet());
        }
 
-       static Set<SchedulingExecutionVertex<?, ?>> getVerticesFromIds(
-                       final SchedulingTopology<?, ?> topology,
+       static Set<SchedulingExecutionVertex> getVerticesFromIds(
+                       final SchedulingTopology topology,
                        final Set<ExecutionVertexID> vertexIds) {
 
                return vertexIds.stream()
@@ -48,7 +48,7 @@ class SchedulingStrategyUtils {
        }
 
        static List<ExecutionVertexDeploymentOption> 
createExecutionVertexDeploymentOptionsInTopologicalOrder(
-                       final SchedulingTopology<?, ?> topology,
+                       final SchedulingTopology topology,
                        final Set<ExecutionVertexID> verticesToDeploy,
                        final Function<ExecutionVertexID, DeploymentOption> 
deploymentOptionRetriever) {
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java
index b9aac91..2920828 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java
@@ -24,8 +24,8 @@ import org.apache.flink.runtime.topology.Topology;
 /**
  * Topology of {@link SchedulingExecutionVertex}.
  */
-public interface SchedulingTopology<V extends SchedulingExecutionVertex<V, R>, 
R extends SchedulingResultPartition<V, R>>
-       extends Topology<ExecutionVertexID, IntermediateResultPartitionID, V, 
R, SchedulingPipelinedRegion<V, R>> {
+public interface SchedulingTopology
+       extends Topology<ExecutionVertexID, IntermediateResultPartitionID, 
SchedulingExecutionVertex, SchedulingResultPartition, 
SchedulingPipelinedRegion> {
 
        /**
         * Looks up the {@link SchedulingExecutionVertex} for the given {@link 
ExecutionVertexID}.
@@ -34,7 +34,7 @@ public interface SchedulingTopology<V extends 
SchedulingExecutionVertex<V, R>, R
         * @return The respective scheduling vertex
         * @throws IllegalArgumentException If the vertex does not exist
         */
-       V getVertex(ExecutionVertexID executionVertexId);
+       SchedulingExecutionVertex getVertex(ExecutionVertexID 
executionVertexId);
 
        /**
         * Looks up the {@link SchedulingResultPartition} for the given {@link 
IntermediateResultPartitionID}.
@@ -43,5 +43,5 @@ public interface SchedulingTopology<V extends 
SchedulingExecutionVertex<V, R>, R
         * @return The respective scheduling result partition
         * @throws IllegalArgumentException If the partition does not exist
         */
-       R getResultPartition(IntermediateResultPartitionID 
intermediateResultPartitionId);
+       SchedulingResultPartition 
getResultPartition(IntermediateResultPartitionID intermediateResultPartitionId);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/BaseTopology.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/BaseTopology.java
index 103f836..20d5fc3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/BaseTopology.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/BaseTopology.java
@@ -30,7 +30,7 @@ public interface BaseTopology<VID extends VertexID, RID 
extends ResultID,
         *
         * @return topologically sorted iterable over all vertices
         */
-       Iterable<V> getVertices();
+       Iterable<? extends V> getVertices();
 
        /**
         * Returns whether the topology contains co-location constraints.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/PipelinedRegion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/PipelinedRegion.java
index fd3d0b8..e3f5505 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/PipelinedRegion.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/PipelinedRegion.java
@@ -36,7 +36,7 @@ public interface PipelinedRegion<VID extends VertexID, RID 
extends ResultID,
         *
         * @return Iterable over all vertices in this pipelined region
         */
-       Iterable<V> getVertices();
+       Iterable<? extends V> getVertices();
 
        /**
         * Returns the vertex with the specified vertex id.
@@ -53,5 +53,5 @@ public interface PipelinedRegion<VID extends VertexID, RID 
extends ResultID,
         *
         * @return Iterable over all consumed results
         */
-       Iterable<R> getConsumedResults();
+       Iterable<? extends R> getConsumedResults();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Result.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Result.java
index 937eeaf..58a5a26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Result.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Result.java
@@ -34,5 +34,5 @@ public interface Result<VID extends VertexID, RID extends 
ResultID,
 
        V getProducer();
 
-       Iterable<V> getConsumers();
+       Iterable<? extends V> getConsumers();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Topology.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Topology.java
index cd0f4d9..f93af98 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Topology.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Topology.java
@@ -31,7 +31,7 @@ public interface Topology<VID extends VertexID, RID extends 
ResultID,
         *
         * @return Iterable over pipelined regions in this topology
         */
-       default Iterable<PR> getAllPipelinedRegions() {
+       default Iterable<? extends PR> getAllPipelinedRegions() {
                throw new UnsupportedOperationException();
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Vertex.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Vertex.java
index 5f88843..28d099a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Vertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Vertex.java
@@ -28,7 +28,7 @@ public interface Vertex<VID extends VertexID, RID extends 
ResultID,
 
        VID getId();
 
-       Iterable<R> getConsumedResults();
+       Iterable<? extends R> getConsumedResults();
 
-       Iterable<R> getProducedResults();
+       Iterable<? extends R> getProducedResults();
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
index 35af2fb..cf1a989 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
@@ -47,7 +47,7 @@ public class ExecutionFailureHandlerTest extends TestLogger {
 
        private static final long RESTART_DELAY_MS = 1234L;
 
-       private SchedulingTopology<?, ?> schedulingTopology;
+       private SchedulingTopology schedulingTopology;
 
        private TestFailoverStrategy failoverStrategy;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index e97740e..1d0eef2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -343,11 +343,11 @@ public class DefaultSchedulerTest extends TestLogger {
                final TestSchedulingStrategy.Factory schedulingStrategyFactory 
= new TestSchedulingStrategy.Factory();
                final DefaultScheduler scheduler = createScheduler(jobGraph, 
schedulingStrategyFactory);
                final TestSchedulingStrategy schedulingStrategy = 
schedulingStrategyFactory.getLastCreatedSchedulingStrategy();
-               final SchedulingTopology<?, ?> topology = 
schedulingStrategy.getSchedulingTopology();
+               final SchedulingTopology topology = 
schedulingStrategy.getSchedulingTopology();
 
                startScheduling(scheduler);
 
-               final SchedulingExecutionVertex<?, ?> onlySchedulingVertex = 
Iterables.getOnlyElement(topology.getVertices());
+               final SchedulingExecutionVertex onlySchedulingVertex = 
Iterables.getOnlyElement(topology.getVertices());
                
schedulingStrategy.schedule(Collections.singletonList(onlySchedulingVertex.getId()));
 
                final ArchivedExecutionVertex onlyExecutionVertex = 
Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
@@ -367,7 +367,7 @@ public class DefaultSchedulerTest extends TestLogger {
                final TestSchedulingStrategy.Factory schedulingStrategyFactory 
= new TestSchedulingStrategy.Factory();
                final DefaultScheduler scheduler = createScheduler(jobGraph, 
schedulingStrategyFactory);
                final TestSchedulingStrategy schedulingStrategy = 
schedulingStrategyFactory.getLastCreatedSchedulingStrategy();
-               final SchedulingTopology<?, ?> topology = 
schedulingStrategy.getSchedulingTopology();
+               final SchedulingTopology topology = 
schedulingStrategy.getSchedulingTopology();
 
                startScheduling(scheduler);
 
@@ -632,7 +632,7 @@ public class DefaultSchedulerTest extends TestLogger {
                final JobGraph jobGraph = singleJobVertexJobGraph(2);
                final JobID jobid = jobGraph.getJobID();
                final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
-               final SchedulingTopology<?, ?> topology = 
scheduler.getSchedulingTopology();
+               final SchedulingTopology topology = 
scheduler.getSchedulingTopology();
 
                final Iterator<ArchivedExecutionVertex> vertexIterator = 
scheduler.requestJob().getAllExecutionVertices().iterator();
                final ExecutionAttemptID attemptId1 = 
vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java
index 98955c2..5dec719 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java
@@ -239,7 +239,7 @@ public class InputDependencyConstraintCheckerTest extends 
TestLogger {
                List<TestingSchedulingResultPartition> partitions) {
 
                InputDependencyConstraintChecker inputChecker = new 
InputDependencyConstraintChecker();
-               for (SchedulingResultPartition<?, ?> partition : partitions) {
+               for (SchedulingResultPartition partition : partitions) {
                        inputChecker.addSchedulingResultPartition(partition);
                }
                return inputChecker;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestSchedulingStrategy.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestSchedulingStrategy.java
index c9f3b88..6266b56 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestSchedulingStrategy.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestSchedulingStrategy.java
@@ -37,7 +37,7 @@ public class TestSchedulingStrategy implements 
SchedulingStrategy {
 
        private final SchedulerOperations schedulerOperations;
 
-       private final SchedulingTopology<?, ?> schedulingTopology;
+       private final SchedulingTopology schedulingTopology;
 
        private final DeploymentOption deploymentOption = new 
DeploymentOption(false);
 
@@ -45,7 +45,7 @@ public class TestSchedulingStrategy implements 
SchedulingStrategy {
 
        public TestSchedulingStrategy(
                        final SchedulerOperations schedulerOperations,
-                       final SchedulingTopology<?, ?> schedulingTopology) {
+                       final SchedulingTopology schedulingTopology) {
 
                this.schedulerOperations = checkNotNull(schedulerOperations);
                this.schedulingTopology = checkNotNull(schedulingTopology);
@@ -72,7 +72,7 @@ public class TestSchedulingStrategy implements 
SchedulingStrategy {
                allocateSlotsAndDeploy(verticesToSchedule);
        }
 
-       public SchedulingTopology<?, ?> getSchedulingTopology() {
+       public SchedulingTopology getSchedulingTopology() {
                return schedulingTopology;
        }
 
@@ -106,7 +106,7 @@ public class TestSchedulingStrategy implements 
SchedulingStrategy {
                @Override
                public SchedulingStrategy createInstance(
                                final SchedulerOperations schedulerOperations,
-                               final SchedulingTopology<?, ?> 
schedulingTopology) {
+                               final SchedulingTopology schedulingTopology) {
 
                        lastInstance = new 
TestSchedulingStrategy(schedulerOperations, schedulingTopology);
                        return lastInstance;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
index dcb0df5..31cb8b6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
@@ -30,8 +30,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A simple scheduling execution vertex for testing purposes.
  */
-public class TestingSchedulingExecutionVertex
-       implements SchedulingExecutionVertex<TestingSchedulingExecutionVertex, 
TestingSchedulingResultPartition> {
+public class TestingSchedulingExecutionVertex implements 
SchedulingExecutionVertex {
 
        private final ExecutionVertexID executionVertexId;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
index 7eb3b41..55fabcf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
@@ -30,8 +30,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A simple implementation of {@link SchedulingResultPartition} for testing.
  */
-public class TestingSchedulingResultPartition
-       implements SchedulingResultPartition<TestingSchedulingExecutionVertex, 
TestingSchedulingResultPartition> {
+public class TestingSchedulingResultPartition implements 
SchedulingResultPartition {
 
        private final IntermediateDataSetID intermediateDataSetID;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
index 5e930b4..af22b63 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
@@ -37,8 +37,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * A simple scheduling topology for testing purposes.
  */
-public class TestingSchedulingTopology
-       implements SchedulingTopology<TestingSchedulingExecutionVertex, 
TestingSchedulingResultPartition> {
+public class TestingSchedulingTopology implements SchedulingTopology {
 
        // Use linked map here to so we can get the values in inserted order
        private final Map<ExecutionVertexID, TestingSchedulingExecutionVertex> 
schedulingExecutionVertices = new LinkedHashMap<>();

Reply via email to