This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cc30a4ea080abda6bc58515e95fdf74f24792ec8 Author: Zhu Zhu <reed...@gmail.com> AuthorDate: Thu Apr 9 15:58:21 2020 +0800 [FLINK-17181][runtime] Drop generic Types in Topology Interface Co-authored-by: Gary Yao <g...@apache.org> --- .../runtime/executiongraph/ExecutionGraph.java | 6 ++-- .../failover/flip1/ExecutionFailureHandler.java | 4 +-- .../failover/flip1/FailoverRegion.java | 6 ++-- .../failover/flip1/FailoverStrategy.java | 2 +- .../failover/flip1/PipelinedRegionComputeUtil.java | 4 +-- .../failover/flip1/RestartAllFailoverStrategy.java | 6 ++-- .../RestartPipelinedRegionFailoverStrategy.java | 24 ++++++++-------- .../NotReleasingPartitionReleaseStrategy.java | 2 +- .../partitionrelease/PartitionReleaseStrategy.java | 2 +- .../RegionPartitionReleaseStrategy.java | 18 ++++++------ .../flink/runtime/scheduler/SchedulerBase.java | 4 +-- .../adapter/DefaultExecutionTopology.java | 2 +- .../scheduler/adapter/DefaultExecutionVertex.java | 2 +- .../scheduler/adapter/DefaultResultPartition.java | 2 +- .../strategy/EagerSchedulingStrategy.java | 6 ++-- .../strategy/InputDependencyConstraintChecker.java | 32 +++++++++++----------- .../LazyFromSourcesSchedulingStrategy.java | 20 +++++++------- .../strategy/SchedulingExecutionVertex.java | 4 +-- .../strategy/SchedulingPipelinedRegion.java | 2 +- .../strategy/SchedulingResultPartition.java | 4 +-- .../strategy/SchedulingStrategyFactory.java | 2 +- .../strategy/SchedulingStrategyUtils.java | 8 +++--- .../scheduler/strategy/SchedulingTopology.java | 8 +++--- .../flink/runtime/topology/BaseTopology.java | 2 +- .../flink/runtime/topology/PipelinedRegion.java | 4 +-- .../org/apache/flink/runtime/topology/Result.java | 2 +- .../apache/flink/runtime/topology/Topology.java | 2 +- .../org/apache/flink/runtime/topology/Vertex.java | 4 +-- .../flip1/ExecutionFailureHandlerTest.java | 2 +- .../runtime/scheduler/DefaultSchedulerTest.java | 8 +++--- .../InputDependencyConstraintCheckerTest.java | 2 +- .../scheduler/strategy/TestSchedulingStrategy.java | 8 +++--- .../strategy/TestingSchedulingExecutionVertex.java | 3 +- .../strategy/TestingSchedulingResultPartition.java | 3 +- .../strategy/TestingSchedulingTopology.java | 3 +- 35 files changed, 105 insertions(+), 108 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 190259d..e348059 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -410,7 +410,7 @@ public class ExecutionGraph implements AccessExecutionGraph { return this.verticesInCreationOrder.size(); } - public SchedulingTopology<?, ?> getSchedulingTopology() { + public SchedulingTopology getSchedulingTopology() { return executionTopology; } @@ -1541,9 +1541,9 @@ public class ExecutionGraph implements AccessExecutionGraph { } ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartitionId) { - final SchedulingResultPartition<?, ?> schedulingResultPartition = + final SchedulingResultPartition schedulingResultPartition = getSchedulingTopology().getResultPartition(resultPartitionId); - final SchedulingExecutionVertex<?, ?> producer = schedulingResultPartition.getProducer(); + final SchedulingExecutionVertex producer = schedulingResultPartition.getProducer(); final ExecutionVertexID producerId = producer.getId(); final JobVertexID jobVertexId = producerId.getJobVertexId(); final ExecutionJobVertex jobVertex = getJobVertex(jobVertexId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java index c89717e..63d5e88 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java @@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class ExecutionFailureHandler { - private final SchedulingTopology<?, ?> schedulingTopology; + private final SchedulingTopology schedulingTopology; /** Strategy to judge which tasks should be restarted. */ private final FailoverStrategy failoverStrategy; @@ -57,7 +57,7 @@ public class ExecutionFailureHandler { * @param restartBackoffTimeStrategy helps to decide whether to restart failed tasks and the restarting delay */ public ExecutionFailureHandler( - final SchedulingTopology<?, ?> schedulingTopology, + final SchedulingTopology schedulingTopology, final FailoverStrategy failoverStrategy, final RestartBackoffTimeStrategy restartBackoffTimeStrategy) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java index 9e0369b..d1efb6f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java @@ -35,14 +35,14 @@ public class FailoverRegion { private final Set<ExecutionVertexID> executionVertexIDs; /** All vertices in this region. */ - private final Set<? extends SchedulingExecutionVertex<?, ?>> executionVertices; + private final Set<? extends SchedulingExecutionVertex> executionVertices; /** * Creates a new failover region containing a set of vertices. * * @param executionVertices to be contained in this region */ - public FailoverRegion(Set<? extends SchedulingExecutionVertex<?, ?>> executionVertices) { + public FailoverRegion(Set<? extends SchedulingExecutionVertex> executionVertices) { this.executionVertices = checkNotNull(executionVertices); this.executionVertexIDs = new HashSet<>(); executionVertices.forEach(v -> this.executionVertexIDs.add(v.getId())); @@ -62,7 +62,7 @@ public class FailoverRegion { * * @return all vertices in this region */ - public Set<? extends SchedulingExecutionVertex<?, ?>> getAllExecutionVertices() { + public Set<? extends SchedulingExecutionVertex> getAllExecutionVertices() { return executionVertices; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java index 11e9431..981b198 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java @@ -53,7 +53,7 @@ public interface FailoverStrategy { * @return The instantiated failover strategy. */ FailoverStrategy create( - SchedulingTopology<?, ?> topology, + SchedulingTopology topology, ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java index 10a5342..aa94841 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java @@ -45,7 +45,7 @@ public final class PipelinedRegionComputeUtil { private static final Logger LOG = LoggerFactory.getLogger(PipelinedRegionComputeUtil.class); public static Set<PipelinedRegion> toPipelinedRegionsSet( - final Set<? extends Set<? extends SchedulingExecutionVertex<?, ?>>> distinctRegions) { + final Set<? extends Set<? extends SchedulingExecutionVertex>> distinctRegions) { return distinctRegions.stream() .map(toExecutionVertexIdSet()) @@ -53,7 +53,7 @@ public final class PipelinedRegionComputeUtil { .collect(Collectors.toSet()); } - private static Function<Set<? extends SchedulingExecutionVertex<?, ?>>, Set<ExecutionVertexID>> toExecutionVertexIdSet() { + private static Function<Set<? extends SchedulingExecutionVertex>, Set<ExecutionVertexID>> toExecutionVertexIdSet() { return failoverVertices -> failoverVertices.stream() .map(SchedulingExecutionVertex::getId) .collect(Collectors.toSet()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartAllFailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartAllFailoverStrategy.java index fef759d..2bd09da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartAllFailoverStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartAllFailoverStrategy.java @@ -32,9 +32,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class RestartAllFailoverStrategy implements FailoverStrategy { - private final SchedulingTopology<?, ?> topology; + private final SchedulingTopology topology; - public RestartAllFailoverStrategy(final SchedulingTopology<?, ?> topology) { + public RestartAllFailoverStrategy(final SchedulingTopology topology) { this.topology = checkNotNull(topology); } @@ -59,7 +59,7 @@ public class RestartAllFailoverStrategy implements FailoverStrategy { @Override public FailoverStrategy create( - final SchedulingTopology<?, ?> topology, + final SchedulingTopology topology, final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) { return new RestartAllFailoverStrategy(topology); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java index b099021..3c158f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java @@ -51,7 +51,7 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy private static final Logger LOG = LoggerFactory.getLogger(RestartPipelinedRegionFailoverStrategy.class); /** The topology containing info about all the vertices and result partitions. */ - private final SchedulingTopology<?, ?> topology; + private final SchedulingTopology topology; /** All failover regions. */ private final Set<FailoverRegion> regions; @@ -69,7 +69,7 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy * @param topology containing info about all the vertices and result partitions */ @VisibleForTesting - public RestartPipelinedRegionFailoverStrategy(SchedulingTopology<?, ?> topology) { + public RestartPipelinedRegionFailoverStrategy(SchedulingTopology topology) { this(topology, resultPartitionID -> true); } @@ -80,7 +80,7 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy * @param resultPartitionAvailabilityChecker helps to query result partition availability */ public RestartPipelinedRegionFailoverStrategy( - SchedulingTopology<?, ?> topology, + SchedulingTopology topology, ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) { this.topology = checkNotNull(topology); @@ -98,15 +98,15 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy // ------------------------------------------------------------------------ private void buildFailoverRegions() { - final Set<? extends Set<? extends SchedulingExecutionVertex<?, ?>>> distinctRegions = + final Set<Set<SchedulingExecutionVertex>> distinctRegions = PipelinedRegionComputeUtil.computePipelinedRegions(topology); // creating all the failover regions and register them - for (Set<? extends SchedulingExecutionVertex<?, ?>> regionVertices : distinctRegions) { + for (Set<SchedulingExecutionVertex> regionVertices : distinctRegions) { LOG.debug("Creating a failover region with {} vertices.", regionVertices.size()); final FailoverRegion failoverRegion = new FailoverRegion(regionVertices); regions.add(failoverRegion); - for (SchedulingExecutionVertex<?, ?> vertex : regionVertices) { + for (SchedulingExecutionVertex vertex : regionVertices) { vertexToRegionMap.put(vertex.getId(), failoverRegion); } } @@ -190,8 +190,8 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy regionsToRestart.add(regionToRestart); // if a needed input result partition is not available, its producer region is involved - for (SchedulingExecutionVertex<?, ?> vertex : regionToRestart.getAllExecutionVertices()) { - for (SchedulingResultPartition<?, ?> consumedPartition : vertex.getConsumedResults()) { + for (SchedulingExecutionVertex vertex : regionToRestart.getAllExecutionVertices()) { + for (SchedulingResultPartition consumedPartition : vertex.getConsumedResults()) { if (!resultPartitionAvailabilityChecker.isAvailable(consumedPartition.getId())) { FailoverRegion producerRegion = vertexToRegionMap.get(consumedPartition.getProducer().getId()); if (!visitedRegions.contains(producerRegion)) { @@ -203,9 +203,9 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy } // all consumer regions of an involved region should be involved - for (SchedulingExecutionVertex<?, ?> vertex : regionToRestart.getAllExecutionVertices()) { - for (SchedulingResultPartition<?, ?> producedPartition : vertex.getProducedResults()) { - for (SchedulingExecutionVertex<?, ?> consumerVertex : producedPartition.getConsumers()) { + for (SchedulingExecutionVertex vertex : regionToRestart.getAllExecutionVertices()) { + for (SchedulingResultPartition producedPartition : vertex.getProducedResults()) { + for (SchedulingExecutionVertex consumerVertex : producedPartition.getConsumers()) { FailoverRegion consumerRegion = vertexToRegionMap.get(consumerVertex.getId()); if (!visitedRegions.contains(consumerRegion)) { visitedRegions.add(consumerRegion); @@ -271,7 +271,7 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy @Override public FailoverStrategy create( - final SchedulingTopology<?, ?> topology, + final SchedulingTopology topology, final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) { return new RestartPipelinedRegionFailoverStrategy(topology, resultPartitionAvailabilityChecker); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java index 019a595..7f2eaa8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java @@ -47,7 +47,7 @@ public class NotReleasingPartitionReleaseStrategy implements PartitionReleaseStr public static class Factory implements PartitionReleaseStrategy.Factory { @Override - public PartitionReleaseStrategy createInstance(final SchedulingTopology<?, ?> schedulingStrategy) { + public PartitionReleaseStrategy createInstance(final SchedulingTopology schedulingStrategy) { return new NotReleasingPartitionReleaseStrategy(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java index 7002ced..2f590c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java @@ -52,6 +52,6 @@ public interface PartitionReleaseStrategy { * Factory for {@link PartitionReleaseStrategy}. */ interface Factory { - PartitionReleaseStrategy createInstance(SchedulingTopology<?, ?> schedulingStrategy); + PartitionReleaseStrategy createInstance(SchedulingTopology schedulingStrategy); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java index d5529b7..18d9a4e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java @@ -46,14 +46,14 @@ import static org.apache.flink.util.Preconditions.checkState; */ public class RegionPartitionReleaseStrategy implements PartitionReleaseStrategy { - private final SchedulingTopology<?, ?> schedulingTopology; + private final SchedulingTopology schedulingTopology; private final Map<PipelinedRegion, PipelinedRegionConsumedBlockingPartitions> consumedBlockingPartitionsByRegion = new IdentityHashMap<>(); private final Map<ExecutionVertexID, PipelinedRegionExecutionView> regionExecutionViewByVertex = new HashMap<>(); public RegionPartitionReleaseStrategy( - final SchedulingTopology<?, ?> schedulingTopology, + final SchedulingTopology schedulingTopology, final Set<PipelinedRegion> pipelinedRegions) { this.schedulingTopology = checkNotNull(schedulingTopology); @@ -85,7 +85,7 @@ public class RegionPartitionReleaseStrategy implements PartitionReleaseStrategy } private Set<IntermediateResultPartitionID> findResultPartitionsOutsideOfRegion(final PipelinedRegion pipelinedRegion) { - final Set<SchedulingResultPartition<?, ?>> allConsumedPartitionsInRegion = pipelinedRegion + final Set<SchedulingResultPartition> allConsumedPartitionsInRegion = pipelinedRegion .getExecutionVertexIds() .stream() .map(schedulingTopology::getVertex) @@ -96,12 +96,12 @@ public class RegionPartitionReleaseStrategy implements PartitionReleaseStrategy } private static Set<IntermediateResultPartitionID> filterResultPartitionsOutsideOfRegion( - final Collection<SchedulingResultPartition<?, ?>> resultPartitions, + final Collection<SchedulingResultPartition> resultPartitions, final PipelinedRegion pipelinedRegion) { final Set<IntermediateResultPartitionID> result = new HashSet<>(); - for (final SchedulingResultPartition<?, ?> maybeOutsidePartition : resultPartitions) { - final SchedulingExecutionVertex<?, ?> producer = maybeOutsidePartition.getProducer(); + for (final SchedulingResultPartition maybeOutsidePartition : resultPartitions) { + final SchedulingExecutionVertex producer = maybeOutsidePartition.getProducer(); if (!pipelinedRegion.contains(producer.getId())) { result.add(maybeOutsidePartition.getId()); } @@ -157,7 +157,7 @@ public class RegionPartitionReleaseStrategy implements PartitionReleaseStrategy } private boolean areConsumerRegionsFinished(final IntermediateResultPartitionID resultPartitionId) { - final SchedulingResultPartition<?, ?> resultPartition = schedulingTopology.getResultPartition(resultPartitionId); + final SchedulingResultPartition resultPartition = schedulingTopology.getResultPartition(resultPartitionId); return IterableUtils.toStream(resultPartition.getConsumers()) .map(SchedulingExecutionVertex::getId) .allMatch(this::isRegionOfVertexFinished); @@ -174,9 +174,9 @@ public class RegionPartitionReleaseStrategy implements PartitionReleaseStrategy public static class Factory implements PartitionReleaseStrategy.Factory { @Override - public PartitionReleaseStrategy createInstance(final SchedulingTopology<?, ?> schedulingStrategy) { + public PartitionReleaseStrategy createInstance(final SchedulingTopology schedulingStrategy) { - final Set<? extends Set<? extends SchedulingExecutionVertex<?, ?>>> distinctRegions = + final Set<? extends Set<SchedulingExecutionVertex>> distinctRegions = PipelinedRegionComputeUtil.computePipelinedRegions(schedulingStrategy); return new RegionPartitionReleaseStrategy( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 9fc9b45..aa38b95 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -132,7 +132,7 @@ public abstract class SchedulerBase implements SchedulerNG { private final ExecutionGraph executionGraph; - private final SchedulingTopology<?, ?> schedulingTopology; + private final SchedulingTopology schedulingTopology; private final InputsLocationsRetriever inputsLocationsRetriever; @@ -380,7 +380,7 @@ public abstract class SchedulerBase implements SchedulerNG { executionGraph.failJob(cause); } - protected final SchedulingTopology<?, ?> getSchedulingTopology() { + protected final SchedulingTopology getSchedulingTopology() { return schedulingTopology; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java index a535fdc..63bd39f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java @@ -39,7 +39,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * Adapter of {@link ExecutionGraph} to {@link SchedulingTopology}. */ -public class DefaultExecutionTopology implements SchedulingTopology<DefaultExecutionVertex, DefaultResultPartition> { +public class DefaultExecutionTopology implements SchedulingTopology { private final boolean containsCoLocationConstraints; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java index 4527d0b..0c5ffd2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java @@ -32,7 +32,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * Default implementation of {@link SchedulingExecutionVertex}. */ -class DefaultExecutionVertex implements SchedulingExecutionVertex<DefaultExecutionVertex, DefaultResultPartition> { +class DefaultExecutionVertex implements SchedulingExecutionVertex { private final ExecutionVertexID executionVertexId; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java index 7f4ebaa..1b1bdde 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java @@ -33,7 +33,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * Default implementation of {@link SchedulingResultPartition}. */ -class DefaultResultPartition implements SchedulingResultPartition<DefaultExecutionVertex, DefaultResultPartition> { +class DefaultResultPartition implements SchedulingResultPartition { private final IntermediateResultPartitionID resultPartitionId; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java index 79ca452..b0f6188 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java @@ -36,13 +36,13 @@ public class EagerSchedulingStrategy implements SchedulingStrategy { private final SchedulerOperations schedulerOperations; - private final SchedulingTopology<?, ?> schedulingTopology; + private final SchedulingTopology schedulingTopology; private final DeploymentOption deploymentOption = new DeploymentOption(false); public EagerSchedulingStrategy( SchedulerOperations schedulerOperations, - SchedulingTopology<?, ?> schedulingTopology) { + SchedulingTopology schedulingTopology) { this.schedulerOperations = checkNotNull(schedulerOperations); this.schedulingTopology = checkNotNull(schedulingTopology); } @@ -84,7 +84,7 @@ public class EagerSchedulingStrategy implements SchedulingStrategy { @Override public SchedulingStrategy createInstance( SchedulerOperations schedulerOperations, - SchedulingTopology<?, ?> schedulingTopology) { + SchedulingTopology schedulingTopology) { return new EagerSchedulingStrategy(schedulerOperations, schedulingTopology); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java index d210907..8382190 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java @@ -42,7 +42,7 @@ public class InputDependencyConstraintChecker { private final SchedulingIntermediateDataSetManager intermediateDataSetManager = new SchedulingIntermediateDataSetManager(); - public boolean check(final SchedulingExecutionVertex<?, ?> schedulingExecutionVertex) { + public boolean check(final SchedulingExecutionVertex schedulingExecutionVertex) { if (Iterables.isEmpty(schedulingExecutionVertex.getConsumedResults())) { return true; } @@ -58,20 +58,20 @@ public class InputDependencyConstraintChecker { } } - List<SchedulingResultPartition<?, ?>> markSchedulingResultPartitionFinished(SchedulingResultPartition<?, ?> srp) { + List<SchedulingResultPartition> markSchedulingResultPartitionFinished(SchedulingResultPartition srp) { return intermediateDataSetManager.markSchedulingResultPartitionFinished(srp); } - void resetSchedulingResultPartition(SchedulingResultPartition<?, ?> srp) { + void resetSchedulingResultPartition(SchedulingResultPartition srp) { intermediateDataSetManager.resetSchedulingResultPartition(srp); } - void addSchedulingResultPartition(SchedulingResultPartition<?, ?> srp) { + void addSchedulingResultPartition(SchedulingResultPartition srp) { intermediateDataSetManager.addSchedulingResultPartition(srp); } - private boolean checkAll(final SchedulingExecutionVertex<?, ?> schedulingExecutionVertex) { - for (SchedulingResultPartition<?, ?> consumedResultPartition : schedulingExecutionVertex.getConsumedResults()) { + private boolean checkAll(final SchedulingExecutionVertex schedulingExecutionVertex) { + for (SchedulingResultPartition consumedResultPartition : schedulingExecutionVertex.getConsumedResults()) { if (!partitionConsumable(consumedResultPartition)) { return false; } @@ -79,8 +79,8 @@ public class InputDependencyConstraintChecker { return true; } - private boolean checkAny(final SchedulingExecutionVertex<?, ?> schedulingExecutionVertex) { - for (SchedulingResultPartition<?, ?> consumedResultPartition : schedulingExecutionVertex.getConsumedResults()) { + private boolean checkAny(final SchedulingExecutionVertex schedulingExecutionVertex) { + for (SchedulingResultPartition consumedResultPartition : schedulingExecutionVertex.getConsumedResults()) { if (partitionConsumable(consumedResultPartition)) { return true; } @@ -88,7 +88,7 @@ public class InputDependencyConstraintChecker { return false; } - private boolean partitionConsumable(SchedulingResultPartition<?, ?> partition) { + private boolean partitionConsumable(SchedulingResultPartition partition) { if (BLOCKING.equals(partition.getResultType())) { return intermediateDataSetManager.allPartitionsFinished(partition); } else { @@ -101,7 +101,7 @@ public class InputDependencyConstraintChecker { private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets = new HashMap<>(); - List<SchedulingResultPartition<?, ?>> markSchedulingResultPartitionFinished(SchedulingResultPartition<?, ?> srp) { + List<SchedulingResultPartition> markSchedulingResultPartitionFinished(SchedulingResultPartition srp) { SchedulingIntermediateDataSet intermediateDataSet = getSchedulingIntermediateDataSet(srp.getResultId()); if (intermediateDataSet.markPartitionFinished(srp.getId())) { return intermediateDataSet.getSchedulingResultPartitions(); @@ -109,17 +109,17 @@ public class InputDependencyConstraintChecker { return Collections.emptyList(); } - void resetSchedulingResultPartition(SchedulingResultPartition<?, ?> srp) { + void resetSchedulingResultPartition(SchedulingResultPartition srp) { SchedulingIntermediateDataSet sid = getSchedulingIntermediateDataSet(srp.getResultId()); sid.resetPartition(srp.getId()); } - void addSchedulingResultPartition(SchedulingResultPartition<?, ?> srp) { + void addSchedulingResultPartition(SchedulingResultPartition srp) { SchedulingIntermediateDataSet sid = getOrCreateSchedulingIntermediateDataSetIfAbsent(srp.getResultId()); sid.addSchedulingResultPartition(srp); } - boolean allPartitionsFinished(SchedulingResultPartition<?, ?> srp) { + boolean allPartitionsFinished(SchedulingResultPartition srp) { SchedulingIntermediateDataSet sid = getSchedulingIntermediateDataSet(srp.getResultId()); return sid.allPartitionsFinished(); } @@ -155,7 +155,7 @@ public class InputDependencyConstraintChecker { */ private static class SchedulingIntermediateDataSet { - private final List<SchedulingResultPartition<?, ?>> partitions; + private final List<SchedulingResultPartition> partitions; private final Set<IntermediateResultPartitionID> producingPartitionIds; @@ -177,12 +177,12 @@ public class InputDependencyConstraintChecker { return producingPartitionIds.isEmpty(); } - void addSchedulingResultPartition(SchedulingResultPartition<?, ?> partition) { + void addSchedulingResultPartition(SchedulingResultPartition partition) { partitions.add(partition); producingPartitionIds.add(partition.getId()); } - List<SchedulingResultPartition<?, ?>> getSchedulingResultPartitions() { + List<SchedulingResultPartition> getSchedulingResultPartitions() { return Collections.unmodifiableList(partitions); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java index 6520100..5e36b16 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java @@ -42,11 +42,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { - private static final Predicate<SchedulingExecutionVertex<?, ?>> IS_IN_CREATED_EXECUTION_STATE = schedulingExecutionVertex -> CREATED == schedulingExecutionVertex.getState(); + private static final Predicate<SchedulingExecutionVertex> IS_IN_CREATED_EXECUTION_STATE = schedulingExecutionVertex -> CREATED == schedulingExecutionVertex.getState(); private final SchedulerOperations schedulerOperations; - private final SchedulingTopology<?, ?> schedulingTopology; + private final SchedulingTopology schedulingTopology; private final Map<ExecutionVertexID, DeploymentOption> deploymentOptions; @@ -54,7 +54,7 @@ public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { public LazyFromSourcesSchedulingStrategy( SchedulerOperations schedulerOperations, - SchedulingTopology<?, ?> schedulingTopology) { + SchedulingTopology schedulingTopology) { this.schedulerOperations = checkNotNull(schedulerOperations); this.schedulingTopology = checkNotNull(schedulingTopology); @@ -67,9 +67,9 @@ public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { final DeploymentOption updateOption = new DeploymentOption(true); final DeploymentOption nonUpdateOption = new DeploymentOption(false); - for (SchedulingExecutionVertex<?, ?> schedulingVertex : schedulingTopology.getVertices()) { + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { DeploymentOption option = nonUpdateOption; - for (SchedulingResultPartition<?, ?> srp : schedulingVertex.getProducedResults()) { + for (SchedulingResultPartition srp : schedulingVertex.getProducedResults()) { if (srp.getResultType().isPipelined()) { option = updateOption; } @@ -100,7 +100,7 @@ public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { return; } - final Set<SchedulingExecutionVertex<?, ?>> verticesToSchedule = IterableUtils + final Set<SchedulingExecutionVertex> verticesToSchedule = IterableUtils .toStream(schedulingTopology.getVertex(executionVertexId).getProducedResults()) .filter(partition -> partition.getResultType().isBlocking()) .flatMap(partition -> inputConstraintChecker.markSchedulingResultPartitionFinished(partition).stream()) @@ -112,7 +112,7 @@ public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { @Override public void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId) { - final SchedulingResultPartition<?, ?> resultPartition = schedulingTopology + final SchedulingResultPartition resultPartition = schedulingTopology .getResultPartition(resultPartitionId); if (!resultPartition.getResultType().isPipelined()) { @@ -123,7 +123,7 @@ public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { } private void allocateSlotsAndDeployExecutionVertices( - final Iterable<? extends SchedulingExecutionVertex<?, ?>> vertices) { + final Iterable<? extends SchedulingExecutionVertex> vertices) { final Set<ExecutionVertexID> verticesToDeploy = IterableUtils.toStream(vertices) .filter(IS_IN_CREATED_EXECUTION_STATE.and(isInputConstraintSatisfied())) @@ -141,7 +141,7 @@ public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { } } - private Predicate<SchedulingExecutionVertex<?, ?>> isInputConstraintSatisfied() { + private Predicate<SchedulingExecutionVertex> isInputConstraintSatisfied() { return inputConstraintChecker::check; } @@ -152,7 +152,7 @@ public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { @Override public SchedulingStrategy createInstance( SchedulerOperations schedulerOperations, - SchedulingTopology<?, ?> schedulingTopology) { + SchedulingTopology schedulingTopology) { return new LazyFromSourcesSchedulingStrategy(schedulerOperations, schedulingTopology); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java index 5c5cb31..9a822e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java @@ -27,8 +27,8 @@ import org.apache.flink.runtime.topology.Vertex; /** * Scheduling representation of {@link ExecutionVertex}. */ -public interface SchedulingExecutionVertex<V extends SchedulingExecutionVertex<V, R>, R extends SchedulingResultPartition<V, R>> - extends Vertex<ExecutionVertexID, IntermediateResultPartitionID, V, R> { +public interface SchedulingExecutionVertex + extends Vertex<ExecutionVertexID, IntermediateResultPartitionID, SchedulingExecutionVertex, SchedulingResultPartition> { /** * Gets the state of the execution vertex. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java index a4a05d5..759e82e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.java @@ -26,5 +26,5 @@ import org.apache.flink.runtime.topology.PipelinedRegion; /** * Pipelined region on execution level, i.e., {@link ExecutionGraph} level. */ -public interface SchedulingPipelinedRegion<V extends SchedulingExecutionVertex<V, R>, R extends SchedulingResultPartition<V, R>> extends PipelinedRegion<ExecutionVertexID, IntermediateResultPartitionID, V, R> { +public interface SchedulingPipelinedRegion extends PipelinedRegion<ExecutionVertexID, IntermediateResultPartitionID, SchedulingExecutionVertex, SchedulingResultPartition> { } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java index 25f6fb6..0d36127 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java @@ -26,8 +26,8 @@ import org.apache.flink.runtime.topology.Result; /** * Representation of {@link IntermediateResultPartition}. */ -public interface SchedulingResultPartition<V extends SchedulingExecutionVertex<V, R>, R extends SchedulingResultPartition<V, R>> - extends Result<ExecutionVertexID, IntermediateResultPartitionID, V, R> { +public interface SchedulingResultPartition + extends Result<ExecutionVertexID, IntermediateResultPartitionID, SchedulingExecutionVertex, SchedulingResultPartition> { /** * Gets id of the intermediate result. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java index 90cd7d4..aa634f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java @@ -27,5 +27,5 @@ public interface SchedulingStrategyFactory { SchedulingStrategy createInstance( SchedulerOperations schedulerOperations, - SchedulingTopology<?, ?> schedulingTopology); + SchedulingTopology schedulingTopology); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java index b1e9dd9..d24cbd0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java @@ -32,14 +32,14 @@ import java.util.stream.Collectors; */ class SchedulingStrategyUtils { - static Set<ExecutionVertexID> getAllVertexIdsFromTopology(final SchedulingTopology<?, ?> topology) { + static Set<ExecutionVertexID> getAllVertexIdsFromTopology(final SchedulingTopology topology) { return IterableUtils.toStream(topology.getVertices()) .map(SchedulingExecutionVertex::getId) .collect(Collectors.toSet()); } - static Set<SchedulingExecutionVertex<?, ?>> getVerticesFromIds( - final SchedulingTopology<?, ?> topology, + static Set<SchedulingExecutionVertex> getVerticesFromIds( + final SchedulingTopology topology, final Set<ExecutionVertexID> vertexIds) { return vertexIds.stream() @@ -48,7 +48,7 @@ class SchedulingStrategyUtils { } static List<ExecutionVertexDeploymentOption> createExecutionVertexDeploymentOptionsInTopologicalOrder( - final SchedulingTopology<?, ?> topology, + final SchedulingTopology topology, final Set<ExecutionVertexID> verticesToDeploy, final Function<ExecutionVertexID, DeploymentOption> deploymentOptionRetriever) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java index b9aac91..2920828 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java @@ -24,8 +24,8 @@ import org.apache.flink.runtime.topology.Topology; /** * Topology of {@link SchedulingExecutionVertex}. */ -public interface SchedulingTopology<V extends SchedulingExecutionVertex<V, R>, R extends SchedulingResultPartition<V, R>> - extends Topology<ExecutionVertexID, IntermediateResultPartitionID, V, R, SchedulingPipelinedRegion<V, R>> { +public interface SchedulingTopology + extends Topology<ExecutionVertexID, IntermediateResultPartitionID, SchedulingExecutionVertex, SchedulingResultPartition, SchedulingPipelinedRegion> { /** * Looks up the {@link SchedulingExecutionVertex} for the given {@link ExecutionVertexID}. @@ -34,7 +34,7 @@ public interface SchedulingTopology<V extends SchedulingExecutionVertex<V, R>, R * @return The respective scheduling vertex * @throws IllegalArgumentException If the vertex does not exist */ - V getVertex(ExecutionVertexID executionVertexId); + SchedulingExecutionVertex getVertex(ExecutionVertexID executionVertexId); /** * Looks up the {@link SchedulingResultPartition} for the given {@link IntermediateResultPartitionID}. @@ -43,5 +43,5 @@ public interface SchedulingTopology<V extends SchedulingExecutionVertex<V, R>, R * @return The respective scheduling result partition * @throws IllegalArgumentException If the partition does not exist */ - R getResultPartition(IntermediateResultPartitionID intermediateResultPartitionId); + SchedulingResultPartition getResultPartition(IntermediateResultPartitionID intermediateResultPartitionId); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/BaseTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/BaseTopology.java index 103f836..20d5fc3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/BaseTopology.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/BaseTopology.java @@ -30,7 +30,7 @@ public interface BaseTopology<VID extends VertexID, RID extends ResultID, * * @return topologically sorted iterable over all vertices */ - Iterable<V> getVertices(); + Iterable<? extends V> getVertices(); /** * Returns whether the topology contains co-location constraints. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/PipelinedRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/PipelinedRegion.java index fd3d0b8..e3f5505 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/PipelinedRegion.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/PipelinedRegion.java @@ -36,7 +36,7 @@ public interface PipelinedRegion<VID extends VertexID, RID extends ResultID, * * @return Iterable over all vertices in this pipelined region */ - Iterable<V> getVertices(); + Iterable<? extends V> getVertices(); /** * Returns the vertex with the specified vertex id. @@ -53,5 +53,5 @@ public interface PipelinedRegion<VID extends VertexID, RID extends ResultID, * * @return Iterable over all consumed results */ - Iterable<R> getConsumedResults(); + Iterable<? extends R> getConsumedResults(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Result.java b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Result.java index 937eeaf..58a5a26 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Result.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Result.java @@ -34,5 +34,5 @@ public interface Result<VID extends VertexID, RID extends ResultID, V getProducer(); - Iterable<V> getConsumers(); + Iterable<? extends V> getConsumers(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Topology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Topology.java index cd0f4d9..f93af98 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Topology.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Topology.java @@ -31,7 +31,7 @@ public interface Topology<VID extends VertexID, RID extends ResultID, * * @return Iterable over pipelined regions in this topology */ - default Iterable<PR> getAllPipelinedRegions() { + default Iterable<? extends PR> getAllPipelinedRegions() { throw new UnsupportedOperationException(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Vertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Vertex.java index 5f88843..28d099a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Vertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/topology/Vertex.java @@ -28,7 +28,7 @@ public interface Vertex<VID extends VertexID, RID extends ResultID, VID getId(); - Iterable<R> getConsumedResults(); + Iterable<? extends R> getConsumedResults(); - Iterable<R> getProducedResults(); + Iterable<? extends R> getProducedResults(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java index 35af2fb..cf1a989 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java @@ -47,7 +47,7 @@ public class ExecutionFailureHandlerTest extends TestLogger { private static final long RESTART_DELAY_MS = 1234L; - private SchedulingTopology<?, ?> schedulingTopology; + private SchedulingTopology schedulingTopology; private TestFailoverStrategy failoverStrategy; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index e97740e..1d0eef2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -343,11 +343,11 @@ public class DefaultSchedulerTest extends TestLogger { final TestSchedulingStrategy.Factory schedulingStrategyFactory = new TestSchedulingStrategy.Factory(); final DefaultScheduler scheduler = createScheduler(jobGraph, schedulingStrategyFactory); final TestSchedulingStrategy schedulingStrategy = schedulingStrategyFactory.getLastCreatedSchedulingStrategy(); - final SchedulingTopology<?, ?> topology = schedulingStrategy.getSchedulingTopology(); + final SchedulingTopology topology = schedulingStrategy.getSchedulingTopology(); startScheduling(scheduler); - final SchedulingExecutionVertex<?, ?> onlySchedulingVertex = Iterables.getOnlyElement(topology.getVertices()); + final SchedulingExecutionVertex onlySchedulingVertex = Iterables.getOnlyElement(topology.getVertices()); schedulingStrategy.schedule(Collections.singletonList(onlySchedulingVertex.getId())); final ArchivedExecutionVertex onlyExecutionVertex = Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices()); @@ -367,7 +367,7 @@ public class DefaultSchedulerTest extends TestLogger { final TestSchedulingStrategy.Factory schedulingStrategyFactory = new TestSchedulingStrategy.Factory(); final DefaultScheduler scheduler = createScheduler(jobGraph, schedulingStrategyFactory); final TestSchedulingStrategy schedulingStrategy = schedulingStrategyFactory.getLastCreatedSchedulingStrategy(); - final SchedulingTopology<?, ?> topology = schedulingStrategy.getSchedulingTopology(); + final SchedulingTopology topology = schedulingStrategy.getSchedulingTopology(); startScheduling(scheduler); @@ -632,7 +632,7 @@ public class DefaultSchedulerTest extends TestLogger { final JobGraph jobGraph = singleJobVertexJobGraph(2); final JobID jobid = jobGraph.getJobID(); final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); - final SchedulingTopology<?, ?> topology = scheduler.getSchedulingTopology(); + final SchedulingTopology topology = scheduler.getSchedulingTopology(); final Iterator<ArchivedExecutionVertex> vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator(); final ExecutionAttemptID attemptId1 = vertexIterator.next().getCurrentExecutionAttempt().getAttemptId(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java index 98955c2..5dec719 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java @@ -239,7 +239,7 @@ public class InputDependencyConstraintCheckerTest extends TestLogger { List<TestingSchedulingResultPartition> partitions) { InputDependencyConstraintChecker inputChecker = new InputDependencyConstraintChecker(); - for (SchedulingResultPartition<?, ?> partition : partitions) { + for (SchedulingResultPartition partition : partitions) { inputChecker.addSchedulingResultPartition(partition); } return inputChecker; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestSchedulingStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestSchedulingStrategy.java index c9f3b88..6266b56 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestSchedulingStrategy.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestSchedulingStrategy.java @@ -37,7 +37,7 @@ public class TestSchedulingStrategy implements SchedulingStrategy { private final SchedulerOperations schedulerOperations; - private final SchedulingTopology<?, ?> schedulingTopology; + private final SchedulingTopology schedulingTopology; private final DeploymentOption deploymentOption = new DeploymentOption(false); @@ -45,7 +45,7 @@ public class TestSchedulingStrategy implements SchedulingStrategy { public TestSchedulingStrategy( final SchedulerOperations schedulerOperations, - final SchedulingTopology<?, ?> schedulingTopology) { + final SchedulingTopology schedulingTopology) { this.schedulerOperations = checkNotNull(schedulerOperations); this.schedulingTopology = checkNotNull(schedulingTopology); @@ -72,7 +72,7 @@ public class TestSchedulingStrategy implements SchedulingStrategy { allocateSlotsAndDeploy(verticesToSchedule); } - public SchedulingTopology<?, ?> getSchedulingTopology() { + public SchedulingTopology getSchedulingTopology() { return schedulingTopology; } @@ -106,7 +106,7 @@ public class TestSchedulingStrategy implements SchedulingStrategy { @Override public SchedulingStrategy createInstance( final SchedulerOperations schedulerOperations, - final SchedulingTopology<?, ?> schedulingTopology) { + final SchedulingTopology schedulingTopology) { lastInstance = new TestSchedulingStrategy(schedulerOperations, schedulingTopology); return lastInstance; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java index dcb0df5..31cb8b6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java @@ -30,8 +30,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * A simple scheduling execution vertex for testing purposes. */ -public class TestingSchedulingExecutionVertex - implements SchedulingExecutionVertex<TestingSchedulingExecutionVertex, TestingSchedulingResultPartition> { +public class TestingSchedulingExecutionVertex implements SchedulingExecutionVertex { private final ExecutionVertexID executionVertexId; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java index 7eb3b41..55fabcf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java @@ -30,8 +30,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * A simple implementation of {@link SchedulingResultPartition} for testing. */ -public class TestingSchedulingResultPartition - implements SchedulingResultPartition<TestingSchedulingExecutionVertex, TestingSchedulingResultPartition> { +public class TestingSchedulingResultPartition implements SchedulingResultPartition { private final IntermediateDataSetID intermediateDataSetID; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java index 5e930b4..af22b63 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java @@ -37,8 +37,7 @@ import static org.apache.flink.util.Preconditions.checkState; /** * A simple scheduling topology for testing purposes. */ -public class TestingSchedulingTopology - implements SchedulingTopology<TestingSchedulingExecutionVertex, TestingSchedulingResultPartition> { +public class TestingSchedulingTopology implements SchedulingTopology { // Use linked map here to so we can get the values in inserted order private final Map<ExecutionVertexID, TestingSchedulingExecutionVertex> schedulingExecutionVertices = new LinkedHashMap<>();