This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4c16a3468c65b876ac687a3805411aa3c7a9f295
Author: Zhu Zhu <reed...@gmail.com>
AuthorDate: Thu Jun 4 15:46:45 2020 +0800

    [FLINK-17017][runtime] Allow nullable timeout for streaming slot request in 
slot pool
---
 .../flink/runtime/jobmaster/slotpool/SlotPool.java |  3 ++-
 .../runtime/jobmaster/slotpool/SlotPoolImpl.java   | 30 ++++++++++++----------
 .../flink/runtime/jobmaster/JobMasterTest.java     |  5 +++-
 .../jobmaster/slotpool/TestingSlotPoolImpl.java    |  4 ++-
 4 files changed, 25 insertions(+), 17 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 8b4202c6..4392e70 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Collection;
 import java.util.Optional;
@@ -170,7 +171,7 @@ public interface SlotPool extends AllocatedSlotActions, 
AutoCloseable {
        CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
                @Nonnull SlotRequestId slotRequestId,
                @Nonnull ResourceProfile resourceProfile,
-               Time timeout);
+               @Nullable Time timeout);
 
        /**
         * Requests the allocation of a new batch slot from the resource 
manager. Unlike the normal slot, a batch
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
index 66d36ac..d9407f5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
@@ -419,25 +419,27 @@ public class SlotPoolImpl implements SlotPool {
        public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
                        @Nonnull SlotRequestId slotRequestId,
                        @Nonnull ResourceProfile resourceProfile,
-                       Time timeout) {
+                       @Nullable Time timeout) {
 
                componentMainThreadExecutor.assertRunningInMainThread();
 
                final PendingRequest pendingRequest = 
PendingRequest.createStreamingRequest(slotRequestId, resourceProfile);
 
-               // register request timeout
-               FutureUtils
-                       .orTimeout(
-                               pendingRequest.getAllocatedSlotFuture(),
-                               timeout.toMilliseconds(),
-                               TimeUnit.MILLISECONDS,
-                               componentMainThreadExecutor)
-                       .whenComplete(
-                               (AllocatedSlot ignored, Throwable throwable) -> 
{
-                                       if (throwable instanceof 
TimeoutException) {
-                                               
timeoutPendingSlotRequest(slotRequestId);
-                                       }
-                               });
+               if (timeout != null) {
+                       // register request timeout
+                       FutureUtils
+                               .orTimeout(
+                                       pendingRequest.getAllocatedSlotFuture(),
+                                       timeout.toMilliseconds(),
+                                       TimeUnit.MILLISECONDS,
+                                       componentMainThreadExecutor)
+                               .whenComplete(
+                                       (AllocatedSlot ignored, Throwable 
throwable) -> {
+                                               if (throwable instanceof 
TimeoutException) {
+                                                       
timeoutPendingSlotRequest(slotRequestId);
+                                               }
+                                       });
+               }
 
                return requestNewAllocatedSlotInternal(pendingRequest)
                        .thenApply((Function.identity()));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 724129d..24f0f20 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -596,7 +596,10 @@ public class JobMasterTest extends TestLogger {
 
                @Nonnull
                @Override
-               public CompletableFuture<PhysicalSlot> 
requestNewAllocatedSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull 
ResourceProfile resourceProfile, Time timeout) {
+               public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
+                               @Nonnull SlotRequestId slotRequestId,
+                               @Nonnull ResourceProfile resourceProfile,
+                               @Nullable Time timeout) {
                        return new CompletableFuture<>();
                }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
index c007def1..776b51e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.SystemClock;
 
+import javax.annotation.Nullable;
+
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -67,7 +69,7 @@ public class TestingSlotPoolImpl extends SlotPoolImpl {
        public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
                        final SlotRequestId slotRequestId,
                        final ResourceProfile resourceProfile,
-                       final Time timeout) {
+                       @Nullable final Time timeout) {
 
                this.lastRequestedSlotResourceProfile = resourceProfile;
 

Reply via email to