This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 265612c2cf93a589d87d7fc8ca168bc19d838885
Author: Zhu Zhu <reed...@gmail.com>
AuthorDate: Mon Jul 11 18:44:22 2022 +0800

    [FLINK-28137][runtime] Enable SimpleExecutionSlotAllocator to do batch slot 
request timeout check
---
 .../slotpool/DeclarativeSlotPoolBridge.java        |  5 ++--
 .../jobmaster/slotpool/PhysicalSlotProvider.java   |  6 +++++
 .../slotpool/PhysicalSlotProviderImpl.java         |  4 ++++
 .../SlotSharingExecutionSlotAllocator.java         |  2 ++
 ...erImplWithDefaultSlotSelectionStrategyTest.java |  6 +++--
 ...lSlotProviderImplWithSpreadOutStrategyTest.java |  6 +++--
 .../slotpool/SlotPoolBatchSlotRequestTest.java     | 28 ----------------------
 .../SimpleExecutionSlotAllocatorTest.java          |  6 +++++
 .../SlotSharingExecutionSlotAllocatorTest.java     |  6 +++++
 .../scheduler/TestingPhysicalSlotProvider.java     | 11 +++++++++
 10 files changed, 46 insertions(+), 34 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index 6391657b18a..c7ae507ad6c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -397,8 +397,9 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
     }
 
     private void failPendingRequests(Collection<ResourceRequirement> 
acquiredResources) {
-        Predicate<PendingRequest> predicate =
-                request -> !isBatchSlotRequestTimeoutCheckDisabled || 
!request.isBatchRequest();
+        // only fails streaming requests because batch jobs do not require all 
resource
+        // requirements to be fulfilled at the same time
+        Predicate<PendingRequest> predicate = request -> 
!request.isBatchRequest();
         if (pendingRequests.values().stream().anyMatch(predicate)) {
             log.warn(
                     "Could not acquire the minimum required resources, failing 
slot requests. Acquired: {}. Current slot pool status: {}",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java
index 83b2aa5567f..bab5693dcbc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java
@@ -46,4 +46,10 @@ public interface PhysicalSlotProvider {
      * @param cause of the cancellation
      */
     void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause);
+
+    /**
+     * Disables batch slot request timeout check. Invoked when someone else 
wants to take over the
+     * timeout check responsibility.
+     */
+    void disableBatchSlotRequestTimeoutCheck();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
index 433cf89c6db..270b8749cf4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
@@ -45,6 +45,10 @@ public class PhysicalSlotProviderImpl implements 
PhysicalSlotProvider {
             SlotSelectionStrategy slotSelectionStrategy, SlotPool slotPool) {
         this.slotSelectionStrategy = checkNotNull(slotSelectionStrategy);
         this.slotPool = checkNotNull(slotPool);
+    }
+
+    @Override
+    public void disableBatchSlotRequestTimeoutCheck() {
         slotPool.disableBatchSlotRequestTimeoutCheck();
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
index 67c8b3bb970..f1995a7062a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
@@ -95,6 +95,8 @@ class SlotSharingExecutionSlotAllocator implements 
ExecutionSlotAllocator {
         this.allocationTimeout = checkNotNull(allocationTimeout);
         this.resourceProfileRetriever = checkNotNull(resourceProfileRetriever);
         this.sharedSlots = new IdentityHashMap<>();
+
+        this.slotProvider.disableBatchSlotRequestTimeoutCheck();
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java
index 560a26a3d1a..a29a698cbe0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java
@@ -71,8 +71,10 @@ public class 
PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest extend
                         
.buildAndStart(physicalSlotProviderResource.getMainThreadExecutor());
         assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(true));
 
-        new PhysicalSlotProviderImpl(
-                LocationPreferenceSlotSelectionStrategy.createDefault(), 
slotPool);
+        final PhysicalSlotProvider slotProvider =
+                new PhysicalSlotProviderImpl(
+                        
LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
+        slotProvider.disableBatchSlotRequestTimeoutCheck();
         assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), 
is(false));
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
index 1d77b9ed794..a1d2edfb9e3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
@@ -115,8 +115,10 @@ public class 
PhysicalSlotProviderImplWithSpreadOutStrategyTest extends TestLogge
                         
.buildAndStart(physicalSlotProviderResource.getMainThreadExecutor());
         assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(true));
 
-        new PhysicalSlotProviderImpl(
-                
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut(), slotPool);
+        final PhysicalSlotProvider slotProvider =
+                new PhysicalSlotProviderImpl(
+                        
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut(), slotPool);
+        slotProvider.disableBatchSlotRequestTimeoutCheck();
         assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), 
is(false));
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
index fb95adc47df..718cf87c1c8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
@@ -41,7 +41,6 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -134,33 +133,6 @@ public class SlotPoolBatchSlotRequestTest extends 
TestLogger {
         }
     }
 
-    /**
-     * Tests that a batch slot request does react to {@link
-     * SlotPoolService#notifyNotEnoughResourcesAvailable}.
-     */
-    @Test
-    public void 
testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() throws 
Exception {
-        final TestingResourceManagerGateway testingResourceManagerGateway =
-                new TestingResourceManagerGateway();
-
-        try (final DeclarativeSlotPoolBridge slotPool =
-                new DeclarativeSlotPoolBridgeBuilder()
-                        
.setResourceManagerGateway(testingResourceManagerGateway)
-                        .buildAndStart(mainThreadExecutor)) {
-
-            final CompletableFuture<PhysicalSlot> slotFuture =
-                    SlotPoolUtils.requestNewAllocatedBatchSlot(
-                            slotPool, mainThreadExecutor, resourceProfile);
-
-            SlotPoolUtils.notifyNotEnoughResourcesAvailable(
-                    slotPool, mainThreadExecutor, Collections.emptyList());
-
-            assertThat(
-                    slotFuture,
-                    
FlinkMatchers.futureWillCompleteExceptionally(Duration.ofSeconds(10L)));
-        }
-    }
-
     /**
      * Tests that a batch slot request won't fail if its resource manager 
request fails with
      * exceptions other than {@link UnfulfillableSlotRequestException}.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
index 1fd6ca850f6..033ff2a1d26 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
@@ -226,6 +226,12 @@ class SimpleExecutionSlotAllocatorTest {
                 .isEqualTo(context.getSlotProvider().getRequests().keySet());
     }
 
+    @Test
+    void testSlotProviderBatchSlotRequestTimeoutCheckIsEnabled() {
+        final AllocationContext context = new AllocationContext();
+        
assertThat(context.getSlotProvider().isBatchSlotRequestTimeoutCheckEnabled()).isTrue();
+    }
+
     private static class AllocationContext {
         private final TestingPhysicalSlotProvider slotProvider;
         private final boolean slotWillBeOccupiedIndefinitely;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
index 8fbbf3adc91..277869fba3d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
@@ -448,6 +448,12 @@ class SlotSharingExecutionSlotAllocatorTest {
                 .containsExactlyInAnyOrder(resourceProfile1, resourceProfile2);
     }
 
+    @Test
+    void testSlotProviderBatchSlotRequestTimeoutCheckIsDisabled() {
+        final AllocationContext context = 
AllocationContext.newBuilder().build();
+        
assertThat(context.getSlotProvider().isBatchSlotRequestTimeoutCheckEnabled()).isFalse();
+    }
+
     private static List<ExecutionVertexID> getAssignIds(
             Collection<ExecutionSlotAssignment> assignments) {
         return assignments.stream()
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
index 6fd6d01ab49..2f870ae82ea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
@@ -48,6 +48,8 @@ public class TestingPhysicalSlotProvider implements 
PhysicalSlotProvider {
     private final Function<ResourceProfile, 
CompletableFuture<TestingPhysicalSlot>>
             physicalSlotCreator;
 
+    private boolean batchSlotRequestTimeoutCheckEnabled = true;
+
     public static TestingPhysicalSlotProvider create(
             Function<ResourceProfile, CompletableFuture<TestingPhysicalSlot>> 
physicalSlotCreator) {
         return new TestingPhysicalSlotProvider(physicalSlotCreator);
@@ -126,6 +128,11 @@ public class TestingPhysicalSlotProvider implements 
PhysicalSlotProvider {
         cancellations.put(slotRequestId, cause);
     }
 
+    @Override
+    public void disableBatchSlotRequestTimeoutCheck() {
+        batchSlotRequestTimeoutCheckEnabled = false;
+    }
+
     public CompletableFuture<TestingPhysicalSlot> getResultForRequestId(
             SlotRequestId slotRequestId) {
         return getResponses().get(slotRequestId);
@@ -160,4 +167,8 @@ public class TestingPhysicalSlotProvider implements 
PhysicalSlotProvider {
         Preconditions.checkState(element.isPresent());
         return element.get();
     }
+
+    boolean isBatchSlotRequestTimeoutCheckEnabled() {
+        return batchSlotRequestTimeoutCheckEnabled;
+    }
 }

Reply via email to