This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bcec7d711591bdbafe9ec460e68e0f609740b9d4 Author: Gao Yun <yungao...@alibaba-inc.com> AuthorDate: Mon Jun 17 08:22:41 2019 +0800 [FLINK-12765][jobmanager] Keep track of the resources used in the task slot hierarchy --- .../clusterframework/types/ResourceProfile.java | 4 ++ .../runtime/jobmaster/slotpool/SchedulerImpl.java | 1 + .../jobmaster/slotpool/SlotSharingManager.java | 49 +++++++++++++++ .../jobmaster/slotpool/SlotSharingManagerTest.java | 72 ++++++++++++++++++++++ 4 files changed, 126 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java index 7ac1dd4..c373fbd 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java @@ -51,11 +51,15 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile private static final long serialVersionUID = 1L; + /** A ResourceProfile that indicates an unknown set of resources. */ public static final ResourceProfile UNKNOWN = new ResourceProfile(); /** ResourceProfile which matches any other ResourceProfile. */ public static final ResourceProfile ANY = new ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap()); + /** A ResourceProfile describing zero resources. */ + public static final ResourceProfile ZERO = new ResourceProfile(0, 0); + // ------------------------------------------------------------------------ /** How many cpu cores are needed, use double so we can specify cpu like 0.1. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java index b8d8540..57c42b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java @@ -320,6 +320,7 @@ public class SchedulerImpl implements Scheduler { final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot( slotRequestId, + slotProfile.getResourceProfile(), scheduledUnit.getJobVertexId(), multiTaskSlotLocality.getLocality()); return leaf.getLogicalSlotFuture(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java index f5d0dec..a7afbac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmaster.LogicalSlot; @@ -293,6 +294,11 @@ public class SlotSharingManager { * @param cause for the release */ public abstract void release(Throwable cause); + + /** + * Gets the total reserved resources of the slot and its descendants. + */ + public abstract ResourceProfile getReservedResources(); } /** @@ -316,6 +322,9 @@ public class SlotSharingManager { // true if we are currently releasing our children private boolean releasingChildren; + // the total resources reserved by all the descendants. + private ResourceProfile reservedResources; + private MultiTaskSlot( SlotRequestId slotRequestId, AbstractID groupId, @@ -355,6 +364,8 @@ public class SlotSharingManager { this.children = new HashMap<>(16); this.releasingChildren = false; + this.reservedResources = ResourceProfile.ZERO; + slotContextFuture.whenComplete( (SlotContext ignored, Throwable throwable) -> { if (throwable != null) { @@ -404,6 +415,7 @@ public class SlotSharingManager { */ SingleTaskSlot allocateSingleTaskSlot( SlotRequestId slotRequestId, + ResourceProfile resourceProfile, AbstractID groupId, Locality locality) { Preconditions.checkState(!super.contains(groupId)); @@ -412,6 +424,7 @@ public class SlotSharingManager { final SingleTaskSlot leaf = new SingleTaskSlot( slotRequestId, + resourceProfile, groupId, this, locality); @@ -421,6 +434,8 @@ public class SlotSharingManager { // register the newly allocated slot also at the SlotSharingManager allTaskSlots.put(slotRequestId, leaf); + reserveResource(resourceProfile); + return leaf; } @@ -490,6 +505,11 @@ public class SlotSharingManager { } } + @Override + public ResourceProfile getReservedResources() { + return reservedResources; + } + /** * Releases the child with the given childGroupId. * @@ -501,6 +521,9 @@ public class SlotSharingManager { if (child != null) { allTaskSlots.remove(child.getSlotRequestId()); + + // Update the resources of this slot and the parents + releaseResource(child.getReservedResources()); } if (children.isEmpty()) { @@ -509,6 +532,22 @@ public class SlotSharingManager { } } + private void reserveResource(ResourceProfile resourceProfile) { + reservedResources = reservedResources.merge(resourceProfile); + + if (parent != null) { + parent.reserveResource(resourceProfile); + } + } + + private void releaseResource(ResourceProfile resourceProfile) { + reservedResources = reservedResources.subtract(resourceProfile); + + if (parent != null) { + parent.releaseResource(resourceProfile); + } + } + @Override public String toString() { String physicalSlotDescription; @@ -539,13 +578,18 @@ public class SlotSharingManager { // future containing a LogicalSlot which is completed once the underlying SlotContext future is completed private final CompletableFuture<SingleLogicalSlot> singleLogicalSlotFuture; + // the resource profile of this slot. + private final ResourceProfile resourceProfile; + private SingleTaskSlot( SlotRequestId slotRequestId, + ResourceProfile resourceProfile, AbstractID groupId, MultiTaskSlot parent, Locality locality) { super(slotRequestId, groupId); + this.resourceProfile = Preconditions.checkNotNull(resourceProfile); this.parent = Preconditions.checkNotNull(parent); Preconditions.checkNotNull(locality); @@ -581,6 +625,11 @@ public class SlotSharingManager { } @Override + public ResourceProfile getReservedResources() { + return resourceProfile; + } + + @Override public String toString() { String logicalSlotString = "(pending)"; try { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java index 8e8db1a..0742ac2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; +import org.apache.flink.api.common.resources.GPUResource; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotProfile; @@ -135,6 +136,7 @@ public class SlotSharingManagerTest extends TestLogger { SlotRequestId singleTaskSlotRequestId = new SlotRequestId(); SlotSharingManager.SingleTaskSlot singleTaskSlot = rootSlot.allocateSingleTaskSlot( singleTaskSlotRequestId, + ResourceProfile.UNKNOWN, singleTaskSlotGroupId, Locality.LOCAL); @@ -180,6 +182,7 @@ public class SlotSharingManagerTest extends TestLogger { SlotRequestId singleTaskSlotRequestId = new SlotRequestId(); SlotSharingManager.SingleTaskSlot singleTaskSlot = rootSlot.allocateSingleTaskSlot( singleTaskSlotRequestId, + ResourceProfile.UNKNOWN, new AbstractID(), Locality.LOCAL); @@ -237,6 +240,7 @@ public class SlotSharingManagerTest extends TestLogger { SlotSharingManager.SingleTaskSlot singleTaskSlot1 = multiTaskSlot.allocateSingleTaskSlot( new SlotRequestId(), + ResourceProfile.UNKNOWN, new AbstractID(), Locality.LOCAL); @@ -284,12 +288,14 @@ public class SlotSharingManagerTest extends TestLogger { Locality locality1 = Locality.LOCAL; SlotSharingManager.SingleTaskSlot singleTaskSlot1 = rootSlot.allocateSingleTaskSlot( new SlotRequestId(), + ResourceProfile.UNKNOWN, new AbstractID(), locality1); Locality locality2 = Locality.HOST_LOCAL; SlotSharingManager.SingleTaskSlot singleTaskSlot2 = rootSlot.allocateSingleTaskSlot( new SlotRequestId(), + ResourceProfile.UNKNOWN, new AbstractID(), locality2); @@ -314,6 +320,7 @@ public class SlotSharingManagerTest extends TestLogger { Locality locality3 = Locality.NON_LOCAL; SlotSharingManager.SingleTaskSlot singleTaskSlot3 = rootSlot.allocateSingleTaskSlot( new SlotRequestId(), + ResourceProfile.UNKNOWN, new AbstractID(), locality3); @@ -349,6 +356,7 @@ public class SlotSharingManagerTest extends TestLogger { SlotSharingManager.SingleTaskSlot singleTaskSlot = rootSlot.allocateSingleTaskSlot( new SlotRequestId(), + ResourceProfile.UNKNOWN, new AbstractID(), Locality.LOCAL); @@ -435,6 +443,7 @@ public class SlotSharingManagerTest extends TestLogger { // occupy the resolved root slot resolvedMultiTaskSlot.allocateSingleTaskSlot( new SlotRequestId(), + ResourceProfile.UNKNOWN, groupId, Locality.UNCONSTRAINED); @@ -491,6 +500,7 @@ public class SlotSharingManagerTest extends TestLogger { // occupy the slot resolvedRootSlot.allocateSingleTaskSlot( new SlotRequestId(), + ResourceProfile.UNKNOWN, groupId, slotInfoAndLocality.getLocality()); @@ -525,6 +535,7 @@ public class SlotSharingManagerTest extends TestLogger { // occupy the unresolved slot unresolvedRootSlot.allocateSingleTaskSlot( new SlotRequestId(), + ResourceProfile.UNKNOWN, groupId, Locality.UNKNOWN); @@ -533,4 +544,65 @@ public class SlotSharingManagerTest extends TestLogger { // we should no longer have a free unresolved root slot assertNull(unresolvedRootSlot1); } + + @Test + public void testResourceCalculationOnSlotAllocatingAndReleasing() { + ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 100, 100, Collections.emptyMap()); + ResourceProfile rp2 = new ResourceProfile(2.0, 200, 200, 200, 200, 200, Collections.singletonMap("gpu", new GPUResource(2.0))); + ResourceProfile rp3 = new ResourceProfile(3.0, 300, 300, 300, 300, 300, Collections.singletonMap("gpu", new GPUResource(3.0))); + + final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions(); + + SlotSharingManager slotSharingManager = new SlotSharingManager( + SLOT_SHARING_GROUP_ID, + allocatedSlotActions, + SLOT_OWNER); + + SlotSharingManager.MultiTaskSlot unresolvedRootSlot = slotSharingManager.createRootSlot( + new SlotRequestId(), + new CompletableFuture<>(), + new SlotRequestId()); + + // Allocates the left subtree. + SlotSharingManager.MultiTaskSlot leftMultiTaskSlot = + unresolvedRootSlot.allocateMultiTaskSlot(new SlotRequestId(), new SlotSharingGroupId()); + + SlotSharingManager.SingleTaskSlot firstChild = leftMultiTaskSlot.allocateSingleTaskSlot( + new SlotRequestId(), + rp1, + new SlotSharingGroupId(), + Locality.LOCAL); + SlotSharingManager.SingleTaskSlot secondChild = leftMultiTaskSlot.allocateSingleTaskSlot( + new SlotRequestId(), + rp2, + new SlotSharingGroupId(), + Locality.LOCAL); + + assertEquals(rp1, firstChild.getReservedResources()); + assertEquals(rp2, secondChild.getReservedResources()); + assertEquals(rp1.merge(rp2), leftMultiTaskSlot.getReservedResources()); + assertEquals(rp1.merge(rp2), unresolvedRootSlot.getReservedResources()); + + // Allocates the right subtree. + SlotSharingManager.SingleTaskSlot thirdChild = unresolvedRootSlot.allocateSingleTaskSlot( + new SlotRequestId(), + rp3, + new SlotSharingGroupId(), + Locality.LOCAL); + assertEquals(rp3, thirdChild.getReservedResources()); + assertEquals(rp1.merge(rp2).merge(rp3), unresolvedRootSlot.getReservedResources()); + + // Releases the second child in the left-side tree. + secondChild.release(new Throwable("Release for testing")); + assertEquals(rp1, leftMultiTaskSlot.getReservedResources()); + assertEquals(rp1.merge(rp3), unresolvedRootSlot.getReservedResources()); + + // Releases the third child in the right-side tree. + thirdChild.release(new Throwable("Release for testing")); + assertEquals(rp1, unresolvedRootSlot.getReservedResources()); + + // Releases the first child in the left-side tree. + firstChild.release(new Throwable("Release for testing")); + assertEquals(ResourceProfile.ZERO, unresolvedRootSlot.getReservedResources()); + } }