This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 541f28fe4e0f90bde479fa457841e3cbc610f2ef
Author: Zhu Zhu <reed...@gmail.com>
AuthorDate: Thu Jun 4 15:33:27 2020 +0800

    [FLINK-17017][runtime] Allow to disable batch slot request timeout check
---
 .../apache/flink/runtime/jobmaster/slotpool/SlotPool.java   |  8 ++++++++
 .../flink/runtime/jobmaster/slotpool/SlotPoolImpl.java      | 13 +++++++++++++
 2 files changed, 21 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 484e810..8b4202c6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -187,6 +187,14 @@ public interface SlotPool extends AllocatedSlotActions, 
AutoCloseable {
                @Nonnull ResourceProfile resourceProfile);
 
        /**
+        * Disables batch slot request timeout check. Invoked when someone else 
wants to
+        * take over the timeout check responsibility.
+        */
+       default void disableBatchSlotRequestTimeoutCheck() {
+               throw new UnsupportedOperationException("Not properly 
implemented.");
+       }
+
+       /**
         * Create report about the allocated slots belonging to the specified 
task manager.
         *
         * @param taskManagerId identifies the task manager
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
index 1f1bc66..66d36ac 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
@@ -134,6 +134,8 @@ public class SlotPoolImpl implements SlotPool {
 
        private ComponentMainThreadExecutor componentMainThreadExecutor;
 
+       private boolean batchSlotRequestTimeoutCheckEnabled;
+
        // 
------------------------------------------------------------------------
 
        public SlotPoolImpl(
@@ -160,6 +162,8 @@ public class SlotPoolImpl implements SlotPool {
                this.jobManagerAddress = null;
 
                this.componentMainThreadExecutor = null;
+
+               this.batchSlotRequestTimeoutCheckEnabled = true;
        }
 
        // 
------------------------------------------------------------------------
@@ -454,6 +458,11 @@ public class SlotPoolImpl implements SlotPool {
        }
 
        @Override
+       public void disableBatchSlotRequestTimeoutCheck() {
+               batchSlotRequestTimeoutCheckEnabled = false;
+       }
+
+       @Override
        @Nonnull
        public Collection<SlotInfoWithUtilization> 
getAvailableSlotsInformation() {
                final Map<ResourceID, Set<AllocatedSlot>> 
availableSlotsByTaskManager = availableSlots.getSlotsByTaskManager();
@@ -874,6 +883,10 @@ public class SlotPoolImpl implements SlotPool {
        }
 
        protected void checkBatchSlotTimeout() {
+               if (!batchSlotRequestTimeoutCheckEnabled) {
+                       return;
+               }
+
                final Collection<PendingRequest> pendingBatchRequests = 
getPendingBatchRequests();
 
                if (!pendingBatchRequests.isEmpty()) {

Reply via email to