This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6644321f8637d96bd1b74fa6b244bb2bf22d194d Author: Xintong Song <tonysong...@gmail.com> AuthorDate: Fri Aug 2 15:02:06 2019 +0200 [FLINK-13555][runtime] Fail slot requests immediately at the SlotPool if unfulfillable Remove the SlotManagerException as an extension of ResourceManagerException. [FLINK-13555][runtime][test] Add cases in SlotPoolBatchSlotRequestTest validating that pending batch slot requests fail on UnfulfillableSlotRequestException. [FLINK-13555][runtime] Add information about which slot request failed to the stack trace of the exception in SlotManager#registerSlotRequest. [FLINK-13555][runtime][test] Update SlotManagerFailUnfulfillableTest to validate that the slot manager throws UnfulfillableSlotRequestException for failing unfulfillable requests. This closes #9339. --- .../runtime/jobmaster/slotpool/SlotPoolImpl.java | 11 ++++- .../runtime/resourcemanager/ResourceManager.java | 3 +- .../UnfulfillableSlotRequestException.java} | 22 +++++---- .../resourcemanager/slotmanager/SlotManager.java | 5 +- .../slotmanager/SlotManagerImpl.java | 15 +++--- .../slotpool/SlotPoolBatchSlotRequestTest.java | 54 ++++++++++++++++++++-- .../runtime/jobmaster/slotpool/SlotPoolUtils.java | 4 +- .../SlotManagerFailUnfulfillableTest.java | 23 +++++---- .../slotmanager/SlotManagerTest.java | 4 +- 9 files changed, 102 insertions(+), 39 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java index 14beb8f..a381613 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java @@ -38,9 +38,11 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import 
org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.clock.Clock; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -348,7 +350,7 @@ public class SlotPoolImpl implements SlotPool { private void slotRequestToResourceManagerFailed(SlotRequestId slotRequestID, Throwable failure) { final PendingRequest request = pendingRequests.getKeyA(slotRequestID); if (request != null) { - if (request.isBatchRequest) { + if (isBatchRequestAndFailureCanBeIgnored(request, failure)) { log.debug("Ignoring failed request to the resource manager for a batch slot request."); } else { pendingRequests.removeKeyA(slotRequestID); @@ -370,6 +372,11 @@ public class SlotPoolImpl implements SlotPool { waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest); } + private boolean isBatchRequestAndFailureCanBeIgnored(PendingRequest request, Throwable failure){ + return request.isBatchRequest && + !ExceptionUtils.findThrowable(failure, UnfulfillableSlotRequestException.class).isPresent(); + } + // ------------------------------------------------------------------------ // Slot releasing & offering // ------------------------------------------------------------------------ @@ -679,7 +686,7 @@ public class SlotPoolImpl implements SlotPool { final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID); if (pendingRequest != null) { - if (pendingRequest.isBatchRequest) { + if (isBatchRequestAndFailureCanBeIgnored(pendingRequest, cause)) { // pending batch requests don't react to this signal --> put it back pendingRequests.put(pendingRequest.getSlotRequestId(), allocationID, pendingRequest); } else { diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index ec71e85..13544d3 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -58,7 +58,6 @@ import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistrat import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration; import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; -import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.FencedRpcEndpoint; @@ -442,7 +441,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> try { slotManager.registerSlotRequest(slotRequest); - } catch (SlotManagerException e) { + } catch (ResourceManagerException e) { return FutureUtils.completedExceptionally(e); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/UnfulfillableSlotRequestException.java similarity index 51% rename from flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/UnfulfillableSlotRequestException.java index c322c81..fb6eb45 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/UnfulfillableSlotRequestException.java @@ -16,19 +16,21 @@ * limitations under the License. */ -package org.apache.flink.runtime.resourcemanager.slotmanager; +package org.apache.flink.runtime.resourcemanager.exceptions; -import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -public class SlotManagerException extends ResourceManagerException { - private static final long serialVersionUID = -3723028616920379071L; - - public SlotManagerException(String message) { - super(message); - } +/** + * Exception denoting that a slot request can not be fulfilled by any slot in the cluster. + * This usually indicates that the slot request should not be pended or retried. + */ +public class UnfulfillableSlotRequestException extends ResourceManagerException { + private static final long serialVersionUID = 4453490263648758730L; - public SlotManagerException(String message, Throwable cause) { - super(message, cause); + public UnfulfillableSlotRequestException(AllocationID allocationId, ResourceProfile resourceProfile) { + super("Could not fulfill slot request " + allocationId + ". 
" + + "Requested resource profile (" + resourceProfile + ") is unfulfillable."); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index 0bdbc5b..b177e11 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; import org.apache.flink.runtime.taskexecutor.SlotReport; @@ -73,9 +74,9 @@ public interface SlotManager extends AutoCloseable { * * @param slotRequest specifying the requested slot specs * @return true if the slot request was registered; false if the request is a duplicate - * @throws SlotManagerException if the slot request failed (e.g. not enough resources left) + * @throws ResourceManagerException if the slot request failed (e.g. not enough resources left) */ - boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException; + boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException; /** * Cancels and removes a pending slot request with the given allocation id. 
If there is no such diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java index cd894e3..6fed1fe 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException; import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; @@ -290,10 +291,10 @@ public class SlotManagerImpl implements SlotManager { * * @param slotRequest specifying the requested slot specs * @return true if the slot request was registered; false if the request is a duplicate - * @throws SlotManagerException if the slot request failed (e.g. not enough resources left) + * @throws ResourceManagerException if the slot request failed (e.g. 
not enough resources left) */ @Override - public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException { + public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException { checkInit(); if (checkDuplicateRequest(slotRequest.getAllocationId())) { @@ -311,7 +312,7 @@ public class SlotManagerImpl implements SlotManager { // requesting the slot failed --> remove pending slot request pendingSlotRequests.remove(slotRequest.getAllocationId()); - throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e); + throw new ResourceManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e); } return true; @@ -494,8 +495,7 @@ public class SlotManagerImpl implements SlotManager { resourceActions.notifyAllocationFailure( pendingSlotRequest.getJobId(), pendingSlotRequest.getAllocationId(), - new ResourceManagerException("Could not fulfill slot request " + pendingSlotRequest.getAllocationId() + ". " - + "Requested resource profile (" + pendingSlotRequest.getResourceProfile() + ") is unfulfillable.") + new UnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(), pendingSlotRequest.getResourceProfile()) ); } } @@ -745,7 +745,7 @@ public class SlotManagerImpl implements SlotManager { * registered. 
* * @param pendingSlotRequest to allocate a slot for - * @throws ResourceManagerException if the resource manager cannot allocate more resource + * @throws ResourceManagerException if the slot request failed or is unfulfillable */ private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException { final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile(); @@ -767,8 +767,7 @@ public class SlotManagerImpl implements SlotManager { // request can not be fulfilled by any free slot or pending slot that can be allocated, // check whether it can be fulfilled by allocated slots if (failUnfulfillableRequest && !isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile())) { - throw new ResourceManagerException("Requested resource profile (" + - pendingSlotRequest.getResourceProfile() + ") is unfulfillable."); + throw new UnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(), pendingSlotRequest.getResourceProfile()); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java index 2231060..df4dfb3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import 
org.apache.flink.runtime.util.clock.ManualClock; import org.apache.flink.util.ExceptionUtils; @@ -129,7 +130,7 @@ public class SlotPoolBatchSlotRequestTest extends TestLogger { /** * Tests that a batch slot request does not react to {@link SlotPool#failAllocation(AllocationID, Exception)} - * signals. + * signals whose exception is not {@link UnfulfillableSlotRequestException}. */ @Test public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() throws Exception { @@ -147,14 +148,40 @@ public class SlotPoolBatchSlotRequestTest extends TestLogger { final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile); - SlotPoolUtils.failAllocation(slotPool, directMainThreadExecutor, allocationIdFuture.get()); + SlotPoolUtils.failAllocation(slotPool, directMainThreadExecutor, allocationIdFuture.get(), new FlinkException("Failed request")); assertThat(slotFuture.isDone(), is(false)); } } /** - * Tests that a batch slot request won't fail if its resource manager request fails. + * Tests that a batch slot request does react to {@link SlotPool#failAllocation(AllocationID, Exception)} + * signals whose exception is {@link UnfulfillableSlotRequestException}. 
+ */ + @Test + public void testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() throws Exception { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>(); + testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId())); + + final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread(); + + try (final SlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor) + .setResourceManagerGateway(testingResourceManagerGateway) + .build()) { + + final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile); + + SlotPoolUtils.failAllocation(slotPool, directMainThreadExecutor, allocationIdFuture.get(), + new UnfulfillableSlotRequestException(new AllocationID(), ResourceProfile.UNKNOWN)); + + assertThat(slotFuture.isCompletedExceptionally(), is(true)); + } + } + + /** + * Tests that a batch slot request won't fail if its resource manager request fails with exceptions other than + * {@link UnfulfillableSlotRequestException}. */ @Test public void testPendingBatchSlotRequestDoesNotFailIfRMRequestFails() throws Exception { @@ -176,6 +203,27 @@ public class SlotPoolBatchSlotRequestTest extends TestLogger { } /** + * Tests that a batch slot request fails if its resource manager request fails with {@link UnfulfillableSlotRequestException}. 
+ */ + @Test + public void testPendingBatchSlotRequestFailsIfRMRequestFailsUnfulfillably() throws Exception { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + testingResourceManagerGateway.setRequestSlotFuture(FutureUtils.completedExceptionally( + new UnfulfillableSlotRequestException(new AllocationID(), ResourceProfile.UNKNOWN))); + + final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread(); + + try (final SlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor) + .setResourceManagerGateway(testingResourceManagerGateway) + .build()) { + + final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile); + + assertThat(slotFuture.isCompletedExceptionally(), is(true)); + } + } + + /** * Tests that a pending batch slot request times out after the last fulfilling slot gets * released. 
*/ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java index c4dfe88..3836881 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java @@ -97,9 +97,9 @@ public class SlotPoolUtils { return taskManagerLocation.getResourceID(); } - public static void failAllocation(SlotPoolImpl slotPool, ComponentMainThreadExecutor mainThreadExecutor, AllocationID allocationId) { + public static void failAllocation(SlotPoolImpl slotPool, ComponentMainThreadExecutor mainThreadExecutor, AllocationID allocationId, Exception exception) { CompletableFuture.runAsync( - () -> slotPool.failAllocation(allocationId, new FlinkException("Test exception")), + () -> slotPool.failAllocation(allocationId, exception), mainThreadExecutor).join(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java index cc4a2c1..af31c3a 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -26,11 +27,14 @@ import 
org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException; import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -40,6 +44,7 @@ import java.util.Collections; import java.util.List; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -73,7 +78,7 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger { final ResourceProfile availableProfile = new ResourceProfile(2.0, 100); final ResourceProfile unfulfillableProfile = new ResourceProfile(1.0, 200); - final List<AllocationID> allocationFailures = new ArrayList<>(); + final List<Tuple3<JobID, AllocationID, Exception>> allocationFailures = new ArrayList<>(); final SlotManager slotManager = createSlotManagerNotStartingNewTMs(allocationFailures); slotManager.setFailUnfulfillableRequest(false); registerFreeSlot(slotManager, availableProfile); @@ -85,7 +90,8 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger { // assert assertEquals(1, allocationFailures.size()); - assertEquals(request.getAllocationId(), allocationFailures.get(0)); + assertEquals(request.getAllocationId(), allocationFailures.get(0).f1); + 
assertTrue(ExceptionUtils.findThrowable(allocationFailures.get(0).f2, UnfulfillableSlotRequestException.class).isPresent()); assertEquals(0, slotManager.getNumberPendingSlotRequests()); } @@ -124,12 +130,12 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger { } @Test - public void testUnfulfillableRequestsFailWhenOn() throws Exception { + public void testUnfulfillableRequestsFailWhenOn() { // setup final ResourceProfile availableProfile = new ResourceProfile(2.0, 100); final ResourceProfile unfulfillableProfile = new ResourceProfile(2.0, 200); - final List<AllocationID> notifiedAllocationFailures = new ArrayList<>(); + final List<Tuple3<JobID, AllocationID, Exception>> notifiedAllocationFailures = new ArrayList<>(); final SlotManager slotManager = createSlotManagerNotStartingNewTMs(notifiedAllocationFailures); registerFreeSlot(slotManager, availableProfile); @@ -137,8 +143,9 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger { try { slotManager.registerSlotRequest(slotRequest(unfulfillableProfile)); fail("this should cause an exception"); + } catch (ResourceManagerException exception) { + assertTrue(ExceptionUtils.findThrowable(exception, UnfulfillableSlotRequestException.class).isPresent()); } - catch (SlotManagerException ignored) {} // assert assertEquals(0, notifiedAllocationFailures.size()); @@ -169,7 +176,7 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger { return createSlotManager(new ArrayList<>(), false); } - private static SlotManager createSlotManagerNotStartingNewTMs(List<AllocationID> notifiedAllocationFailures) { + private static SlotManager createSlotManagerNotStartingNewTMs(List<Tuple3<JobID, AllocationID, Exception>> notifiedAllocationFailures) { return createSlotManager(notifiedAllocationFailures, false); } @@ -178,14 +185,14 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger { } private static SlotManager createSlotManager( - List<AllocationID> notifiedAllocationFailures, + 
List<Tuple3<JobID, AllocationID, Exception>> notifiedAllocationFailures, boolean startNewTMs) { final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() .setAllocateResourceFunction((resourceProfile) -> startNewTMs ? Collections.singleton(resourceProfile) : Collections.emptyList()) - .setNotifyAllocationFailureConsumer(tuple3 -> notifiedAllocationFailures.add(tuple3.f1)) + .setNotifyAllocationFailureConsumer(tuple3 -> notifiedAllocationFailures.add(tuple3)) .build(); SlotManager slotManager = SlotManagerBuilder.newBuilder().build(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index b155ed6..a99fceb 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -842,7 +842,7 @@ public class SlotManagerTest extends TestLogger { (Object value) -> { try { slotManager.registerSlotRequest(slotRequest); - } catch (SlotManagerException e) { + } catch (ResourceManagerException e) { throw new RuntimeException("Could not register slots.", e); } }); @@ -953,7 +953,7 @@ public class SlotManagerTest extends TestLogger { () -> { try { return slotManager.registerSlotRequest(slotRequest); - } catch (SlotManagerException e) { + } catch (ResourceManagerException e) { throw new CompletionException(e); } },