This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 541f28fe4e0f90bde479fa457841e3cbc610f2ef Author: Zhu Zhu <reed...@gmail.com> AuthorDate: Thu Jun 4 15:33:27 2020 +0800 [FLINK-17017][runtime] Allow to disable batch slot request timeout check --- .../apache/flink/runtime/jobmaster/slotpool/SlotPool.java | 8 ++++++++ .../flink/runtime/jobmaster/slotpool/SlotPoolImpl.java | 13 +++++++++++++ 2 files changed, 21 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index 484e810..8b4202c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -187,6 +187,14 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable { @Nonnull ResourceProfile resourceProfile); /** + * Disables batch slot request timeout check. Invoked when someone else wants to + * take over the timeout check responsibility. + */ + default void disableBatchSlotRequestTimeoutCheck() { + throw new UnsupportedOperationException("Not properly implemented."); + } + + /** * Create report about the allocated slots belonging to the specified task manager. * * @param taskManagerId identifies the task manager diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java index 1f1bc66..66d36ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java @@ -134,6 +134,8 @@ public class SlotPoolImpl implements SlotPool { private ComponentMainThreadExecutor componentMainThreadExecutor; + private boolean batchSlotRequestTimeoutCheckEnabled; + // ------------------------------------------------------------------------ public SlotPoolImpl( @@ -160,6 +162,8 @@ public class SlotPoolImpl implements SlotPool { this.jobManagerAddress = null; this.componentMainThreadExecutor = null; + + this.batchSlotRequestTimeoutCheckEnabled = true; } // ------------------------------------------------------------------------ @@ -454,6 +458,11 @@ public class SlotPoolImpl implements SlotPool { } @Override + public void disableBatchSlotRequestTimeoutCheck() { + batchSlotRequestTimeoutCheckEnabled = false; + } + + @Override @Nonnull public Collection<SlotInfoWithUtilization> getAvailableSlotsInformation() { final Map<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager = availableSlots.getSlotsByTaskManager(); @@ -874,6 +883,10 @@ public class SlotPoolImpl implements SlotPool { } protected void checkBatchSlotTimeout() { + if (!batchSlotRequestTimeoutCheckEnabled) { + return; + } + final Collection<PendingRequest> pendingBatchRequests = getPendingBatchRequests(); if (!pendingBatchRequests.isEmpty()) {