[FLINK-4516] update leadership information in ResourceManager The leadership information remained static for connected JobMasters. This updates it to remove stale JobMasters when they lose leadership status.
This closes #2624 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cef31912 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cef31912 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cef31912 Branch: refs/heads/flip-6 Commit: cef319126ced676b5c6d08e6a963986f1dd6c5ee Parents: b380634 Author: Maximilian Michels <[email protected]> Authored: Mon Oct 10 17:36:10 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Oct 14 15:14:43 2016 +0200 ---------------------------------------------------------------------- .../resourcemanager/ResourceManager.java | 196 +++++++++++++------ .../resourcemanager/ResourceManagerGateway.java | 4 +- .../ResourceManagerServices.java | 6 + .../registration/JobMasterRegistration.java | 62 ++++++ .../slotmanager/SlotManager.java | 16 +- .../resourcemanager/TestingSlotManager.java | 8 + .../slotmanager/SlotManagerTest.java | 10 +- 7 files changed, 224 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/cef31912/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index d2d00cf..8fbb34b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; @@ -40,6 +41,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected; import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply; +import org.apache.flink.runtime.resourcemanager.registration.JobMasterRegistration; import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; @@ -53,17 +55,14 @@ import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; -import org.apache.flink.runtime.util.LeaderConnectionInfo; -import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -85,10 +84,10 @@ public abstract class ResourceManager<WorkerType extends Serializable> protected static final int EXIT_CODE_FATAL_ERROR = -13; /** All currently registered JobMasterGateways scoped by JobID. */ - private final Map<JobID, JobMasterGateway> jobMasterGateways; + private final Map<JobID, JobMasterRegistration> jobMasters; - /** LeaderListeners for all registered JobMasters. */ - private final Map<JobID, JobMasterLeaderListener> jobMasterLeaderRetrievalListeners; + /** LeaderListeners for all registered JobIDs. */ + private final Map<JobID, JobIdLeaderListener> leaderListeners; /** All currently registered TaskExecutors with there framework specific worker information. */ private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors; @@ -106,7 +105,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> private LeaderElectionService leaderElectionService; /** ResourceManager's leader session id which is updated on leader election. */ - private UUID leaderSessionID; + private volatile UUID leaderSessionID; /** All registered listeners for status updates of the ResourceManager. */ private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners; @@ -121,8 +120,8 @@ public abstract class ResourceManager<WorkerType extends Serializable> super(rpcService); this.highAvailabilityServices = checkNotNull(highAvailabilityServices); this.slotManagerFactory = checkNotNull(slotManagerFactory); - this.jobMasterGateways = new HashMap<>(); - this.jobMasterLeaderRetrievalListeners = new HashMap<>(); + this.jobMasters = new HashMap<>(); + this.leaderListeners = new HashMap<>(); this.taskExecutors = new HashMap<>(); this.leaderSessionID = new UUID(0, 0); infoMessageListeners = new HashMap<>(); @@ -149,9 +148,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> public void shutDown() { try { leaderElectionService.stop(); - for (JobID jobID : jobMasterGateways.keySet()) { - highAvailabilityServices.getJobManagerLeaderRetriever(jobID).stop(); - } + clearState(); super.shutDown(); } catch (Throwable e) { log.error("A fatal error happened when shutdown the ResourceManager", e); @@ -185,6 +182,24 @@ public abstract class ResourceManager<WorkerType extends Serializable> checkNotNull(jobMasterAddress); checkNotNull(jobID); + // create a leader retriever in case it doesn't exist + final JobIdLeaderListener jobIdLeaderListener; + if (leaderListeners.containsKey(jobID)) { + jobIdLeaderListener = leaderListeners.get(jobID); + } else { + try { + LeaderRetrievalService jobMasterLeaderRetriever = + highAvailabilityServices.getJobManagerLeaderRetriever(jobID); + jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever); + } catch (Exception e) { + log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); + FlinkCompletableFuture<RegistrationResponse> responseFuture = new FlinkCompletableFuture<>(); + responseFuture.complete(new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever")); + return responseFuture; + } + leaderListeners.put(jobID, jobIdLeaderListener); + } + return getRpcService() .execute(new Callable<JobMasterGateway>() { @Override @@ -197,21 +212,13 @@ public abstract class ResourceManager<WorkerType extends Serializable> throw new Exception("Invalid leader session id"); } - final LeaderConnectionInfo jobMasterLeaderInfo; - try { - jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo( - highAvailabilityServices.getJobManagerLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS)); - } catch (Exception e) { - log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); - throw new Exception("Failed to retrieve JobMasterLeaderRetriever"); - } - - if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) { - log.info("Declining registration request from non-leading JobManager {}", jobMasterAddress); - throw new Exception("JobManager is not leading"); + if (!jobIdLeaderListener.getLeaderID().get(timeout.getSize(), timeout.getUnit()) + .equals(jobMasterLeaderId)) { + throw new Exception("Leader Id did not match"); } - return getRpcService().connect(jobMasterAddress, JobMasterGateway.class).get(5, TimeUnit.SECONDS); + return getRpcService().connect(jobMasterAddress, JobMasterGateway.class) + .get(timeout.getSize(), timeout.getUnit()); } }) .handleAsync(new BiFunction<JobMasterGateway, Throwable, RegistrationResponse>() { @@ -220,24 +227,34 @@ public abstract class ResourceManager<WorkerType extends Serializable> if (throwable != null) { return new RegistrationResponse.Decline(throwable.getMessage()); - } else { - if (!jobMasterLeaderRetrievalListeners.containsKey(jobID)) { - JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID); - try { - LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobManagerLeaderRetriever(jobID); - jobMasterLeaderRetriever.start(jobMasterLeaderListener); - } catch (Exception e) { - log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); - return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"); - } - jobMasterLeaderRetrievalListeners.put(jobID, jobMasterLeaderListener); - } - final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway); - if (existingGateway != null) { - log.info("Replacing gateway for registered JobID {}.", jobID); + } + + if (!leaderSessionID.equals(resourceManagerLeaderId)) { + log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" + + " did not equal the received leader session ID {}", + jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId); + return new RegistrationResponse.Decline("Invalid leader session id"); + } + + try { + // LeaderID should be available now, but if not we fail the registration + UUID currentJobMasterLeaderId = jobIdLeaderListener.getLeaderID().getNow(null); + if (currentJobMasterLeaderId == null || !currentJobMasterLeaderId.equals(jobMasterLeaderId)) { + throw new Exception("Leader Id did not match"); } - return new JobMasterRegistrationSuccess(5000, resourceManagerLeaderId); + } catch (Exception e) { + return new RegistrationResponse.Decline(e.getMessage()); + } + + final JobMasterRegistration registration = + new JobMasterRegistration(jobID, jobMasterLeaderId, jobMasterGateway); + + final JobMasterRegistration existingRegistration = jobMasters.put(jobID, registration); + if (existingRegistration != null) { + log.info("Replacing JobMaster registration for newly registered JobMaster with JobID {}.", jobID); } + return new JobMasterRegistrationSuccess(5000, resourceManagerLeaderId); + } }, getMainThreadExecutor()); } @@ -305,13 +322,10 @@ public abstract class ResourceManager<WorkerType extends Serializable> SlotRequest slotRequest) { JobID jobId = slotRequest.getJobId(); - JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId); - JobMasterLeaderListener jobMasterLeaderListener = jobMasterLeaderRetrievalListeners.get(jobId); + JobMasterRegistration jobMasterRegistration = jobMasters.get(jobId); - UUID leaderID = jobMasterLeaderListener.getLeaderID(); - - if (jobMasterGateway != null - && jobMasterLeaderID.equals(leaderID) + if (jobMasterRegistration != null + && jobMasterLeaderID.equals(jobMasterRegistration.getLeaderID()) && resourceManagerLeaderID.equals(leaderSessionID)) { return slotManager.requestSlot(slotRequest); } else { @@ -371,8 +385,6 @@ public abstract class ResourceManager<WorkerType extends Serializable> log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID); // confirming the leader session ID might be blocking, leaderElectionService.confirmLeaderSessionID(leaderSessionID); - // notify SlotManager - slotManager.setLeaderUUID(leaderSessionID); ResourceManager.this.leaderSessionID = leaderSessionID; } }); @@ -387,10 +399,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> @Override public void run() { log.info("ResourceManager {} was revoked leadership.", getAddress()); - jobMasterGateways.clear(); - taskExecutors.clear(); - slotManager.clearState(); - leaderSessionID = new UUID(0, 0); + clearState(); } }); } @@ -577,6 +586,11 @@ public abstract class ResourceManager<WorkerType extends Serializable> private class DefaultResourceManagerServices implements ResourceManagerServices { @Override + public UUID getLeaderID() { + return ResourceManager.this.leaderSessionID; + } + + @Override public void allocateResource(ResourceProfile resourceProfile) { ResourceManager.this.startNewWorker(resourceProfile); } @@ -592,33 +606,95 @@ public abstract class ResourceManager<WorkerType extends Serializable> } } - private static class JobMasterLeaderListener implements LeaderRetrievalListener { + /** + * Leader instantiated for each connected JobMaster + */ + private class JobIdLeaderListener implements LeaderRetrievalListener { private final JobID jobID; - private UUID leaderID; + private final LeaderRetrievalService retrievalService; - private JobMasterLeaderListener(JobID jobID) { + private final FlinkCompletableFuture<UUID> initialLeaderIdFuture; + + private volatile UUID leaderID; + + private JobIdLeaderListener( + JobID jobID, + LeaderRetrievalService retrievalService) throws Exception { this.jobID = jobID; + this.retrievalService = retrievalService; + this.initialLeaderIdFuture = new FlinkCompletableFuture<>(); + this.retrievalService.start(this); + } + + public Future<UUID> getLeaderID() { + if (!initialLeaderIdFuture.isDone()) { + return initialLeaderIdFuture; + } else { + return FlinkCompletableFuture.completed(leaderID); + } } public JobID getJobID() { return jobID; } - public UUID getLeaderID() { - return leaderID; + + public void stopService() throws Exception { + retrievalService.stop(); } @Override public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { this.leaderID = leaderSessionID; + + if (!initialLeaderIdFuture.isDone()) { + initialLeaderIdFuture.complete(leaderSessionID); + } + + ResourceManager.this.runAsync(new Runnable() { + @Override + public void run() { + JobMasterRegistration jobMasterRegistration = ResourceManager.this.jobMasters.get(jobID); + if (jobMasterRegistration == null || !jobMasterRegistration.getLeaderID().equals(leaderSessionID)) { + // registration is not valid anymore, remove registration + ResourceManager.this.jobMasters.remove(jobID); + // leader listener is not necessary anymore + JobIdLeaderListener listener = ResourceManager.this.leaderListeners.remove(jobID); + if (listener != null) { + try { + listener.stopService(); + } catch (Exception e) { + ResourceManager.this.handleError(e); + } + } + } + } + }); } @Override public void handleError(final Exception exception) { - // TODO + ResourceManager.this.handleError(exception); } } + private void clearState() { + jobMasters.clear(); + taskExecutors.clear(); + slotManager.clearState(); + Iterator<JobIdLeaderListener> leaderListenerIterator = + leaderListeners.values().iterator(); + while (leaderListenerIterator.hasNext()) { + JobIdLeaderListener listener = leaderListenerIterator.next(); + try { + listener.stopService(); + } catch (Exception e) { + handleError(e); + } + leaderListenerIterator.remove(); + } + leaderSessionID = new UUID(0, 0); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/cef31912/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 3c81227..07e9e43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -61,14 +61,14 @@ public interface ResourceManagerGateway extends RpcGateway { /** * Requests a slot from the resource manager. * - * @param jobMasterLeaderID leader id of the JobMaster * @param resourceManagerLeaderID leader if of the ResourceMaster + * @param jobMasterLeaderID leader if of the JobMaster * @param slotRequest The slot to request * @return The confirmation that the slot gets allocated */ Future<RMSlotRequestReply> requestSlot( - UUID jobMasterLeaderID, UUID resourceManagerLeaderID, + UUID jobMasterLeaderID, SlotRequest slotRequest, @RpcTimeout Time timeout); http://git-wip-us.apache.org/repos/asf/flink/blob/cef31912/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java index b997a3a..16d0a7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import java.util.UUID; import java.util.concurrent.Executor; /** @@ -27,6 +28,11 @@ import java.util.concurrent.Executor; public interface ResourceManagerServices { /** + * Gets the current leader id assigned at the ResourceManager. + */ + UUID getLeaderID(); + + /** * Allocates a resource according to the resource profile. */ void allocateResource(ResourceProfile resourceProfile); http://git-wip-us.apache.org/repos/asf/flink/blob/cef31912/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java new file mode 100644 index 0000000..f417935 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.registration; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; + +import java.util.UUID; + +/** + * This class is responsible for grouping the JobMasterGateway and the JobMaster's + * leader id + */ +public class JobMasterRegistration { + + private static final long serialVersionUID = -2062957799469434614L; + + private final JobID jobID; + + private final UUID leaderID; + + private final JobMasterGateway jobMasterGateway; + + public JobMasterRegistration( + JobID jobID, + UUID leaderID, + JobMasterGateway jobMasterGateway) { + this.jobID = jobID; + this.leaderID = leaderID; + this.jobMasterGateway = jobMasterGateway; + } + + public JobID getJobID() { + return jobID; + } + + + public UUID getLeaderID() { + return leaderID; + } + + public JobMasterGateway getJobMasterGateway() { + return jobMasterGateway; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/cef31912/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index 7eb2d78..e312ea2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; -import java.util.UUID; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -85,9 +84,6 @@ public abstract class SlotManager { private final Time timeout; - /** The current leader id set by the ResourceManager */ - private UUID leaderID; - public SlotManager(ResourceManagerServices rmServices) { this.rmServices = checkNotNull(rmServices); this.registeredSlots = new HashMap<>(16); @@ -96,7 +92,6 @@ public abstract class SlotManager { this.allocationMap = new AllocationMap(); this.taskManagers = new HashMap<>(); this.timeout = Time.seconds(10); - this.leaderID = new UUID(0, 0); } // ------------------------------------------------------------------------ @@ -303,7 +298,7 @@ public abstract class SlotManager { final TaskExecutorRegistration registration = freeSlot.getTaskExecutorRegistration(); final Future<TMSlotRequestReply> slotRequestReplyFuture = registration.getTaskExecutorGateway() - .requestSlot(freeSlot.getSlotId(), allocationID, leaderID, timeout); + .requestSlot(freeSlot.getSlotId(), allocationID, rmServices.getLeaderID(), timeout); slotRequestReplyFuture.handleAsync(new BiFunction<TMSlotRequestReply, Throwable, Void>() { @Override @@ -488,15 +483,6 @@ public abstract class SlotManager { pendingSlotRequests.clear(); freeSlots.clear(); allocationMap.clear(); - leaderID = new UUID(0, 0); - } - - // ------------------------------------------------------------------------ - // High availability (called by the ResourceManager) - // ------------------------------------------------------------------------ - - public void setLeaderUUID(UUID leaderSessionID) { - this.leaderID = leaderSessionID; } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/cef31912/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java index 0b2c42b..67b208d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java @@ -26,6 +26,7 @@ import org.mockito.Mockito; import java.util.Iterator; import java.util.Map; +import java.util.UUID; import java.util.concurrent.Executor; public class TestingSlotManager extends SlotManager { @@ -60,6 +61,13 @@ public class TestingSlotManager extends SlotManager { private static class TestingResourceManagerServices implements ResourceManagerServices { + private final UUID leaderID = UUID.randomUUID(); + + @Override + public UUID getLeaderID() { + return leaderID; + } + @Override public void allocateResource(ResourceProfile resourceProfile) { http://git-wip-us.apache.org/repos/asf/flink/blob/cef31912/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index 0d2b40d..558d3c2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -498,13 +498,21 @@ public class SlotManagerTest { private static class TestingRmServices implements ResourceManagerServices { - private List<ResourceProfile> allocatedContainers; + private final UUID leaderID; + + private final List<ResourceProfile> allocatedContainers; public TestingRmServices() { + this.leaderID = UUID.randomUUID(); this.allocatedContainers = new LinkedList<>(); } @Override + public UUID getLeaderID() { + return leaderID; + } + + @Override public void allocateResource(ResourceProfile resourceProfile) { allocatedContainers.add(resourceProfile); }
