This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b462d0ec4d1d421a369e45f8dca33284b5be6bc2 Author: sunxia <xingbe...@gmail.com> AuthorDate: Fri Jun 16 10:14:21 2023 +0800 [FLINK-32288][runtime] Improve the scheduling performance of AdaptiveBatchScheduler This closes #22798. --- .../AllFinishedInputConsumableDecider.java | 5 +++-- .../strategy/DefaultInputConsumableDecider.java | 13 +++++++++++++ .../scheduler/strategy/InputConsumableDecider.java | 10 +++++++++- .../PartialFinishedInputConsumableDecider.java | 5 +++-- .../strategy/VertexwiseSchedulingStrategy.java | 5 +++++ .../DefaultInputConsumableDeciderTest.java | 22 ++++++++++++++++++++++ .../strategy/TestingInputConsumableDecider.java | 6 ++++++ 7 files changed, 61 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java index 6c23757a1a7..f8cbb260488 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java @@ -37,14 +37,15 @@ public class AllFinishedInputConsumableDecider implements InputConsumableDecider executionVertex.getConsumedPartitionGroups()) { if (!consumableStatusCache.computeIfAbsent( - consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) { + consumedPartitionGroup, this::isConsumableBasedOnFinishedProducers)) { return false; } } return true; } - private boolean isConsumedPartitionGroupConsumable( + @Override + public boolean isConsumableBasedOnFinishedProducers( final ConsumedPartitionGroup consumedPartitionGroup) { return consumedPartitionGroup.getNumberOfUnfinishedPartitions() == 0; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java index 93db09c94b6..ccd354b0d0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java @@ -65,6 +65,19 @@ public class DefaultInputConsumableDecider implements InputConsumableDecider { return true; } + @Override + public boolean isConsumableBasedOnFinishedProducers( + final ConsumedPartitionGroup consumedPartitionGroup) { + if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) { + // For canBePipelined consumed partition group, whether it is consumable does not depend + // on task finish. To optimize performance and avoid unnecessary computation, we simply + // return false. + return false; + } else { + return consumedPartitionGroup.areAllPartitionsFinished(); + } + } + private boolean isConsumedPartitionGroupConsumable( final ConsumedPartitionGroup consumedPartitionGroup, final Set<ExecutionVertexID> verticesToSchedule) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java index e34cb06bc4e..1d19dd2cf62 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java @@ -24,7 +24,7 @@ import java.util.function.Function; /** * {@link InputConsumableDecider} is responsible for determining whether the input of an - * executionVertex is consumable. + * executionVertex or a consumed partition group is consumable. 
*/ public interface InputConsumableDecider { /** @@ -41,6 +41,14 @@ public interface InputConsumableDecider { Set<ExecutionVertexID> verticesToSchedule, Map<ConsumedPartitionGroup, Boolean> consumableStatusCache); + /** + * Determining whether the consumed partition group is consumable based on finished producers. + * + * @param consumedPartitionGroup to be determined whether it is consumable. + */ + boolean isConsumableBasedOnFinishedProducers( + final ConsumedPartitionGroup consumedPartitionGroup); + /** Factory for {@link InputConsumableDecider}. */ interface Factory { InputConsumableDecider createInstance( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java index 48d626e4259..df7c353e94e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java @@ -43,14 +43,15 @@ public class PartialFinishedInputConsumableDecider implements InputConsumableDec executionVertex.getConsumedPartitionGroups()) { if (!consumableStatusCache.computeIfAbsent( - consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) { + consumedPartitionGroup, this::isConsumableBasedOnFinishedProducers)) { return false; } } return true; } - private boolean isConsumedPartitionGroupConsumable( + @Override + public boolean isConsumableBasedOnFinishedProducers( final ConsumedPartitionGroup consumedPartitionGroup) { if (consumedPartitionGroup .getResultPartitionType() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java index 60ee3ab7871..b2d25b1f82f 
100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java @@ -103,6 +103,11 @@ public class VertexwiseSchedulingStrategy IterableUtils.toStream(executionVertex.getProducedResults()) .map(SchedulingResultPartition::getConsumerVertexGroups) .flatMap(Collection::stream) + .filter( + group -> + inputConsumableDecider + .isConsumableBasedOnFinishedProducers( + group.getConsumedPartitionGroup())) .flatMap(IterableUtils::toStream) .collect(Collectors.toSet()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java index c0184fe0061..af61b1e3676 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java @@ -50,6 +50,17 @@ class DefaultInputConsumableDeciderTest { DefaultInputConsumableDecider inputConsumableDecider = createDefaultInputConsumableDecider(Collections.emptySet(), topology); + consumer.forEach( + vertex -> + vertex.getConsumedPartitionGroups() + .forEach( + group -> + assertThat( + inputConsumableDecider + .isConsumableBasedOnFinishedProducers( + group)) + .isFalse())); + assertThat( inputConsumableDecider.isInputConsumable( consumer.get(0), Collections.emptySet(), new HashMap<>())) @@ -78,6 +89,17 @@ class DefaultInputConsumableDeciderTest { DefaultInputConsumableDecider inputConsumableDecider = createDefaultInputConsumableDecider(Collections.emptySet(), topology); + consumer.forEach( + vertex -> + vertex.getConsumedPartitionGroups() + .forEach( + group -> + assertThat( + inputConsumableDecider + 
.isConsumableBasedOnFinishedProducers( + group)) + .isTrue())); + assertThat( inputConsumableDecider.isInputConsumable( consumer.get(0), Collections.emptySet(), new HashMap<>())) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java index da2aa8241ac..67f3af93180 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java @@ -42,6 +42,12 @@ public class TestingInputConsumableDecider implements InputConsumableDecider { || inputConsumableExecutionVertices.contains(executionVertex); } + @Override + public boolean isConsumableBasedOnFinishedProducers( + ConsumedPartitionGroup consumedPartitionGroup) { + return true; + } + public void setInputConsumable(SchedulingExecutionVertex executionVertex) { inputConsumableExecutionVertices.add(executionVertex); }