This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0b8e5be0984153c4187d95e78b7b7c0ce5dbb68e
Author: Xintong Song <tonysong...@gmail.com>
AuthorDate: Fri Aug 2 15:02:06 2019 +0200

    [FLINK-13555][runtime] Fail slot requests immediately at the SlotPool if 
unfulfillable
    
    Remove the SlotManagerException as extension of ResourceManagerException.
    
    [FLINK-13555][runtime][test] Add cases in SlotPoolBatchSlotRequestTest 
validates that pending batch slot requests fail on 
UnfulfillableSlotRequestException.
    
    [FLINK-13555][runtime] Add the information which slot request failed in 
stack trace of exception in SlotManager#registerSlotRequest.
    
    [FLINK-13555][runtime][test] Update SlotManagerFailUnfulfillableTest to 
validate that slot manager throws UnfulfillableSlotRequestException for failing 
unfulfillable requests.
    
    This closes #9339.
---
 .../runtime/jobmaster/slotpool/SlotPoolImpl.java   | 11 ++++-
 .../runtime/resourcemanager/ResourceManager.java   |  3 +-
 .../UnfulfillableSlotRequestException.java}        | 22 +++++----
 .../resourcemanager/slotmanager/SlotManager.java   |  5 +-
 .../slotmanager/SlotManagerImpl.java               | 15 +++---
 .../slotpool/SlotPoolBatchSlotRequestTest.java     | 54 ++++++++++++++++++++--
 .../runtime/jobmaster/slotpool/SlotPoolUtils.java  |  4 +-
 .../SlotManagerFailUnfulfillableTest.java          | 23 +++++----
 .../slotmanager/SlotManagerTest.java               |  4 +-
 9 files changed, 102 insertions(+), 39 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
index 14beb8f..a381613 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
@@ -38,9 +38,11 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.clock.Clock;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
@@ -348,7 +350,7 @@ public class SlotPoolImpl implements SlotPool {
        private void slotRequestToResourceManagerFailed(SlotRequestId 
slotRequestID, Throwable failure) {
                final PendingRequest request = 
pendingRequests.getKeyA(slotRequestID);
                if (request != null) {
-                       if (request.isBatchRequest) {
+                       if (isBatchRequestAndFailureCanBeIgnored(request, 
failure)) {
                                log.debug("Ignoring failed request to the 
resource manager for a batch slot request.");
                        } else {
                                pendingRequests.removeKeyA(slotRequestID);
@@ -370,6 +372,11 @@ public class SlotPoolImpl implements SlotPool {
                
waitingForResourceManager.put(pendingRequest.getSlotRequestId(), 
pendingRequest);
        }
 
+       private boolean isBatchRequestAndFailureCanBeIgnored(PendingRequest 
request, Throwable failure){
+               return request.isBatchRequest &&
+                       !ExceptionUtils.findThrowable(failure, 
UnfulfillableSlotRequestException.class).isPresent();
+       }
+
        // 
------------------------------------------------------------------------
        //  Slot releasing & offering
        // 
------------------------------------------------------------------------
@@ -679,7 +686,7 @@ public class SlotPoolImpl implements SlotPool {
 
                final PendingRequest pendingRequest = 
pendingRequests.removeKeyB(allocationID);
                if (pendingRequest != null) {
-                       if (pendingRequest.isBatchRequest) {
+                       if 
(isBatchRequestAndFailureCanBeIgnored(pendingRequest, cause)) {
                                // pending batch requests don't react to this 
signal --> put it back
                                
pendingRequests.put(pendingRequest.getSlotRequestId(), allocationID, 
pendingRequest);
                        } else {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index ec71e85..13544d3 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -58,7 +58,6 @@ import 
org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistrat
 import 
org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
 import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
@@ -442,7 +441,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
 
                                try {
                                        
slotManager.registerSlotRequest(slotRequest);
-                               } catch (SlotManagerException e) {
+                               } catch (ResourceManagerException e) {
                                        return 
FutureUtils.completedExceptionally(e);
                                }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/UnfulfillableSlotRequestException.java
similarity index 51%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/UnfulfillableSlotRequestException.java
index c322c81..fb6eb45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/UnfulfillableSlotRequestException.java
@@ -16,19 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.resourcemanager.slotmanager;
+package org.apache.flink.runtime.resourcemanager.exceptions;
 
-import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 
-public class SlotManagerException extends ResourceManagerException {
 
-       private static final long serialVersionUID = -3723028616920379071L;
-
-       public SlotManagerException(String message) {
-               super(message);
-       }
+/**
+ * Exception denoting that a slot request can not be fulfilled by any slot in 
the cluster.
+ * This usually indicates that the slot request should not be pended or 
retried.
+ */
+public class UnfulfillableSlotRequestException extends 
ResourceManagerException {
+       private static final long serialVersionUID = 4453490263648758730L;
 
-       public SlotManagerException(String message, Throwable cause) {
-               super(message, cause);
+       public UnfulfillableSlotRequestException(AllocationID allocationId, 
ResourceProfile resourceProfile) {
+               super("Could not fulfill slot request " + allocationId + ". "
+                       + "Requested resource profile (" + resourceProfile + ") 
is unfulfillable.");
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 0bdbc5b..b177e11 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 
@@ -73,9 +74,9 @@ public interface SlotManager extends AutoCloseable {
         *
         * @param slotRequest specifying the requested slot specs
         * @return true if the slot request was registered; false if the 
request is a duplicate
-        * @throws SlotManagerException if the slot request failed (e.g. not 
enough resources left)
+        * @throws ResourceManagerException if the slot request failed (e.g. 
not enough resources left)
         */
-       boolean registerSlotRequest(SlotRequest slotRequest) throws 
SlotManagerException;
+       boolean registerSlotRequest(SlotRequest slotRequest) throws 
ResourceManagerException;
 
        /**
         * Cancels and removes a pending slot request with the given allocation 
id. If there is no such
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
index cd894e3..6fed1fe 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
 import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
@@ -290,10 +291,10 @@ public class SlotManagerImpl implements SlotManager {
         *
         * @param slotRequest specifying the requested slot specs
         * @return true if the slot request was registered; false if the 
request is a duplicate
-        * @throws SlotManagerException if the slot request failed (e.g. not 
enough resources left)
+        * @throws ResourceManagerException if the slot request failed (e.g. 
not enough resources left)
         */
        @Override
-       public boolean registerSlotRequest(SlotRequest slotRequest) throws 
SlotManagerException {
+       public boolean registerSlotRequest(SlotRequest slotRequest) throws 
ResourceManagerException {
                checkInit();
 
                if (checkDuplicateRequest(slotRequest.getAllocationId())) {
@@ -311,7 +312,7 @@ public class SlotManagerImpl implements SlotManager {
                                // requesting the slot failed --> remove 
pending slot request
                                
pendingSlotRequests.remove(slotRequest.getAllocationId());
 
-                               throw new SlotManagerException("Could not 
fulfill slot request " + slotRequest.getAllocationId() + '.', e);
+                               throw new ResourceManagerException("Could not 
fulfill slot request " + slotRequest.getAllocationId() + '.', e);
                        }
 
                        return true;
@@ -494,8 +495,7 @@ public class SlotManagerImpl implements SlotManager {
                                        resourceActions.notifyAllocationFailure(
                                                pendingSlotRequest.getJobId(),
                                                
pendingSlotRequest.getAllocationId(),
-                                               new 
ResourceManagerException("Could not fulfill slot request " + 
pendingSlotRequest.getAllocationId() + ". "
-                                                       + "Requested resource 
profile (" + pendingSlotRequest.getResourceProfile() + ") is unfulfillable.")
+                                               new 
UnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(), 
pendingSlotRequest.getResourceProfile())
                                        );
                                }
                        }
@@ -745,7 +745,7 @@ public class SlotManagerImpl implements SlotManager {
         * registered.
         *
         * @param pendingSlotRequest to allocate a slot for
-        * @throws ResourceManagerException if the resource manager cannot 
allocate more resource
+        * @throws ResourceManagerException if the slot request failed or is 
unfulfillable
         */
        private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) 
throws ResourceManagerException {
                final ResourceProfile resourceProfile = 
pendingSlotRequest.getResourceProfile();
@@ -767,8 +767,7 @@ public class SlotManagerImpl implements SlotManager {
                                // request can not be fulfilled by any free 
slot or pending slot that can be allocated,
                                // check whether it can be fulfilled by 
allocated slots
                                if (failUnfulfillableRequest && 
!isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile())) {
-                                       throw new 
ResourceManagerException("Requested resource profile (" +
-                                               
pendingSlotRequest.getResourceProfile() + ") is unfulfillable.");
+                                       throw new 
UnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(), 
pendingSlotRequest.getResourceProfile());
                                }
                        }
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
index 2231060..df4dfb3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.util.clock.ManualClock;
 import org.apache.flink.util.ExceptionUtils;
@@ -129,7 +130,7 @@ public class SlotPoolBatchSlotRequestTest extends 
TestLogger {
 
        /**
         * Tests that a batch slot request does not react to {@link 
SlotPool#failAllocation(AllocationID, Exception)}
-        * signals.
+        * signals whose exception is not {@link 
UnfulfillableSlotRequestException}.
         */
        @Test
        public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() 
throws Exception {
@@ -147,14 +148,40 @@ public class SlotPoolBatchSlotRequestTest extends 
TestLogger {
 
                        final CompletableFuture<PhysicalSlot> slotFuture = 
SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, 
resourceProfile);
 
-                       SlotPoolUtils.failAllocation(slotPool, 
directMainThreadExecutor, allocationIdFuture.get());
+                       SlotPoolUtils.failAllocation(slotPool, 
directMainThreadExecutor, allocationIdFuture.get(), new FlinkException("Failed 
request"));
 
                        assertThat(slotFuture.isDone(), is(false));
                }
        }
 
        /**
-        * Tests that a batch slot request won't fail if its resource manager 
request fails.
+        * Tests that a batch slot request does react to {@link 
SlotPool#failAllocation(AllocationID, Exception)}
+        * signals whose exception is {@link UnfulfillableSlotRequestException}.
+        */
+       @Test
+       public void 
testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() throws 
Exception {
+               final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+               final CompletableFuture<AllocationID> allocationIdFuture = new 
CompletableFuture<>();
+               
testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> 
allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+               final ComponentMainThreadExecutor directMainThreadExecutor = 
ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+               try (final SlotPoolImpl slotPool = new 
SlotPoolBuilder(directMainThreadExecutor)
+                       
.setResourceManagerGateway(testingResourceManagerGateway)
+                       .build()) {
+
+                       final CompletableFuture<PhysicalSlot> slotFuture = 
SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, 
resourceProfile);
+
+                       SlotPoolUtils.failAllocation(slotPool, 
directMainThreadExecutor, allocationIdFuture.get(),
+                               new UnfulfillableSlotRequestException(new 
AllocationID(), ResourceProfile.UNKNOWN));
+
+                       assertThat(slotFuture.isCompletedExceptionally(), 
is(true));
+               }
+       }
+
+       /**
+        * Tests that a batch slot request won't fail if its resource manager 
request fails with exceptions other than
+        * {@link UnfulfillableSlotRequestException}.
         */
        @Test
        public void testPendingBatchSlotRequestDoesNotFailIfRMRequestFails() 
throws Exception {
@@ -176,6 +203,27 @@ public class SlotPoolBatchSlotRequestTest extends 
TestLogger {
        }
 
        /**
+        * Tests that a batch slot request fails if its resource manager 
request fails with {@link UnfulfillableSlotRequestException}.
+        */
+       @Test
+       public void 
testPendingBatchSlotRequestFailsIfRMRequestFailsUnfulfillably() throws 
Exception {
+               final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+               
testingResourceManagerGateway.setRequestSlotFuture(FutureUtils.completedExceptionally(
+                       new UnfulfillableSlotRequestException(new 
AllocationID(), ResourceProfile.UNKNOWN)));
+
+               final ComponentMainThreadExecutor directMainThreadExecutor = 
ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+               try (final SlotPoolImpl slotPool = new 
SlotPoolBuilder(directMainThreadExecutor)
+                       
.setResourceManagerGateway(testingResourceManagerGateway)
+                       .build()) {
+
+                       final CompletableFuture<PhysicalSlot> slotFuture = 
SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, 
resourceProfile);
+
+                       assertThat(slotFuture.isCompletedExceptionally(), 
is(true));
+               }
+       }
+
+       /**
         * Tests that a pending batch slot request times out after the last 
fulfilling slot gets
         * released.
         */
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
index c4dfe88..3836881 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
@@ -97,9 +97,9 @@ public class SlotPoolUtils {
                return taskManagerLocation.getResourceID();
        }
 
-       public static void failAllocation(SlotPoolImpl slotPool, 
ComponentMainThreadExecutor mainThreadExecutor, AllocationID allocationId) {
+       public static void failAllocation(SlotPoolImpl slotPool, 
ComponentMainThreadExecutor mainThreadExecutor, AllocationID allocationId, 
Exception exception) {
                CompletableFuture.runAsync(
-                       () -> slotPool.failAllocation(allocationId, new 
FlinkException("Test exception")),
+                       () -> slotPool.failAllocation(allocationId, exception),
                        mainThreadExecutor).join();
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
index cc4a2c1..af31c3a 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -26,11 +27,14 @@ import 
org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
 import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -40,6 +44,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -73,7 +78,7 @@ public class SlotManagerFailUnfulfillableTest extends 
TestLogger {
                final ResourceProfile availableProfile = new 
ResourceProfile(2.0, 100);
                final ResourceProfile unfulfillableProfile = new 
ResourceProfile(1.0, 200);
 
-               final List<AllocationID> allocationFailures = new ArrayList<>();
+               final List<Tuple3<JobID, AllocationID, Exception>> 
allocationFailures = new ArrayList<>();
                final SlotManager slotManager = 
createSlotManagerNotStartingNewTMs(allocationFailures);
                slotManager.setFailUnfulfillableRequest(false);
                registerFreeSlot(slotManager, availableProfile);
@@ -85,7 +90,8 @@ public class SlotManagerFailUnfulfillableTest extends 
TestLogger {
 
                // assert
                assertEquals(1, allocationFailures.size());
-               assertEquals(request.getAllocationId(), 
allocationFailures.get(0));
+               assertEquals(request.getAllocationId(), 
allocationFailures.get(0).f1);
+               
assertTrue(ExceptionUtils.findThrowable(allocationFailures.get(0).f2, 
UnfulfillableSlotRequestException.class).isPresent());
                assertEquals(0, slotManager.getNumberPendingSlotRequests());
        }
 
@@ -124,12 +130,12 @@ public class SlotManagerFailUnfulfillableTest extends 
TestLogger {
        }
 
        @Test
-       public void testUnfulfillableRequestsFailWhenOn() throws Exception {
+       public void testUnfulfillableRequestsFailWhenOn() {
                // setup
                final ResourceProfile availableProfile = new 
ResourceProfile(2.0, 100);
                final ResourceProfile unfulfillableProfile = new 
ResourceProfile(2.0, 200);
 
-               final List<AllocationID> notifiedAllocationFailures = new 
ArrayList<>();
+               final List<Tuple3<JobID, AllocationID, Exception>> 
notifiedAllocationFailures = new ArrayList<>();
                final SlotManager slotManager = 
createSlotManagerNotStartingNewTMs(notifiedAllocationFailures);
                registerFreeSlot(slotManager, availableProfile);
 
@@ -137,8 +143,9 @@ public class SlotManagerFailUnfulfillableTest extends 
TestLogger {
                try {
                        
slotManager.registerSlotRequest(slotRequest(unfulfillableProfile));
                        fail("this should cause an exception");
+               } catch (ResourceManagerException exception) {
+                       assertTrue(ExceptionUtils.findThrowable(exception, 
UnfulfillableSlotRequestException.class).isPresent());
                }
-               catch (SlotManagerException ignored) {}
 
                // assert
                assertEquals(0, notifiedAllocationFailures.size());
@@ -169,7 +176,7 @@ public class SlotManagerFailUnfulfillableTest extends 
TestLogger {
                return createSlotManager(new ArrayList<>(), false);
        }
 
-       private static SlotManager 
createSlotManagerNotStartingNewTMs(List<AllocationID> 
notifiedAllocationFailures) {
+       private static SlotManager 
createSlotManagerNotStartingNewTMs(List<Tuple3<JobID, AllocationID, Exception>> 
notifiedAllocationFailures) {
                return createSlotManager(notifiedAllocationFailures, false);
        }
 
@@ -178,14 +185,14 @@ public class SlotManagerFailUnfulfillableTest extends 
TestLogger {
        }
 
        private static SlotManager createSlotManager(
-                       List<AllocationID> notifiedAllocationFailures,
+                       List<Tuple3<JobID, AllocationID, Exception>> 
notifiedAllocationFailures,
                        boolean startNewTMs) {
 
                final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
                        .setAllocateResourceFunction((resourceProfile) -> 
startNewTMs ?
                                                        
Collections.singleton(resourceProfile) :
                                                        Collections.emptyList())
-                       .setNotifyAllocationFailureConsumer(tuple3 -> 
notifiedAllocationFailures.add(tuple3.f1))
+                       .setNotifyAllocationFailureConsumer(tuple3 -> 
notifiedAllocationFailures.add(tuple3))
                        .build();
 
                SlotManager slotManager = 
SlotManagerBuilder.newBuilder().build();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index b155ed6..a99fceb 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -842,7 +842,7 @@ public class SlotManagerTest extends TestLogger {
                                (Object value) -> {
                                        try {
                                                
slotManager.registerSlotRequest(slotRequest);
-                                       } catch (SlotManagerException e) {
+                                       } catch (ResourceManagerException e) {
                                                throw new 
RuntimeException("Could not register slots.", e);
                                        }
                                });
@@ -953,7 +953,7 @@ public class SlotManagerTest extends TestLogger {
                                () -> {
                                        try {
                                                return 
slotManager.registerSlotRequest(slotRequest);
-                                       } catch (SlotManagerException e) {
+                                       } catch (ResourceManagerException e) {
                                                throw new 
CompletionException(e);
                                        }
                                },

Reply via email to