This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 265612c2cf93a589d87d7fc8ca168bc19d838885 Author: Zhu Zhu <reed...@gmail.com> AuthorDate: Mon Jul 11 18:44:22 2022 +0800 [FLINK-28137][runtime] Enable SimpleExecutionSlotAllocator to do batch slot request timeout check --- .../slotpool/DeclarativeSlotPoolBridge.java | 5 ++-- .../jobmaster/slotpool/PhysicalSlotProvider.java | 6 +++++ .../slotpool/PhysicalSlotProviderImpl.java | 4 ++++ .../SlotSharingExecutionSlotAllocator.java | 2 ++ ...erImplWithDefaultSlotSelectionStrategyTest.java | 6 +++-- ...lSlotProviderImplWithSpreadOutStrategyTest.java | 6 +++-- .../slotpool/SlotPoolBatchSlotRequestTest.java | 28 ---------------------- .../SimpleExecutionSlotAllocatorTest.java | 6 +++++ .../SlotSharingExecutionSlotAllocatorTest.java | 6 +++++ .../scheduler/TestingPhysicalSlotProvider.java | 11 +++++++++ 10 files changed, 46 insertions(+), 34 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java index 6391657b18a..c7ae507ad6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java @@ -397,8 +397,9 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem } private void failPendingRequests(Collection<ResourceRequirement> acquiredResources) { - Predicate<PendingRequest> predicate = - request -> !isBatchSlotRequestTimeoutCheckDisabled || !request.isBatchRequest(); + // only fails streaming requests because batch jobs do not require all resource + // requirements to be fulfilled at the same time + Predicate<PendingRequest> predicate = request -> !request.isBatchRequest(); if (pendingRequests.values().stream().anyMatch(predicate)) { log.warn( "Could not acquire the minimum required resources, failing slot 
requests. Acquired: {}. Current slot pool status: {}", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java index 83b2aa5567f..bab5693dcbc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java @@ -46,4 +46,10 @@ public interface PhysicalSlotProvider { * @param cause of the cancellation */ void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause); + + /** + * Disables batch slot request timeout check. Invoked when someone else wants to take over the + * timeout check responsibility. + */ + void disableBatchSlotRequestTimeoutCheck(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java index 433cf89c6db..270b8749cf4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java @@ -45,6 +45,10 @@ public class PhysicalSlotProviderImpl implements PhysicalSlotProvider { SlotSelectionStrategy slotSelectionStrategy, SlotPool slotPool) { this.slotSelectionStrategy = checkNotNull(slotSelectionStrategy); this.slotPool = checkNotNull(slotPool); + } + + @Override + public void disableBatchSlotRequestTimeoutCheck() { slotPool.disableBatchSlotRequestTimeoutCheck(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java index 67c8b3bb970..f1995a7062a 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java @@ -95,6 +95,8 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator { this.allocationTimeout = checkNotNull(allocationTimeout); this.resourceProfileRetriever = checkNotNull(resourceProfileRetriever); this.sharedSlots = new IdentityHashMap<>(); + + this.slotProvider.disableBatchSlotRequestTimeoutCheck(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java index 560a26a3d1a..a29a698cbe0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java @@ -71,8 +71,10 @@ public class PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest extend .buildAndStart(physicalSlotProviderResource.getMainThreadExecutor()); assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(true)); - new PhysicalSlotProviderImpl( - LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool); + final PhysicalSlotProvider slotProvider = + new PhysicalSlotProviderImpl( + LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool); + slotProvider.disableBatchSlotRequestTimeoutCheck(); assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(false)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java index 1d77b9ed794..a1d2edfb9e3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java @@ -115,8 +115,10 @@ public class PhysicalSlotProviderImplWithSpreadOutStrategyTest extends TestLogge .buildAndStart(physicalSlotProviderResource.getMainThreadExecutor()); assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(true)); - new PhysicalSlotProviderImpl( - LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut(), slotPool); + final PhysicalSlotProvider slotProvider = + new PhysicalSlotProviderImpl( + LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut(), slotPool); + slotProvider.disableBatchSlotRequestTimeoutCheck(); assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(false)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java index fb95adc47df..718cf87c1c8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java @@ -41,7 +41,6 @@ import javax.annotation.Nullable; import java.time.Duration; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -134,33 +133,6 @@ public class SlotPoolBatchSlotRequestTest extends TestLogger { } } - /** - * Tests that a batch slot request does react to {@link - * 
SlotPoolService#notifyNotEnoughResourcesAvailable}. - */ - @Test - public void testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() throws Exception { - final TestingResourceManagerGateway testingResourceManagerGateway = - new TestingResourceManagerGateway(); - - try (final DeclarativeSlotPoolBridge slotPool = - new DeclarativeSlotPoolBridgeBuilder() - .setResourceManagerGateway(testingResourceManagerGateway) - .buildAndStart(mainThreadExecutor)) { - - final CompletableFuture<PhysicalSlot> slotFuture = - SlotPoolUtils.requestNewAllocatedBatchSlot( - slotPool, mainThreadExecutor, resourceProfile); - - SlotPoolUtils.notifyNotEnoughResourcesAvailable( - slotPool, mainThreadExecutor, Collections.emptyList()); - - assertThat( - slotFuture, - FlinkMatchers.futureWillCompleteExceptionally(Duration.ofSeconds(10L))); - } - } - /** * Tests that a batch slot request won't fail if its resource manager request fails with * exceptions other than {@link UnfulfillableSlotRequestException}. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java index 1fd6ca850f6..033ff2a1d26 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java @@ -226,6 +226,12 @@ class SimpleExecutionSlotAllocatorTest { .isEqualTo(context.getSlotProvider().getRequests().keySet()); } + @Test + void testSlotProviderBatchSlotRequestTimeoutCheckIsEnabled() { + final AllocationContext context = new AllocationContext(); + assertThat(context.getSlotProvider().isBatchSlotRequestTimeoutCheckEnabled()).isTrue(); + } + private static class AllocationContext { private final TestingPhysicalSlotProvider slotProvider; private final boolean slotWillBeOccupiedIndefinitely; diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java index 8fbbf3adc91..277869fba3d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java @@ -448,6 +448,12 @@ class SlotSharingExecutionSlotAllocatorTest { .containsExactlyInAnyOrder(resourceProfile1, resourceProfile2); } + @Test + void testSlotProviderBatchSlotRequestTimeoutCheckIsDisabled() { + final AllocationContext context = AllocationContext.newBuilder().build(); + assertThat(context.getSlotProvider().isBatchSlotRequestTimeoutCheckEnabled()).isFalse(); + } + private static List<ExecutionVertexID> getAssignIds( Collection<ExecutionSlotAssignment> assignments) { return assignments.stream() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java index 6fd6d01ab49..2f870ae82ea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java @@ -48,6 +48,8 @@ public class TestingPhysicalSlotProvider implements PhysicalSlotProvider { private final Function<ResourceProfile, CompletableFuture<TestingPhysicalSlot>> physicalSlotCreator; + private boolean batchSlotRequestTimeoutCheckEnabled = true; + public static TestingPhysicalSlotProvider create( Function<ResourceProfile, CompletableFuture<TestingPhysicalSlot>> physicalSlotCreator) { return new TestingPhysicalSlotProvider(physicalSlotCreator); @@ -126,6 +128,11 @@ public class TestingPhysicalSlotProvider implements PhysicalSlotProvider { 
cancellations.put(slotRequestId, cause); } + @Override + public void disableBatchSlotRequestTimeoutCheck() { + batchSlotRequestTimeoutCheckEnabled = false; + } + public CompletableFuture<TestingPhysicalSlot> getResultForRequestId( SlotRequestId slotRequestId) { return getResponses().get(slotRequestId); @@ -160,4 +167,8 @@ public class TestingPhysicalSlotProvider implements PhysicalSlotProvider { Preconditions.checkState(element.isPresent()); return element.get(); } + + boolean isBatchSlotRequestTimeoutCheckEnabled() { + return batchSlotRequestTimeoutCheckEnabled; + } }