This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4c16a3468c65b876ac687a3805411aa3c7a9f295 Author: Zhu Zhu <reed...@gmail.com> AuthorDate: Thu Jun 4 15:46:45 2020 +0800 [FLINK-17017][runtime] Allow nullable timeout for streaming slot request in slot pool --- .../flink/runtime/jobmaster/slotpool/SlotPool.java | 3 ++- .../runtime/jobmaster/slotpool/SlotPoolImpl.java | 30 ++++++++++++---------- .../flink/runtime/jobmaster/JobMasterTest.java | 5 +++- .../jobmaster/slotpool/TestingSlotPoolImpl.java | 4 ++- 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index 8b4202c6..4392e70 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Collection; import java.util.Optional; @@ -170,7 +171,7 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable { CompletableFuture<PhysicalSlot> requestNewAllocatedSlot( @Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, - Time timeout); + @Nullable Time timeout); /** * Requests the allocation of a new batch slot from the resource manager. Unlike the normal slot, a batch diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java index 66d36ac..d9407f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java @@ -419,25 +419,27 @@ public class SlotPoolImpl implements SlotPool { public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot( @Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, - Time timeout) { + @Nullable Time timeout) { componentMainThreadExecutor.assertRunningInMainThread(); final PendingRequest pendingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile); - // register request timeout - FutureUtils - .orTimeout( - pendingRequest.getAllocatedSlotFuture(), - timeout.toMilliseconds(), - TimeUnit.MILLISECONDS, - componentMainThreadExecutor) - .whenComplete( - (AllocatedSlot ignored, Throwable throwable) -> { - if (throwable instanceof TimeoutException) { - timeoutPendingSlotRequest(slotRequestId); - } - }); + if (timeout != null) { + // register request timeout + FutureUtils + .orTimeout( + pendingRequest.getAllocatedSlotFuture(), + timeout.toMilliseconds(), + TimeUnit.MILLISECONDS, + componentMainThreadExecutor) + .whenComplete( + (AllocatedSlot ignored, Throwable throwable) -> { + if (throwable instanceof TimeoutException) { + timeoutPendingSlotRequest(slotRequestId); + } + }); + } return requestNewAllocatedSlotInternal(pendingRequest) .thenApply((Function.identity())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 724129d..24f0f20 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -596,7 +596,10 @@ public class JobMasterTest extends TestLogger { @Nonnull @Override - public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, Time timeout) { + public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot( + @Nonnull SlotRequestId slotRequestId, + @Nonnull ResourceProfile resourceProfile, + @Nullable Time timeout) { return new CompletableFuture<>(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java index c007def1..776b51e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java @@ -27,6 +27,8 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.SystemClock; +import javax.annotation.Nullable; + import java.util.concurrent.CompletableFuture; /** @@ -67,7 +69,7 @@ public class TestingSlotPoolImpl extends SlotPoolImpl { public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot( final SlotRequestId slotRequestId, final ResourceProfile resourceProfile, - final Time timeout) { + @Nullable final Time timeout) { this.lastRequestedSlotResourceProfile = resourceProfile;